# 0. Instructions


### 1/ Description

The goal here is to get insights from a dataset by using Spark, and write the analysis and additional information into a brief report.

### 2/ Contents Organization

The content of the attached report is as follows:
- Background/scenario description (what)
- Goal of the analysis (why)
- Analysis deep dive (how)
- Conclusions (insights)

## 1. PySpark environment setup

In [2]:
import findspark 
findspark.init() 

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession 


sc = SparkContext.getOrCreate()
spark = SparkSession(sc) 

## 2. Data source and Spark data abstraction (DataFrame) setup

In [3]:
ecomDF = spark.read \
                 .option("inferSchema", "true") \
                 .option("header", "true") \
                 .csv("ecommerce.csv") 

## 3. Data set metadata analysis

### A. An Introduction to the Dataset

This dataset comes from the UCI Machine Learning Repository that has made this rare e-commerce dataset publicly available. It can be found here: https://archive.ics.uci.edu/ml/datasets/Online+Retail+II.

##### Description: 
This dataset contains all the transactionsfor an Online Retail business registered in the United Kingdom. The transactions took place from the 1st of December 2009 to the 9th of December 2011. The main products sold by the company are unique all-occasion gift-ware. And also, many customers of this e-commerce are wholesalers.


##### The variables information: 

**InvoiceNo**: a nominal Invoice number. It is a 6-digit integral number uniquely assigned to each transaction. If this code starts with the letter 'c', it indicates a cancellation.

**StockCode**: the nominal product (item) code. A 5-digit integral number uniquely assigned to each distinct product.

**Description**: the nominal product (item) name. 

**Quantity**: the quantities of each product (item) per transaction (numeric).

**InvoiceDate**: the invoice date and time (numeric). The day and time when a transaction was generated.

**UnitPrice**: the unit price (numeric). Product price per unit (in sterling pound).

**CustomerID**: a nominal customer number. A 5-digit integral number uniquely assigned to each customer.

**Country**: the nominal country name. The name of the country where a customer resides.


### B. Display schema and size of the DataFrame

In [4]:
from IPython.display import display, Markdown

ecomDF.printSchema() 
display(Markdown("This DataFrame has **%d rows**." % ecomDF.count()))


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



This DataFrame has **541909 rows**.

### C. Get one or multiple random samples from the data set

In [5]:
ecomDF.cache()
ecomDF.sample(False, 0.1).take(5)

[Row(InvoiceNo='536367', StockCode='21755', Description='LOVE BUILDING BLOCK WORD', Quantity=3, InvoiceDate='12/1/2010 8:34', UnitPrice=5.95, CustomerID=13047, Country='United Kingdom'),
 Row(InvoiceNo='536370', StockCode='22544', Description='MINI JIGSAW SPACEBOY', Quantity=24, InvoiceDate='12/1/2010 8:45', UnitPrice=0.42, CustomerID=12583, Country='France'),
 Row(InvoiceNo='536375', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, InvoiceDate='12/1/2010 9:32', UnitPrice=3.39, CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536378', StockCode='22386', Description='JUMBO BAG PINK POLKADOT', Quantity=10, InvoiceDate='12/1/2010 9:37', UnitPrice=1.95, CustomerID=14688, Country='United Kingdom'),
 Row(InvoiceNo='536378', StockCode='21977', Description='PACK OF 60 PINK PAISLEY CAKE CASES', Quantity=24, InvoiceDate='12/1/2010 9:37', UnitPrice=0.55, CustomerID=14688, Country='United Kingdom')]

### D. Data entities, metrics and dimensions

I've identified the following elements:

* **Entities:** Sales transactions (main one measured - facts), InvoiceNo (dimension), CustomerID (dimension)
* **Metrics:** Quantity, InvoiceDate, UnitPrice
* **Dimensions:** StockCode, Description, Country

### E. Column categorization

The following could be a potential column categorization:

* **Timing related columns:** *InvoiceDate*
* **Sales related columns:** *InvoiceNo*, *StockCode*, *Description*, *Quantity*, *UnitPrice*
* **Customer related columns:** *CustomerID*, *Country*

(the data will be enriched before jumping to Data Profiling, and I will add more Timing related columns)

## 4. Time Data Expansion

Before the data profiling, I believe it is appropriate to perform some expansion of our data, solely based on the columns we have. These expansions will mainly be on the time column. I would like to expand it to be able to perform better analysis later on. I thus expand the original InvoiceDate column into: date, month, week, day, hour, period of the day.

In [6]:
from pyspark.sql.functions import split, count, when, col

split_col = split(ecomDF['InvoiceDate'], ' ')
ecomDF = ecomDF.withColumn('Date', split_col.getItem(0))
ecomDF = ecomDF.withColumn('Time', split_col.getItem(1))

#checking if the column were created:
ecomDF.show(2)

#checking if they are correctly filled:
print("Checking for nulls on columns Date and Time as compared to InvoiceDate:")
ecomDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Date","Time", "InvoiceDate"]]).show()

ecomDF.printSchema()


+---------+---------+--------------------+--------+--------------+---------+----------+--------------+---------+----+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|     Date|Time|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+---------+----+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|12/1/2010|8:26|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|12/1/2010|8:26|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+---------+----+
only showing top 2 rows

Checking for nulls on columns Date and Time as compared to InvoiceDate:
+----+----+-----------+
|Date|Time|InvoiceDate|
+----+----+-----------+
|   0|   0|          0|
+----+----+-----------+

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (null

In [7]:
#we see that the type of Date is a string, so to be able to add the column "DayOfWeek", we have to transform it to a Date type and apply the function
from datetime import datetime
from pyspark.sql.functions import col, udf, dayofweek, date_format, year, month, dayofmonth
from pyspark.sql.types import DateType, IntegerType

# This function converts the string cell into a date:
func =  udf (lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

ecomDF1 = ecomDF.withColumn('DateFull', func(col('Date')))

#We want to add to our main dataframe a column containing the day of the week:
ecomDF1=ecomDF1.select('DateFull' , "Time","InvoiceNo","StockCode", "Description","Quantity","UnitPrice","CustomerID","Country",
               date_format('Datefull', 'u').alias('DayOfWeekN'), date_format('DateFull', 'E').alias('DayOfWeekS'))


#I also add the year, month, and day numbers coming from the date:
ecomDF1= ecomDF1.select('DateFull' , "Time","InvoiceNo","StockCode", "Description","Quantity","UnitPrice",
                        "CustomerID","Country", "DayofWeekN", "DayofWeekS",
    year("DateFull").alias('year'), 
    month("DateFull").alias('month'), 
    dayofmonth("DateFull").alias('day'))

#I reorder the columns to have the time related ones next to each other:
ecomDF1=ecomDF1.select("DateFull" ,"year","month","day","DayofWeekN", "DayofWeekS", "Time","InvoiceNo","StockCode", "Description","Quantity","UnitPrice",
                        "CustomerID","Country")

#and finally changing the DafofWeekN into an integer because it was given as a string type:
ecomDF1 = ecomDF1.withColumn("DayofWeekN", ecomDF1["DayofWeekN"].cast(IntegerType()))
ecomDF1.show(5)
ecomDF1.printSchema()


+----------+----+-----+---+----------+----------+----+---------+---------+--------------------+--------+---------+----------+--------------+
|  DateFull|year|month|day|DayofWeekN|DayofWeekS|Time|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country|
+----------+----+-----+---+----------+----------+----+---------+---------+--------------------+--------+---------+----------+--------------+
|2010-12-01|2010|   12|  1|         3|       Wed|8:26|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom|
|2010-12-01|2010|   12|  1|         3|       Wed|8:26|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom|
|2010-12-01|2010|   12|  1|         3|       Wed|8:26|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom|
|2010-12-01|2010|   12|  1|         3|       Wed|8:26|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom|
|2010-12-01|2

## 5. Columns groups basic profiling to better understand our data set
### A. Timing related columns basic profiling

We run profiling on the time related columns to see if there are any irregularities we could find:


In [8]:
from IPython.display import display, Markdown 
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit 


print ("Here are the summary of the columns year, month, and DayofWeekN:")
ecomDF1.select("year","month","day","DayofWeekN").summary().show()

Here are the summary of the columns year, month, and DayofWeekN:
+-------+-------------------+-----------------+------------------+------------------+
|summary|               year|            month|               day|        DayofWeekN|
+-------+-------------------+-----------------+------------------+------------------+
|  count|             541909|           541909|            541909|            541909|
|   mean| 2010.9216086095637|7.553127923691985|15.023096128685813|  3.43127720705875|
| stddev|0.26878674384472484|3.509055367918596| 8.664062753327219|1.8447086898496354|
|    min|               2010|                1|                 1|                 1|
|    25%|               2011|                5|                 7|                 2|
|    50%|               2011|                8|                15|                 3|
|    75%|               2011|               11|                22|                 5|
|    max|               2011|               12|                31|         

OBSERVATIONS:
- We see that those 5 time-related columns behave normally
- For now we can say that we have data for 2 years: 2010 and 2011
- The month, day, and day of the week columns seem to be correct too

We can now check for any nulls in these columns:

In [9]:
print("We now look for nulls in the columns year, month, and DayofWeekN:")
ecomDF1.select([count(when(col(c).isNull(), c)).alias(c) for c in ["year","month","day","DayofWeekN"]]).show()

print("And also for the distinct values in those columns:")
ecomDF1.select([countDistinct(c).alias(c) for c in ["year","month","day","DayofWeekN"]]).show()

We now look for nulls in the columns year, month, and DayofWeekN:
+----+-----+---+----------+
|year|month|day|DayofWeekN|
+----+-----+---+----------+
|   0|    0|  0|         0|
+----+-----+---+----------+

And also for the distinct values in those columns:
+----+-----+---+----------+
|year|month|day|DayofWeekN|
+----+-----+---+----------+
|   2|   12| 31|         6|
+----+-----+---+----------+



OBSERVATIONS:

- All our columns are complete

NOTE: here we do not expect to have nulls in all thse columns if the DateFull column has no nulls. Indeed, all these columns are derived from it and thus this is used here as a way to make sure that the transformation above were correctly implemented.

We can now derive some basic insights that could help us in answering the main questions later on:

In [10]:
print ("Let's look at the most and least frequent occurrences for the days in regards to month and week:")
dayofMonthOccurrencesDF = ecomDF1.groupBy("day").agg(count(lit(1)).alias("Total")) #grouping by day of month
dayOfWeekDF = ecomDF1.groupBy("DayofWeekN").agg(count(lit(1)).alias("Total"))
#calculate most and least occurences. Grouping by day of month and counting how many day of months appear in the dataset

TheleastFreqDayOfMonth    = dayofMonthOccurrencesDF.orderBy(col("Total").asc()).first()
ThemostFreqDayOfMonth     = dayofMonthOccurrencesDF.orderBy(col("Total").desc()).first()
TheleastFreqDayOfWeek     = dayOfWeekDF.orderBy(col("Total").asc()).first()
ThemostFreqDayOfWeek      = dayOfWeekDF.orderBy(col("Total").desc()).first()

#displaying iy with the Markdown function:
display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqDayOfMonth", "mostFreqDayOfMonth", "leastFreqDayOfWeek", "mostFreqDayOfWeek", \
       "%d (%d occurrences)" % (TheleastFreqDayOfMonth["day"], TheleastFreqDayOfMonth["Total"]), \
       "%d (%d occurrences)" % (ThemostFreqDayOfMonth["day"], ThemostFreqDayOfMonth["Total"]), \
       "%d (%d occurrences)" % (TheleastFreqDayOfWeek["DayofWeekN"], TheleastFreqDayOfWeek["Total"]), \
       "%d (%d occurrences)" % (ThemostFreqDayOfWeek["DayofWeekN"], ThemostFreqDayOfWeek["Total"]))))

Let's look at the most and least frequent occurrences for the days in regards to month and week:



| leastFreqDayOfMonth | mostFreqDayOfMonth | leastFreqDayOfWeek | mostFreqDayOfWeek |
|----|----|----|----|
| 31 (10518 occurrences) | 8 (24658 occurrences) | 7 (64375 occurrences) | 4 (103857 occurrences) |


OBSERVATION:

- We can directly conclude that the business has the least transactions on Sundays and the most on Thursdays
- And they also have probably less transactions towards the end of the months as compared to the beginning

### B. Sales related columns basic profiling

Let us now turn to the sales related columns and see what they tell us: *InvoiceNo*, *StockCode*, *Description*, *Quantity*, *UnitPrice*

In [11]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first

print ("Like above, we start with a summary of the columns. Here, we focus on the integer columns Quantity, and UnitPrice:")
ecomDF1.select("Quantity", "UnitPrice").summary().show()

Like above, we start with a summary of the columns. Here, we focus on the integer columns Quantity, and UnitPrice:
+-------+------------------+-----------------+
|summary|          Quantity|        UnitPrice|
+-------+------------------+-----------------+
|  count|            541909|           541909|
|   mean|  9.55224954743324|4.611113626083471|
| stddev|218.08115785023355|96.75985306117803|
|    min|            -80995|        -11062.06|
|    25%|                 1|             1.25|
|    50%|                 3|             2.08|
|    75%|                10|             4.13|
|    max|             80995|          38970.0|
+-------+------------------+-----------------+



OBSERVATIONS:

- It seems like we have some outliers in both columns. It is not very logical to have negative quantities and prices (or they could be returns by customers, this is to further analayse)
- The maximum values too seem to be outliers, and maybe mistakes. It is not very logical to have an item's price at 38,970 when the average price of an item is around 4.6 and the median 4.13. 

We can now check for nulls and distinct values in the columns:

In [12]:
print("Now Looking for nulls in the columns ,InvoiceNo, StockCode, Description, Quantity, and UnitPrice:")
ecomDF1.select([count(when(col(c).isNull(), c)).alias(c) for c in ["InvoiceNo", "StockCode", "Description", "Quantity", "UnitPrice"]]).show()

print("And here the amount of distinct values in the columns:")
ecomDF1.select([countDistinct(c).alias(c) for c in ["InvoiceNo", "StockCode", "Description", "Quantity", "UnitPrice"]]).show()


Now Looking for nulls in the columns ,InvoiceNo, StockCode, Description, Quantity, and UnitPrice:
+---------+---------+-----------+--------+---------+
|InvoiceNo|StockCode|Description|Quantity|UnitPrice|
+---------+---------+-----------+--------+---------+
|        0|        0|       1454|       0|        0|
+---------+---------+-----------+--------+---------+

And here the amount of distinct values in the columns:
+---------+---------+-----------+--------+---------+
|InvoiceNo|StockCode|Description|Quantity|UnitPrice|
+---------+---------+-----------+--------+---------+
|    25900|     4070|       4223|     722|     1630|
+---------+---------+-----------+--------+---------+



OBSERVATIONS:

- We have a few missing descriptions, but the number is minimal when compared to the total number of rows (541,909)
- It seems that we have 25,900 sales transactions
- It also seems that the company has a wide variety of products with 4,070 different codes

We can now take a look at the most frequent item sold according to StockCode:

In [13]:
print ("Let's look at the most and least frequent occurrences for sales of products according to StockCode:")
StockCodeDF = ecomDF1.groupBy("StockCode").agg(count(lit(1)).alias("Total"))

leastFreqStockCode    = StockCodeDF.orderBy(col("Total").asc()).first()
mostFreqStockCode     = StockCodeDF.orderBy(col("Total").desc()).first()


display(Markdown("""
| %s | %s |
|----|----|
| %s | %s |
""" % ("leastFreqStockCode", "mostFreqStockCode", \
       "%s (%d occurrences)" % (leastFreqStockCode["StockCode"], leastFreqStockCode["Total"]), \
       "%s (%d occurrences)" % (mostFreqStockCode["StockCode"], mostFreqStockCode["Total"]))))

Let's look at the most and least frequent occurrences for sales of products according to StockCode:



| leastFreqStockCode | mostFreqStockCode |
|----|----|
| 84899F (1 occurrences) | 85123A (2313 occurrences) |


### C. Customer related columns

Finally, we look at the customer related columns and see what they tell us: *CustomerID*, *Country*.

Let's first look at the nulls and distinct values:

In [14]:
print("Checking for nulls in columns CustomerID,and Country:")
ecomDF1.select([count(when(col(c).isNull(), c)).alias(c) for c in ["CustomerID", "Country"]]).show()

print("Checking amount of distinct values in columns CustomerID, and Country:")
ecomDF1.select([countDistinct(c).alias(c) for c in ["CustomerID", "Country"]]).show()

Checking for nulls in columns CustomerID,and Country:
+----------+-------+
|CustomerID|Country|
+----------+-------+
|    135080|      0|
+----------+-------+

Checking amount of distinct values in columns CustomerID, and Country:
+----------+-------+
|CustomerID|Country|
+----------+-------+
|      4372|     38|
+----------+-------+



Here, we encounter the first problem with the dataset, we have 135,080 transactions with a missing customer ID. This represents around 27% of the rows, which is a significant amount. 

For now we could keep them in the data for the analyses of the first elements.
However, when answering questions about the customers, it would be reasonable to exclude these columns for the specific analysis.

Other OBSERVATIONS:

- The business has 4,372 registered customers coming from 38 countries

Let's try analysing now the most and least frequent countries of purshase and customers:


In [15]:
print ("Most and least frequent occurrences for Country, and CustomerID:")
CountryDF = ecomDF1.groupBy("Country").agg(count(lit(1)).alias("Total"))
CustomerIDDF   = ecomDF1.groupBy("CustomerID").agg(count(lit(1)).alias("Total"))


leastFreqCountry    = CountryDF.orderBy(col("Total").asc()).first()
mostFreqCountry     = CountryDF.orderBy(col("Total").desc()).first()
leastFreqCustomerID      = CustomerIDDF.orderBy(col("Total").asc()).first()
mostFreqCustomerID       = CustomerIDDF.orderBy(col("Total").desc()).first()


display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqCountry", "mostFreqCountry", "leastFreqCustomerID", "mostFreqCustomerID", \
       "%s (%d occurrences)" % (leastFreqCountry["Country"], leastFreqCountry["Total"]), \
       "%s (%d occurrences)" % (mostFreqCountry["Country"], mostFreqCountry["Total"]), \
       "%s (%d occurrences)" % (leastFreqCustomerID["CustomerID"], leastFreqCustomerID["Total"]), \
       "%s (%d occurrences)" % (mostFreqCustomerID["CustomerID"], mostFreqCustomerID["Total"]))))

Most and least frequent occurrences for Country, and CustomerID:



| leastFreqCountry | mostFreqCountry | leastFreqCustomerID | mostFreqCustomerID |
|----|----|----|----|
| Saudi Arabia (10 occurrences) | United Kingdom (495478 occurrences) | 15070 (1 occurrences) | None (135080 occurrences) |


OBSERVATION:

- We see that most transactions happened in the UK, 495,478 out of 541,909 or around 91%.
- It looks like we have big customers, like the most repeated customer in 135,080 transactions.

Also, an important observation is that the descriptions of the products are not unified and thus it would be hard to explore product-related questions.


### D. A Filtered dataset

Here we filter the dataset from the observations drawn in this profiling step. This allows us to have clean data to use in our analysis when we will analyse customers information.

In [102]:
# We first remove the null values from the customer column:
ecomDF0=ecomDF1.where(col("CustomerID").isNotNull())

#We also remove the null values from the description column:
ecomDF0=ecomDF0.where(col("Description").isNotNull())

#We remove the negative values

After this basic profiling, and with a good understanding of our data, we can now jump into analysing and try answering some business questions relevant to this e-commerce!

# 6. Getting some Insights to make the e-commerce grow

## A. Revenues and number of orders, how good are they?

In [220]:
from pyspark.sql.functions import sum, round, countDistinct

ecomDF2=ecomDF1.withColumn("Total_Spent", col("UnitPrice")*col("Quantity"))

print("Total revenues for the period were:")
totRev=ecomDF2.select(round(sum("Total_Spent")).alias("Total Revenues"))
totRev.show()

print("And this could be broken down to revenues by country:")
revByCountry=ecomDF2.groupBy("Country")\
        .agg(sum("Total_spent"))

revByCountry=revByCountry.withColumn("PercentageOfTotRevenues", col("sum(Total_spent)")*100/totRev.collect()[0][0])
revByCountry=revByCountry.sort(col("sum(Total_spent)").desc())
revByCountry.select("Country",round("sum(Total_spent)").alias("Total Revenues by Country"), round("PercentageOfTotRevenues",3).alias("% of Total Revenues")).show(50)


print("The total number of orders were:")
ecomDF1.select(countDistinct("InvoiceNo").alias("Total Orders")).show()

print("And this could be broken down to orders by country:")
ordersByCountry=ecomDF1.groupBy("Country")\
        .agg(countDistinct("InvoiceNo"))

ordersByCountry=ordersByCountry.withColumn("PercentageOfTotOrders", col("count(DISTINCT InvoiceNo)")*100/25900)
ordersByCountry=ordersByCountry.sort(col("count(DISTINCT InvoiceNo)").desc())
ordersByCountry.select("Country",col("count(DISTINCT InvoiceNo)").alias("Total Orders by Country"), round("PercentageOfTotOrders",3).alias("% of Total Orders")).show(50)

Total revenues for the period were:
+--------------+
|Total Revenues|
+--------------+
|     9747748.0|
+--------------+

And this could be broken down to revenues by country:
+--------------------+-------------------------+-------------------+
|             Country|Total Revenues by Country|% of Total Revenues|
+--------------------+-------------------------+-------------------+
|      United Kingdom|                8187806.0|             83.997|
|         Netherlands|                 284662.0|               2.92|
|                EIRE|                 263277.0|              2.701|
|             Germany|                 221698.0|              2.274|
|              France|                 197404.0|              2.025|
|           Australia|                 137077.0|              1.406|
|         Switzerland|                  56385.0|              0.578|
|               Spain|                  54775.0|              0.562|
|             Belgium|                  40911.0|               0.

In [226]:
from pyspark.sql.functions import concat, count
cancel=ecomDF.withColumn('can', concat(ecomDF.InvoiceNo.substr(0,1)))

cancel=cancel.where(col("can")=="C")

print("The number of cancelled items are:")
cancel.select(count(col("can")).alias("Number of cancelled transactions")).show()

The number of cancelled items are:
+--------------------------------+
|Number of cancelled transactions|
+--------------------------------+
|                            9288|
+--------------------------------+



# B. When do people shop?

In [83]:
from pyspark.sql.functions import split
from pyspark.sql.types import IntegerType

print("Bin hours of day into 6 categories and find most amount of traffic + percentages:")
split_col = split(ecomDF1['Time'], ':')
hours = ecomDF1.withColumn('Timing', split_col.getItem(0))


hoursDF = hours.withColumn("TimeH", hours["Timing"].cast(IntegerType()))

totalTr = hoursDF.count()
hoursCategorizationDF = hoursDF\
   .withColumn("Period", when((col("TimeH")>=0) & (col("TimeH")<4),"Very late night")\
                               .when((col("TimeH")>=4) & (col("TimeH")<8),"Early morning")\
                               .when((col("TimeH")>=8) & (col("TimeH")<12),"Morning")\
                               .when((col("TimeH")>=12) & (col("TimeH")<16),"Early afternoon")\
                               .when((col("TimeH")>=16) & (col("TimeH")<20),"Late afternoon")\
                               .otherwise("Night"))

hoursCategorizationDF.select("Period")\
                     .groupBy("Period")\
                     .agg(count("Period").alias("NumTransactions"), \
                          (count("Period")/totalTr*100).alias("Ratio"))\
                     .sort(col("NumTransactions").desc())\
                     .select("Period","NumTransactions",round("Ratio",2).alias("RoundedRatio")).show()

Bin hours of day into 6 categories and find most amount of traffic + percentages:
+---------------+---------------+------------+
|         Period|NumTransactions|RoundedRatio|
+---------------+---------------+------------+
|Early afternoon|         295958|       54.61|
|        Morning|         149952|       27.67|
| Late afternoon|          94704|       17.48|
|          Night|            871|        0.16|
|  Early morning|            424|        0.08|
+---------------+---------------+------------+



In [79]:
from pyspark.sql.functions import sum, round

ecomDF2=ecomDF1.withColumn("Total_Spent", col("UnitPrice")*col("Quantity"))

totRev=ecomDF2.select(round(sum("Total_Spent")).alias("Total Revenues"))

print("Revenues broken down by month:")
revBymonth=ecomDF2.groupBy("month")\
        .agg(sum("Total_spent"))

revBymonth=revBymonth.withColumn("PercentageOfTotRevenues", col("sum(Total_spent)")*100/totRev.collect()[0][0])
revBymonth=revBymonth.sort(col("sum(Total_spent)").desc())
revBymonth.select("Month",round("sum(Total_spent)").alias("Total Revenues by Month"), round("PercentageOfTotRevenues",3).alias("% of Total Revenues")).show()

print("Orders broken down by month:")
ordersBymonth=ecomDF1.groupBy("month")\
        .agg(countDistinct("InvoiceNo"))

ordersBymonth=ordersBymonth.withColumn("PercentageOfTotOrders", col("count(DISTINCT InvoiceNo)")*100/25900)
ordersBymonth=ordersBymonth.sort(col("count(DISTINCT InvoiceNo)").desc())
ordersBymonth.select("month",col("count(DISTINCT InvoiceNo)").alias("Total Orders per month"), round("PercentageOfTotOrders",3).alias("% of Total Orders")).show()



Revenues broken down by month:
+-----+-----------------------+-------------------+
|Month|Total Revenues by Month|% of Total Revenues|
+-----+-----------------------+-------------------+
|   11|              1461756.0|             14.996|
|   12|              1182625.0|             12.132|
|   10|              1070705.0|             10.984|
|    9|              1019688.0|             10.461|
|    5|               723334.0|              7.421|
|    6|               691123.0|               7.09|
|    3|               683267.0|              7.009|
|    8|               682681.0|              7.003|
|    7|               681300.0|              6.989|
|    1|               560000.0|              5.745|
|    2|               498063.0|               5.11|
|    4|               493207.0|               5.06|
+-----+-----------------------+-------------------+

Orders broken down by month:
+-----+----------------------+-----------------+
|month|Total Orders per month|% of Total Orders|
+-----+--

In [81]:
from pyspark.sql.functions import sum, round

ecomDF2=ecomDF1.withColumn("Total_Spent", col("UnitPrice")*col("Quantity"))

totRev=ecomDF2.select(round(sum("Total_Spent")).alias("Total Revenues"))

print("Revenues broken down by day of the week:")
revByCountry=ecomDF2.groupBy("DayofWeekS")\
        .agg(sum("Total_spent"))

revByCountry=revByCountry.withColumn("PercentageOfTotRevenues", col("sum(Total_spent)")*100/totRev.collect()[0][0])
revByCountry=revByCountry.sort(col("sum(Total_spent)").desc())
revByCountry.select("DayofWeekS",round("sum(Total_spent)").alias("Total Revenues by day of the week"), round("PercentageOfTotRevenues",3).alias("% of Total Revenues")).show()

print("Orders broken down by day of the week:")
ordersByDOW=ecomDF1.groupBy("DayofWeekS")\
        .agg(countDistinct("InvoiceNo"))

ordersByDOW=ordersByDOW.withColumn("PercentageOfTotOrders", col("count(DISTINCT InvoiceNo)")*100/25900)
ordersByDOW=ordersByDOW.sort(col("count(DISTINCT InvoiceNo)").desc())
ordersByDOW.select("DayofWeekS",col("count(DISTINCT InvoiceNo)").alias("Total Orders by day of the week"), round("PercentageOfTotOrders",3).alias("% of Total Orders")).show()




Revenues broken down by day of the week:
+----------+---------------------------------+-------------------+
|DayofWeekS|Total Revenues by day of the week|% of Total Revenues|
+----------+---------------------------------+-------------------+
|       Thu|                        2112519.0|             21.672|
|       Tue|                        1966183.0|             20.171|
|       Wed|                        1734147.0|              17.79|
|       Mon|                        1588609.0|             16.297|
|       Fri|                        1540611.0|             15.805|
|       Sun|                         805679.0|              8.265|
+----------+---------------------------------+-------------------+

Orders broken down by day of the week:
+----------+----------------------+-----------------+
|DayofWeekS|Total Orders per month|% of Total Orders|
+----------+----------------------+-----------------+
|       Thu|                  5660|           21.853|
|       Wed|                  481

# C. What does customers analysis  show? 

In [227]:
# calculate customer lifetime values: amount earned over time 
from pyspark.sql.functions import sum, round, countDistinct

ecomDF5=ecomDF0.withColumn("Total_Spent", col("UnitPrice")*col("Quantity"))

print("Total Revenues from the customers of whom we have the IDs:")
totRev=ecomDF5.select(round(sum("Total_Spent")).alias("Total Revenues"))
totRev.show()

totOrd=ecomDF5.select(countDistinct("InvoiceNo").alias("Total Orders"))

#get the average amount of money spent by transaction by customers
print("The average amount spent by customers for an order is: " + str(totRev.collect()[0][0]/totOrd.collect()[0][0]) + " Pounds.")

totCusto=ecomDF5.select(countDistinct("CustomerID").alias("Total Customers"))
print(" ")
print("The total number of customers of the business is: ")
totCusto.show()

print("The average amount of orders by customers is: " + str(totOrd.collect()[0][0]/totCusto.collect()[0][0]) + " orders.")

print(" ")

print("Who are the most valuable customers, where are they from and how much they have spent:")
revByCountry=ecomDF5.groupBy("CustomerID", "Country")\
        .agg(sum("Total_spent"))

revByCountry=revByCountry.withColumn("PercentageOfTotRevenues", col("sum(Total_spent)")*100/totRev.collect()[0][0])
revByCountry=revByCountry.sort(col("sum(Total_spent)").desc())
revByCountry.select("Country","CustomerID",round("sum(Total_spent)").alias("Total Revenues by Customer"), round("PercentageOfTotRevenues",3).alias("% of Total Revenues")).show()

print("Orders broken down by customers:")
ordersByCusto=ecomDF5.groupBy("CustomerID")\
        .agg(countDistinct("InvoiceNo"))

ordersByCusto=ordersByCusto.withColumn("PercentageOfTotOrders", col("count(DISTINCT InvoiceNo)")*100/25900)
ordersByCusto=ordersByCusto.sort(col("count(DISTINCT InvoiceNo)").desc())
ordersByCusto.select("CustomerID",col("count(DISTINCT InvoiceNo)").alias("Total Orders by customers:"), round("PercentageOfTotOrders",3).alias("% of Total Orders")).show()



Total Revenues from the customers of whom we have the IDs:
+--------------+
|Total Revenues|
+--------------+
|     8300066.0|
+--------------+

The average amount spent by customers for an order is: 374.0453357368184 Pounds.
 
The total number of customers of the business is: 
+---------------+
|Total Customers|
+---------------+
|           4372|
+---------------+

The average amount of orders by customers is: 5.07548032936871 orders.
 
Who are the most valuable customers, where are they from and how much they have spent:
+--------------+----------+--------------------------+-------------------+
|       Country|CustomerID|Total Revenues by Customer|% of Total Revenues|
+--------------+----------+--------------------------+-------------------+
|   Netherlands|     14646|                  279489.0|              3.367|
|United Kingdom|     18102|                  256438.0|               3.09|
|United Kingdom|     17450|                  187482.0|              2.259|
|          EIRE|    

In [192]:
from pyspark.sql.functions import sum, round, countDistinct
print("Orders broken down by customers:")
ordersByCusto=ecomDF5.groupBy("CustomerID")\
        .agg(countDistinct("InvoiceNo"))\
        .withColumn("PercentageOfTotOrders", col("count(DISTINCT InvoiceNo)")*100/25900)\
        .sort(col("count(DISTINCT InvoiceNo)").desc())\
        .select("CustomerID",col("count(DISTINCT InvoiceNo)").alias("Occurences"))\
        .groupBy("Occurences")\
        .agg(count("Occurences"))\
        .sort(col("Occurences").alias("Occurences").asc())
ordersByCusto.withColumn("Percentage of customers",round(col("count(Occurences)")*100/4372)).show()

print("So we see that almost 60% of customers only bought three times or less.")

Orders broken down by customers:
+----------+-----------------+-----------------------+
|Occurences|count(Occurences)|Percentage of customers|
+----------+-----------------+-----------------------+
|         1|             1313|                   30.0|
|         2|              817|                   19.0|
|         3|              490|                   11.0|
|         4|              377|                    9.0|
|         5|              288|                    7.0|
|         6|              196|                    4.0|
|         7|              157|                    4.0|
|         8|              117|                    3.0|
|         9|               80|                    2.0|
|        10|               78|                    2.0|
|        11|               62|                    1.0|
|        12|               51|                    1.0|
|        13|               38|                    1.0|
|        14|               41|                    1.0|
|        15|               28|  