-
Notifications
You must be signed in to change notification settings - Fork 0
/
qna.py
100 lines (80 loc) · 3.58 KB
/
qna.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import time
import os
import queue
import threading
import logging
import sys
from process_pdf import PDFProcessor
import fewlines.dashboard as fd
class Assistant:
    """Watch a directory tree for PDFs and hand new/changed ones to a PDFProcessor.

    Two threads cooperate through two queues:
      * filemonitor() (background daemon thread) rescans the tree and puts
        (path, st_mtime_ns) onto work_queue;
      * start() (calling thread) processes each item and reports
        (path, st_mtime_ns, success) back on done_queue.
    """

    def __init__(self, working_dir) -> None:
        """Create an assistant that monitors *working_dir* recursively.

        Args:
            working_dir: root directory passed to os.walk on every scan.
        """
        self.work_queue = queue.Queue()  # items: (path, st_mtime_ns) awaiting processing
        self.done_queue = queue.Queue()  # items: (path, st_mtime_ns, success) from the worker
        self.dir = working_dir

        # configuration (all in seconds)
        self.tick_period = 0.5  # sleep between directory scans
        self.debounce_delay = 1.0  # a file's mtime must be at least this old before it is enqueued
        self.stats_print_period = 30.0  # minimum interval between stats log dumps

        self.processor = PDFProcessor()

    def start(self):
        """Start the directory monitor thread, then process queued PDFs forever.

        Runs on the calling thread and never returns (the loop has no exit).
        """
        threading.Thread(target=self.filemonitor, daemon=True).start()
        while True:
            path, tstamp = self.work_queue.get()
            # process_pdf returns True if:
            #  - no need to reply to anything
            #  - there was a need to reply and it succeeded
            # returns False if there was a need to reply and a failure to do so (e.g. network error)
            success = self.processor.process_pdf(path)
            # report back so filemonitor records the processed mtime, or retries on failure
            self.done_queue.put((path, tstamp, success))
            # queue housekeeping only; a failed task is re-enqueued by filemonitor
            self.work_queue.task_done()

    def filemonitor(self):
        """Poll self.dir forever, enqueueing new or modified PDFs for the worker.

        Each tick: drain worker results, rescan the tree, and every
        stats_print_period seconds log dashboard histograms and counters.
        """
        # path -> st_mtime_ns of the version last processed successfully
        last_update_time_processed = {}
        # paths currently in work_queue or being processed; never enqueued twice
        enqueued = set()
        total_enqueued = 0
        total_done = 0
        last_printed = time.monotonic()

        while True:
            total_done += self._drain_done_queue(last_update_time_processed, enqueued)
            total_enqueued += self._scan_for_updates(last_update_time_processed, enqueued)

            now = time.monotonic()
            if now > last_printed + self.stats_print_period:
                for line in fd.histograms("*latency*"):
                    logging.info(line)
                for line in fd.histograms("*tokens*"):
                    logging.info(line)
                logging.info(f'enqueued {total_enqueued} file(s) total, {total_done} processed successfully')
                last_printed = now
            time.sleep(self.tick_period)

    def _drain_done_queue(self, last_update_time_processed, enqueued):
        """Consume every pending worker result without blocking.

        On success the processed mtime is recorded. On success or failure
        the path leaves *enqueued*, so a failed file is retried on the next scan.

        Returns:
            Number of results that reported success.
        """
        done = 0
        while True:
            try:
                path, tstamp, success = self.done_queue.get_nowait()
            except queue.Empty:
                return done
            if success:
                last_update_time_processed[path] = tstamp
                done += 1
            # so that we can retry and enqueue again
            enqueued.discard(path)
            self.done_queue.task_done()

    def _scan_for_updates(self, last_update_time_processed, enqueued):
        """Walk self.dir and enqueue PDFs modified since they were last processed.

        A file is skipped while it is already enqueued, while its mtime is
        within debounce_delay of now (it may still be being written), or if
        it cannot be stat'ed.

        Returns:
            Number of files put onto work_queue during this scan.
        """
        count = 0
        for root, _dirs, files in os.walk(self.dir):
            for name in files:
                if not name.lower().endswith('.pdf'):
                    continue
                file_path = os.path.join(root, name)
                if file_path in enqueued:
                    continue
                try:
                    last_update_time = os.stat(file_path).st_mtime_ns
                except OSError:
                    # removed (or a dangling link) between os.walk and stat; skip it
                    # rather than letting the exception kill this monitor thread
                    continue
                last_processed_time = last_update_time_processed.get(file_path, 0)
                # TODO: this is not correct in general, as time might,
                # for example, go backwards during clock adjustment
                if last_processed_time >= last_update_time:
                    continue
                debounce_cutoff = int(1e9 * (time.time() - self.debounce_delay))
                if last_update_time > debounce_cutoff:
                    logging.info(f'{file_path} updated, waiting for debounce')
                    continue
                self.work_queue.put((file_path, last_update_time))
                enqueued.add(file_path)
                count += 1
        return count
if __name__ == "__main__":
    logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
    if len(sys.argv) < 2:
        logging.error('usage: qna.py path/to/folder/to/monitor/ [local_url]')
        # nothing to monitor: exit instead of falling through to sys.argv[1] (IndexError)
        sys.exit(1)
    # NOTE(review): [local_url] is advertised in the usage text but never read here —
    # confirm whether it should be passed on to Assistant/PDFProcessor.
    Assistant(sys.argv[1]).start()