forked from commaai/openpilot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_uploader.py
executable file
·154 lines (119 loc) · 5.3 KB
/
test_uploader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python3
import os
import time
import threading
import unittest
import logging
import json
from selfdrive.swaglog import cloudlog
import selfdrive.loggerd.uploader as uploader
from common.xattr import getxattr
from selfdrive.loggerd.tests.loggerd_tests_common import UploaderTestCase
class TestLogHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
self.reset()
def reset(self):
self.upload_order = list()
self.upload_ignored = list()
def emit(self, record):
try:
j = json.loads(record.getMessage())
if j["event"] == "upload_success":
self.upload_order.append(j["key"])
if j["event"] == "upload_ignored":
self.upload_ignored.append(j["key"])
except Exception:
pass
log_handler = TestLogHandler()
cloudlog.addHandler(log_handler)
class TestUploader(UploaderTestCase):
def setUp(self):
super().setUp()
log_handler.reset()
def start_thread(self):
self.end_event = threading.Event()
self.up_thread = threading.Thread(target=uploader.uploader_fn, args=[self.end_event])
self.up_thread.daemon = True
self.up_thread.start()
def join_thread(self):
self.end_event.set()
self.up_thread.join()
def gen_files(self, lock=False, boot=True):
f_paths = list()
for t in ["qlog.bz2", "rlog.bz2", "dcamera.hevc", "fcamera.hevc"]:
f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock))
if boot:
f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}.bz2", 1, lock=lock))
return f_paths
def gen_order(self, seg1, seg2, boot=True):
keys = []
if boot:
keys += [f"boot/{self.seg_format.format(i)}.bz2" for i in seg1]
keys += [f"boot/{self.seg_format2.format(i)}.bz2" for i in seg2]
keys += [f"{self.seg_format.format(i)}/qlog.bz2" for i in seg1]
keys += [f"{self.seg_format2.format(i)}/qlog.bz2" for i in seg2]
keys += [f"{self.seg_format.format(i)}/rlog.bz2" for i in seg1]
keys += [f"{self.seg_format2.format(i)}/rlog.bz2" for i in seg2]
for i in seg1:
keys += [f"{self.seg_format.format(i)}/{f}" for f in ['fcamera.hevc', 'dcamera.hevc']]
for i in seg2:
keys += [f"{self.seg_format2.format(i)}/{f}" for f in ['fcamera.hevc', 'dcamera.hevc']]
return keys
def test_upload(self):
self.gen_files(lock=False)
self.start_thread()
# allow enough time that files could upload twice if there is a bug in the logic
time.sleep(5)
self.join_thread()
exp_order = self.gen_order([self.seg_num], [])
self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored")
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
for f_path in exp_order:
self.assertTrue(getxattr(os.path.join(self.root, f_path), uploader.UPLOAD_ATTR_NAME), "All files not uploaded")
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
def test_upload_ignored(self):
self.set_ignore()
self.gen_files(lock=False)
self.start_thread()
# allow enough time that files could upload twice if there is a bug in the logic
time.sleep(5)
self.join_thread()
exp_order = self.gen_order([self.seg_num], [])
self.assertTrue(len(log_handler.upload_order) == 0, "Some files were not ignored")
self.assertFalse(len(log_handler.upload_ignored) < len(exp_order), "Some files failed to ignore")
self.assertFalse(len(log_handler.upload_ignored) > len(exp_order), "Some files were ignored twice")
for f_path in exp_order:
self.assertTrue(getxattr(os.path.join(self.root, f_path), uploader.UPLOAD_ATTR_NAME), "All files not ignored")
self.assertTrue(log_handler.upload_ignored == exp_order, "Files ignored in wrong order")
def test_upload_files_in_create_order(self):
seg1_nums = [0, 1, 2, 10, 20]
for i in seg1_nums:
self.seg_dir = self.seg_format.format(i)
self.gen_files(boot=False)
seg2_nums = [5, 50, 51]
for i in seg2_nums:
self.seg_dir = self.seg_format2.format(i)
self.gen_files(boot=False)
exp_order = self.gen_order(seg1_nums, seg2_nums, boot=False)
self.start_thread()
# allow enough time that files could upload twice if there is a bug in the logic
time.sleep(5)
self.join_thread()
self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored")
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
for f_path in exp_order:
self.assertTrue(getxattr(os.path.join(self.root, f_path), uploader.UPLOAD_ATTR_NAME), "All files not uploaded")
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
def test_no_upload_with_lock_file(self):
f_paths = self.gen_files(lock=True, boot=False)
self.start_thread()
# allow enough time that files should have been uploaded if they would be uploaded
time.sleep(5)
self.join_thread()
for f_path in f_paths:
self.assertFalse(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "File upload when locked")
if __name__ == "__main__":
unittest.main(failfast=True)