Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions e2e/test_http_proxy_m3u_rewrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@
proxy. These tests verify the rewriting logic end-to-end.
"""

import socket
import threading

import pytest

from helpers import (
MockHTTPUpstream,
R2HProcess,
find_free_port,
http_get,
stream_get,
)

pytestmark = pytest.mark.http_proxy

_TIMEOUT = 5.0
_HEADER_PARSE_READ_SIZE = 8191 # HTTP_PROXY_RESPONSE_BUFFER_SIZE - 1


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -66,6 +71,81 @@ def _make_m3u_upstream(path, body, content_type="application/vnd.apple.mpegurl")
return upstream


class _RawHTTPResponseUpstream:
    """Tiny TCP server that answers every request with fixed raw bytes.

    Each accepted connection is served on its own daemon thread: the request
    is consumed up to the blank line that ends its headers, ``response`` is
    written back verbatim, and the connection is then held open until
    ``stop()`` is called or ``_TIMEOUT * 2`` seconds have passed.
    """

    def __init__(self, response):
        """Reserve a port via ``find_free_port()`` and remember *response*."""
        self.port = find_free_port()
        self.response = response
        self._server_sock = None
        self._thread = None
        self._stop = threading.Event()
        self._client_threads = []

    def start(self):
        """Listen on 127.0.0.1:``port`` and launch the background accept loop."""
        listener = self._server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(("127.0.0.1", self.port))
        listener.listen(5)
        # Short accept timeout so the accept loop re-checks the stop flag.
        listener.settimeout(0.5)
        acceptor = self._thread = threading.Thread(target=self._accept, daemon=True)
        acceptor.start()

    def stop(self):
        """Signal shutdown, close the listener and wait briefly for threads."""
        self._stop.set()
        if self._server_sock is not None:
            self._server_sock.close()
        for worker in self._client_threads:
            worker.join(timeout=1)
        if self._thread is not None:
            self._thread.join(timeout=3)

    def _accept(self):
        """Accept connections until stopped, handing each to ``_handle``."""
        assert self._server_sock is not None
        listener = self._server_sock
        while not self._stop.is_set():
            try:
                conn, _peer = listener.accept()
            except socket.timeout:
                # Nothing arrived within the accept timeout; poll the flag again.
                continue
            except OSError:
                # Listener was closed (or otherwise failed): leave the loop.
                break
            worker = threading.Thread(target=self._handle, args=(conn,), daemon=True)
            self._client_threads.append(worker)
            worker.start()

    def _handle(self, conn):
        """Read one request's headers, send the canned response, then idle."""
        end_of_headers = b"\r\n\r\n"
        try:
            conn.settimeout(1.0)
            buffered = bytearray()
            while end_of_headers not in buffered:
                data = conn.recv(1024)
                if not data:
                    # Peer closed before completing its request headers.
                    return
                buffered += data
            conn.sendall(self.response)
            # Hold the connection open instead of closing after the response.
            self._stop.wait(_TIMEOUT * 2)
        except OSError:
            pass
        finally:
            conn.close()


def _make_padded_header_m3u_upstream(body, content_type="application/vnd.apple.mpegurl"):
    """Start an upstream whose response headers are padded to a fixed size.

    The header block (status line, Content-Type, Content-Length and an
    ``X-Pad`` filler header) is sized to exactly ``_HEADER_PARSE_READ_SIZE``
    bytes, with *body* following immediately after. The intent is that a read
    of that size returns headers only and no body bytes.

    Returns the already-started ``_RawHTTPResponseUpstream``; the caller is
    responsible for calling ``stop()`` on it.
    """
    payload = body.encode() if isinstance(body, str) else body
    head = (
        "HTTP/1.1 200 OK\r\n"
        f"Content-Type: {content_type}\r\n"
        f"Content-Length: {len(payload)}\r\n"
        "X-Pad: "
    ).encode()
    terminator = b"\r\n\r\n"
    # Filler needed so that head + filler + terminator hits the target size.
    filler_len = _HEADER_PARSE_READ_SIZE - len(head) - len(terminator)
    assert filler_len > 0
    header_block = b"".join((head, b"a" * filler_len, terminator))
    assert len(header_block) == _HEADER_PARSE_READ_SIZE
    server = _RawHTTPResponseUpstream(header_block + payload)
    server.start()
    return server


# ---------------------------------------------------------------------------
# Basic absolute http:// URL rewriting
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -571,6 +651,37 @@ def test_mixed_absolute_and_relative(self, shared_r2h):
finally:
upstream.stop()

@pytest.mark.parametrize("upstream_mode", ["normal_headers", "padded_header_only"], ids=["normal", "header-only"])
def test_large_playlist_body_is_fully_buffered(self, shared_r2h, upstream_mode):
    """Check a 4096-segment playlist is returned through the proxy in full.

    Runs once against the regular mock upstream and once against the
    padded-header upstream. In both cases the first and last segment URLs
    must appear rewritten under the proxy path, and the Content-Length header
    must equal the number of body bytes actually received.
    """
    segment_count = 4096
    entries = "".join(
        "#EXTINF:10,\nsegment-%04d.ts?token=abcdef0123456789\n" % index
        for index in range(segment_count)
    )
    m3u = "".join(("#EXTM3U\n#EXT-X-TARGETDURATION:10\n", entries, "#EXT-X-ENDLIST\n"))
    upstream = (
        _make_m3u_upstream("/lookback/long.m3u8", m3u)
        if upstream_mode == "normal_headers"
        else _make_padded_header_m3u_upstream(m3u)
    )
    try:
        request_path = f"/http/127.0.0.1:{upstream.port}/lookback/long.m3u8"
        status, hdrs, body = stream_get(
            "127.0.0.1",
            shared_r2h.port,
            request_path,
            read_bytes=512 * 1024,
            timeout=_TIMEOUT,
        )
        text = body.decode("utf-8", errors="replace")
        first_url = f"/http/127.0.0.1:{upstream.port}/lookback/segment-0000.ts?token=abcdef0123456789"
        last_url = (
            f"/http/127.0.0.1:{upstream.port}/lookback/segment-{segment_count - 1:04d}.ts?token=abcdef0123456789"
        )
        assert status == 200
        assert first_url in text
        assert last_url in text
        # The advertised length must match what was actually delivered.
        content_length = hdrs.get("content-length")
        assert content_length is not None
        assert int(content_length) == len(body)
    finally:
        upstream.stop()


# ---------------------------------------------------------------------------
# Edge cases
Expand Down
15 changes: 9 additions & 6 deletions src/http_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) {
return http_proxy_finalize_rewrite(session);
}

return 0; /* Keep buffering */
return (int)received; /* Progress: keep draining edge-triggered sockets */
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the doc comment to say "positive value if progress was made" instead of "bytes forwarded", and renamed the accumulator in http_proxy_handle_socket_event from bytes_forwarded to progress. The only caller (stream.c) only checks < 0, so no functional change.

}

/* Phase 2: Zero-copy streaming - recv directly to buffer pool */
Expand Down Expand Up @@ -891,6 +891,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) {
}

session->response_buffer_pos += received;
int socket_progress = (int)received;

/* Try to parse headers */
if (!session->headers_received) {
Expand All @@ -899,7 +900,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) {
return -1;
}
if (result == 0) {
return 0; /* Need more data for headers */
return socket_progress; /* Progress: keep draining edge-triggered sockets */
}
/* result > 0 means headers complete, state is now STREAMING */
}
Expand Down Expand Up @@ -930,6 +931,8 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) {
logger(LOG_DEBUG, "HTTP Proxy: All M3U content received with headers (%zd bytes)", session->bytes_received);
return http_proxy_finalize_rewrite(session);
}

bytes_forwarded = (int)initial_size;
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed by the second commit (f5b918d) which added socket_progress fallback — the header-only case now returns socket_progress (the full recv byte count) instead of 0, so the drain loop stays alive. Additionally updated the doc comment and renamed the accumulator to progress in 5a1a0d9 to reflect the actual semantics.

} else {
/* Normal mode: forward immediately */
if (connection_queue_output(session->conn, session->response_buffer, session->response_buffer_pos) < 0) {
Expand All @@ -949,7 +952,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) {
}
}

return bytes_forwarded;
return bytes_forwarded > 0 ? bytes_forwarded : socket_progress;
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went with updating the contract — renamed the accumulator to progress and updated the doc to say "positive value if progress was made (data received/forwarded)". No caller uses the positive value semantically, so separating the signals would be unnecessary complexity.

}

/* Check if status code is a redirect that may have Location header */
Expand Down Expand Up @@ -1305,7 +1308,7 @@ int http_proxy_handle_socket_event(http_proxy_session_t *session, uint32_t event
* edge-triggered pollers (epoll EPOLLET / kqueue EV_CLEAR) where the read event fires
* only once per data arrival and won't re-trigger while unread data
* remains in the socket buffer. */
int bytes_forwarded = 0;
int progress = 0;
if (events & POLLER_IN) {
while (session->state == HTTP_PROXY_STATE_AWAITING_HEADERS || session->state == HTTP_PROXY_STATE_STREAMING) {
result = http_proxy_try_receive_response(session);
Expand All @@ -1316,7 +1319,7 @@ int http_proxy_handle_socket_event(http_proxy_session_t *session, uint32_t event
}
if (result == 0)
break; /* EAGAIN or need more data - wait for next event */
bytes_forwarded += result;
progress += result;
}
}

Expand Down Expand Up @@ -1356,7 +1359,7 @@ int http_proxy_handle_socket_event(http_proxy_session_t *session, uint32_t event
}
}

return bytes_forwarded;
return progress;
}

int http_proxy_session_cleanup(http_proxy_session_t *session) {
Expand Down
4 changes: 2 additions & 2 deletions src/http_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ int http_proxy_connect(http_proxy_session_t *session);
* Called when socket has EPOLLIN or EPOLLOUT events
* @param session HTTP proxy session
* @param events Epoll events (EPOLLIN, EPOLLOUT, etc.)
* @return Number of bytes forwarded to client (>0), 0 if no data forwarded, -1
* on error
* @return Positive value if progress was made (data received/forwarded),
* 0 if no progress (EAGAIN), -1 on error
*/
int http_proxy_handle_socket_event(http_proxy_session_t *session, uint32_t events);

Expand Down
Loading