Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: retry connection reset errors for mixpanel #194

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from abc import ABC
from datetime import timedelta
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import pendulum
import requests
Expand Down Expand Up @@ -62,6 +62,13 @@ def request_headers(
) -> Mapping[str, Any]:
return {"Accept": "application/json"}

@property
def max_retries(self) -> Union[int, None]:
"""
Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
"""
return 10

def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
try:
return super()._send_request(request, request_kwargs)
Expand Down Expand Up @@ -104,7 +111,7 @@ def backoff_time(self, response: requests.Response) -> float:
return float(retry_after)

self.retries += 1
return 2**self.retries * 60
return min(2**self.retries * 60, 16 * 60)

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 402:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ class Cohorts(IncrementalMixpanelStream):
def path(self, **kwargs) -> str:
return "cohorts/list"

def should_retry(self, response: requests.Response) -> bool:
try:
if response.status_code == 200:
response.json()
except ConnectionResetError:
return True
return super().should_retry(response)

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
records = super().parse_response(response, stream_state=stream_state, **kwargs)
for record in records:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ class EngageSchema(MixpanelStream):
def path(self, **kwargs) -> str:
return "engage/properties"

def should_retry(self, response: requests.Response) -> bool:
try:
if response.status_code == 200:
response.json()
except ConnectionResetError:
return True
return super().should_retry(response)

def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
response.json() example:
Expand Down Expand Up @@ -215,3 +223,11 @@ def stream_slices(
if sync_mode == SyncMode.incremental:
self.set_cursor(cursor_field)
return super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state)

def should_retry(self, response: requests.Response) -> bool:
try:
if response.status_code == 200:
response.json()
except ConnectionResetError:
return True
return super().should_retry(response)
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ def get_funnel_slices(self, sync_mode) -> List[dict]:
def funnel_slices(self, sync_mode) -> List[dict]:
return self.get_funnel_slices(sync_mode)

def should_retry(self, response: requests.Response) -> bool:
try:
if response.status_code == 200:
response.json()
except ConnectionResetError:
return True
return super().should_retry(response)

def stream_slices(
self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Mapping[str, Any]]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ class Revenue(DateSlicesMixin, IncrementalMixpanelStream):
def path(self, **kwargs) -> str:
return "engage/revenue"

def should_retry(self, response: requests.Response) -> bool:
try:
if response.status_code == 200:
response.json()
except ConnectionResetError:
return True
return super().should_retry(response)


def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
response.json() example:
Expand Down
Loading