Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 48 additions & 27 deletions app/drivers/surfboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,11 @@ def _hnap_post(self, action: str, body: dict, *,
HNAP_AUTH is sent on **every** request. Before login the
pre-shared key ``withoutloginkey`` is used as PrivateKey.

On transport-level failures (ConnectionError, ChunkedEncodingError)
retries once with a fresh HNAP_AUTH timestamp. Some SB8200 firmware
resets the TCP connection after sending HTTP 200 headers but before the
response body is readable; a single retry often succeeds.

Args:
action: HNAP action name (e.g. "Login", "GetMultipleHNAPs")
body: JSON body to send
Expand All @@ -623,34 +628,50 @@ def _hnap_post(self, action: str, body: dict, *,
else:
algo = hashlib.sha256

ts = str(int(time.time() * 1000) % 2_000_000_000_000)
auth_key = self._private_key or _HNAP_PRELOGIN_KEY
auth_payload = ts + soap_action
auth_hash = hmac.new(
auth_key.encode(),
auth_payload.encode(),
algo,
).hexdigest().upper()

headers = {
"Content-Type": "application/json",
"Connection": "close",
"SOAPACTION": soap_action,
"HNAP_AUTH": f"{auth_hash} {ts}",
}
last_err: Exception | None = None
for attempt in range(2):
if attempt > 0:
log.warning(
"HNAP %s response read failed, retrying once: %s",
action, last_err,
)
time.sleep(1)

try:
r = self._session.post(url, json=body, headers=headers, timeout=30)
if not r.ok:
log.debug("HNAP %s returned HTTP %d (%d bytes): %s",
action, r.status_code, len(r.content), r.text[:500])
r.raise_for_status()
return r.json()
except requests.exceptions.ChunkedEncodingError as e:
# Some SB8200 units reset the socket after sending HTTP 200 headers
# but before the response body is fully readable. Treat that as the
# same transport-level failure path as a dropped connection.
raise requests.ConnectionError(str(e)) from e
ts = str(int(time.time() * 1000) % 2_000_000_000_000)
auth_key = self._private_key or _HNAP_PRELOGIN_KEY
auth_payload = ts + soap_action
auth_hash = hmac.new(
auth_key.encode(),
auth_payload.encode(),
algo,
).hexdigest().upper()

headers = {
"Content-Type": "application/json",
"Connection": "close",
"SOAPACTION": soap_action,
"HNAP_AUTH": f"{auth_hash} {ts}",
}

try:
r = self._session.post(
url, json=body, headers=headers, timeout=30,
)
if not r.ok:
log.debug(
"HNAP %s returned HTTP %d (%d bytes): %s",
action, r.status_code, len(r.content), r.text[:500],
)
r.raise_for_status()
return r.json()
except requests.exceptions.ChunkedEncodingError as e:
last_err = requests.ConnectionError(str(e))
last_err.__cause__ = e
except requests.ConnectionError as e:
last_err = e

assert last_err is not None
raise last_err

# -- Channel parsers --

Expand Down
196 changes: 194 additions & 2 deletions tests/test_surfboard_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,8 @@ def mock_hnap_post(action, body, **kwargs):
driver.login()

def test_legacy_tls_phase2_chunked_read_retry_succeeds(self):
"""Legacy TLS phase 2 retries when the response body resets after HTTP 200."""
"""Legacy TLS phase 2 body read failure is retried inside _hnap_post
without needing a full session reset."""
import requests as req
from urllib3.exceptions import ProtocolError

Expand Down Expand Up @@ -1318,7 +1319,8 @@ def test_legacy_tls_phase2_chunked_read_retry_succeeds(self):
patch("app.drivers.surfboard.time"):
driver.login()

assert fresh_session.call_count == 1
# _hnap_post retries internally -- no full session reset needed
assert fresh_session.call_count == 0
assert driver._logged_in is True


Expand Down Expand Up @@ -1441,3 +1443,193 @@ def test_connection_close_on_data_request(self, driver):
call_kwargs = mock_post.call_args
headers = call_kwargs.kwargs.get("headers") or call_kwargs[1].get("headers", {})
assert headers.get("Connection") == "close"


# -- _hnap_post transport-level retry --

class TestHnapPostRetry:
def test_connection_error_retries_once_and_succeeds(self, driver):
"""ConnectionError on first attempt retries; second attempt succeeds."""
import requests as req

ok_response = MagicMock()
ok_response.ok = True
ok_response.raise_for_status = MagicMock()
ok_response.json.return_value = {"LoginResponse": {"LoginResult": "OK"}}

with patch.object(
driver._session, "post",
side_effect=[req.ConnectionError("reset"), ok_response],
) as mock_post, patch("app.drivers.surfboard.time") as mock_time:
mock_time.time.return_value = 1700000000.0
mock_time.sleep = MagicMock()

result = driver._hnap_post("Login", {"Login": {}})

assert result == {"LoginResponse": {"LoginResult": "OK"}}
assert mock_post.call_count == 2
mock_time.sleep.assert_called_once_with(1)

def test_chunked_encoding_error_retries_once_and_succeeds(self, driver):
"""ChunkedEncodingError on first attempt retries; second succeeds."""
import requests as req
from urllib3.exceptions import ProtocolError

ok_response = MagicMock()
ok_response.ok = True
ok_response.raise_for_status = MagicMock()
ok_response.json.return_value = {"result": "ok"}

with patch.object(
driver._session, "post",
side_effect=[
req.exceptions.ChunkedEncodingError(
ProtocolError("Connection broken", ConnectionResetError(104))
),
ok_response,
],
) as mock_post, patch("app.drivers.surfboard.time") as mock_time:
mock_time.time.return_value = 1700000000.0
mock_time.sleep = MagicMock()

result = driver._hnap_post("Login", {"Login": {}})

assert result == {"result": "ok"}
assert mock_post.call_count == 2

def test_both_attempts_fail_raises_connection_error(self, driver):
"""When both attempts fail, ConnectionError propagates."""
import requests as req

with patch.object(
driver._session, "post",
side_effect=req.ConnectionError("persistent reset"),
), patch("app.drivers.surfboard.time") as mock_time:
mock_time.time.return_value = 1700000000.0
mock_time.sleep = MagicMock()

with pytest.raises(req.ConnectionError, match="persistent reset"):
driver._hnap_post("Login", {"Login": {}})

def test_chunked_error_both_fail_raises_as_connection_error(self, driver):
"""ChunkedEncodingError on both attempts raises as ConnectionError
with original error text and cause chain preserved."""
import requests as req
from urllib3.exceptions import ProtocolError

original_msg = "Connection broken: ConnectionResetError(104, 'Connection reset by peer')"
with patch.object(
driver._session, "post",
side_effect=req.exceptions.ChunkedEncodingError(
ProtocolError(original_msg, ConnectionResetError(104))
),
), patch("app.drivers.surfboard.time") as mock_time:
mock_time.time.return_value = 1700000000.0
mock_time.sleep = MagicMock()

with pytest.raises(req.ConnectionError, match="Connection broken") as exc_info:
driver._hnap_post("Login", {"Login": {}})

assert exc_info.value.__cause__ is not None, \
"cause chain must be preserved for caller error messages"

def test_http_error_not_retried(self, driver):
"""HTTPError is not caught by the retry loop -- propagates immediately."""
import requests as req

error_response = MagicMock()
error_response.ok = False
error_response.status_code = 500
error_response.content = b"error"
error_response.text = "error"
error_response.raise_for_status.side_effect = req.HTTPError(
response=error_response
)

with patch.object(
driver._session, "post", return_value=error_response,
) as mock_post, patch("app.drivers.surfboard.time") as mock_time:
mock_time.time.return_value = 1700000000.0

with pytest.raises(req.HTTPError):
driver._hnap_post("Login", {"Login": {}})

# Only one attempt -- no retry on HTTPError
assert mock_post.call_count == 1

def test_retry_regenerates_hnap_auth(self, driver):
"""Retry generates a fresh HNAP_AUTH with updated timestamp."""
import requests as req

ok_response = MagicMock()
ok_response.ok = True
ok_response.raise_for_status = MagicMock()
ok_response.json.return_value = {"result": "ok"}

timestamps = iter([1700000000.0, 1700000001.0])

with patch.object(
driver._session, "post",
side_effect=[req.ConnectionError("reset"), ok_response],
) as mock_post, patch("app.drivers.surfboard.time") as mock_time:
mock_time.time.side_effect = timestamps
mock_time.sleep = MagicMock()

driver._hnap_post("Login", {"Login": {}})

first_headers = mock_post.call_args_list[0].kwargs.get("headers") or \
mock_post.call_args_list[0][1].get("headers", {})
second_headers = mock_post.call_args_list[1].kwargs.get("headers") or \
mock_post.call_args_list[1][1].get("headers", {})

assert first_headers["HNAP_AUTH"] != second_headers["HNAP_AUTH"], \
"retry must use a fresh HNAP_AUTH with updated timestamp"

def test_retry_preserves_private_key_and_auth_algo(self, driver):
"""Retry during phase 2 uses the derived private key and explicit
auth_algo, not the pre-login defaults."""
import hashlib
import requests as req

driver._private_key = "DERIVED_PRIVATE_KEY_ABC123"
driver._hmac_algo = "md5"

ok_response = MagicMock()
ok_response.ok = True
ok_response.raise_for_status = MagicMock()
ok_response.json.return_value = {"LoginResponse": {"LoginResult": "OK"}}

timestamps = iter([1700000000.0, 1700000001.0])

with patch.object(
driver._session, "post",
side_effect=[req.ConnectionError("reset"), ok_response],
) as mock_post, patch("app.drivers.surfboard.time") as mock_time:
mock_time.time.side_effect = timestamps
mock_time.sleep = MagicMock()

driver._hnap_post("Login", {"Login": {}}, auth_algo=hashlib.md5)

# Both attempts must use the derived private key with MD5
for call in mock_post.call_args_list:
headers = call.kwargs.get("headers") or call[1].get("headers", {})
hnap_auth = headers["HNAP_AUTH"]
# Pre-login key "withoutloginkey" produces a different HMAC;
# verify the auth was computed with the derived key
assert hnap_auth != "", "HNAP_AUTH must be set"

# Verify the retry HMAC uses the same key but different timestamp
first_auth = mock_post.call_args_list[0].kwargs.get("headers", {})["HNAP_AUTH"]
second_auth = mock_post.call_args_list[1].kwargs.get("headers", {})["HNAP_AUTH"]
assert first_auth != second_auth, "different timestamps"

# Verify both used MD5 with derived key (not pre-login default)
ts1 = first_auth.split(" ")[1]
soap = '"http://purenetworks.com/HNAP1/Login"'
expected_hash = hmac.new(
"DERIVED_PRIVATE_KEY_ABC123".encode(),
(ts1 + soap).encode(),
hashlib.md5,
).hexdigest().upper()
assert first_auth.startswith(expected_hash), \
"retry must use derived private key with explicit auth_algo, not defaults"
Loading