In [None]:
# Initial configuration: make the PySpark and py4j sources bundled with the
# Spark installation at $SPARK_HOME importable from this kernel.

import glob
import os
import sys

# Root of the Spark installation (raises KeyError if SPARK_HOME is not set).
SPARK_HOME = os.environ['SPARK_HOME']

# The py4j archive name embeds its version, which changes between Spark
# releases, so look it up instead of hard-coding one version. Fall back to the
# name this notebook was written against if nothing matches.
_spark_python_lib = os.path.join(SPARK_HOME, "python", "lib")
_py4j_zips = sorted(glob.glob(os.path.join(_spark_python_lib, "py4j-*-src.zip")))
if not _py4j_zips:
    _py4j_zips = [os.path.join(_spark_python_lib, "py4j-0.10.7-src.zip")]

# Each path is inserted at the front of sys.path, so the py4j archive
# (inserted last) ends up being searched first.
for _path in [os.path.join(SPARK_HOME, "python"),
              _spark_python_lib,
              os.path.join(_spark_python_lib, "pyspark.zip")] + _py4j_zips:
    sys.path.insert(0, _path)

In [None]:
# Create (or reuse) the SparkSession used by every cell below.
from pyspark.sql import SparkSession

# NOTE(review): master "local" runs Spark with a single worker thread inside
# the driver; spark.executor.memory / spark.cores.max look like cluster-manager
# settings -- confirm they have any effect in this mode.
spark_settings = [("spark.executor.memory", "1g"),
                  ("spark.cores.max", "2")]

session_builder = SparkSession.builder.master("local").appName("scratch")
for setting_key, setting_value in spark_settings:
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

In [None]:

# Load the retail CSV: the first line is the header and column types are
# inferred from the data.
retail_df = (spark.read
             .options(sep=",", header=True, inferSchema=True)
             .csv('./../input-data/test-data/retail.csv'))
retail_df.printSchema()

#### Checking whether a (broadcast) join followed by a repartition causes a new stage

In [None]:
# Two identical single-column ("id") frames holding 0, 2, 4, ..., 9998.
df1 = spark.range(start=0, end=10000, step=2)
df2 = spark.range(start=0, end=10000, step=2)

In [None]:
# Scale the ids by 5 (keeping the column name "id"), then join back to df2 on id.
df3 = df1.selectExpr('(id*5) as id')
df4 = df3.join(other=df2, on='id')

In [None]:
# Redistribute the join result into 3 partitions.
df5 = df4.repartition(numPartitions=3)

In [None]:
# Alias Spark's sum so it does not shadow Python's built-in sum in the
# notebook namespace; col is still imported under its own name because later
# cells use it.
from pyspark.sql.functions import col, sum as spark_sum

# Total of all ids in the repartitioned join result.
df6 = df5.agg(spark_sum(col('id')))

In [None]:
# Print the single aggregate row without truncating the value.
df6.show(n=1, truncate=False)

### Working with JSON

In [None]:
# Build a one-row DataFrame with a single string column, categoryData, that
# holds a nested JSON document: categories -> sections -> articles, where each
# level has an "id" and a "name". spark.range(1) only supplies the one row; the
# JSON text is a SQL string literal inside selectExpr.
categoryData = (spark.range(1)
                     .selectExpr("""
                                     '
                                         {"categories" :

                                            [{"id": 1, 
                                            "name" : "cat_1", 
                                            "sections":[
                                                {
                                                "id": 1,
                                                "name": "sec_1",
                                                "articles":[{
                                                                "id":11,
                                                                "name": "art_11"
                                                            },
                                                            {
                                                                "id":12,
                                                                "name": "art_12"
                                                            },
                                                            {
                                                                "id":13,
                                                                "name": "art_13"
                                                            }]
                                                },
                                                {
                                                "id": 2,
                                                "name": "sec_2",
                                                "articles":[{
                                                                "id":21,
                                                                "name": "art_21"
                                                            },
                                                            {
                                                                "id":22,
                                                                "name": "art_22"
                                                            },
                                                            {
                                                                "id":23,
                                                                "name": "art_23"
                                                            }]
                                                }]
                                                },
                                            {"id": 2,
                                            "name" : "cat_2", 
                                            "sections":[
                                                {
                                                "id": 3,
                                                "name": "sec_3",
                                                "articles":[{
                                                                "id":31,
                                                                "name": "art_31"
                                                            },
                                                            {
                                                                "id":32,
                                                                "name": "art_32"
                                                            },
                                                            {
                                                                "id":33,
                                                                "name": "art_33"
                                                            }]
                                                },
                                                {
                                                "id": 4,
                                                "name": "sec_4",
                                                "articles":[{
                                                                "id":41,
                                                                "name": "art_41"
                                                            },
                                                            {
                                                                "id":42,
                                                                "name": "art_42"
                                                            },
                                                            {
                                                                "id":43,
                                                                "name": "art_43"
                                                            }]
                                                }]
                                                }
                                                ]
                                                }'
                                                    as categoryData
                                            
                                         """))

In [None]:
from pyspark.sql.functions import get_json_object, json_tuple

# Use wildcard JSONPaths to pull the ids out of each nesting level. Each new
# column holds the match as a string (a JSON array when the path matches
# several values).
json_id_paths = [
    ('article_id', '$.categories[*].sections[*].articles[*].id'),
    ('section_id', '$.categories[*].sections[*].id'),
    ('category_id', '$.categories[*].id'),
]

flatdata_df = categoryData
for id_column, json_path in json_id_paths:
    flatdata_df = flatdata_df.withColumn(id_column, get_json_object(col('categoryData'), json_path))


In [None]:
from pyspark.sql.functions import explode

# Display only the three extracted id columns.
id_columns = ['article_id', 'section_id', 'category_id']
flatdata_df.select(id_columns).show()

In [None]:
# Print the plan of the join/repartition experiment and check it for an extra
# exchange (shuffle) stage. The session is deliberately NOT stopped here:
# every later cell still uses `spark` and `retail_df`.
df6.explain()

### Window functions

In [None]:
# Window functions: for each customer, rank every purchase by quantity
# (largest first) and attach the customer's maximum purchase quantity.

from pyspark.sql.window import Window
from pyspark.sql.functions import col, date_format, desc, dense_rank, rank
# Alias Spark's max so it does not shadow Python's built-in max.
from pyspark.sql.functions import max as spark_max

# Reformat InvoiceDate with date_format using the "MM/dd/yyyy H:mm" pattern.
transform_step1 = (retail_df.withColumn('InvoiceDate',
                                        date_format(col("InvoiceDate"), "MM/dd/yyyy H:mm")))

# One partition per customer, rows ordered by Quantity descending, frame from
# the partition start up to the current row. Spark's rank functions expect
# exactly this ROWS frame.
window_function = (Window.partitionBy("CustomerId")
                   .orderBy(desc("Quantity"))
                   .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# Rows are ordered by Quantity descending, so the running max over this frame
# is the customer's overall maximum.
max_purchase_quantity = spark_max(col("Quantity")).over(window_function)

# rank leaves gaps after ties; dense_rank does not.
purchase_dense_rank = dense_rank().over(window_function)
purchase_rank = rank().over(window_function)

# Reuse the reformatted frame instead of repeating the date_format call.
# Sort only after the window columns exist: an ordering applied before a
# window is not guaranteed to survive it.
transformed_df = (transform_step1
                  .where("CustomerId IS NOT NULL")
                  .select(col("CustomerId"),
                          col("InvoiceDate"),
                          col("Quantity"),
                          purchase_rank.alias("quantityRank"),
                          purchase_dense_rank.alias("quantityDenseRank"),
                          max_purchase_quantity.alias("maxPurchaseQuantity"))
                  .orderBy("CustomerId"))

transformed_df.show(10)

In [None]:
# Print the plan of the window query. show() prints rows and returns None, so
# explain() must be called on the DataFrame itself, not chained after show().
transformed_df.explain()

### GROUP BY:

In [None]:
from pyspark.sql.functions import sum, to_date, col

# Total quantity per (date, country). InvoiceDate is parsed with the
# "MM/d/yyyy H:mm" pattern into a date. The aggregate is named explicitly
# instead of relying on Spark's auto-generated "sum(Quantity)" column name.
groupByDF = (retail_df
             .withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
             .groupBy("Date", "Country")
             .agg(sum("Quantity").alias("total_quantity"))
             .select("Date", "Country", "total_quantity")
             .orderBy("Date"))
groupByDF.show()

In [None]:
# Number of (date, country) groups.
groupByDF_rows = groupByDF.count()
groupByDF_rows

### ROLL UP:

In [None]:
from pyspark.sql.functions import sum, to_date, col

# Rollup over (Date, Country) yields three kinds of rows:
#   - (date, country) totals
#   - per-date subtotals (Country null)
#   - a grand total (both null)
# The aggregate is named explicitly instead of relying on Spark's
# auto-generated "sum(Quantity)" column name.
rollupDF = (retail_df
            .withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
            .rollup("Date", "Country")
            .agg(sum("Quantity").alias("total_quantity"))
            .select("Date", "Country", "total_quantity")
            .orderBy("Date"))
rollupDF.show()


In [None]:
# Number of rollup rows (groups plus subtotals plus the grand total).
rollupDF_rows = rollupDF.count()
rollupDF_rows

### CUBE:

In [None]:
from pyspark.sql.functions import sum, to_date, col

# Cube over (Date, Country) yields the rollup rows plus per-country subtotals
# across all dates (Date null). The aggregate is named explicitly instead of
# relying on Spark's auto-generated "sum(Quantity)" column name.
cubeDF = (retail_df
          .withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
          .cube("Date", "Country")
          .agg(sum("Quantity").alias("total_quantity"))
          .select("Date", "Country", "total_quantity")
          .orderBy("Date"))
cubeDF.show()

In [None]:
# Number of cube rows (every grouping-set combination).
cubeDF_rows = cubeDF.count()
cubeDF_rows

### GROUPING METADATA:

In [None]:
from pyspark.sql.functions import sum, to_date, col, grouping_id

# Cube over (CustomerID, stockcode), tagging each row with grouping_id().
# grouping_id has one bit per grouping column, set when that column is
# aggregated away, so 3 marks the grand-total row. Sorting by gid descending
# puts the most aggregated rows first. Both output columns are named
# explicitly instead of relying on Spark's auto-generated names.
cubeWithGroupIdDF = (retail_df
                     .cube("CustomerID", "stockcode")
                     .agg(sum("Quantity").alias("total_quantity"),
                          grouping_id().alias("gid"))
                     .select("CustomerID", "stockcode", "gid", "total_quantity")
                     .orderBy(col("gid").desc()))
cubeWithGroupIdDF.show()


In [None]:
# Number of rows produced by the (CustomerID, stockcode) cube.
cubeWithGroupIdDF_rows = cubeWithGroupIdDF.count()
cubeWithGroupIdDF_rows

In [None]:
# Import what this cell uses so it does not depend on another cell's imports.
from pyspark.sql.functions import sum, col, grouping_id

# Rollup over (CustomerID, stockcode), tagging each row with grouping_id().
# grouping_id has one bit per grouping column, set when that column is
# aggregated away: 0 for full groups, 1 for per-customer subtotals, 3 for the
# grand total. Both output columns are named explicitly instead of relying on
# Spark's auto-generated names.
rollupWithGroupIdDF = (retail_df
                       .rollup("CustomerID", "stockcode")
                       .agg(sum("Quantity").alias("total_quantity"),
                            grouping_id().alias("gid"))
                       .select("CustomerID", "stockcode", "gid", "total_quantity")
                       .orderBy(col("gid").desc()))
rollupWithGroupIdDF.show()

In [None]:
# Number of rows produced by the (CustomerID, stockcode) rollup.
rollupWithGroupIdDF_rows = rollupWithGroupIdDF.count()
rollupWithGroupIdDF_rows

### PIVOT:

In [None]:
from pyspark.sql.functions import sum, to_date, col

# Pivot: one row per invoice date, with the summed numeric columns spread out
# per Country. InvoiceDate is parsed with the same "MM/d/yyyy H:mm" pattern as
# the group-by cells. In Spark date patterns "mm" means minutes (not months),
# so a pattern like 'dd:mm:yy hh:mm' cannot parse these values.
pivoted = (retail_df
           .withColumn('date', to_date(col('InvoiceDate'), 'MM/d/yyyy H:mm'))
           .groupBy('date')
           .pivot('Country')
           .sum())
pivoted.printSchema()

### Basic tests

In [None]:
from pyspark.sql.functions import expr, locate

simpleColors = ["black", "white", "red", "green", "blue"]

def color_locator(column, color_string):
    """Build a boolean flag column telling whether a color name occurs in ``column``.

    Args:
        column: Column to search (e.g. a product description).
        color_string: color name. It is upper-cased before searching, which
            assumes the searched text is upper case -- TODO confirm for the data.

    Returns:
        A Column aliased ``is_<color_string>``. ``locate`` gives the 1-based
        position of the match (0 when absent); casting that to boolean maps 0
        to false and any position to true.
    """
    position = locate(color_string.upper(), column)
    return position.cast("boolean").alias("is_" + color_string)


# One flag column per color, computed over the retail data's Description
# column (`df` is never defined in this notebook; the data lives in retail_df).
selectedColumns = [color_locator(retail_df.Description, c) for c in simpleColors]



In [None]:
# Inspect the color-flag column expressions.
list(selectedColumns)

In [None]:
# Add a select-all expression so the flags come back alongside every original
# column. Rebuild from the color-flag entries (the first len(simpleColors)
# items) instead of appending in place, so re-running this cell does not add
# "*" a second time.
selectedColumns = selectedColumns[:len(simpleColors)] + [expr("*")]

In [None]:
# Apply the flag expressions to the retail data (`df` is never defined in this
# notebook; the data lives in retail_df).
retail_df.select(selectedColumns)

In [None]:
# Keep rows mentioning white or red and show the is_white flag for three of
# them. Uses retail_df because `df` is never defined in this notebook.
retail_df.select(selectedColumns).where(expr("is_white OR is_red")).select("is_white").show(3, False)

In [None]:
from pyspark.sql.functions import struct, expr

# Build a nested struct: ComplexCountry = {Country, Customer: {CustomerId,
# Description}}. Naming the inner struct explicitly gives a stable field path.
# In PySpark, nested fields are addressed with a plain dotted name; the Scala
# `$"..."` shorthand is not column syntax here.
complexDf = retail_df.withColumn(
    'ComplexCountry',
    struct('Country', struct('CustomerId', 'Description').alias('Customer')))
complexDf.select('ComplexCountry.Customer.CustomerId').show(2, False)

In [None]:
from pyspark.sql.functions import split, explode

# Split each Description on single spaces. descSplits keeps the word array;
# explode emits one output row per word. Uses retail_df because `df` is never
# defined in this notebook.
descSplits = split(expr('Description'), " ").alias('splits')
descExplodes = explode(split(expr('Description'), " ")).alias('explodes')
retail_df.select(expr('Description'), descSplits, descExplodes).show(10, False)

In [None]:
# A one-row frame whose jsonString column holds a small nested JSON document.
json_literal_expr = """
'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString"""
jsonDF = spark.range(1).selectExpr(json_literal_expr)

In [None]:
from pyspark.sql.functions import get_json_object, json_tuple, col

# get_json_object reads one value by JSONPath (zero-based index 1 of the
# array); json_tuple extracts the named top-level key.
array_element = get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("column")
top_level_key = json_tuple(col("jsonString"), "myJSONKey")
jsonDF.select(array_element, top_level_key).show(2, False)

In [None]:
from pyspark.sql.functions import col, expr

# Show the first four rows with a Quantity greater than 3.
more_than_three = col('Quantity') > 3
retail_df.where(more_than_three).show(4)

In [None]:
# Last cell: show the session details, then stop the SparkSession so its
# resources are released once every cell above has run.
display(spark)
spark.stop()