forked from nats-io/nats.py
/
aiohttp-example.py
197 lines (161 loc) · 6.42 KB
/
aiohttp-example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import asyncio
import json
import logging
import signal
import aiohttp
import platform
from aiohttp import web
from datetime import datetime
from nats.aio.client import Client as NATS
__version___ = "0.1.0"
class Component():
def __init__(self,
name="aiohttp-nats-example",
uri="nats://127.0.0.1:4222",
loop=asyncio.get_event_loop(),
logger=logging.getLogger(),
nats_options={},
notify_subject="events",
):
# Default NATS Options
self.name = name
self.version = __version___
self.nc = NATS()
self.loop = loop
self.nats_uri = uri
self.notify_subject = notify_subject
logger.setLevel(logging.DEBUG)
self.logger = logger
default_nats_options = {
"name": self.name,
"io_loop": self.loop,
"servers": [self.nats_uri],
# NATS handlers
"error_cb": self.on_error,
"closed_cb": self.on_close,
"reconnected_cb": self.on_reconnect,
"disconnected_cb": self.on_disconnect,
}
self.nats_options = {**default_nats_options, **nats_options}
async def handle_work(self, request):
self.logger.debug("Received request: {}".format(request))
try:
data = await request.json()
self.logger.debug("Payload: {}".format(data))
except ValueError:
# Bad Request
web.web_response.Response.status = 400
return web.web_response.Response.status
# Work handler notifies of events via NATS
self.logger.debug("Received request: {}".format(request))
try:
await self.nc.publish(self.notify_subject, json.dumps(data).encode())
except Exception as e:
self.logger.error("Error: {}".format(e))
return web.Response(text='{"status": "success"}')
async def on_error(self, e):
self.logger.warning("Error: {}".format(e))
async def on_reconnect(self):
self.logger.warning("Reconnected to NATS at nats://{}".format(self.nc.connected_url.netloc))
async def on_disconnect(self):
self.logger.warning("Disconnected from NATS")
async def on_close(self):
self.logger.warning("Closed connection to NATS")
async def signal_handler(self):
if self.nc.is_connected:
await self.nc.close()
self.loop.stop()
async def start(self):
self.logger.info("Starting {name} v{version}".format(name=self.name, version=self.version))
# Setup NATS client
self.logger.info("Connecting to NATS server at '{}'".format(self.nats_uri))
await self.nc.connect(**self.nats_options)
self.logger.info("Connected to NATS")
# Signal handler
if platform.system() == "Linux":
for sig in ('SIGINT', 'SIGTERM'):
self.loop.add_signal_handler(
getattr(signal, sig),
lambda: asyncio.ensure_future(self.signal_handler()))
# Server
app = web.Application()
runner = web.AppRunner(app)
# Routes
app.router.add_post('/work', self.handle_work)
await runner.setup()
site = web.TCPSite(runner, '0.0.0.0', 8080)
self.logger.info("Server listening at '0.0.0.0:8080'")
await site.start()
class Requestor():
def __init__(self,
uri="http://127.0.0.1:8080",
payload={"hello":"world"},
max_requests=100,
loop=asyncio.get_event_loop(),
logger=logging.getLogger("requestor"),
):
self.uri = uri
self.payload = payload
self.max_requests = max_requests
self.loop = loop
logger.setLevel(logging.DEBUG)
self.logger = logger
async def send_requests(self):
# Start aiohttp connection sending requests
async with aiohttp.ClientSession() as session:
for i in range(0, self.max_requests):
payload = self.payload
payload["seq"] = i
response = await self.send_request(session, payload)
self.logger.debug("Response: {}".format(response))
await asyncio.sleep(0.2, loop=self.loop)
async def send_request(self, session, payload):
async with session.post(self.uri, json=payload) as response:
result = await response.text()
return result
class Subscriber():
def __init__(self,
name="nats-subscriber",
uri="nats://127.0.0.1:4222",
loop=asyncio.get_event_loop(),
logger=logging.getLogger("subscriber"),
nats_options={},
notify_subject="events",
):
# Default NATS Options
self.name = name
self.version = __version___
self.nc = NATS()
self.loop = loop
self.nats_uri = uri
self.notify_subject = notify_subject
logger.setLevel(logging.DEBUG)
self.logger = logger
default_nats_options = {
"name": self.name,
"io_loop": self.loop,
"servers": [self.nats_uri],
}
self.nats_options = {**default_nats_options, **nats_options}
async def events_handler(self, msg):
self.logger.info("NATS Event: {}".format(msg.data.decode()))
async def start(self):
await self.nc.connect(**self.nats_options)
await self.nc.subscribe(self.notify_subject, cb=self.events_handler)
if __name__ == '__main__':
logging.basicConfig(format='[%(process)s] %(asctime)s.%(msecs)03d - %(name)14s - [%(levelname)7s] - %(message)s', datefmt='%Y/%m/%d %I:%M:%S')
loop = asyncio.get_event_loop()
# Start component with defaults
component = Component(loop=loop)
# Can customize NATS connection via nats_options
# component = Component(loop=loop, nats_options={"servers":["nats://127.0.0.1:4223"]})
loop.run_until_complete(component.start())
subscriber = Subscriber(loop=loop)
loop.run_until_complete(subscriber.start())
futures = []
for i in range(0, 10):
requestor = Requestor(uri="http://127.0.0.1:8080/work", payload={"client_id": i}, loop=loop)
future = loop.create_task(requestor.send_requests())
futures.append(future)
loop.run_until_complete(asyncio.wait(futures))
loop.run_forever()