-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
94 lines (71 loc) · 3.26 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# import Utility modules
import os
import ast
import datetime
import logging
# import vendor-specific modules
from quixstreams import Application
from quixstreams.models.serializers.quix import JSONDeserializer
from influxdb_client_3 import InfluxDBClient3
# Configure root logging once for the whole script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Kafka consumer group; override via env var to run parallel deployments.
consumer_group_name = os.environ.get('CONSUMER_GROUP_NAME', 'influxdb-data-writer')

# Create an Application that uses local Kafka by default (local development).
app = Application(
    broker_address=os.environ.get('BROKER_ADDRESS', 'localhost:9092'),
    consumer_group=consumer_group_name,
    auto_create_topics=True,
    auto_offset_reset='earliest'
)

# Override the app variable if the local development env var is set to false
# or is not present.  Comparison is case-insensitive so 'False'/'FALSE'
# behave the same as 'false'.
localdev = os.environ.get('localdev', 'false')
if localdev.lower() == 'false':
    # Create a Quix platform-specific application instead; broker settings
    # are supplied by the Quix platform environment.
    app = Application(consumer_group=consumer_group_name,
                      auto_create_topics=True,
                      auto_offset_reset='earliest')

# Input topic carrying JSON messages destined for InfluxDB.
input_topic = app.topic(os.environ['input'], value_deserializer=JSONDeserializer())

# Tag/field key lists arrive as Python-literal strings, e.g. "['f1','f2']".
# ast.literal_eval parses literals only — no arbitrary code execution.
tag_keys = ast.literal_eval(os.environ.get('INFLUXDB_TAG_KEYS', '[]'))
field_keys = ast.literal_eval(os.environ.get('INFLUXDB_FIELD_KEYS', '[]'))

# InfluxDB v3 client used as the sink for every processed message.
influx3_client = InfluxDBClient3(token=os.environ['INFLUXDB_TOKEN'],
                                 host=os.environ['INFLUXDB_HOST'],
                                 org=os.environ['INFLUXDB_ORG'],
                                 database=os.environ['INFLUXDB_DATABASE'])
def send_data_to_influx(message):
    """Write one deserialized Kafka message to InfluxDB as a single point.

    Extracts the configured tag and field keys that are present in
    *message*, builds a point dict, and writes it with the InfluxDB v3
    client.  The point's timestamp is taken from the message's own
    'time' key, not from the wall clock.

    Args:
        message (dict): deserialized JSON message from the input topic.
            Must contain a 'time' key; configured tag/field keys are
            picked up only if present.

    Note: errors are logged and swallowed so one bad message does not
    stop the stream (best-effort sink semantics, as in the original).
    """
    logger.info('Processing message: %s', message)
    try:
        measurement_name = os.environ.get('INFLUXDB_MEASUREMENT_NAME', 'measurement1')

        # Keep only the configured keys that actually appear in the message.
        tags = {k: message[k] for k in tag_keys if k in message}
        fields = {k: message[k] for k in field_keys if k in message}

        # Lazy %-formatting also avoids nesting same-quote strings inside
        # an f-string, which is a SyntaxError on Python < 3.12.
        logger.info('Using tag keys: %s', ', '.join(tags))
        logger.info('Using field keys: %s', ', '.join(fields))

        # Construct the point to persist.
        points = {
            'measurement': measurement_name,
            'tags': tags,
            'fields': fields,
            # Timestamp comes from the message itself; the previous
            # utcnow() value was computed but never used (dead code).
            'time': message['time']
        }
        influx3_client.write(record=points, write_precision='ms')
        logger.info('Persisted points to influx: %s', points)
    except Exception:
        # Best-effort sink: log the full traceback but keep consuming.
        logger.exception('Write failed')
# Wire the pipeline: every consumed message flows through the InfluxDB sink.
sdf = app.dataframe(input_topic).update(send_data_to_influx)

if __name__ == '__main__':
    print('Starting application')
    app.run(sdf)