Summary

Why should I learn Pandas? by Stephen Pettinato, Data Scientist at ThriveMarket.com

Pandas is a popular data analytics and data wrangling tool used by Data Scientists, Data Engineers and Data Analysts.  Despite this, the learning curve is steep and rationalization and encouragement is required to get past being a beginner.  This talk focuses on practical reasons that Pandas is useful as well as Big Data extensions to Pandas concepts of functional transformations using DataFrames. 

# Why should I learn Pandas?
## or
## How I learned to stop worrying and love Data Frames

### Stephen Pettinato
### 2018-08-28

# Bio

### Degrees: BA Mathematics, MA Statistics, MS Computer Science

### Work History: Software Engineer, Data Engineer, Data Scientist

## Currently a Data Scientist at ThriveMarket


# Outline
1. Motivation
2. What is Pandas?
3. 2 Demos comparing Base Python to Pandas
4. What is a DataFrame?
5. 2 Demos comparing Pandas to Spark & Dask
6. Unit tests of Pandas Transformations
7. Comparison with SQL & MapReduce

A while back I was presented with the following comment,

> I've been thinking about using Spark because my dataset is bigger than 5 GB, what do you think?

## Let's check the documentation

> pandas is an open source, BSD-licensed library providing high-performance, easy-to-use data structures and data analysis tools for the Python programming language."

https://pandas.pydata.org/

> pandas is a Python package providing fast, flexible, and expressive data structures designed to make working with “relational” or “labeled” data both easy and intuitive.  It aims to be the fundamental high-level building block for doing practical, real world data analysis in Python.

https://pandas.pydata.org/pandas-docs/stable/


This means,
1. you can take a dataset and transform it to other formats (ETL)
2. you can ask questions of the dataset (Analysis)
3. you don't have to write a ton of custom code for each transformation or analysis

I say you should always use Pandas,
* If you are wranging data and expect to wrangle data in the future
* If your computations don't have real time or near real time performance requirements
* If you are not using R
* If you are not using Excel


Why?

* Pandas Data Frames work well for ETL & Analysis on small datasets
* Similar algorithms used locally can be applied at large scales

# Demo

## Comma delimited file with columns,
1. customer
2. orderid
3. product

## Answer the following questions,
1. How many customers ordered products?
2. On average how many products were ordered per order?

### in,
1. base Python
2. pandas

In [1]:
demofile = 'demo_file.csv'

contents = """customer,orderid,product
Oliver,10,mayo
Oliver,10,cashews
Oliver,10,tea
Oliver,11,mustard
Loretta,12,mayo
Loretta,12,cashews
Sofia,13,mayo"""

with open(demofile, 'w') as f:
    f.write(contents)

## Base Python

In [2]:
# First read in the file
data = []  # Use a list of dictionaries
with open(demofile, 'r') as f:
    lines = f.readlines()
    header = lines[0].strip().split(',')
    for line in lines[1:]:
        row = dict(zip(header, line.split(',')))
        data.append(row)
    
# Then answer question 1
customer_count = len(set([row['customer'] for row in data]))
print("Customer count: ", customer_count)

# And answer question 2
from collections import defaultdict
ord_to_items = defaultdict(list)
for row in data:
    ord_to_items[row['orderid']].append(row['product'])
order_lengths = [len(products) for products in ord_to_items.values()]
avg_items_per_order = sum(order_lengths) / len(order_lengths)
print("Average number of items per order", avg_items_per_order)

Customer count:  3
Average number of items per order 1.75


## Pandas

In [3]:
import pandas as pd
df = pd.read_csv(demofile)

print("Customer count: ", df.customer.nunique())

avg_items_per_order = df.groupby('orderid').size().mean()
print("Average number of items per order", avg_items_per_order)

Customer count:  3
Average number of items per order 1.75


# What is a Data Frame?

## A collection of rows and columns of data stored as strings or numbers

### Plus a collection of operations

* Transform one or more columns into one or more columns
* Transform one or more rows into one or more rows
* Filter rows or columns by conditions
* Join to other DataFrames
* Aggregations

In [4]:
def highlight_cond(row, cond):
    """Custom function used below to highlight row 3 or column product"""
    color = 'background-color: white'
    if cond == 'row3' and row.name == 3:
        color = 'background-color: yellow'
    if cond == 'colproduct' and row.name == 'product':
        color = 'background-color: yellow'
    return [color  for _ in row]

In [5]:
import pandas as pd
df = pd.read_csv(demofile)
df

Unnamed: 0,customer,orderid,product
0,Oliver,10,mayo
1,Oliver,10,cashews
2,Oliver,10,tea
3,Oliver,11,mustard
4,Loretta,12,mayo
5,Loretta,12,cashews
6,Sofia,13,mayo


In [6]:
dft = df.style.apply(highlight_cond, cond='row3', axis=1)
dft

Unnamed: 0,customer,orderid,product
0,Oliver,10,mayo
1,Oliver,10,cashews
2,Oliver,10,tea
3,Oliver,11,mustard
4,Loretta,12,mayo
5,Loretta,12,cashews
6,Sofia,13,mayo


In [7]:
dft = df.style.apply(highlight_cond, cond='colproduct')
dft

Unnamed: 0,customer,orderid,product
0,Oliver,10,mayo
1,Oliver,10,cashews
2,Oliver,10,tea
3,Oliver,11,mustard
4,Loretta,12,mayo
5,Loretta,12,cashews
6,Sofia,13,mayo


# Why are these operations in Pandas fast?

# C++!

In [8]:
import pandas as pd
df = pd.read_csv(demofile)

print("Here, everything happens in C++: ", df.customer.nunique())
print("Here, everything happens in Python: ", len(set(df.customer)))

Here, everything happens in C++:  3
Here, everything happens in Python:  3


# Let's do a harder question,

How many customers order mayo and then mustard in their next order?

In [9]:
# With demofile
display(pd.read_csv(demofile))

Unnamed: 0,customer,orderid,product
0,Oliver,10,mayo
1,Oliver,10,cashews
2,Oliver,10,tea
3,Oliver,11,mustard
4,Loretta,12,mayo
5,Loretta,12,cashews
6,Sofia,13,mayo


In [None]:
# Base Python
# Start with data as a list of dictionaries with keys customer, orderid, product

# NO WORKING SOLUTION

from collections import OrderedDict

customerids = set([row['customer'] for row in data])
data_by_customer = {customer: OrderedDict() for customer in customers}

for row in data:
    if len(data_by_customer[row['customer']]) == 0:
        pass
        

for index, row in data_by_customer.items():
    print(index, row)
# for row in data:
#     new_row = dict(productmayo='mayo' in row['product'],
#                    productmustard='mustard' in row['product'])
#     data_by_customer[row['customer']][row['orderid']].append(new_row)
    
# count_mayo_then_mustard = 0
# for customerid, order_dict in data_by_customer.items():
#     print("processing customer", customer)
#     for index, order in enumerate(rows):
#         print("+"*10)
#         print(index, order)
#         if index > 0:
#             if order['productmustard'] and rows[index]['productmustard']:
#                 count_mayo_then_mustard += 1

# print(count_mayo_then_mustard, " Customers ordered mayo then mustard")
    

Doing this in base Python is a hard problem

1. Custom transformations require unit tests for robustness
2. Going to be slow
3. This is a trivial question. I have hundreds of questions like this.

# In Pandas

In [10]:
import pandas as pd
df = pd.read_csv(demofile)

df['mayo_in_order'] = (
    df.groupby('orderid').product.transform(lambda grp: 'mayo' in grp.values))
df['mustard_in_order'] = (
    df.groupby('orderid').product.transform(lambda grp: 'mustard' in grp.values))

df['mayo_in_order_shifted'] = df.mayo_in_order.shift(1)
df['customer_shifted'] = df.customer.shift(1)
df['orderid_shifted'] = df.orderid.shift(1)

count_of_customers = (
    df[(df.mayo_in_order_shifted & df.mustard_in_order) &
       (df.customer == df.customer_shifted) &
       (df.orderid != df.orderid_shifted)]
    .customer.nunique())

print("Customer count: ", count_of_customers)

Customer count:  1


In [11]:
dft = (df[['customer', 'customer_shifted', 'orderid', 
           'orderid_shifted', 'mustard_in_order', 'mayo_in_order_shifted']]
       .style.apply(highlight_cond, cond='row3', axis=1))
display(dft)

Unnamed: 0,customer,customer_shifted,orderid,orderid_shifted,mustard_in_order,mayo_in_order_shifted
0,Oliver,,10,,False,
1,Oliver,Oliver,10,10.0,False,True
2,Oliver,Oliver,10,10.0,False,True
3,Oliver,Oliver,11,10.0,True,True
4,Loretta,Oliver,12,11.0,False,False
5,Loretta,Loretta,12,12.0,False,True
6,Sofia,Loretta,13,12.0,False,True


### This solution in Pandas is 
1. Fast - Once you make a DataFrame Pandas is just C++ under the hood
2. Scalable - Will work quickly on a 10 GB file.
3. Memory efficient - As long as the data fits into memory, this will work quickly
4. Has extendable concepts

### This same algorithm
* can be applied in other frameworks
* run across datasets in the TB range
* and still be a performant algorithm

# Big Data Dataframe Technologies
1. Spark
2. Dask

## Benefits
* Data Frames
* Testable
* Easy to read
* Advanced transformations
* Fast Development
* Easy to run locally or as a distributed process

## Detriments
* Lower execution time compared to custom transformations
* Higher ramp up time compared to SQL

# Why are Dask and Spark fast?

## Dask is fast because 
* Everything happens in C++
* Leverages multiple processors
* Can run on multiple machines

## Spark is fast because
* Everything happens in Java
* Leverages multiple machines

# Let's do the above examples in Spark and Dask

1. How many customers ordered products?
2. On average how many products were ordered per order?

# Pandas Solution

In [12]:
import pandas as pd
df = pd.read_csv(demofile)

print("Customer count: ", df.customer.nunique())

avg_items_per_order = df.groupby('orderid').size().mean()
print("Average number of items per order", avg_items_per_order)

Customer count:  3
Average number of items per order 1.75


# Dask Solution
### Note: Dask API is _designed_ to mimic the Pandas API 

In [13]:
import dask.dataframe as dd
dask_df = dd.read_csv(demofile)

print("Customer count: ", dask_df.customer.nunique().compute())

avg_items_per_order = dask_df.groupby('orderid').size().mean().compute()
print("Average number of items per order", avg_items_per_order)

Customer count:  3
Average number of items per order 1.75


# Spark Solution

In [14]:
import findspark
findspark.init('/home/napoleon/Projects/spark-2.3.1-bin-hadoop2.7')

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)

In [15]:
# Note: Skipped creation of sqlContext
# Read in file
spark_df = sqlContext.read.csv(demofile, header=True)

customer_count = spark_df.select("customer").distinct().count()
print("Customer count: ", customer_count)

from pyspark.sql.functions import col, avg
spark_df.groupby('orderid').count().agg(avg(col("count"))).show()

Customer count:  3
+----------+
|avg(count)|
+----------+
|      1.75|
+----------+



How many customers order mayo and then mustard in their next order?

# Pandas Solution

In [16]:
import pandas as pd
df = pd.read_csv(demofile)

df['mayo_in_order'] = (
    df.groupby('orderid').product.transform(lambda grp: 'mayo' in grp.values))
df['mustard_in_order'] = (
    df.groupby('orderid').product.transform(lambda grp: 'mustard' in grp.values))

df['mayo_in_order_shifted'] = df.mayo_in_order.shift(1)
df['customer_shifted'] = df.customer.shift(1)
df['orderid_shifted'] = df.orderid.shift(1)

count_of_customers = (
    df[(df.mayo_in_order_shifted & df.mustard_in_order) &
       (df.customer == df.customer_shifted) &
       (df.orderid != df.orderid_shifted)]
    .customer.nunique())

print("Customer count: ", count_of_customers)

Customer count:  1


In [17]:
dft = (df[['customer', 'customer_shifted', 'orderid', 
           'orderid_shifted', 'mustard_in_order', 'mayo_in_order_shifted']]
       .style.apply(highlight_cond, cond='row3', axis=1))
display(dft)

Unnamed: 0,customer,customer_shifted,orderid,orderid_shifted,mustard_in_order,mayo_in_order_shifted
0,Oliver,,10,,False,
1,Oliver,Oliver,10,10.0,False,True
2,Oliver,Oliver,10,10.0,False,True
3,Oliver,Oliver,11,10.0,True,True
4,Loretta,Oliver,12,11.0,False,False
5,Loretta,Loretta,12,12.0,False,True
6,Sofia,Loretta,13,12.0,False,True


# Dask Solution

In [18]:
import dask.dataframe as dd
dask_df = dd.read_csv(demofile)

meta_srs = meta=pd.Series(dtype='bool', name='orderid')
mayo_in_order_df = (
    dask_df.groupby('orderid').product
    .apply(lambda grp: 'mayo' in grp.values, meta=meta_srs).compute())
mustard_in_order = (
    dask_df.groupby('orderid').product
    .apply(lambda grp: 'mustard' in grp.values, meta=meta_srs).compute())

dask_df['mayo_in_order'] = dask_df['orderid'].map(mayo_in_order_df)
dask_df['mustard_in_order'] = dask_df['orderid'].map(mustard_in_order)

dask_df['mayo_in_order_shifted'] = dask_df.mayo_in_order.shift(1).astype(bool)
dask_df['customer_shifted'] = dask_df.customer.shift(1)
dask_df['orderid_shifted'] = dask_df.orderid.shift(1)

count_of_customers = (
    dask_df[(dask_df.mayo_in_order_shifted & dask_df.mustard_in_order) & 
            (dask_df.customer == dask_df.customer_shifted) & 
            (dask_df.orderid != dask_df.orderid_shifted)]
    .customer.nunique().compute())

print("Customer count: ", count_of_customers)

Customer count:  1


# Spark Solution

In [19]:
from pyspark.sql.functions import lag, col, lit
from pyspark.sql.window import Window

def shift(input_df, col_to_shift, shift_amount, fill_val):
    """
    Function to shift a column by n downwards.
    May be in-efficient as there is no partitioning involved
    Adapted from https://stackoverflow.com/a/34296063/2596363
    """
    
    new_col_name = col_to_shift + "_shifted"
    
    input_df = input_df.withColumn("noop_col", lit('noop'))

    w = Window().partitionBy().orderBy(col("noop_col"))
    input_df = (input_df
                .select("*", lag(col_to_shift, count=shift_amount).over(w).alias(new_col_name))
                .na.fill(fill_val))

    input_df = input_df.drop("noop_col")

    return input_df

In [20]:
from pyspark.sql.functions import collect_list, col

spark_df = sqlContext.read.csv(demofile, header=True)

mayo_in_order_df = (spark_df.groupby('orderid').agg(collect_list('product'))
     .rdd.map(lambda row: (row[0], 'mayo' in row[1]))
    ).toDF(["orderid", 'mayo_in_order'])
mustard_in_order_df = (spark_df.groupby('orderid').agg(collect_list('product'))
     .rdd.map(lambda row: (row[0], 'mustard' in row[1]))
    ).toDF(["orderid", 'mustard_in_order'])

# Do a sort after the join as the order gets discarded during the joins
spark_df = (spark_df
            .join(mayo_in_order_df, "orderid")
            .join(mustard_in_order_df, 'orderid')
            .sort(col("orderid")))

In [21]:
# Note: Skipped the implementation of this shift function
spark_df = shift(spark_df, "mayo_in_order", 1, False)
spark_df = shift(spark_df, "customer", 1, -1)
spark_df = shift(spark_df, "orderid", 1, -1)

# Count the distinct customers
customer_count = (
    spark_df.rdd.map(
        lambda row: 
        (row[1], 
         (row[4] and row[5]) and 
         (row [1] == row[-2]) and 
         (row[0] != row[-1])))
    .toDF(["customer", "mayo_then_mustard"])
    .filter(col("mayo_then_mustard"))
    .select("customer").distinct().count())
print("Customer count: ", customer_count)

Customer count:  1


# Example of Testing Pandas Transformation

In [22]:
import pandas as pd
df = pd.read_csv(demofile)

def mayo_then_mustard(df):
    df['mayo_in_order'] = (
        df.groupby('orderid').product.transform(lambda grp: 'mayo' in grp.values))
    df['mustard_in_order'] = (
        df.groupby('orderid').product.transform(lambda grp: 'mustard' in grp.values))

    df['mayo_in_order_shifted'] = df.mayo_in_order.shift(1)
    df['customer_shifted'] = df.customer.shift(1)
    df['orderid_shifted'] = df.orderid.shift(1)
    
    count_of_customers = (
        df[(df.mayo_in_order_shifted & df.mustard_in_order) &
           (df.customer == df.customer_shifted) &
           (df.orderid != df.orderid_shifted)]
        .customer.nunique())

    return count_of_customers
    
print("Customer count: ", mayo_then_mustard(df))

Customer count:  1


In [23]:
test_df = pd.DataFrame([
    dict(customer='Bob', orderid=1, product='mayo'),
    dict(customer='Bob', orderid=2, product='mustard'),
    dict(customer='Marge', orderid=3, product='mayo'),
    dict(customer='Marge', orderid=4, product='mustard'),
])

actual_count =  mayo_then_mustard(test_df)
assert 2 == actual_count, \
    'Expected 2 customers to have mayo then mustard: {}'.format(actual_count)

# Pandas vs Dask vs Spark

They each have their own syntax, but on the whole the algorithm is the same in every system.

## What about non-Data Frame technologies?

# SQL Technologies
1. Hive
2. Spark
3. Pig
4. Athena
5. Redshift

## Benefits
* SQL
* Fast Development
* Reasonable execution time

## Detriments
* SQL (yeah, it's here too)
* Hard to test
* No advanced transformations

# Custom Data Transformations
1. MapReduce
2. Spark
3. Scala
4. Pig

## Benefits
* Super Fast Execution time
* Testable
* Advanced transformations

## Detriments
* Slow development time because each transformation requires custom code
* Significant ramp up time for new engineers
* Analysts?

# Conclusion
1. Working with Data Frames makes data-wrangling easier
2. Pandas is an easy route to learn how to use Data Frames
2. Learning Pandas transformations makes Big Data data-wrangling easier

# Questions?