In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark Online Retail Analysis") \
    .config(conf=SparkConf()) \
    .getOrCreate()

In [2]:
import pandas as pd
import numpy as np

df_dtypes = {'Invoice': object, 'StockCode': object, 'Description': object, 'Quantity': np.int32,
             'InvoiceDate': np.datetime64, 'Price': np.float64, 'CustomerID': object, 'Country': object}

df = pd.concat([pd.read_excel('online_retail_II.xlsx', sheet_name='Year 2009-2010'), 
                pd.read_excel('online_retail_II.xlsx', sheet_name='Year 2010-2011')], ignore_index=True)

In [3]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1067371 entries, 0 to 1067370
Data columns (total 8 columns):
Invoice        1067371 non-null object
StockCode      1067371 non-null object
Description    1062989 non-null object
Quantity       1067371 non-null int64
InvoiceDate    1067371 non-null datetime64[ns]
Price          1067371 non-null float64
Customer ID    824364 non-null float64
Country        1067371 non-null object
dtypes: datetime64[ns](1), float64(2), int64(1), object(4)
memory usage: 65.1+ MB


In [4]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType, IntegerType

spark_Schema = StructType([StructField("Invoice", StringType(), False),
                           StructField("StockCode", StringType(), False),
                           StructField("Description", StringType(), True),
                           StructField("Quantity", IntegerType(), False),
                           StructField("InvoiceDate", TimestampType(), False),
                           StructField("Price", FloatType(), False),
                           StructField("CustomerID", StringType(), True), 
                           StructField("Country", StringType(), False)])

df = spark.createDataFrame(df, schema=spark_Schema)

In [5]:
df.printSchema()

root
 |-- Invoice: string (nullable = false)
 |-- StockCode: string (nullable = false)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = false)
 |-- InvoiceDate: timestamp (nullable = false)
 |-- Price: float (nullable = false)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = false)



In [6]:
df = df.withColumn('Description', F.when(F.isnan(F.col('Description')), 'None Provided').otherwise(F.col('Description')))
df = df.withColumn('CustomerID', F.when(F.isnan(F.col('CustomerID')), '0').otherwise(F.col('CustomerID')))

In [7]:
df.filter(df["CustomerID"] == '0').sample(False, 0.0002, 999).show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+
| 490149|    84378|SET OF 3 HEART CO...|       3|2009-12-04 09:43:00| 2.57|         0|United Kingdom|
| 492303|    35954|SMALL FOLKART STA...|       5|2009-12-16 11:57:00| 3.43|         0|United Kingdom|
| 493073|    22059|CERAMIC STRWBERRY...|       1|2009-12-22 09:41:00| 3.43|         0|United Kingdom|
| 493261|      DOT|      DOTCOM POSTAGE|       1|2009-12-22 14:48:00|88.48|         0|United Kingdom|
| 494495|    20759|CHRYSANTHEMUM POC...|       6|2010-01-14 17:43:00| 0.64|         0|United Kingdom|
| 497584|        M|              Manual|       2|2010-02-10 15:10:00| 1.65|         0|United Kingdom|
| 498861|    21780|       None Provided|     -25|2010-02-23 12:23:00|  0.0|       

In [8]:
df.where((df["CustomerID"] == '0') & (df["Price"] > '0')) \
.withColumn('Total', df.Quantity*df.Price).agg(F.round(F.sum('Total'), 2).alias('Customer ID 0 Total')).show()

+-------------------+
|Customer ID 0 Total|
+-------------------+
|         2797634.31|
+-------------------+



In [9]:
df.filter((df["CustomerID"] != '0') & (df["Price"] == '0')).show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+
| 489825|    22076|  6 RIBBONS EMPIRE  |      12|2009-12-02 13:34:00|  0.0|   16126.0|United Kingdom|
| 489998|    48185| DOOR MAT FAIRY CAKE|       2|2009-12-03 11:19:00|  0.0|   15658.0|United Kingdom|
| 490727|        M|              Manual|       1|2009-12-07 16:38:00|  0.0|   17231.0|United Kingdom|
| 490961|    22065|CHRISTMAS PUDDING...|       1|2009-12-08 15:25:00|  0.0|   14108.0|United Kingdom|
| 490961|    22142|CHRISTMAS CRAFT W...|      12|2009-12-08 15:25:00|  0.0|   14108.0|United Kingdom|
| 492079|    85042|ANTIQUE LILY FAIR...|       8|2009-12-15 13:49:00|  0.0|   15070.0|United Kingdom|
| 492760|    21143|ANTIQUE GLASS HEA...|      12|2009-12-18 14:22:00|  0.0|   1807

In [10]:
df.filter((df["CustomerID"] != '0') & (df["Price"] == '0')).count()

71

In [11]:
df.filter("Price < '0'").show()

+-------+---------+---------------+--------+-------------------+---------+----------+--------------+
|Invoice|StockCode|    Description|Quantity|        InvoiceDate|    Price|CustomerID|       Country|
+-------+---------+---------------+--------+-------------------+---------+----------+--------------+
|A506401|        B|Adjust bad debt|       1|2010-04-29 13:36:00|-53594.36|         0|United Kingdom|
|A516228|        B|Adjust bad debt|       1|2010-07-19 11:24:00|-44031.79|         0|United Kingdom|
|A528059|        B|Adjust bad debt|       1|2010-10-20 12:04:00|-38925.87|         0|United Kingdom|
|A563186|        B|Adjust bad debt|       1|2011-08-12 14:51:00|-11062.06|         0|United Kingdom|
|A563187|        B|Adjust bad debt|       1|2011-08-12 14:52:00|-11062.06|         0|United Kingdom|
+-------+---------+---------------+--------+-------------------+---------+----------+--------------+



In [12]:
df.filter("Price < '0'").agg(F.round(F.sum('Price'), 2).alias('Total Bad Debt')).show()

+--------------+
|Total Bad Debt|
+--------------+
|    -158676.14|
+--------------+



In [13]:
df.filter("Price = '0'").count()

6202

In [14]:
df.where("Price = '0' and Description = 'None Provided' and Quantity < '0'") \
.groupby(F.date_format('InvoiceDate', 'yyyy.MM').alias('date')).count().sort('date').show()

+-------+-----+
|   date|count|
+-------+-----+
|2009.12|   71|
|2010.01|  123|
|2010.02|  432|
|2010.03|  169|
|2010.04|   62|
|2010.05|  338|
|2010.06|  108|
|2010.07|   35|
|2010.08|   75|
|2010.09|   69|
|2010.10|   90|
|2010.11|  199|
|2010.12|  118|
|2011.01|   83|
|2011.02|   40|
|2011.03|   93|
|2011.04|  163|
|2011.05|   96|
|2011.06|   53|
|2011.07|  100|
+-------+-----+
only showing top 20 rows



In [15]:
df.where("Price = '0' and Description != 'None Provided' and Quantity < '0'") \
.groupby(F.date_format('InvoiceDate', 'yyyy.MM').alias('date')).count().sort('date').show()

+-------+-----+
|   date|count|
+-------+-----+
|2009.12|   19|
|2010.01|    5|
|2010.02|   16|
|2010.03|   21|
|2010.04|   13|
|2010.05|   31|
|2010.06|   32|
|2010.07|   53|
|2010.08|   18|
|2010.09|   24|
|2010.10|   28|
|2010.11|   26|
|2010.12|   16|
|2011.01|   13|
|2011.02|    8|
|2011.03|   41|
|2011.04|   23|
|2011.05|   21|
|2011.06|   54|
|2011.07|   17|
+-------+-----+
only showing top 20 rows



In [16]:
df.where("Price = '0' and Description != 'None Provided' and Quantity < '0'") \
.groupby("Description").count().sort(F.col("count").desc()).show()

+--------------------+-----+
|         Description|count|
+--------------------+-----+
|               check|  123|
|             damages|   84|
|                   ?|   83|
|             damaged|   78|
|             missing|   27|
|sold as set on do...|   20|
|             Damaged|   17|
|Unsaleable, destr...|    9|
|             smashed|    9|
|         thrown away|    9|
|              dotcom|    8|
|                  ??|    7|
|            damages?|    7|
|          given away|    6|
|             crushed|    6|
|             counted|    5|
|         wet damaged|    5|
|                ebay|    5|
|             checked|    5|
|                 MIA|    5|
+--------------------+-----+
only showing top 20 rows



In [17]:
df.where("Price = '0' and Quantity < '0'") \
.groupby("CustomerID").count().sort(F.col("count").desc()).show()

+----------+-----+
|CustomerID|count|
+----------+-----+
|         0| 3457|
+----------+-----+



In [18]:
df1 = df.where(df["CustomerID"] != '0')
df1 = df1.withColumn("ItemTotal", F.round(df1.Price*df1.Quantity, 2))

In [19]:
from pyspark.sql.window import Window
invoice_part = Window().partitionBy('Invoice')
df1 = df1.withColumn('InvoiceTotal', F.round(F.sum('ItemTotal').over(invoice_part), 2))

In [20]:
df1.show(5)

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|ItemTotal|InvoiceTotal|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
| 489677|    21341|KASBAH LANTERN WI...|      24|2009-12-02 09:50:00| 4.25|   14000.0|United Kingdom|    102.0|       192.0|
| 489677|    21323|HANGING MEDINA LA...|      24|2009-12-02 09:50:00| 3.75|   14000.0|United Kingdom|     90.0|       192.0|
| 491045|    71477|COLOUR GLASS. STA...|      48|2009-12-09 10:02:00| 2.75|   12747.0|United Kingdom|    132.0|       303.2|
| 491045|    21338|MARAKESH LANTERN ...|       6|2009-12-09 10:02:00| 5.95|   12747.0|United Kingdom|     35.7|       303.2|
| 491045|    21339|MARAKESH LANTERN ...|       4|2009-12-09 10:02:00|16.95|   12747.0|United Kingdom|     67.8|       303.2|


In [21]:
df1.createOrReplaceTempView("retail")
tab1 = spark.sql("SELECT DISTINCT Country FROM retail ORDER BY Country")
tab1.show(tab1.count())

+--------------------+
|             Country|
+--------------------+
|           Australia|
|             Austria|
|             Bahrain|
|             Belgium|
|              Brazil|
|              Canada|
|     Channel Islands|
|              Cyprus|
|      Czech Republic|
|             Denmark|
|                EIRE|
|  European Community|
|             Finland|
|              France|
|             Germany|
|              Greece|
|             Iceland|
|              Israel|
|               Italy|
|               Japan|
|               Korea|
|             Lebanon|
|           Lithuania|
|               Malta|
|         Netherlands|
|             Nigeria|
|              Norway|
|              Poland|
|            Portugal|
|                 RSA|
|        Saudi Arabia|
|           Singapore|
|               Spain|
|              Sweden|
|         Switzerland|
|            Thailand|
|                 USA|
|United Arab Emirates|
|      United Kingdom|
|         Unspecified|
|         W

In [22]:
tab1 = spark.sql("SELECT Country, ROUND(SUM(ItemTotal), 2) AS TotalSale, COUNT(DISTINCT Invoice) AS NumberOfOrders \
                 FROM retail GROUP BY Country ORDER BY TotalSale DESC LIMIT 10")
tab1.show()

+--------------+-------------+--------------+
|       Country|    TotalSale|NumberOfOrders|
+--------------+-------------+--------------+
|United Kingdom|1.380642301E7|         40505|
|          EIRE|    578501.63|           727|
|   Netherlands|    548524.95|           250|
|       Germany|    417988.56|          1095|
|        France|    326504.67|           738|
|     Australia|    167129.07|           117|
|   Switzerland|     99082.81|           120|
|         Spain|     91859.48|           188|
|        Sweden|     87455.42|           128|
|       Denmark|     65741.09|            53|
+--------------+-------------+--------------+



In [23]:
tab2 = spark.sql("SELECT CustomerID, ROUND(SUM(ItemTotal), 2) AS TotalSale, \
                 ROUND(SUM(ItemTotal)/COUNT(DISTINCT Invoice), 2) AS AverageSale, \
                 Country FROM retail GROUP BY CustomerID, Country ORDER BY TotalSale DESC LIMIT 25")
tab2.show(25)

+----------+---------+-----------+--------------+
|CustomerID|TotalSale|AverageSale|       Country|
+----------+---------+-----------+--------------+
|   18102.0|598215.22|     3909.9|United Kingdom|
|   14646.0|523342.07|    3191.11|   Netherlands|
|   14156.0|296564.69|    1468.14|          EIRE|
|   14911.0|270248.53|      529.9|          EIRE|
|   17450.0|233579.39|    3829.17|United Kingdom|
|   13694.0|190825.52|    1163.57|United Kingdom|
|   17511.0|171885.98|    2022.19|United Kingdom|
|   12415.0|143269.29|    4341.49|     Australia|
|   16684.0|141502.25|    2176.96|United Kingdom|
|   15061.0|136391.48|     988.34|United Kingdom|
|   15311.0|113513.07|     420.42|United Kingdom|
|   13089.0|113214.19|     458.36|United Kingdom|
|   17949.0| 98895.59|     716.63|United Kingdom|
|   16029.0| 91800.91|     734.41|United Kingdom|
|   14298.0| 90489.31|    1077.25|United Kingdom|
|   15769.0| 84269.38|    1652.34|United Kingdom|
|   13798.0| 73573.47|     588.59|United Kingdom|


In [24]:
tab3 = spark.sql("SELECT CustomerID AS TopCustomerID, Country, TotalSale, NumberOfOrders, ROUND(TotalSale/NumberOfOrders, 2) AS AverageSale FROM \
                 (SELECT CustomerID, RANK() OVER (PARTITION BY Country ORDER BY TotalSale DESC) AS Ordinal, TotalSale, NumberOfOrders, Country FROM \
                 (SELECT CustomerID, ROUND(SUM(ItemTotal), 2) AS TotalSale, COUNT(DISTINCT Invoice) AS NumberOfOrders, Country FROM retail GROUP BY Country, CustomerID) \
                 ORDER BY Ordinal, TotalSale DESC) WHERE Ordinal = 1")
tab3.show(tab3.count())

+-------------+--------------------+---------+--------------+-----------+
|TopCustomerID|             Country|TotalSale|NumberOfOrders|AverageSale|
+-------------+--------------------+---------+--------------+-----------+
|      18102.0|      United Kingdom|598215.22|           153|     3909.9|
|      14646.0|         Netherlands|523342.07|           164|    3191.11|
|      14156.0|                EIRE|296564.69|           202|    1468.14|
|      12415.0|           Australia|143269.29|            33|    4341.49|
|      17404.0|              Sweden| 46006.82|            28|     1643.1|
|      12471.0|             Germany| 37948.61|           129|     294.18|
|      12678.0|              France| 33851.13|            23|    1471.79|
|      13902.0|             Denmark| 30411.26|             8|    3801.41|
|      12409.0|         Switzerland| 23090.47|            13|    1776.19|
|      12540.0|               Spain| 22107.29|            36|     614.09|
|      12753.0|               Japan| 2

In [25]:
tab4 = spark.sql("SELECT DISTINCT Invoice, InvoiceTotal, DATE_FORMAT(InvoiceDate, 'MM/yyyy') AS Date FROM retail WHERE CustomerID = 18102.0 ORDER BY InvoiceTotal DESC")
tab4.show(tab4.count())

+-------+------------+-------+
|Invoice|InvoiceTotal|   Date|
+-------+------------+-------+
| 537659|    31770.98|12/2010|
| 526934|    26007.08|10/2010|
| 515944|    22863.36|07/2010|
| 572209|     22206.0|10/2011|
| 517731|     21984.0|08/2010|
| 537657|    19278.24|12/2010|
| 494243|     18532.3|01/2010|
| 531866|    18122.17|11/2010|
| 556255|     16488.0|06/2011|
| 490059|     14475.0|12/2009|
| 566934|    13249.94|09/2011|
| 491456|     11016.0|12/2009|
| 555920|    10999.52|06/2011|
| 581457|    10363.82|12/2011|
| 556726|     8915.52|06/2011|
| 553368|     8895.66|05/2011|
| 516781|     8328.31|07/2010|
| 558775|      7786.0|07/2011|
| 572196|     7398.54|10/2011|
| 569343|     7298.76|10/2011|
| 566935|      6870.0|09/2011|
| 494244|      6711.0|01/2010|
| 493885|      6624.0|01/2010|
| 561655|      6591.6|07/2011|
| 498440|      6412.0|02/2010|
| 543379|     6155.72|02/2011|
| 503186|     6051.45|03/2010|
| 511335|     6007.68|06/2010|
| 509352|      5782.0|05/2010|
| 566931

In [26]:
tab5 = spark.sql("SELECT Month, Year, ROUND(SUM(TotalUnits)/COUNT(DISTINCT Invoice), 1) AS UnitsPerTransaction FROM \
                 (SELECT SUM(Quantity) AS TotalUnits, Invoice, YEAR(InvoiceDate) AS Year, Month(InvoiceDate) AS Month FROM retail GROUP BY YEAR(InvoiceDate), MONTH(InvoiceDate), Invoice) \
                 GROUP BY Year, Month ORDER BY Year, Month")
tab5.show(tab5.count())

+-----+----+-------------------+
|Month|Year|UnitsPerTransaction|
+-----+----+-------------------+
|   12|2009|              205.4|
|    1|2010|              283.3|
|    2|2010|              274.4|
|    3|2010|              261.7|
|    4|2010|              214.0|
|    5|2010|              209.1|
|    6|2010|              205.3|
|    7|2010|              186.6|
|    8|2010|              288.9|
|    9|2010|              232.3|
|   10|2010|              227.1|
|   11|2010|              203.1|
|   12|2010|              259.3|
|    1|2011|              217.9|
|    2|2011|              218.7|
|    3|2011|              212.5|
|    4|2011|              201.3|
|    5|2011|              198.9|
|    6|2011|              209.1|
|    7|2011|              228.1|
|    8|2011|              250.4|
|    9|2011|              258.7|
|   10|2011|              251.7|
|   11|2011|              217.1|
|   12|2011|              221.3|
+-----+----+-------------------+



In [27]:
tab6 = spark.sql("SELECT MONTH(InvoiceDate) AS Month, YEAR(InvoiceDate) AS Year, ROUND(SUM(ItemTotal)/COUNT(DISTINCT Invoice), 2) AS AverageSale FROM retail GROUP BY Year, Month ORDER BY Year, Month")
tab6.show(tab6.count())

+-----+----+-----------+
|Month|Year|AverageSale|
+-----+----+-----------+
|   12|2009|     349.09|
|    1|2010|     410.46|
|    2|2010|     366.59|
|    3|2010|     333.51|
|    4|2010|     347.14|
|    5|2010|      316.7|
|    6|2010|     311.76|
|    7|2010|     328.54|
|    8|2010|     379.61|
|    9|2010|     382.67|
|   10|2010|     373.16|
|   11|2010|     360.85|
|   12|2010|     503.06|
|    1|2011|     384.36|
|    2|2011|     363.18|
|    3|2011|     358.22|
|    4|2011|     307.84|
|    5|2011|      350.6|
|    6|2011|     356.19|
|    7|2011|     360.48|
|    8|2011|      399.2|
|    9|2011|     448.24|
|   10|2011|     430.67|
|   11|2011|     366.95|
|   12|2011|      371.9|
+-----+----+-----------+



In [28]:
tab7 = spark.sql("SELECT Month, Year, StockCode, Description, TotalUnits, Price FROM \
                (SELECT Month, Year, StockCode, Description, TotalUnits, Price, RANK() OVER(PARTITION BY Year, Month ORDER BY TotalUnits DESC) AS Ordinal FROM \
                (SELECT MONTH(InvoiceDate) AS Month, YEAR(InvoiceDate) AS Year, StockCode, Description, ROUND(SUM(ItemTotal)/SUM(Quantity), 2) AS Price, SUM(Quantity) AS TotalUnits FROM retail \
                 GROUP BY Year, Month, StockCode, Description ORDER BY Year, Month)) WHERE Ordinal = 1")
tab7.show(tab7.count())

+-----+----+---------+--------------------+----------+-----+
|Month|Year|StockCode|         Description|TotalUnits|Price|
+-----+----+---------+--------------------+----------+-----+
|   12|2009|   85123A|WHITE HANGING HEA...|      6204| 2.65|
|    1|2010|    20993|JAZZ HEARTS MEMO PAD|      9489|  0.1|
|    2|2010|    37410|BLACK AND WHITE P...|     19248|  0.1|
|    3|2010|    21091|SET/6 WOODLAND PA...|     13099| 0.11|
|    4|2010|    21212|PACK OF 72 RETRO ...|      5258| 0.47|
|    5|2010|    21982|PACK OF 12 SUKI T...|      5570| 0.48|
|    6|2010|    84077|WORLD WAR 2 GLIDE...|      5388|  0.2|
|    7|2010|    21212|PACK OF 72 RETRO ...|      4081| 0.51|
|    8|2010|    21088|SET/6 FRUIT SALAD...|      7131| 0.08|
|    9|2010|    17003| BROCADE RING PURSE |     13817| 0.19|
|   10|2010|    84568|GIRLS ALPHABET IR...|      6336| 0.18|
|   11|2010|    84347|ROTATING SILVER A...|     11474| 1.79|
|   12|2010|    84077|WORLD WAR 2 GLIDE...|      8883| 0.21|
|    1|2011|   85123A|WH

In [29]:
tab7a = spark.sql("SELECT * FROM retail WHERE (StockCode = 84826) and (MONTH(InvoiceDate) = MONTH(DATE('2011-11-01'))) and (YEAR(InvoiceDate) = YEAR(DATE('2011-11-01')))")
tab7a.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|ItemTotal|InvoiceTotal|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
| 575336|    84826|ASSTD DESIGN 3D P...|       4|2011-11-09 13:58:00| 0.85|   14968.0|United Kingdom|      3.4|      147.59|
| 575337|    84826|ASSTD DESIGN 3D P...|       5|2011-11-09 14:11:00| 0.85|   17867.0|United Kingdom|     4.25|      224.67|
| 575337|    84826|ASSTD DESIGN 3D P...|       1|2011-11-09 14:11:00| 0.85|   17867.0|United Kingdom|     0.85|      224.67|
| 578841|    84826|ASSTD DESIGN 3D P...|   12540|2011-11-25 15:57:00|  0.0|   13256.0|United Kingdom|      0.0|         0.0|
| 575767|    84826|ASSTD DESIGN 3D P...|       1|2011-11-11 11:11:00| 0.85|   17348.0|United Kingdom|     0.85|      209.73|


In [30]:
tab7b = spark.sql("SELECT * FROM retail WHERE (StockCode = 21088) and (MONTH(InvoiceDate) = MONTH(DATE('2010-08-01'))) and (YEAR(InvoiceDate) = YEAR(DATE('2010-08-01')))")
tab7b.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|ItemTotal|InvoiceTotal|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
| 517861|    21088|SET/6 FRUIT SALAD...|       3|2010-08-02 14:07:00| 0.65|   13090.0|United Kingdom|     1.95|      190.12|
| 518505|    21088|SET/6 FRUIT SALAD...|    7128|2010-08-09 13:10:00| 0.08|   14277.0|        France|   570.24|    11880.84|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+



In [31]:
tab7c = spark.sql("SELECT * FROM retail WHERE (StockCode = 20993) and (MONTH(InvoiceDate) = MONTH(DATE('2010-01-01'))) and (YEAR(InvoiceDate) = YEAR(DATE('2010-01-01')))")
tab7c.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|ItemTotal|InvoiceTotal|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
| 493953|    20993|JAZZ HEARTS MEMO PAD|      48|2010-01-08 14:04:00| 0.19|   14327.0|United Kingdom|     9.12|      328.62|
| 494208|    20993|JAZZ HEARTS MEMO PAD|       1|2010-01-12 11:33:00| 0.85|   17945.0|United Kingdom|     0.85|       137.7|
| 493577|    20993|JAZZ HEARTS MEMO PAD|      48|2010-01-05 11:16:00| 0.19|   13557.0|United Kingdom|     9.12|      311.42|
| 495488|    20993|JAZZ HEARTS MEMO PAD|      12|2010-01-25 12:19:00| 0.85|   14911.0|          EIRE|     10.2|      627.43|
| 494644|    20993|JAZZ HEARTS MEMO PAD|       2|2010-01-17 12:12:00| 0.85|   16595.0|United Kingdom|      1.7|      519.18|


In [32]:
tab7d = spark.sql("SELECT * FROM retail WHERE (StockCode = 37410) and (MONTH(InvoiceDate) = MONTH(DATE('2010-02-01'))) and (YEAR(InvoiceDate) = YEAR(DATE('2010-02-01')))")
tab7d.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|ItemTotal|InvoiceTotal|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
| 499486|    37410|BLACK AND WHITE P...|      72|2010-02-28 13:42:00| 0.85|   15061.0|United Kingdom|     61.2|     1422.78|
| 498548|    37410|BLACK AND WHITE P...|      12|2010-02-19 17:40:00| 1.25|   17400.0|United Kingdom|     15.0|      976.01|
| 499456|    37410|BLACK AND WHITE P...|      12|2010-02-28 10:40:00| 1.25|   17819.0|United Kingdom|     15.0|       131.4|
| 497946|    37410|BLACK AND WHITE P...|   19152|2010-02-15 11:57:00|  0.1|   13902.0|       Denmark|   1915.2|     16973.1|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+


In [33]:
tab7e = spark.sql("SELECT * FROM retail WHERE (StockCode = 21091) and (MONTH(InvoiceDate) = MONTH(DATE('2010-03-01'))) and (YEAR(InvoiceDate) = YEAR(DATE('2010-03-01')))")
tab7e.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|ItemTotal|InvoiceTotal|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
| 500336|    21091|SET/6 WOODLAND PA...|       6|2010-03-07 13:48:00| 0.85|   17589.0|United Kingdom|      5.1|      619.04|
| 500133|    21091|SET/6 WOODLAND PA...|       4|2010-03-04 15:07:00| 0.85|   16329.0|United Kingdom|      3.4|      402.79|
| 499517|    21091|SET/6 WOODLAND PA...|      12|2010-03-01 09:58:00| 0.85|   12598.0|        France|     10.2|      629.39|
| 501652|    21091|SET/6 WOODLAND PA...|       1|2010-03-18 12:47:00| 0.85|   17841.0|United Kingdom|     0.85|      281.03|
| 501652|    21091|SET/6 WOODLAND PA...|       3|2010-03-18 12:47:00| 0.85|   17841.0|United Kingdom|     2.55|      281.03|


In [34]:
tab8 = spark.sql("SELECT Month, Year, StockCode, Description, ROUND(ItemTotal, 2) AS ItemTotal, Quantity, ROUND(ItemTotal/Quantity, 2) AS UnitPrice FROM \
                (SELECT Month, Year, StockCode, Description, ItemTotal, RANK() OVER(PARTITION BY Year, Month ORDER BY ItemTotal DESC) AS Ordinal, Quantity FROM \
                (SELECT MONTH(InvoiceDate) AS Month, YEAR(InvoiceDate) AS Year, StockCode, Description, SUM(ItemTotal) AS ItemTotal, SUM(Quantity) AS Quantity FROM retail \
                 GROUP BY Year, Month, StockCode, Description ORDER BY Year, Month)) WHERE Ordinal = 1")
tab8.show(tab8.count())

+-----+----+---------+--------------------+---------+--------+---------+
|Month|Year|StockCode|         Description|ItemTotal|Quantity|UnitPrice|
+-----+----+---------+--------------------+---------+--------+---------+
|   12|2009|   85123A|WHITE HANGING HEA...| 16417.86|    6204|     2.65|
|    1|2010|   85123A|WHITE HANGING HEA...|  14223.4|    5380|     2.64|
|    2|2010|   85123A|WHITE HANGING HEA...|   7592.2|    2804|     2.71|
|    3|2010|   85123A|WHITE HANGING HEA...| 12540.62|    4718|     2.66|
|    4|2010|    22423|REGENCY CAKESTAND...|  12717.0|    1092|    11.65|
|    5|2010|   85123A|WHITE HANGING HEA...|  11704.4|    4384|     2.67|
|    6|2010|    22423|REGENCY CAKESTAND...| 14902.35|    1290|    11.55|
|    7|2010|    22423|REGENCY CAKESTAND...| 15168.75|    1321|    11.48|
|    8|2010|   85123A|WHITE HANGING HEA...|  11257.2|    4224|     2.67|
|    9|2010|    22423|REGENCY CAKESTAND...| 21332.85|    1867|    11.43|
|   10|2010|    22423|REGENCY CAKESTAND...| 20332.0

In [35]:
tab8a = spark.sql("SELECT StockCode, Description, ROUND(SUM(ItemTotal), 2) AS ItemTotal, SUM(Quantity) AS Quantity, COUNT(DISTINCT CustomerID) AS NumberOfCustomers, COUNT(DISTINCT Invoice) AS NumberOfOrders FROM retail \
                    GROUP BY StockCode, Description ORDER BY ItemTotal DESC LIMIT 10")
tab8a.show()

+---------+--------------------+---------+--------+-----------------+--------------+
|StockCode|         Description|ItemTotal|Quantity|NumberOfCustomers|NumberOfOrders|
+---------+--------------------+---------+--------+-----------------+--------------+
|    22423|REGENCY CAKESTAND...| 269736.7|   23446|             1316|          3659|
|   85123A|WHITE HANGING HEA...|242700.51|   90008|             1494|          5021|
|   85099B|JUMBO BAG RED RET...|134845.16|   74564|              861|          2683|
|    84879|ASSORTED COLOUR B...|126354.18|   79434|             1012|          2669|
|     POST|             POSTAGE| 112249.1|    5078|              496|          1981|
|    47566|       PARTY BUNTING|102686.23|   23335|              894|          2100|
|    22086|PAPER CHAIN KIT 5...| 78366.93|   29001|              896|          1706|
|    79321|       CHILLI LIGHTS| 72229.34|   15591|              306|           942|
|    21137|BLACK RECORD COVE...| 67127.15|   19606|              

In [36]:
tab8b = spark.sql("SELECT StockCode, Description, ROUND(SUM(ItemTotal), 2) AS ItemTotal, SUM(Quantity) AS TotalQuantity, COUNT(DISTINCT CustomerID) AS NumberOfCustomers, COUNT(DISTINCT Invoice) AS NumberOfOrders FROM retail \
                 WHERE Price > 0 AND Quantity > 0 GROUP BY StockCode, Description ORDER BY TotalQuantity LIMIT 10")
tab8b.show()

+---------+--------------------+---------+-------------+-----------------+--------------+
|StockCode|         Description|ItemTotal|TotalQuantity|NumberOfCustomers|NumberOfOrders|
+---------+--------------------+---------+-------------+-----------------+--------------+
|    21772|VINTAGE METAL CAK...|    12.75|            1|                1|             1|
|    22631|CIRCUS PARADE LUN...|     1.95|            1|                1|             1|
|   90014C|SILVER AND BLACK ...|     2.95|            1|                1|             1|
|    47569|ENGLISH ROSE DESI...|     2.55|            1|                1|             1|
|   48173A| DOOR MAT BLUE FLOCK|     6.75|            1|                1|             1|
|   90025B|BAROQUE BUTTERFLY...|     3.75|            1|                1|             1|
|   84845A|HOLLYHOCK SQUARE ...|     1.25|            1|                1|             1|
|    84626|CANDY STRIPE ROSE...|    45.95|            1|                1|             1|
|    35999

In [37]:
tab8c = spark.sql("SELECT StockCode, Description, SUM(Quantity) AS TotalQuantity, COUNT(DISTINCT CustomerID) AS NumberOfCustomers, COUNT(DISTINCT Invoice) AS NumberOfOrders FROM Retail \
                  WHERE Quantity < 0 GROUP BY StockCode, Description ORDER BY NumberOfCustomers DESC, TotalQuantity LIMIT 10")
tab8c.show()

+---------+--------------------+-------------+-----------------+--------------+
|StockCode|         Description|TotalQuantity|NumberOfCustomers|NumberOfOrders|
+---------+--------------------+-------------+-----------------+--------------+
|        M|              Manual|        -5311|              229|           360|
|    22423|REGENCY CAKESTAND...|        -1468|              195|           341|
|    22138|BAKING SET 9 PIEC...|         -861|              181|           208|
|     POST|             POSTAGE|         -255|              155|           178|
|   79323W| WHITE CHERRY LIGHTS|        -1067|              107|           117|
|    21232|STRAWBERRY CERAMI...|         -928|              102|           178|
|   85123A|WHITE HANGING HEA...|        -3632|               99|           133|
|    21843|RED RETROSPOT CAK...|         -506|               84|           105|
|   79323P|  PINK CHERRY LIGHTS|         -693|               78|            86|
|    22960|JAM MAKING SET WI...|        

In [38]:
codes = tab8c.select('StockCode').collect()
codes = [row['StockCode'] for row in codes]
codes = "'" + "', '".join(codes) + "'"
codes

"'M', '22423', '22138', 'POST', '79323W', '21232', '85123A', '21843', '79323P', '22960'"

In [39]:
tab8d = spark.sql(f"SELECT StockCode, Description, SUM(Quantity) AS TotalQuantity, COUNT(DISTINCT CustomerID) AS NumberOfCustomers, COUNT(DISTINCT Invoice) AS NumberOfOrders FROM retail \
                 WHERE Quantity > 0 AND StockCode IN ({codes}) GROUP BY StockCode, Description ORDER BY NumberOfCustomers DESC, TotalQuantity")
tab8d.show()

+---------+--------------------+-------------+-----------------+--------------+
|StockCode|         Description|TotalQuantity|NumberOfCustomers|NumberOfOrders|
+---------+--------------------+-------------+-----------------+--------------+
|   85123A|WHITE HANGING HEA...|        93640|             1490|          4888|
|    22423|REGENCY CAKESTAND...|        24914|             1314|          3318|
|    22138|BAKING SET 9 PIEC...|         9052|             1137|          1742|
|    21232|STRAWBERRY CERAMI...|        34726|              683|          1818|
|    22960|JAM MAKING SET WI...|         8713|              583|           904|
|    21843|RED RETROSPOT CAK...|         4437|              502|           985|
|        M|              Manual|         9810|              443|           626|
|     POST|             POSTAGE|         5333|              405|          1803|
|    21843|RETRO SPOT CAKE S...|         1456|              207|           295|
|   79323W| WHITE CHERRY LIGHTS|        

In [40]:
tab8c = tab8c.select('StockCode', F.col('TotalQuantity').alias('QuantityReturn'), F.col('NumberOfCustomers').alias('CustomersReturned'), F.col('NumberOfOrders').alias('NumberOfReturns'))
tab8d = tab8d.select('StockCode', 'TotalQuantity', 'NumberOfCustomers', 'NumberOfOrders').groupby('StockCode').agg(F.sum('TotalQuantity').alias('QuantitySold'), F.sum('NumberOfCustomers').alias('CustomersPurchased'), F.sum('NumberOfOrders').alias('NumberOfOrders'))
tab8c.show()
tab8d.show()

+---------+--------------+-----------------+---------------+
|StockCode|QuantityReturn|CustomersReturned|NumberOfReturns|
+---------+--------------+-----------------+---------------+
|        M|         -5311|              229|            360|
|    22423|         -1468|              195|            341|
|    22138|          -861|              181|            208|
|     POST|          -255|              155|            178|
|   79323W|         -1067|              107|            117|
|    21232|          -928|              102|            178|
|   85123A|         -3632|               99|            133|
|    21843|          -506|               84|            105|
|   79323P|          -693|               78|             86|
|    22960|          -255|               71|             86|
+---------+--------------+-----------------+---------------+

+---------+------------+------------------+--------------+
|StockCode|QuantitySold|CustomersPurchased|NumberOfOrders|
+---------+------------+---

In [41]:
tab8cd = tab8c.join(tab8d, tab8c.StockCode==tab8d.StockCode).drop(tab8d.StockCode)
tab8cd = tab8cd.withColumn('PercentQuantity', F.round(F.abs(F.col('QuantityReturn'))/F.col('QuantitySold'), 2))\
    .withColumn('PercentCustomer', F.round(F.col('CustomersReturned')/F.col('CustomersPurchased'), 2))\
    .withColumn('PercentOrder', F.round(F.col('NumberOfReturns')/F.col('NumberOfOrders'), 2))\
    .select('Stockcode', 'PercentQuantity', 'PercentCustomer', 'PercentOrder').sort(F.col('PercentQuantity').desc())
tab8cd.show()

+---------+---------------+---------------+------------+
|Stockcode|PercentQuantity|PercentCustomer|PercentOrder|
+---------+---------------+---------------+------------+
|        M|           0.54|           0.52|        0.58|
|   79323W|           0.36|           0.49|        0.37|
|   79323P|           0.31|           0.59|        0.38|
|    22138|            0.1|           0.16|        0.12|
|    21843|           0.09|           0.12|        0.08|
|    22423|           0.06|           0.15|         0.1|
|     POST|           0.05|           0.38|         0.1|
|   85123A|           0.04|           0.07|        0.03|
|    21232|           0.03|           0.14|        0.09|
|    22960|           0.03|           0.12|         0.1|
+---------+---------------+---------------+------------+



In [42]:
tab9 = spark.sql("SELECT COUNT(*) AS NumberOfNonRepeatClients FROM (SELECT CustomerID FROM retail \
                GROUP BY CustomerID HAVING COUNT(DISTINCT Invoice) = 1)")
tab9.show()

+------------------------+
|NumberOfNonRepeatClients|
+------------------------+
|                    1461|
+------------------------+



In [43]:
tab10 = spark.sql("SELECT * FROM retail WHERE Invoice = (SELECT Invoice FROM retail GROUP BY Invoice ORDER BY SUM(ItemTotal) DESC LIMIT 1)")
tab10.show(1, False)

+-------+---------+---------------------------+--------+-------------------+-----+----------+--------------+---------+------------+
|Invoice|StockCode|Description                |Quantity|InvoiceDate        |Price|CustomerID|Country       |ItemTotal|InvoiceTotal|
+-------+---------+---------------------------+--------+-------------------+-----+----------+--------------+---------+------------+
|581483 |23843    |PAPER CRAFT , LITTLE BIRDIE|80995   |2011-12-09 09:15:00|2.08 |16446.0   |United Kingdom|168469.6 |168469.59   |
+-------+---------+---------------------------+--------+-------------------+-----+----------+--------------+---------+------------+



In [44]:
tab10a = spark.sql("SELECT * FROM retail WHERE CustomerID = 16446.0")
tab10a.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|ItemTotal|InvoiceTotal|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+---------+------------+
| 553573|    22980|PANTRY SCRUBBING ...|       1|2011-05-18 09:52:00| 1.65|   16446.0|United Kingdom|     1.65|         2.9|
| 553573|    22982| PANTRY PASTRY BRUSH|       1|2011-05-18 09:52:00| 1.25|   16446.0|United Kingdom|     1.25|         2.9|
| 581483|    23843|PAPER CRAFT , LIT...|   80995|2011-12-09 09:15:00| 2.08|   16446.0|United Kingdom| 168469.6|   168469.59|
|C525275|  TEST001|This is a test pr...|      -2|2010-10-04 16:38:00|  4.5|   16446.0|United Kingdom|     -9.0|        -9.0|
|C581484|    23843|PAPER CRAFT , LIT...|  -80995|2011-12-09 09:27:00| 2.08|   16446.0|United Kingdom|-168469.6|  -168469.59|


In [45]:
tab11 = spark.sql("SELECT COUNT(DISTINCT CustomerID) AS TotalCustomers, COUNT(DISTINCT StockCode) AS TotalStockCode FROM retail where Quantity > 0")
tab11.show()

+--------------+--------------+
|TotalCustomers|TotalStockCode|
+--------------+--------------+
|          5881|          4631|
+--------------+--------------+



In [46]:
tab11a = spark.sql("SELECT COUNT(DISTINCT *) FROM (SELECT CustomerID FROM retail GROUP BY CustomerID, StockCode HAVING SUM(Quantity) > 0)")
tab11a.show()

+--------------------------+
|count(DISTINCT CustomerID)|
+--------------------------+
|                      5848|
+--------------------------+



In [47]:
tab11b = spark.sql("SELECT DISTINCT CustomerID FROM retail WHERE CustomerID NOT IN (SELECT CustomerID FROM retail GROUP BY CustomerID, StockCode HAVING SUM(Quantity) > 0)")
tab11b.show(tab11b.count())
invalid = tab11b.select('CustomerID').collect()
invalid = [row['CustomerID'] for row in invalid]

+----------+
|CustomerID|
+----------+
|   13749.0|
|   17130.0|
|   16994.0|
|   15383.0|
|   16580.0|
|   13054.0|
|   12918.0|
|   12558.0|
|   16154.0|
|   12706.0|
|   12773.0|
|   13100.0|
|   13231.0|
|   13633.0|
|   13910.0|
|   13829.0|
|   14255.0|
|   14925.0|
|   16512.0|
|   15202.0|
|   14120.0|
|   13378.0|
|   16995.0|
|   14763.0|
|   13290.0|
|   16220.0|
|   17546.0|
|   13664.0|
|   17592.0|
|   17645.0|
|   15760.0|
|   14308.0|
|   17424.0|
|   14914.0|
|   13401.0|
|   13409.0|
|   17943.0|
|   17755.0|
|   15354.0|
|   17013.0|
|   14337.0|
|   12467.0|
|   15357.0|
|   15997.0|
|   15940.0|
|   16252.0|
|   17485.0|
|   12768.0|
|   18274.0|
|   17632.0|
|   13335.0|
|   16151.0|
|   16703.0|
|   18023.0|
|   13342.0|
|   17378.0|
|   17399.0|
|   14802.0|
|   14792.0|
|   14313.0|
|   13463.0|
|   14328.0|
|   16514.0|
|   15896.0|
|   14864.0|
|   14906.0|
|   15767.0|
|   16575.0|
|   14832.0|
|   14939.0|
|   14781.0|
|   17445.0|
|   17661.0|
|   12382.0|

In [48]:
spark.catalog.dropTempView("retail")

In [98]:
df2 = df1.where(~df1.CustomerID.isin(*invalid))
df2.count()

823678

In [50]:
pos_udf = F.udf(lambda x: x if x and x > 0 else 0, IntegerType())
dfsc = df2.withColumn('Qty', pos_udf(F.sum('Quantity').over(Window().partitionBy('CustomerID', 'StockCode'))))\
        .groupby('CustomerID').pivot('StockCode').agg({'Qty':'max'})
dfsc.cache()
dfsc.show()

+----------+-----+-----+-----+-----+------+------+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+-------+------+------+------+------+------+------+------+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+-----+------+------+------+------+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------

In [51]:
import calendar as cal
import functools

pos_udf = F.udf(lambda x: x if x and x > 0 else 0.0, FloatType())
dfmt = df2.withColumn('TotalByMonth', pos_udf(F.round(F.sum('ItemTotal').over(Window().partitionBy('CustomerID', F.month('InvoiceDate'))), 2)))\
    .withColumn('Month', F.date_format('InvoiceDate', 'MM'))\
    .groupby('CustomerID').pivot('Month').agg({'TotalByMonth':'max'})

oldcols = dfmt.schema.names
newcols = [cal.month_abbr[i] for i in range(1, 13)]
newcols.insert(0, 'CustomerID')

dfmt = functools.reduce(lambda dfmt, idx: dfmt.withColumnRenamed(oldcols[idx], newcols[idx]), range(13), dfmt)
dfmt.show()

+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|CustomerID|    Jan|    Feb|    Mar|    Apr|    May|    Jun|    Jul|    Aug|    Sep|    Oct|    Nov|    Dec|
+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   12535.0|   null|   null|   null|   null|   null| 371.45|   null|   null|  344.9|   null|   null|   null|
|   13259.0| 337.28|  138.3|   null| 177.98| 173.45|   null|   null|   null|   null| 292.32| 314.05|   null|
|   17966.0| 392.42|   null| 439.47|  191.1|   null|  136.5| 310.93|   null|   null|   null|  504.0|   null|
|   16939.0|   null| 170.35| 156.74|   null|   null|   null|   null| 121.91|   null|   null|   null|   null|
|   12777.0|   null|   null|   null|   null|   null|   null|   null|   null| 519.45|   null|   null|   null|
|   17955.0|   null|  392.6|   null|   null|  557.3|   null|   null|   null|   null|   null|   null|   null|
|   13178.0| 595.65

In [99]:
df2 = df2.withColumn('NumberOfOrders',F.size(F.collect_set('Invoice').over(Window().partitionBy('CustomerID'))))\
    .withColumn('TotalNumItem', F.size(F.collect_set('StockCode').over(Window().partitionBy('CustomerID'))))\
    .withColumn('AverageItemPrice', F.round((F.sum('ItemTotal').over(Window().partitionBy('CustomerID')))/(F.sum('Quantity').over(Window().partitionBy('CustomerID'))), 2))
df2.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+-------+---------+------------+--------------+------------+----------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|Country|ItemTotal|InvoiceTotal|NumberOfOrders|TotalNumItem|AverageItemPrice|
+-------+---------+--------------------+--------+-------------------+-----+----------+-------+---------+------------+--------------+------------+----------------+
| 566074|    22624|IVORY KITCHEN SCALES|       2|2011-09-09 08:48:00|  8.5|   12535.0| France|     17.0|       344.9|             2|          28|            3.27|
| 566074|    22138|BAKING SET 9 PIEC...|       6|2011-09-09 08:48:00| 4.95|   12535.0| France|     29.7|       344.9|             2|          28|            3.27|
| 566074|   90184C|BLACK CHUNKY BEAD...|       4|2011-09-09 08:48:00| 8.95|   12535.0| France|     35.8|       344.9|             2|          28|            3.27|
| 566074|    21155|RED

In [100]:
df2 = df2.withColumn('TopOrder', (F.max('InvoiceTotal').over(Window().partitionBy('CustomerID'))))\
    .withColumn('AverageOrder', F.round((F.sum('ItemTotal').over(Window().partitionBy('CustomerID')))/F.col('NumberOfOrders'),2))\
    .withColumn('AverageQuantityPerOrder', F.round((F.sum('Quantity').over(Window().partitionBy('CustomerID')))/(F.col('NumberOfOrders')),1))
df2.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+-------+---------+------------+--------------+------------+----------------+--------+------------+-----------------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|Country|ItemTotal|InvoiceTotal|NumberOfOrders|TotalNumItem|AverageItemPrice|TopOrder|AverageOrder|AverageQuantityPerOrder|
+-------+---------+--------------------+--------+-------------------+-----+----------+-------+---------+------------+--------------+------------+----------------+--------+------------+-----------------------+
| 566074|    22624|IVORY KITCHEN SCALES|       2|2011-09-09 08:48:00|  8.5|   12535.0| France|     17.0|       344.9|             2|          28|            3.27|  371.45|      358.17|                  109.5|
| 566074|    22138|BAKING SET 9 PIEC...|       6|2011-09-09 08:48:00| 4.95|   12535.0| France|     29.7|       344.9|             2|          28|            3.27|  

In [101]:
restofeurope = ['Austria', 'Belgium', 'Cyprus', 'Czech Republic', 'Denmark', 'EIRE', 'European Community','Finland', 'France', 'Germany', 'Greece', 'Iceland', 'Italy', 'Lithuania', 'Malta', 'Netherlands', 'Norway', 'Poland', 'Portugal', 'Spain', 'Sweden', 'Switzerland']
df2 = df2.withColumn('Region', F.when((F.last('Country').over(Window().partitionBy('CustomerID'))) == 'United Kingdom', 1)\
                .when(F.last('Country').over(Window().partitionBy('CustomerID')).isin(restofeurope), 2)\
               .otherwise(3))
df2.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+-------+---------+------------+--------------+------------+----------------+--------+------------+-----------------------+------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|Country|ItemTotal|InvoiceTotal|NumberOfOrders|TotalNumItem|AverageItemPrice|TopOrder|AverageOrder|AverageQuantityPerOrder|Region|
+-------+---------+--------------------+--------+-------------------+-----+----------+-------+---------+------------+--------------+------------+----------------+--------+------------+-----------------------+------+
| 566074|    22624|IVORY KITCHEN SCALES|       2|2011-09-09 08:48:00|  8.5|   12535.0| France|     17.0|       344.9|             2|          28|            3.27|  371.45|      358.17|                  109.5|     2|
| 566074|    22138|BAKING SET 9 PIEC...|       6|2011-09-09 08:48:00| 4.95|   12535.0| France|     29.7|       344.9|             2|    

In [102]:
dfcs = df2.groupBy('CustomerID').agg({'NumberOfOrders':'max', 'TotalNumItem':'max', 'AverageItemPrice':'max', 'TopOrder':'max', 'AverageOrder':'max', 'AverageQuantityPerOrder':'max', 'Region':'max'})

oldcols = ['CustomerID', 'max(NumberOfOrders)', 'max(TotalNumItem)', 'max(AverageItemPrice)', 'max(TopOrder)', 'max(AverageOrder)', 'max(AverageQuantityPerOrder)', 'max(Region)']
newcols = ['CustomerID', 'NumberOfOrders', 'TotalNumItem', 'AverageItemPrice', 'TopOrder', 'AverageOrder', 'AverageQuantityPerOrder', 'Region']

dfcs = functools.reduce(lambda dfcs, idx: dfcs.withColumnRenamed(oldcols[idx], newcols[idx]), range(8), dfcs)
dfcs.show()

+----------+--------+------+--------------+------------+----------------+------------+-----------------------+
|CustomerID|TopOrder|Region|NumberOfOrders|TotalNumItem|AverageItemPrice|AverageOrder|AverageQuantityPerOrder|
+----------+--------+------+--------------+------------+----------------+------------+-----------------------+
|   12535.0|  371.45|     2|             2|          28|            3.27|      358.17|                  109.5|
|   12777.0|  519.45|     1|             1|          26|            1.13|      519.45|                  458.0|
|   12891.0|   204.0|     1|            12|           2|            0.36|       68.29|                  189.6|
|   12985.0|  754.38|     1|             3|          73|            0.86|      405.21|                  471.0|
|   13067.0|   251.4|     1|             3|          24|            3.88|       169.5|                   43.7|
|   13178.0|  1258.2|     1|            25|         195|             1.6|      559.92|                  350.0|
|