# Data Processing

## Import modules

In [None]:
# import SparkSession
from pyspark.sql import SparkSession

In [None]:
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType,DoubleType,IntegerType

## Set spark session

In [None]:
# Build the SparkSession that every later cell refers to as `spark`.
# getOrCreate() returns the already-running session when the environment
# (e.g. a pyspark shell) provides one, so this is safe to run either way.
spark = SparkSession.builder.appName('data_processing').getOrCreate()
# display the application name of the active Spark context
spark.sparkContext.appName

## Load data

In [None]:
# Load the sample CSV: the first row is used as the header and column types are inferred.
df=spark.read.csv('data/sample_data.csv',inferSchema=True,header=True)
# Register the DataFrame as the temp view "dfTable" so it can be queried through spark.sql.
df.createOrReplaceTempView("dfTable")

## Inspect data

In [None]:
# column names of the DataFrame
df.columns

In [None]:
# shape of the dataset as (row count, column count)
df.count(),len(df.columns)

In [None]:
# print the DataFrame schema
df.printSchema()

In [None]:
# display the first few rows of the DataFrame
df.show()
#df.show(10)

## Grouping data

In [None]:
# number of rows per distinct 'mobile' value (first 5 groups)
df.groupBy('mobile').count().show(5)

In [None]:
# same counts, sorted from the most to the least frequent 'mobile' value
df.groupBy('mobile').count().orderBy('count',ascending=False).show(5)

In [None]:
# per-group mean of the numeric columns
df.groupBy('mobile').mean().show(5)

In [None]:
# per-group sum of the numeric columns
df.groupBy('mobile').sum().show()

In [None]:
# per-group maximum of the numeric columns
df.groupBy('mobile').max().show()

In [None]:
# per-group minimum of the numeric columns
df.groupBy('mobile').min().show()

In [None]:
# row count per 'mobile' value, expressed as Spark SQL against the dfTable view
spark.sql('''select mobile, count(*) as count from dfTable
        group by mobile''').show()

In [None]:
# minimum experience and minimum age per 'mobile' value, via Spark SQL
spark.sql('''select mobile, min(experience), min(age) from dfTable
        group by mobile''').show()

In [None]:
# dict-style aggregation: total 'experience' per 'mobile' value
df.groupBy('mobile').agg({'experience':'sum'}).show()

## Load and Inspect data

In [None]:
# Load the online retail CSV: the first row is used as the header and column types are inferred.
rtdf=spark.read.csv('data/online_retail_dataset.csv',inferSchema=True,header=True)
# Register the DataFrame as the temp view "rtTable" so it can be queried through spark.sql.
rtdf.createOrReplaceTempView("rtTable")

In [None]:
# column names of the DataFrame
rtdf.columns

In [None]:
# shape of the dataset as (row count, column count)
rtdf.count(),len(rtdf.columns)

In [None]:
# print the DataFrame schema
rtdf.printSchema()

In [None]:
# display the first few rows of the DataFrame
rtdf.show()
#rtdf.show(10)

In [None]:
# count of StockCode values
rtdf.select(fn.count('StockCode')).show() 

In [None]:
# number of distinct StockCode values
rtdf.select(fn.countDistinct('StockCode')).show()

In [None]:
# get the minimum and maximum Quantity
rtdf.select(fn.min("Quantity"), fn.max("Quantity")).show()

In [None]:
# population and sample variance, then population and sample standard deviation, of Quantity
rtdf.select(fn.var_pop('Quantity'), fn.var_samp('Quantity'),
        fn.stddev_pop('Quantity'), fn.stddev_samp('Quantity')).show()

In [None]:
# the StockCode count again, this time through Spark SQL on the rtTable view
spark.sql('''select count(StockCode) from rtTable''').show()

In [None]:
# correlation, sample covariance and population covariance between InvoiceNo and Quantity
# NOTE(review): assumes InvoiceNo was inferred as a numeric column — verify against printSchema()
rtdf.select(fn.corr('InvoiceNo', 'Quantity'), fn.covar_samp('InvoiceNo', 'Quantity'),
        fn.covar_pop('InvoiceNo', 'Quantity')).show()

In [None]:
# row count per (InvoiceNo, CustomerId) pair
rtdf.groupBy("InvoiceNo", "CustomerId").count().show(5)

In [None]:
# dict-style agg: count of Quantity per invoice
rtdf.groupBy('InvoiceNo').agg({'Quantity':'count'}).show(5)

In [None]:
# the same count written two ways: a column function with an alias and a SQL expression
rtdf.groupBy('InvoiceNo').agg(fn.count('Quantity').alias('quan'),
        fn.expr('count(Quantity)')).show(5)

In [None]:
# dict-style agg with a different aggregate per column: min Quantity and max UnitPrice per invoice
rtdf.groupBy('InvoiceNo').agg({'Quantity':'min', 'UnitPrice':'max'}).show(5)

In [None]:
# column-function agg: max and min Quantity per invoice
rtdf.groupBy('InvoiceNo').agg(fn.max('Quantity'),
        fn.min('Quantity')).show(5)

## UDF

In [None]:
# UDF
from pyspark.sql.functions import udf

In [None]:
# normal function 
def price_range(brand):
    """Classify a mobile brand into a price tier.

    Returns 'High Price' for 'Samsung' or 'Apple', 'Mid Price' for 'MI',
    and 'Low Price' for every other value. Matching is exact and
    case-sensitive.
    """
    premium_brands = ('Samsung', 'Apple')
    if brand in premium_brands:
        return 'High Price'
    # Anything that is neither premium nor 'MI' falls through to the low tier.
    return 'Mid Price' if brand == 'MI' else 'Low Price'

In [None]:
# wrap the Python function price_range as a Spark UDF that returns a string
brand_udf=udf(price_range,StringType())

In [None]:
# add a 'price_range' column computed from 'mobile'; show 10 rows without truncating cell values
df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)

In [None]:
# Age-group UDF built from a lambda: ages up to 30 are "young", older ages are "senior".
# A missing age (a null reaches the Python function as None) is passed through as
# null instead of raising TypeError on the `None <= 30` comparison.
age_udf = udf(
    lambda age: None if age is None else ("young" if age <= 30 else "senior"),
    StringType(),
)

In [None]:
# add an "age_group" column computed by age_udf from the age column; show 10 rows
df.withColumn("age_group", age_udf(df.age)).show(10)

## Saving file

In [None]:
# Save the DataFrame as CSV with a header row, replacing any previous output.
# coalesce(1) collapses the data to a single partition so only one part file is
# written; note that Spark creates 'data/df_data.csv' as a directory holding it.
# header is passed as a boolean, matching the read calls in this notebook.
df.coalesce(1).write.csv('data/df_data.csv', header=True, mode='overwrite')

In [None]:
# save the retail data in parquet format, replacing any previous output at that path
rtdf.write.parquet('data/retail_dataset_parquet', mode='overwrite')

In [None]:
# read the parquet output back into a new DataFrame
rtdf2=spark.read.parquet('data/retail_dataset_parquet')

In [None]:
# display the first 10 rows of the reloaded data
rtdf2.show(10)

## Outlier

In [None]:
import numpy as np

In [None]:
# Load the white-wine quality CSV; the file is semicolon-separated, has a header row,
# and column types are inferred.
wdf = spark.read.csv('data/winequality_white.csv',sep=';',inferSchema=True,header=True)

In [None]:
# column names of the DataFrame
wdf.columns

In [None]:
# shape of the dataset as (row count, column count)
wdf.count(),len(wdf.columns)

In [None]:
# print the DataFrame schema
wdf.printSchema()

In [None]:
# display the first 10 rows of the DataFrame
#wdf.show()
wdf.show(10)

In [None]:
# summary statistics for the three selected columns
wdf.select('pH','sulphates','chlorides').summary().show()

In [None]:
# First (q1) and third (q3) quartiles for each inspected column (only 'pH' here).
# The last approxQuantile argument is the relative error; 0 is passed here.
bounds = {
    c: dict(
        zip(["q1", "q3"], wdf.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in ['pH']
}

In [None]:
# display the quartiles
bounds

In [None]:
# Extend each column's entry with the IQR fences:
# lower = q1 - 1.5 * IQR and upper = q3 + 1.5 * IQR.
for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)

In [None]:
# display the quartiles together with the computed fences
bounds

In [None]:
# Append one 0/1 flag column "<col>_out" for every column that has bounds:
# 0 when the value lies inside [lower, upper], 1 otherwise. A null value also
# gets 1, because a null comparison does not satisfy the `when` condition.
# Iterating over `bounds` (instead of repeating the column list) keeps the
# flagged columns in sync with the columns whose fences were computed.
wdf2 = wdf.select(
    '*',
    *[
        fn.when(
            fn.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0,
        ).otherwise(1).alias(c + "_out")
        for c in bounds
    ],
)

In [None]:
# pH values next to their outlier flag (first 10 rows)
wdf2.select('pH', 'pH_out').show(10)

In [None]:
# only the rows flagged as outliers (first 10)
wdf2.select('pH','pH_out').filter('pH_out==1').show(10)

In [None]:
# number of rows flagged as outliers
wdf2.select('pH','pH_out').where('pH_out == 1').count()

In [None]:
# register the flagged DataFrame as the temp view "wdf2Table" for Spark SQL
wdf2.createOrReplaceTempView("wdf2Table")

In [None]:
# the same outlier listing through Spark SQL, limited to 10 rows
spark.sql('''select pH,pH_out from wdf2Table where pH_out=1 limit 10''').show()

In [None]:
# Save the flagged data as CSV (not parquet) with a header row under the
# 'data/wdf2' directory, replacing any previous output. header is passed as a
# boolean, matching the read calls in this notebook.
wdf2.write.csv('data/wdf2', header=True, mode='overwrite')

In [None]:
# Load the CSV output at 'data/wdf2'; the first row is the header and column types are inferred.
wdf3=spark.read.csv('data/wdf2',inferSchema=True,header=True)
# NOTE(review): the view is registered as "wdf4Table" although the DataFrame is wdf3 — confirm the name is intended
wdf3.createOrReplaceTempView("wdf4Table")

In [None]:
# show up to 15 rows whose pH_out flag is 1
wdf3.filter('pH_out==1').show(15)