diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index 32e0044906d8..b5666a4b1680 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -4,7 +4,7 @@ from abc import ABC from datetime import timedelta -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import pendulum import requests @@ -62,6 +62,13 @@ def request_headers( ) -> Mapping[str, Any]: return {"Accept": "application/json"} + @property + def max_retries(self) -> Union[int, None]: + """ + Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit. + """ + return 10 + def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response: try: return super()._send_request(request, request_kwargs) @@ -104,7 +111,7 @@ def backoff_time(self, response: requests.Response) -> float: return float(retry_after) self.retries += 1 - return 2**self.retries * 60 + return min(2**self.retries * 60, 16 * 60) def should_retry(self, response: requests.Response) -> bool: if response.status_code == 402: diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohorts.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohorts.py index 173d8d521252..82f30b7c740f 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohorts.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/cohorts.py @@ -44,6 +44,14 @@ class Cohorts(IncrementalMixpanelStream): def path(self, **kwargs) -> str: return "cohorts/list" + def should_retry(self, response: requests.Response) -> bool: + try: + if response.status_code == 200: + response.json() + except ConnectionResetError: + return True + return super().should_retry(response) + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: records = super().parse_response(response, stream_state=stream_state, **kwargs) for record in records: diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/engage.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/engage.py index 037bbe2fd80e..848f95d5a33e 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/engage.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/engage.py @@ -27,6 +27,14 @@ class EngageSchema(MixpanelStream): def path(self, **kwargs) -> str: return "engage/properties" + def should_retry(self, response: requests.Response) -> bool: + try: + if response.status_code == 200: + response.json() + except ConnectionResetError: + return True + return super().should_retry(response) + def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """ response.json() example: @@ -215,3 +223,11 @@ def stream_slices( if sync_mode == SyncMode.incremental: self.set_cursor(cursor_field) return super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state) + + def should_retry(self, response: requests.Response) -> bool: + try: + if response.status_code == 200: + response.json() + except ConnectionResetError: + return True + return super().should_retry(response) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/funnels.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/funnels.py index 8a0d69d37168..0f9ff2803be5 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/funnels.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/funnels.py @@ -49,6 +49,14 @@ def get_funnel_slices(self, sync_mode) -> List[dict]: def funnel_slices(self, sync_mode) -> List[dict]: return self.get_funnel_slices(sync_mode) + def should_retry(self, response: requests.Response) -> bool: + try: + if response.status_code == 200: + response.json() + except ConnectionResetError: + return True + return super().should_retry(response) + def stream_slices( self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Mapping[str, Any]]]]: diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/revenue.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/revenue.py index 2d461b50eda3..4c5bef47275a 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/revenue.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/revenue.py @@ -22,6 +22,15 @@ class Revenue(DateSlicesMixin, IncrementalMixpanelStream): def path(self, **kwargs) -> str: return "engage/revenue" + def should_retry(self, response: requests.Response) -> bool: + try: + if response.status_code == 200: + response.json() + except ConnectionResetError: + return True + return super().should_retry(response) + + def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """ response.json() example: