## Loading Data from CSV into DataFrame

In [1]:
# Point both the PySpark workers and the driver at the interpreter running this
# kernel, so they use the same Python executable.
import os
import sys

for _pyspark_var in ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"):
    os.environ[_pyspark_var] = sys.executable

In [2]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook (getOrCreate reuses an existing one).
spark = (
    SparkSession.builder
    .appName("Skill-Pyspark")
    .getOrCreate()
)

In [5]:
# Read the raw sales file with its header row. No schema inference is requested,
# so every column is loaded as a string; types are assigned later via casts.
df = (
    spark.read
    .option("header", "true")
    .csv("sample_sales_pyspark.csv")
)
df.show()

+----------+------------+----------+---------+---------+-----+
|store_code|product_code|sales_date|sales_qty|sales_rev|price|
+----------+------------+----------+---------+---------+-----+
|        B1|       89912|2021-05-01|       14|    17654| 1261|
|        B1|       89912|2021-05-02|       19|    24282| 1278|
|        B1|       89912|2021-05-03|       15|    19305| 1287|
|        B1|       89912|2021-05-04|       21|    28287| 1347|
|        B1|       89912|2021-05-05|        4|     5404| 1351|
|        B1|       89912|2021-05-06|        5|     6775| 1355|
|        B1|       89912|2021-05-07|       10|    12420| 1242|
|        B1|       89912|2021-05-08|       18|    22500| 1250|
|        B1|       89912|2021-05-09|        5|     6555| 1311|
|        B1|       89912|2021-05-10|        2|     2638| 1319|
|        B1|       89912|2021-05-11|       15|    19575| 1305|
|        B1|       89912|2021-05-12|       21|    28182| 1342|
|        B1|       89912|2021-05-13|        7|     9268

In [6]:
# Inspect column types: without inferSchema, every CSV column is read as a string.
df.printSchema()

root
 |-- store_code: string (nullable = true)
 |-- product_code: string (nullable = true)
 |-- sales_date: string (nullable = true)
 |-- sales_qty: string (nullable = true)
 |-- sales_rev: string (nullable = true)
 |-- price: string (nullable = true)



## Example 1: Select Expression

In [34]:
# Rename and cast the all-string CSV columns in a single projection; each entry
# is a Spark SQL expression.
# NOTE: `amount` holds the unit `price`, not revenue, and `sales_rev` is not kept.
typed_columns = [
    "store_code as store_id",
    "cast(product_code as bigint) as prod_id",
    "cast(sales_date as date) as sales_date",
    "cast(price as float) as amount",
    "cast(sales_qty as int) as sales_quantity",
]
sub_df = df.selectExpr(*typed_columns)
sub_df.dtypes

[('store_id', 'string'),
 ('prod_id', 'bigint'),
 ('sales_date', 'date'),
 ('amount', 'float'),
 ('sales_quantity', 'int')]

## Example 2: Moving Average

In [43]:
from pyspark.sql import Window

# Trailing frame per (store, product), ordered by date: the current row plus the
# two rows before it. The frame counts rows, not calendar days.
PRECEDING_ROWS = 2
window = (
    Window.partitionBy("store_id", "prod_id")
    .orderBy("sales_date")
    .rowsBetween(-PRECEDING_ROWS, Window.currentRow)
)

In [44]:
# NOTE: this import shadows Python's built-in round() for the rest of the notebook;
# later cells call the bare names `round` and `mean` from here.
from pyspark.sql.functions import round, mean

# Mean sales_quantity over the trailing `window`, rounded to 2 decimals.
moving_avg_col = round(mean("sales_quantity").over(window), 2)
sub_df = sub_df.withColumn("moving_avg", moving_avg_col)
sub_df.show()

+--------+-------+----------+------+--------------+----------+
|store_id|prod_id|sales_date|amount|sales_quantity|moving_avg|
+--------+-------+----------+------+--------------+----------+
|      A1|  89686|2021-05-06| 441.0|            12|      12.0|
|      A1|  89686|2021-05-07| 447.0|            23|      17.5|
|      A1|  89686|2021-05-08| 456.0|            14|     16.33|
|      A1|  89686|2021-05-09| 438.0|             8|      15.0|
|      A1|  89686|2021-05-10| 415.0|             6|      9.33|
|      A1|  89686|2021-05-11| 446.0|             4|       6.0|
|      A1|  89686|2021-05-12| 441.0|            11|       7.0|
|      A1|  89686|2021-05-13| 422.0|             9|       8.0|
|      A1|  89686|2021-05-14| 459.0|            19|      13.0|
|      A1|  89686|2021-05-15| 436.0|             3|     10.33|
|      A1|  89688|2021-05-06| 188.0|             7|       7.0|
|      A1|  89688|2021-05-07| 187.0|             9|       8.0|
|      A1|  89688|2021-05-08| 173.0|            22|    

## Example 3: Moving Average with SQL

In [45]:
# Import round explicitly: otherwise this cell only works if Example 2 already
# imported pyspark's round, because Python's built-in round() does not accept a Column.
from pyspark.sql.functions import expr, round

# Same trailing 3-row moving average as Example 2, written as a Spark SQL
# window expression instead of the Window builder API.
expression = """
                mean(sales_quantity) over (partition by store_id, prod_id order by sales_date rows between 2 preceding and current row)    
            """

# Overwrites the moving_avg column from Example 2, this time rounded to 3 decimals.
sub_df = sub_df.withColumn(
    "moving_avg", round(expr(expression), 3)
)

In [46]:
# Preview the first 5 rows; moving_avg is now rounded to 3 decimals.
sub_df.show(5)

+--------+-------+----------+------+--------------+----------+
|store_id|prod_id|sales_date|amount|sales_quantity|moving_avg|
+--------+-------+----------+------+--------------+----------+
|      A1|  89686|2021-05-06| 441.0|            12|      12.0|
|      A1|  89686|2021-05-07| 447.0|            23|      17.5|
|      A1|  89686|2021-05-08| 456.0|            14|    16.333|
|      A1|  89686|2021-05-09| 438.0|             8|      15.0|
|      A1|  89686|2021-05-10| 415.0|             6|     9.333|
+--------+-------+----------+------+--------------+----------+
only showing top 5 rows



## Example 4: Conditional Moving Average

In [53]:
from pyspark.sql.functions import when, col

# Dates on which a promotion ran; rows on these dates get is_promo = 1, others 0.
promo_days = ["2021-05-07", "2021-05-08", "2021-05-09"]

# NOTE: sales_date is a date column while promo_days holds strings; the match
# relies on Spark's implicit casting for the comparison.
promo_flag = when(col("sales_date").isin(promo_days), 1).otherwise(0)
sub_df = sub_df.withColumn("is_promo", promo_flag)

sub_df.show()

+--------+-------+----------+------+--------------+----------+--------+
|store_id|prod_id|sales_date|amount|sales_quantity|moving_avg|is_promo|
+--------+-------+----------+------+--------------+----------+--------+
|      A1|  89686|2021-05-06| 441.0|            12|      12.0|       0|
|      A1|  89686|2021-05-07| 447.0|            23|      17.5|       1|
|      A1|  89686|2021-05-08| 456.0|            14|    16.333|       1|
|      A1|  89686|2021-05-09| 438.0|             8|      15.0|       1|
|      A1|  89686|2021-05-10| 415.0|             6|     9.333|       0|
|      A1|  89686|2021-05-11| 446.0|             4|       6.0|       0|
|      A1|  89686|2021-05-12| 441.0|            11|       7.0|       0|
|      A1|  89686|2021-05-13| 422.0|             9|       8.0|       0|
|      A1|  89686|2021-05-14| 459.0|            19|      13.0|       0|
|      A1|  89686|2021-05-15| 436.0|             3|    10.333|       0|
|      A1|  89688|2021-05-06| 188.0|             7|       7.0|  

In [55]:
# Conditional moving average: drop promo days first, then apply the same trailing
# window. Each average then covers the 3 most recent non-promo rows, which can span
# more than 3 calendar days (e.g. 05-06, 05-10, 05-11).
nopromo_df = (
    sub_df.filter("is_promo == 0")
    .withColumn("moving_avg", round(mean("sales_quantity").over(window), 2))
    .select("store_id", "prod_id", "sales_date", "moving_avg")
)
nopromo_df.show()

+--------+-------+----------+----------+
|store_id|prod_id|sales_date|moving_avg|
+--------+-------+----------+----------+
|      A1|  89686|2021-05-06|      12.0|
|      A1|  89686|2021-05-10|       9.0|
|      A1|  89686|2021-05-11|      7.33|
|      A1|  89686|2021-05-12|       7.0|
|      A1|  89686|2021-05-13|       8.0|
|      A1|  89686|2021-05-14|      13.0|
|      A1|  89686|2021-05-15|     10.33|
|      A1|  89688|2021-05-06|       7.0|
|      A1|  89688|2021-05-10|       3.5|
|      A1|  89688|2021-05-11|       8.0|
|      A1|  89688|2021-05-12|      6.67|
|      A1|  89688|2021-05-13|     13.33|
|      A1|  89688|2021-05-14|     11.67|
|      A1|  89688|2021-05-15|     15.67|
|      A1|  89912|2021-05-01|       4.0|
|      A1|  89912|2021-05-02|       4.0|
|      A1|  89912|2021-05-03|       8.0|
|      A1|  89912|2021-05-04|     10.67|
|      A1|  89912|2021-05-05|      14.0|
|      A1|  89912|2021-05-06|     11.33|
+--------+-------+----------+----------+
only showing top

In [58]:
# Attach the non-promo moving average to every row of the original DataFrame.
# Rename it first: both frames carry a `moving_avg` column, and joining them as-is
# produces two identically named columns that cannot be referenced unambiguously.
# Promo-day rows have no match in nopromo_df, so their non-promo average is null.
nopromo_avg_df = nopromo_df.withColumnRenamed("moving_avg", "nopromo_moving_avg")
result_df = sub_df.join(
    nopromo_avg_df,
    on=["store_id", "prod_id", "sales_date"],
    how="left",
)
result_df.show()

+--------+-------+----------+------+--------------+----------+--------+----------+
|store_id|prod_id|sales_date|amount|sales_quantity|moving_avg|is_promo|moving_avg|
+--------+-------+----------+------+--------------+----------+--------+----------+
|      A1|  89686|2021-05-06| 441.0|            12|      12.0|       0|      12.0|
|      A1|  89686|2021-05-07| 447.0|            23|      17.5|       1|      null|
|      A1|  89686|2021-05-08| 456.0|            14|    16.333|       1|      null|
|      A1|  89686|2021-05-09| 438.0|             8|      15.0|       1|      null|
|      A1|  89686|2021-05-10| 415.0|             6|     9.333|       0|       9.0|
|      A1|  89686|2021-05-11| 446.0|             4|       6.0|       0|      7.33|
|      A1|  89686|2021-05-12| 441.0|            11|       7.0|       0|       7.0|
|      A1|  89686|2021-05-13| 422.0|             9|       8.0|       0|       8.0|
|      A1|  89686|2021-05-14| 459.0|            19|      13.0|       0|      13.0|
|   