# Exploratory data analysis

## Import packages

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import os

## Set environment variables for AWS credentials, Spark packages (Avro, S3A), and the bucket URL

In [4]:
# Load the AWS access key id from a local file rather than hardcoding it in the notebook.
# readline() keeps the trailing newline; strip() removes it so the newline does not
# become part of the credential value exported to the environment.
with open("work/aws_access_key_id") as file:
    os.environ["AWS_ACCESS_KEY_ID"] = file.readline().strip()

In [5]:
# Load the AWS secret access key from a local file rather than hardcoding it in the notebook.
# strip() drops the trailing newline that readline() keeps, which would otherwise
# corrupt the secret stored in the environment.
with open("work/aws_secret_access_key") as file:
    os.environ["AWS_SECRET_ACCESS_KEY"] = file.readline().strip()

In [6]:
# Extra packages Spark pulls in when its JVM is launched: the Avro data source and the
# Hadoop S3A connector. NOTE: this must be set before the first SparkSession is created
# in this kernel, otherwise it has no effect.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-avro_2.12:3.5.1,org.apache.hadoop:hadoop-aws:3.3.4 pyspark-shell"
)

In [7]:
# S3A prefix of the bronze-layer sales events; every Avro file under it is loaded below.
url = (
    "s3a://bucket-uci-retail-project-1/bronze/kafka/sales_events/"
)

## Set up the Spark session

In [8]:
# Create the Spark session, or reuse one that already exists in this kernel (getOrCreate).
# The S3A settings let Spark read s3a:// paths, resolving credentials through the AWS
# default provider chain.
# NOTE(review): the Delta SQL extension is configured, but no Delta package appears in
# PYSPARK_SUBMIT_ARGS and nothing in this notebook reads Delta tables. Confirm it is on
# the classpath, or drop this setting.
spark = (
    SparkSession.builder
    .appName("Read Avro from S3")
    # File-system implementation used for s3a:// URIs.
    .config(
        "spark.hadoop.fs.s3a.impl",
        "org.apache.hadoop.fs.s3a.S3AFileSystem",
    )
    # Credential lookup for S3A.
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    )
    .config(
        "spark.sql.extensions",
        "io.delta.sql.DeltaSparkSessionExtension",
    )
    .getOrCreate()
)

## Print the schema of the sales events

In [9]:
# Load every Avro file under the bronze prefix into a single DataFrame and inspect its schema.
df = spark.read.load(url, format="avro")
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- OrderList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- StockCode: string (nullable = true)
 |    |    |-- Description: string (nullable = true)
 |    |    |-- Quantity: integer (nullable = true)
 |    |    |-- UnitPrice: float (nullable = true)
 |-- year: integer (nullable = true)



In [10]:
# Preview the first 20 rows (Spark's default), truncating long cell values.
df.show(n=20, truncate=True)

+---------+-------------------+----------+--------------+--------------------+----+
|InvoiceNo|        InvoiceDate|CustomerID|       Country|           OrderList|year|
+---------+-------------------+----------+--------------+--------------------+----+
|   567860|2011-09-22 13:53:00|     18102|United Kingdom|[{22720, SET OF 3...|2011|
|   567861|2011-09-22 14:01:00|       nan|United Kingdom|[{35597A, nan, -1...|2011|
|   567862|2011-09-22 14:02:00|       nan|United Kingdom|[{35591T, nan, -4...|2011|
|   567866|2011-09-22 14:13:00|     16669|United Kingdom|[{22423, REGENCY ...|2011|
|   567868|2011-09-22 14:18:00|     17603|United Kingdom|[{84949, SILVER H...|2011|
|   567869|2011-09-22 14:18:00|     16669|United Kingdom|[{M, Manual, 5, 0...|2011|
|   567873|2011-09-22 14:25:00|     13055|United Kingdom|[{22898, CHILDREN...|2011|
|   567874|2011-09-22 14:26:00|     13055|United Kingdom|[{23196, RETO LEA...|2011|
|   567879|2011-09-22 14:53:00|     16161|United Kingdom|[{20973, 12 PENCI..

## Analysis of non-numeric invoice numbers

In [11]:
# Keep one example row per invoice prefix, i.e. whatever remains of InvoiceNo after
# dropping its last six characters. This exposes non-numeric invoice types such as
# "A" and "C" in the output below.
(
    df
    .withColumn("Invoice_trimmed", f.expr("substring(InvoiceNo, 1, length(InvoiceNo) - 6)"))
    .dropDuplicates(["Invoice_trimmed"])
    .show()
)

+---------+-------------------+----------+--------------+--------------------+----+---------------+
|InvoiceNo|        InvoiceDate|CustomerID|       Country|           OrderList|year|Invoice_trimmed|
+---------+-------------------+----------+--------------+--------------------+----+---------------+
|   567860|2011-09-22 13:53:00|     18102|United Kingdom|[{22720, SET OF 3...|2011|               |
|  A563185|2011-08-12 14:50:00|       nan|United Kingdom|[{B, Adjust bad d...|2011|              A|
|  C567884|2011-09-22 15:08:00|     13627|United Kingdom|[{22909, SET OF 2...|2011|              C|
+---------+-------------------+----------+--------------+--------------------+----+---------------+



In [12]:
# Drill into invoices whose prefix (InvoiceNo minus its last six characters) is "A".
# List every line item they contain, one row per OrderList element.
(
    df
    .where(f.expr("substring(InvoiceNo, 1, length(InvoiceNo) - 6)") == "A")
    .select("InvoiceNo", f.explode("OrderList").alias("Order"))
    .select("InvoiceNo", "Order.*")
    .show()
)

+---------+---------+---------------+--------+---------+
|InvoiceNo|StockCode|    Description|Quantity|UnitPrice|
+---------+---------+---------------+--------+---------+
|  A563185|        B|Adjust bad debt|       1| 11062.06|
|  A563186|        B|Adjust bad debt|       1|-11062.06|
|  A563187|        B|Adjust bad debt|       1|-11062.06|
+---------+---------+---------------+--------+---------+



## Analysis of non-numeric stock codes

In [13]:
# Explode each invoice's OrderList into one row per line item. Then list the distinct
# stock codes (compared case-insensitively) whose first five characters do not parse as
# an integer and whose unit price is non-zero. Examples in the output below are fees,
# bank charges and adjustments.
# try_cast yields NULL for non-numeric text even when spark.sql.ansi.enabled is true.
# A plain cast("int") raises an error in ANSI mode instead of returning NULL.
(
    df
    .withColumn("Order", f.explode("OrderList"))
    .select("Order.*")
    .withColumn("trimmed_stockcode", f.col("StockCode").substr(1, 5))
    .withColumn("lower_stockcode", f.lower(f.col("StockCode")))
    .filter(
        f.expr("try_cast(trimmed_stockcode AS INT)").isNull()
        & (f.col("UnitPrice") != 0)
    )
    .dropDuplicates(["lower_stockcode"])
    .show(100, truncate=False)
)

+------------+----------------------------------+--------+---------+-----------------+---------------+
|StockCode   |Description                       |Quantity|UnitPrice|trimmed_stockcode|lower_stockcode|
+------------+----------------------------------+--------+---------+-----------------+---------------+
|AMAZONFEE   |AMAZON FEE                        |-1      |5942.57  |AMAZO            |amazonfee      |
|B           |Adjust bad debt                   |1       |11062.06 |B                |b              |
|BANK CHARGES|Bank Charges                      |1       |15.0     |BANK             |bank charges   |
|C2          |CARRIAGE                          |1       |50.0     |C2               |c2             |
|CRUK        |CRUK Commission                   |-1      |361.59   |CRUK             |cruk           |
|D           |Discount                          |-1      |18.62    |D                |d              |
|DCGS0003    |BOXED GLASS ASHTRAY               |1       |2.46     |DCGS0

## Check the split of stock codes into product model and version

In [14]:
# For priced line items whose (upper-cased) stock code starts with five digits, split the
# code into a product model (first five characters) and an optional version suffix
# (everything after them).
# try_cast keeps the numeric-prefix test ANSI-safe: in ANSI mode a plain cast raises on
# non-numeric text instead of returning NULL.
(
    df
    .withColumn("Order", f.explode("OrderList"))
    .select("Order.*")
    .withColumn("StockCode", f.upper(f.col("StockCode")))
    .withColumn("trimmed_stockcode", f.col("StockCode").substr(1, 5))
    .filter(
        f.expr("try_cast(trimmed_stockcode AS INT)").isNotNull()
        & (f.col("UnitPrice") != 0)
    )
    .withColumns(
        {
            "ProductModel": f.col("trimmed_stockcode"),
            # substring(str, pos) runs to the end of the string, so long suffixes are
            # not cut off by an arbitrary length cap.
            "ProductVersion": f.when(
                f.length("StockCode") > 5,
                f.expr("substring(StockCode, 6)"),
            ),
            # NOTE(review): the filter above keeps only numeric-prefix codes, so this
            # flag is always false in this cell. Compute it before filtering if the
            # special-code split is what should be inspected.
            "IsSpecial": (
                f.expr("try_cast(trimmed_stockcode AS INT)").isNull()
                & ~f.col("StockCode").startswith("DCGS")
                & ~f.col("StockCode").startswith("GIFT")
            ),
        }
    )
    .show()
)

+---------+--------------------+--------+---------+-----------------+------------+--------------+---------+
|StockCode|         Description|Quantity|UnitPrice|trimmed_stockcode|ProductModel|ProductVersion|IsSpecial|
+---------+--------------------+--------+---------+-----------------+------------+--------------+---------+
|    22720|SET OF 3 CAKE TIN...|       8|     3.39|            22720|       22720|          NULL|    false|
|    23243|SET OF TEA COFFEE...|      24|     3.39|            23243|       23243|          NULL|    false|
|    22838|3 TIER CAKE TIN R...|       4|     9.98|            22838|       22838|          NULL|    false|
|    22842|BISCUIT TIN VINTA...|       6|     5.95|            22842|       22842|          NULL|    false|
|    22843|BISCUIT TIN VINTA...|       6|     4.67|            22843|       22843|          NULL|    false|
|    22839|3 TIER CAKE TIN G...|       4|     9.98|            22839|       22839|          NULL|    false|
|    22840|ROUND CAKE TIN VI

## Check unit-price outliers

In [16]:
# Line items whose unit price exceeds 10,000, i.e. candidate high-value outliers.
(
    df
    .select(f.explode("OrderList").alias("Order"))
    .select("Order.*")
    .where(f.col("UnitPrice") > 10000)
    .show()
)

+---------+---------------+--------+---------+
|StockCode|    Description|Quantity|UnitPrice|
+---------+---------------+--------+---------+
|        B|Adjust bad debt|       1| 11062.06|
|        M|         Manual|      -1|  38970.0|
|AMAZONFEE|     AMAZON FEE|      -1| 13541.33|
|AMAZONFEE|     AMAZON FEE|       1| 13541.33|
|AMAZONFEE|     AMAZON FEE|      -1| 13474.79|
|AMAZONFEE|     AMAZON FEE|      -1| 16453.71|
|AMAZONFEE|     AMAZON FEE|      -1| 13541.33|
|AMAZONFEE|     AMAZON FEE|      -1| 16888.02|
+---------+---------------+--------+---------+



In [18]:
# Line items whose unit price is below -10,000, i.e. candidate large negative outliers.
(
    df
    .select(f.explode("OrderList").alias("Order"))
    .select("Order.*")
    .where(f.col("UnitPrice") < -10000)
    .show()
)

+---------+---------------+--------+---------+
|StockCode|    Description|Quantity|UnitPrice|
+---------+---------------+--------+---------+
|        B|Adjust bad debt|       1|-11062.06|
|        B|Adjust bad debt|       1|-11062.06|
+---------+---------------+--------+---------+

