# Python-FP: Pipelines

A very useful concept in functional programming is a pipeline. Pipelines compose individual steps together and form one big function. The output of one function gets fed into the next function.

Start to think of data as being on an assembly line of specialized machines instead of on a workbench with a list of instructions. Henry Ford saw the power of this idea and became the father of mass production. The same 100-year-old insight can apply to your data.

Source: https://medium.com/@hansonkd/thinking-functionally-with-python-and-django-4127e3ace6e9

## Chaining Calls
Surprisingly, Python has nothing similar built in for chaining iterator transformations. Let's take an example where a generator of CSV rows gets converted to Book objects, filtered by year, sorted, and finally formatted. It quickly gets unwieldy.

In [None]:
books = get_book_row_generator_from_csv('mybooks.csv')
# map() and filter() take the function first, then the iterable.
formatted_books = list(
    map(
        '{0.title} | {0.author} ({0.year})'.format,
        sorted(
            filter(
                lambda book: book.year >= 1950,
                map(
                    Book.from_row,
                    books
                )
            ),
            key=lambda book: (book.year, book.author_name.lower())
        )
    )
)

The major problem with this syntax is that the first action is the deepest in the nest. It’s not how a person thinks.

Expressing iterator transformation by chaining together functions would be better.

In [None]:
books = get_book_row_generator_from_csv('mybooks.csv')

formatted_books = (
    conduit(books)
        .map(Book.from_row)
        .filter(lambda book: book.year >= 1950)
        .sort(lambda book: (book.year, book.author_name.lower()))
        .map('{0.title} | {0.author} ({0.year})'.format)
        .to_list()
)

The conduit class is dead simple. If you want more robust features, there is PyFunctional and even an underscore.js port which does the same thing and much more.

The conduit isn’t a performance upgrade, but it expresses data transformation in a clear, natural way.
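
The `conduit` class itself is not shown here. What follows is a minimal sketch of what such a wrapper could look like, assuming it only needs the methods used in the chain above (`map`, `filter`, `sort`, `to_list`). The class body is an illustration under that assumption, not the original implementation.

```python
class conduit(object):
    """Hypothetical chainable wrapper around any iterable."""

    def __init__(self, iterable):
        self._iterable = iterable

    def __iter__(self):
        return iter(self._iterable)

    def map(self, func):
        # Lazy in Python 3: nothing runs until the conduit is consumed.
        return conduit(map(func, self._iterable))

    def filter(self, predicate):
        return conduit(filter(predicate, self._iterable))

    def sort(self, key=None):
        # Sorting has to consume the whole iterable up front.
        return conduit(sorted(self._iterable, key=key))

    def to_list(self):
        return list(self._iterable)


evens_squared = (
    conduit(range(10))
        .filter(lambda n: n % 2 == 0)
        .map(lambda n: n * n)
        .sort(lambda n: -n)
        .to_list()
)
print(evens_squared)  # [64, 36, 16, 4, 0]
```

Each method returns a new `conduit`, which is what makes the calls chainable in reading order.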

## Function Composition
Function composition is a pipeline technique that pieces together small functions into a larger function. It is very similar to chaining, except the end result is a function and not a result.

Chaining method calls is actually doing something very similar to function composition. Remember that `obj.method()` is the same as `ObjClass.method(obj)`.

In [None]:
class Obj(object):
    def method1(self):
        return do_stuff(self)

    def method2(self):
        return do_other_stuff(self)

    def method3(self):
        return do_more_stuff(self)

result = Obj().method1().method2().method3()

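# Equivalent using function composition.
from functools import reduce  # reduce is not a builtin in Python 3

def compose(*funcs):
    # Right-to-left composition: compose(f, g)(x) == f(g(x)).
    return lambda xx: reduce(lambda val, func: func(val), reversed(funcs), xx)

# Listed in reverse so that method1 runs first, matching the chain above.
composed = compose(
   Obj.method3,
   Obj.method2,
   Obj.method1
)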
result2 = composed(Obj())

assert result == result2

Using composition, each method is explicit. With dot notation and chaining, if any method in the chain returns an instance of a subclass of Obj, that subclass's method is used for the next call. In Python we frequently want to turn everything into a class, but sometimes it is simpler and better to use small, freestanding functions. Function composition gives some of the power of classes with the simplicity of compact functions.
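
As a small illustration of composing freestanding functions, here is a sketch with made-up string helpers (`strip_whitespace`, `lowercase` and `add_prefix` are hypothetical names, not from the report code). It uses a right-to-left `compose`, so the last function listed runs first.

```python
from functools import reduce


def compose(*funcs):
    # Right-to-left: compose(f, g)(x) == f(g(x)).
    return lambda xx: reduce(lambda val, func: func(val), reversed(funcs), xx)


def strip_whitespace(text):
    return text.strip()


def lowercase(text):
    return text.lower()


def add_prefix(text):
    return 'title: ' + text


# The result is a new function; nothing runs until it is called.
normalize = compose(add_prefix, lowercase, strip_whitespace)

print(normalize('  Moby Dick  '))  # title: moby dick
```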

## The Flat Data Pipeline
Our report needs to be explicit. It consists of pulling Django objects which represent financial securities and doing computations on them. Django objects are dangerous: query counts can explode unexpectedly, because touching a related attribute can silently trigger another query. Since we wanted to pull all data up front, passing Django objects to the Excel view is not an option. We need to flatten the data before it gets to the view. The view should only deal with strings, decimals, namedtuples, and other basic Python objects.

Our first attempt was a pipeline of functions, each of which took two arguments: the “flat” object and the Django model instance. Each function copied what the Excel writer needed from the model to the flat object. This ensured that it was impossible to make a query while writing the report.

In [None]:
all_securities = Security.objects.filter(company=company).for_report().iterator()

def flatten(*args):
    def transform_pipeline(mdl):
        data = FlatData()
        for fn in args:
            fn(data, mdl)
        return data
    return transform_pipeline

def add_issue_date(flat, mdl):
    flat.issue_date = mdl.get_issue_date()

def add_label(flat, mdl):
    flat.label = mdl.get_label()

flat_securities = (
  conduit(all_securities)
      ...
      .map(
          flatten(
              add_issue_date,
              add_label
          )
      )
      ...
)

While it isn’t a purely functional approach, recognizing the strengths and weaknesses of the language can lead to cleaner code. Don’t swim upstream trying to achieve theoretical, immutable perfection in a mutable world.
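
To try the `flatten` idea without a database, the sketch below swaps in stand-ins: `types.SimpleNamespace` plays the role of `FlatData` (whose definition is not shown), and `FakeSecurity` is a hypothetical replacement for the Django model.

```python
from types import SimpleNamespace


class FakeSecurity(object):
    """Hypothetical stand-in for the Django model; no database involved."""

    def __init__(self, label, issue_date):
        self._label = label
        self._issue_date = issue_date

    def get_label(self):
        return self._label

    def get_issue_date(self):
        return self._issue_date


def flatten(*args):
    def transform_pipeline(mdl):
        data = SimpleNamespace()  # assumed stand-in for FlatData
        for fn in args:
            fn(data, mdl)  # each step mutates the flat object in place
        return data
    return transform_pipeline


def add_issue_date(flat, mdl):
    flat.issue_date = mdl.get_issue_date()


def add_label(flat, mdl):
    flat.label = mdl.get_label()


to_flat = flatten(add_issue_date, add_label)
flat = to_flat(FakeSecurity('CS-1', '2015-01-31'))
print(flat.label, flat.issue_date)  # CS-1 2015-01-31
```

The flat object ends up holding only plain values and no reference to the model, which is the property the view relies on.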

## Efficient Pipes
If the report is simple, the data can be directly piped into a CSV.

In [None]:
import csv

# newline='' lets the csv module control line endings itself.
with open('file_output.csv', 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=['Label', 'Issue Date'])

    for security in flat_securities:
        writer.writerow({'Label': security.label, 'Issue Date': security.issue_date})

Using `iterator()` to pull data from the database makes the steps simple: read a row from the database, transform that row through the pipeline, and then write the data to a CSV. Then repeat for the next row. It’s an incredibly efficient way to write a report. Only one row is kept in memory at a time, and the role of each piece of code is clearly defined.
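
The read, transform, write loop can be sketched with nothing but generators and the standard library. Here `fake_rows` is a hypothetical stand-in for the database cursor, and an in-memory buffer replaces the output file.

```python
import csv
import io


def fake_rows():
    # Hypothetical stand-in for a database cursor: yields one row at a time.
    yield ('CS-1', '2015-01-31')
    yield ('CS-2', '2016-06-30')


def to_record(row):
    label, issue_date = row
    return {'Label': label, 'Issue Date': issue_date}


buffer = io.StringIO()
writer = csv.DictWriter(
    buffer, fieldnames=['Label', 'Issue Date'], lineterminator='\n'
)
writer.writeheader()

# map() is lazy in Python 3, so each row is read, transformed and written
# before the next one is pulled from the source.
for record in map(to_record, fake_rows()):
    writer.writerow(record)

print(buffer.getvalue())
```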

## Pure, Higher-Order Functions and Resource Management
Unfortunately, not all reports are that simple. Usually the transform step requires information not found directly on the element it is transforming. The problem areas we identified were clarity and resource management.

Using classes invites complexity. It also makes it difficult to tell if a function is pure. A pure function is a function that, when run with the same input, will always give the same output. This means no database access, no cache access and even no clock access. To keep the transformers pure, anything they need from outside is passed in as an argument. The result is higher-order functions which conveniently document their requirements, and everything is explicit.

For example, a transformer that relies on the current export time to tell whether a security is canceled needs the date.

In [None]:
from datetime import datetime

def is_canceled(report_date):
    def inner(flat, mdl):
        flat.is_canceled = mdl.canceled_date < report_date
    return inner

report_date = datetime.today()

flat_securities = (
  conduit(all_securities)
      .map(
          flatten(
              ...
              is_canceled(report_date),
          )
      )
      ...
)

Now the transformation provides its own documentation and informs us that in order to know if a security is canceled or not, an external resource not contained on the Django object is needed.
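
One practical payoff of passing the date in is that the transformer can be exercised with fixed dates instead of the real clock. A small sketch, using `types.SimpleNamespace` objects as hypothetical stand-ins for the flat object and the model:

```python
from datetime import date
from types import SimpleNamespace


def is_canceled(report_date):
    def inner(flat, mdl):
        flat.is_canceled = mdl.canceled_date < report_date
    return inner


# Hypothetical stand-in for a model instance with a cancellation date.
security = SimpleNamespace(canceled_date=date(2015, 6, 1))

before = SimpleNamespace()
is_canceled(date(2015, 1, 1))(before, security)

after = SimpleNamespace()
is_canceled(date(2016, 1, 1))(after, security)

print(before.is_canceled, after.is_canceled)  # False True
```

The same security gives different answers only because the report date differs, and that dependency is visible in the function signature.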

This becomes especially important for database access. Some of the transformations for securities depend on other securities. We wanted to make it a goal to only have one copy of each object in memory. Using `select_related` and `prefetch_related` won’t help us here.

Instead of using `iterator()` to load the data, we load every piece of data into lists. We then create indexes to the data using basic Python dictionaries. It’s fast, efficient, and requires no dependencies.

In [None]:
# computations.py

def add_shareholder_name(shareholder_ix):
    def inner(flat, mdl):
        flat.shareholder_name = shareholder_ix[mdl.shareholder_id]
    return inner

def add_transfered_to_labels(transfered_to_ix):
    def inner(flat, mdl):
        flat.transfered_to_labels = [
            sec.label for sec in transfered_to_ix[mdl.id]
        ]
    return inner

...
# data.py
from collections import defaultdict

all_securities = list(Security.objects.filter(company=company).all())
all_shareholders = list(ShareHolders.objects.filter(company=company).all())

# Many to One
shareholder_ix = {sh.pk: sh for sh in all_shareholders}
# Reverse foreign key
transfered_to_ix = defaultdict(list)
for security in all_securities:
    if security.transformed_from_id:
        transfered_to_ix[security.transformed_from_id].append(security)

flat_securities = (
  conduit(all_securities)
      ...
      .filter(should_show_security) #  Filter securities since now we grab all of them
      .map(
          flatten(
              ...
              add_shareholder_name(shareholder_ix),
              add_transfered_to_labels(transfered_to_ix)
          )
      )
      ...
)
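
The two kinds of index can be tried in isolation. In this sketch the `Shareholder` and `Security` namedtuples and their sample values are hypothetical stand-ins for the model instances. Only the indexing pattern is taken from the code above.

```python
from collections import defaultdict, namedtuple

# Hypothetical flat records standing in for model instances.
Shareholder = namedtuple('Shareholder', 'pk name')
Security = namedtuple('Security', 'id label shareholder_id transformed_from_id')

all_shareholders = [Shareholder(1, 'Ada'), Shareholder(2, 'Grace')]
all_securities = [
    Security(10, 'CS-1', 1, None),
    Security(11, 'CS-2', 2, 10),
    Security(12, 'CS-3', 2, 10),
]

# Many to one: primary key -> object, one dict lookup per security.
shareholder_ix = {sh.pk: sh for sh in all_shareholders}

# Reverse foreign key: parent id -> list of securities that point at it.
transfered_to_ix = defaultdict(list)
for security in all_securities:
    if security.transformed_from_id:
        transfered_to_ix[security.transformed_from_id].append(security)

owner_names = [shareholder_ix[sec.shareholder_id].name for sec in all_securities]
child_labels = [sec.label for sec in transfered_to_ix[10]]
print(owner_names)   # ['Ada', 'Grace', 'Grace']
print(child_labels)  # ['CS-2', 'CS-3']
```

Both lookups are plain dictionary accesses, so they cost no queries once the lists are loaded.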

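Because all of the transformation functions are isolated, we can add an assert statement to the flatten function to be absolutely certain there are no queries during the transformations. Note that Django only records `connection.queries` when `DEBUG` is `True`, so the check is only meaningful in that mode.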

In [None]:
from django.db import connection

def flatten(*args):
    def transform_pipeline(mdl):
        data = FlatData()
        num_before = len(connection.queries)
        for fn in args:
            fn(data, mdl)
        assert len(connection.queries) == num_before
        return data
    return transform_pipeline

## Final Thoughts
Everything that we built required no external dependencies. The glue, classes and support functions were only a few lines each. We were able to build a powerful data pipeline by taking advantage of the native features of Python.

We learned some important lessons:

1. Break your process into discrete steps.
2. Have one copy of data in memory.
3. Embrace Python’s OOP and functional styles. Don’t go for purity.
4. Make it as simple as possible. Rely on the standard lib.
5. Focus on a solid foundation. Save optimizations till the end.
6. Don’t be afraid to throw code away or compromise.

The results are dramatic. Query counts, which used to grow exponentially and reach up to 7,000 for a single report, dropped to about 30, and generation times for some customers dropped from 600 seconds to 15 seconds. It is now easier to detect where the performance problems are. Separating the different stages lets us see that OpenPyxl’s styles cause a 2x performance drop and which transformations are expensive. Those have not been optimized yet.

Our code is faster, cleaner and even more reusable. Since the code is broken into distinct parts, there is nothing stopping us from piping the data to a JSON API instead of to an Excel document.

Software design is only one component of the problem. The reports run on worker servers and are processed through a queue system. Improving them is a cross-domain effort. By optimizing other parts of the system, we believe we can get all reports down to 5 seconds or less. These are things that can be optimized now that we have a solid foundation.

Functional programming didn’t do anything magical. All it did was bring clarity and structure to a complex process.