/
robocat.py
90 lines (73 loc) · 2.65 KB
/
robocat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import sys
import time
import logging
import json
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
IoTCoreMessage,
QOS,
PublishToIoTCoreRequest,
SubscribeToIoTCoreRequest
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
logging.basicConfig(level=logging.INFO)
class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: IoTCoreMessage) -> None:
try:
message = str(event.message.payload, "utf-8")
logging.info(message)
except:
logging.error("Exception occurred", exc_info=True)
def on_stream_error(self, error: Exception) -> bool:
logging.error(f"Stream error: {error}")
return True
def on_stream_closed(self) -> None:
logging.info("Stream closed")
def publish_message(topic: str, message: str):
logging.info(f"Publishing message to topic {topic}: {message}")
request = PublishToIoTCoreRequest()
request.topic_name = topic
request.qos = QOS.AT_MOST_ONCE
request.payload = bytes(message, "utf-8")
operation = ipc_client.new_publish_to_iot_core()
operation.activate(request)
operation.get_response().result(TIMEOUT)
if __name__ == "__main__":
# args : enabled, frequency, sub_topic, pub_topic
if len(sys.argv) == 5:
if sys.argv[1].lower() == "true":
enabled = True
else:
enabled = False
frequency = float(sys.argv[2])
sub_topic = sys.argv[3]
pub_topic = sys.argv[4]
logging.info(f"enabled: {enabled}, frequency: {frequency}, sub_topic: {sub_topic}, pub_topic: {pub_topic}")
handler = StreamHandler()
request = SubscribeToIoTCoreRequest()
request.topic_name = sub_topic
request.qos = QOS.AT_MOST_ONCE
operation = ipc_client.new_subscribe_to_iot_core(handler)
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
message_count = 1
try:
while True:
time.sleep(frequency)
if enabled:
message = json.dumps({
"id": message_count,
"timestamp": time.time()
})
publish_message(pub_topic, message)
message_count += 1
except:
logging.error("Exception occurred", exc_info=True)
operation.close()
else:
logging.error(f'4 arguments required, only {len(sys.argv) - 1} provided.')