In [None]:
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
import json
import pyspark.sql.functions as F
import pyspark.sql
from pyspark.sql.functions import col, skewness, kurtosis
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import when
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.functions import from_unixtime, unix_timestamp
from pyspark.sql.types import StringType
from datetime import date, timedelta, datetime
import time

In [None]:
# Obtain the notebook's Spark session: getOrCreate() reuses an already
# running session if there is one, otherwise it starts a new one named
# "capstone".
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("capstone")
    .getOrCreate()
)

In [None]:
def load_data(data):
    """Read a CSV file into a Spark DataFrame and print how long the read took.

    The file is read with ``header=True`` (first line supplies the column
    names) and ``inferSchema=True`` (column types are inferred from the data).
    The module-level ``spark`` session is used for the read.

    Parameters
    ----------
    data : str
        Path of the CSV file, passed unchanged to ``spark.read.csv``.

    Returns
    -------
    The DataFrame returned by ``spark.read...csv(data)``.
    """
    # perf_counter() is a monotonic, high-resolution clock, so the reported
    # duration cannot be distorted by wall-clock adjustments the way a
    # time.time() difference can.
    started = time.perf_counter()
    frame = spark.read.options(header=True, inferSchema=True).csv(data)
    elapsed = time.perf_counter() - started
    print("Duration:", round(elapsed, 2), "seconds")
    return frame

In [None]:
# Read the retail CSV into a Spark DataFrame; load_data also prints the read duration.
df1 = load_data('OnlineRetail.csv')

Duration: 4.66 seconds


In [None]:
def datashape(data):
    """Print the number of rows and columns of ``data``.

    ``data`` must provide ``count()`` (row count) and a ``columns`` sequence.
    Nothing is returned; the shape is written to stdout.
    """
    n_rows = data.count()
    n_cols = len(data.columns)
    print(f"Data shape (rows, columns): {n_rows} x {n_cols}")
    
# Report the row and column counts of the loaded DataFrame.
datashape(df1)

Data shape (rows, columns): 541909 x 8


In [None]:
# Preview the first five rows as a plain-text table.
df1.limit(5).show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+



In [None]:
# Turn on eager evaluation so that a DataFrame left as the last expression of
# a cell is rendered as a table of rows.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [None]:
# Bring the describe() statistics into pandas, then pivot them so that each
# source column becomes a row and each statistic (the values of the first
# describe() column) becomes a column.
described = df1.describe().toPandas()
summary = described.set_index(described.columns[0]).T
summary

summary,count,mean,stddev,min,max
InvoiceNo,541909,559965.752026781,13428.417280805148,536365,C581569
StockCode,541909,27623.240210938104,16799.73762842771,10002,m
Description,540455,20713.0,,4 PURPLE FLOCK DINNER CANDLES,wrongly sold sets
Quantity,541909,9.55224954743324,218.08115785023355,-80995,80995
InvoiceDate,541909,,,1/10/2011 10:04,9/9/2011 9:52
UnitPrice,541909,4.611113626083471,96.75985306117803,-11062.06,38970.0
CustomerID,406829,15287.690570239583,1713.6003033216148,12346,18287
Country,541909,,,Australia,Unspecified


In [None]:
# Count missing values per column. NULL is checked for every column; NaN is
# only checked for float/double columns, because isnan() on a string column
# depends on an implicit string-to-double cast (which may fail under ANSI SQL
# mode and is not meaningful for text anyway).
float_cols = {name for name, dtype in df1.dtypes if dtype in ("float", "double")}

df1.select([
    count(
        when(
            (isnan(col(c)) | col(c).isNull()) if c in float_cols else col(c).isNull(),
            c,
        )
    ).alias(c)
    for c in df1.columns
])

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,0,1454,0,0,0,135080,0


In [None]:
# Normalise the country label 'EIRE' to 'Ireland'. The replacement is limited
# to the Country column so an identical value in any other string column
# (e.g. Description or StockCode) is left untouched.
df1 = df1.replace('EIRE', 'Ireland', subset=['Country'])

In [None]:
# Re-type Quantity as a floating-point column (replaces the column in place).
df1 = df1.withColumn("Quantity", df1["Quantity"].cast("Float"))

In [None]:
# Rows per invoice timestamp, earliest first. InvoiceDate is stored as text
# (e.g. '12/1/2010 8:26'), so sorting the raw column orders it
# lexicographically ('1/10/2011' before '12/1/2010'). Sort on the parsed
# timestamp instead to get chronological order.
# NOTE(review): assumes every InvoiceDate matches 'M/d/yyyy H:mm' - confirm.
(
    df1.groupby("InvoiceDate")
    .count()
    .sort(F.to_timestamp("InvoiceDate", "M/d/yyyy H:mm"), ascending=True)
    .limit(10)
)

InvoiceDate,count
1/10/2011 10:04,1
1/10/2011 10:07,1
1/10/2011 10:08,1
1/10/2011 10:32,23
1/10/2011 10:35,17
1/10/2011 10:36,1
1/10/2011 10:44,1
1/10/2011 10:58,46
1/10/2011 11:09,19
1/10/2011 11:22,42


In [None]:
# Add the line total (quantity times unit price) as a new "Amount" column,
# then show a five-row sample of the result.
df1 = df1.withColumn("Amount", df1["Quantity"] * df1["UnitPrice"])
df1.limit(5)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,Amount
536365,85123A,WHITE HANGING HEA...,6.0,12/1/2010 8:26,2.55,17850,United Kingdom,15.3
536365,71053,WHITE METAL LANTERN,6.0,12/1/2010 8:26,3.39,17850,United Kingdom,20.34
536365,84406B,CREAM CUPID HEART...,8.0,12/1/2010 8:26,2.75,17850,United Kingdom,22.0
536365,84029G,KNITTED UNION FLA...,6.0,12/1/2010 8:26,3.39,17850,United Kingdom,20.34
536365,84029E,RED WOOLLY HOTTIE...,6.0,12/1/2010 8:26,3.39,17850,United Kingdom,20.34


In [None]:
# Ten countries with the most rows, largest first.
(
    df1.groupBy("Country")
    .count()
    .orderBy(F.col("count").desc())
    .limit(10)
)

Country,count
United Kingdom,495478
Germany,9495
France,8557
Ireland,8196
Spain,2533
Netherlands,2371
Belgium,2069
Switzerland,2002
Portugal,1519
Australia,1259


In [None]:
# Sample of five rows whose Country is 'Ireland'.
df1.filter(df1["Country"].isin('Ireland')).limit(5)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,Amount
536540,22968,ROSE COTTAGE KEEP...,4.0,12/1/2010 14:05,9.95,14911,Ireland,39.8
536540,85071A,BLUE CHARLIE+LOLA...,6.0,12/1/2010 14:05,2.95,14911,Ireland,17.700000000000003
536540,85071C,"""CHARLIE+LOLA""""EX...",6.0,12/1/2010 14:05,2.55,14911,Ireland,15.3
536540,22355,CHARLOTTE BAG SUK...,50.0,12/1/2010 14:05,0.85,14911,Ireland,42.5
536540,21579,LOLITA DESIGN C...,6.0,12/1/2010 14:05,2.25,14911,Ireland,13.5


In [None]:
# Rows with an unusually large quantity (more than 50,000 units).
df1.where(col("Quantity") > 50000)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,Amount
541431,23166,MEDIUM CERAMIC TO...,74215.0,1/18/2011 10:01,1.04,12346,United Kingdom,77183.6
581483,23843,"PAPER CRAFT , LIT...",80995.0,12/9/2011 9:15,2.08,16446,United Kingdom,168469.6


In [None]:
# Rows per invoice timestamp, earliest first. InvoiceDate is stored as text
# (e.g. '12/1/2010 8:26'), so sorting the raw column orders it
# lexicographically ('1/10/2011' before '12/1/2010'). Sort on the parsed
# timestamp instead to get chronological order.
# NOTE(review): assumes every InvoiceDate matches 'M/d/yyyy H:mm' - confirm.
(
    df1.groupby("InvoiceDate")
    .count()
    .sort(F.to_timestamp("InvoiceDate", "M/d/yyyy H:mm"), ascending=True)
    .limit(10)
)

InvoiceDate,count
1/10/2011 10:04,1
1/10/2011 10:07,1
1/10/2011 10:08,1
1/10/2011 10:32,23
1/10/2011 10:35,17
1/10/2011 10:36,1
1/10/2011 10:44,1
1/10/2011 10:58,46
1/10/2011 11:09,19
1/10/2011 11:22,42
