# Stream Processing

A very common need in many languages is the ability to process streams of data.

## Background

To follow what's going on here, we'll use a decorator to print out the output of every stage of our processing pipelines. If you're not familiar with decorators, you can look into one of the many tutorials on the matter (including [my decorators tutorial](../decorators/README.ipynb)). All this decorator does is print out the function name and value of each piece of data passing through it. This gives us visibility into what's going on inside the various generators as their execution interleaves.

The other function we'll use is just a simple function that iterates over the stream as a whole and emits each piece of final output. This is the actual output from whatever stream pipeline we create.

In [1]:
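def print_element(f_name, item):
    # Log which stage produced which item.
    print(f'{f_name}: {item}')

def print_elements(func):
    # Wrap a function that returns an iterable so that every item it produces
    # is printed (tagged with the function's name) before being passed on.
    def wrapper(*args, **kwargs):
        f_name = func.__name__
        for item in func(*args, **kwargs):
            print_element(f_name, item)
            yield item
    return wrapper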

def consume(stream, max_len=None):
    index = 0
    for item in stream:
        print(f'output: {item}')
        index += 1
        if max_len is not None and index >= max_len:
            print('output: ... (ignoring the rest)')
            break

@print_elements
def basic_stream(n=10):
    return range(n)

consume(basic_stream(), max_len=3)

basic_stream: 0
output: 0
basic_stream: 1
output: 1
basic_stream: 2
output: 2
output: ... (ignoring the rest)


Anything that is iterable can be passed to `consume()`. In this quick dump, we emit a string (which iterates as a series of characters) so we can see how the output handles whitespace:

In [2]:
consume('ab\tc\nd e')

output: a
output: b
output: 	
output: c
output: 

output: d
output:  
output: e


## Data

We will also need a stream of data to process. For simplicity, I will be using a generator that simply emits the words of a sample sentence, one at a time:

In [3]:
@print_elements
def source_strings():
    return 'the quick brown fox jumps over the lazy dog'.split()

consume(source_strings())

source_strings: the
output: the
source_strings: quick
output: quick
source_strings: brown
output: brown
source_strings: fox
output: fox
source_strings: jumps
output: jumps
source_strings: over
output: over
source_strings: the
output: the
source_strings: lazy
output: lazy
source_strings: dog
output: dog


## Pipelines

The basic idea of a pipeline is that you have a series of stages, each of which consumes a stream (its input) and emits a stream (its output). The last example was technically a pipeline, just one with a single stage.

Another example would be to normalize our input, perhaps by converting each word to upper case:

In [4]:
@print_elements
def to_upper(word_stream):
    for word in word_stream:
        yield word.upper()

consume(to_upper(source_strings()), max_len=3)

source_strings: the
to_upper: THE
output: THE
source_strings: quick
to_upper: QUICK
output: QUICK
source_strings: brown
to_upper: BROWN
output: BROWN
output: ... (ignoring the rest)


A more complex example might only modify certain words:

In [5]:
@print_elements
def to_upper_conditional(word_stream):
    for word in word_stream:
        yield word.upper() if word[0] in 'abcdef' else word

consume(to_upper_conditional(source_strings()), max_len=5)

source_strings: the
to_upper_conditional: the
output: the
source_strings: quick
to_upper_conditional: quick
output: quick
source_strings: brown
to_upper_conditional: BROWN
output: BROWN
source_strings: fox
to_upper_conditional: FOX
output: FOX
source_strings: jumps
to_upper_conditional: jumps
output: jumps
output: ... (ignoring the rest)
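
Nesting calls like `to_upper(source_strings())` gets harder to read as stages are added. Here is a minimal sketch of one way to chain stages left to right. Both `pipeline` and `only_long_words` are hypothetical helpers invented for this illustration (they appear nowhere else in this tutorial), and `to_upper` is redefined without the printing decorator so the block stands alone:

```python
from functools import reduce

def pipeline(source, *stages):
    """Feed `source` through each stage in order and return the final stream."""
    return reduce(lambda stream, stage: stage(stream), stages, source)

def to_upper(word_stream):
    for word in word_stream:
        yield word.upper()

def only_long_words(word_stream, min_len=4):
    # Hypothetical filter stage: drop words shorter than min_len.
    for word in word_stream:
        if len(word) >= min_len:
            yield word

words = 'the quick brown fox'.split()
result = list(pipeline(words, to_upper, only_long_words))
print(result)
```

Each stage only needs to accept an iterable and return an iterable, so stages can be reordered or swapped without touching the others.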


## Generators

To get the most out of pipelines, we should build them from generators and generator functions.

The first thing to realize about generators is that they advance only when they need to. For instance, if we have one step of our processing that needs to break strings into characters, each generator in the pipeline only reads the next element when it is explicitly requested by a stage further down:

In [6]:
@print_elements
def convert_strings_to_characters(word_stream):
    for word in word_stream:
        for character in word:
            yield character

consume(convert_strings_to_characters(source_strings()), max_len=9)

source_strings: the
convert_strings_to_characters: t
output: t
convert_strings_to_characters: h
output: h
convert_strings_to_characters: e
output: e
source_strings: quick
convert_strings_to_characters: q
output: q
convert_strings_to_characters: u
output: u
convert_strings_to_characters: i
output: i
convert_strings_to_characters: c
output: c
convert_strings_to_characters: k
output: k
source_strings: brown
convert_strings_to_characters: b
output: b
output: ... (ignoring the rest)


The interesting thing here is that the inner-most generator (`source_strings()`) is only asked to return the k<sup>th</sup> word after all of the (k-1)<sup>th</sup> word has been consumed. At any point in time, only a small amount of memory is actually in use for data within the pipeline, so if the source input and ultimate output are outside of memory (a database, a file, a network connection, whatever) then memory use of this pipeline is minimal.
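
One way to convince yourself that values are only produced on demand is to build a pipeline on top of a source that never ends. This is a small sketch; `naturals` and `squares` are made-up names for this illustration only:

```python
import itertools

def naturals():
    """Endless source: 0, 1, 2, ... (never finishes on its own)."""
    n = 0
    while True:
        yield n
        n += 1

def squares(numbers):
    for n in numbers:
        yield n * n

# Only five values are ever requested, so only five are ever computed,
# even though the source itself is infinite.
first_five = list(itertools.islice(squares(naturals()), 5))
print(first_five)
```

If the stages built lists instead of yielding, the first stage would never return.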

## Decompressing content

Let's imagine we have a compressed file:

In [7]:
import gzip

file_name = 'content.txt.gz'

with gzip.open(file_name) as fin:
    content = fin.read()
    print(content.decode("utf-8"))

Hello World!
The quick brown fox jumps over the lazy dog.
She sells sea shells by the seashore.



But when you call `fin.read()`, the *entire* file is being read into memory. With very large files, this is a problem!
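
When the data really is a local file, one option is to ask the `gzip` file object for a bounded number of bytes at a time instead of calling `read()` with no argument. The sketch below assumes that approach; `read_chunks` and the throwaway `demo.txt.gz` file are invented for this example:

```python
import gzip
import os
import tempfile

def read_chunks(path, chunk_size=16):
    """Yield decompressed chunks of at most `chunk_size` bytes from a gzip file."""
    with gzip.open(path, 'rb') as fin:
        while True:
            chunk = fin.read(chunk_size)
            if not chunk:  # an empty result means end of file
                break
            yield chunk

# Build a small throwaway file so the sketch is self-contained.
text = b'Hello World!\nThe quick brown fox jumps over the lazy dog.\n'
demo_path = os.path.join(tempfile.mkdtemp(), 'demo.txt.gz')
with gzip.open(demo_path, 'wb') as fout:
    fout.write(text)

chunks = list(read_chunks(demo_path))
print(chunks)
```

Only one chunk is held by the generator at any moment; the `list()` call here is just to display the result.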

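For the remainder of this tutorial, we will be working with a stream of `bytes` objects holding the raw (still compressed) file contents. To keep the demo simple, the small file is loaded into memory once and then sliced into chunks; a real source would hand us the chunks directly: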

In [11]:
with open(file_name, 'rb') as fin:
    content = fin.read()

@print_elements
def content_stream(chunk_size=32):
    return (content[i:i+chunk_size] for i in range(0, len(content), chunk_size))

# the following line can be used to confirm that if you re-join the
# chunks, you get back the original file contents:
# assert b''.join(content_stream()) == content

consume(content_stream())

content_stream: b'\x1f\x8b\x08\x08\x96\xabM^\x00\x03content.txt\x00\x1d\x8bA\x12@0\x14C\xf7='
content_stream: b'E\\\xa0\xe7\xb0g\xc6\xba\xd5R\x94O\xbf\xa2N\xef\x8fM&yIj\x1f#\xa1\xa3\x14]\xa5\xda\xe0q'
content_stream: b'\xe4\xa9_`\x13\xdd\x1b\x06z0\xe7ug\xd0\xe5\x13N\xa9\xa3y\x0b\x1c\x8dZ5\x92X\xde,j\xc0\xe1'
content_stream: b'\xb7\xb6\xfc+!\x1c(y\xad>\xe4\xae)\xe8`\x00\x00\x00'
content_stream: b'\x1f\x8b\x08\x08\x96\xabM^\x00\x03content.txt\x00\x1d\x8bA\x12@0\x14C\xf7='
output: b'\x1f\x8b\x08\x08\x96\xabM^\x00\x03content.txt\x00\x1d\x8bA\x12@0\x14C\xf7='
content_stream: b'E\\\xa0\xe7\xb0g\xc6\xba\xd5R\x94O\xbf\xa2N\xef\x8fM&yIj\x1f#\xa1\xa3\x14]\xa5\xda\xe0q'
output: b'E\\\xa0\xe7\xb0g\xc6\xba\xd5R\x94O\xbf\xa2N\xef\x8fM&yIj\x1f#\xa1\xa3\x14]\xa5\xda\xe0q'
content_stream: b'\xe4\xa9_`\x13\xdd\x1b\x06z0\xe7ug\xd0\xe5\x13N\xa9\xa3y\x0b\x1c\x8dZ5\x92X\xde,j\xc0\xe1'
output: b'\xe4\xa9_`\x13\xdd\x1b\x06z0\xe7ug\xd0\xe5\x13N\xa9\xa3y\x0b\x1c\x8dZ5\x92X\xde,j\xc0\xe1'
content_stream: b'\xb7\

Since we are no longer working with a file object, we can't use the `gzip` library any longer. Instead we can use `zlib` to decompress in chunks:

In [14]:
import zlib

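@print_elements
def decode_gzip_stream(stream):
    # Adding 16 to the window size tells zlib to expect a gzip header and trailer.
    decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
    # The same decompressor object is reused so its state carries across chunks.
    return (decompressor.decompress(chunk) for chunk in stream)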

consume(decode_gzip_stream(content_stream(chunk_size=64)))

content_stream: b'\x1f\x8b\x08\x08\x96\xabM^\x00\x03content.txt\x00\x1d\x8bA\x12@0\x14C\xf7=E\\\xa0\xe7\xb0g\xc6\xba\xd5R\x94O\xbf\xa2N\xef\x8fM&yIj\x1f#\xa1\xa3\x14]\xa5\xda\xe0q'
decode_gzip_stream: b'Hello World!\nThe '
output: b'Hello World!\nThe '
content_stream: b'\xe4\xa9_`\x13\xdd\x1b\x06z0\xe7ug\xd0\xe5\x13N\xa9\xa3y\x0b\x1c\x8dZ5\x92X\xde,j\xc0\xe1\xb7\xb6\xfc+!\x1c(y\xad>\xe4\xae)\xe8`\x00\x00\x00'
decode_gzip_stream: b'quick brown fox jumps over the lazy dog.\nShe sells sea shells by the seashore.\n'
output: b'quick brown fox jumps over the lazy dog.\nShe sells sea shells by the seashore.\n'


Note that the first chunk from `content_stream` is larger than the second, yet most of the decompressed output doesn't appear until the second compressed chunk has been read. There are two reasons for this. First, the start of the file is a gzip header (metadata such as the original file name) that produces no output. Second, decompression doesn't happen one byte at a time; the decompressor keeps state that carries over from one chunk to the next, so it can only emit data once it has seen enough input. As a result, `decompressor.decompress()` returns variable-length chunks of decompressed data. Since those chunks are themselves `bytes` objects, we can just pass them on down the pipeline without worrying about their sizes.
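
To see the variable-length behaviour in isolation, here is a small sketch that compresses some sample data in memory and feeds it to a decompressor ten bytes at a time. The sample text and the ten-byte piece size are arbitrary choices for this illustration:

```python
import gzip
import zlib

original = b'She sells sea shells by the seashore.\n' * 20
compressed = gzip.compress(original)
pieces = [compressed[i:i + 10] for i in range(0, len(compressed), 10)]

decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)  # "| 16" selects the gzip container
restored = []
for piece in pieces:
    out = decompressor.decompress(piece)
    # Input sizes are fixed, output sizes are not (and may be zero).
    print(f'{len(piece):2d} bytes in -> {len(out):3d} bytes out')
    restored.append(out)
restored.append(decompressor.flush())  # anything still buffered, usually b''
```

Calling `flush()` after the last chunk is a cheap way to make sure nothing is left behind in the decompressor.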

Finally, let's say we want to put the decompressed data into fixed-size chunks:

In [10]:
import codecs

@print_elements
def byte_stream_from_chunks(chunk_stream):
    for chunk in chunk_stream:
        for byte in chunk:
            yield byte

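@print_elements
def fixed_size_chunks(byte_stream, chunk_size=8):
    chunk = []
    for byte in byte_stream:
        chunk.append(byte)
        if len(chunk) == chunk_size:
            yield bytes(chunk)
            chunk = []
    # Emit the final partial chunk, but only if there is one; otherwise an
    # empty b'' would be emitted whenever the input length is an exact
    # multiple of chunk_size.
    if chunk:
        yield bytes(chunk)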

@print_elements
def convert_to_strings(chunk_stream):
    return codecs.iterdecode(chunk_stream, 'utf-8')

consume(
    convert_to_strings(
        fixed_size_chunks(
            byte_stream_from_chunks(
                decode_gzip_stream(
                    content_stream()
                )
            )
        )
    )
)

content_stream: b'\x1f\x8b\x08\x08\x96\xabM^\x00\x03content.txt\x00\x1d\x8bA\x12@0\x14C\xf7='
decode_gzip_stream: b''
content_stream: b'E\\\xa0\xe7\xb0g\xc6\xba\xd5R\x94O\xbf\xa2N\xef\x8fM&yIj\x1f#\xa1\xa3\x14]\xa5\xda\xe0q'
decode_gzip_stream: b'Hello World!\nThe '
byte_stream_from_chunks: 72
byte_stream_from_chunks: 101
byte_stream_from_chunks: 108
byte_stream_from_chunks: 108
byte_stream_from_chunks: 111
byte_stream_from_chunks: 32
byte_stream_from_chunks: 87
byte_stream_from_chunks: 111
fixed_size_chunks: b'Hello Wo'
convert_to_strings: Hello Wo
output: Hello Wo
byte_stream_from_chunks: 114
byte_stream_from_chunks: 108
byte_stream_from_chunks: 100
byte_stream_from_chunks: 33
byte_stream_from_chunks: 10
byte_stream_from_chunks: 84
byte_stream_from_chunks: 104
byte_stream_from_chunks: 101
fixed_size_chunks: b'rld!\nThe'
convert_to_strings: rld!
The
output: rld!
The
byte_stream_from_chunks: 32
content_stream: b'\xe4\xa9_`\x13\xdd\x1b\x06z0\xe7ug\xd0\xe5\x13N\xa9\xa3y\x0b\x1c\x8dZ5\x92

There's a whole lot going on there... let's break it down:

1. The byte stream produced by the innermost function simply emits fixed-size chunks of bytes from the source content.
1. These chunks are passed to a decoder that decompresses as much as it can from each chunk.
    1. At the end of a chunk, any input that can't be decompressed yet stays buffered inside the decompressor and is combined with the next chunk.
1. The decompressed chunks are turned into a single stream of bytes.
    1. **NOTE**: when you iterate a `str` object, the output is a sequence of characters, but when you iterate a `bytes` object, the output is a sequence of *integers* (`int` objects). This is why you see numbers in the output of `byte_stream_from_chunks`. Later, in `fixed_size_chunks`, calling `bytes(sequence_of_ints)` returns a `bytes` object again, so we just need to be aware of the change of type to avoid being confused by errors or bugs related to it.
1. The sequence of individual bytes is grouped into fixed-size chunks and passed down the pipeline.
1. Each fixed-size chunk is converted to a `str` object (using the `codecs` module).
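
Two details from the list above are easy to check in isolation: iterating `bytes` gives integers, and `codecs.iterdecode` copes with a multi-byte character that has been split across two chunks (which fixed-size chunking can easily cause). The sample string is an arbitrary choice for this sketch:

```python
import codecs

data = 'h\u00e9llo'.encode('utf-8')  # the accented "e" takes two bytes in UTF-8

# Iterating bytes yields ints; bytes() turns a sequence of ints back into bytes.
as_ints = list(data)
round_trip = bytes(as_ints)

# Split in the middle of the two-byte character. Decoding the first piece on
# its own would raise UnicodeDecodeError, but the incremental decoder behind
# codecs.iterdecode carries the partial character over to the next chunk.
pieces = [data[:2], data[2:]]
decoded = list(codecs.iterdecode(pieces, 'utf-8'))
print(as_ints, decoded)
```

This is why the pipeline decodes with `codecs.iterdecode` rather than calling `.decode('utf-8')` on each chunk separately.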

You can see in the output that the first chunk is received, then subsets of that data are emitted as requested by each next stage in the pipeline. In particular, look at the following sequence in the output:

    byte_stream_from_chunks: 104
    byte_stream_from_chunks: 101
    fixed_size_chunks: b'rld!\nThe'
    convert_to_strings: rld!
    The
    output: rld!
    The
    byte_stream_from_chunks: 32
    content_stream: b'\xe4\xa9_`\x13\xdd\x1b\x06z0\xe7ug\xd0\xe5\x13N\xa9\xa3y\x0b\x1c\x8dZ5\x92X\xde,j\xc0\xe1'
    decode_gzip_stream: b'quick brown fox jumps over the lazy dog.\nShe sells sea sh'
    byte_stream_from_chunks: 113
    byte_stream_from_chunks: 117

`fixed_size_chunks` emits `b'rld!\nThe'` (which is converted to text and emitted in the output), then continues collecting bytes for the next chunk. It is able to get one (`byte_stream_from_chunks: 32`), but then `byte_stream_from_chunks` has used everything in the current chunk and needs the next one. This causes `decode_gzip_stream` to have to grab the next chunk from the source stream, so we see that happen before `byte_stream_from_chunks` is able to continue feeding values to `fixed_size_chunks`.
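
The same pull-driven ordering can be reproduced on a tiny scale by stepping a two-stage pipeline by hand with `next()`. This is only a sketch: `chunks`, `single_bytes`, and the `events` log are stand-ins made up for this example, not the functions used above:

```python
events = []

def chunks():
    for chunk in (b'ab', b'cd'):
        events.append(f'chunk {chunk!r}')
        yield chunk

def single_bytes(chunk_stream):
    for chunk in chunk_stream:
        for byte in chunk:
            events.append(f'byte {byte}')
            yield byte

stream = single_bytes(chunks())
assert events == []  # nothing runs until a value is requested

next(stream)  # pulls the first chunk, then its first byte
next(stream)  # the second byte comes from the same chunk
next(stream)  # chunk exhausted, so the next chunk is pulled first
print(events)
```

The upstream stage is only touched on the first and third calls, which is when the downstream stage has nothing left to hand out.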

At no point does any part of the pipeline depend on the entire file being in memory. If our file were one petabyte in size, then as long as we read it into the computer in small chunks we would only use enough memory to store the current chunk of raw data plus some intermediate state in each function in the pipeline (this isn't entirely true, see the note below).

**Note**: we're ignoring a lot of nuance around how memory is reclaimed in garbage collection here, but the point is that we would never need more than a minimal amount of RAM; this should run successfully on any computer that can run Python.
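
For completeness, here is a sketch of what the same idea could look like when the chunks come straight from disk rather than from a `bytes` object already in memory. The names `file_chunks` and `gunzip`, the 32-byte chunk size, and the generated sample file are all assumptions made for this illustration:

```python
import codecs
import gzip
import os
import tempfile
import zlib

def file_chunks(path, chunk_size=32):
    """Yield raw (still compressed) chunks straight from disk."""
    with open(path, 'rb') as fin:
        while True:
            chunk = fin.read(chunk_size)
            if not chunk:
                return
            yield chunk

def gunzip(chunk_stream):
    decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
    for chunk in chunk_stream:
        yield decompressor.decompress(chunk)
    yield decompressor.flush()

# Throwaway input file so the sketch is self-contained.
text = b'The quick brown fox jumps over the lazy dog.\n' * 1000
path = os.path.join(tempfile.mkdtemp(), 'sample.txt.gz')
with gzip.open(path, 'wb') as fout:
    fout.write(text)

# Count characters without ever holding the whole file in memory.
total_chars = 0
for piece in codecs.iterdecode(gunzip(file_chunks(path)), 'utf-8'):
    total_chars += len(piece)
print(total_chars)
```

The running total is the only state that grows with the input, and it is a single integer.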

## References

* [Generators](https://wiki.python.org/moin/Generators)
* [Primer on Python Decorators](https://realpython.com/primer-on-python-decorators/)
* [io: Core tools for working with streams](https://docs.python.org/3/library/io.html)