# The Split-Apply-Combine Pattern in Data Science and Python

## Tobias Brandt

<img src="img/argon_logo.png" align=left width=200>

<!-- <img src="http://www.argonassetmanagement.co.za/css/img/logo.png" align=left width=200> -->

# Argon Asset Management


### Projected number of talk attendees

  * With "Data Science" in title 
      * 10 +/- 5
  * vs without "Data Science" in title
      * 20 +/- 5
  * insert xkcd style chart

### Google trends chart

!["data science" vs "data analysis"](img/data_science_vs_data_analysis.png)

## Data Science

According to https://en.wikipedia.org/wiki/Data_science:

In November 1997, C.F. Jeff Wu gave the inaugural lecture entitled **"Statistics = Data Science?"**[5] for his appointment to the H. C. Carver Professorship at the University of Michigan.[6] In this lecture, he characterized statistical work as a trilogy of **data collection**, **data modeling and analysis**, and **decision making**. In his conclusion, he initiated the modern, non-computer science, usage of the term "data science" and advocated that statistics be renamed data science and statisticians data scientists.[5]

## The GitHub Archive Dataset

https://www.githubarchive.org/

Open-source developers all over the world are working on millions of projects: writing code & documentation, fixing & submitting bugs, and so forth. GitHub Archive is a project to record the public GitHub timeline, archive it, and make it easily accessible for further analysis.

GitHub provides 20+ event types, which range from new commits and fork events, to opening new tickets, commenting, and adding members to a project. These events are aggregated into hourly archives, which you can access with any HTTP client:

  * gzipped json files
  * yyyy-mm-dd-HH.json.gz
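
The naming scheme can be sketched in a few lines. This is a minimal illustration, assuming the hour is written without zero-padding (as in the file `2015-01-01-0.json.gz` used in this notebook); the helper name `hourly_filenames` is hypothetical and not part of any library.

```python
from datetime import date

def hourly_filenames(day):
    """Return the 24 hourly archive filenames for one day (hypothetical helper)."""
    # Assumption: the hour is unpadded, i.e. 0, 1, ..., 23.
    return ['{}-{}.json.gz'.format(day.isoformat(), hour) for hour in range(24)]

names = hourly_filenames(date(2015, 1, 1))
```

Here `names[0]` is `'2015-01-01-0.json.gz'` and `names[-1]` is `'2015-01-01-23.json.gz'`.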

In [16]:
import os
import gzip
import json

directory = r'data\github_archive'
filename = '2015-01-01-0.json.gz'   # one hourly archive
path = os.path.join(directory, filename)
# Each line of the archive is one JSON-encoded event.
with gzip.open(path) as f:
    events = [json.loads(line) for line in f]
print json.dumps(events[0], indent=4)

{
    "payload": {
        "size": 1, 
        "head": "a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81", 
        "commits": [
            {
                "distinct": true, 
                "sha": "a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81", 
                "message": "Altered BingBot.jar\n\nFixed issue with multiple account support", 
                "url": "https://api.github.com/repos/davidjhulse/davesbingrewardsbot/commits/a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81", 
                "author": {
                    "email": "david.hulse@live.com", 
                    "name": "davidjhulse"
                }
            }
        ], 
        "distinct_size": 1, 
        "push_id": 536740396, 
        "ref": "refs/heads/master", 
        "before": "86ffa724b4d70fce46e760f8cc080f5ec3d7d85f"
    }, 
    "created_at": "2015-01-01T00:00:00Z", 
    "actor": {
        "url": "https://api.github.com/users/davidjhulse", 
        "login": "davidjhulse", 
        "avatar_url": "https://avatars.githu

### Typical Data Science Questions

  * How many GitHub repositories are created per hour/day/month?
  * To which repositories are the most commits pushed per hour/day/month?
  * Which projects receive the most pull requests?
  * What are the most popular languages on GitHub?

## Example 1 - Number of Repositories Created

In [17]:
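# CreateEvent also fires for new branches and tags (see payload['ref_type']),
# so this count is an upper bound on newly created repositories.
new_repo_count = 0
for event in events:
    if event['type']=="CreateEvent":
        new_repo_count += 1
print new_repo_count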

815


## Example 2 - Number of commits pushed per repository

In [18]:
pushed_commits = {}
for event in events:
    if event['type']=="PushEvent":
        repo = event['repo']['name']
        commits = event['payload']['size']
        pushed_commits[repo] = pushed_commits.get(repo, 0) + commits 
print sorted(pushed_commits.items(), key=lambda t: t[1], reverse=True)[:5]

[(u'sakai-mirror/melete', 3209), (u'sakai-mirror/mneme', 2922), (u'sakai-mirror/ambrosia', 770), (u'snarfed/beautifulsoup', 559), (u'bruschill/macvim', 417)]


# The Split-Apply-Combine Pattern

## Hadley Wickham <img src="http://pix-media.s3.amazonaws.com/blog/1001/HadleyObama.png" width=200 align=left>

[Hadley Wickham, the man who revolutionized R](http://priceonomics.com/hadley-wickham-the-man-who-revolutionized-r/)

*If you don’t spend much of your time coding in the open-source statistical programming language R,
his name is likely not familiar to you -- but the statistician Hadley Wickham is,
in his own words, “nerd famous.” The kind of famous where people at statistics conferences
line up for selfies, ask him for autographs, and are generally in awe of him.*

## Why Split-Apply-Combine?

  * StackOverflow tag: http://stackoverflow.com/tags/split-apply-combine/info
  * Pandas documentation: http://pandas.pydata.org/pandas-docs/stable/groupby.html
  * Julia documentation: https://dataframesjl.readthedocs.org/en/latest/split_apply_combine.html
  * PyToolz documentation: http://toolz.readthedocs.org/en/latest/streaming-analytics.html#split-apply-combine-with-groupby-and-reduceby
  * Blaze documentation: http://blaze.pydata.org/en/stable/split-apply-combine.html
  * R plyr: https://cran.r-project.org/web/packages/plyr/index.html

### The Point of Learning Patterns

From Cosma Shalizi's [Statistical Computing](http://www.stat.cmu.edu/~cshalizi/statcomp/13/lectures/12/lecture-12.pdf) course:
  
  * Distinguish between **what** you want to do and **how you want to do it**.
  * Focusing on **what** brings clarity to intentions.
  * **How** also matters, but can obscure the high level problem.
 
 Learn the pattern, recognize the pattern, love the pattern!
 
 Re-use *good* solutions!

### Iteration Considered Unhelpful

Could always do the same thing with `for` loops, but those are
  
  * *verbose* - lots of "how" obscures the "what"
  * painful/error-prone bookkeeping (indices, placeholders, ...)
  * clumsy - hard to parallelize

In [5]:
from IPython.display import HTML
HTML('<iframe src="http://www.jstatsoft.org/v40/i01/paper" width=800 height=400></iframe>')

## The Basic Pattern

 1. **Split** the data by some **grouping variable**
 2. **Apply** some function to each group **independently**
 3. **Combine** the data into some output dataset
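
The three steps can be sketched in plain Python. This is a minimal illustration, not a library API: the helper `split_apply_combine` and the sample records are made up for the example.

```python
from collections import defaultdict

def split_apply_combine(records, key, apply):
    """Group records by key(record), then apply a function to each group."""
    groups = defaultdict(list)
    for record in records:                     # split
        groups[key(record)].append(record)
    # apply to each group independently, combine into a dict
    return {k: apply(group) for k, group in groups.items()}

# Made-up records shaped like simplified push events.
pushes = [
    {'repo': 'a/x', 'size': 2},
    {'repo': 'b/y', 'size': 1},
    {'repo': 'a/x', 'size': 3},
]
commits_per_repo = split_apply_combine(
    pushes,
    key=lambda e: e['repo'],
    apply=lambda group: sum(e['size'] for e in group),
)
```

With these records `commits_per_repo` is `{'a/x': 5, 'b/y': 1}`. Note that this version holds every group in memory before applying the function, which is fine for small data but not for streams.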

### Example 2 - revisited

In [6]:
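# `curs` is assumed to be an open DB-API cursor on a database containing
# an InvoiceLine table; the connection setup is not shown.
track_revenues = {}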
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    invoice_revenue = UnitPrice*Quantity
    track_revenues[TrackId] = track_revenues.get(TrackId, 0) + invoice_revenue
    
print sorted(track_revenues.iteritems(), key=lambda item: (item[1], item[0]), reverse=True)[:3]

[(3250, 3.98), (3223, 3.98), (3214, 3.98)]


In [7]:
def calc_revenue(row):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    return UnitPrice*Quantity

In [8]:
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    track_revenues[row[2]] = track_revenues.get(row[2], 0) + calc_revenue(row)
    
print sorted(track_revenues.iteritems(), key=lambda item: (item[1], item[0]), reverse=True)[:3]

[(3250, 3.98), (3223, 3.98), (3214, 3.98)]


# Pandas - Python Data Analysis Library

<p><a href="http://pandas.pydata.org/"><img src="http://pandas.pydata.org/_static/pandas_logo.png" align=right width=400></a></p>

  * Provides high-performance, easy-to-use data structures and data analysis tools.
  * My default tool for interactive data analysis.
  * Provides core data structures **Series**, **DataFrame** and **Panel** (although the latter is largely obviated by MultiIndexed DataFrames)

### pandas.Series

  * Basically "labelled arrays"
  * Combines *dict* and *numpy.array* interfaces
  * *numpy.array* performance

In [9]:
import pandas as pd
s1 = pd.Series(dict(apples=1, potatoes=2))
s1

apples      1
potatoes    2
dtype: int64

In [10]:
s2 = pd.Series(dict(oranges=3, potatoes=4))
print s1+s2

apples     NaN
oranges    NaN
potatoes     6
dtype: float64


In [11]:
print s1.add(s2, fill_value=0)

apples      1
oranges     3
potatoes    6
dtype: float64


### pandas.DataFrame

  * Basically in-memory database tables
  * Can have columns of different dtypes
  * Indexed rows and columns
  * Hierarchical indexing allows for representing Panel data (pandas.MultiIndex)

In [12]:
df = pd.DataFrame(dict(s1=s1, s2=s2))
df

           s1   s2
apples    1.0  NaN
oranges   NaN  3.0
potatoes  2.0  4.0


In [13]:
print df.index
print df.columns

Index([u'apples', u'oranges', u'potatoes'], dtype='object')
Index([u's1', u's2'], dtype='object')


### Pandas Data Analysis

In [14]:
df

           s1   s2
apples    1.0  NaN
oranges   NaN  3.0
potatoes  2.0  4.0


In [15]:
df.sum()

s1    3
s2    7
dtype: float64

In [16]:
df.sum(axis=1)

apples      1
oranges     3
potatoes    6
dtype: float64

In [17]:
df.stack()

apples    s1    1
oranges   s2    3
potatoes  s1    2
          s2    4
dtype: float64

In [18]:
print df.stack().index

MultiIndex(levels=[[u'apples', u'oranges', u'potatoes'], [u's1', u's2']],
           labels=[[0, 1, 2, 2], [0, 1, 0, 1]])


### Split-Apply-Combine in Pandas

  * Uses **groupby** to
      * **split** the data into groups based on some criteria
      * **apply** a function on each group independently
      * **combine** the results into a data structure

  * The **apply** step is usually one of
      * **aggregate**
      * **transform**
      * or **filter**
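
The three kinds of apply step can be sketched on a tiny, made-up DataFrame (the column names and values are illustrative only):

```python
import pandas as pd

# Made-up data: commits pushed per event, keyed by repository.
pushes = pd.DataFrame({
    'repo': ['a/x', 'b/y', 'a/x', 'c/z'],
    'commits': [2, 1, 3, 10],
})
grouped = pushes.groupby('repo')['commits']

# aggregate: one value per group
totals = grouped.sum()

# transform: one value per original row, aligned with the original index
share = pushes['commits'] / grouped.transform('sum')

# filter: keep only the rows of groups that pass a test
busy = pushes.groupby('repo').filter(lambda g: g['commits'].sum() >= 5)
```

`totals` has one entry per repository, `share` has one entry per row of `pushes`, and `busy` keeps the original rows of `a/x` and `c/z` while dropping `b/y`.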

In [19]:
HTML('<iframe src="http://pandas.pydata.org/pandas-docs/version/0.16.2/groupby.html" width=800 height=300></iframe>')

### Example 2 - using Pandas

In [20]:
df = pd.read_sql('SELECT * FROM InvoiceLine', conn)

In [21]:
df['Revenue'] = df['UnitPrice']*df['Quantity']
track_revenues = df.groupby('TrackId')['Revenue'].sum()
# In-place sort (pandas 0.16 API; from pandas 0.17 on, use sort_values instead)
track_revenues.sort(ascending=False)
print track_revenues[:3]

TrackId
3250    3.98
3223    3.98
3177    3.98
Name: Revenue, dtype: float64


In [22]:
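# Many tracks tie at 3.98, so also sort on TrackId to reproduce the ordering of the earlier examples
print track_revenues.reset_index().sort(columns=['Revenue', 'TrackId'], ascending=False).set_index('TrackId')[:3]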

         Revenue
TrackId         
3250        3.98
3223        3.98
3214        3.98


Great for interactive work:

  * tab-completion!
  * `df.head()`, `df.tail()`
  * `df.describe()`

However ...

### Pandas currently only handles in-memory datasets!

### Does my data look big in this? <img src="http://www.techmynd.com/wp-content/uploads/2009/06/bill-gates-windows.jpg" align=right height=100>
  <!-- <img src="http://www.achievement.org/achievers/gat0/large/gat0-004.jpg"> -->

# MapReduce

  * If you want to process Big Data, you need some MapReduce framework like one of the following
<p>
<a href="https://hadoop.apache.org/"><img src="https://hadoop.apache.org/images/hadoop-logo.jpg" width=200 align=left></a>
<a href="http://spark.apache.org/"><img src="http://spark.apache.org/images/spark-logo.png" width=100 align=left></a>
</p>

<img src="https://mitpress.mit.edu/sicp/full-text/book/cover.jpg" align=right width=150>

The key to these frameworks is adopting a **functional** programming mindset. In Python this means: think **iterators**!

See [Structure and Interpretation of Computer Programs](https://mitpress.mit.edu/sicp/full-text/book/book.html)
(the "*Wizard book*")

  * in particular [Chapter 2 Building Abstractions with Data](https://mitpress.mit.edu/sicp/full-text/book/book-Z-H-13.html#%_chap_2) 
  * and [Section 2.2.3 Sequences as Conventional Interfaces](https://mitpress.mit.edu/sicp/full-text/book/book-Z-H-15.html#%_sec_2.2.3)

Luckily, the Split-Apply-Combine pattern is well suited to this!  

## Example 1 - revisited

In [23]:
total_revenue = 0
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity = row
    total_revenue += UnitPrice*Quantity
total_revenue

2328.599999999957

In [24]:
reduce(lambda x,y: x+y, map(calc_revenue, curs.execute("SELECT * FROM InvoiceLine;")))

2328.599999999957

In [25]:
sum(calc_revenue(row) for row in curs.execute("SELECT * FROM InvoiceLine;"))

2328.599999999957
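
The same styles carry over to counting events, as in Example 1. A minimal sketch on a few made-up event dictionaries (the real `events` list is not reproduced here); `from functools import reduce` works on Python 2.6+ and is required on Python 3.

```python
from functools import reduce

# Made-up stand-ins for the parsed GitHub events.
sample_events = [
    {'type': 'CreateEvent'},
    {'type': 'PushEvent'},
    {'type': 'CreateEvent'},
]

def is_create(event):
    """Map step: 1 for a CreateEvent, 0 otherwise."""
    return 1 if event['type'] == 'CreateEvent' else 0

# Explicit map followed by reduce ...
count_reduce = reduce(lambda x, y: x + y, map(is_create, sample_events), 0)
# ... and the equivalent generator expression fed to sum().
count_sum = sum(is_create(event) for event in sample_events)
```

Both expressions give 2 here. Neither needs the whole dataset in memory when the input is an iterator, which is what makes this style a good fit for MapReduce-like frameworks.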

What about **group by** operations?

There is an `itertools.groupby` function in the standard library.

However

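  * it requires the data to be sorted by the grouping key,
  * the groups it returns share the underlying iterator with the original iterable, so a group is no longer usable once you advance to the next one.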

Hence I find that I usually need to consult the [documentation](https://docs.python.org/2/library/itertools.html#itertools.groupby) to use it correctly.
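
Both caveats show up in a small sketch on made-up `(key, value)` pairs:

```python
from itertools import groupby
from operator import itemgetter

rows = [('a', 1), ('b', 2), ('a', 3)]   # made-up (key, value) pairs

# Unsorted input: 'a' shows up as two separate groups.
unsorted_keys = [key for key, _ in groupby(rows, key=itemgetter(0))]

# Sort by the same key first, and consume each group before moving on.
totals = {}
for key, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
    totals[key] = sum(value for _, value in group)

# Pitfall: groups stored for later use have already been invalidated
# by the time the groupby object has advanced past them.
stale = [group for _, group in groupby(sorted(rows), key=itemgetter(0))]
leftovers = [list(group) for group in stale]
```

`unsorted_keys` is `['a', 'b', 'a']`, `totals` is `{'a': 4, 'b': 2}`, and the first stored group in `leftovers` comes back empty because the shared iterator had already moved on.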

Use the `toolz` library instead!

In [26]:
HTML('<iframe src="https://docs.python.org/2/library/itertools.html#itertools.groupby" width=800 height=200></iframe>')

# PyToolz

In [27]:
HTML('<iframe src="https://toolz.readthedocs.org/en/latest/index.html" width=800 height=400></iframe>')

## Example 2 - revisited

In [28]:
track_revenues = {}
for row in curs.execute("SELECT * FROM InvoiceLine;"):
    track_revenues[row[2]] = track_revenues.get(row[2], 0) + calc_revenue(row)
    
print sorted(track_revenues.iteritems(), key=lambda item: (item[1], item[0]), reverse=True)[:3]

[(3250, 3.98), (3223, 3.98), (3214, 3.98)]


In [29]:
from toolz import groupby, valmap
sorted(valmap(sum,
              valmap(lambda lst: map(calc_revenue, lst),
                     groupby(lambda row: row[2],
                             curs.execute("SELECT * FROM InvoiceLine")))
             ).iteritems()
       , key=lambda t: (t[1], t[0]), reverse=True)[:3]

[(3250, 3.98), (3223, 3.98), (3214, 3.98)]

In [30]:
from toolz.curried import pipe, groupby, valmap, map, get
pipe(curs.execute("SELECT * FROM InvoiceLine"),
     groupby(get(2)),
     valmap(map(calc_revenue)),
     valmap(sum),
     lambda track_revenues: sorted(track_revenues.iteritems(), key=lambda t: (t[1], t[0]), reverse=True)[:3]
     )

[(3250, 3.98), (3223, 3.98), (3214, 3.98)]

In [31]:
HTML('<iframe src="https://toolz.readthedocs.org/en/latest/streaming-analytics.html#streaming-split-apply-combine" width=800 height=300></iframe>')

In [32]:
from toolz.curried import reduceby
pipe(curs.execute("SELECT * FROM InvoiceLine"),
     reduceby(get(2),
              lambda track_revenue, row: track_revenue + calc_revenue(row),
              init=0
             ),
     lambda track_revenues: sorted(track_revenues.iteritems(), key=lambda t: (t[1], t[0]), reverse=True)[:3]
     )

[(3250, 3.98), (3223, 3.98), (3214, 3.98)]

## toolz example - multiprocessing

In [33]:
import glob
files = glob.glob('C:/ARGO/ARGO/notebooks/articles/github_archive/*')
print len(files), files[:3]
N = len(files)    # 10

745 ['C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-0.json.gz', 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-1.json.gz', 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-01-10.json.gz']


In [34]:
def count_types(filename):
    import gzip
    import json
    from collections import Counter
    try:
        with gzip.open(filename) as f:
            return dict(Counter(json.loads(line)['type'] for line in f))
    except Exception as e:
        print "Error in {!r}: {}".format(filename, e)
        return {}
print count_types(files[0])

{u'ReleaseEvent': 24, u'PublicEvent': 2, u'PullRequestReviewCommentEvent': 85, u'ForkEvent': 213, u'MemberEvent': 16, u'PullRequestEvent': 315, u'IssueCommentEvent': 650, u'PushEvent': 4280, u'DeleteEvent': 141, u'CommitCommentEvent': 56, u'WatchEvent': 642, u'IssuesEvent': 373, u'CreateEvent': 815, u'GollumEvent': 90}


In [35]:
from collections import Counter
def update_counts(total_counts, file_counts):
    total_counts.update(file_counts)
    return total_counts

In [36]:
%%time
pmap = map
print reduce(update_counts,
             pmap(count_types, files[:N]),
             Counter())

Error in 'C:/ARGO/ARGO/notebooks/articles/github_archive\\2015-01-07-5.json.gz': CRC check failed 0xc9b8b241L != 0x3fdb8691L
Error in 'C:/ARGO/ARGO/notebooks/articles/github_archive\\tmp': [Errno 13] Permission denied: 'C:/ARGO/ARGO/notebooks/articles/github_archive\\tmp'
Counter({u'PushEvent': 7021744, u'CreateEvent': 1649671, u'IssueCommentEvent': 1321440, u'WatchEvent': 1319860, u'IssuesEvent': 692702, u'PullRequestEvent': 680308, u'ForkEvent': 490633, u'DeleteEvent': 256818, u'PullRequestReviewCommentEvent': 214188, u'GollumEvent': 150744, u'CommitCommentEvent': 96389, u'MemberEvent': 69718, u'ReleaseEvent': 44292, u'PublicEvent': 14596})
Wall time: 17min 54s


In [37]:
%%time
# IPython.parallel moved to the separate ipyparallel package in IPython 4.0
from IPython.parallel import Client
p = Client()[:]
pmap = p.map_sync
print reduce(update_counts,
             pmap(count_types, files[:N]),
             Counter())

Counter({u'PushEvent': 7021744, u'CreateEvent': 1649671, u'IssueCommentEvent': 1321440, u'WatchEvent': 1319860, u'IssuesEvent': 692702, u'PullRequestEvent': 680308, u'ForkEvent': 490633, u'DeleteEvent': 256818, u'PullRequestReviewCommentEvent': 214188, u'GollumEvent': 150744, u'CommitCommentEvent': 96389, u'MemberEvent': 69718, u'ReleaseEvent': 44292, u'PublicEvent': 14596})
Wall time: 4min 14s
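
The only difference between the two runs above is which `map` is plugged in. A minimal standard-library sketch of the same idea, assuming a thread pool (`multiprocessing.dummy.Pool`) as a stand-in for the IPython cluster and made-up in-memory chunks in place of the archive files; the function name `count_types_in_lines` is hypothetical. Threads keep the example simple, but for CPU-bound parsing a process pool or a cluster, as used above, is what actually buys a speed-up.

```python
import json
from collections import Counter
from functools import reduce
from multiprocessing.dummy import Pool   # thread pool with the Pool API

def count_types_in_lines(lines):
    """Map step: count event types in one chunk of JSON lines."""
    return Counter(json.loads(line)['type'] for line in lines)

def update_counts(total_counts, chunk_counts):
    """Reduce step: fold one chunk's counts into the running total."""
    total_counts.update(chunk_counts)
    return total_counts

# Made-up chunks standing in for the hourly archive files.
chunks = [
    ['{"type": "PushEvent"}', '{"type": "CreateEvent"}'],
    ['{"type": "PushEvent"}'],
]

pool = Pool(2)
try:
    # Same reduce as before, with pool.map swapped in for the builtin map.
    parallel = reduce(update_counts, pool.map(count_types_in_lines, chunks), Counter())
finally:
    pool.close()
    pool.join()

serial = reduce(update_counts, map(count_types_in_lines, chunks), Counter())
```

Because the map step is independent per chunk and the reduce step only merges counters, `parallel` and `serial` are equal regardless of which `map` is used.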


# Next time

## [Blaze](http://blaze.pydata.org/en/latest/) 

<img src="http://blaze.pydata.org/en/latest/_images/blaze_med.png" width=400>


## [Dask](http://dask.pydata.org/en/latest/)

<img src="http://dask.pydata.org/en/latest/_images/collections-schedulers.png">

In [38]:
HTML('<iframe src="http://spark.apache.org/docs/latest/api/python/pyspark.html" width=800 height=400></iframe>')

# Thank you!

## If this stuff interests you, let's chat!