Skip to content

Commit

Permalink
Source Google Ads: run decorator as a thread with timeout (airbytehq#…
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-yermilov-gl authored and jatinyadav-cc committed Feb 26, 2024
1 parent f870e39 commit 7af1682
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 156 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
dockerImageTag: 3.0.0
dockerImageTag: 3.0.1
dockerRepository: airbyte/source-google-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
githubIssueLabel: source-google-ads
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-google-ads/setup.py
Expand Up @@ -7,7 +7,7 @@

# pin protobuf==3.20.0 as other versions may cause problems on different architectures
# (see https://github.com/airbytehq/airbyte/issues/13580)
MAIN_REQUIREMENTS = ["airbyte-cdk>=0.51.3", "google-ads==22.1.0", "protobuf", "pendulum"]
MAIN_REQUIREMENTS = ["airbyte-cdk>=0.51.3", "google-ads==22.1.0", "protobuf", "pendulum<3.0.0"]

TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock", "freezegun", "requests-mock"]

Expand Down
Expand Up @@ -20,7 +20,7 @@

from .google_ads import GoogleAds, logger
from .models import CustomerModel
from .utils import ExpiredPageTokenError, chunk_date_range, generator_backoff, get_resource_name, parse_dates, traced_exception
from .utils import ExpiredPageTokenError, chunk_date_range, detached, generator_backoff, get_resource_name, parse_dates, traced_exception


class GoogleAdsStream(Stream, ABC):
Expand All @@ -44,17 +44,32 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
for customer in self.customers:
yield {"customer_id": customer.id}

# Retry policy for the thread-with-timeout wrapper below: if @detached raises
# TimeoutError, retry up to 5 times with a constant 1-second wait between tries.
@generator_backoff(
    wait_gen=backoff.constant,
    # NOTE(review): `(TimeoutError)` is a bare class, not a 1-tuple (no trailing
    # comma) — confirm generator_backoff accepts a single exception class.
    exception=(TimeoutError),
    max_tries=5,
    on_backoff=lambda details: logger.info(
        f"Caught retryable error {details['exception']} after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
    ),
    interval=1,
)
# Runs this generator in a separate daemon thread; raises TimeoutError in the
# consumer if no new record is produced for 5 minutes (see RunAsThread).
@detached(timeout_minutes=5)
def request_records_job(self, customer_id, query, stream_slice):
    """Send the query for one customer and yield parsed records.

    :param customer_id: Google Ads customer id the query is executed for.
    :param query: GAQL query string to send via the Google Ads client.
    :param stream_slice: slice metadata forwarded to record parsing.
    """
    response_records = self.google_ads_client.send_request(query=query, customer_id=customer_id)
    yield from self.parse_records_with_backoff(response_records, stream_slice)

def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
    """Read records for a single customer slice, tolerating timeouts.

    Delegates the actual request to ``request_records_job``, which runs in a
    separate thread with a 5-minute idle timeout and its own retry policy.

    :param sync_mode: sync mode passed by the CDK (unused here).
    :param stream_slice: slice dict containing at least ``customer_id``;
        ``None`` yields no records.
    :return: iterable of record mappings for this slice.
    """
    if stream_slice is None:
        return []

    customer_id = stream_slice["customer_id"]
    try:
        # NOTE: the diff's superseded direct send_request/parse path was removed;
        # the threaded job is now the single request path for this slice.
        yield from self.request_records_job(customer_id, self.get_query(stream_slice), stream_slice)
    except GoogleAdsException as exception:
        # Translate API errors into user-facing traced exceptions (may swallow
        # "customer not enabled" errors depending on the class flag).
        traced_exception(exception, customer_id, self.CATCH_CUSTOMER_NOT_ENABLED_ERROR)
    except TimeoutError as exception:
        # Prevent sync failure: log and move on to the next slice instead of
        # failing the whole sync when one customer's data is unreachable.
        logger.warning(f"Timeout: Failed to access {self.name} stream data. {str(exception)}")

@generator_backoff(
wait_gen=backoff.expo,
Expand Down
Expand Up @@ -2,8 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import functools
import queue
import re
import threading
import time
from dataclasses import dataclass
from datetime import datetime
Expand Down Expand Up @@ -180,6 +182,123 @@ def wrapper(*args, **kwargs) -> Generator:
return decorator


class RunAsThread:
    """
    The `RunAsThread` decorator is designed to run a generator function in a separate thread with a specified timeout.
    This is particularly useful when dealing with functions that involve potentially time-consuming operations,
    and you want to enforce a time limit for their execution.
    """

    def __init__(self, timeout_minutes):
        """
        :param timeout_minutes: The maximum allowed time (in minutes) for the generator function to idle.
        If the timeout is reached, a TimeoutError is raised.
        """
        # Stored in seconds; the timeout is an *idle* timeout — it resets every
        # time the worker thread produces a new value (see wrapper below).
        self._timeout_seconds = timeout_minutes * 60

    def __call__(self, generator_func):
        @functools.wraps(generator_func)
        def wrapper(*args, **kwargs):
            """
            The wrapper function sets up threading components, starts a separate thread to run the generator function.
            It uses events and a queue for communication and synchronization between the main thread and the thread running the generator function.
            """
            # Event and Queue initialization.
            # write_event: worker signals "new item (or completion) available".
            # exit_event: set either by the worker on normal completion, or by
            # this wrapper on timeout to tell the worker to stop producing.
            write_event = threading.Event()
            exit_event = threading.Event()
            the_queue = queue.Queue()

            # Thread initialization and start. daemon=True so a timed-out
            # worker cannot keep the process alive at interpreter shutdown.
            thread = threading.Thread(
                target=self.target, args=(the_queue, write_event, exit_event, generator_func, args, kwargs), daemon=True
            )
            thread.start()

            # Records the starting time for the timeout calculation.
            start_time = time.time()
            # Keep draining even after the worker dies so buffered items are not lost.
            while thread.is_alive() or not the_queue.empty():
                # The main thread waits for the `write_event` to be set or until the specified timeout.
                if the_queue.empty():
                    write_event.wait(self._timeout_seconds)
                try:
                    # The main thread yields the result obtained from reading the queue.
                    # read() re-raises any exception the worker placed on the queue.
                    yield self.read(the_queue)
                    # The timer is reset since a new result has been received, preventing the timeout from occurring.
                    start_time = time.time()
                except queue.Empty:
                    # If exit_event is set it means that the generator function in the thread has completed its execution.
                    if exit_event.is_set():
                        break
                    # Check if the timeout has been reached without new results.
                    if time.time() - start_time > self._timeout_seconds:
                        # The thread may continue to run for some time after reaching a timeout and even come to life and continue working.
                        # That is why the exit event is set to signal the generator function to stop producing data.
                        exit_event.set()
                        raise TimeoutError(f"Method '{generator_func.__name__}' timed out after {self._timeout_seconds / 60.0} minutes")
                # The write event is cleared to reset it for the next iteration.
                write_event.clear()

        return wrapper

    def target(self, the_queue, write_event, exit_event, func, args, kwargs):
        """
        This is a target function for the thread.
        It runs the actual generator function, writing its results to a queue.
        Exceptions raised during execution are also written to the queue.
        :param the_queue: A queue used for communication between the main thread and the thread running the generator function.
        :param write_event: An event signaling the availability of new data in the queue.
        :param exit_event: An event indicating whether the generator function should stop producing data due to a timeout.
        :param func: The generator function to be executed.
        :param args: Positional arguments for the generator function.
        :param kwargs: Keyword arguments for the generator function.
        :return: None
        """
        try:
            for value in func(*args, **kwargs):
                # If the timeout has been reached we must stop producing any data
                if exit_event.is_set():
                    break
                self.write(the_queue, value, write_event)
            else:
                # Notify the main thread that the generator function has completed its execution.
                # (for/else: runs only when the loop finished without `break`.)
                exit_event.set()
            # Notify the main thread (even if the generator didn't produce any data) to prevent waiting for no reason.
            if not write_event.is_set():
                write_event.set()
        except Exception as e:
            # Exceptions cross the thread boundary as queue items; read() re-raises them in the consumer.
            self.write(the_queue, e, write_event)

    @staticmethod
    def write(the_queue, value, write_event):
        """
        Puts a value into the queue and sets a write event to notify the main thread that new data is available.
        :param the_queue: A queue used for communication between the main thread and the thread running the generator function.
        :param value: The value to be put into the communication queue.
        This can be any type of data produced by the generator function, including results or exceptions.
        :param write_event: An event signaling the availability of new data in the queue.
        :return: None
        """
        the_queue.put(value)
        write_event.set()

    @staticmethod
    def read(the_queue, timeout=0.001):
        """
        Retrieves a value from the queue, handling the case where the value is an exception, and raising it.
        :param the_queue: A queue used for communication between the main thread and the thread running the generator function.
        :param timeout: A time in seconds to wait for a value to be available in the queue.
        If the timeout is reached and no new data is available, a `queue.Empty` exception is raised.
        :return: a value retrieved from the queue
        """
        value = the_queue.get(block=True, timeout=timeout)
        if isinstance(value, Exception):
            raise value
        return value


# Public decorator name used by stream code: @detached(timeout_minutes=...).
detached = RunAsThread


def parse_dates(stream_slice):
start_date = pendulum.parse(stream_slice["start_date"])
end_date = pendulum.parse(stream_slice["end_date"])
Expand Down
Expand Up @@ -512,7 +512,7 @@ def test_update_state_with_parent_state(mocker):

# Set pendulum to return a consistent value
now = pendulum.datetime(2023, 11, 2, 12, 53, 7)
pendulum.travel_to(now)
pendulum.set_test_now(now)

# Call the _update_state method with the third stream_slice
stream_slice_third = {"customer_id": "customer_id_3"}
Expand All @@ -529,22 +529,22 @@ def test_update_state_with_parent_state(mocker):
assert stream._state == expected_state_third_call

# Reset the pendulum mock to its original state
pendulum.travel_back()
pendulum.set_test_now()


def test_update_state_without_parent_state(mocker):
"""
Test the _update_state method when the parent_stream does not have a state.
"""
# Reset any previous mock state for pendulum
pendulum.travel_back()
pendulum.set_test_now()

# Mock instance setup
stream = CampaignCriterion(api=MagicMock(), customers=[])

# Mock pendulum call to return a consistent value
now = pendulum.datetime(2023, 11, 2, 12, 53, 7)
pendulum.travel_to(now)
pendulum.set_test_now(now)

# Call the _update_state method with the first stream_slice
stream_slice_first = {"customer_id": "customer_id_1"}
Expand All @@ -568,4 +568,4 @@ def test_update_state_without_parent_state(mocker):
assert stream._state == expected_state_second_call

# Reset the pendulum mock to its original state
pendulum.travel_back()
pendulum.set_test_now()
1 change: 1 addition & 0 deletions docs/integrations/sources/google-ads.md
Expand Up @@ -278,6 +278,7 @@ Due to a limitation in the Google Ads API which does not allow getting performan

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| `3.0.1` | 2023-12-26 | [33769](https://github.com/airbytehq/airbyte/pull/33769) | Run a read function in a separate thread to enforce a time limit for its execution |
| `3.0.0` | 2023-12-07 | [33120](https://github.com/airbytehq/airbyte/pull/33120) | Upgrade API version to v15 |
| `2.0.4` | 2023-11-10 | [32414](https://github.com/airbytehq/airbyte/pull/32414) | Add backoff strategy for read_records method |
| `2.0.3` | 2023-11-02 | [32102](https://github.com/airbytehq/airbyte/pull/32102) | Fix incremental events streams |
Expand Down

0 comments on commit 7af1682

Please sign in to comment.