-
Notifications
You must be signed in to change notification settings - Fork 0
/
main_sqs.py
176 lines (140 loc) · 7.89 KB
/
main_sqs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# Standard library imports
import os # Used for Diagnostics
import sys
sys.path.append("..")
import time
import uuid
# import pprint
import traceback
import logging
# Third party library imports
import click
import coloredlogs
# Local application importsimport os
from helper import timeHelper
from helper import jsonHelper
from helper import apiMgmt
from helper import configureLogging
from awsHelper import awsCommonHelper
from awsHelper import awsSqsHelper
# Module Constants
# Module level variables and code
configureLogging.setupLogging()
logger = logging.getLogger(__name__)
coloredlogs.install()
# Diagnostics logs
# pp = pprint.PrettyPrinter(indent=2)
# if("os" in dir()):
# logger.info(f"main_sqs::os.getcwd(): '{os.getcwd()}'")
# logger.info(f"main_sqs::ModuleName: '{__name__}'")
# logger.info(f"main_sqs::dir(): {dir()}")
@click.group()
def main():
pass
@main.command("rm", help="Receive messages")
@click.argument("QueueName", type=click.STRING) # help="SQS queue (name) to recieve message from")
@click.option("-c", "--count", type=click.INT, default=10, help="Number of times to loop for receiving messages. Defaults to 10.")
@click.option("-w", "--wait", type=click.INT, default=5, help="Number of seconds to wait between each message. Simulate long message processing time. Defaults to 5.")
def receiveMessages(queuename, count, wait):
receiveMessagesLL(queuename, count, wait)
@main.command("sms", help="Send standard messages")
@click.argument("QueueName", type=click.STRING) # help="SQS queue (name) to send message to")
@click.option("-c", "--count", type=click.INT, default=1, help="Number of mesages to send. Defaults to 1.")
@click.option("-i", "--interval", type=click.INT, default=2, help="Duration between messages in Seconds. Defaults to 2.")
def sendStandardMessages(queuename, count, interval):
sendMessagesLL(queuename, count=count, interval=interval)
@main.command("smf", help="Send FIFO messages")
@click.argument("QueueName", type=click.STRING) # help="SQS queue (name) to send message to")
@click.option("-c", "--count", type=click.INT, default=1, help="Number of mesages to send. Defaults to 1.")
@click.option("-i", "--interval", type=click.INT, default=2, help="Duration between messages in Seconds. Defaults to 2.")
@click.option("-g", "--samegroup", type=click.BOOL, is_flag=True, default=False, help="Use same UUID for all messages in group. Applicable for FIFO Queue. Defaults to False.")
@click.option("-d", "--samededup", type=click.BOOL, is_flag=True, default=False, help="Use same UUID for all messages for deduplication. Applicable for FIFO Queue. Defaults to False.")
def sendFifoMessages(queuename, count, interval, samegroup, samededup):
sendMessagesLL(queuename, count=count, interval=interval, useSameGroupId=samegroup, useSameDeduplicationId=samededup)
def sendMessagesLL(queueName, count=1, interval=2, useSameGroupId=False, useSameDeduplicationId=False):
# Test Scenario 01: None
# result = awsSqsHelper.sendMessage(queueName, message=None)
# Test Scenario 02: Tuple type
# result = awsSqsHelper.sendMessage(queueName, message=("a",2))
# Test Scenario 03: String type
# result = awsSqsHelper.sendMessage(queueName, message=f"Hi There (sent at {timeHelper.getUTCDateTimeString()})")
# Test Scenario 04: Dictionary type
successCount = 0
messageGroupId = None
messageDeduplicationId = None
logger.info(f"main_sqs::sendMessagesLL() >> Parameters >> queueName: '{queueName}', count: {count}, interval: {interval} second(s), useSameGroupId: {useSameGroupId}, useSameDeduplicationId: {useSameDeduplicationId}")
if(useSameGroupId):
messageGroupId = str(uuid.uuid4())
if(useSameDeduplicationId):
messageDeduplicationId = str(uuid.uuid4())
for i in range(1, count+1):
if(i != 1):
time.sleep(interval)
logger.info(f"main_sqs::sendMessagesLL() >> Sending message #{i}...")
currentDateTime = timeHelper.getUTCDateTimeString()
messageBody = getDefaultMessageBody(modifiedAt=currentDateTime)
messageAttributes = getMessageAttributes("HelloWorldType", currentDateTime=currentDateTime)
messageBody["body"] = f"Hello World sent at {currentDateTime}."
result = awsSqsHelper.sendMessage(
queueName,
messageBody,
messageAttributes=messageAttributes,
messageGroupId=messageGroupId,
messageDeduplicationId=messageDeduplicationId)
logger.info(f"main_sqs::sendMessagesLL() >> result: {jsonHelper.convertObjectToFormattedJson(result)}")
if(apiMgmt.isResultFailure(result)):
logger.error(apiMgmt.getResultErrorStackTrace(result))
else:
successCount += 1
logger.info(f"main_sqs::sendMessagesLL() >> Sent successfully.")
logger.info(f"main_sqs::sendMessagesLL() >> Successfully sent {successCount} of {count} message(s).")
def getDefaultMessageBody(modifiedBy="Jyotindra", modifiedAt=None):
messageBody = {}
messageBody["modifiedBy"] = modifiedBy
messageBody["modifiedAt"] = modifiedAt if modifiedAt else timeHelper.getUTCDateTimeString()
return messageBody
def getMessageAttributes(messageType, messageVersion="1.0", messageFormat="application/json", currentDateTime=None):
messageAttributes = {}
messageAttributes["messageVersion"] = messageVersion
messageAttributes["messageFormat"] = messageFormat
messageAttributes["messageType"] = messageType
messageAttributes["messageSentAt"] = currentDateTime if currentDateTime else timeHelper.getUTCDateTimeString()
return messageAttributes
def receiveMessagesLL(queuename, count=10, wait=5):
queueName = queuename
successCount = receivedCount = 0
logger.info(f"main_sqs::receiveMessagesLL() >> Parameters >> queueName: '{queueName}', count: {count}, wait: {wait} second(s)")
for i in range(1, count+1):
result = awsSqsHelper.receiveMessages(queueName, messageProcessorFunc=receiveMessageProcessor)
logger.debug(f"receiveMessagesLL result: {result}")
if(apiMgmt.isResultSuccess(result)):
messages = result[awsSqsHelper.CONSTANTS.CONTEXT.MESSAGES]
receivedCount += len(messages)
logger.info(f"main_sqs::receiveMessagesLL() >> Received {len(messages)} message(s). Processing messages...")
for index, message in enumerate(messages):
resultProcessor = receiveMessageProcessor(message, index)
time.sleep(wait)
if(resultProcessor):
logger.info(f"main_sqs::receiveMessagesLL() >> Message processed successfully.")
successCount += 1
awsSqsHelper.deleteMessage(message)
else:
logger.error(f"main_sqs::receiveMessagesLL() >> Message processing FAILED.")
logger.info("")
logger.info(f"main_sqs::receiveMessagesLL() >> Successfully processed {successCount} of {receivedCount} received message(s).")
def receiveMessageProcessor(message, index):
result = True
logger.info(f"main_sqs::receiveMessageProcessor() >> Processing message #{index+1}...")
messageBody = jsonHelper.convertJsonToObject(message.body)
messageAttributes = awsCommonHelper.deserializeAttributeLisToDict(message.message_attributes)
logger.info(f" Message Id: {message.message_id}")
logger.info(f" Message Attributes: {jsonHelper.convertObjectToFormattedJson(message.attributes)}")
logger.info(f" User Defined Body: {jsonHelper.convertObjectToFormattedJson(messageBody)}")
logger.info(f" User DefinedAttributes: {jsonHelper.convertObjectToFormattedJson(messageAttributes)}")
return result
if( __name__ == "__main__"):
try:
main()
except Exception as ex:
logger.error(f"main_sqs::main() >> Exception has occurred. Error: '{ex.__str__()}'", exc_info=True)
# logger.error(traceback.format_exc())