Skip to content

Commit

Permalink
Fix platform event message creation date extraction issue
Browse files Browse the repository at this point in the history
The creation date of the incoming messages for platform events is
stored in a different property than in push topic messages.
Create a method in ReplayMarkerStorage which can extract the
creation date value from both of these message types.
  • Loading branch information
robertmrk committed Nov 6, 2018
1 parent 08021fe commit f88638f
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
19 changes: 16 additions & 3 deletions aiosfstream/replay.py
Expand Up @@ -67,6 +67,20 @@ async def insert_replay_id(self, message):
message["ext"] = {}
message["ext"]["replay"] = {subscription: replay_id}

@staticmethod
def get_message_date(message):
"""Return the creation date of the *message*
:param dict message: An incoming message
:return: Creation date as an ISO 8601 formatted datetime string
:rtype: str
"""
# get the creation date of the message from a PushTopic message
# structure if it exists, or read it from a PlatfromEvent message
# structure
return (message["data"]["event"].get("createdDate") or
message["data"]["payload"].get("CreatedDate"))

async def extract_replay_id(self, message):
"""Extract and store the replay id present int the *message*
Expand All @@ -77,9 +91,8 @@ async def extract_replay_id(self, message):

# create the replay marker object from the creation date and the
# actual id
event = message["data"]["event"]
marker = ReplayMarker(date=event["createdDate"],
replay_id=event["replayId"])
marker = ReplayMarker(date=self.get_message_date(message),
replay_id=message["data"]["event"]["replayId"])

# get the last, stored, replay marker
last_marker = await self.get_replay_marker(subscription)
Expand Down
48 changes: 48 additions & 0 deletions tests/test_replay.py
Expand Up @@ -144,12 +144,50 @@ async def test_insert_replay_id_doesnt_insert_none(self):
self.replay_storage.get_replay_id.assert_called_with(
message["subscription"])

def test_get_message_date_for_push_topic(self):
date = datetime.now(timezone.utc).isoformat()
message = {
"channel": "/foo/bar",
"data": {
"event": {
"createdDate": date,
"replayId": "id"
}
}
}

result = self.replay_storage.get_message_date(message)

self.assertEqual(result, date)

def test_get_message_date_for_platform_event(self):
date = datetime.now(timezone.utc).isoformat()
message = {
"channel": "/foo/bar",
"data": {
"payload": {
"value__c": "some value",
"CreatedById": "id",
"CreatedDate": date
},
"event": {
"replayId": "id"
}
}
}

result = self.replay_storage.get_message_date(message)

self.assertEqual(result, date)

async def test_extract_replay_id_on_no_previous_id(self):
self.replay_storage.set_replay_marker = mock.CoroutineMock()
self.replay_storage.get_replay_marker = mock.CoroutineMock(
return_value=None
)
date = datetime.now(timezone.utc).isoformat()
self.replay_storage.get_message_date = \
mock.MagicMock(return_value=date)
id_value = "id"
message = {
"channel": "/foo/bar",
Expand All @@ -167,6 +205,7 @@ async def test_extract_replay_id_on_no_previous_id(self):
message["channel"],
ReplayMarker(date=date, replay_id=id_value)
)
self.replay_storage.get_message_date.assert_called()

async def test_extract_replay_id_on_previous_id_older(self):
self.replay_storage.set_replay_marker = mock.CoroutineMock()
Expand All @@ -179,6 +218,8 @@ async def test_extract_replay_id_on_previous_id_older(self):
return_value=prev_marker
)
date = datetime.now(timezone.utc).isoformat()
self.replay_storage.get_message_date = \
mock.MagicMock(return_value=date)
id_value = "id"
message = {
"channel": "/foo/bar",
Expand All @@ -196,10 +237,13 @@ async def test_extract_replay_id_on_previous_id_older(self):
message["channel"],
ReplayMarker(date=date, replay_id=id_value)
)
self.replay_storage.get_message_date.assert_called()

async def test_extract_replay_id_on_previous_id_same_date(self):
self.replay_storage.set_replay_marker = mock.CoroutineMock()
date = datetime.now(timezone.utc).isoformat()
self.replay_storage.get_message_date = \
mock.MagicMock(return_value=date)
prev_marker = ReplayMarker(
date=date,
replay_id="old_id"
Expand All @@ -224,6 +268,7 @@ async def test_extract_replay_id_on_previous_id_same_date(self):
message["channel"],
ReplayMarker(date=date, replay_id=id_value)
)
self.replay_storage.get_message_date.assert_called()

async def test_extract_replay_id_on_previous_id_newer(self):
self.replay_storage.set_replay_marker = mock.CoroutineMock()
Expand All @@ -236,6 +281,8 @@ async def test_extract_replay_id_on_previous_id_newer(self):
return_value=prev_marker
)
date = datetime.now(timezone.utc).isoformat()
self.replay_storage.get_message_date = \
mock.MagicMock(return_value=date)
id_value = "id"
message = {
"channel": "/foo/bar",
Expand All @@ -250,6 +297,7 @@ async def test_extract_replay_id_on_previous_id_newer(self):
await self.replay_storage.extract_replay_id(message)

self.replay_storage.set_replay_marker.assert_not_called()
self.replay_storage.get_message_date.assert_called()


class TestMappingReplayStorage(TestCase):
Expand Down

0 comments on commit f88638f

Please sign in to comment.