Skip to content

Commit

Permalink
Backoff for Protocol error and chunked encoding errors (#131)
Browse files Browse the repository at this point in the history
* backoff protocol error and chunked encoding errors

* add comments

* changelog and version bump

* minor to patch

* refator http.py for backoffs

* use imported connectonerror instead of builtin

* add backoff for requests_metrics_path
  • Loading branch information
kethan1122 committed Jun 5, 2023
1 parent 3e5b556 commit 038acbe
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Expand Up @@ -33,7 +33,7 @@ jobs:
name: 'pylint'
command: |
source /usr/local/share/virtualenvs/tap-zendesk/bin/activate
pylint tap_zendesk -d missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,too-many-branches,useless-import-alias,no-else-return,logging-not-lazy
pylint tap_zendesk -d missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,too-many-branches,useless-import-alias,no-else-return,logging-not-lazy,redefined-builtin
- run:
name: 'unittests'
when: always
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,7 @@
# Changelog

## 2.0.1
* Adds backoff/retry for `ProtocolError` and `ChunkedEncodingError` [#131](https://github.com/singer-io/tap-zendesk/pull/131)
## 2.0.0
* Incremental Exports API implementation for User's stream [#127](https://github.com/singer-io/tap-zendesk/pull/127)
## 1.7.6
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-zendesk',
version='2.0.0',
version='2.0.1',
description='Singer.io tap for extracting data from the Zendesk API',
author='Stitch',
url='https://singer.io',
Expand Down
35 changes: 29 additions & 6 deletions tap_zendesk/__init__.py
Expand Up @@ -6,8 +6,11 @@
import requests
from requests import Session
from requests.adapters import HTTPAdapter
from requests.exceptions import Timeout, ChunkedEncodingError
from urllib3.exceptions import ProtocolError
import singer
from singer import metadata, metrics as singer_metrics
import backoff
from tap_zendesk import metrics as zendesk_metrics
from tap_zendesk.discover import discover_streams
from tap_zendesk.streams import STREAMS
Expand Down Expand Up @@ -36,6 +39,12 @@
# patch Session.request to record HTTP request metrics
request = Session.request


@backoff.on_exception(backoff.expo,
(ConnectionError, ConnectionResetError, Timeout, ChunkedEncodingError,
ProtocolError),
max_tries=5,
factor=2)
def request_metrics_patch(self, method, url, **kwargs):
with singer_metrics.http_request_timer(None):
response = request(self, method, url, **kwargs)
Expand All @@ -45,7 +54,10 @@ def request_metrics_patch(self, method, url, **kwargs):
response.headers.get('X-Request-Id', 'Not present'))
return response


Session.request = request_metrics_patch


# end patch

def do_discover(client, config):
Expand All @@ -54,9 +66,11 @@ def do_discover(client, config):
json.dump(catalog, sys.stdout, indent=2)
LOGGER.info("Finished discover")


def stream_is_selected(mdata):
return mdata.get((), {}).get('selected', False)


def get_selected_streams(catalog):
selected_stream_names = []
for stream in catalog.streams:
Expand All @@ -70,15 +84,18 @@ def get_selected_streams(catalog):
'tickets': ['ticket_audits', 'ticket_metrics', 'ticket_comments']
}


def get_sub_stream_names():
sub_stream_names = []
for parent_stream in SUB_STREAMS:
sub_stream_names.extend(SUB_STREAMS[parent_stream])
return sub_stream_names


class DependencyException(Exception):
pass


def validate_dependencies(selected_stream_ids):
errs = []
msg_tmpl = ("Unable to extract {0} data. "
Expand All @@ -92,13 +109,14 @@ def validate_dependencies(selected_stream_ids):
if errs:
raise DependencyException(" ".join(errs))


def populate_class_schemas(catalog, selected_stream_names):
for stream in catalog.streams:
if stream.tap_stream_id in selected_stream_names:
STREAMS[stream.tap_stream_id].stream = stream

def do_sync(client, catalog, state, config):

def do_sync(client, catalog, state, config):
selected_stream_names = get_selected_streams(catalog)
validate_dependencies(selected_stream_names)
populate_class_schemas(catalog, selected_stream_names)
Expand All @@ -121,7 +139,6 @@ def do_sync(client, catalog, state, config):
# else:
# LOGGER.info("%s: Starting", stream_name)


key_properties = metadata.get(mdata, (), 'table-key-properties')
singer.write_schema(stream_name, stream.schema.to_dict(), key_properties)

Expand All @@ -133,7 +150,8 @@ def do_sync(client, catalog, state, config):
sub_stream = STREAMS[sub_stream_name].stream
sub_mdata = metadata.to_map(sub_stream.metadata)
sub_key_properties = metadata.get(sub_mdata, (), 'table-key-properties')
singer.write_schema(sub_stream.tap_stream_id, sub_stream.schema.to_dict(), sub_key_properties)
singer.write_schema(sub_stream.tap_stream_id, sub_stream.schema.to_dict(),
sub_key_properties)

# parent stream will sync sub stream
if stream_name in all_sub_stream_names:
Expand All @@ -150,6 +168,7 @@ def do_sync(client, catalog, state, config):
LOGGER.info("Finished sync")
zendesk_metrics.log_aggregate_rates()


def oauth_auth(args):
if not set(OAUTH_CONFIG_KEYS).issubset(args.config.keys()):
LOGGER.debug("OAuth authentication unavailable.")
Expand All @@ -161,6 +180,7 @@ def oauth_auth(args):
"oauth_token": args.config['access_token'],
}


def api_token_auth(args):
if not set(API_TOKEN_CONFIG_KEYS).issubset(args.config.keys()):
LOGGER.debug("API Token authentication unavailable.")
Expand All @@ -173,6 +193,7 @@ def api_token_auth(args):
"token": args.config['api_token']
}


def get_session(config):
""" Add partner information to requests Session object if specified in the config. """
if not all(k in config for k in ["marketplace_name",
Expand All @@ -184,10 +205,12 @@ def get_session(config):
# https://github.com/facetoe/zenpy/blob/master/docs/zenpy.rst#usage
session.mount("https://", HTTPAdapter(**Zenpy.http_adapter_kwargs()))
session.headers["X-Zendesk-Marketplace-Name"] = config.get("marketplace_name", "")
session.headers["X-Zendesk-Marketplace-Organization-Id"] = str(config.get("marketplace_organization_id", ""))
session.headers["X-Zendesk-Marketplace-Organization-Id"] = str(
config.get("marketplace_organization_id", ""))
session.headers["X-Zendesk-Marketplace-App-Id"] = str(config.get("marketplace_app_id", ""))
return session


@singer.utils.handle_top_exception(LOGGER)
def main():
parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
Expand All @@ -197,11 +220,11 @@ def main():
if config_request_timeout and float(config_request_timeout):
request_timeout = float(config_request_timeout)
else:
request_timeout = REQUEST_TIMEOUT # If value is 0, "0", "" or not passed then it sets default to 300 seconds.
request_timeout = REQUEST_TIMEOUT # If value is 0, "0", "" or not passed then it sets default to 300 seconds.
# OAuth has precedence
creds = oauth_auth(parsed_args) or api_token_auth(parsed_args)
session = get_session(parsed_args.config)
client = Zenpy(session=session, timeout=request_timeout, **creds) # Pass request timeout
client = Zenpy(session=session, timeout=request_timeout, **creds) # Pass request timeout

if not client:
LOGGER.error("""No suitable authentication keys provided.""")
Expand Down
24 changes: 8 additions & 16 deletions tap_zendesk/http.py
Expand Up @@ -2,7 +2,8 @@
import backoff
import requests
import singer
from requests.exceptions import Timeout, HTTPError
from requests.exceptions import Timeout, HTTPError, ChunkedEncodingError, ConnectionError
from urllib3.exceptions import ProtocolError



Expand Down Expand Up @@ -107,17 +108,11 @@ def is_fatal(exception):
sleep(sleep_time)
return False

return 400 <=status_code < 500
if status_code == 409:
# retry ZendeskConflictError for at-least 10 times
return False

def should_retry_error(exception):
"""
Return true if exception is required to retry otherwise return false
"""
if isinstance(exception, ZendeskConflictError):
return True
if isinstance(exception,Exception) and isinstance(exception.args[0][1],ConnectionResetError):
return True
return False
return 400 <=status_code < 500

def raise_for_error(response):
""" Error handling method which throws custom error. Class for each error defined above which extends `ZendeskError`.
Expand All @@ -140,16 +135,13 @@ def raise_for_error(response):
response.status_code, {}).get("raise_exception", ZendeskError)
raise exc(message, response) from None

@backoff.on_exception(backoff.expo,
(ZendeskConflictError),
max_tries=10,
giveup=lambda e: not should_retry_error(e))

@backoff.on_exception(backoff.expo,
(HTTPError, ZendeskError), # Added support of backoff for all unhandled status codes.
max_tries=10,
giveup=is_fatal)
@backoff.on_exception(backoff.expo,
(ConnectionError, Timeout),#As ConnectionError error and timeout error does not have attribute status_code,
(ConnectionError, ConnectionResetError, Timeout, ChunkedEncodingError, ProtocolError),#As ConnectionError error and timeout error does not have attribute status_code,
max_tries=5, # here we added another backoff expression.
factor=2)
def call_api(url, request_timeout, params, headers):
Expand Down
38 changes: 38 additions & 0 deletions test/unittests/test_http.py
Expand Up @@ -2,6 +2,8 @@
from unittest.mock import MagicMock, Mock, patch
from tap_zendesk import http, streams
import requests
from urllib3.exceptions import ProtocolError
from requests.exceptions import ChunkedEncodingError, ConnectionError

import zenpy

Expand Down Expand Up @@ -479,3 +481,39 @@ def test_get_cursor_based_handles_503(self, mock_get, mock_sleep):

# Verify the request retry 10 times
self.assertEqual(mock_get.call_count, 10)

@patch("requests.get")
def test_call_api_handles_protocol_error(self, mock_get, mock_sleep):
"""Check whether the request backoff properly for call_api method for 5 times in case of
Protocol error"""
mock_get.side_effect = ProtocolError

with self.assertRaises(ProtocolError) as _:
http.call_api(
url="some_url", request_timeout=300, params={}, headers={}
)
self.assertEqual(mock_get.call_count, 5)

@patch("requests.get")
def test_call_api_handles_chunked_encoding_error(self, mock_get, mock_sleep):
"""Check whether the request backoff properly for call_api method for 5 times in case of
ChunkedEncoding error"""
mock_get.side_effect = ChunkedEncodingError

with self.assertRaises(ChunkedEncodingError) as _:
http.call_api(
url="some_url", request_timeout=300, params={}, headers={}
)
self.assertEqual(mock_get.call_count, 5)

@patch("requests.get")
def test_call_api_handles_connection_reset_error(self, mock_get, mock_sleep):
"""Check whether the request backoff properly for call_api method for 5 times in case of
ConnectionResetError error"""
mock_get.side_effect = ConnectionResetError

with self.assertRaises(ConnectionResetError) as _:
http.call_api(
url="some_url", request_timeout=300, params={}, headers={}
)
self.assertEqual(mock_get.call_count, 5)

0 comments on commit 038acbe

Please sign in to comment.