-
Notifications
You must be signed in to change notification settings - Fork 6
/
peer.py
138 lines (102 loc) · 3.93 KB
/
peer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from threading import Thread
from typing import Tuple
import curio
from curio import socket
import click
from . import stun
class UdpPeer:
def __init__(self, bind_port: int) -> None:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.bind(('0.0.0.0', bind_port))
async def start(self) -> None:
recv_task = await curio.spawn(self.recv_data)
queue = curio.UniversalQueue()
stdin_thread = Thread(target=read_peer_info, args=(queue,))
stdin_thread.start()
peer_ip, peer_port = await queue.get()
print(f'Connecting to: {peer_ip}:{peer_port}')
await self._sock.sendto(b'hey there!', (peer_ip, peer_port))
stdin_thread.join()
await recv_task.join()
async def recv_data(self) -> None:
while True:
data, addr = await self._sock.recvfrom(4096)
print('Received [{}]: '.format(addr), data)
class TcpPeer:
def __init__(self, bind_port: int) -> None:
self._bind_port = bind_port
self._sock = socket.socket()
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self._sock.bind(('0.0.0.0', bind_port))
async def start(self) -> None:
queue = curio.UniversalQueue()
stdin_thread = Thread(target=read_peer_info, args=(queue,))
stdin_thread.start()
peer_ip, peer_port = await queue.get()
print(f'Connecting to: {peer_ip}:{peer_port}')
await self._sock.connect((peer_ip, peer_port))
# We must start reading only when connected, otherwise Linux returns
# socket error #107
recv_task = await curio.spawn(self.recv_data)
print('Connected')
await self._sock.send(b'hey there!')
print('Sent msg')
stdin_thread.join()
await recv_task.join()
async def recv_data(self) -> None:
while True:
data = await self._sock.recv(4096)
if len(data) == 0:
break
print('Received: ', data)
def read_peer_info(queue: curio.UniversalQueue) -> None:
line = input('Enter peer connection info: ')
queue.put(parse_conn_info(line))
@click.command()
@click.option('--protocol', '--proto', 'protocol', default='udp', type=str,
help='Use either TCP or UDP to communicate with STUN server.',)
def main(protocol: str) -> None:
my_ip, my_port = curio.run(whats_my_external_ip, protocol)
print('Public connection info:', my_ip, my_port)
peer = None
if protocol == 'udp':
peer = UdpPeer(my_port)
elif protocol == 'tcp':
peer = TcpPeer(my_port)
else:
raise Exception('Unsupported protocol')
curio.run(peer.start)
def parse_conn_info(ln: str) -> Tuple[str, int]:
parts = ln.strip().split()
return (parts[0], int(parts[1]))
async def whats_my_external_ip(protocol: str='udp') -> Tuple[str, int]:
if protocol == 'udp':
stun_port = 19302
stun_ip = await resolve_hostname('stun.l.google.com', stun_port)
_, ip, port = await stun.get_ip_info(stun_host=stun_ip, stun_port=stun_port)
return (ip, port)
elif protocol == 'tcp':
return await stun.get_ip_for_tcp('35.202.168.244', 80)
else:
raise Exception('Unsupported protocol')
async def resolve_hostname(hostname: str, port: int=None) -> str:
"""DNS resolve hostname.
Args:
hostname: hostname to get IP address for.
port: optional. Used to hint what DNS entry we're looking
for.
Returns:
IP address used to connect to the specified hostname.
"""
try:
res = await socket.getaddrinfo(hostname, port, socket.AF_INET)
if len(res) == 0:
return None
_, _, _, _, socket_addr = res[0]
ip_addr, _ = socket_addr
return ip_addr
except socket.gaierror:
return None
if __name__ == '__main__':
main()