# More DataFrames:  UDFs and Windowing

In [1]:
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F

ss = SparkSession.builder.\
     master('spark://spark-master:7077').\
     appName('windowing').getOrCreate()

In [2]:
# so that we can play with RDDs as well
sc = ss.sparkContext

Let's manually load some data that describes purchase events.  As we've seen, there are many ways to load data manually.  Let's create an RDD first and then convert to a DataFrame:

In [3]:
purchases_rdd = sc.parallelize([
("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27", "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
])

In [4]:
colnames = ["customer_name", "date", "category", "product_name", "quantity", "price"]
purchases_df = purchases_rdd.toDF(colnames)

Just like with RDDs, we can call `.persist()` on a DataFrame if we are planning on performing multiple actions on it.

In [5]:
purchases_df.persist()

DataFrame[customer_name: string, date: string, category: string, product_name: string, quantity: bigint, price: double]

We could use `.show(5)` or `.take(5)`/`.head(5)`, but pandas renders a nicer table.  Since `.toPandas()` collects every row to the driver, we apply `.limit(5)` first:

In [6]:
purchases_df.limit(5).toPandas().head()

Unnamed: 0,customer_name,date,category,product_name,quantity,price
0,Geoffrey,2016-04-22,A,apples,1,50.0
1,Geoffrey,2016-05-03,B,Lamp,2,38.0
2,Geoffrey,2016-05-03,D,Solar Pannel,1,29.0
3,Geoffrey,2016-05-03,A,apples,3,50.0
4,Geoffrey,2016-05-03,C,Rice,5,15.0


Let's check the distinct products that are being purchased by our customers:

In [7]:
purchases_df.select('product_name').distinct().show()

+--------------------+
|        product_name|
+--------------------+
|             bananas|
|Book: Crime and P...|
|                Lamp|
|         Recycle bin|
|              apples|
|        Solar Pannel|
|     Book: The noose|
|                Rice|
|          space ship|
|         Motor skate|
+--------------------+



## Summary statistics on certain columns

A quick way to get a first look at some data is the `.describe()` method.  It reports basic statistics (count, mean, standard deviation, min, max) for the columns that we specify:

In [8]:
purchases_df.describe('quantity', 'price').show()

+-------+------------------+-----------------+
|summary|          quantity|            price|
+-------+------------------+-----------------+
|  count|                39|               39|
|   mean| 4.153846153846154|68.94871794871794|
| stddev|2.9694803863945936|59.75958355996516|
|    min|                 1|             15.0|
|    max|                15|            227.0|
+-------+------------------+-----------------+



## Contingency tables

Recall that in statistics we have the concept of a "contingency table".  With DataFrames we use the `.crosstab()` method to produce one.  This can be a useful way to look at data, but we need to be careful with interpretation here:  it only counts *rows* (purchase events), and it ignores the `quantity` column.

In [9]:
prod_freq_per_cust = purchases_df.crosstab('customer_name', 'product_name')
prod_freq_per_cust.toPandas().head()

Unnamed: 0,customer_name_product_name,Book: Crime and Punishment,Book: The noose,Lamp,Motor skate,Recycle bin,Rice,Solar Pannel,apples,bananas,space ship
0,Geoffrey,0,1,1,1,0,1,1,3,1,0
1,Yoshua,0,0,1,1,1,1,0,1,2,2
2,Yann,1,1,2,2,2,1,1,0,2,1
3,Jurgen,0,0,0,2,1,1,0,0,3,1
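
To make the "counts rows" caveat concrete, here is a minimal plain-Python sketch (no Spark involved) that uses a handful of Geoffrey's rows from the data above.  The variable names are made up for illustration; it only mimics what a contingency table holds and what it leaves out:

```python
from collections import Counter

# A few (customer_name, product_name, quantity) rows copied from the data above.
rows = [
    ("Geoffrey", "apples", 1),
    ("Geoffrey", "apples", 3),
    ("Geoffrey", "apples", 5),
    ("Geoffrey", "bananas", 5),
]

# What a contingency table holds: the number of rows per (customer, product) pair.
row_counts = Counter((customer, product) for customer, product, _ in rows)

# What it does not hold: the total quantity per pair.
total_quantity = Counter()
for customer, product, quantity in rows:
    total_quantity[(customer, product)] += quantity

print(row_counts[("Geoffrey", "apples")])      # 3 purchase events
print(total_quantity[("Geoffrey", "apples")])  # 9 apples in total
```

Geoffrey shows up with a 3 in the `apples` column of the table above even though he bought 9 apples overall.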


Let's look at the column names:

In [10]:
cols = prod_freq_per_cust.columns
cols

['customer_name_product_name',
 'Book: Crime and Punishment',
 'Book: The noose',
 'Lamp',
 'Motor skate',
 'Recycle bin',
 'Rice',
 'Solar Pannel',
 'apples',
 'bananas',
 'space ship']

Recall the list "slicing" syntax.  We can use this to only select certain elements in the list.  In this case we want to cut off the first column name:

In [11]:
cols[1:]

['Book: Crime and Punishment',
 'Book: The noose',
 'Lamp',
 'Motor skate',
 'Recycle bin',
 'Rice',
 'Solar Pannel',
 'apples',
 'bananas',
 'space ship']

So now we can just pass these column names to `.describe()` to get some basic purchase frequency stats:

In [12]:
prod_freq_per_cust.describe(cols[1:]).toPandas().head()

Unnamed: 0,summary,Book: Crime and Punishment,Book: The noose,Lamp,Motor skate,Recycle bin,Rice,Solar Pannel,apples,bananas,space ship
0,count,4.0,4.0,4.0,4.0,4.0,4.0,4.0,4.0,4.0,4.0
1,mean,0.25,0.5,1.0,1.5,1.0,1.0,0.5,1.0,2.0,1.0
2,stddev,0.5,0.5773502691896257,0.816496580927726,0.5773502691896257,0.816496580927726,0.0,0.5773502691896257,1.4142135623730951,0.816496580927726,0.816496580927726
3,min,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0,0.0
4,max,1.0,1.0,2.0,2.0,2.0,1.0,1.0,3.0,3.0,2.0
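
The `stddev` row above is consistent with the *sample* standard deviation (dividing by n - 1 rather than n).  As a quick sanity check, here is a plain-Python sketch using the standard `statistics` module on the "Book: Crime and Punishment" column; the variable names are only illustrative:

```python
import statistics

# The "Book: Crime and Punishment" column of the contingency table above.
crime_and_punishment = [0, 0, 1, 0]

mean = statistics.mean(crime_and_punishment)
sample_std = statistics.stdev(crime_and_punishment)       # divides by n - 1
population_std = statistics.pstdev(crime_and_punishment)  # divides by n

print(mean)            # 0.25, matching the mean row
print(sample_std)      # 0.5, matching the stddev row
print(population_std)  # roughly 0.433, which does not match
```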


`.describe()` should ONLY be used for exploratory analysis.  If we really wanted to get the average number of purchase events per product (to be used in further calculations) then we should perform an explicit aggregation ourselves:

In [13]:
prod_freq_per_cust2 = purchases_df.groupBy('customer_name', 'product_name').count()
prod_freq_per_cust2.show()

+-------------+---------------+-----+
|customer_name|   product_name|count|
+-------------+---------------+-----+
|       Jurgen|        bananas|    3|
|     Geoffrey|           Lamp|    1|
|       Yoshua|         apples|    1|
|         Yann|           Lamp|    2|
|       Jurgen|     space ship|    1|
|       Yoshua|        bananas|    2|
|     Geoffrey|Book: The noose|    1|
|         Yann|           Rice|    1|
|         Yann|    Recycle bin|    2|
|         Yann|     space ship|    1|
|     Geoffrey|   Solar Pannel|    1|
|     Geoffrey|    Motor skate|    1|
|       Yoshua|    Motor skate|    1|
|         Yann|        bananas|    2|
|       Yoshua|           Lamp|    1|
|     Geoffrey|         apples|    3|
|       Jurgen|    Motor skate|    2|
|       Yoshua|           Rice|    1|
|       Jurgen|    Recycle bin|    1|
|     Geoffrey|           Rice|    1|
+-------------+---------------+-----+
only showing top 20 rows



In [14]:
prod_freq_per_cust2 = prod_freq_per_cust2.withColumnRenamed('count', 'num_purchase_events')
prod_freq_per_cust2.show()

+-------------+---------------+-------------------+
|customer_name|   product_name|num_purchase_events|
+-------------+---------------+-------------------+
|       Jurgen|        bananas|                  3|
|     Geoffrey|           Lamp|                  1|
|       Yoshua|         apples|                  1|
|         Yann|           Lamp|                  2|
|       Jurgen|     space ship|                  1|
|       Yoshua|        bananas|                  2|
|     Geoffrey|Book: The noose|                  1|
|         Yann|           Rice|                  1|
|         Yann|    Recycle bin|                  2|
|         Yann|     space ship|                  1|
|     Geoffrey|   Solar Pannel|                  1|
|     Geoffrey|    Motor skate|                  1|
|       Yoshua|    Motor skate|                  1|
|         Yann|        bananas|                  2|
|       Yoshua|           Lamp|                  1|
|     Geoffrey|         apples|                  3|
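|       Jurgen|    Motor skate|                  2|
|       Yoshua|           Rice|                  1|
|       Jurgen|    Recycle bin|                  1|
|     Geoffrey|           Rice|                  1|
+-------------+---------------+-------------------+
only showing top 20 rows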

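Let's compute the average number of purchase events per product.  Because this table has no rows for products that a customer never bought, the average is taken over buyers only, so it can differ from the `mean` row of the contingency table, which includes the zeros (for example, Lamp is 1.33 here versus 1.0 there):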

In [15]:
prod_freq_per_cust2.groupBy('product_name').avg('num_purchase_events').show()

+--------------------+------------------------+
|        product_name|avg(num_purchase_events)|
+--------------------+------------------------+
|             bananas|                     2.0|
|Book: Crime and P...|                     1.0|
|                Lamp|      1.3333333333333333|
|         Recycle bin|      1.3333333333333333|
|              apples|                     2.0|
|        Solar Pannel|                     1.0|
|     Book: The noose|                     1.0|
|                Rice|                     1.0|
|          space ship|      1.3333333333333333|
|         Motor skate|                     1.5|
+--------------------+------------------------+
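
The two ways of averaging can be compared by hand.  The following plain-Python sketch uses the Lamp counts read off the contingency table earlier; it is only an illustration of the arithmetic, not of how Spark computes it:

```python
# Lamp purchase events per customer, read off the contingency table above.
lamp_events = {"Geoffrey": 1, "Yoshua": 1, "Yann": 2, "Jurgen": 0}

# Mean over all four customers, zeros included
# (the figure reported by describe() on the contingency table).
mean_all = sum(lamp_events.values()) / len(lamp_events)

# Mean over customers with at least one Lamp purchase
# (the figure reported by groupBy().count() followed by avg()).
buyers = [n for n in lamp_events.values() if n > 0]
mean_buyers = sum(buyers) / len(buyers)

print(mean_all)     # 1.0
print(mean_buyers)  # 1.3333333333333333
```

Which of the two is appropriate depends on whether non-buyers should count toward the average.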



## Pivoting columns

What if we wanted to take the `quantity` column into account (i.e. for each purchase event a customer might buy MORE THAN ONE of a given product)?

One way to analyze this is to use the `.pivot()` method.  `.pivot()` *roughly* "makes a column horizontal".  More precisely, it constructs a new table where the column names are taken from column *values* in the old table.

To make sense of this we always need to start with a `.groupBy()` and end with an aggregation.
It's easier seen than said:

In [21]:
tot_prod_per_cust = purchases_df.groupBy('customer_name').pivot('product_name').sum('quantity')
tot_prod_per_cust.toPandas().head()

Unnamed: 0,customer_name,Book: Crime and Punishment,Book: The noose,Lamp,Motor skate,Recycle bin,Rice,Solar Pannel,apples,bananas,space ship
0,Yann,5.0,5.0,3.0,2,10.0,15,5.0,,6,1.0
1,Jurgen,,,,3,5.0,5,,,15,1.0
2,Yoshua,,,2.0,4,5.0,5,,10.0,18,7.0
3,Geoffrey,,1.0,2.0,7,,5,1.0,9.0,5,


Look at all of those missing values.  Spark stores them as nulls, which pandas displays as `NaN` (not a number).  In this context a null means that the customer never bought that particular product.  Let's fill those in with zeros:

In [22]:
tot_prod_per_cust = tot_prod_per_cust.na.fill(0)
tot_prod_per_cust.toPandas().head()

Unnamed: 0,customer_name,Book: Crime and Punishment,Book: The noose,Lamp,Motor skate,Recycle bin,Rice,Solar Pannel,apples,bananas,space ship
0,Yann,5,5,3,2,10,15,5,0,6,1
1,Jurgen,0,0,0,3,5,5,0,0,15,1
2,Yoshua,0,0,2,4,5,5,0,10,18,7
3,Geoffrey,0,1,2,7,0,5,1,9,5,0
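
For intuition, the group, pivot, sum, and fill steps can be mimicked in plain Python with dictionaries.  This is only a sketch on a subset of the rows above (Jurgen's bananas and Rice, Geoffrey's apples and Rice), with made-up variable names; Spark does the equivalent work in a distributed fashion:

```python
from collections import defaultdict

# (customer_name, product_name, quantity) rows for two customers, from the data above.
rows = [
    ("Jurgen", "bananas", 5), ("Jurgen", "bananas", 5), ("Jurgen", "bananas", 5),
    ("Jurgen", "Rice", 5),
    ("Geoffrey", "apples", 1), ("Geoffrey", "apples", 3), ("Geoffrey", "apples", 5),
    ("Geoffrey", "Rice", 5),
]

# groupBy('customer_name') + pivot('product_name') + sum('quantity')
totals = defaultdict(dict)
for customer, product, quantity in rows:
    totals[customer][product] = totals[customer].get(product, 0) + quantity

# The distinct product names become the columns of the pivoted table.
products = sorted({product for _, product, _ in rows})

# Missing (customer, product) pairs are filled with 0, like .na.fill(0).
pivoted = {
    customer: [by_product.get(product, 0) for product in products]
    for customer, by_product in totals.items()
}

print(products)  # ['Rice', 'apples', 'bananas']
for customer, values in pivoted.items():
    print(customer, values)
```

The values that end up in the cells depend entirely on the aggregation: swapping the sum for a count would reproduce the contingency table instead.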


Now suppose we want the average quantity of each product purchased, taken over all customers.  Let's start by getting a list of products:

In [23]:
products = tot_prod_per_cust.columns[1:]
products

['Book: Crime and Punishment',
 'Book: The noose',
 'Lamp',
 'Motor skate',
 'Recycle bin',
 'Rice',
 'Solar Pannel',
 'apples',
 'bananas',
 'space ship']

It is easy to get averages by hand over a couple of products:

In [25]:
avg_quantity_per_prod = tot_prod_per_cust.groupBy().avg('apples', 'bananas')
avg_quantity_per_prod.show()

+-----------+------------+
|avg(apples)|avg(bananas)|
+-----------+------------+
|       4.75|        11.0|
+-----------+------------+



If we want to compute averages for ALL products then we need a specific piece of Python syntax.  Recall that we have a list of products in `products`.  We can "unpack" this list into the positional arguments of a function by using the `*` operator:

In [27]:
avg_prod = tot_prod_per_cust.groupBy().avg(*products)
avg_prod.toPandas().head()

Unnamed: 0,avg(Book: Crime and Punishment),avg(Book: The noose),avg(Lamp),avg(Motor skate),avg(Recycle bin),avg(Rice),avg(Solar Pannel),avg(apples),avg(bananas),avg(space ship)
0,1.25,1.5,1.75,4.0,5.0,7.5,1.5,4.75,11.0,2.25
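
If the `*` operator is unfamiliar, this small plain-Python sketch shows what it does.  The function `avg_labels` is a hypothetical stand-in for any function that, like `.avg()`, accepts a variable number of column names:

```python
def avg_labels(*cols):
    """Hypothetical helper: accepts any number of column names, like .avg() does."""
    return [f"avg({c})" for c in cols]

products = ["apples", "bananas", "Rice"]

# These two calls are equivalent; the * spreads the list into separate arguments.
explicit = avg_labels("apples", "bananas", "Rice")
unpacked = avg_labels(*products)

print(unpacked)              # ['avg(apples)', 'avg(bananas)', 'avg(Rice)']
print(explicit == unpacked)  # True
```

Without the `*`, the function would receive one argument (the whole list) instead of three separate strings.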


## UDFs

User-defined functions are very useful when performing computations on DataFrames.  These are similar in spirit to the lambdas that we often used when computing on RDDs:

In [29]:
from pyspark.sql.types import DoubleType

# define the function itself
def amount_spent(quantity, price):
    return quantity*price

# wrap it as a UDF so that Spark can apply it row by row
# to the columns we pass in.
# The output column will be filled with doubles (64-bit floats)
amount_spent_udf = F.udf(amount_spent, DoubleType())

Now create a new column named `amount_spent` where the values are computed using the UDF:

In [31]:
purchases_df = purchases_df.withColumn('amount_spent', amount_spent_udf(F.col('quantity'), F.col('price')))
purchases_df.limit(5).toPandas().head()

Unnamed: 0,customer_name,date,category,product_name,quantity,price,amount_spent
0,Geoffrey,2016-04-22,A,apples,1,50.0,50.0
1,Geoffrey,2016-05-03,B,Lamp,2,38.0,76.0
2,Geoffrey,2016-05-03,D,Solar Pannel,1,29.0,29.0
3,Geoffrey,2016-05-03,A,apples,3,50.0,150.0
4,Geoffrey,2016-05-03,C,Rice,5,15.0,75.0
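
One thing to watch for with Python UDFs: Spark passes SQL nulls to the function as `None`, and `quantity*price` raises a `TypeError` when either input is `None`.  Our data has no nulls, so this is only a precaution.  Below is a sketch of a null-tolerant variant in plain Python; the name `amount_spent_null_safe` is hypothetical, and it could be wrapped with `F.udf(..., DoubleType())` in the same way as before:

```python
def amount_spent_null_safe(quantity, price):
    """Hypothetical null-tolerant variant of amount_spent."""
    if quantity is None or price is None:
        return None  # a None result becomes a null in the output column
    return float(quantity * price)

# A UDF is applied one row at a time, so a list comprehension mimics it.
rows = [(1, 50.0), (2, 38.0), (None, 29.0)]
results = [amount_spent_null_safe(q, p) for q, p in rows]

print(results)  # [50.0, 76.0, None]
```

Returning an explicit `float` also keeps the result in line with the declared `DoubleType()`.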


## Windowing

Windowing is the way to aggregate a row with neighboring rows to produce interesting statistics.  For example, imagine answering questions like "average spend over last 5 visits".

Let's start with a simple example:  the cumulative historical spend of each customer.

In [32]:
from pyspark.sql import Window

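# start by defining the window over which computations will be performed:
# one partition per customer, ordered by date, covering every row from
# the start of the partition up to and including the current row
window = Window.partitionBy('customer_name').\
                orderBy('date').\
                rowsBetween(Window.unboundedPreceding, Window.currentRow)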

In [33]:
# now apply the window aggregation to compute a new column `cumulative_spend`
purchases_df = purchases_df.withColumn('cumulative_spend',
                                       F.sum(F.col('amount_spent')).over(window))

In [34]:
purchases_df.limit(20).toPandas().head()

Unnamed: 0,customer_name,date,category,product_name,quantity,price,amount_spent,cumulative_spend
0,Yann,2016-04-02,A,bananas,3,55.0,165.0,165.0
1,Yann,2016-04-02,B,Lamp,2,38.0,76.0,241.0
2,Yann,2016-04-03,E,Book: Crime and Punishment,5,100.0,500.0,741.0
3,Yann,2016-04-13,E,Book: The noose,5,125.0,625.0,1366.0
4,Yann,2016-04-22,B,Lamp,1,38.0,38.0,1404.0
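
To see what the window is doing, here is a plain-Python sketch of the same idea: partition by customer, order by date, and keep a running total within each partition.  It uses a few rows from the data above with made-up variable names, and it is only meant to illustrate the semantics:

```python
from itertools import accumulate, groupby
from operator import itemgetter

# (customer_name, date, amount_spent): three of Yann's rows plus one of Geoffrey's.
rows = [
    ("Yann", "2016-04-03", 500.0),
    ("Yann", "2016-04-02", 165.0),
    ("Geoffrey", "2016-04-22", 50.0),
    ("Yann", "2016-04-13", 625.0),
]

# partitionBy('customer_name') + orderBy('date'):
# ISO-formatted date strings sort chronologically.
ordered = sorted(rows, key=itemgetter(0, 1))

cumulative = []
for customer, group in groupby(ordered, key=itemgetter(0)):
    group = list(group)
    # Unbounded preceding up to the current row: a running total.
    running = accumulate(amount for _, _, amount in group)
    for (name, day, amount), total in zip(group, running):
        cumulative.append((name, day, amount, total))

for row in cumulative:
    print(row)
```

The running total restarts for each customer, which is the effect of `partitionBy`.  A trailing window such as "the last 5 visits" would, under the same scheme, use a bounded frame like `rowsBetween(-4, Window.currentRow)` instead of an unbounded one.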


We can make this example more interesting.  Above, the running total advanced one purchase event at a time.  Very often it is more interesting to answer questions about buckets of time (e.g. weekly spend).

Just like we did for RDDs, we can use our old friend `datetime` to perform time analysis:

In [35]:
# start by creating a UDF that converts the date string to a date object
from datetime import datetime
from pyspark.sql.types import DateType

def parse_date(datestr):
    # .date() drops the time-of-day part so the value matches DateType
    return datetime.strptime(datestr, '%Y-%m-%d').date()

string_to_datetime = F.udf(parse_date, DateType())

In [36]:
purchases_df = purchases_df.withColumn('datetime',
                                       string_to_datetime(F.col('date')))
purchases_df = purchases_df.drop('date')
purchases_df.limit(10).toPandas().head()

Unnamed: 0,customer_name,category,product_name,quantity,price,amount_spent,cumulative_spend,datetime
0,Yann,A,bananas,3,55.0,165.0,165.0,2016-04-02
1,Yann,B,Lamp,2,38.0,76.0,241.0,2016-04-02
2,Yann,E,Book: Crime and Punishment,5,100.0,500.0,741.0,2016-04-03
3,Yann,E,Book: The noose,5,125.0,625.0,1366.0,2016-04-13
4,Yann,B,Lamp,1,38.0,38.0,1404.0,2016-04-22


Let's add a `weekofyear` column so that we can aggregate by the week:

In [37]:
purchases_df = purchases_df.withColumn('weekofyear',
                                       F.weekofyear(F.col('datetime')))
purchases_df.limit(10).toPandas().head()

Unnamed: 0,customer_name,category,product_name,quantity,price,amount_spent,cumulative_spend,datetime,weekofyear
0,Yann,A,bananas,3,55.0,165.0,165.0,2016-04-02,13
1,Yann,B,Lamp,2,38.0,76.0,241.0,2016-04-02,13
2,Yann,E,Book: Crime and Punishment,5,100.0,500.0,741.0,2016-04-03,13
3,Yann,E,Book: The noose,5,125.0,625.0,1366.0,2016-04-13,15
4,Yann,B,Lamp,1,38.0,38.0,1404.0,2016-04-22,16
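
Spark's `weekofyear` follows the ISO 8601 convention (weeks start on Monday), which is the same convention as Python's `date.isocalendar()`.  The week numbers above can therefore be double-checked in plain Python with this small sketch:

```python
from datetime import datetime

def parse_date(datestr):
    # Same format string as the UDF above; .date() drops the time-of-day part.
    return datetime.strptime(datestr, "%Y-%m-%d").date()

# The second element of isocalendar() is the ISO week number.
iso_weeks = {
    datestr: parse_date(datestr).isocalendar()[1]
    for datestr in ["2016-04-02", "2016-04-13", "2016-04-22"]
}

print(iso_weeks)  # {'2016-04-02': 13, '2016-04-13': 15, '2016-04-22': 16}
```

Keep in mind that a week number alone does not identify the year.  All of our data is from 2016, so that is not a problem here.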


Now aggregating by the week is easy!

In [38]:
purchases_df.groupBy('customer_name', 'weekofyear').\
             sum('amount_spent').\
             orderBy('customer_name', 'weekofyear').show()

+-------------+----------+-----------------+
|customer_name|weekofyear|sum(amount_spent)|
+-------------+----------+-----------------+
|     Geoffrey|        16|             50.0|
|     Geoffrey|        18|            330.0|
|     Geoffrey|        22|            525.0|
|     Geoffrey|        24|            601.0|
|       Jurgen|        17|            502.0|
|       Jurgen|        18|            343.0|
|       Jurgen|        22|            621.0|
|         Yann|        13|            741.0|
|         Yann|        15|            625.0|
|         Yann|        16|             38.0|
|         Yann|        17|            213.0|
|         Yann|        18|            428.0|
|         Yann|        21|            300.0|
|         Yann|        23|            227.0|
|       Yoshua|         5|            454.0|
|       Yoshua|         6|           1121.0|
|       Yoshua|        10|           1135.0|
|       Yoshua|        14|            977.0|
+-------------+----------+-----------------+
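
As a cross-check on the table above, the same weekly bucketing can be sketched in plain Python for Geoffrey's May and June 5th rows.  One deliberate difference, which is an assumption about what you might want rather than what the cell above does: the grouping key here also includes the ISO year, so that weeks from different years would not be merged in a multi-year dataset:

```python
from collections import defaultdict
from datetime import date

# (customer_name, date, amount_spent) for some of Geoffrey's rows from the data above.
rows = [
    ("Geoffrey", date(2016, 5, 3), 76.0),
    ("Geoffrey", date(2016, 5, 3), 29.0),
    ("Geoffrey", date(2016, 5, 3), 150.0),
    ("Geoffrey", date(2016, 5, 3), 75.0),
    ("Geoffrey", date(2016, 6, 5), 250.0),
    ("Geoffrey", date(2016, 6, 5), 275.0),
]

# Group by (customer, ISO year, ISO week) and sum the amounts.
weekly_spend = defaultdict(float)
for customer, day, amount in rows:
    iso_year, iso_week, _ = day.isocalendar()
    weekly_spend[(customer, iso_year, iso_week)] += amount

for key in sorted(weekly_spend):
    print(key, weekly_spend[key])
# ('Geoffrey', 2016, 18) 330.0
# ('Geoffrey', 2016, 22) 525.0
```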

