# Data Exploration and Tests

#### Easily Launch a Spark Session with Spark on Kubernetes Managed in CML

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
import os
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [3]:
# Build (or reuse) the notebook's Spark session. The file-system location and
# the resource-manager principal are taken from the STORAGE and
# HADOOP_USER_NAME environment variables respectively.
spark = (
    SparkSession.builder
    .appName("PythonSQL")
    .config("spark.yarn.access.hadoopFileSystems", os.environ['STORAGE'])
    .config("spark.hadoop.yarn.resourcemanager.principal", os.environ["HADOOP_USER_NAME"])
    .getOrCreate()
)

Py4JError: org.apache.spark.api.python.PythonUtils.isEncryptionEnabled does not exist in the JVM

In [None]:
#### Optional Spark properties for running in distributed mode (add to the builder chain before .getOrCreate()):
####    .config("spark.executor.instances", 2)\
####    .config("spark.executor.cores", 2)\
####    .config("spark.executor.memory", "12g")\

### Loading Data with SparkSQL

#### Customers Table

In [None]:
# Read every column of the default.olist_customers table.
customers_df = spark.sql(
    "SELECT * FROM default.olist_customers"
)

#### Geolocation Table

In [None]:
# Read every column of the default.olist_geolocation table.
geolocation_df = spark.sql(
    "SELECT * FROM default.olist_geolocation"
)

#### Order Items Table

In [None]:
# Read every column of the default.olist_order_items table.
order_items_df = spark.sql(
    "SELECT * FROM default.olist_order_items"
)

#### Order Payments Table

In [None]:
# Read every column of the default.olist_order_payments table.
order_payments_df = spark.sql(
    "SELECT * FROM default.olist_order_payments"
)

#### Order Reviews Table

In [None]:
# Read every column of the default.olist_order_reviews table.
order_reviews_df = spark.sql(
    "SELECT * FROM default.olist_order_reviews"
)

#### Orders Table

In [None]:
# Read every column of the default.olist_orders table.
orders_df = spark.sql(
    "SELECT * FROM default.olist_orders"
)

#### Products Table

In [None]:
# Read every column of the default.olist_products table.
products_df = spark.sql(
    "SELECT * FROM default.olist_products"
)

#### Sellers Table

In [None]:
# Read every column of the default.olist_sellers table.
sellers_df = spark.sql(
    "SELECT * FROM default.olist_sellers"
)

#### Product Category Translation

In [None]:
# Read every column of the default.product_category_translation table.
prod_cat_translation_df = spark.sql(
    "SELECT * FROM default.product_category_translation"
)

### Profiling Tables

In [None]:
import pandas as pd
import pandas_profiling
import numpy as np

In [None]:
# Collect the customers table into a pandas DataFrame and build its
# profiling report; the report is the cell's final expression.
cust_df = customers_df.toPandas()
customers_report = pandas_profiling.ProfileReport(cust_df)
customers_report

In [None]:
# Collect the geolocation table into a pandas DataFrame and build its
# profiling report; the report is the cell's final expression.
df = geolocation_df.toPandas()
geolocation_report = pandas_profiling.ProfileReport(df)
geolocation_report

### Data Manipulation and Analysis

#### Order Items Table

In [None]:
# Print the column names and types of the order items table.
order_items_df.printSchema()

In [None]:
# Report the table dimensions as a (row count, column count) tuple.
print('Dataframe Shape')
row_count = order_items_df.count()
column_count = len(order_items_df.columns)
print((row_count, column_count))

In [None]:
# Largest value found in the FREIGHT_VALUE column.
max_freight = F.max("FREIGHT_VALUE")
order_items_df.agg(max_freight).show()

In [None]:
from pyspark.ml.feature import Bucketizer

# Bin PRICE into a new "buckets" column using the split points
# 0, 50, 100, 150 and +infinity. handleInvalid="keep" retains rows whose
# PRICE is invalid instead of raising an error.
price_splits = [0, 50, 100, 150, float('Inf')]
bucketizer = Bucketizer(splits=price_splits, inputCol="PRICE", outputCol="buckets")
bucketizer.setHandleInvalid("keep")
df_buck = bucketizer.transform(order_items_df)

df_buck.show()

In [None]:
# Count the rows in each price bucket and draw the counts as a bar chart.
bucket_counts = df_buck.groupBy('buckets').count()
df_plot = bucket_counts.toPandas()

plt.figure(figsize=(8, 3))
g = sns.barplot(data=df_plot, x="buckets", y="count")
g.set_title('Order Price Bucket Counts')
plt.show()

#### Orders Table

In [None]:
# List each column of the orders table together with its data type.
orders_df.dtypes

In [None]:
from pyspark.sql.functions import to_date

In [None]:
# Preview the two-digit month ('MM') derived from
# ORDER_DELIVERED_CUSTOMER_ACTUAL, parsed with the 'MMM-yyyy' pattern.
month_expr = "from_unixtime(unix_timestamp(ORDER_DELIVERED_CUSTOMER_ACTUAL,'MMM-yyyy'),'MM') as issue_month"
orders_df.selectExpr(month_expr).show(4)

In [None]:
# Preview the four-digit year ('yyyy') derived from
# ORDER_DELIVERED_CUSTOMER_ACTUAL, parsed with the 'MMM-yyyy' pattern.
year_expr = "from_unixtime(unix_timestamp(ORDER_DELIVERED_CUSTOMER_ACTUAL,'MMM-yyyy'),'yyyy') as ORDER_YEAR"
orders_df.selectExpr(year_expr).show(5)

In [None]:
# Add ORDER_MONTH: the delivery value parsed with 'MMM-yyyy' and
# re-formatted as a two-digit month string.
delivered_month_ts = F.unix_timestamp(F.col("ORDER_DELIVERED_CUSTOMER_ACTUAL"), 'MMM-yyyy')
orders_df = orders_df.withColumn("ORDER_MONTH", F.from_unixtime(delivered_month_ts, 'MM'))

In [None]:
# Add ORDER_YEAR: the delivery value parsed with 'MMM-yyyy' and
# re-formatted as a four-digit year string.
delivered_year_ts = F.unix_timestamp(F.col("ORDER_DELIVERED_CUSTOMER_ACTUAL"), 'MMM-yyyy')
orders_df = orders_df.withColumn("ORDER_YEAR", F.from_unixtime(delivered_year_ts, 'yyyy'))

In [None]:
# Number of orders delivered in each month, with null rows dropped,
# listed in ascending month order.
orders_per_month = orders_df.groupby('ORDER_MONTH').count()
orders_per_month.na.drop().sort(F.asc('ORDER_MONTH')).show()

In [None]:
# Number of orders delivered in each year, with null rows dropped,
# listed in ascending year order.
orders_per_year = orders_df.groupby('ORDER_YEAR').count()
orders_per_year.na.drop().sort(F.asc('ORDER_YEAR')).show()

In [None]:
# Count orders per (ORDER_MONTH, ORDER_YEAR) pair, drop rows containing
# nulls, sort by month and collect the result into pandas for plotting.
plot_df = (
    orders_df
    .select("ORDER_ID", "ORDER_MONTH", "ORDER_YEAR")
    .groupby("ORDER_MONTH", "ORDER_YEAR")
    .count()
    .na.drop()
    .sort(F.asc("ORDER_MONTH"))
    .toPandas()
)

In [None]:
# Check the column labels of the collected pandas DataFrame.
plot_df.columns

In [None]:
# Peek at the first rows of the collected pandas DataFrame.
plot_df.head()

In [None]:
# Bar chart of delivered-order counts per month, one hue per year.
plt.figure(figsize=(8, 3))
g = sns.barplot(data=plot_df, x="ORDER_MONTH", y="count", hue="ORDER_YEAR")
g.set_title('Orders Delivered by Month')
g.set_xlabel('Month')
g.set_ylabel('Orders Count')
plt.show()

In [None]:
from statsmodels.graphics.tsaplots import plot_acf

In [None]:
# Order the monthly counts by year first, then by month within each year.
sort_keys = ["ORDER_YEAR", "ORDER_MONTH"]
plot_df_sorted = plot_df.sort_values(by=sort_keys)

In [None]:
# Build an "MM-YYYY" label by joining month and year with a hyphen, and
# store the counts as floats.
month_part = plot_df_sorted["ORDER_MONTH"]
year_part = plot_df_sorted["ORDER_YEAR"]
plot_df_sorted["MM-YYYY"] = month_part.str.cat(year_part, sep="-")
plot_df_sorted["count"] = plot_df_sorted["count"].astype(float)

In [None]:
# Scatter plot of the delivery count for each "MM-YYYY" label.
# Keep `ax` bound to the Axes returned by seaborn: chaining `.set(...)` onto
# the call would bind `ax` to the return value of `set` rather than the Axes.
ax = sns.scatterplot(x="MM-YYYY", y="count", data=plot_df_sorted)
ax.set(title='Deliveries 2016 - 2018', xlabel='Month', ylabel='Count')
# Rotate the tick labels so the month-year strings do not overlap.
plt.xticks(rotation=70)
plt.tight_layout()
# Render explicitly, matching the other plotting cells in this notebook.
plt.show()

In [None]:
# Autocorrelation plot of the order counts, taken in year-then-month order.
ordered_counts = plot_df.sort_values(["ORDER_YEAR", "ORDER_MONTH"])["count"]
plot_acf(ordered_counts)
plt.show()

In [None]:
# Stop the Spark session used by this notebook.
spark.stop()