Skip to content

Commit

Permalink
fix: remove new exceptions (#326)
Browse files Browse the repository at this point in the history
* fix: remove new exceptions

* fix: make the execution error retry opt-in

* add logging for session.execute when retry execution enabled

* address comments from nicco & optimizations

* remove unnecessarily used space

* fix: should return when session failed to be created (but removed from
pool)
  • Loading branch information
wey-gu committed Mar 21, 2024
1 parent bcc60e7 commit c5d8ca7
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 182 deletions.
12 changes: 2 additions & 10 deletions nebula3/gclient/net/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
AuthFailedException,
IOErrorException,
ClientServerIncompatibleException,
SessionException,
ExecutionErrorException,
)

from nebula3.gclient.net.AuthResult import AuthResult
Expand Down Expand Up @@ -198,12 +196,6 @@ def execute_parameter(self, session_id, stmt, params):
"""
try:
resp = self._connection.executeWithParameter(session_id, stmt, params)
if resp.error_code == ErrorCode.E_SESSION_INVALID:
raise SessionException(resp.error_code, resp.error_msg)
if resp.error_code == ErrorCode.E_SESSION_TIMEOUT:
raise SessionException(resp.error_code, resp.error_msg)
if resp.error_code == ErrorCode.E_EXECUTION_ERROR:
raise ExecutionErrorException(resp.error_msg)
return resp
except Exception as te:
if isinstance(te, TTransportException):
Expand Down Expand Up @@ -274,15 +266,15 @@ def close(self):
self._connection._iprot.trans.close()
except Exception as e:
logger.error(
'Close connection to {}:{} failed:{}'.format(self._ip, self._port, e)
"Close connection to {}:{} failed:{}".format(self._ip, self._port, e)
)

def ping(self):
"""check the connection if ok
:return: True or False
"""
try:
resp = self._connection.execute(0, 'YIELD 1;')
resp = self._connection.execute(0, "YIELD 1;")
return True
except Exception:
return False
Expand Down
100 changes: 49 additions & 51 deletions nebula3/gclient/net/Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
# This source code is licensed under Apache 2.0 License.


import json
import time

from nebula3.Exception import (
IOErrorException,
NotValidConnectionException,
ExecutionErrorException,
)

from nebula3.common.ttypes import ErrorCode
from nebula3.data.ResultSet import ResultSet
from nebula3.gclient.net.AuthResult import AuthResult
from nebula3.logger import logger
Expand All @@ -25,8 +25,8 @@ def __init__(
auth_result: AuthResult,
pool,
retry_connect=True,
retry_times=3,
retry_interval_sec=1,
execution_retry_count=0,
retry_interval_seconds=1,
):
"""
Initialize the Session object.
Expand All @@ -35,8 +35,8 @@ def __init__(
:param auth_result: The result of the authentication process.
:param pool: The pool object where the session was created.
:param retry_connect: A boolean indicating whether to retry the connection if it fails.
:param retry_times: The number of times to retry the connection.
:param retry_interval_sec: The interval between connection retries in seconds.
:param execution_retry_count: The number of attempts to retry the execution upon encountering an execution error(-1005), with the default being 0 (no retries).
:param retry_interval_seconds: The interval between connection retries in seconds.
"""
self._session_id = auth_result.get_session_id()
self._timezone_offset = auth_result.get_timezone_offset()
Expand All @@ -45,8 +45,8 @@ def __init__(
# connection the where the session was created, if session pool was used
self._pool = pool
self._retry_connect = retry_connect
self._retry_times = retry_times
self._retry_interval_sec = retry_interval_sec
self._execution_retry_count = execution_retry_count
self._retry_interval_seconds = retry_interval_seconds
# the time stamp when the session was added to the idle list of the session pool
self._idle_time_start = 0

Expand All @@ -57,11 +57,27 @@ def execute_parameter(self, stmt, params):
:return: ResultSet
"""
if self._connection is None:
raise RuntimeError('The session has been released')
raise RuntimeError("The session has been released")
try:
start_time = time.time()
resp = self._connection.execute_parameter(self._session_id, stmt, params)
end_time = time.time()

if (
self._execution_retry_count > 0
and resp.error_code == ErrorCode.E_EXECUTION_ERROR
):
for retry_count in range(1, self._execution_retry_count + 1):
logger.warning(
f"Execution error, retrying {retry_count}/{self._execution_retry_count} after {self._retry_interval_seconds}s"
)
time.sleep(self._retry_interval_seconds)
resp = self._connection.execute_parameter(
self._session_id, stmt, params
)
if resp.error_code != ErrorCode.E_EXECUTION_ERROR:
break

return ResultSet(
resp,
all_latency=int((end_time - start_time) * 1000000),
Expand All @@ -72,7 +88,7 @@ def execute_parameter(self, stmt, params):
self._pool.update_servers_status()
if self._retry_connect:
if not self._reconnect():
logger.warning('Retry connect failed')
logger.warning("Retry connect failed")
raise IOErrorException(
IOErrorException.E_ALL_BROKEN, ie.message
)
Expand All @@ -86,27 +102,6 @@ def execute_parameter(self, stmt, params):
timezone_offset=self._timezone_offset,
)
raise
except ExecutionErrorException as eee:
retry_count = 0
while retry_count < self._retry_times:
try:
# TODO: add exponential backoff
time.sleep(self._retry_interval_sec)
resp = self._connection.execute_parameter(
self._session_id, stmt, params
)
end_time = time.time()
return ResultSet(
resp,
all_latency=int((end_time - start_time) * 1000000),
timezone_offset=self._timezone_offset,
)
except ExecutionErrorException:
if retry_count >= self._retry_times - 1:
raise eee
else:
retry_count += 1
continue
except Exception:
raise

Expand Down Expand Up @@ -244,18 +239,37 @@ def execute_json_with_parameter(self, stmt, params):
:return: JSON string
"""
if self._connection is None:
raise RuntimeError('The session has been released')
raise RuntimeError("The session has been released")
try:
resp_json = self._connection.execute_json_with_parameter(
self._session_id, stmt, params
)
if self._execution_retry_count > 0:
for retry_count in range(self._execution_retry_count):
if (
json.loads(resp_json).get("errors", [{}])[0].get("code")
!= ErrorCode.E_EXECUTION_ERROR
):
break
logger.warning(
"Execute failed, retry count:{}/{} in {} seconds".format(
retry_count + 1,
self._execution_retry_count,
self._retry_interval_seconds,
)
)
time.sleep(self._retry_interval_seconds)
resp_json = self._connection.execute_json_with_parameter(
self._session_id, stmt, params
)
return resp_json

except IOErrorException as ie:
if ie.type == IOErrorException.E_CONNECT_BROKEN:
self._pool.update_servers_status()
if self._retry_connect:
if not self._reconnect():
logger.warning('Retry connect failed')
logger.warning("Retry connect failed")
raise IOErrorException(
IOErrorException.E_ALL_BROKEN, ie.message
)
Expand All @@ -264,22 +278,6 @@ def execute_json_with_parameter(self, stmt, params):
)
return resp_json
raise
except ExecutionErrorException as eee:
retry_count = 0
while retry_count < self._retry_times:
try:
# TODO: add exponential backoff
time.sleep(self._retry_interval_sec)
resp = self._connection.execute_json_with_parameter(
self._session_id, stmt, params
)
return resp
except ExecutionErrorException:
if retry_count >= self._retry_times - 1:
raise eee
else:
retry_count += 1
continue
except Exception:
raise

Expand Down Expand Up @@ -310,7 +308,7 @@ def ping_session(self):
return True
else:
logger.error(
'failed to ping the session: error code:{}, error message:{}'.format(
"failed to ping the session: error code:{}, error message:{}".format(
resp.error_code, resp.error_msg
)
)
Expand Down Expand Up @@ -342,5 +340,5 @@ def _idle_time(self):
def _sign_out(self):
"""sign out the session"""
if self._connection is None:
raise RuntimeError('The session has been released')
raise RuntimeError("The session has been released")
self._connection.signout(self._session_id)
Loading

0 comments on commit c5d8ca7

Please sign in to comment.