forked from hpcjmart/kafka-kafdrop-example-ha
-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.py
130 lines (107 loc) · 4.15 KB
/
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import os
import websocket
import json
import io
import avro.schema
import avro.io
from kafka import KafkaProducer
# https://finnhub.io/docs/api/websocket-trades
class FinnhubProducer:
    """
    Producer that connects to the Finnhub websocket, Avro-encodes each JSON
    payload against a pre-defined schema and sends the result to Kafka.

    Constructing an instance loads the configuration, creates the Kafka
    producer, loads the Avro schema and then blocks inside the websocket
    client's ``run_forever`` loop.
    """

    # Symbol subscribed to as soon as the websocket connection opens.
    SYMBOL = "BINANCE:BTCUSDT"

    def __init__(self):
        """
        Build the producer and start streaming.

        Reads ``config.json`` and ``trades.avsc`` relative to the current
        working directory. The configuration must provide ``KAFKA_SERVER``,
        ``KAFKA_PORT``, ``KAFKA_TOPIC_NAME`` and ``FINNHUB_API_KEY``.
        This call does not return until the websocket loop ends.
        """
        # define config from config file
        self.config = self.load_config('config.json')

        # define the kafka producer here. This assumes there is a kafka server
        # already set up at the configured address and port. Host and port are
        # joined with ':' so the broker address has the "host:port" form.
        kafka_servers = [f"{self.config['KAFKA_SERVER']}:{self.config['KAFKA_PORT']}"]
        self.producer = KafkaProducer(bootstrap_servers=kafka_servers, api_version=(0, 10, 1))

        # load the avro schema; it lets us enforce the message layout before
        # anything is sent to kafka. The context manager closes the file handle.
        with open('trades.avsc') as schema_file:
            self.avro_schema = avro.schema.parse(schema_file.read())
        print("AVRO schema loaded")

        # define the websocket client
        self.ws = websocket.WebSocketApp(
            f"wss://ws.finnhub.io?token={self.config['FINNHUB_API_KEY']}",
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
        )
        self.ws.on_open = self.on_open
        # Blocks here: the callbacks below do all further work.
        self.ws.run_forever()

    def load_config(self, config_file):
        """
        Load the JSON configuration file.

        Parameters
        ----------
        config_file : str
            Path of the JSON file to read.

        Returns
        -------
        The parsed JSON content (a dict for the expected config layout).
        """
        with open(config_file, 'r') as f:
            return json.load(f)

    def avro_encode(self, data, schema):
        """
        Avro encode data using the provided schema.

        Parameters
        ----------
        data : dict
            Data to encode.
        schema : avro.schema.Schema
            Avro schema to use for encoding.

        Returns
        -------
        bytes : Encoded data.
        """
        writer = avro.io.DatumWriter(schema)
        bytes_writer = io.BytesIO()
        encoder = avro.io.BinaryEncoder(bytes_writer)
        writer.write(data, encoder)
        return bytes_writer.getvalue()

    def on_message(self, ws, message):
        """
        Callback function that is called when a message is received from the websocket.

        The message is parsed as JSON, reduced to its ``data`` and ``type``
        fields, Avro-encoded and sent to the configured Kafka topic.

        Parameters
        ----------
        ws : websocket.WebSocketApp
            Websocket client.
        message : str
            Message received from the websocket.
        """
        message = json.loads(message)
        if not isinstance(message, dict) or 'data' not in message:
            # Frames without a 'data' field (for example keep-alive pings)
            # carry nothing to forward and would otherwise raise KeyError.
            return
        avro_message = self.avro_encode(
            {
                'data': message['data'],
                'type': message['type']
            },
            self.avro_schema
        )
        self.producer.send(self.config['KAFKA_TOPIC_NAME'], avro_message)

    def on_error(self, ws, error):
        """
        Websocket error callback. This currently just prints the error to the console.
        In a production environment, this should be logged to a file or sent to a monitoring service.

        Parameters
        ----------
        ws : websocket.WebSocketApp
            Websocket client.
        error
            Error reported by the websocket client.
        """
        print(error)

    def on_close(self, ws, close_status_code=None, close_msg=None):
        """
        Websocket close callback. This currently just prints a message to the console.
        In a production environment, this should be logged to a file or sent to a monitoring service.

        Parameters
        ----------
        ws : websocket.WebSocketApp
            Websocket client.
        close_status_code : optional
            Close status code, when the websocket client supplies one.
        close_msg : optional
            Close reason, when the websocket client supplies one.

        The two extra arguments are optional so the callback works whether
        the client calls it with one argument or with three.
        """
        print("### closed ###")

    def on_open(self, ws):
        """
        Websocket open callback. This subscribes to the ``SYMBOL`` trade stream on the websocket.

        Parameters
        ----------
        ws : websocket.WebSocketApp
            Websocket client.
        """
        print("sending subscribe message")
        # Compact separators keep the payload identical to the hand-written
        # '{"type":"subscribe","symbol":"..."}' form.
        ws.send(json.dumps({"type": "subscribe", "symbol": self.SYMBOL}, separators=(",", ":")))
        print(f"subscribed to {self.SYMBOL}")
if __name__ == "__main__":
    # Script entry point: constructing the producer is the whole program,
    # because the constructor itself starts the websocket loop.
    FinnhubProducer()