In [1]:
import findspark
findspark.init()

import pyspark

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

In [3]:
from pyspark.sql import SparkSession


spark= SparkSession.builder.appName('dataframe').getOrCreate()

In [4]:
# load the retail dataset:
retail_data=spark.read.csv("online-retail-dataset.csv",inferSchema=True,header=True,timestampFormat="dd/M/yyyy H:mm")

In [5]:
retail_data.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-01-12 08:26:00|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-01-12 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-01-12 08:26:00|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-01-12 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-01-12 08:26:00|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-01-12 08:26:00|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS S

In [6]:
### Question 1 ###
# How many orders did customers perform at which hour?

# a) SQL
retail_data.createOrReplaceTempView("retailTable")


In [7]:
result = spark.sql("""
SELECT hour(InvoiceDate) as InvoiceHour, count(distinct InvoiceNo) as NoInvoices
FROM retailTable
GROUP BY InvoiceHour
ORDER BY InvoiceHour
""")
result.show()

+-----------+----------+
|InvoiceHour|NoInvoices|
+-----------+----------+
|          6|        22|
|          7|        31|
|          8|       624|
|          9|      1824|
|         10|      2961|
|         11|      3165|
|         12|      3962|
|         13|      3369|
|         14|      3137|
|         15|      3069|
|         16|      1952|
|         17|      1205|
|         18|       333|
|         19|       219|
|         20|        28|
+-----------+----------+



In [8]:
# b) Spark
result = retail_data.selectExpr("hour(InvoiceDate) as InvoiceHour", "InvoiceNo").distinct().groupBy('InvoiceHour').agg(f.expr("count(InvoiceNo) as NoInvoices")).orderBy('InvoiceHour')
result.show()

+-----------+----------+
|InvoiceHour|NoInvoices|
+-----------+----------+
|          6|        22|
|          7|        31|
|          8|       624|
|          9|      1824|
|         10|      2961|
|         11|      3165|
|         12|      3962|
|         13|      3369|
|         14|      3137|
|         15|      3069|
|         16|      1952|
|         17|      1205|
|         18|       333|
|         19|       219|
|         20|        28|
+-----------+----------+



In [10]:
### Question 2 ###
#How frequently was each product brought in the different countries?

# a) SQL
df_selection = retail_data.selectExpr("StockCode", "Quantity", "Country")
#df_selection.show()
df_nonull = df_selection.na.replace([""],["UNKNOWN"],"StockCode").na.replace([""],["UNKNOWN"],"Country").na.drop("any")
df_nonull.createOrReplaceTempView("retailNoNull")



In [12]:
result = spark.sql("""
SELECT Country, StockCode, sum(Quantity) as Quantity
FROM retailNoNull
GROUP BY Country, StockCode
GROUPING SETS ((Country, StockCode), (Country), (StockCode), ())
ORDER BY Country, StockCode
""")
result.show()



+-------+---------+--------+
|Country|StockCode|Quantity|
+-------+---------+--------+
|   null|     null| 5176450|
|   null|    10002|    1037|
|   null|    10080|     495|
|   null|    10120|     193|
|   null|   10123C|     -13|
|   null|   10123G|     -38|
|   null|   10124A|      16|
|   null|   10124G|      17|
|   null|    10125|    1296|
|   null|    10133|    2775|
|   null|    10134|     -19|
|   null|    10135|    2230|
|   null|    11001|    1430|
|   null|    15030|     293|
|   null|    15034|    5206|
|   null|    15036|   22552|
|   null|    15039|    2065|
|   null|   15044A|     463|
|   null|   15044B|     293|
|   null|   15044C|     311|
+-------+---------+--------+
only showing top 20 rows



In [13]:
# b) Spark

result = df_nonull.cube("Country", "StockCode").agg(f.sum("Quantity").alias("Quantity")).orderBy(f.col("Country"), f.col("Stockcode"))
result.show()

+-------+---------+--------+
|Country|StockCode|Quantity|
+-------+---------+--------+
|   null|     null| 5176450|
|   null|    10002|    1037|
|   null|    10080|     495|
|   null|    10120|     193|
|   null|   10123C|     -13|
|   null|   10123G|     -38|
|   null|   10124A|      16|
|   null|   10124G|      17|
|   null|    10125|    1296|
|   null|    10133|    2775|
|   null|    10134|     -19|
|   null|    10135|    2230|
|   null|    11001|    1430|
|   null|    15030|     293|
|   null|    15034|    5206|
|   null|    15036|   22552|
|   null|    15039|    2065|
|   null|   15044A|     463|
|   null|   15044B|     293|
|   null|   15044C|     311|
+-------+---------+--------+
only showing top 20 rows



In [15]:
result.coalesce(1).write.format("csv").option("header", "true").save("C:\\Users\\ziaridoy20\\Documents\\University\\Big Data\\Tutorial\\tutorial_04\\frequencies")