In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
print (spark.sparkContext)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/12 16:53:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<SparkContext master=local[*] appName=pyspark-shell>


In [2]:
from pyspark.sql.functions import to_date, date_trunc, minute, unix_timestamp, from_unixtime, when, col, min, max, lit, sum, round, concat, month, year, substring

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, LongType, TimestampType

sellsSchema = StructType([
    StructField("InvoiceNo", LongType(), True),
    StructField("StockCode", LongType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", TimestampType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", LongType(), True),
    StructField("Country", StringType(), True)
])

In [4]:
sellsRaw = spark.read.option("header", "true").option("timestampFormat","mm/d/yyyy h:mm").csv("./Data/datautf8.csv", schema=sellsSchema)
sellsRaw.show()

                                                                                

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|     NULL|WHITE HANGING HEA...|       6|2010-01-01 08:26:00|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-01-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|     NULL|CREAM CUPID HEART...|       8|2010-01-01 08:26:00|     2.75|     17850|United Kingdom|
|   536365|     NULL|KNITTED UNION FLA...|       6|2010-01-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|     NULL|RED WOOLLY HOTTIE...|       6|2010-01-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-01-01 08:26:00|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS S

In [5]:
#  Total No Of Sells Records
sellsRaw.count()

                                                                                

541909

In [6]:
# Total No Of Sells Day Wise

sellsWithDate = sellsRaw.withColumn("InvoiceDateOnly",to_date(sellsRaw.InvoiceDate))

In [7]:
# sellsWithDate.show()
sellsWithDate.groupBy("InvoiceDateOnly").count().show()



+---------------+-----+
|InvoiceDateOnly|count|
+---------------+-----+
|     2011-01-30|15415|
|     2011-01-29|11925|
|     2011-01-23|16842|
|     2011-01-25|16232|
|     2011-01-03|12293|
|     2011-01-01|14423|
|     2011-01-14|15998|
|     2010-01-09| 2891|
|     2010-01-19|  522|
|     2011-01-04|19617|
|     2011-01-28|17265|
|     2011-01-24|16888|
|     2011-01-15|14341|
|     2010-01-06| 3878|
|     2010-01-10| 2758|
|     2011-01-06|20304|
|     2011-01-16|13813|
|     2010-01-07| 2963|
|     2010-01-01| 3108|
|     2011-01-05|19379|
+---------------+-----+
only showing top 20 rows



                                                                                

In [8]:
# Total No Of Sell Hourly Discretized
# If Timestamps minute is <=30 then round it to hour
# If Timestamps minute is > 30 then round it to hour + 1

SellsWithHourlyDescreteTimestamp = sellsRaw\
    .withColumn("minutes", minute(sellsRaw.InvoiceDate)) \
    .withColumn("unixInvoiceDate", unix_timestamp(sellsRaw.InvoiceDate)) \
    .withColumn("DescreteUnixInvoiceDate", when(col("minutes") <= 30, col("unixInvoiceDate")-60*col("minutes")).otherwise(col("unixInvoiceDate")+(60-col("minutes"))*60)) \
    .withColumn("DescreteInvoiceDate", from_unixtime(col("DescreteUnixInvoiceDate"))) \
    .drop("minutes", "unixInvoiceDate", "DescreteUnixInvoiceDate")

hourlyDescreteSells = SellsWithHourlyDescreteTimestamp.groupBy(SellsWithHourlyDescreteTimestamp.DescreteInvoiceDate)
hourlyDescreteSellsCounts = hourlyDescreteSells.count()

In [9]:
# As above timestamp has so many empty timeslots in between will add those to create continues time series
minDescreteInvoiceDate, maxDescreteInvoiceDate = hourlyDescreteSellsCounts.agg(min(hourlyDescreteSellsCounts.DescreteInvoiceDate), max(hourlyDescreteSellsCounts.DescreteInvoiceDate)).first()
print(minDescreteInvoiceDate)
print (maxDescreteInvoiceDate)
from datetime import datetime
unixminDescreteInvoiceDate = datetime.strptime(minDescreteInvoiceDate, "%Y-%m-%d %H:%M:%S").strftime('%s')
unixmaxDescreteInvoiceDate = datetime.strptime(maxDescreteInvoiceDate, "%Y-%m-%d %H:%M:%S").strftime('%s')


CompleteTimeSeries = spark.range(unixminDescreteInvoiceDate, unixmaxDescreteInvoiceDate, 60*60).select(col("id").cast("timestamp").alias("TimeSeries"), lit(0).alias("count"))
CompleteTimeSeries.show()

                                                                                

2010-01-01 00:00:00
2011-01-31 20:00:00
+-------------------+-----+
|         TimeSeries|count|
+-------------------+-----+
|2010-01-01 00:00:00|    0|
|2010-01-01 01:00:00|    0|
|2010-01-01 02:00:00|    0|
|2010-01-01 03:00:00|    0|
|2010-01-01 04:00:00|    0|
|2010-01-01 05:00:00|    0|
|2010-01-01 06:00:00|    0|
|2010-01-01 07:00:00|    0|
|2010-01-01 08:00:00|    0|
|2010-01-01 09:00:00|    0|
|2010-01-01 10:00:00|    0|
|2010-01-01 11:00:00|    0|
|2010-01-01 12:00:00|    0|
|2010-01-01 13:00:00|    0|
|2010-01-01 14:00:00|    0|
|2010-01-01 15:00:00|    0|
|2010-01-01 16:00:00|    0|
|2010-01-01 17:00:00|    0|
|2010-01-01 18:00:00|    0|
|2010-01-01 19:00:00|    0|
+-------------------+-----+
only showing top 20 rows



In [10]:
CompleteTimeseriesSellsCount = CompleteTimeSeries \
    .join(hourlyDescreteSellsCounts, CompleteTimeSeries.TimeSeries == hourlyDescreteSellsCounts.DescreteInvoiceDate, how="left_outer") \
    .select("TimeSeries", (when(hourlyDescreteSellsCounts.DescreteInvoiceDate.isNull(), CompleteTimeSeries["count"]).otherwise(hourlyDescreteSellsCounts["count"])).alias("count")) \
    .orderBy("TimeSeries")

CompleteTimeseriesSellsCount.show()

                                                                                

+-------------------+-----+
|         TimeSeries|count|
+-------------------+-----+
|2010-01-01 00:00:00|  194|
|2010-01-01 01:00:00|  227|
|2010-01-01 02:00:00|    0|
|2010-01-01 03:00:00|    0|
|2010-01-01 04:00:00|    0|
|2010-01-01 05:00:00|    0|
|2010-01-01 06:00:00|    0|
|2010-01-01 07:00:00|    0|
|2010-01-01 08:00:00|    9|
|2010-01-01 09:00:00|   57|
|2010-01-01 10:00:00|  186|
|2010-01-01 11:00:00|  163|
|2010-01-01 12:00:00|  273|
|2010-01-01 13:00:00|  200|
|2010-01-01 14:00:00|  134|
|2010-01-01 15:00:00|  742|
|2010-01-01 16:00:00|  188|
|2010-01-01 17:00:00|  707|
|2010-01-01 18:00:00|   28|
|2010-01-01 19:00:00|    0|
+-------------------+-----+
only showing top 20 rows



In [11]:
# Total Hourly Revanue Generated Over The Time
TotalHourlySells = hourlyDescreteSells.agg(round(sum(sellsRaw.Quantity*sellsRaw.UnitPrice),2).alias("TotalRevenue"))
TotalHourlySells.orderBy(TotalHourlySells.DescreteInvoiceDate).show()



+-------------------+------------+
|DescreteInvoiceDate|TotalRevenue|
+-------------------+------------+
|2010-01-01 00:00:00|     5290.74|
|2010-01-01 01:00:00|     2131.68|
|2010-01-01 08:00:00|      161.32|
|2010-01-01 09:00:00|     2058.95|
|2010-01-01 10:00:00|     8848.43|
|2010-01-01 11:00:00|     4108.81|
|2010-01-01 12:00:00|     2859.03|
|2010-01-01 13:00:00|     3807.39|
|2010-01-01 14:00:00|     3082.88|
|2010-01-01 15:00:00|     8536.03|
|2010-01-01 16:00:00|     9190.33|
|2010-01-01 17:00:00|     8457.18|
|2010-01-01 18:00:00|      102.79|
|2010-01-02 00:00:00|     2853.76|
|2010-01-02 01:00:00|     1894.97|
|2010-01-02 08:00:00|       467.0|
|2010-01-02 09:00:00|      1215.0|
|2010-01-02 10:00:00|     3991.03|
|2010-01-02 11:00:00|     3909.71|
|2010-01-02 12:00:00|     1848.14|
+-------------------+------------+
only showing top 20 rows



                                                                                

In [12]:
# Total Daily Revanue Generated Over The Time
TotalDailySells = TotalHourlySells.withColumn("InvoiceDate", to_date(TotalHourlySells.DescreteInvoiceDate))
TotalDailySells.groupBy(TotalDailySells.InvoiceDate).agg(round(sum(TotalDailySells.TotalRevenue), 2).alias("TotalRevenue")).orderBy(TotalDailySells.InvoiceDate).show()



+-----------+------------+
|InvoiceDate|TotalRevenue|
+-----------+------------+
| 2010-01-01|    58635.56|
| 2010-01-02|    46207.28|
| 2010-01-03|    45620.46|
| 2010-01-05|    31383.95|
| 2010-01-06|    53860.18|
| 2010-01-07|    45059.05|
| 2010-01-08|    44189.84|
| 2010-01-09|    52532.13|
| 2010-01-10|    57404.91|
| 2010-01-12|    17240.92|
| 2010-01-13|    35379.34|
| 2010-01-14|    42843.29|
| 2010-01-15|    29443.69|
| 2010-01-16|    48334.35|
| 2010-01-17|    43534.19|
| 2010-01-19|     7517.31|
| 2010-01-20|    24741.75|
| 2010-01-21|    47097.94|
| 2010-01-22|     6134.57|
| 2010-01-23|    11796.31|
+-----------+------------+
only showing top 20 rows



                                                                                

In [13]:
# Total Monthly Revenue
TotalMonthlySells = TotalDailySells.withColumn("month_year", substring(TotalDailySells.InvoiceDate.cast("string"),0,7))

TotalMonthlySells.groupBy(TotalMonthlySells.month_year).agg(round(sum(TotalMonthlySells.TotalRevenue), 2).alias("TotalRevenue")).show()



+----------+------------+
|month_year|TotalRevenue|
+----------+------------+
|   2010-01|   748957.02|
|   2011-01|  8998790.91|
+----------+------------+



                                                                                

In [14]:
SellsWithDailyDescreteTimestamp = SellsWithHourlyDescreteTimestamp.withColumn("InvoiceDate", to_date("DescreteInvoiceDate")).drop("DescreteInvoiceDate")

In [15]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, desc
# Top 5 Sold Products For Each Date

DailyWindow = Window.partitionBy("InvoiceDate").orderBy(desc("Quantity"))
top5Daily = SellsWithDailyDescreteTimestamp.withColumn("rank", dense_rank().over(DailyWindow))
top5Daily.where(top5Daily.rank <= 5).show()



+---------+---------+--------------------+--------+-----------+---------+----------+--------------+----+
|InvoiceNo|StockCode|         Description|Quantity|InvoiceDate|UnitPrice|CustomerID|       Country|rank|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+----+
|   536437|    17021|NAMASTE SWAGAT IN...|     600| 2010-01-01|     0.24|     13694|United Kingdom|   1|
|   536477|    21137|BLACK RECORD COVE...|     480| 2010-01-01|     3.39|     16210|United Kingdom|   2|
|   536387|    22466|FAIRY TALE COTTAG...|     432| 2010-01-01|     1.45|     16029|United Kingdom|   3|
|   536387|    21731|RED TOADSTOOL LED...|     432| 2010-01-01|     1.25|     16029|United Kingdom|   3|
|   536584|     NULL|RED WOOLLY HOTTIE...|     384| 2010-01-01|     2.95|     13777|United Kingdom|   4|
|   536390|    20668|DISCO BALL CHRIST...|     288| 2010-01-01|      0.1|     17511|United Kingdom|   5|
|   536830|    84077|WORLD WAR 2 GLIDE...|    2880| 201

                                                                                

In [16]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, desc

# Top 5 Invoices Of Day In terms Of Revenue and 

DailyRevenueWindow = Window.partitionBy("InvoiceDate").orderBy(desc("Revenue"))
top5RevenueDaily = SellsWithDailyDescreteTimestamp.groupBy("InvoiceDate", "InvoiceNo").agg(round(sum(col("UnitPrice") * col("Quantity")),2).alias("Revenue"))
top5RevenueDailyRanked = top5RevenueDaily.withColumn("rank", dense_rank().over(DailyRevenueWindow))
top5RevenueDailyRanked.where(top5RevenueDailyRanked.rank <= 5).show()



+-----------+---------+--------+----+
|InvoiceDate|InvoiceNo| Revenue|rank|
+-----------+---------+--------+----+
| 2010-01-01|   536592| 6915.65|   1|
| 2010-01-01|   536544| 5521.14|   2|
| 2010-01-01|   536387| 3193.92|   3|
| 2010-01-01|   536576| 2558.42|   4|
| 2010-01-01|   536477| 2474.74|   5|
| 2010-01-02|   536783| 4076.48|   1|
| 2010-01-02|   536785| 2730.96|   2|
| 2010-01-02|   536784|  2170.9|   3|
| 2010-01-02|   536830|  2002.4|   4|
| 2010-01-02|   536804| 1714.25|   5|
| 2010-01-03|   536982|10661.69|   1|
| 2010-01-03|   536876| 7257.08|   2|
| 2010-01-03|   536865| 2396.96|   3|
| 2010-01-03|   537034| 2355.18|   4|
| 2010-01-03|   536975| 1705.65|   5|
| 2010-01-05|   537214|  1668.2|   1|
| 2010-01-05|   537201|  1631.3|   2|
| 2010-01-05|   537065| 1526.92|   3|
| 2010-01-05|   537224| 1415.97|   4|
| 2010-01-05|   537081| 1145.06|   5|
+-----------+---------+--------+----+
only showing top 20 rows



                                                                                

In [18]:
# Top 5 High Revenue Customers Of Day

DailyCustomerRevenueWindow = Window.partitionBy("InvoiceDate", "CustomerID").orderBy(desc("Revenue"))
top5CustomerDaily = SellsWithDailyDescreteTimestamp.groupBy("InvoiceDate", "InvoiceNo", "CustomerID").agg(round(sum(col("UnitPrice") * col("Quantity")),2).alias("Revenue"))
top5CustomerDailyRanked = top5CustomerDaily.withColumn("rank", dense_rank().over(DailyCustomerRevenueWindow))
top5CustomerDailyRanked.where(top5CustomerDailyRanked.rank <= 5).show()




+-----------+---------+----------+-------+----+
|InvoiceDate|InvoiceNo|CustomerID|Revenue|rank|
+-----------+---------+----------+-------+----+
| 2010-01-01|   536592|      NULL|6915.65|   1|
| 2010-01-01|   536544|      NULL|5521.14|   2|
| 2010-01-01|   536558|      NULL|  99.75|   3|
| 2010-01-01|   536596|      NULL|  38.09|   4|
| 2010-01-01|   536565|      NULL|    6.7|   5|
| 2010-01-01|   536389|     12431| 358.25|   1|
| 2010-01-01|   536532|     12433|1919.14|   1|
| 2010-01-01|     NULL|     12472| -122.3|   1|
| 2010-01-01|   536370|     12583| 855.86|   1|
| 2010-01-01|   536527|     12662| 261.48|   1|
| 2010-01-01|   536521|     12748|   4.95|   1|
| 2010-01-01|   536403|     12791|  192.6|   1|
| 2010-01-01|   536415|     12838| 390.79|   1|
| 2010-01-01|   536523|     12868|  203.3|   1|
| 2010-01-01|   536561|     12921|  322.4|   1|
| 2010-01-01|   536582|     12947| 304.04|   1|
| 2010-01-01|   536367|     13047| 278.73|   1|
| 2010-01-01|   536368|     13047|  70.0

                                                                                