In [3]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise start a new one for this notebook.
spark = (
    SparkSession.builder
    .appName('Chocolate Sales')
    .getOrCreate()
)

In [None]:
# Read the dataset: the first row is the header and column types are inferred from the data
DATA_PATH = "../data/Chocolate Sales.csv"
df_spark = spark.read.option("header", "true").csv(DATA_PATH, inferSchema=True)

# Another option to read dataset: pass the reader options as keyword arguments of csv()
spark.read.csv(DATA_PATH, header=True, inferSchema=True).show()

## 1. Introduction: Exploring and Modifying the DataFrame

In [None]:
# Check the schema: prints each column of df_spark with its (inferred) type and nullability
df_spark.printSchema()

In [None]:
# Get all the column names, in schema order
[field.name for field in df_spark.schema.fields]

In [None]:
# Check the first 10 rows, collected to the driver as a list of Row objects
df_spark.take(10)

In [None]:
# Select only the Product column (referenced as a Column object) and display it
df_spark.select(df_spark['Product']).show()

In [None]:
# Select multiple columns, passed as separate arguments rather than a list
df_spark.select('Product', 'Country', 'Date').show()

In [None]:
# Check the types of the columns as (column name, simple type string) pairs
[(field.name, field.dataType.simpleString()) for field in df_spark.schema.fields]

In [None]:
# Check the summary statistics of the dataset (count, mean, stddev, min, max per column)
summary_stats = df_spark.describe()
summary_stats.show()

In [12]:
# Add a new column holding twice the number of boxes shipped
boxes = df_spark['Boxes Shipped']
df_spark = df_spark.withColumn('New Column', boxes * 2)

In [13]:
# Drop the temporary column again (dropping by name is a no-op if it is absent)
columns_to_drop = ['New Column']
df_spark = df_spark.drop(*columns_to_drop)

In [None]:
# Rename 'Sales Person' to 'Salesperson' for display only; df_spark itself is not reassigned
renamed_preview = df_spark.withColumnRenamed('Sales Person', 'Salesperson')
renamed_preview.show()

## 2. Handling Missing Values

In [None]:
# Drop rows with missing values. `na.drop` always removes ROWS (never columns)
# and returns a new DataFrame; df_spark itself is left unchanged.

# how parameter
dropped_any = df_spark.na.drop(how='any')  # drop a row if ANY of its values is null (default)
dropped_all = df_spark.na.drop(how='all')  # drop a row only if ALL of its values are null

# thresh parameter (when given, it overrides `how`)
dropped_thresh = df_spark.na.drop(thresh=2)  # keep only rows that have at least 2 non-null values

# subset parameter
dropped_subset = df_spark.na.drop(subset=['Product'])  # only nulls in 'Product' cause a row to be dropped

# Compare row counts so the effect of each variant is visible
print('original rows:', df_spark.count())
for label, frame in [
    ('how=any', dropped_any),
    ('how=all', dropped_all),
    ('thresh=2', dropped_thresh),
    ("subset=['Product']", dropped_subset),
]:
    print(f'{label}:', frame.count())

In [None]:
# Fill missing values. A string fill value is only applied to string-typed columns;
# numeric columns keep their nulls (pass a number, or a {column: value} dict, for those).
filled_strings = df_spark.na.fill('Missing Value')  # every string-typed column
df_spark.na.fill('Missing Value', subset=['Product']).show()  # only the 'Product' column

In [None]:
# Fill the missing values with the mean value
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import collect_list, avg

# The Imputer works on numeric input columns; store 'Boxes Shipped' as a double
df_spark = df_spark.withColumn("Boxes Shipped", df_spark["Boxes Shipped"].astype("double"))

# Missing entries of 'Boxes Shipped' are replaced by the column mean in a new output column
imputer = Imputer(
    strategy="mean",
    inputCols=['Boxes Shipped'],
    outputCols=["Boxes Shipped_imputed"],
)

imputer_model = imputer.fit(df_spark)
imputer_model.transform(df_spark).show()


## 3. Type Casting and Filtering

In [18]:
# Casting the data type: strip '$' and ',' (amounts are presumably formatted like "$5,320") and parse as double
from pyspark.sql.functions import regexp_replace
amount_digits = regexp_replace(df_spark["Amount"], "[$,]", "")
df_spark = df_spark.withColumn("Amount", amount_digits.astype("double"))

In [None]:
# Filter the dataset. Each filter returns a new DataFrame, so every example
# ends with .show() to actually display its result.

# Rows whose sale amount exceeds 15000 (SQL-expression string form)
df_spark.filter("Amount > 15000").show()

# Filter the dataset selecting the columns (backticks quote a column name containing a space)
df_spark.filter("`Boxes Shipped` < 300").select(['Sales Person', 'Country']).show()

# Filter the dataset using multiple conditions; each comparison needs its own parentheses
# because Python's `&` binds tighter than `>` and `==`
df_spark.filter((df_spark['Amount'] > 15000) &
                (df_spark['Country'] == 'UK')).show()

## 4. GroupBy and Aggregate Functions

In [None]:
## GroupBy operation
# Aggregate only the column that is displayed instead of every numeric column.
# The output columns keep Spark's default names ('sum(Boxes Shipped)', 'avg(Amount)').
df_spark.groupBy('Sales Person').sum('Boxes Shipped').show()

df_spark.groupBy('Country').mean('Amount').show()

df_spark.groupBy('Country').count().show()

In [None]:
# Total sales amount over the whole dataset: a global aggregation with no grouping keys
df_spark.groupBy().agg({'Amount': 'sum'}).show()

## 5. Linear Regression with PySpark ML

In [4]:
# Load a fresh copy of the raw CSV for the ML section (header row, inferred column types)
training = spark.read.options(header=True, inferSchema=True).csv("../data/Chocolate Sales.csv")

In [20]:
from pyspark.sql.functions import regexp_replace

# Strip '$' and ',' and cast to double, consistent with how df_spark's Amount is cleaned.
# Casting to "int" would truncate any fractional amounts and make the Amount x Boxes
# product below an integer multiplication that can overflow.
training = training.withColumn("Amount", regexp_replace(training["Amount"], "[$,]", "").cast("double"))

# Derived regression target: 0.6 * Amount * Boxes Shipped
training = training.withColumn("Salary", training['Amount'] * training['Boxes Shipped'] * 0.6)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Pack the two predictors into one vector column, the input format Spark ML estimators use
feature_columns = ['Amount', 'Boxes Shipped']
feature_assembler = VectorAssembler(inputCols=feature_columns, outputCol='Independent Features')

output = feature_assembler.transform(training)
output.show()

In [25]:
# Keep only the feature vector and the label column for modelling
finalized_data = output.select(['Independent Features', 'Salary'])

In [26]:
from pyspark.ml.regression import LinearRegression

# Fixed seed so the 75/25 train/test split, and every coefficient and metric derived
# from it, is reproducible on Restart & Run All (given the same input partitioning).
SPLIT_SEED = 42
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Keep the unfitted estimator and the fitted model under separate names;
# later cells use `regressor` as the fitted model.
linear_regression = LinearRegression(
    featuresCol='Independent Features',
    labelCol='Salary'
)

regressor = linear_regression.fit(train_data)

In [None]:
# Get the coefficients: one fitted weight per assembled feature (Amount, Boxes Shipped)
coefficients = regressor.coefficients
coefficients

In [None]:
# Get the intercept (bias term) of the fitted linear model
intercept = regressor.intercept
intercept

In [None]:
# Prediction: evaluate the fitted model on the held-out split and display its predictions
pred_results = regressor.evaluate(test_data)
test_predictions = pred_results.predictions
test_predictions.show()

In [None]:
# Error metrics on the test split: (mean absolute error, mean squared error)
mae = pred_results.meanAbsoluteError
mse = pred_results.meanSquaredError
mae, mse