# Install Spark + import lib + start spark session

In [0]:
from pyspark.sql.functions import *

# Getting data from Kaggle

You may download data from Kaggle mannually or using auto pipeline like this one.

In [0]:
%pip install opendatasets --upgrade

In [0]:
import opendatasets as od

od.download("https://www.kaggle.com/datasets/claudiodavi/superhero-set/data","/Volumes/workspace/default/test/")

In [0]:
%sh ls ./

In [0]:
%sh unzip /dbfs/FileStore/mypath/superhero-set.zip

In [0]:
#important "file:/"

file_path = "/Volumes/workspace/default/test/superhero-set/heroes_information.csv"


df_hero_indi = spark.read.options(header="true",inferschema = "true").csv(file_path)

df_hero_indi.show()

In [0]:
display(df_hero_indi)

In [0]:
file_path2 = "/Volumes/workspace/default/test/superhero-set/super_hero_powers.csv"

df_hero_power = spark.read.options(header="true",inferschema = "true").csv(file_path2)

#df_hero_power.show()
display(df_hero_power)

# Querying + Stat test

In [0]:
df_hero_indi.count()

In [0]:
display(df_hero_indi.select("Race"))

In [0]:
df_hero_indi.select("Race").distinct().show()

In [0]:
display(df_hero_indi.filter(col("Race")=="Cyborg"))

In [0]:
from pyspark.sql.functions import col, countDistinct

df_hero_indi.agg(countDistinct(col("Race"))).show()

In [0]:
from pyspark.sql.functions import col, countDistinct

display(df_hero_indi.agg(*(countDistinct(col(c)).alias(c) for c in df_hero_indi.columns)))

In [0]:
from pyspark.sql import functions as F

df_hero_indi.agg(F.min(col("Weight"))\
              ,F.max(col("Weight"))\
              ,F.avg(col("Weight"))\
              ,F.sum(col("Weight"))\
              ,F.stddev(col("Weight")))\
              .show()

In [0]:
display(
df_hero_indi.groupBy(col("Race")).agg(F.min(col("Weight"))\
              ,F.max(col("Weight"))\
              ,F.avg(col("Weight"))\
              ,F.sum(col("Weight"))\
              ,F.stddev(col("Weight")))\
)
              

## Finding Median
http://infolab.stanford.edu/~datar/courses/cs361a/papers/quantiles.pdf

In [0]:
df_hero_indi.approxQuantile("weight", [0.5], 0.0)

## "Null" **checking**

In [0]:
df_hero_indi.filter(col("Weight").isNull()).show()

## Group by

In [0]:
df_hero_indi.groupBy("Race").count().show()

In [0]:
from pyspark.sql import functions as F

df_hero_indi.groupby(col("Gender")).agg(F.min(col("Weight"))\
              ,F.max(col("Weight"))\
              ,F.avg(col("Weight"))\
              ,F.sum(col("Weight"))\
              ,F.stddev(col("Weight")))\
              .show()

In [0]:
df_hero_weight = df_hero_indi.filter(col("Weight")!=-99).select("weight")
df_hero_weight.show()

In [0]:
df_hero_weight.agg(F.min(col("Weight"))\
              ,F.max(col("Weight"))\
              ,F.avg(col("Weight"))\
              ,F.sum(col("Weight"))\
              ,F.stddev(col("Weight")))\
              .show()

In [0]:
display(df_hero_weight)


In [0]:
import pandas as pd

def compute_histogram(pdf_iter):
    for pdf in pdf_iter:
        hist, bin_edges = pd.cut(pdf['Weight'], bins=11, retbins=True)
        hist_counts = hist.value_counts().sort_index()
        yield pd.DataFrame({'histogram': hist_counts.values})

weight_histogram = df_hero_weight.mapInPandas(compute_histogram, schema="histogram array<double>")

display(weight_histogram)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPythonException[0m                           Traceback (most recent call last)
File [0;32m<command-4776691178928453>, line 11[0m
[1;32m      7[0m         [38;5;28;01myield[39;00m pd[38;5;241m.[39mDataFrame({[38;5;124m'[39m[38;5;124mhistogram[39m[38;5;124m'[39m: hist_counts[38;5;241m.[39mvalues})
[1;32m      9[0m weight_histogram [38;5;241m=[39m df_hero_weight[38;5;241m.[39mmapInPandas(compute_histogram, schema[38;5;241m=[39m[38;5;124m"[39m[38;5;124mhistogram array<double>[39m[38;5;124m"[39m)
[0;32m---> 11[0m display(weight_histogram)

File [0;32m/databricks/python_shell/lib/dbruntime/display.py:142[0m, in [0;36mDisplay.display[0;34m(self, input, *args, **kwargs)[0m
[1;32m    140[0m [38;5;66;03m# This version is for Serverless + Spark Connect dogfooding.[39;00m
[1;32m    141[0m [38;5;28;01melif[39;00m [38;5;28mself[39m[38;5;241m.[39mspark_connec

(Using Pandas and plot for showing graph)

## Joinning (yes, same as join in SQL)

In [0]:
#Rename to match
df_power = df_hero_power.withColumnRenamed("hero_names","name")

df_power.show()

In [0]:
df_joined = df_hero_indi.join(df_power, on="name",how="left")
display(df_joined)

# Basic Transformation

## New conditional column

Due to WORM (write once read many) so normally we will not alter df, we would add with new column

In [0]:
from pyspark.sql.functions import col, expr, when

new_column = F.when(col("Race")=="-","null").otherwise(col("Race"))

df_test_nc = df_hero_indi.withColumn("clean_Race",new_column)
df_test_nc.show()

Apply same concept to clean null

In [0]:
from pyspark.sql.functions import col, expr, when

new_column = F.when(col("weight").isNull(),-99).otherwise(col("weight"))

df_test_nc = df_hero_indi.withColumn("clean_weight1",new_column)
df_test_nc.show()

## UDF: User defined function(s)
Spark does not support direct calculation to each cell values so there is some reway to do calculation, in distribution mode.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

def lbs2kg(lbs):
    return lbs*0.4536

lbs2kg_udf = udf(lbs2kg, FloatType())
df_test = df_hero_indi.withColumn('weight_in_kg',lbs2kg_udf(df_hero_indi["weight"]))
df_test.show()

## Binarizer

In [0]:
from pyspark.ml.feature import Binarizer

binarizer = Binarizer(threshold=112.25, inputCol="Weight", outputCol="binarized_weight")
binarizedDataFrame = binarizer.transform(df_hero_indi)
binarizedDataFrame.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JError[0m                                 Traceback (most recent call last)
File [0;32m<command-4776691178928408>, line 3[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfeature[39;00m [38;5;28;01mimport[39;00m Binarizer
[0;32m----> 3[0m binarizer [38;5;241m=[39m Binarizer(threshold[38;5;241m=[39m[38;5;241m112.25[39m, inputCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mWeight[39m[38;5;124m"[39m, outputCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mbinarized_weight[39m[38;5;124m"[39m)
[1;32m      4[0m binarizedDataFrame [38;5;241m=[39m binarizer[38;5;241m.[39mtransform(df_hero_indi)
[1;32m      5[0m binarizedDataFrame[38;5;241m.[39mshow()

File [0;32m/databricks/python/lib/python3.11/site-packages/pyspark/__init__.py:120[0m, in [0;36mkeyword_only.<loc

## Quatile / Percentile

In [0]:
bounds = {
    c: dict(
        zip(["q1", "q3"], df_hero_weight.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in df_hero_weight.columns
}

print(bounds)

Using quatile to check outlier

In [0]:
for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)

In [0]:
import pyspark.sql.functions as f
df_hero_weight.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in df_hero_weight.columns
    ]
).show()

Advanced solution for percentile / quatile

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql import DataFrameStatFunctions as statFunc
import numpy as np

from pyspark.sql import Column
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
import ast


class Discretize:
    @staticmethod
    def threshold_index(col_val, threshold: Column, threshold_str: bool = False):
        if threshold_str:
            # convert list that represent as string to normal list
            threshold = ast.literal_eval(threshold)
        for i, val_i in enumerate(threshold):
            current = threshold[i]
            if i > 0:
                previous = threshold[i - 1]
                if col_val > previous and col_val <= current:
                    result = int(i)
                elif col_val > previous and col_val > current:
                    # for threshold cutoff (extend positive limit bound)
                    result = int(i) + 1
            if i == 0 and col_val <= current:
                result = int(i)
        return result

    @staticmethod
    def human_score(x, y):
        return (int(y) - int(x))

    @staticmethod
    def indexer(df_in, columnname, x, output_name, invert: bool = True):

        threshold_index_udf = udf(Discretize.threshold_index, IntegerType())
        human_score_udf = udf(Discretize.human_score, IntegerType())

        index = list(np.linspace(1. / x, 1, x))
        pthvalue = statFunc(df_in).approxQuantile(columnname, index, 0.0)  # get list of cutoff nth //
        df_out = df_in.withColumn("pth", array([lit(df_in) for df_in in pthvalue]))
        df_out = df_out.withColumn('ranking', threshold_index_udf(columnname, "pth"))
        if invert:
            df_out = df_out.withColumn("maxpth", lit(x))
            df_out = df_out.withColumn(output_name, col("maxpth") - col("ranking")).drop("maxpth")
        else:
            df_out = df_out.withColumn(output_name, lit("ranking"))
        df_out = df_out.drop("pth")
        return df_out

In [0]:
output_data=Discretize.indexer(df_hero_weight,"Weight",100,"Percnetile_weight")
output_data.show()

## Numerical to categorical

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

lookup = spark.createDataFrame(
    [(-100.0,0.000,"NA"),
     (0.001,50.00,"0-50 Lbs"),
     (50.00,100.00,"51-100 Lbs"),
     (100.00,200.00,"101-200 Lbs"),
     (200.00,300.00,"201-300 Lbs"),
     (300.00,400.00,"301-400 Lbs"),
     (400.00,500.00,"401-500 Lbs"),
     (500.00,600.00,"501-600 Lbs"),
     (600.00,1000.00,"600+ Lbs")],
    ("b","t","weight_grp"))
    
df_test_grp = df_hero_indi\
    .join(lookup,[F.col("weight")>=F.col("b"),F.col("weight") < F.col("t")],"leftouter")
  
df_test_grp.groupby("weight_grp").count().orderBy("weight_grp").show()

In [0]:
df_test_grp2 = df_test_nc\
    .join(lookup,[F.col("clean_weight1")>=F.col("b"),F.col("clean_weight1") < F.col("t")],"leftouter")
  
df_test_grp2.groupby("weight_grp").count().orderBy("weight_grp").show()

## Standardization

In [0]:
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="Weight", outputCol="scaled_weight",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(df_hero_indi)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(df_hero_indi)
scaledData.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JError[0m                                 Traceback (most recent call last)
File [0;32m<command-4776691178928421>, line 4[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mmllib[39;00m[38;5;21;01m.[39;00m[38;5;21;01mutil[39;00m [38;5;28;01mimport[39;00m MLUtils
[1;32m      2[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfeature[39;00m [38;5;28;01mimport[39;00m StandardScaler
[0;32m----> 4[0m scaler [38;5;241m=[39m StandardScaler(inputCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mWeight[39m[38;5;124m"[39m, outputCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mscaled_weight[39m[38;5;124m"[39m,
[1;32m      5[0m                         withStd[38;5;241m=[39m[38;5;28;01mTrue[39;00m, withMean[38;5;241m=[39m[38;5

More reading: https://spark.apache.org/docs/1.4.1/ml-features.html

#Lab

Try to utilize spark as much as possible

## Ingest data

Data set: [here](https://www.kaggle.com/mashlyn/online-retail-ii-uci)

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

import os
os.environ['KAGGLE_CONFIG_DIR'] = "/content/gdrive/My Drive/Kaggle"

#Adding API code here

#Unzip and delete zip file
!unzip ##  && rm ##

## Data Description

This Online Retail II data set contains all the transactions occurring for a UK-based and registered, 
non-store online retail between 01/12/2009 and 09/12/2011. The company mainly sells unique all-occasion gift-ware. 
Many customers of the company are wholesalers.

Attribute Information:

- InvoiceNo: Invoice number. Nominal. A 6-digit integral number uniquely assigned to each transaction. If this code starts with the letter 'c', it indicates a cancellation.
- StockCode: Product (item) code. Nominal. A 5-digit integral number uniquely assigned to each distinct product.
- Description: Product (item) name. Nominal.
- Quantity: The quantities of each product (item) per transaction. Numeric.
- InvoiceDate: Invice date and time. Numeric. The day and time when a transaction was generated.
- UnitPrice: Unit price. Numeric. Product price per unit in sterling (Â£).
- CustomerID: Customer number. Nominal. A 5-digit integral number uniquely assigned to each customer.
- Country: Country name. Nominal. The name of the country where a customer resides.

### 1. Explore the Data: Check NULL values, Check for outliers, and highlight

In [0]:
#code here
[]

## For all questions, assume the current date is 10/12/2011

### 2. Find an average basket size of customer in each country in the year 2010

#### Basket size = Total Sales Amount / Total Number of Invoices

Hint: df.select(to_date(df.STRING_COLUMN).alias('new_date')).show()

###  3. Does the basket size in each country change over time? Which country has the highest growth in terms of both sales amount and basket size in the past 6 months?

### 4. Monitor weekly sales and visit by country, Past 1 week, Past 2 weeks, Past 4 weeks, Year-to-date
#### Create a report that includes the following columns:
- Country
- Number of Customers in past 1 week
- Number of Customers in past 2 weeks
- Number of Customers in past 4 weeks
- Number of Customers accumulated since 01/01/2011
- Sales amount in past 1 week
- Sales amount in past 2 weeks
- Sales amount in past 4 weeks
- Sales amount since 01/01/2011
- Number of Invoices in past 1 week
- Number of Invoices in past 2 weeks
- Number of Invoices in past 4 weeks
- Number of Invoices since 01/01/2011

### 5. Find the average number of days since last visit of the customer in each country