# Importing necessary libraries and modules
import os
import random
import json
import logging
import uuid
from time import sleep
from quixstreams import Application
import influxdb_client_3 as InfluxDBClient3
# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
consumer_group_name = os.getenv('CONSUMER_GROUP_NAME', 'influxdb-data-reader')
# Create an Application that uses local Kafka
app = Application(
    broker_address=os.environ.get('BROKER_ADDRESS', 'localhost:9092'),
    consumer_group=consumer_group_name,
    auto_create_topics=True
)
# Override the app variable when the local development env var is 'false' or not set.
localdev = os.environ.get('localdev', 'false')
if localdev == 'false':
    # Create a Quix platform-specific application instead
    app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
# Define the topic using the 'output' environment variable
topic_name = os.environ['output']
topic = app.topic(name=topic_name)
influxdb3_client = InfluxDBClient3.InfluxDBClient3(
    token=os.environ['INFLUXDB_TOKEN'],
    host=os.environ['INFLUXDB_HOST'],
    org=os.environ['INFLUXDB_ORG'],
    database=os.environ['INFLUXDB_DATABASE']
)
measurement_name = os.environ.get('INFLUXDB_MEASUREMENT_NAME', os.environ['output'])
interval = os.environ.get('task_interval', '5m')
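# Note: 'task_interval' accepts strings like '10s', '5m', or '1h'; the
# supported unit suffixes are the keys of UNIT_SECONDS below.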
# Global flag controlling whether the main loop keeps running
run = True
# Seconds per interval-unit suffix, used to convert time intervals (like 1h, 2m)
# into seconds for scheduling how often the query loop runs.
UNIT_SECONDS = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    'y': 31536000,
}
def interval_to_seconds(interval: str) -> int:
    """Convert an interval string like '10h' or '2m' into seconds."""
    try:
        return int(interval[:-1]) * UNIT_SECONDS[interval[-1]]
    except ValueError as e:
        raise ValueError(
            'interval format is {int}{unit}, i.e. \'10h\'; '
            f'valid units: {list(UNIT_SECONDS.keys())}') from e
    except KeyError as e:
        raise ValueError(
            f'Unknown interval unit: {interval[-1]}; '
            f'valid units: {list(UNIT_SECONDS.keys())}') from e
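# Illustrative examples (assumed inputs, not from the original source):
#   interval_to_seconds('5m')  -> 300
#   interval_to_seconds('2h')  -> 7200
#   interval_to_seconds('10x') -> ValueError (unknown unit 'x')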
interval_seconds = interval_to_seconds(interval)
# Function to fetch data from InfluxDB and send it to Quix.
# It runs in a continuous loop, periodically fetching data based on the interval.
def get_data():
    # Run in a loop until the main thread is terminated
    while run:
        try:
            # Select rows recorded within the last interval window
            myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= now() - {interval}'
            print(f'sending query {myquery}')
            # Query InfluxDB 3.0 using InfluxQL (SQL is also supported)
            table = influxdb3_client.query(
                query=myquery,
                mode='pandas',
                language='influxql')
            table = table.drop(columns=['iox::measurement'])
            table.rename(columns={'time': 'time_recorded'}, inplace=True)
            # If there are rows to write to the stream at this time
            if not table.empty:
                json_result = table.to_json(orient='records', date_format='iso')
                yield json_result
                print('query success')
            else:
                print('No new data to publish.')
            # Wait for the next interval
            sleep(interval_seconds)
        except Exception as e:
            print('query failed', flush=True)
            print(f'error: {e}', flush=True)
            sleep(1)
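# Shape of each payload yielded by get_data(), sketched with hypothetical
# field names (actual columns depend on the measurement schema):
#   '[{"time_recorded": "2024-01-01T00:00:00.000", "machineId": "m1", "value": 42}]'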
def main():
    """
    Read data from the query and publish it to Kafka.
    """
    # Create a pre-configured Producer object.
    # The Producer is already set up to use Quix brokers.
    # It will also ensure that the topics exist before producing to them if
    # Quix's Application class is initialized with 'auto_create_topics=True'.
    with app.get_producer() as producer:
        for res in get_data():
            # Parse the JSON string into a list of records
            records = json.loads(res)
            for obj in records:
                print(obj)
                # Use each row's 'machineId' field as the message key
                message_key = obj['machineId']
                logger.info(f'Produced message with key:{message_key}, value:{obj}')
                serialized = topic.serialize(
                    key=message_key, value=obj, headers={'uuid': str(uuid.uuid4())}
                )
                # Publish the data to the topic
                producer.produce(
                    topic=topic.name,
                    headers=serialized.headers,
                    key=serialized.key,
                    value=serialized.value,
                )
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Exiting.')
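# Minimal local-run sketch (placeholder values are assumptions; adjust to your setup):
#   export localdev=true                  # keep the local Kafka broker config
#   export BROKER_ADDRESS=localhost:9092  # optional, this is the default
#   export output=<topic-name>
#   export INFLUXDB_TOKEN=<token>
#   export INFLUXDB_HOST=<host>
#   export INFLUXDB_ORG=<org>
#   export INFLUXDB_DATABASE=<database>
#   python main.py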