From dc258e041ce472a76f7b9f8550ed271312f26648 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 15:47:23 +0530 Subject: [PATCH 1/3] SK-2292 retry on connection --- skyflow/vault/_client.py | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index e426f59f..b9d858fa 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -45,7 +45,6 @@ def __init__(self, config: Configuration): def insert(self, records: dict, options: InsertOptions = InsertOptions()): interface = InterfaceName.INSERT.value log_info(InfoMessages.INSERT_TRIGGERED.value, interface=interface) - self._checkConfig(interface) jsonBody = getInsertRequestBody(records, options) @@ -57,16 +56,35 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): "sky-metadata": json.dumps(getMetrics()) } - response = requests.post(requestURL, data=jsonBody, headers=headers) - processedResponse = processResponse(response) - result, partial = convertResponse(records, processedResponse, options) - if partial: - log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface) - elif 'records' not in result: - log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface) - else: - log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) - return result + # Use for-loop for retry logic, avoid code repetition + for attempt in range(2): + try: + # If jsonBody is a dict, use json=, else use data= + if isinstance(jsonBody, dict): + response = requests.post(requestURL, json=jsonBody, headers=headers) + else: + response = requests.post(requestURL, data=jsonBody, headers=headers) + processedResponse = processResponse(response) + result, partial = convertResponse(records, processedResponse, options) + if partial: + log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface) + raise SkyflowError(SkyflowErrorCodes.PARTIAL_SUCCESS, SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, result, interface=interface) + if 'records' not in result: + log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface) + raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, result, interface=interface) + log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) + return result + except requests.exceptions.ConnectionError as err: + log_error(f'Connection error inserting record: {err}', interface) + if attempt == 0: + log_info("Retrying record...", interface) + continue + else: + raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Connection error after retry: {err}", interface=interface) + except Exception as err: + log_error(f'Unexpected error in insert: {err}', interface) + raise + def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()): interface = InterfaceName.DETOKENIZE.value From adafc1cc1b618f4d26c868fb764b490a92147580 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 15:51:48 +0530 Subject: [PATCH 2/3] SK-2292 set default retry --- skyflow/vault/_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index b9d858fa..58e28d05 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -55,9 +55,9 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): "Authorization": "Bearer " + self.storedToken, "sky-metadata": json.dumps(getMetrics()) } - + max_retries = 1 # Use for-loop for retry logic, avoid code repetition - for attempt in range(2): + for attempt in range(max_retries): try: # If jsonBody is a dict, use json=, else use data= if isinstance(jsonBody, dict): From b3d791cafb37e6e1d0748dde5b389dd07d737178 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 18:48:11 +0530 Subject: [PATCH 3/3] SK-2293 max retry set to 3 --- skyflow/vault/_client.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index 58e28d05..785c42eb 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -55,15 +55,12 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): "Authorization": "Bearer " + self.storedToken, "sky-metadata": json.dumps(getMetrics()) } - max_retries = 1 + max_retries = 3 # Use for-loop for retry logic, avoid code repetition - for attempt in range(max_retries): + for attempt in range(max_retries+1): try: # If jsonBody is a dict, use json=, else use data= - if isinstance(jsonBody, dict): - response = requests.post(requestURL, json=jsonBody, headers=headers) - else: - response = requests.post(requestURL, data=jsonBody, headers=headers) + response = requests.post(requestURL, data=jsonBody, headers=headers) processedResponse = processResponse(response) result, partial = convertResponse(records, processedResponse, options) if partial: @@ -75,16 +72,10 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) return result except requests.exceptions.ConnectionError as err: - log_error(f'Connection error inserting record: {err}', interface) - if attempt == 0: - log_info("Retrying record...", interface) + if attempt < max_retries: continue else: - raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Connection error after retry: {err}", interface=interface) - except Exception as err: - log_error(f'Unexpected error in insert: {err}', interface) - raise - + raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Error occurred: {err}", interface=interface) def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()): interface = InterfaceName.DETOKENIZE.value