-
Notifications
You must be signed in to change notification settings - Fork 3
/
load_jpg_file.py
216 lines (174 loc) · 6.3 KB
/
load_jpg_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import sys
import os
import binascii
import serial
from serial import SerialException
import time
import struct
import click
from serial.tools.list_ports import comports
global MTU
global filesize
jpeg_signatures = [
binascii.unhexlify(b'FFD8FFD8'),
binascii.unhexlify(b'FFD8FFE0'),
binascii.unhexlify(b'FFD8FFE1')
]
try:
raw_input
except NameError:
raw_input = input # in python3 it's "raw"
unichr = chr
def ask_for_port():
"""\
Show a list of ports and ask the user for a choice. To make selection
easier on systems with long device names, also allow the input of an
index.
"""
sys.stderr.write('\n--- Available ports:\n')
ports = []
for n, (port, desc, hwid) in enumerate(sorted(comports()), 1):
sys.stderr.write('--- {:2}: {:20} {}\n'.format(n, port, desc))
ports.append(port)
while True:
port = raw_input('--- Enter port index or full name: ')
try:
index = int(port) - 1
if not 0 <= index < len(ports):
sys.stderr.write('--- Invalid index!\n')
continue
except ValueError:
pass
else:
port = ports[index]
return port
def getSize(fileobject):
fileobject.seek(0,2) # move the cursor to the end of the file
size = fileobject.tell()
return size
def check_file_jpg(filename):
with open(filename, 'rb') as file:
first_four_bytes = file.read(4)
if first_four_bytes in jpeg_signatures:
print("JPEG detected.")
return 1
else:
print("File does not look like a JPEG.")
sys.exit(0)
return 0
PAYLOAD_JPG_OPCODE_PING = b'\xA0'
PAYLOAD_JPG_OPCODE_FILEINFO_RSP = b'\xA1'
PAYLOAD_JPG_OPCODE_MTU_REQ = b'\xA2'
PAYLOAD_JPG_OPCODE_MTU_RSP = b'\xA3'
PAYLOAD_JPG_OPCODE_DATA_REQ = b'\xA4'
PAYLOAD_JPG_OPCODE_DATA_RSP = b'\xA5'
PAYLOAD_JPG_OPCODE_COMPLETE = b'\xA6'
PAYLOAD_JPG_OPCODE_COMPLETE_ACK = b'\xA7'
PAYLOAD_JPG_OPCODE_DATA_RSP_LAST = b'\xA8'
mtu_length = "global"
file_offset = "global"
file_offset_len = "global"
file_crc32 = "global"
def main():
DEFAULT_MTU_LEN = 20
port_list = ask_for_port()
print("You select the port number")
print(port_list)
# print ("Number of argument = ", len(sys.argv))
len_argument = len(sys.argv)
filesize = 0
if (len_argument != 2):
print ("")
print ("Error - Please provide JPG file!!!")
print ("")
print ("Correct Usage:")
print ("python load_jpg_file.py <jpg_file>")
print ("")
sys.exit(0)
# Open the JPG file
try:
binary_file = open(sys.argv[1], 'rb')
binary_data = binary_file.read()
binary_crc = binascii.crc32(binary_data)
filesize = getSize(binary_file)
print("filesize and CRC", filesize, binary_crc)
except:
print ("Fail to open jpg file ", sys.argv[1])
sys.exit(0)
# Open the Serial Port
try:
ser = serial.Serial(port_list, 115200, timeout=30)
except:
print ("Please check to open the correct com port!!")
sys.exit(0)
print ("Waiting to get the PING Message from DK Board [Timeout = 30 sec]!!")
payload_len = 3
ping_request = ser.read(3)
print (ping_request)
if ping_request == '':
print ("[Error] : Timeout, missing the ping message from the DK board!!")
sys.exit(0)
(payload_len, opcode, mtu_len) = struct.unpack('BsB', ping_request)
print (payload_len, opcode, mtu_len)
if (opcode != PAYLOAD_JPG_OPCODE_PING):
print ("[Error]: Incorrect OPCODE PING", opcode)
sys.exit(0)
else:
print ("Get the PING message from device !!!")
if (mtu_len > 247):
print ("[Error] MTU Length (", mtu_len, ") is > 247!!")
sys.exit(0)
# # Send the filesize
payload_len = 9 # Opcode (1) + FileSize(4) + CRC32(4)
ser.write(struct.pack("B", payload_len))
ser.write(PAYLOAD_JPG_OPCODE_FILEINFO_RSP)
#Send the file size
ser.write(filesize.to_bytes(4, byteorder='little', signed=True))
#Send File CRC32
ser.write(binary_crc.to_bytes(4, byteorder='little', signed=False))
data_index = 0
offset = 0
length = mtu_len
data_index = 0
offset_addr = 0
while (data_index < filesize):
print (offset_addr, length, filesize)
print ("Waiting for the data request [offset, length] !!!")
data_request = ser.read(8)
print (data_request)
(payload_len, opcode, request_length, offset_addr) = struct.unpack('BsHL', data_request)
print (payload_len, opcode, "request length = ", request_length, "offset address = ", offset_addr)
if ((opcode == PAYLOAD_JPG_OPCODE_DATA_REQ) or (opcode == PAYLOAD_JPG_OPCODE_DATA_RSP_LAST)):
print ("OPCODE ", opcode)
else:
print ("Failure on incorrect OPCODE", opcode)
sys.exit(0)
if (offset_addr+request_length > filesize):
print ("Error: the request offset address is outside the filesize!!")
sys.exit(0)
data_index = 0
offset = offset_addr
length = mtu_len
while (data_index < request_length):
if (data_index >= request_length - mtu_len):
length = request_length - data_index
else:
length = mtu_len
binary_file.seek(offset_addr+data_index, 0)
print ("offset_addr: ", offset_addr, "data_index ", data_index, "length ", length, "filesize ", filesize)
payload_len = length + 1
ser.write(struct.pack("B", payload_len))
if (data_index+length >= request_length):
print ("PAYLOAD_JPG_OPCODE_DATA_RSP_LAST: offset_addr", offset_addr, "data_index ", data_index, "length ", length)
ser.write(PAYLOAD_JPG_OPCODE_DATA_RSP_LAST)
else:
print ("PAYLOAD_JPG_OPCODE_DATA_RSP : offset_addr", offset_addr, "data_index ", data_index, "length", length)
ser.write(PAYLOAD_JPG_OPCODE_DATA_RSP)
ser.write(binary_file.read(length))
# time.sleep(10)
data_index = data_index + length
if (offset_addr+data_index >= filesize):
sys.exit(0)
binary_file.close()
if __name__=="__main__":
main()