Add pagination support for the Visitors stream (#129)
* add custom pagination support for Visitors stream

* update unittests

* fix the visitors filter value

* - fix minor pagination issue
- update integration tests

* fix test name and start date

* fix the review

* fix regression

* fix lint

* update start date to reduce the execution time and minor refactoring

* setup and changelog update

* remove unused debug statement

---------

Co-authored-by: RushiT0122 <rtodkar@stitchdata-talend.com>
Co-authored-by: Sourabh Gandhi <sgandhi@talend.com>
3 people committed Nov 14, 2023
1 parent 4b10c85 commit dafe891
Showing 8 changed files with 139 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 1.1.0
* Add custom pagination support for the Visitors stream [#129](https://github.com/singer-io/tap-pendo/pull/129)

## 1.0.1
* Fix infinite loop issue [#126](https://github.com/singer-io/tap-pendo/pull/126)

2 changes: 1 addition & 1 deletion setup.py
@@ -3,7 +3,7 @@

setup(
name="tap-pendo",
version="1.0.1",
version="1.1.0",
description="Singer.io tap for extracting data",
author="Stitch",
url="https://github.com/singer-io/tap-pendo",
44 changes: 38 additions & 6 deletions tap_pendo/streams.py
@@ -141,6 +141,7 @@ class Stream():
period = None
request_retry_count = 1
last_processed = None
last_synced_record = {}

# Initialize the endpoint attribute, which can be overridden by child streams based on
# the different parameters used by the stream.
@@ -507,7 +508,8 @@ def sync_substream(self, state, parent, sub_stream, parent_response):
sub_stream.name, record[parent.key_properties[0]])

# After processing for all parent ids we can remove our resumption state
state.get('bookmarks').get(sub_stream.name).pop('last_processed')
if 'last_processed' in state.get('bookmarks').get(sub_stream.name):
state.get('bookmarks').get(sub_stream.name).pop('last_processed')

self.update_child_stream_bookmarks(state=state,
sub_stream=sub_stream,
@@ -729,7 +731,14 @@ def send_request_get_results(self, req, endpoint, params, count, **kwargs):
# Request retry
yield from self.request(endpoint, params, count, **kwargs)

def get_record_count(self):
return 0

def is_loop_required(self):
return False

def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
loop_for_records = self.is_loop_required()
stream_response = self.request(self.name, json=self.get_body()) or []

# Get and initialize sub-stream for the current stream
@@ -746,7 +755,7 @@ def sync(self, state, start_date=None, key_id=None, parent_last_updated=None):
# which gets flushed out during the sync_substream call above
stream_response = self.request(self.name, json=self.get_body()) or []

return (self.stream, stream_response), False
return (self.stream, stream_response), loop_for_records
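
Returning loop_for_records instead of a hard-coded False is what lets the runner keep requesting pages. A minimal sketch of that driver loop, assuming an interface like the one sync_stream in tap_pendo/sync.py uses (drive_sync and emit are illustrative names, not the tap's actual API):

def emit(record):
    # Hypothetical sink; the real tap writes Singer RECORD messages.
    print(record)

def drive_sync(instance, state):
    # Keep requesting pages until is_loop_required() reports no more data.
    while True:
        (stream, records), loop_for_records = instance.sync(state)
        for record in records:
            emit(record)
            # The runner preserves the last record so the next get_body()
            # call can build a visitorId filter from it (see sync.py below).
            instance.last_synced_record = record
        if not loop_for_records:
            break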

class EventsBase(Stream):
DATE_WINDOW_SIZE = 1
@@ -1280,14 +1289,37 @@ def get_body(self, key_id=None, period=None, first=None):
"identified": not anons
}
}
}, {
"sort": ["visitorId"]
}, {
"filter": self.set_filter_value()
}, {
"limit": self.record_limit
}],
"requestId": "all-visitors",
"sort": [
"visitorId"
]
"requestId": "all-visitors"
}
}
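
Taken together, the sort, filter, and limit stages implement keyset pagination over visitorId. A sketch of the assembled body with illustrative values (the real one comes from Visitors.get_body and also carries the source configuration shown above):

# Illustrative shape only; the literal values here are assumptions.
example_body = {
    "request": {
        "pipeline": [
            {"source": {"visitors": {"identified": True}}},
            {"sort": ["visitorId"]},               # stable order for keyset paging
            {"filter": 'visitorId>"lastSeenId"'},  # resume after the last synced id
            {"limit": 50},                         # page size, i.e. record_limit
        ],
        "requestId": "all-visitors",
    }
}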

def get_record_count(self):
# Get number of records to be fetched using current filter
body = self.get_body()
body["request"]["pipeline"].append({"count": None})
return list(self.request(self.name, json=body))[0]["count"]

def is_loop_required(self):
# If the number of records reaches the record limit, assume there are more records to sync
# and save the last filter value. Otherwise, assume all records have been extracted
return self.get_record_count() >= self.record_limit

def set_filter_value(self):
# Set the value of the filter parameter in the request body
if self.last_synced_record:
filter_value = f'visitorId>"{self.last_synced_record["visitor_id"]}"'
else:
filter_value = 'visitorId>""'

return filter_value
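
For illustration, the two filter shapes this method produces, assuming a hypothetical last synced record whose visitor_id is "abc123":

first_page_filter = 'visitorId>""'       # empty string sorts before every id
next_page_filter = 'visitorId>"abc123"'  # resume strictly after the last id

Because the pipeline sorts on visitorId, the strict greater-than filter means consecutive pages neither repeat nor skip visitors.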

def transform(self, record):
# Transform visitor data into a one-level dictionary with the following transformation
record['lastupdated'] = record.get('metadata').get('auto').get(
3 changes: 3 additions & 0 deletions tap_pendo/sync.py
@@ -59,6 +59,9 @@ def sync_stream(state, start_date, instance):
LOGGER.info('Replication Value NULL for tap_stream_id: %s', stream.tap_stream_id)
counter.increment()

# Preserve the last processed record, which is needed when a stream supports pagination
instance.last_synced_record = record

# Update bookmark and write state for the stream with new_bookmark
instance.update_bookmark(state, instance.name, strftime(new_bookmark),
instance.replication_key)
2 changes: 1 addition & 1 deletion tests/tap_tester/base.py
@@ -31,7 +31,7 @@ class TestPendoBase(unittest.TestCase):
# After 180 days visitor_history data cannot be synced,
# so we need to manually create test data for the visitors and visitor_history streams.
# Once created, we should update this start date to optimise the execution time.
START_DATE_VISTOR_HISTORY = "2023-03-15T00:00:00Z"
START_DATE_VISTOR_HISTORY = "2023-11-06T00:00:00Z"

@staticmethod
def name():
10 changes: 5 additions & 5 deletions tests/tap_tester/test_start_date.py
@@ -31,11 +31,11 @@ def test_run(self):

# All these streams have implementations similar to guides and guide_events, so this test is removed to limit the execution time
# self.run_test("2020-09-01T00:00:00Z", "2021-03-01T00:00:00Z", {"features", "feature_events", "pages", "page_events", "events", "track_types", "track_events"})

# Visitor history can be retrieved only for the last 180 days, so to reduce execution time the first start date is set within that window
self.run_test(
start_date_1="2022-06-25T00:00:00Z",
start_date_2="2022-07-20T00:00:00Z",
start_date_1=self.START_DATE_VISTOR_HISTORY,
start_date_2="2023-11-09T00:00:00Z",
streams={"visitors", "visitor_history"})

def expected_metadata(self):
@@ -72,7 +72,7 @@ def run_test(self, start_date_1, start_date_2, streams):
self.start_date_1 = start_date_1
self.start_date_2 = start_date_2
self.streams = streams

self.start_date = self.start_date_1

expected_streams = streams
@@ -100,7 +100,7 @@ def run_test(self, start_date_1, start_date_2, streams):
##########################################################################
# Update START DATE Between Syncs
##########################################################################

LOGGER.info("REPLICATION START DATE CHANGE: {} ===>>> {} ".format(
self.start_date, self.start_date_2))
self.start_date = self.start_date_2
84 changes: 84 additions & 0 deletions tests/tap_tester/test_visitors_pagination.py
@@ -0,0 +1,84 @@
import tap_tester.connections as connections
import tap_tester.menagerie as menagerie
import tap_tester.runner as runner
from base import TestPendoBase


class PendoVisitorsPaginationTest(TestPendoBase):
start_date = "2019-09-10T00:00:00Z"
record_limit = 50
include_anonymous_visitors = False
def name(self):
return "pendo_visitors_pagination_test"

def get_properties(self, original: bool = True):
"""Configuration properties required for the tap."""
return {
# To reduce the execution time of this test, use a recent start_date
"start_date": self.start_date,
"lookback_window": "1",
"period": "dayRange",
"record_limit": self.record_limit,
"include_anonymous_visitors": self.include_anonymous_visitors
}

def test_run(self):
# Verify that the visitors pagination logic works as expected for named and anonymous visitors
# without impacting replication of other streams
# Note: there are 21000+ named and anonymous visitors
self.run_pagination_test(expected_streams={"accounts", "features", "feature_events", "visitors"},
start_date="2019-09-10T00:00:00Z",
record_limit=10000,
include_anonymous_visitors="true")

# Verify that with visitors pagination we are able to sync child stream records, i.e. visitor_history
# Note: there are only 58 named and anonymous visitors, and only recently updated visitors will be synced
self.run_pagination_test(expected_streams={"visitors", "visitor_history"},
start_date=self.START_DATE_VISTOR_HISTORY,
record_limit=50,
include_anonymous_visitors="false")


def run_pagination_test(self, expected_streams, start_date, record_limit, include_anonymous_visitors):
"""
This is a canary test to verify pagination implementation for the Visitors stream.
"""
self.streams_to_test = expected_streams
self.start_date = start_date
self.record_limit = record_limit
self.include_anonymous_visitors = include_anonymous_visitors

conn_id = connections.ensure_connection(self)

found_catalogs = self.run_and_verify_check_mode(conn_id)

# table and field selection
test_catalogs_all_fields = [catalog for catalog in found_catalogs
if catalog.get('tap_stream_id') in expected_streams]

self.perform_and_verify_table_and_field_selection(
conn_id, test_catalogs_all_fields)

# Grab metadata after performing table-and-field selection to set expectations
# used for asserting all fields are replicated
stream_to_all_catalog_fields = dict()
for catalog in test_catalogs_all_fields:
stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
fields_from_field_level_md = [md_entry['breadcrumb'][1]
for md_entry in catalog_entry['metadata']
if md_entry['breadcrumb'] != []]
stream_to_all_catalog_fields[stream_name] = set(
fields_from_field_level_md)

record_count_by_stream = self.run_and_verify_sync(conn_id)

synced_records = runner.get_records_from_target_output()

# Verify no unexpected streams were replicated
synced_stream_names = set(synced_records.keys())

self.assertSetEqual(expected_streams, synced_stream_names)
for stream in expected_streams:
with self.subTest(stream=stream):
self.assertGreaterEqual(record_count_by_stream.get(stream), 1)
6 changes: 4 additions & 2 deletions tests/unittests/test_lazy_aggregation_sync.py
@@ -39,7 +39,8 @@ class TestLazyAggregationSync(unittest.TestCase):
@mock.patch("requests.Session.send")
@mock.patch("tap_pendo.streams.Stream.is_selected")
@mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream)
def test_lazzy_aggregation_with_sub_stream(self, mocked_substream, mocked_selected, mocked_request):
@mock.patch("tap_pendo.streams.Visitors.get_record_count", return_value=100)
def test_lazzy_aggregation_with_sub_stream(self, get_record_count, mocked_substream, mocked_selected, mocked_request):
'''
Verify that if the sub stream is present, all data is still returned for the super stream
and sync_substream is called
@@ -60,7 +61,8 @@ def test_lazzy_aggregation_without_sub_stream(self, mocked_substream, mocked_select
@mock.patch("requests.Session.send")
@mock.patch("tap_pendo.streams.Stream.is_selected")
@mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream)
def test_lazzy_aggregation_without_sub_stream(self, mocked_substream, mocked_selected, mocked_request):
@mock.patch("tap_pendo.streams.Visitors.get_record_count", return_value=100)
def test_lazzy_aggregation_without_sub_stream(self, get_record_count, mocked_substream, mocked_selected, mocked_request):
'''
Verify that if the sub stream is not selected, all data is still returned for the super stream
and sync_substream is not called
