# In this tutorial you will learn
- how to create spark session
- how to load data and check schema
- how to check duplicate records
- how to check missing values
- how to extract date and time from datetime column
- how to create new columns
- how to group, aggregate, and filter data
- how to make meaningful business insights

Author: Ivy Wang  http://www.codewithivy.com

## 1. Preparation

In [1]:
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.appName('tutorial').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/13 12:19:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# read dataset and show the header
df = spark.read.option('header','true').csv('/Users/ivyw/pyspark/Online_Retail.csv')
df.show()

                                                                                

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United

In [3]:
# create a new column names Sales. Sales = Quantity * UnitPrice

from pyspark.sql.functions import col, round
df = df.withColumn('Sales',col('Quantity') * col('UnitPrice'))
df = df.withColumn('Sales', round(col('Sales'),2))
df.show()

+---------+---------+--------------------+--------+------------+---------+----------+--------------+-----+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|Sales|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+-----+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom| 15.3|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|20.34|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom| 22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|20.34|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|20.34|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom| 15.3|
|   536365|    21730|GLASS STAR FROST

## 2. Start Analysis with SQL Statement

In [4]:
# register the dataset into sql 
df.createOrReplaceTempView("df")

In [5]:
# how many unique orders are there in the dataset?
spark.sql('''
     SELECT COUNT(DISTINCT InvoiceNo) as counts_of_distinct_order
    FROM df
    ''').show()



+------------------------+
|counts_of_distinct_order|
+------------------------+
|                   25900|
+------------------------+



                                                                                

In [6]:
# What is the total sales in the dataset?
spark.sql('''
    SELECT SUM(Sales) AS total_sales
    FROM df
          ''').show()



+-----------------+
|      total_sales|
+-----------------+
|9747747.929999102|
+-----------------+



                                                                                

In [7]:
# the top 10 most spends cutomers
spark.sql('''
          SELECT CustomerID, SUM(Sales) as sales
          FROM df
          GROUP BY CustomerID
          ORDER BY Sales DESC
          LIMIT 11
          ''').show()



+----------+------------------+
|CustomerID|             sales|
+----------+------------------+
|      NULL| 1447682.120000056|
|     14646| 279489.0199999999|
|     18102|256438.48999999996|
|     17450|187482.16999999998|
|     14911|132572.62000000005|
|     12415|123725.44999999998|
|     14156|113384.14000000004|
|     17511|          88125.38|
|     16684| 65892.07999999999|
|     13694|62653.100000000006|
|     15311|59419.339999999975|
+----------+------------------+



                                                                                

In [8]:
# find the min and max of InvoiceDate
df.describe('InvoiceDate').show()



+-------+-------------+
|summary|  InvoiceDate|
+-------+-------------+
|  count|       541909|
|   mean|         NULL|
| stddev|         NULL|
|    min|1/10/11 10:04|
|    max|  9/9/11 9:52|
+-------+-------------+



                                                                                

In [9]:
# how many customers are there in each country?

spark.sql('''
          SELECT Country, COUNT(DISTINCT CustomerID) AS number_of_customer
          FROM df
          GROUP BY Country
          ORDER BY number_of_customer DESC
          ''').show()



+---------------+------------------+
|        Country|number_of_customer|
+---------------+------------------+
| United Kingdom|              3950|
|        Germany|                95|
|         France|                87|
|          Spain|                31|
|        Belgium|                25|
|    Switzerland|                21|
|       Portugal|                19|
|          Italy|                15|
|        Finland|                12|
|        Austria|                11|
|         Norway|                10|
|        Denmark|                 9|
|Channel Islands|                 9|
|      Australia|                 9|
|    Netherlands|                 9|
|         Sweden|                 8|
|         Cyprus|                 8|
|          Japan|                 8|
|         Poland|                 6|
|         Greece|                 4|
+---------------+------------------+
only showing top 20 rows



                                                                                

In [10]:
# how many purchases made by each customer in Germany?

spark.sql('''
          SELECT CustomerId, COUNT(DISTINCT InvoiceNo) AS number_of_purchase
          FROM df
          WHERE Country = 'Germany'
          GROUP BY CustomerId
          ORDER BY number_of_purchase DESC
          ''').show()




+----------+------------------+
|CustomerId|number_of_purchase|
+----------+------------------+
|     12471|                49|
|     12569|                35|
|     12474|                30|
|     12720|                29|
|     12709|                26|
|     12621|                23|
|     12476|                20|
|     12712|                17|
|     12708|                16|
|     12705|                14|
|     12472|                13|
|     12647|                13|
|     12619|                13|
|     12500|                13|
|     12626|                13|
|     12662|                12|
|     12600|                11|
|     12481|                11|
|     12477|                 9|
|     12473|                 9|
+----------+------------------+
only showing top 20 rows



                                                                                

In [11]:
# I want to know the purchase frequency in Germany

tmp = spark.sql('''
          SELECT CustomerId, COUNT(DISTINCT InvoiceNo) AS number_of_purchase
          FROM df
          WHERE Country = 'Germany'
          GROUP BY CustomerId
          ORDER BY number_of_purchase DESC
          ''')
#tmp.show()

tmp.groupby('number_of_purchase').count().orderBy('number_of_purchase').show()




+------------------+-----+
|number_of_purchase|count|
+------------------+-----+
|                 1|   21|
|                 2|   14|
|                 3|   13|
|                 4|   10|
|                 5|    8|
|                 6|    6|
|                 7|    1|
|                 8|    2|
|                 9|    2|
|                11|    2|
|                12|    1|
|                13|    5|
|                14|    1|
|                16|    1|
|                17|    1|
|                20|    1|
|                23|    1|
|                26|    1|
|                29|    1|
|                30|    1|
+------------------+-----+
only showing top 20 rows



                                                                                

In [12]:
# how about the situation in UK?
# I want to know the purchase frequency in Germany

tmp = spark.sql('''
          SELECT CustomerId, COUNT(DISTINCT InvoiceNo) AS number_of_purchase
          FROM df
          WHERE Country = 'United Kingdom'
          GROUP BY CustomerId
          ORDER BY number_of_purchase DESC
          ''')
#tmp.show()

tmp.groupby('number_of_purchase').count().orderBy('number_of_purchase').show()



+------------------+-----+
|number_of_purchase|count|
+------------------+-----+
|                 1| 1188|
|                 2|  741|
|                 3|  440|
|                 4|  344|
|                 5|  259|
|                 6|  173|
|                 7|  142|
|                 8|  109|
|                 9|   70|
|                10|   73|
|                11|   56|
|                12|   46|
|                13|   28|
|                14|   38|
|                15|   27|
|                16|   25|
|                17|   18|
|                18|   18|
|                19|   13|
|                20|   12|
+------------------+-----+
only showing top 20 rows



                                                                                

In [13]:
# In each country, how about the spend per customer? 
spark.sql('''
          SELECT Country, Sum(Sales) AS total_revenue, Count(DISTINCT CustomerID) AS no_of_customers,total_revenue/no_of_customers  AS sales_per_customer
          FROM df
          GROUP BY Country
          ORDER BY sales_per_customer DESC
          ''').show()



+---------------+------------------+---------------+------------------+
|        Country|     total_revenue|no_of_customers|sales_per_customer|
+---------------+------------------+---------------+------------------+
|           EIRE|263276.82000000007|              3| 87758.94000000002|
|    Netherlands| 284661.5399999998|              9| 31629.05999999998|
|      Australia|137077.26999999996|              9|15230.807777777773|
|      Singapore|           9120.39|              1|           9120.39|
|         Sweden|          36595.91|              8|        4574.48875|
|          Japan|          35340.62|              8|         4417.5775|
|        Iceland|            4310.0|              1|            4310.0|
|         Norway|          35163.46|             10|          3516.346|
|    Switzerland|          56385.35|             21|2685.0166666666664|
|        Germany|221698.21000000017|             95|2333.6653684210546|
|         France|          197403.9|             87|2269.0103448

                                                                                

In [14]:
# What is the best-selling product in each country?

spark.sql('''
          SELECT Country,StockCode,Description, Count(StockCode) AS counts
          FROM df
          GROUP BY Country,StockCode,Description
          ORDER BY counts DESC
          ''').show()

# this statement shows the all products in each country with the order by the count of selling.
# It is not yet what I want. Check the next statement.



+--------------+---------+--------------------+------+
|       Country|StockCode|         Description|counts|
+--------------+---------+--------------------+------+
|United Kingdom|   85123A|WHITE HANGING HEA...|  2204|
|United Kingdom|   85099B|JUMBO BAG RED RET...|  2001|
|United Kingdom|    22423|REGENCY CAKESTAND...|  1859|
|United Kingdom|    47566|       PARTY BUNTING|  1634|
|United Kingdom|    20725|LUNCH BAG RED RET...|  1460|
|United Kingdom|    84879|ASSORTED COLOUR B...|  1416|
|United Kingdom|    22720|SET OF 3 CAKE TIN...|  1316|
|United Kingdom|    20727|LUNCH BAG  BLACK ...|  1292|
|United Kingdom|    22457|NATURAL SLATE HEA...|  1250|
|United Kingdom|    22469|HEART OF WICKER S...|  1199|
|United Kingdom|    22386|JUMBO BAG PINK PO...|  1192|
|United Kingdom|    21212|PACK OF 72 RETROS...|  1190|
|United Kingdom|    22086|PAPER CHAIN KIT 5...|  1172|
|United Kingdom|    21931|JUMBO STORAGE BAG...|  1160|
|United Kingdom|    22411|JUMBO SHOPPER VIN...|  1159|
|United Ki

                                                                                

In [15]:
spark.sql('''SELECT Country, StockCode, Description, counts
FROM (
    SELECT Country, StockCode, Description, counts,
           ROW_NUMBER() OVER (PARTITION BY Country ORDER BY counts DESC) AS row_num
    FROM (
        SELECT Country, StockCode, Description, COUNT(StockCode) AS Counts
        FROM df
        GROUP BY Country, StockCode, Description
    ) temp
) temp2
WHERE row_num = 1
          ''').show()



+------------------+---------+--------------------+------+
|           Country|StockCode|         Description|counts|
+------------------+---------+--------------------+------+
|         Australia|    22720|SET OF 3 CAKE TIN...|    10|
|           Austria|     POST|             POSTAGE|    14|
|           Bahrain|   72802B|OCEAN SCENT CANDL...|     3|
|           Belgium|     POST|             POSTAGE|    98|
|            Brazil|    23051|RECYCLED ACAPULCO...|     1|
|            Canada|    23192|BUNDLE OF 3 ALPHA...|     2|
|   Channel Islands|    20725|LUNCH BAG RED RET...|     7|
|            Cyprus|    22423|REGENCY CAKESTAND...|     8|
|    Czech Republic|    22231|JIGSAW TREE WITH ...|     3|
|           Denmark|     POST|             POSTAGE|    14|
|              EIRE|       C2|            CARRIAGE|   108|
|European Community|     POST|             POSTAGE|     3|
|           Finland|     POST|             POSTAGE|    41|
|            France|     POST|             POSTAGE|   31

                                                                                

In [16]:
# close the session and clean up
spark.stop()