Skip to content

Commit

Permalink
Merge pull request #7 from miermans/release/1.0.1
Browse files Browse the repository at this point in the history
Fix chaining tracker methods
  • Loading branch information
mmiermans committed Jul 5, 2022
2 parents 2c79eea + 2431abb commit 7962c37
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 16 deletions.
8 changes: 8 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
Version 1.0.1 (2022-06-04)
--------------------------
Fix: Do not send events multiple times when track calls are chained.

Version 1.0.0 (2022-06-03)
--------------------------
Release asyncio Snowplow Python library.

Version 1.0.0a3 (2022-06-03)
--------------------------
Breaking: Changed import from `snowplow_tracker` to `aio_snowplow_tracker` to support installing both libraries.
Expand Down
2 changes: 1 addition & 1 deletion aio_snowplow_tracker/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@
License: Apache License Version 2.0
"""

__version_info__ = (1, 0, 0)
__version_info__ = (1, 0, 1)
__version__ = ".".join(str(x) for x in __version_info__)
__build_version__ = __version__ + ''
25 changes: 15 additions & 10 deletions aio_snowplow_tracker/emitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import asyncio
import logging
import time
import threading
import aiohttp
from typing import Optional, Union, Tuple

Expand Down Expand Up @@ -112,7 +111,7 @@ def __init__(
self.on_success = on_success
self.on_failure = on_failure

self.lock = threading.RLock()
self.lock = asyncio.Lock()

self.timer = None

Expand Down Expand Up @@ -155,7 +154,7 @@ async def input(self, payload: PayloadDict) -> None:
:param payload: The name-value pairs for the event
:type payload: dict(string:*)
"""
with self.lock:
async with self.lock:
if self.bytes_queued is not None:
self.bytes_queued += len(str(payload))

Expand All @@ -165,7 +164,7 @@ async def input(self, payload: PayloadDict) -> None:
self.buffer.append(payload)

if self.reached_limit():
await self.flush()
await self._flush_unsafe()

def reached_limit(self) -> bool:
"""
Expand All @@ -182,11 +181,17 @@ async def flush(self) -> None:
"""
Sends all events in the buffer to the collector.
"""
with self.lock:
await self.send_events(self.buffer)
self.buffer = []
if self.bytes_queued is not None:
self.bytes_queued = 0
async with self.lock:
await self._flush_unsafe()

async def _flush_unsafe(self) -> None:
"""
Sends all events in the buffer to the collector without locking.
"""
await self.send_events(self.buffer)
self.buffer = []
if self.bytes_queued is not None:
self.bytes_queued = 0

async def http_post(self, data: str) -> bool:
"""
Expand Down Expand Up @@ -244,7 +249,7 @@ async def http_get(self, payload: PayloadDict) -> bool:
async def sync_flush(self) -> None:
"""
Calls the flush method of the base Emitter class.
This is guaranteed to be blocking, not asynchronous.
This is guaranteed to be flushed immediately, without buffering.
"""
logger.debug("Starting synchronous flush...")
await Emitter.flush(self)
Expand Down
20 changes: 20 additions & 0 deletions aio_snowplow_tracker/test/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import re
import json
import base64
from asyncio import gather
from urllib.parse import unquote_plus

import aiohttp
Expand Down Expand Up @@ -132,6 +133,25 @@ async def test_integration_screen_view(snowplow_server: TestServer, default_subj
}


async def test_integration_gather_unstruct_event(snowplow_server: TestServer, default_subject: subject.Subject):
t = tracker.Tracker([create_emitter(snowplow_server)], default_subject, encode_base64=False)
n_products = 10
product_ids = {f"PRODUCT_{i}" for i in range(n_products)}
track_coroutines = [
t.track_unstruct_event(
SelfDescribingJson("iglu:com.acme/viewed_product/jsonschema/2-0-2", {"product_id": product_id})
)
for product_id in product_ids
]

await gather(*track_coroutines)

requests = snowplow_server.app['requests']
assert len(requests) == len(product_ids)
received_product_ids = {json.loads(r.query["ue_pr"])["data"]["data"]["product_id"] for r in requests}
assert product_ids == received_product_ids


async def test_integration_struct_event(snowplow_server: TestServer, default_subject: subject.Subject):
t = tracker.Tracker([create_emitter(snowplow_server)], default_subject)
await t.track_struct_event("Ecomm", "add-to-basket", "dog-skateboarding-video", "hd", 13.99)
Expand Down
8 changes: 4 additions & 4 deletions aio_snowplow_tracker/test/unit/test_emitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ async def test_input_no_flush(self, mok_flush: Any) -> None:
self.assertFalse(e.reached_limit())
mok_flush.assert_not_called()

@async_patch('aio_snowplow_tracker.Emitter.flush')
@async_patch('aio_snowplow_tracker.Emitter._flush_unsafe')
async def test_input_flush_byte_limit(self, mok_flush: Any) -> None:
mok_flush.side_effect = mocked_flush

Expand All @@ -139,7 +139,7 @@ async def test_input_flush_byte_limit(self, mok_flush: Any) -> None:
self.assertTrue(e.reached_limit())
self.assertEqual(mok_flush.call_count, 1)

@async_patch('aio_snowplow_tracker.Emitter.flush')
@async_patch('aio_snowplow_tracker.Emitter._flush_unsafe')
async def test_input_flush_buffer(self, mok_flush: Any) -> None:
mok_flush.side_effect = mocked_flush

Expand All @@ -158,7 +158,7 @@ async def test_input_flush_buffer(self, mok_flush: Any) -> None:
self.assertTrue(e.reached_limit())
self.assertEqual(mok_flush.call_count, 1)

@async_patch('aio_snowplow_tracker.Emitter.flush')
@async_patch('aio_snowplow_tracker.Emitter._flush_unsafe')
async def test_input_bytes_queued(self, mok_flush: Any) -> None:
mok_flush.side_effect = mocked_flush

Expand All @@ -172,7 +172,7 @@ async def test_input_bytes_queued(self, mok_flush: Any) -> None:
await e.input(nvPairs)
self.assertEqual(e.bytes_queued, 48)

@async_patch('aio_snowplow_tracker.Emitter.flush')
@async_patch('aio_snowplow_tracker.Emitter._flush_unsafe')
async def test_input_bytes_post(self, mok_flush: Any) -> None:
mok_flush.side_effect = mocked_flush

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

setup(
name='aio-snowplow-tracker',
version='1.0.0',
version='1.0.1',
author=authors_str,
author_email=authors_email_str,
packages=[
Expand Down

0 comments on commit 7962c37

Please sign in to comment.