Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions src/lumigo_tracer/event/event_dumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,19 @@
]


class Event:
def __init__(self, event):
"""
Cache propeties of the event in order improve performance.
"""
self.raw_event = event
self.record_event_source = safe_get(event, ["Records", 0, "eventSource"])


class EventParseHandler(ABC):
@staticmethod
@abstractmethod
def is_supported(event) -> bool:
def is_supported(event: Event) -> bool:
raise NotImplementedError()

@staticmethod
Expand All @@ -92,8 +101,8 @@ def get_omit_skip_path() -> Optional[List[str]]:

class S3Handler(EventParseHandler):
@staticmethod
def is_supported(event) -> bool:
return safe_get(event, ["Records", 0, "eventSource"]) == "aws:s3"
def is_supported(event: Event) -> bool:
return event.record_event_source == "aws:s3"

@staticmethod
def parse(event) -> OrderedDict:
Expand Down Expand Up @@ -125,8 +134,8 @@ def get_omit_skip_path() -> Optional[List[str]]:

class CloudfrontHandler(EventParseHandler):
@staticmethod
def is_supported(event) -> bool:
return bool(safe_get(event, ["Records", 0, "cf", "config", "distributionId"], {}))
def is_supported(event: Event) -> bool:
return bool(safe_get(event.raw_event, ["Records", 0, "cf", "config", "distributionId"], {}))

@staticmethod
def parse(event) -> OrderedDict:
Expand All @@ -153,8 +162,8 @@ def parse(event) -> OrderedDict:

class ApiGWHandler(EventParseHandler):
@staticmethod
def is_supported(event) -> bool:
return is_api_gw_event(event=event)
def is_supported(event: Event) -> bool:
return is_api_gw_event(event=event.raw_event)

@staticmethod
def parse(event) -> OrderedDict:
Expand Down Expand Up @@ -182,8 +191,8 @@ def parse(event) -> OrderedDict:

class SNSHandler(EventParseHandler):
@staticmethod
def is_supported(event) -> bool:
return safe_get(event, ["Records", 0, "EventSource"]) == "aws:sns"
def is_supported(event: Event) -> bool:
return safe_get(event.raw_event, ["Records", 0, "EventSource"]) == "aws:sns"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use record_event_source


@staticmethod
def parse(event) -> OrderedDict:
Expand All @@ -201,8 +210,8 @@ def parse(event) -> OrderedDict:

class SQSHandler(EventParseHandler):
@staticmethod
def is_supported(event) -> bool:
return safe_get(event, ["Records", 0, "eventSource"]) == "aws:sqs"
def is_supported(event: Event) -> bool:
return event.record_event_source == "aws:sqs"

@staticmethod
def parse(event) -> OrderedDict:
Expand All @@ -218,6 +227,20 @@ def parse(event) -> OrderedDict:
return new_sqs_event


class DDBHandler(EventParseHandler):
@staticmethod
def is_supported(event: Event) -> bool:
return event.record_event_source == "aws:dynamodb"

@staticmethod
def parse(event) -> OrderedDict:
return event

@staticmethod
def get_omit_skip_path() -> Optional[List[str]]:
return ["Records", "dynamodb", "Keys"]


class EventDumper:
@staticmethod
def dump_event(
Expand All @@ -230,10 +253,12 @@ def dump_event(
SQSHandler(),
S3Handler(),
CloudfrontHandler(),
DDBHandler(),
]
event_obj = Event(event)
for handler in handlers:
try:
if handler.is_supported(event):
if handler.is_supported(event_obj):
return lumigo_dumps(
handler.parse(event),
max_size,
Expand Down
50 changes: 47 additions & 3 deletions src/test/unit/event/test_event_dumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
EventParseHandler,
CloudfrontHandler,
S3Handler,
Event,
)
from lumigo_tracer.lumigo_utils import lumigo_dumps, Configuration


class ExceptionHandler(EventParseHandler):
@staticmethod
def is_supported(event) -> bool:
def is_supported(event, record_event_source=None) -> bool:
raise Exception()

@staticmethod
Expand Down Expand Up @@ -413,7 +414,7 @@ def test_parse_event_sqs():


def test_is_s3_event(s3_event):
assert S3Handler().is_supported(s3_event) is True
assert S3Handler().is_supported(Event(s3_event)) is True


def test_parse_s3_event(s3_event):
Expand Down Expand Up @@ -443,7 +444,7 @@ def test_parse_s3_event(s3_event):


def test_is_cloudfront_event(cloudfront_event):
assert CloudfrontHandler().is_supported(cloudfront_event) is True
assert CloudfrontHandler().is_supported(Event(cloudfront_event)) is True


def test_parse_cloudfront_event(cloudfront_event):
Expand Down Expand Up @@ -577,3 +578,46 @@ def cloudfront_event():
}
]
}


def test_parse_ddb_event():
ddb_event = {
"Records": [
{
"eventID": "22222222222222222222222222222222",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-west-2",
"dynamodb": {
"ApproximateCreationDateTime": 1613301976,
"Keys": {"k": {"S": "val0"}},
"NewImage": {"v": {"S": "This is a realistic test!"}, "k": {"S": "val0"}},
"SequenceNumber": "111111111111111111111111111",
"SizeBytes": 64,
"StreamViewType": "NEW_AND_OLD_IMAGES",
},
"eventSourceARN": "arn:aws:dynamodb:us-west-2:111111111111:table/table-with-stream/stream/2020-08-25T09:03:34.809",
},
{
"eventID": "22222222222222222222222222222223",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-west-2",
"dynamodb": {
"ApproximateCreationDateTime": 1613302000,
"Keys": {"k": {"S": "val1"}},
"NewImage": {"v": {"S": "This is a realistic test!"}, "k": {"S": "val1"}},
"SequenceNumber": "111111111111111111111111112",
"SizeBytes": 64,
"StreamViewType": "NEW_AND_OLD_IMAGES",
},
"eventSourceARN": "arn:aws:dynamodb:us-west-2:111111111111:table/table-with-stream/stream/2020-08-25T09:03:34.809",
},
]
}

parsed_event = EventDumper.dump_event(event=ddb_event)

assert parsed_event == json.dumps(ddb_event)