# UDFs and Windowing

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


ss = SparkSession.builder.\
     master('spark://spark-master:7077').\
     appName('week9lecture_udfwindow').getOrCreate()

In [2]:
# RDD
sc = ss.sparkContext

In [3]:
purchases_rdd = sc.parallelize([
("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27", "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
])

In [4]:
colnames = ["customer_name",
            "date",
            "category", 
            "product_name", 
            "quantity", 
            "price"]

In [5]:
purchases_df = purchases_rdd.toDF(colnames)

In [6]:
purchases_df.take(5)

[Row(customer_name='Geoffrey', date='2016-04-22', category='A', product_name='apples', quantity=1, price=50.0),
 Row(customer_name='Geoffrey', date='2016-05-03', category='B', product_name='Lamp', quantity=2, price=38.0),
 Row(customer_name='Geoffrey', date='2016-05-03', category='D', product_name='Solar Pannel', quantity=1, price=29.0),
 Row(customer_name='Geoffrey', date='2016-05-03', category='A', product_name='apples', quantity=3, price=50.0),
 Row(customer_name='Geoffrey', date='2016-05-03', category='C', product_name='Rice', quantity=5, price=15.0)]

In [7]:
purchases_df.show(5)

+-------------+----------+--------+------------+--------+-----+
|customer_name|      date|category|product_name|quantity|price|
+-------------+----------+--------+------------+--------+-----+
|     Geoffrey|2016-04-22|       A|      apples|       1| 50.0|
|     Geoffrey|2016-05-03|       B|        Lamp|       2| 38.0|
|     Geoffrey|2016-05-03|       D|Solar Pannel|       1| 29.0|
|     Geoffrey|2016-05-03|       A|      apples|       3| 50.0|
|     Geoffrey|2016-05-03|       C|        Rice|       5| 15.0|
+-------------+----------+--------+------------+--------+-----+
only showing top 5 rows



In [8]:
purchases_df.limit(5).show()

+-------------+----------+--------+------------+--------+-----+
|customer_name|      date|category|product_name|quantity|price|
+-------------+----------+--------+------------+--------+-----+
|     Geoffrey|2016-04-22|       A|      apples|       1| 50.0|
|     Geoffrey|2016-05-03|       B|        Lamp|       2| 38.0|
|     Geoffrey|2016-05-03|       D|Solar Pannel|       1| 29.0|
|     Geoffrey|2016-05-03|       A|      apples|       3| 50.0|
|     Geoffrey|2016-05-03|       C|        Rice|       5| 15.0|
+-------------+----------+--------+------------+--------+-----+



In [9]:
purchases_df.limit(5).toPandas().head()

Unnamed: 0,customer_name,date,category,product_name,quantity,price
0,Geoffrey,2016-04-22,A,apples,1,50.0
1,Geoffrey,2016-05-03,B,Lamp,2,38.0
2,Geoffrey,2016-05-03,D,Solar Pannel,1,29.0
3,Geoffrey,2016-05-03,A,apples,3,50.0
4,Geoffrey,2016-05-03,C,Rice,5,15.0


In [10]:
# use persist if we are planning on performing multiple actions on it
# kind of checkpoint
purchases_df.persist()

DataFrame[customer_name: string, date: string, category: string, product_name: string, quantity: bigint, price: double]

In [11]:
products_df = purchases_df.select('product_name').distinct()
products_df.show()

+--------------------+
|        product_name|
+--------------------+
|             bananas|
|Book: Crime and P...|
|                Lamp|
|         Recycle bin|
|              apples|
|        Solar Pannel|
|     Book: The noose|
|                Rice|
|          space ship|
|         Motor skate|
+--------------------+



# Summary statistics on certain columns

In [12]:
basic_stats_df = purchases_df.describe('quantity','price')
basic_stats_df.show()

+-------+------------------+-----------------+
|summary|          quantity|            price|
+-------+------------------+-----------------+
|  count|                39|               39|
|   mean| 4.153846153846154|68.94871794871794|
| stddev|2.9694803863945936|59.75958355996516|
|    min|                 1|             15.0|
|    max|                15|            227.0|
+-------+------------------+-----------------+



# Contingency tables

In [13]:
prod_freq_per_cust = purchases_df.crosstab('customer_name','product_name')
prod_freq_per_cust.show()

+--------------------------+--------------------------+---------------+----+-----------+-----------+----+------------+------+-------+----------+
|customer_name_product_name|Book: Crime and Punishment|Book: The noose|Lamp|Motor skate|Recycle bin|Rice|Solar Pannel|apples|bananas|space ship|
+--------------------------+--------------------------+---------------+----+-----------+-----------+----+------------+------+-------+----------+
|                  Geoffrey|                         0|              1|   1|          1|          0|   1|           1|     3|      1|         0|
|                    Yoshua|                         0|              0|   1|          1|          1|   1|           0|     1|      2|         2|
|                      Yann|                         1|              1|   2|          2|          2|   1|           1|     0|      2|         1|
|                    Jurgen|                         0|              0|   0|          2|          1|   1|           0|     0|     

In [14]:
prod_freq_per_cust.toPandas()
# it only counts rows

Unnamed: 0,customer_name_product_name,Book: Crime and Punishment,Book: The noose,Lamp,Motor skate,Recycle bin,Rice,Solar Pannel,apples,bananas,space ship
0,Geoffrey,0,1,1,1,0,1,1,3,1,0
1,Yoshua,0,0,1,1,1,1,0,1,2,2
2,Yann,1,1,2,2,2,1,1,0,2,1
3,Jurgen,0,0,0,2,1,1,0,0,3,1


In [15]:
# look at columns
colnames = prod_freq_per_cust.columns
colnames

['customer_name_product_name',
 'Book: Crime and Punishment',
 'Book: The noose',
 'Lamp',
 'Motor skate',
 'Recycle bin',
 'Rice',
 'Solar Pannel',
 'apples',
 'bananas',
 'space ship']

In [16]:
# pick some of the columns
colnames[1:5] #1 to 4

['Book: Crime and Punishment', 'Book: The noose', 'Lamp', 'Motor skate']

In [17]:
colnames[1:len(colnames)-1]

['Book: Crime and Punishment',
 'Book: The noose',
 'Lamp',
 'Motor skate',
 'Recycle bin',
 'Rice',
 'Solar Pannel',
 'apples',
 'bananas']

In [18]:
prod_freq_per_cust.describe(colnames).toPandas()
#prod_freq_per_cust is spark DF

Unnamed: 0,summary,customer_name_product_name,Book: Crime and Punishment,Book: The noose,Lamp,Motor skate,Recycle bin,Rice,Solar Pannel,apples,bananas,space ship
0,count,4,4.0,4.0,4.0,4.0,4.0,4.0,4.0,4.0,4.0,4.0
1,mean,,0.25,0.5,1.0,1.5,1.0,1.0,0.5,1.0,2.0,1.0
2,stddev,,0.5,0.5773502691896257,0.816496580927726,0.5773502691896257,0.816496580927726,0.0,0.5773502691896257,1.4142135623730951,0.816496580927726,0.816496580927726
3,min,Geoffrey,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0,0.0
4,max,Yoshua,1.0,1.0,2.0,2.0,2.0,1.0,1.0,3.0,3.0,2.0


In [19]:
prod_freq_per_cust2 = purchases_df.groupBy('customer_name','product_name').count()

In [20]:
prod_freq_per_cust2.show()

+-------------+---------------+-----+
|customer_name|   product_name|count|
+-------------+---------------+-----+
|       Jurgen|        bananas|    3|
|     Geoffrey|           Lamp|    1|
|       Yoshua|         apples|    1|
|         Yann|           Lamp|    2|
|       Jurgen|     space ship|    1|
|       Yoshua|        bananas|    2|
|     Geoffrey|Book: The noose|    1|
|         Yann|           Rice|    1|
|         Yann|    Recycle bin|    2|
|         Yann|     space ship|    1|
|     Geoffrey|   Solar Pannel|    1|
|     Geoffrey|    Motor skate|    1|
|       Yoshua|    Motor skate|    1|
|         Yann|        bananas|    2|
|       Yoshua|           Lamp|    1|
|     Geoffrey|         apples|    3|
|       Jurgen|    Motor skate|    2|
|       Yoshua|           Rice|    1|
|       Jurgen|    Recycle bin|    1|
|     Geoffrey|           Rice|    1|
+-------------+---------------+-----+
only showing top 20 rows



In [21]:
# rename the column
prod_freq_per_cust2 = prod_freq_per_cust2.withColumnRenamed('count', 'num_purchase_events')
prod_freq_per_cust2.show()

+-------------+---------------+-------------------+
|customer_name|   product_name|num_purchase_events|
+-------------+---------------+-------------------+
|       Jurgen|        bananas|                  3|
|     Geoffrey|           Lamp|                  1|
|       Yoshua|         apples|                  1|
|         Yann|           Lamp|                  2|
|       Jurgen|     space ship|                  1|
|       Yoshua|        bananas|                  2|
|     Geoffrey|Book: The noose|                  1|
|         Yann|           Rice|                  1|
|         Yann|    Recycle bin|                  2|
|         Yann|     space ship|                  1|
|     Geoffrey|   Solar Pannel|                  1|
|     Geoffrey|    Motor skate|                  1|
|       Yoshua|    Motor skate|                  1|
|         Yann|        bananas|                  2|
|       Yoshua|           Lamp|                  1|
|     Geoffrey|         apples|                  3|
|       Jurg

In [22]:
# average
prod_freq_per_cust2.groupBy('product_name').avg('num_purchase_events').show()

+--------------------+------------------------+
|        product_name|avg(num_purchase_events)|
+--------------------+------------------------+
|             bananas|                     2.0|
|Book: Crime and P...|                     1.0|
|                Lamp|      1.3333333333333333|
|         Recycle bin|      1.3333333333333333|
|              apples|                     2.0|
|        Solar Pannel|                     1.0|
|     Book: The noose|                     1.0|
|                Rice|                     1.0|
|          space ship|      1.3333333333333333|
|         Motor skate|                     1.5|
+--------------------+------------------------+



# Pivoting columns

In [23]:
total_prod_per_cust = purchases_df.groupBy('customer_name').pivot('product_name').sum('quantity')

# group by = group by; pivot = column; sum = actual value

In [24]:
total_prod_per_cust.toPandas()
# it's unit total

Unnamed: 0,customer_name,Book: Crime and Punishment,Book: The noose,Lamp,Motor skate,Recycle bin,Rice,Solar Pannel,apples,bananas,space ship
0,Yann,5.0,5.0,3.0,2,10.0,15,5.0,,6,1.0
1,Jurgen,,,,3,5.0,5,,,15,1.0
2,Yoshua,,,2.0,4,5.0,5,,10.0,18,7.0
3,Geoffrey,,1.0,2.0,7,,5,1.0,9.0,5,


In [25]:
total_prod_per_cust = total_prod_per_cust.na.fill(0)

In [26]:
total_prod_per_cust.toPandas()

Unnamed: 0,customer_name,Book: Crime and Punishment,Book: The noose,Lamp,Motor skate,Recycle bin,Rice,Solar Pannel,apples,bananas,space ship
0,Yann,5,5,3,2,10,15,5,0,6,1
1,Jurgen,0,0,0,3,5,5,0,0,15,1
2,Yoshua,0,0,2,4,5,5,0,10,18,7
3,Geoffrey,0,1,2,7,0,5,1,9,5,0


In [27]:
colnames = total_prod_per_cust.columns
colnames

['customer_name',
 'Book: Crime and Punishment',
 'Book: The noose',
 'Lamp',
 'Motor skate',
 'Recycle bin',
 'Rice',
 'Solar Pannel',
 'apples',
 'bananas',
 'space ship']

In [28]:
colnames = colnames[1:]

In [29]:
total_prod_per_cust.groupBy().avg('bananas','apples').show()

+------------+-----------+
|avg(bananas)|avg(apples)|
+------------+-----------+
|        11.0|       4.75|
+------------+-----------+



In [30]:
total_prod_per_cust.groupBy().avg(*colnames).toPandas()
# put one star in front of list, it will take all elements inside of list

Unnamed: 0,avg(Book: Crime and Punishment),avg(Book: The noose),avg(Lamp),avg(Motor skate),avg(Recycle bin),avg(Rice),avg(Solar Pannel),avg(apples),avg(bananas),avg(space ship)
0,1.25,1.5,1.75,4.0,5.0,7.5,1.5,4.75,11.0,2.25


# UDFs

In [31]:
purchases_df.columns

['customer_name', 'date', 'category', 'product_name', 'quantity', 'price']

In [32]:
from pyspark.sql.types import DoubleType

def amount_spent(quantity,price):
    return quantity*price

amount_spent_udf = F.udf(amount_spent,DoubleType())
# F.udf like map

In [33]:
purchases_df = purchases_df.withColumn('amount_spent', amount_spent_udf(F.col('quantity'), F.col('price')))
purchases_df.limit(5).toPandas().head()

Unnamed: 0,customer_name,date,category,product_name,quantity,price,amount_spent
0,Geoffrey,2016-04-22,A,apples,1,50.0,50.0
1,Geoffrey,2016-05-03,B,Lamp,2,38.0,76.0
2,Geoffrey,2016-05-03,D,Solar Pannel,1,29.0,29.0
3,Geoffrey,2016-05-03,A,apples,3,50.0,150.0
4,Geoffrey,2016-05-03,C,Rice,5,15.0,75.0


# Windowing

In [34]:
from pyspark.sql import Window

# start by defining the window over which computations will be performed
window = Window.partitionBy('customer_name').\
         orderBy('date').\
         rowsBetween(Window.unboundedPreceding,0)

#Window.unboundedPreceding big negative number

In [35]:
# now apply the window aggregation to compute a new column `cumulative_spend`
purchases_df = purchases_df.withColumn('cumulative_spend',
                                       F.sum(F.col('amount_spent')).over(window))

In [36]:
purchases_df.limit(20).toPandas().head()

Unnamed: 0,customer_name,date,category,product_name,quantity,price,amount_spent,cumulative_spend
0,Yann,2016-04-02,A,bananas,3,55.0,165.0,165.0
1,Yann,2016-04-02,B,Lamp,2,38.0,76.0,241.0
2,Yann,2016-04-03,E,Book: Crime and Punishment,5,100.0,500.0,741.0
3,Yann,2016-04-13,E,Book: The noose,5,125.0,625.0,1366.0
4,Yann,2016-04-22,B,Lamp,1,38.0,38.0,1404.0


In [37]:
# start by creating a UDF that converts the date string to a datetime object
from datetime import datetime
from pyspark.sql.types import DateType

def parse_date(datestr):
    return datetime.strptime(datestr, '%Y-%m-%d')

string_to_datetime = F.udf(parse_date, DateType())

In [38]:
purchases_df = purchases_df.withColumn('datetime',
                                       string_to_datetime(F.col('date')))
purchases_df = purchases_df.drop('date')
purchases_df.limit(10).toPandas().head()

Unnamed: 0,customer_name,category,product_name,quantity,price,amount_spent,cumulative_spend,datetime
0,Yann,A,bananas,3,55.0,165.0,165.0,2016-04-02
1,Yann,B,Lamp,2,38.0,76.0,241.0,2016-04-02
2,Yann,E,Book: Crime and Punishment,5,100.0,500.0,741.0,2016-04-03
3,Yann,E,Book: The noose,5,125.0,625.0,1366.0,2016-04-13
4,Yann,B,Lamp,1,38.0,38.0,1404.0,2016-04-22


In [39]:
# add a weekofyear column so that we can aggregate by the week:
purchases_df = purchases_df.withColumn('weekofyear',
                                       F.weekofyear(F.col('datetime')))
purchases_df.limit(10).toPandas().head()

Unnamed: 0,customer_name,category,product_name,quantity,price,amount_spent,cumulative_spend,datetime,weekofyear
0,Yann,A,bananas,3,55.0,165.0,165.0,2016-04-02,13
1,Yann,B,Lamp,2,38.0,76.0,241.0,2016-04-02,13
2,Yann,E,Book: Crime and Punishment,5,100.0,500.0,741.0,2016-04-03,13
3,Yann,E,Book: The noose,5,125.0,625.0,1366.0,2016-04-13,15
4,Yann,B,Lamp,1,38.0,38.0,1404.0,2016-04-22,16


In [40]:
# aggregating by the week

purchases_df.groupBy('customer_name', 'weekofyear').\
             sum('amount_spent').\
             orderBy('customer_name', 'weekofyear').show()

+-------------+----------+-----------------+
|customer_name|weekofyear|sum(amount_spent)|
+-------------+----------+-----------------+
|     Geoffrey|        16|             50.0|
|     Geoffrey|        18|            330.0|
|     Geoffrey|        22|            525.0|
|     Geoffrey|        24|            601.0|
|       Jurgen|        17|            502.0|
|       Jurgen|        18|            343.0|
|       Jurgen|        22|            621.0|
|         Yann|        13|            741.0|
|         Yann|        15|            625.0|
|         Yann|        16|             38.0|
|         Yann|        17|            213.0|
|         Yann|        18|            428.0|
|         Yann|        21|            300.0|
|         Yann|        23|            227.0|
|       Yoshua|         5|            454.0|
|       Yoshua|         6|           1121.0|
|       Yoshua|        10|           1135.0|
|       Yoshua|        14|            977.0|
+-------------+----------+-----------------+

