-
Notifications
You must be signed in to change notification settings - Fork 158
/
usbstoragedriver.py
129 lines (111 loc) · 4.09 KB
/
usbstoragedriver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# pylint: disable=no-member
import enum
import logging
import os
import time
import attr
from ..factory import target_factory
from ..step import step
from ..util.managedfile import ManagedFile
from .common import Driver
from ..driver.exception import ExecutionError
from ..util.helper import processwrapper
from ..util import Timeout
class Mode(enum.Enum):
DD = 1
BMAPTOOL = 2
@target_factory.reg_driver
@attr.s(eq=False)
class USBStorageDriver(Driver):
bindings = {
"storage": {
"USBMassStorage",
"NetworkUSBMassStorage",
"USBSDMuxDevice",
"NetworkUSBSDMuxDevice"
},
}
image = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str))
)
def __attrs_post_init__(self):
super().__attrs_post_init__()
self.logger = logging.getLogger("{}:{}".format(self, self.target))
def on_activate(self):
pass
def on_deactivate(self):
pass
@Driver.check_active
@step(args=['filename'])
def write_image(self, filename=None, mode=Mode.DD, partition=None, skip=0, seek=0):
"""
Writes the file specified by filename or if not specified by config image subkey to the
bound USB storage root device or partition.
Args:
filename (str): optional, path to the image to write to bound USB storage
mode (Mode): optional, Mode.DD or Mode.BMAPTOOL (defaults to Mode.DD)
partition (int or None): optional, write to the specified partition or None for writing
to root device (defaults to None)
skip (int): optional, skip n 512-sized blocks at start of input file (defaults to 0)
seek (int): optional, skip n 512-sized blocks at start of output (defaults to 0)
"""
if filename is None and self.image is not None:
filename = self.target.env.config.get_image_path(self.image)
assert filename, "write_image requires a filename"
mf = ManagedFile(filename, self.storage)
mf.sync_to_resource()
self.logger.info("pwd: %s", os.getcwd())
# wait for medium
timeout = Timeout(10.0)
while not timeout.expired:
try:
if self.get_size() > 0:
break
time.sleep(0.5)
except ValueError:
# when the medium gets ready the sysfs attribute is empty for a short time span
continue
else:
raise ExecutionError("Timeout while waiting for medium")
partition = "" if partition is None else partition
if mode == Mode.DD:
block_size = '512' if skip or seek else '4M'
args = [
"dd",
"if={}".format(mf.get_remote_path()),
"of={}{}".format(self.storage.path, partition),
"status=progress",
"bs={}".format(block_size),
"skip={}".format(skip),
"seek={}".format(seek),
"conv=fdatasync"
]
elif mode == Mode.BMAPTOOL:
if skip or seek:
raise ExecutionError("bmaptool does not support skip or seek")
args = [
"bmaptool",
"copy",
"{}".format(mf.get_remote_path()),
"{}{}".format(self.storage.path, partition),
]
else:
raise ValueError
processwrapper.check_output(
self.storage.command_prefix + args
)
@Driver.check_active
@step(result=True)
def get_size(self):
args = ["cat", "/sys/class/block/{}/size".format(self.storage.path[5:])]
size = processwrapper.check_output(self.storage.command_prefix + args)
return int(size)
@target_factory.reg_driver
@attr.s(eq=False)
class NetworkUSBStorageDriver(USBStorageDriver):
def __attrs_post_init__(self):
import warnings
warnings.warn("NetworkUSBStorageDriver is deprecated, use USBStorageDriver instead",
DeprecationWarning)
super().__attrs_post_init__()