/
aws_sns_sqs_middleware_service.py
65 lines (49 loc) 路 2.57 KB
/
aws_sns_sqs_middleware_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import os
from typing import Any, Callable, Dict
import tomodachi
from tomodachi import aws_sns_sqs, aws_sns_sqs_publish
from tomodachi.discovery import AWSSNSRegistration
from tomodachi.protocol import JsonBase
async def middleware_function(func: Callable, service: Any, message: Any, topic: str, context: Dict, *args: Any, **kwargs: Any) -> Any:
# Functionality before function is called
service.log('middleware before')
return_value = await func(*args, **kwargs)
# There's also the possibility to pass in extra arguments or keywords arguments, for example:
# return_value = await func(*args, id='overridden', **kwargs)
# Functinoality after function is called
service.log('middleware after')
return return_value
@tomodachi.service
class ExampleAWSSNSSQSService(tomodachi.Service):
name = 'example_aws_sns_sqs_service'
log_level = 'INFO'
uuid = os.environ.get('SERVICE_UUID')
# Build own "discovery" functions, to be run on start and stop
# See tomodachi/discovery/aws_sns_registration.py for example
discovery = [AWSSNSRegistration]
# The message protocol class defines how a message should be processed when sent and received
# See tomodachi/protocol/json_base.py for a basic example using JSON and transferring some metadata
message_protocol = JsonBase
# Adds a middleware function that is run on every incoming message.
# Several middlewares can be chained.
message_middleware = [middleware_function]
# Some options can be specified to define credentials, used ports, hostnames, access log, etc.
options = {
'aws_sns_sqs': {
'region_name': None, # specify AWS region (example: 'eu-west-1')
'aws_access_key_id': None, # specify AWS access key (example: 'AKIAXNTIENCJIY2STOCI')
'aws_secret_access_key': None # specify AWS secret key (example: 'f7sha92hNotarealsecretkeyn29ShnSYQi3nzgA')
},
'aws_endpoint_urls': {
'sns': None, # For example 'http://localhost:4575' if localstack is used for testing
'sqs': None # For example 'http://localhost:4576' if localstack is used for testing
}
}
@aws_sns_sqs('example-route1')
async def route1a(self, data: Any) -> None:
self.log('Received data (function: route1a) - "{}"'.format(data))
async def _started_service(self) -> None:
async def publish(data: Any, topic: str) -> None:
self.log('Publish data "{}"'.format(data))
await aws_sns_sqs_publish(self, data, topic=topic, wait=False)
await publish('鍙嬮仈', 'example-route1')