forked from rockstor/rockstor-core
/
smart.py
453 lines (422 loc) · 20.8 KB
/
smart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
"""
Copyright (c) 2012-2013 RockStor, Inc. <http://rockstor.com>
This file is part of RockStor.
RockStor is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
by the Free Software Foundation; either version 2 of the License,
or (at your option) any later version.
RockStor is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import re
from osi import run_command
from tempfile import mkstemp
from shutil import move
import logging
from system.email_util import email_root
from exceptions import CommandException
# Module level logger, named after this module.
logger = logging.getLogger(__name__)
# Absolute paths of the external binaries this module hands to run_command().
SMART = '/usr/sbin/smartctl'
# CAT is used to read saved command output dumps when in test mode.
CAT = '/usr/bin/cat'
# LSBLK is used by get_base_device() to list block devices.
LSBLK = '/usr/bin/lsblk'
# enables reading file dumps of smartctl output instead of running smartctl
# currently hardwired to read from eg:- /root/smartdumps/smart-H--info.out
# default setting = False
TESTMODE = False
def info(device, custom_options='', test_mode=TESTMODE):
    """
    Retrieve matching properties found in smartctl -H --info output.
    Used to populate the Identity / general info tab by views/disk_smart.py
    :param device: disk device name
    :param custom_options: string of user entered custom smart options; it
    is passed on to get_dev_options() to build the smartctl device arguments
    :param test_mode: True causes cat from a dump file rather than running
    the smartctl command
    :return: list of smart parameters extracted from device or test file.
    There is one entry per pattern in matches (an empty string when nothing
    matched) plus the smartctl version string inserted at index 14.
    """
    if not test_mode:
        o, e, rc = run_command(
            [SMART, '-H', '--info'] + get_dev_options(device, custom_options),
            throw=False)
    else:  # we are testing so use a smartctl -H --info file dump instead
        o, e, rc = run_command([CAT, '/root/smartdumps/smart-H--info.out'])
    # List of string matches to look for in smartctrl -H --info output.
    # Note the "|" char allows for defining alternative matches ie A or B
    matches = ('Model Family:|Vendor:', 'Device Model:|Product:',
               'Serial Number:|Serial number:',
               'LU WWN Device Id:|Logical Unit id:',
               'Firmware Version:|Revision', 'User Capacity:',
               'Sector Size:|Logical block size:', 'Rotation Rate:',
               'Device is:', 'ATA Version is:',
               'SATA Version is:', 'Local Time is:',
               'SMART support is:.* Available',
               'SMART support is:.* Enabled',
               'SMART overall-health self-assessment|SMART Health Status:',)
    # create a list of empty strings ready to store our smart results / values
    res = ['', ] * len(matches)
    version = ''
    for line in o:
        if re.match('smartctl ', line) is not None:
            version = ' '.join(line.split()[1:4])
        for i, pattern in enumerate(matches):
            if re.match(pattern, line) is None:
                continue
            # Assume all characters after the first colon are the result /
            # value.
            label, colon, value = line.partition(':')
            if not colon:
                # Some patterns (eg the bare 'Revision' alternative) can
                # match a line that holds no colon at all; there is then no
                # value to extract so leave the existing entry untouched
                # rather than fail.
                continue
            # Strip off begin and end spaces. Limit to 64 chars for db.
            res[i] = value.strip()[:64]
    # smartctl version is expected at index 14 (15th item)
    res.insert(14, version)
    return res
def extended_info(device, custom_options='', test_mode=TESTMODE):
    """
    Retrieves a list of SMART attributes found from parsing smartctl -a output
    Mostly ATA / SATA as SCSI uses a free form syntax for this.
    Extracts all lines following the ID# ATTRIBUTE_NAME table header and
    creates a dictionary of lists containing each lines column entries indexed
    in the dictionary via the Attribute name.
    :param device: disk device name
    :param custom_options: string of user entered custom smart options; it
    is passed on to get_dev_options() to build the smartctl device arguments
    :param test_mode: True causes cat from a dump file rather than running
    the smartctl command
    :return: dictionary of smart attributes extracted from device or test
    file. Each value is a list of at most 10 column entries; any columns
    beyond the 10th are joined (space separated) into the 10th entry.
    """
    if not test_mode:
        o, e, rc = run_command(
            [SMART, '-a'] + get_dev_options(device, custom_options),
            throw=False)
    else:  # we are testing so use a smartctl -a file dump instead
        o, e, rc = run_command([CAT, '/root/smartdumps/smart-a.out'])
    attributes = {}
    for i, line in enumerate(o):
        if re.match('Vendor Specific SMART Attributes with Thresholds:',
                    line) is None:
            continue
        # The column header line must directly follow the section title.
        if len(o) <= i + 1:
            continue
        if re.match('ID# ATTRIBUTE_NAME', o[i + 1]) is None:
            continue
        for row in o[i + 2:]:
            fields = row.split()
            if not fields:
                # An empty (or whitespace only) line ends the table.
                break
            if len(fields) < 2:
                # No attribute name column to index by; skip this row
                # rather than fail with an IndexError.
                continue
            if len(fields) > 10:
                # Fold any trailing columns into the 10th entry.
                fields[9] = ' '.join(fields[9:])
            attributes[fields[1]] = fields[0:10]
    return attributes
def capabilities(device, custom_options='', test_mode=TESTMODE):
    """
    Retrieves a list of SMART capabilities found from parsing smartctl -c output
    ATA / SATA only.
    Extracts all capabilities and builds a dictionary of two item lists
    holding the flag and the description of each capability found. The
    dictionary is indexed by the capability name.
    :param device: disk device name
    :param custom_options: string of user entered custom smart options; it
    is passed on to get_dev_options() to build the smartctl device arguments
    :param test_mode: True causes cat from a dump file rather than running
    the smartctl command
    :return: dictionary of smart capabilities extracted from device or test
    file in the form {name: [flag, description]}
    """
    if not test_mode:
        o, e, rc = run_command(
            [SMART, '-c'] + get_dev_options(device, custom_options))
    else:  # we are testing so use a smartctl -c file dump instead
        o, e, rc = run_command([CAT, '/root/smartdumps/smart-c.out'])
    cap_d = {}
    for i, line in enumerate(o):
        if re.match('=== START OF READ SMART DATA SECTION ===',
                    line) is None:
            continue
        # prev_line holds the first half of a capability name that wraps
        # over two lines; cur_cap is the capability currently being filled.
        prev_line = None
        cur_cap = None
        # N.B. the line directly after the section marker is skipped.
        for entry in o[i + 2:]:
            if re.match(r'.*:\s+\(.*\)', entry) is not None:
                # A "name: (flag) description" line starts a capability.
                cap = entry[:entry.index(':')]
                flag = entry[(entry.index('(') + 1):entry.index(')')].strip()
                val = entry[(entry.index(')') + 1):].strip()
                if val == 'seconds.' or val == 'minutes.':
                    # The bracketed part is a duration not a flag.
                    val = '%s %s' % (flag, val)
                    flag = ''
                if prev_line is not None:
                    cap = '%s %s' % (prev_line, cap)
                    prev_line = None
                cur_cap = cap
                cap_d[cur_cap] = [flag, val]
            elif re.match(r'\s', entry) is not None:
                # Indented lines continue the current description.
                if cur_cap is None:
                    # Nothing to continue yet; ignore rather than raise a
                    # KeyError on cap_d[None].
                    continue
                cap_d[cur_cap][1] += '\n'
                cap_d[cur_cap][1] += entry.strip()
            else:
                prev_line = entry.strip()
        # Only the first data section is parsed.
        break
    return cap_d
def error_logs(device, custom_options='', test_mode=TESTMODE):
    """
    Retrieves a parsed list of SMART errors from the output of smartctl -l error
    May be empty if no errors, also returns a raw output of the error log itself
    :param device: disk device name
    :param custom_options: string of user entered custom smart options; it
    is passed on to get_dev_options() to build the smartctl device arguments
    :param test_mode: True causes cat from a dump file rather than running
    the smartctl command
    :return: summary: dictionary of lists containing details of error. Index is
    error number and each list is [lifetime_hours, state, etype, details].
    :return: log_l: A list containing each line in turn of the error log.
    """
    local_base_dev = get_dev_options(device, custom_options)
    smart_command = [SMART, '-l', 'error'] + local_base_dev
    if not test_mode:
        o, e, rc = run_command(smart_command, throw=False)
    else:
        o, e, rc = run_command([CAT, '/root/smartdumps/smart-l-error.out'])
    # As we mute exceptions when calling the above command we should at least
    # examine what we have as return code (rc); 64 has been seen when the error
    # log contains errors but otherwise executes successfully so we catch this.
    overide_rc = 64
    e_msg = 'Drive %s has logged S.M.A.R.T errors. Please view ' \
            'the Error logs tab for this device.' % local_base_dev
    screen_return_codes(e_msg, overide_rc, o, e, rc, smart_command)
    ecode_map = {
        'ABRT': 'Command ABoRTed',
        'AMNF': 'Address Mark Not Found',
        'CCTO': 'Command Completion Timed Out',
        'EOM': 'End Of Media',
        'ICRC': 'Interface Cyclic Redundancy Code (CRC) error',
        'IDNF': 'IDentity Not Found',
        'ILI': '(packet command-set specific)',
        'MC': 'Media Changed',
        'MCR': 'Media Change Request',
        'NM': 'No Media',
        'obs': 'obsolete',
        'TK0NF': 'TracK 0 Not Found',
        'UNC': 'UNCorrectable Error in Data',
        'WP': 'Media is Write Protected',
    }
    summary = {}
    log_l = []
    for i, line in enumerate(o):
        if re.match('=== START OF READ SMART DATA SECTION ===',
                    line) is None:
            continue
        # Details gathered so far for the error entry being parsed.
        err_num = lifetime_hours = state = None
        for entry in o[i + 1:]:
            log_l.append(entry)
            stripped = entry.strip()
            if re.match('Error ', entry) is not None:
                # Start of an error entry: the error number follows 'Error'.
                fields = entry.split()
                if len(fields) > 1:
                    err_num = fields[1]
                if 'lifetime:' in fields:
                    try:
                        lifetime_hours = int(
                            fields[fields.index('lifetime:') + 1])
                    except (IndexError, ValueError):
                        # Missing or non numeric value; leave it unset.
                        lifetime_hours = None
            if re.match('When the command that caused the error occurred, '
                        'the device was', stripped) is not None:
                # Empty when nothing follows the phrase.
                state = stripped.partition('the device was ')[2] or None
            if re.search('Error: ', entry) is not None:
                # The 'Error: ' line completes an entry so record it.
                e_fields = entry.split('Error: ')[1].split()
                etype = e_fields[0] if e_fields else None
                # Expand known abbreviations, otherwise keep the raw code.
                etype = ecode_map.get(etype, etype)
                details = ' '.join(e_fields[1:]) if len(e_fields) > 1 \
                    else None
                summary[err_num] = [lifetime_hours, state, etype, details]
                err_num = lifetime_hours = state = None
        # Debug level log entry in place of a print to stdout.
        logging.getLogger(__name__).debug('summary_d %s', summary)
    return (summary, log_l)
def _selftest_row_fields(line):
    """
    Split one '#<num> ...' self-test log row into its list of column values.
    :param line: a single line of smartctl self-test log output
    :return: list of column values with the 4th entry converted from a
    '% remaining' string to an int '% completed', or None if the line is not
    a parsable numbered row.
    """
    # Rows are numbered '# 1' to '# 9' and then '#10' onwards so the space
    # after the '#' is optional.
    if re.match(r'#\s*\d', line) is None:
        return None
    # split the line into fields using 2 or more spaces
    fields = re.split(r'\s\s+', line.strip()[1:].strip())
    # Some Seagate drives add an ongoing test progress report to the top of
    # this log but there is then only one space delimiter and we loose a
    # column. It's normally 6 fields.
    if len(fields) == 5 and re.match('Self-test routine in progress',
                                     fields[2]):
        # An ongoing self-test entry is to blame. Split the trailing
        # percentage off the status field into a column of its own.
        status_fields = fields[2].split()
        fields = (fields[:2] +
                  [' '.join(status_fields[:-1]), status_fields[-1]] +
                  fields[3:])
    try:
        # Remove the % char from this columns value and change % Remaining
        # to % Completed.
        fields[3] = 100 - int(fields[3].rstrip('%'))
    except (IndexError, ValueError):
        return None
    return fields


def test_logs(device, custom_options='', test_mode=TESTMODE):
    """
    Retrieves information from SMART Self-Test logs held by the drive.
    Creates a dictionary of previous test info, indexed by test number and a
    list containing the remaining log info, each line is an item in the list.
    :param device: disk device name
    :param custom_options: string of user entered custom smart options; it
    is passed on to get_dev_options() to build the smartctl device arguments
    :param test_mode: True causes cat from a dump file rather than running
    the smartctl command
    :return: test_d as a dictionary of summarized test
    :return: log_l as a list of the log lines that are not numbered test rows
    """
    smart_command = [SMART, '-l', 'selftest', '-l',
                     'selective'] + get_dev_options(device, custom_options)
    if not test_mode:
        o, e, rc = run_command(smart_command, throw=False)
    else:
        o, e, rc = run_command(
            [CAT, '/root/smartdumps/smart-l-selftest-l-selective.out'])
    # A return code of 128 (non zero so run_command raises an exception) has
    # been seen when executing this command. Strange as it means
    # "Invalid argument to exit" anyway if we silence the throw of a generic
    # non 0 exception we can catch the 128, akin to 64 catch in error_logs().
    # N.B. no official list of rc = 128 in /usr/include/sysexits.h
    overide_rc = 128
    e_msg = 'run_command(%s) returned an error of %s. This has undetermined ' \
            'meaning. Please view the Self-Test Logs tab for this device.' \
            % (smart_command, overide_rc)
    screen_return_codes(e_msg, overide_rc, o, e, rc, smart_command)
    test_d = {}
    log_l = []
    for i, line in enumerate(o):
        if re.match('SMART Self-test log structure revision number',
                    line) is None:
            continue
        log_l.append(line)
        if len(o) <= i + 1:
            continue
        # Column headers are padded with a varying number of spaces so
        # match any run of whitespace between them.
        if re.match(r'Num\s+Test_Description\s+Status', o[i + 1]) is None:
            continue
        for entry in o[i + 2:]:
            fields = _selftest_row_fields(entry)
            if fields is None:
                # Not a (parsable) test row so keep it as raw log output.
                log_l.append(entry)
            else:
                test_d[fields[0]] = fields[1:]
    return (test_d, log_l)
def run_test(device, test, custom_options=''):
    """
    Start a smart test (short, long or conveyance) on the given device.
    :param device: disk device name
    :param test: test type handed to smartctl via its -t option
    :param custom_options: string of user entered custom smart options
    :return: the result of run_command for the smartctl -t invocation
    """
    device_args = get_dev_options(device, custom_options)
    smart_command = [SMART, '-t', test] + device_args
    return run_command(smart_command)
def available(device, custom_options='', test_mode=TESTMODE):
    """
    Report whether SMART support is available and whether it is enabled.
    Used by update_disk_state in views/disk.py to assess smart status
    :param device: disk device name
    :param custom_options: string of user entered custom smart options
    :param test_mode: True causes cat from a dump file rather than running
    the smartctl command
    :return: available (boolean), enabled (boolean)
    """
    if test_mode:
        # we are testing so use a smartctl --info file dump instead
        o, e, rc = run_command([CAT, '/root/smartdumps/smart--info.out'])
    else:
        device_args = get_dev_options(device, custom_options)
        o, e, rc = run_command([SMART, '--info'] + device_args)
    is_available = False
    is_enabled = False
    for line in o:
        # N.B. .* in pattern match to allow for multiple spaces
        if re.match('SMART support is:.* Available', line):
            is_available = True
        if re.match('SMART support is:.* Enabled', line):
            is_enabled = True
    return is_available, is_enabled
def toggle_smart(device, custom_options='', enable=False):
    """
    Switch SMART support on or off for the given device.
    :param device: disk device name
    :param custom_options: string of user entered custom smart options
    :param enable: truthy to pass --smart=on, otherwise --smart=off
    :return: the result of run_command for the smartctl invocation
    """
    if enable:
        switch = 'on'
    else:
        switch = 'off'
    smart_flag = '--smart=%s' % switch
    device_args = get_dev_options(device, custom_options)
    return run_command([SMART, smart_flag] + device_args)
def update_config(config):
    """
    Rewrite the smartd config file so that it ends with the Rockstor managed
    section holding the given config.
    Existing lines are copied up to (not including) the Rockstor header, with
    any line starting with DEVICESCAN commented out. The header and the new
    config are then appended and the result is moved over the original file.
    :param config: string of smartd config lines, newline separated
    :return: the result of shutil.move
    """
    # Local import as this is the only user of os in this function.
    import os
    SMARTD_CONFIG = '/etc/smartmontools/smartd.conf'
    ROCKSTOR_HEADER = '###BEGIN: Rockstor smartd config. DO NOT EDIT BELOW THIS LINE###'
    fd, npath = mkstemp()
    try:
        # Wrap the descriptor mkstemp() already opened so that it is closed
        # on exit from the with block instead of being leaked.
        with os.fdopen(fd, 'w') as tfo, open(SMARTD_CONFIG) as sfo:
            for line in sfo:
                if line.startswith('DEVICESCAN'):
                    # comment out this line, if not, smartd ignores
                    # everything else
                    tfo.write('#%s' % line)
                elif line.startswith(ROCKSTOR_HEADER):
                    # Everything from our header on is regenerated below.
                    break
                else:
                    tfo.write(line)
            tfo.write('%s\n' % ROCKSTOR_HEADER)
            for conf_line in config.split('\n'):
                tfo.write('%s\n' % conf_line)
    except Exception:
        # Don't leave a stray temp file behind on failure.
        os.remove(npath)
        raise
    return move(npath, SMARTD_CONFIG)
def screen_return_codes(msg_on_hit, return_code_target, o, e, rc, command):
    """
    Central screening of the return code from an executed smart command.
    Needed as some non zero return codes would otherwise trigger the generic
    exception clause in our general purpose run_command.
    When rc equals the target the given message is logged and emailed to
    root; any other non zero rc is logged and raised as a CommandException
    carrying the command information; an rc of zero does nothing.
    N.B. May be done better by acting as a SMART run_command wrapper (Future).
    :param msg_on_hit: message used to log and to email root
    :param return_code_target: return code to screen for
    :param o: the output from the command when it was run
    :param e: the error from the command when it was run
    :param rc: the return code from running the command
    :param command: the command that produced the previous o, e, and rc params.
    """
    if rc == return_code_target:
        # Our target was hit so log with our message and email root with
        # the same.
        logger.error(msg_on_hit)
        email_root('S.M.A.R.T error', msg_on_hit)
        return
    if rc == 0:
        # Success, nothing to report.
        return
    # In all other non zero (error) instances we raise an exception as normal.
    failure_details = (rc, command, o, e)
    e_msg = ('non-zero code(%d) returned by command: %s output: '
             '%s error: %s' % failure_details)
    logger.error(e_msg)
    raise CommandException('%s' % command, o, e, rc)
def get_base_device(device, test_mode=TESTMODE):
    """
    Helper function that returns the full path of the base device of a partition
    or if given a base device then will return it's full path,
    ie
    input sda3 output /dev/sda
    input sda output /dev/sda
    Works by comparing the first element of every lsblk output line with the
    start of the supplied device name. The longest such element wins so that
    eg sdaa1 resolves to /dev/sdaa rather than to /dev/sda. The elements are
    compared as plain strings, not as regular expressions.
    :param device: device name as per db entry, ie as returned from scan_disks
    :param test_mode: True causes cat from a dump file rather than running
    the lsblk command
    :return: base_dev: single item list containing the root device's full path
    ie device = sda3 the base_dev = /dev/sda or [''] if no lsblk entry was found
    to match.
    """
    base_dev = ['', ]
    if not test_mode:
        out, e, rc = run_command([LSBLK])
    else:
        out, e, rc = run_command([CAT, '/root/smartdumps/lsblk.out'])
    best_name = ''
    # now examine the output from lsblk line by line
    for line in out:
        line_fields = line.split()
        if not line_fields:
            # skip empty lines
            continue
        name = line_fields[0]
        # Keep the longest name that our device starts with; on a tie the
        # earlier line is kept.
        if device.startswith(name) and len(name) > len(best_name):
            best_name = name
    if best_name:
        base_dev[0] = '/dev/' + best_name
    # Return base_dev ie [''] or the full path of the best lsblk match.
    return base_dev
def get_dev_options(device, custom_options=''):
    """
    Build the device specific argument list shared by all smartctl commands.
    Normally this means looking up the base device via get_base_device but
    that is not wanted for devices behind some raid controllers: when
    custom_options already names a known raid controller smartctl target
    the options are returned as they are and no base device is appended.

    Note on raid controller target devices.
    /dev/twe#, or /dev/twa#, or /dev/twl# are 3ware controller targets devs
    respectively 3x-xxxx, 3w-9xxx, and t2-sas (3ware/LSI 9750) drivers for
    respectively 6000, 7000, 8000 or 9000 or 3ware/LSI 9750 controllers.
    /dev/cciss/c0d0 is the first HP/Compaq Smart Array Controller using the
    deprecated cciss driver
    /dev/sg0 is the first hpsa or hpahcisr driver device for the same adapter.
    This same target device is also used by the Areca SATA RAID controller
    except that the first device is /dev/sg2.
    :param device: device name as per db entry, ie as returned from scan_disks
    :param custom_options: string of user entered custom smart options.
    :return: dev_options: list containing the device specific smart options and
    the appropriate smart device target.
    """
    # custom_options may be None, ie the db default prior to any changes
    # having been made, or empty: either way there is nothing to add to the
    # full path of the base device.
    if custom_options is None or custom_options == '':
        return get_base_device(device)
    # Convert string of custom options into a list ready for run_command
    dev_options = custom_options.encode('ascii').split()
    raid_target = re.search('/dev/tw|/dev/cciss/c|/dev/sg', custom_options)
    if raid_target is None:
        # We see no raid target dev so the full path of our base device
        # becomes the last device specific option.
        dev_options.extend(get_base_device(device))
    return dev_options