This repository has been archived by the owner on Mar 17, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 57
/
WSLocust.py
65 lines (57 loc) · 2.51 KB
/
WSLocust.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import time
import websocket
from locust import Locust, events
class WSLocustClient(object):
    """Websocket client that reports its activity to Locust.

    Wraps a single ``websocket.WebSocket`` and fires Locust's
    ``request_success`` / ``request_failure`` events for every connect and
    send attempt.
    """

    def __init__(self):
        self.ws = websocket.WebSocket()
        # True only between a successful connect() and the next
        # disconnect() or failed connect().
        self.connected = False

    @staticmethod
    def _elapsed_ms(start_time):
        """Return the whole milliseconds elapsed since ``start_time``."""
        return int((time.time() - start_time) * 1000)

    def connect(self, uuid, uri, task):
        """Open the websocket to ``uri`` and report the outcome to Locust.

        Args:
            uuid: Identifier of the simulated client; used only in the
                message printed on failure.
            uri: Websocket URI passed to ``WebSocket.connect``.
            task: Suffix appended to the ``Connect_to_websocket_`` event name.

        Raises:
            Exception: Whatever ``WebSocket.connect`` raised, re-raised after
                the failure has been reported and the socket closed.
        """
        start_time = time.time()
        try:
            self.ws.connect(uri)
        except Exception as e:
            # str() keeps this handler from raising a TypeError of its own
            # (and masking the real error) when uuid is not a string.
            print('Client [' + str(uuid) + '] failed to connect to websocket.')
            # A failed (re)connect must not leave a stale "connected" flag.
            self.connected = False
            events.request_failure.fire(
                request_type="connect",
                name="Connect_to_websocket_" + task,
                response_time=self._elapsed_ms(start_time),
                exception=e)
            self.ws.close()
            raise
        else:
            events.request_success.fire(
                request_type="connect",
                name="Connect_to_websocket_" + task,
                response_time=self._elapsed_ms(start_time),
                response_length=0)
            self.connected = True

    def send_json_rpc(self, payload, task):
        """Send ``payload`` over the websocket and report the outcome.

        Args:
            payload: Data handed to ``WebSocket.send``; its ``len()`` is
                reported as the response length on success.
            task: Suffix appended to the ``Send_payload_`` event name.

        Raises:
            websocket.WebSocketConnectionClosedException: If the client is
                not connected. Exactly one failure event is fired and no
                send is attempted.
            Exception: Whatever ``WebSocket.send`` raised, re-raised after
                the failure has been reported.
        """
        if not self.connected:
            print("Client is not connected, cannot send request")
            # Report a real exception object, then stop: attempting the send
            # anyway would only fail again and record a second failure for
            # the same request.
            error = websocket.WebSocketConnectionClosedException(
                "Not connected.")
            events.request_failure.fire(
                request_type="send_payload",
                name="Send_payload_" + task,
                response_time=0,
                exception=error)
            raise error

        start_time = time.time()
        try:
            self.ws.send(payload)
        except Exception as e:
            print(e)
            events.request_failure.fire(
                request_type="send_payload",
                name="Send_payload_" + task,
                response_time=self._elapsed_ms(start_time),
                exception=e)
            raise
        else:
            events.request_success.fire(
                request_type="send_payload",
                name="Send_payload_" + task,
                response_time=self._elapsed_ms(start_time),
                response_length=len(payload))

    def disconnect(self):
        """Close the websocket and mark the client as disconnected."""
        try:
            self.ws.close()
        finally:
            # Reset the flag even if close() raises, so later sends are
            # rejected up front instead of hitting a dead socket.
            self.connected = False
class WSLocust(Locust):
    """Locust class whose ``client`` attribute is a ``WSLocustClient``."""

    def __init__(self, *args, **kwargs):
        """Initialise the Locust base class, then attach the client.

        Any positional and keyword arguments are forwarded unchanged to the
        base-class initialiser.
        """
        # Name this class (not the parent) in super() and actually invoke
        # __init__; a bare super(...) expression initialises nothing.
        super(WSLocust, self).__init__(*args, **kwargs)
        self.client = WSLocustClient()