Skip to content

Commit

Permalink
alternator: close output_stream when exception is thrown during response streaming

Browse files Browse the repository at this point in the history

When an exception occurs and we omit closing the output_stream, the whole process is brought
down by an assertion in ~output_stream.

Fixes #14453
Relates #14403

Closes #14454
  • Loading branch information
nuivall authored and nyh committed Jul 4, 2023
1 parent 3679792 commit 6424dd5
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 4 deletions.
10 changes: 7 additions & 3 deletions alternator/executor.cc
Expand Up @@ -109,16 +109,20 @@ json::json_return_type make_streamed(rjson::value&& value) {
// move objects to coroutine frame.
auto los = std::move(os);
auto lrs = std::move(rs);
std::exception_ptr ex;
try {
co_await rjson::print(*lrs, los);
co_await los.close();
} catch (...) {
// at this point, we cannot really do anything. HTTP headers and return code are
// already written, and quite potentially a portion of the content data.
// just log + rethrow. It is probably better the HTTP server closes connection
// abruptly or something...
elogger.error("Unhandled exception in data streaming: {}", std::current_exception());
throw;
ex = std::current_exception();
elogger.error("Exception during streaming HTTP response: {}", ex);
}
co_await los.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
co_return;
};
Expand Down
13 changes: 13 additions & 0 deletions test/alternator/conftest.py
Expand Up @@ -14,6 +14,7 @@
import requests
import re
from util import create_test_table, is_aws, scylla_log
from urllib.parse import urlparse

# When tests are run with HTTPS, the server often won't have its SSL
# certificate signed by a known authority. So we will disable certificate
Expand Down Expand Up @@ -89,6 +90,18 @@ def dynamodb(request):
region_name='us-east-1', aws_access_key_id='alternator', aws_secret_access_key='secret_pass',
config=boto_config.merge(botocore.client.Config(retries={"max_attempts": 0}, read_timeout=300)))

def new_dynamodb_session(request, dynamodb):
    """Build a brand-new boto3 DynamoDB resource aimed at the same endpoint
    as the shared session-scoped ``dynamodb`` fixture.

    Tests that deliberately wreck their client connection should use this
    so they never poison the session other tests share.
    """
    session = boto3.Session()
    endpoint = dynamodb.meta.client._endpoint.host
    parsed = urlparse(endpoint)
    config = botocore.client.Config(parameter_validation=False)
    # Against real AWS the environment supplies endpoint and credentials;
    # a default resource is all that is needed.
    if request.config.getoption('aws'):
        return boto3.resource('dynamodb', config=config)
    if parsed.hostname == 'localhost':
        # Local test server: no client-side retries, but tolerate slow reads.
        local_overrides = botocore.client.Config(retries={"max_attempts": 0}, read_timeout=300)
        config = config.merge(local_overrides)
    # Certificate verification only makes sense for non-plain-HTTP schemes.
    wants_verify = parsed.scheme != 'http'
    return session.resource('dynamodb', endpoint_url=endpoint, verify=wants_verify,
        region_name='us-east-1', aws_access_key_id='alternator', aws_secret_access_key='secret_pass',
        config=config)

@pytest.fixture(scope="session")
def dynamodbstreams(request):
# Disable boto3's client-side validation of parameters. This validation
Expand Down
43 changes: 42 additions & 1 deletion test/alternator/test_batch.py
Expand Up @@ -8,8 +8,12 @@

import pytest
import random
from botocore.exceptions import ClientError
from botocore.exceptions import ClientError, HTTPClientError
from util import random_string, full_scan, full_query, multiset, scylla_inject_error
import urllib3
import traceback
import sys
from conftest import new_dynamodb_session

# Test ensuring that items inserted by a batched statement can be properly extracted
# via GetItem. Schema has both hash and sort keys.
Expand Down Expand Up @@ -375,6 +379,43 @@ def test_batch_write_item_large(test_table_sn):
assert full_query(test_table_sn, KeyConditionExpression='p=:p', ExpressionAttributeValues={':p': p}
) == [{'p': p, 'c': i, 'content': long_content} for i in range(25)]

# Test if client breaking connection during HTTP response
# streaming doesn't break the server.
def test_batch_write_item_large_broken_connection(test_table_sn, request, dynamodb):
    """Regression test for #14453: the server must survive a client that
    drops its connection while a chunked HTTP response is still being
    streamed back to it.
    """
    # This test's own function name; used below so the monkey-patched read
    # path only misbehaves for requests issued from within this test.
    fn_name = sys._getframe().f_code.co_name
    # Dedicated session: the patched read_chunked leaves the client broken,
    # so the shared `dynamodb` session must not be used for the query.
    ses = new_dynamodb_session(request, dynamodb)

    p = random_string()
    long_content = random_string(100)*500
    # Write 25 large items under one partition key so the query below yields
    # a response big enough to be streamed in multiple chunks.
    write_reply = test_table_sn.meta.client.batch_write_item(RequestItems = {
        test_table_sn.name: [{'PutRequest': {'Item': {'p': p, 'c': i, 'content': long_content}}} for i in range(25)],
    })
    assert 'UnprocessedItems' in write_reply and write_reply['UnprocessedItems'] == dict()

    # Save the real chunked-read implementation so it can be restored, then
    # install a version that kills the socket mid-stream.
    read_fun = urllib3.HTTPResponse.read_chunked
    triggered = False
    def broken_read_fun(self, amt=None, decode_content=None):
        # Read one chunk normally first, then sever the connection so the
        # server is left streaming into a dead socket.
        ret = read_fun(self, amt, decode_content)
        st = traceback.extract_stack()
        # Try to not disturb other tests if executed in parallel
        if fn_name in str(st):
            self._fp.fp.raw.close() # close the socket
            nonlocal triggered
            triggered = True
        return ret
    urllib3.HTTPResponse.read_chunked = broken_read_fun

    try:
        # This disruption doesn't always work so we repeat it.
        for _ in range(1, 20):
            with pytest.raises(HTTPClientError):
                # Our monkey patched read_chunked function will make client unusable
                # so we need to use separate session so that it doesn't affect other tests.
                ses.meta.client.query(TableName=test_table_sn.name, KeyConditionExpression='p=:p', ExpressionAttributeValues={':p': p})
        # Make sure the disruption actually fired at least once; otherwise
        # the test would pass without exercising the streaming-error path.
        assert triggered
    finally:
        # Always restore the original implementation, even on failure, so
        # later tests see an intact urllib3.
        urllib3.HTTPResponse.read_chunked = read_fun

# DynamoDB limits the number of items written by a BatchWriteItem operation
# to 25, even if they are small. Exceeding this limit results in a
# ValidationException error - and none of the items in the batch are written.
Expand Down

0 comments on commit 6424dd5

Please sign in to comment.