-
Notifications
You must be signed in to change notification settings - Fork 155
/
files.py
325 lines (302 loc) · 13.5 KB
/
files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# Adafruit MicroPython Tool - File Operations
# Author: Tony DiCola
# Copyright (c) 2016 Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import ast
import textwrap
import binascii
from ampy.pyboard import PyboardError
BUFFER_SIZE = 32 # Amount of data to read or write to the serial port at a time.
# This is kept small because small chips and USB to serial
# bridges usually have very small buffers.
class DirectoryExistsError(Exception):
pass
class Files(object):
"""Class to interact with a MicroPython board files over a serial connection.
Provides functions for listing, uploading, and downloading files from the
board's filesystem.
"""
def __init__(self, pyboard):
"""Initialize the MicroPython board files class using the provided pyboard
instance. In most cases you should create a Pyboard instance (from
pyboard.py) which connects to a board over a serial connection and pass
it in, but you can pass in other objects for testing, etc.
"""
self._pyboard = pyboard
def get(self, filename):
"""Retrieve the contents of the specified file and return its contents
as a byte string.
"""
# Open the file and read it a few bytes at a time and print out the
# raw bytes. Be careful not to overload the UART buffer so only write
# a few bytes at a time, and don't use print since it adds newlines and
# expects string data.
command = """
import sys
try:
import ubinascii
except:
import binascii as ubinascii
with open('{0}', 'rb') as infile:
while True:
result = infile.read({1})
if result == b'':
break
len = sys.stdout.write(ubinascii.hexlify(result))
""".format(
filename, BUFFER_SIZE
)
self._pyboard.enter_raw_repl()
try:
out = self._pyboard.exec_(textwrap.dedent(command))
except PyboardError as ex:
# Check if this is an OSError #2, i.e. file doesn't exist and
# rethrow it as something more descriptive.
try:
message = ex.args[2].decode("utf-8")
if message.find("OSError") != -1 and message.find("2") != -1:
raise RuntimeError("No such file: {0}".format(filename))
else:
raise ex
except UnicodeDecodeError:
raise ex
self._pyboard.exit_raw_repl()
return binascii.unhexlify(out)
def ls(self, directory="/", long_format=True, recursive=False):
"""List the contents of the specified directory (or root if none is
specified). Returns a list of strings with the names of files in the
specified directory. If long_format is True then a list of 2-tuples
with the name and size (in bytes) of the item is returned. Note that
it appears the size of directories is not supported by MicroPython and
will always return 0 (i.e. no recursive size computation).
"""
# Disabling for now, see https://github.com/adafruit/ampy/issues/55.
# # Make sure directory ends in a slash.
# if not directory.endswith("/"):
# directory += "/"
# Make sure directory starts with slash, for consistency.
if not directory.startswith("/"):
directory = "/" + directory
command = """\
try:
import os
except ImportError:
import uos as os\n"""
if recursive:
command += """\
def listdir(directory):
result = set()
def _listdir(dir_or_file):
try:
# if its a directory, then it should provide some children.
children = os.listdir(dir_or_file)
except OSError:
# probably a file. run stat() to confirm.
os.stat(dir_or_file)
result.add(dir_or_file)
else:
# probably a directory, add to result if empty.
if children:
# queue the children to be dealt with in next iteration.
for child in children:
# create the full path.
if dir_or_file == '/':
next = dir_or_file + child
else:
next = dir_or_file + '/' + child
_listdir(next)
else:
result.add(dir_or_file)
_listdir(directory)
return sorted(result)\n"""
else:
command += """\
def listdir(directory):
if directory == '/':
return sorted([directory + f for f in os.listdir(directory)])
else:
return sorted([directory + '/' + f for f in os.listdir(directory)])\n"""
# Execute os.listdir() command on the board.
if long_format:
command += """
r = []
for f in listdir('{0}'):
size = os.stat(f)[6]
r.append('{{0}} - {{1}} bytes'.format(f, size))
print(r)
""".format(
directory
)
else:
command += """
print(listdir('{0}'))
""".format(
directory
)
self._pyboard.enter_raw_repl()
try:
out = self._pyboard.exec_(textwrap.dedent(command))
except PyboardError as ex:
# Check if this is an OSError #2, i.e. directory doesn't exist and
# rethrow it as something more descriptive.
message = ex.args[2].decode("utf-8")
if message.find("OSError") != -1 and message.find("2") != 1:
raise RuntimeError("No such directory: {0}".format(directory))
else:
raise ex
self._pyboard.exit_raw_repl()
# Parse the result list and return it.
return ast.literal_eval(out.decode("utf-8"))
def mkdir(self, directory, exists_okay=False):
"""Create the specified directory. Note this cannot create a recursive
hierarchy of directories, instead each one should be created separately.
"""
# Execute os.mkdir command on the board.
command = """
try:
import os
except ImportError:
import uos as os
os.mkdir('{0}')
""".format(
directory
)
self._pyboard.enter_raw_repl()
try:
out = self._pyboard.exec_(textwrap.dedent(command))
except PyboardError as ex:
# Check if this is an OSError #17, i.e. directory already exists.
message = ex.args[2].decode("utf-8")
if message.find("OSError") != -1 and message.find("17") != -1:
if not exists_okay:
raise DirectoryExistsError(
"Directory already exists: {0}".format(directory)
)
else:
raise ex
self._pyboard.exit_raw_repl()
def put(self, filename, data):
"""Create or update the specified file with the provided data.
"""
# Open the file for writing on the board and write chunks of data.
self._pyboard.enter_raw_repl()
self._pyboard.exec_("f = open('{0}', 'wb')".format(filename))
size = len(data)
# Loop through and write a buffer size chunk of data at a time.
for i in range(0, size, BUFFER_SIZE):
chunk_size = min(BUFFER_SIZE, size - i)
chunk = repr(data[i : i + chunk_size])
# Make sure to send explicit byte strings (handles python 2 compatibility).
if not chunk.startswith("b"):
chunk = "b" + chunk
self._pyboard.exec_("f.write({0})".format(chunk))
self._pyboard.exec_("f.close()")
self._pyboard.exit_raw_repl()
def rm(self, filename):
"""Remove the specified file or directory."""
command = """
try:
import os
except ImportError:
import uos as os
os.remove('{0}')
""".format(
filename
)
self._pyboard.enter_raw_repl()
try:
out = self._pyboard.exec_(textwrap.dedent(command))
except PyboardError as ex:
message = ex.args[2].decode("utf-8")
# Check if this is an OSError #2, i.e. file/directory doesn't exist
# and rethrow it as something more descriptive.
if message.find("OSError") != -1 and message.find("2") != 1:
raise RuntimeError("No such file/directory: {0}".format(filename))
# Check for OSError #13, the directory isn't empty.
if message.find("OSError") != -1 and message.find("13") != 1:
raise RuntimeError("Directory is not empty: {0}".format(filename))
else:
raise ex
self._pyboard.exit_raw_repl()
def rmdir(self, directory, missing_okay=False):
"""Forcefully remove the specified directory and all its children."""
# Build a script to walk an entire directory structure and delete every
# file and subfolder. This is tricky because MicroPython has no os.walk
# or similar function to walk folders, so this code does it manually
# with recursion and changing directories. For each directory it lists
# the files and deletes everything it can, i.e. all the files. Then
# it lists the files again and assumes they are directories (since they
# couldn't be deleted in the first pass) and recursively clears those
# subdirectories. Finally when finished clearing all the children the
# parent directory is deleted.
command = """
try:
import os
except ImportError:
import uos as os
def rmdir(directory):
os.chdir(directory)
for f in os.listdir():
try:
os.remove(f)
except OSError:
pass
for f in os.listdir():
rmdir(f)
os.chdir('..')
os.rmdir(directory)
rmdir('{0}')
""".format(
directory
)
self._pyboard.enter_raw_repl()
try:
out = self._pyboard.exec_(textwrap.dedent(command))
except PyboardError as ex:
message = ex.args[2].decode("utf-8")
# Check if this is an OSError #2, i.e. directory doesn't exist
# and rethrow it as something more descriptive.
if message.find("OSError") != -1 and message.find("2") != 1:
if not missing_okay:
raise RuntimeError("No such directory: {0}".format(directory))
else:
raise ex
self._pyboard.exit_raw_repl()
def run(self, filename, wait_output=True, stream_output=True):
"""Run the provided script and return its output. If wait_output is True
(default) then wait for the script to finish and then return its output,
otherwise just run the script and don't wait for any output.
If stream_output is True(default) then return None and print outputs to
stdout without buffering.
"""
self._pyboard.enter_raw_repl()
out = None
if stream_output:
self._pyboard.execfile(filename, stream_output=True)
elif wait_output:
# Run the file and wait for output to return.
out = self._pyboard.execfile(filename)
else:
# Read the file and run it using lower level pyboard functions that
# won't wait for it to finish or return output.
with open(filename, "rb") as infile:
self._pyboard.exec_raw_no_follow(infile.read())
self._pyboard.exit_raw_repl()
return out