-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocessor.py
56 lines (46 loc) · 1.56 KB
/
processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import socket
import threading
import configparser
import sys
import os
import queue
sys.path.append(os.path.abspath('..'))
from usr_func import *
class DataProcessor:
def __init__(self, host, port, process_func):
self.host = host
self.port = port
self.process_func = process_func
self.queue = queue.Queue()
def start(self):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind((self.host, self.port))
print('Listening on {}:{}'.format(self.host, self.port))
t = threading.Thread(target=self.__get, args=(server_socket,))
t.start()
def __get(self, server_socket):
while True:
data, addr = server_socket.recvfrom(1024)
processed_data = self.process_func(data)
self.queue.put(processed_data)
def get_data(self):
try:
item = self.queue.get(block=False)
return item
except queue.Empty:
pass
if __name__ == '__main__':
# read configuration from file
config = configparser.ConfigParser()
config.read('../port.ini')
# create data processors for each data source
processors = []
for section in config.sections():
host = config.get(section, 'host')
port = config.getint(section, 'port')
process_func = globals()[config.get(section, 'process_func')]
processor = DataProcessor(host, port, process_func)
processors.append(processor)
# start data processors
for processor in processors:
processor.start()