### System Config:

In [None]:
# Initial config: put the PySpark and Py4J sources that ship with the local
# Spark installation on sys.path so `import pyspark` works from this notebook.

import glob
import os
import sys

# Root of the Spark installation (a KeyError here means SPARK_HOME is not set).
SPARK_HOME = os.environ['SPARK_HOME']

_spark_python_dir = os.path.join(SPARK_HOME, "python")
_spark_lib_dir = os.path.join(_spark_python_dir, "lib")

# Locate the bundled py4j archive instead of hard-coding one version
# (its file name changes between Spark releases, e.g. py4j-0.10.7-src.zip).
_py4j_zips = sorted(glob.glob(os.path.join(_spark_lib_dir, "py4j-*-src.zip")))

# Each insert(0, ...) prepends, so later entries take priority: the resulting
# search order is py4j zip(s), pyspark.zip, python/lib, python.
for _path in [_spark_python_dir,
              _spark_lib_dir,
              os.path.join(_spark_lib_dir, "pyspark.zip"),
              *_py4j_zips]:
    sys.path.insert(0, _path)

### Spark Session Config:

In [None]:
# Create (or reuse) the SparkSession shared by every cell below.
from pyspark.sql import SparkSession

# NOTE(review): with master "local" Spark runs inside the driver JVM with a
# single worker thread, so spark.executor.memory / spark.cores.max are likely
# ignored here -- confirm against the Spark configuration docs.
_session_settings = {
    "spark.executor.memory": "1g",
    "spark.cores.max": "2",
}

_builder = SparkSession.builder.master("local").appName("scratch")
for _setting_key, _setting_value in _session_settings.items():
    _builder = _builder.config(_setting_key, _setting_value)
spark = _builder.getOrCreate()

#### Extract CSV File:

In [None]:

# Load the retail sample data: the header row supplies column names and
# inferSchema asks Spark to guess each column's type.
retail_csv_path = './../input-data/test-data/retail.csv'
retail_df = spark.read.csv(
    retail_csv_path,
    header=True,
    inferSchema=True,
    sep=",",
    schema=None,
)
retail_df.printSchema()

### Aggregations:

#### WINDOW:

In [None]:
# Window functions: rank each customer's rows by Quantity (rank and dense_rank)
# and attach that customer's largest Quantity to every row.

from pyspark.sql.window import Window
from pyspark.sql.functions import col, date_format, desc, dense_rank, rank, to_timestamp
# Alias Spark's max so the builtin max() is not shadowed for the rest of the notebook.
from pyspark.sql.functions import max as spark_max

# InvoiceDate text is parsed with this pattern in the other cells of this notebook.
# date_format() on the raw string relies on Spark's implicit string->timestamp
# cast, which does not understand this pattern and yields null, so parse first.
# NOTE(review): to_timestamp also accepts a column already inferred as a
# timestamp; confirm InvoiceDate's type with retail_df.printSchema().
invoice_date_display = date_format(to_timestamp(col("InvoiceDate"), "MM/d/yyyy H:mm"),
                                   "MM/dd/yyyy H:mm")

# Per-customer window ordered by Quantity descending, framed from the first row
# of the partition up to the current row.
window_function = (Window.partitionBy("CustomerId")
                   .orderBy(desc("Quantity"))
                   .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# Running max over a descending order: the first row already holds the largest
# Quantity, so every row of the partition sees the customer's maximum.
max_purchase_quantity = spark_max(col("Quantity")).over(window_function)

# Rank functions over the same window.
purchase_dense_rank = dense_rank().over(window_function)
purchase_rank = rank().over(window_function)

transformed_df = (retail_df.withColumn('InvoiceDate', invoice_date_display)
                           .where("CustomerId IS NOT NULL")
                           .orderBy("CustomerId")
                           .select(col("CustomerId"),
                                   col("InvoiceDate"),
                                   col("Quantity"),
                                   purchase_rank.alias("quantityRank"),
                                   purchase_dense_rank.alias("quantityDenseRank"),
                                   max_purchase_quantity.alias("maxPurchaseQuantity")))

transformed_df.show(10)

In [None]:
# Print the query plan for the window query. explain() must be called on the
# DataFrame itself: show() prints rows and returns None, so it cannot be chained.
transformed_df.explain()

#### GROUP BY:

In [None]:
# GROUP BY: total Quantity for every (date, country) pair, ordered by date.
from pyspark.sql.functions import sum, to_date, col

invoice_day = to_date(col("InvoiceDate"), "MM/d/yyyy H:mm")

groupByDF = (
    retail_df
    .withColumn("date", invoice_day)
    .groupBy("Date", "Country")
    .agg(sum("Quantity").alias("total_quantity"))
    .orderBy("Date")
)
groupByDF.show()

In [None]:
# Number of rows produced by the GROUP BY cell (one per date/country group).
groupByDF.count()

#### ROLL UP:

In [None]:
# ROLLUP: same totals as the GROUP BY cell plus rollup subtotal rows.
from pyspark.sql.functions import sum, to_date, col

dated_retail = retail_df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
quantity_total = sum("Quantity").alias("total_quantity")

rollupDF = (
    dated_retail
    .rollup("Date", "Country")
    .agg(quantity_total)
    .orderBy("Date")
)
rollupDF.show()


In [None]:
# Row count of the rollup; expected to exceed groupByDF.count() because rollup
# also emits subtotal rows (null Country) and a grand-total row.
rollupDF.count()

#### CUBE:

In [None]:
# CUBE: totals for every combination of (date, country), including the
# per-country and grand-total combinations.
from pyspark.sql.functions import sum, to_date, col

cube_source = retail_df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
cube_grouped = cube_source.cube("Date", "Country")

cubeDF = cube_grouped.agg(sum("Quantity").alias("total_quantity")).orderBy("Date")
cubeDF.show()

In [None]:
# Row count of the cube; expected to be at least rollupDF.count(), since cube
# also aggregates over Country with Date rolled up.
cubeDF.count()

#### GROUPING METADATA:

In [None]:
# GROUPING METADATA: cube over (CustomerID, stockcode), tagging each row with
# Spark's grouping_id() as gid so aggregation levels can be told apart.
from pyspark.sql.functions import sum, to_date, col, grouping_id

# NOTE(review): per the Spark docs, a higher grouping_id means more of the cube
# columns were aggregated away, so sorting by gid descending lists the most
# aggregated rows (e.g. the grand total) first.
cubeWithGroupIdDF = (
    retail_df
    .cube("CustomerID", "stockcode")
    .agg(sum("Quantity").alias("total_quantity"),
         grouping_id().alias("gid"))
    .select("CustomerID", "stockcode", "gid", "total_quantity")
    .orderBy(col("gid").desc())
)
cubeWithGroupIdDF.show()


In [None]:
# Number of rows in the cube with grouping metadata (all aggregation levels).
cubeWithGroupIdDF.count()

In [None]:
# Rollup over (CustomerID, stockcode) with grouping_id() exposed as gid.
# Import explicitly instead of relying on an earlier cell having run first.
from pyspark.sql.functions import sum, col, grouping_id

rollupWithGroupIdDF = (retail_df
                       .rollup("CustomerID", "stockcode")
                       .agg(sum("Quantity").alias("total_quantity"),
                            grouping_id().alias("gid"))
                       .select("CustomerID", "stockcode", "gid", "total_quantity")
                       .orderBy(col("gid").desc()))
rollupWithGroupIdDF.show()

In [None]:
# Number of rows in the rollup with grouping metadata; compare with
# cubeWithGroupIdDF.count() to see the levels rollup omits.
rollupWithGroupIdDF.count()

#### PIVOT:

In [None]:
# PIVOT: one row per invoice date, one column per (country, numeric column) sum.
from pyspark.sql.functions import to_date, col

# Parse InvoiceDate with the same pattern as the other cells; the pattern used
# previously ('dd:mm:yy hh:mm', where 'mm' means minutes) matched none of the data.
pivoted = (retail_df
           .withColumn('date', to_date(col('InvoiceDate'), 'MM/d/yyyy H:mm'))
           .groupBy("date")
           .pivot("Country")
           .sum())
pivoted.printSchema()

### Session Stop:

In [None]:
# Shut down the SparkSession; run this last, as DataFrames above can no longer be evaluated afterwards.
spark.stop()