-
Notifications
You must be signed in to change notification settings - Fork 0
/
requester.py
76 lines (62 loc) · 2.44 KB
/
requester.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import sys
import traceback
import time
import json
import os
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
SubscriptionResponseMessage,
UnauthorizedError,
PublishMessage,
BinaryMessage
)
def main():
topic = 'local/inference'
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
print('BASE_DIR = ', BASE_DIR)
try:
ipc_client = GreengrassCoreIPCClientV2()
# Subscription operations return a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(topic="local/result", on_stream_event=on_stream_event,
on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
print('Successfully subscribed to topic: ' + "local/result")
try:
while True:
message = {
'image_dir': BASE_DIR,
'fname': 'pelican.jpeg'
}
publish_binary_message_to_topic(ipc_client, topic, json.dumps(message))
print('request:', json.dumps(message))
time.sleep(5)
except InterruptedError:
print('Publisher interrupted.')
# To stop subscribing, close the stream.
operation.close()
except UnauthorizedError:
print('Unauthorized error while subscribing to topic: ' +
topic, file=sys.stderr)
traceback.print_exc()
exit(1)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def publish_binary_message_to_topic(ipc_client, topic, message):
binary_message = BinaryMessage(message=bytes(message, 'utf-8'))
publish_message = PublishMessage(binary_message=binary_message)
ipc_client.publish_to_topic(topic=topic, publish_message=publish_message)
def on_stream_event(event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, 'utf-8')
print('result: %s' % (message))
except:
traceback.print_exc()
def on_stream_error(error: Exception) -> bool:
print('Received a stream error.', file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed() -> None:
print('Subscribe to topic stream closed.')
if __name__ == '__main__':
main()