Skip to content

Commit

Permalink
[275] Iterate as JSON-lines, not just as regular text lines (skip empty)
Browse files Browse the repository at this point in the history
Otherwise, it freezes on a real cluster (example 99) because an empty
line is yielded and cannot be parsed.
  • Loading branch information
nolar committed Dec 16, 2019
1 parent fd3e3fb commit b5e6d21
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
14 changes: 7 additions & 7 deletions kopf/clients/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@ async def watch_objs(
response.raise_for_status()

async with response:
async for line in _iter_lines(response.content):
async for line in _iter_jsonlines(response.content):
event = cast(bodies.RawEvent, json.loads(line.decode("utf-8")))
yield event


async def _iter_lines(
async def _iter_jsonlines(
content: aiohttp.StreamReader,
chunk_size: int = 1024 * 1024,
) -> AsyncIterator[bytes]:
Expand Down Expand Up @@ -196,23 +196,23 @@ async def _iter_lines(

# Minimize the memory footprint by keeping at most 2 copies of a yielded line in memory
# (in the buffer and as a yielded value), and at most 1 copy of other lines (in the buffer).
buffer = None
buffer = b''
async for data in content.iter_chunked(chunk_size):

buffer = data if buffer is None else buffer + data
buffer += data
del data

start = 0
index = buffer.find(b'\n', start)
while index >= 0:
line = buffer[start:index]
yield line
if line:
yield line
del line
start = index + 1
index = buffer.find(b'\n', start)

if start > 0:
buffer = buffer[start:]

if buffer is not None:
if buffer:
yield buffer
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asynctest
from kopf.clients.watching import _iter_lines
from kopf.clients.watching import _iter_jsonlines


async def test_empty_content():
Expand All @@ -9,7 +9,7 @@ async def iter_chunked(n: int):

content = asynctest.Mock(iter_chunked=iter_chunked)
lines = []
async for line in _iter_lines(content):
async for line in _iter_jsonlines(content):
lines.append(line)

assert lines == []
Expand All @@ -21,10 +21,10 @@ async def iter_chunked(n: int):

content = asynctest.Mock(iter_chunked=iter_chunked)
lines = []
async for line in _iter_lines(content):
async for line in _iter_jsonlines(content):
lines.append(line)

assert lines == [b'']
assert lines == []


async def test_one_chunk_one_line():
Expand All @@ -33,7 +33,7 @@ async def iter_chunked(n: int):

content = asynctest.Mock(iter_chunked=iter_chunked)
lines = []
async for line in _iter_lines(content):
async for line in _iter_jsonlines(content):
lines.append(line)

assert lines == [b'hello']
Expand All @@ -45,33 +45,33 @@ async def iter_chunked(n: int):

content = asynctest.Mock(iter_chunked=iter_chunked)
lines = []
async for line in _iter_lines(content):
async for line in _iter_jsonlines(content):
lines.append(line)

assert lines == [b'hello', b'world']


async def test_one_chunk_empty_lines():
async def iter_chunked(n: int):
yield b'\nhello\nworld\n'
yield b'\n\nhello\n\nworld\n\n'

content = asynctest.Mock(iter_chunked=iter_chunked)
lines = []
async for line in _iter_lines(content):
async for line in _iter_jsonlines(content):
lines.append(line)

assert lines == [b'', b'hello', b'world', b'']
assert lines == [b'hello', b'world']


async def test_few_chunks_split():
async def iter_chunked(n: int):
yield b'\nhel'
yield b'lo\nwo'
yield b'rld\n'
yield b'\n\nhell'
yield b'o\n\nwor'
yield b'ld\n\n'

content = asynctest.Mock(iter_chunked=iter_chunked)
lines = []
async for line in _iter_lines(content):
async for line in _iter_jsonlines(content):
lines.append(line)

assert lines == [b'', b'hello', b'world', b'']
assert lines == [b'hello', b'world']

0 comments on commit b5e6d21

Please sign in to comment.