-
Notifications
You must be signed in to change notification settings - Fork 0
/
udpcommunication.py
34 lines (28 loc) · 1 KB
/
udpcommunication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import socket
# Loopback IPv4 address; UDPCommunication uses it both as the bind address
# for receiving and as the destination address for sending.
localhost = "127.0.0.1"
class UDPCommunication:
    """Static helpers for exchanging UDP datagrams via the ``localhost`` address.

    Every helper creates its own socket for the duration of a single call and
    closes it before returning, so no socket state is shared between calls.
    """

    @staticmethod
    def recv_UDP_block(port, buff_size):
        """Yield datagrams received on ``port``, one per iteration, forever.

        Each iteration delegates to :meth:`recv_UDP`, which binds a fresh
        socket, blocks until one datagram arrives, and closes the socket
        again. The generator never terminates on its own.

        Args:
            port: Local UDP port to bind on each iteration.
            buff_size: Maximum number of bytes to read per datagram.

        Yields:
            The payload of each received datagram as ``bytes``.
        """
        while True:
            yield UDPCommunication.recv_UDP(port, buff_size)

    @staticmethod
    def recv_UDP(port, buff_size):
        """Block until one datagram arrives on ``port`` and return its payload.

        Args:
            port: Local UDP port to bind.
            buff_size: Maximum number of bytes to read from the datagram.

        Returns:
            The datagram payload as ``bytes``.
        """
        # The context manager closes the socket (and frees the port) even if
        # bind() or recvfrom() raises, instead of leaving it to the GC.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.bind((localhost, port))
            data, _ = sock.recvfrom(buff_size)
        return data

    @staticmethod
    def recv_UDP_nonblock(port, buff_size, block_time_s):
        """Wait at most ``block_time_s`` seconds for one datagram on ``port``.

        Args:
            port: Local UDP port to bind.
            buff_size: Maximum number of bytes to read from the datagram.
            block_time_s: Timeout passed to ``socket.settimeout``.

        Returns:
            The datagram payload as ``bytes``, or ``None`` if the wait timed
            out (in which case "TIMED OUT" is printed).
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.bind((localhost, port))
            sock.settimeout(block_time_s)
            try:
                data, _ = sock.recvfrom(buff_size)
            except socket.timeout:
                print("TIMED OUT")
                data = None
        return data

    @staticmethod
    def send_UDP(msg_bytes: bytes, port):
        """Send ``msg_bytes`` as a single datagram to ``port`` on ``localhost``.

        Args:
            msg_bytes: Payload to transmit.
            port: Destination UDP port.
        """
        # Close the sending socket right after the datagram is handed off so
        # repeated calls do not accumulate open file descriptors.
        with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as sock:
            sock.sendto(msg_bytes, (localhost, port))