## Create the Spark context

In [3]:
from pyspark import SQLContext,SparkConf,SparkContext,HiveContext
from pyspark.sql.window import Window
from pyspark.sql.types import DateType
import pyspark.sql.functions as F

In [4]:
# Build (or, on re-run, reuse) the SparkContext and wrap it in a HiveContext.
# SparkContext(conf=...) raises "Cannot run multiple SparkContexts at once" when
# this cell is executed a second time in the same kernel; getOrCreate returns the
# already-active context instead (in that case my_conf is not re-applied).
# NOTE(review): HiveContext is deprecated since Spark 2.0 in favour of
# SparkSession; the visible cells only call sql_context.createDataFrame.
my_conf = SparkConf(loadDefaults=True)
sc = SparkContext.getOrCreate(conf=my_conf)
sql_context = HiveContext(sc)

## Generate a PySpark DataFrame

In [5]:
# Sample purchases for a single customer:
# (name, purchase date as an ISO 'YYYY-MM-DD' string, product, price).
shopping_data = [
    ('Alex', '2018-10-10', 'Paint',     80),
    ('Alex', '2018-04-02', 'Ladder',    20),
    ('Alex', '2018-06-22', 'Stool',     20),
    ('Alex', '2018-12-09', 'Vacuum',    40),
    ('Alex', '2018-07-12', 'Bucket',     5),
    ('Alex', '2018-02-18', 'Gloves',     5),
    ('Alex', '2018-03-03', 'Brushes',   30),
    ('Alex', '2018-09-26', 'Sandpaper', 10),
]

In [6]:
# Name the tuple fields, then replace the ISO date strings with real DateType values.
df = (
    sql_context
    .createDataFrame(shopping_data, ['name', 'date', 'product', 'price'])
    .withColumn('date', F.col('date').cast(DateType()))
)

In [7]:
# Verify the schema: 'date' should now be a date and 'price' is inferred as long.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- date: date (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)



In [8]:
# Display the raw purchase rows (unordered, as created).
df.show()

+----+----------+---------+-----+
|name|      date|  product|price|
+----+----------+---------+-----+
|Alex|2018-10-10|    Paint|   80|
|Alex|2018-04-02|   Ladder|   20|
|Alex|2018-06-22|    Stool|   20|
|Alex|2018-12-09|   Vacuum|   40|
|Alex|2018-07-12|   Bucket|    5|
|Alex|2018-02-18|   Gloves|    5|
|Alex|2018-03-03|  Brushes|   30|
|Alex|2018-09-26|Sandpaper|   10|
+----+----------+---------+-----+



## Set the base window partition (one partition per customer name)

In [9]:
# Base window: one partition per customer; later cells add ordering and frames to it.
w0 = Window.partitionBy(F.col('name'))

## Rank, dense rank, quartile bucket and percent rank of purchases by price

In [10]:
# dense_rank: tied prices share a rank and no rank numbers are skipped (..., 4, 4, 5).
df.select(
    '*',
    F.dense_rank().over(w0.orderBy(F.desc('price'))).alias('price_rank'),
).show()

+----+----------+---------+-----+----------+
|name|      date|  product|price|price_rank|
+----+----------+---------+-----+----------+
|Alex|2018-10-10|    Paint|   80|         1|
|Alex|2018-12-09|   Vacuum|   40|         2|
|Alex|2018-03-03|  Brushes|   30|         3|
|Alex|2018-04-02|   Ladder|   20|         4|
|Alex|2018-06-22|    Stool|   20|         4|
|Alex|2018-09-26|Sandpaper|   10|         5|
|Alex|2018-07-12|   Bucket|    5|         6|
|Alex|2018-02-18|   Gloves|    5|         6|
+----+----------+---------+-----+----------+



In [11]:
# rank: tied prices share a rank and the following rank numbers are skipped (1, 1, 3, ...).
price_ascending = w0.orderBy(F.asc('price'))
df.withColumn('price_rank', F.rank().over(price_ascending)).show()

+----+----------+---------+-----+----------+
|name|      date|  product|price|price_rank|
+----+----------+---------+-----+----------+
|Alex|2018-07-12|   Bucket|    5|         1|
|Alex|2018-02-18|   Gloves|    5|         1|
|Alex|2018-09-26|Sandpaper|   10|         3|
|Alex|2018-04-02|   Ladder|   20|         4|
|Alex|2018-06-22|    Stool|   20|         4|
|Alex|2018-03-03|  Brushes|   30|         6|
|Alex|2018-12-09|   Vacuum|   40|         7|
|Alex|2018-10-10|    Paint|   80|         8|
+----+----------+---------+-----+----------+



In [12]:
# ntile(4) deals the price-ordered rows into 4 equal-sized buckets (quartiles).
# Ordering by price alone leaves equal prices (e.g. Ladder/Stool at 20) in an
# arbitrary order, so a tied pair can be split across buckets differently from
# run to run; ordering by date as a tiebreaker makes the assignment deterministic.
df.withColumn('price_bucket',
              F.ntile(4).over(w0.orderBy(F.col('price').desc(), F.col('date').asc())))\
  .show()

+----+----------+---------+-----+------------+
|name|      date|  product|price|price_bucket|
+----+----------+---------+-----+------------+
|Alex|2018-10-10|    Paint|   80|           1|
|Alex|2018-12-09|   Vacuum|   40|           1|
|Alex|2018-03-03|  Brushes|   30|           2|
|Alex|2018-04-02|   Ladder|   20|           2|
|Alex|2018-06-22|    Stool|   20|           3|
|Alex|2018-09-26|Sandpaper|   10|           3|
|Alex|2018-07-12|   Bucket|    5|           4|
|Alex|2018-02-18|   Gloves|    5|           4|
+----+----------+---------+-----+------------+



In [13]:
# percent_rank = (rank - 1) / (rows in partition - 1): 0.0 for the most expensive item.
df.select(
    '*',
    F.percent_rank().over(w0.orderBy(F.desc('price'))).alias('price_rel_rank'),
).show()

+----+----------+---------+-----+-------------------+
|name|      date|  product|price|     price_rel_rank|
+----+----------+---------+-----+-------------------+
|Alex|2018-10-10|    Paint|   80|                0.0|
|Alex|2018-12-09|   Vacuum|   40|0.14285714285714285|
|Alex|2018-03-03|  Brushes|   30| 0.2857142857142857|
|Alex|2018-04-02|   Ladder|   20|0.42857142857142855|
|Alex|2018-06-22|    Stool|   20|0.42857142857142855|
|Alex|2018-09-26|Sandpaper|   10| 0.7142857142857143|
|Alex|2018-07-12|   Bucket|    5| 0.8571428571428571|
|Alex|2018-02-18|   Gloves|    5| 0.8571428571428571|
+----+----------+---------+-----+-------------------+



## Running average, running sum, running max and a 2-row moving max of price

In [18]:
# All aggregates share the same date-ordered window; with an ORDER BY and no
# explicit frame, each one covers every purchase up to and including this date.
by_date = w0.orderBy(F.col('date'))

df.select(
    '*',
    F.round(F.avg('price').over(by_date), 2).alias('avg_to_date'),
    F.sum('price').over(by_date).alias('accumulating_sum'),
    F.max('price').over(by_date).alias('max_to_date'),
    # explicit row frame: the previous purchase and the current one only
    F.max('price').over(by_date.rowsBetween(-1, Window.currentRow)).alias('max_of_last2'),
    F.count('*').over(by_date).alias('items_to_date'),
).show()

+----+----------+---------+-----+-----------+----------------+-----------+------------+-------------+
|name|      date|  product|price|avg_to_date|accumulating_sum|max_to_date|max_of_last2|items_to_date|
+----+----------+---------+-----+-----------+----------------+-----------+------------+-------------+
|Alex|2018-02-18|   Gloves|    5|        5.0|               5|          5|           5|            1|
|Alex|2018-03-03|  Brushes|   30|       17.5|              35|         30|          30|            2|
|Alex|2018-04-02|   Ladder|   20|      18.33|              55|         30|          30|            3|
|Alex|2018-06-22|    Stool|   20|      18.75|              75|         30|          20|            4|
|Alex|2018-07-12|   Bucket|    5|       16.0|              80|         30|          20|            5|
|Alex|2018-09-26|Sandpaper|   10|       15.0|              90|         30|          10|            6|
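|Alex|2018-10-10|    Paint|   80|      24.29|             170|         80|          80|            7|
|Alex|2018-12-09|   Vacuum|   40|      26.25|             210|         80|          80|            8|
+----+----------+---------+-----+-----------+----------------+-----------+------------+-------------+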

## Time Lag and Lead for purchases

In [13]:
# lag('date', 1): the previous purchase date in date order (null for the first purchase).
previous_purchase = F.lag('date', 1).over(w0.orderBy('date'))
df.withColumn('days_from_last_purchase', F.datediff(F.col('date'), previous_purchase)).show()

+----+----------+---------+-----+-----------------------+
|name|      date|  product|price|days_from_last_purchase|
+----+----------+---------+-----+-----------------------+
|Alex|2018-02-18|   Gloves|    5|                   null|
|Alex|2018-03-03|  Brushes|   30|                     13|
|Alex|2018-04-02|   Ladder|   20|                     30|
|Alex|2018-06-22|    Stool|   20|                     81|
|Alex|2018-07-12|   Bucket|    5|                     20|
|Alex|2018-09-26|Sandpaper|   10|                     76|
|Alex|2018-10-10|    Paint|   80|                     14|
|Alex|2018-12-09|   Vacuum|   40|                     60|
+----+----------+---------+-----+-----------------------+



In [14]:
# lead('date', 1): the next purchase date in date order (null for the last purchase).
next_purchase = F.lead(F.col('date'), 1).over(w0.orderBy('date'))
df.withColumn('days_before_next_purchase', F.datediff(next_purchase, F.col('date'))).show()

+----+----------+---------+-----+-------------------------+
|name|      date|  product|price|days_before_next_purchase|
+----+----------+---------+-----+-------------------------+
|Alex|2018-02-18|   Gloves|    5|                       13|
|Alex|2018-03-03|  Brushes|   30|                       30|
|Alex|2018-04-02|   Ladder|   20|                       81|
|Alex|2018-06-22|    Stool|   20|                       20|
|Alex|2018-07-12|   Bucket|    5|                       76|
|Alex|2018-09-26|Sandpaper|   10|                       14|
|Alex|2018-10-10|    Paint|   80|                       60|
|Alex|2018-12-09|   Vacuum|   40|                     null|
+----+----------+---------+-----+-------------------------+



## Lists and sets of purchases by member

In [15]:
# Add a second 'Paint' purchase so one price has a duplicated product.
# union() matches columns by position, so build the row with df's column names
# and cast its date to DateType; otherwise Spark must reconcile a string column
# with df's date column and df2.date ends up coerced to a string.
newRow = sql_context.createDataFrame([('Alex','2018-10-11','Paint',80)], df.columns)\
                    .withColumn('date', F.col('date').cast(DateType()))
df2 = df.union(newRow)

# WindowSpec.partitionBy *replaces* the partition columns, so w0.partitionBy('price')
# would partition by price alone and mix different members' purchases.
# Partition by both member and price explicitly.
df2.withColumn('items_by_price', F.collect_list('product').over(Window.partitionBy('name', 'price')))\
   .withColumn('all_prices',     F.collect_set('price').over(w0))\
   .show(9,False)

+----+----------+---------+-----+----------------+-----------------------+
|name|date      |product  |price|items_by_price  |all_prices             |
+----+----------+---------+-----+----------------+-----------------------+
|Alex|2018-07-12|Bucket   |5    |[Bucket, Gloves]|[30, 5, 20, 10, 40, 80]|
|Alex|2018-02-18|Gloves   |5    |[Bucket, Gloves]|[30, 5, 20, 10, 40, 80]|
|Alex|2018-09-26|Sandpaper|10   |[Sandpaper]     |[30, 5, 20, 10, 40, 80]|
|Alex|2018-10-10|Paint    |80   |[Paint, Paint]  |[30, 5, 20, 10, 40, 80]|
|Alex|2018-10-11|Paint    |80   |[Paint, Paint]  |[30, 5, 20, 10, 40, 80]|
|Alex|2018-03-03|Brushes  |30   |[Brushes]       |[30, 5, 20, 10, 40, 80]|
|Alex|2018-04-02|Ladder   |20   |[Ladder, Stool] |[30, 5, 20, 10, 40, 80]|
|Alex|2018-06-22|Stool    |20   |[Ladder, Stool] |[30, 5, 20, 10, 40, 80]|
|Alex|2018-12-09|Vacuum   |40   |[Vacuum]        |[30, 5, 20, 10, 40, 80]|
+----+----------+---------+-----+----------------+-----------------------+



In [16]:
# Distinct set of products per (member, price), one output row per pair.
# WindowSpec.partitionBy replaces rather than extends w0's partition columns,
# so partition by both 'name' and 'price' explicitly.
# NOTE: 'Price' resolves to the 'price' column only because Spark SQL column
# resolution is case-insensitive by default (spark.sql.caseSensitive=false).
df2.withColumn('items', F.collect_set('product').over(Window.partitionBy('name', 'price')))\
   .select('name','Price','items')\
   .distinct()\
   .show()

+----+-----+----------------+
|name|Price|           items|
+----+-----+----------------+
|Alex|    5|[Bucket, Gloves]|
|Alex|   10|     [Sandpaper]|
|Alex|   80|         [Paint]|
|Alex|   30|       [Brushes]|
|Alex|   20| [Ladder, Stool]|
|Alex|   40|        [Vacuum]|
+----+-----+----------------+



## Other tricks

### Ordering by multiple columns

In [17]:
# Per-customer window ordered by price (high to low), ties broken by the earlier date.
w1 = (Window
      .partitionBy('name')
      .orderBy(F.desc('price'), F.asc('date')))

In [18]:
# The date tiebreaker in w1 gives equally priced purchases distinct ranks.
df.select('*', F.rank().over(w1).alias('price_and_date_rank')).show()

+----+----------+---------+-----+-------------------+
|name|      date|  product|price|price_and_date_rank|
+----+----------+---------+-----+-------------------+
|Alex|2018-10-10|    Paint|   80|                  1|
|Alex|2018-12-09|   Vacuum|   40|                  2|
|Alex|2018-03-03|  Brushes|   30|                  3|
|Alex|2018-04-02|   Ladder|   20|                  4|
|Alex|2018-06-22|    Stool|   20|                  5|
|Alex|2018-09-26|Sandpaper|   10|                  6|
|Alex|2018-02-18|   Gloves|    5|                  7|
|Alex|2018-07-12|   Bucket|    5|                  8|
+----+----------+---------+-----+-------------------+



### 30-day moving average

In [19]:
def days(i):
    """Return the number of seconds in `i` days (86400 seconds per day)."""
    return i * 86400

# NOTE(review): not used by the cell below; kept in case a later cell relies on it.
window_cast = w0.orderBy(F.col('date').cast('timestamp').cast('long'))

# Order the 30-day frame by whole days since the epoch instead of by unix seconds.
# Casting a date to a timestamp applies the session time zone (the unix_date
# values shift from 06:00Z in February to 05:00Z in April), so across a DST
# change two dates 30 calendar days apart differ by 30 days +/- 1 hour and a
# rangeBetween(-days(30), 0) frame can drop the boundary row.
day_index = F.datediff(F.col('date'), F.to_date(F.lit('1970-01-01')))

df.withColumn('unix_date',F.col('date').cast('timestamp').cast('long'))\
  .withColumn('30day_moving_avg', F.avg('price').over(w0.orderBy(day_index).rangeBetween(-30, 0)))\
  .show()

+----+----------+---------+-----+----------+----------------+
|name|      date|  product|price| unix_date|30day_moving_avg|
+----+----------+---------+-----+----------+----------------+
|Alex|2018-02-18|   Gloves|    5|1518933600|             5.0|
|Alex|2018-03-03|  Brushes|   30|1520056800|            17.5|
|Alex|2018-04-02|   Ladder|   20|1522645200|            25.0|
|Alex|2018-06-22|    Stool|   20|1529643600|            20.0|
|Alex|2018-07-12|   Bucket|    5|1531371600|            12.5|
|Alex|2018-09-26|Sandpaper|   10|1537938000|            10.0|
|Alex|2018-10-10|    Paint|   80|1539147600|            45.0|
|Alex|2018-12-09|   Vacuum|   40|1544335200|            40.0|
+----+----------+---------+-----+----------+----------------+



### Multiple windows in one query

In [20]:
# Two independent per-customer orderings evaluated in the same query.
w2 = Window.partitionBy('name').orderBy('price')
w3 = Window.partitionBy('name').orderBy('date')

df.select(
    '*',
    F.dense_rank().over(w2).alias('price_rank'),   # ties share a rank
    F.row_number().over(w3).alias('date_rank'),    # 1..n in purchase order
).show()

+----+----------+---------+-----+----------+---------+
|name|      date|  product|price|price_rank|date_rank|
+----+----------+---------+-----+----------+---------+
|Alex|2018-02-18|   Gloves|    5|         1|        1|
|Alex|2018-03-03|  Brushes|   30|         4|        2|
|Alex|2018-04-02|   Ladder|   20|         3|        3|
|Alex|2018-06-22|    Stool|   20|         3|        4|
|Alex|2018-07-12|   Bucket|    5|         1|        5|
|Alex|2018-09-26|Sandpaper|   10|         2|        6|
|Alex|2018-10-10|    Paint|   80|         6|        7|
|Alex|2018-12-09|   Vacuum|   40|         5|        8|
+----+----------+---------+-----+----------+---------+

