
In [None]:
# https://www.datacamp.com/tutorial/pyspark-tutorial-getting-started-with-pyspark
# https://www.kaggle.com/datasets/thedevastator/unlock-profits-with-e-commerce-sales-data

# other dataset
# https://www.kaggle.com/code/toludoyinshopein/rfm-segmentation-with-pyspark


## create spark session
## load dataset
## data exploration
## data preprocessing
## feature engineering
## model building

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
spark = SparkSession.builder.appName("Customer Segmentation")\
                              .config("spark.memory.offHeap.enabled","true")\
                              .config("spark.memory.offHeap.size","10g").getOrCreate()

# Enables off-heap memory and sets its size to 10 GB. Spark can use this memory for caching and execution.

In [3]:
df = spark.read.csv('/content/drive/MyDrive/AI Projects/Datasets/Amazon Sale Report/online_retail_listing.csv',header=True,sep=';')

In [None]:
df.show()

+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|    InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|1.12.2009 07:45| 6,95|      13085|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|1.12.2009 07:45| 6,75|      13085|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|1.12.2009 07:45| 6,75|      13085|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|1.12.2009 07:45|  2,1|      13085|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|1.12.2009 07:45| 1,25|      13085|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|1.12.2009 07:45| 1,65|      13085|United Kingdom|
| 489434|    21871| SAVE THE PLANET MUG|      24|1.12.2009 07:45| 1,25|      13085|United Kingdom|
| 489434| 

In [None]:
df.count(), len(df.columns) ## 1M rows

(1048575, 8)


* customer base
* total number of transactions
* number of distinct items in the transaction database

In [8]:
# distinct() keeps NULL as one value, so the customer count includes the NULL Customer ID
print("unique customers",df.select('Customer ID').distinct().count())
print("number of transactions",df.select('Invoice').distinct().count()) ## ~53k transactions
print("unique items ",df.select("StockCode").distinct().count()) ## the data covers about 5.3k distinct products

unique customers 5925
number of transactions 52961
unique items  5304


* which countries purchase the most
* how many countries are represented
* average transaction size
* average transaction value

In [30]:
df.dtypes

[('Invoice', 'string'),
 ('StockCode', 'string'),
 ('Description', 'string'),
 ('Quantity', 'string'),
 ('InvoiceDate', 'string'),
 ('Price', 'string'),
 ('Customer ID', 'string'),
 ('Country', 'string')]

In [14]:
df.select('Country').distinct().count()

43

In [19]:
# About 91% of customers are from the UK
df.groupby('Country').agg(countDistinct('Customer ID')\
                      .alias('customer_count'))\
                      .orderBy('customer_count',ascending=False)\
                      .show()

+---------------+--------------+
|        Country|customer_count|
+---------------+--------------+
| United Kingdom|          5397|
|        Germany|           106|
|         France|            94|
|          Spain|            40|
|        Belgium|            29|
|       Portugal|            24|
|    Netherlands|            23|
|    Switzerland|            22|
|         Sweden|            19|
|          Italy|            17|
|        Finland|            15|
|      Australia|            15|
|Channel Islands|            14|
|         Norway|            13|
|        Austria|            13|
|         Cyprus|            11|
|        Denmark|            11|
|          Japan|            10|
|            USA|             9|
|    Unspecified|             7|
+---------------+--------------+
only showing top 20 rows
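The UK share quoted in the cell comment can be checked from the reported counts. This is a minimal sketch in plain Python, assuming the 5,397 UK customers from the country table and the 5,925 distinct `Customer ID` values reported earlier (a figure that may include one NULL entry, because `distinct()` keeps NULL as a value).

```python
# Counts copied from the notebook outputs.
uk_customers = 5397      # United Kingdom row of the country table
total_customers = 5925   # distinct 'Customer ID' values (may include NULL)

uk_share = uk_customers / total_customers
print(f"UK share of customers: {uk_share:.1%}")
```

A customer can appear under more than one country, so the per-country shares need not add up to exactly 100%.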



In [55]:
df.dtypes

[('Invoice', 'string'),
 ('StockCode', 'string'),
 ('Description', 'string'),
 ('Quantity', 'string'),
 ('InvoiceDate', 'string'),
 ('Price', 'string'),
 ('Customer ID', 'string'),
 ('Country', 'string'),
 ('price_edited', 'float'),
 ('qty', 'float')]

In [54]:
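# Basket size = total quantity per invoice; report the median across invoices
# (negative quantities are still included at this point)
df = df.withColumn('qty',col('Quantity').cast('float'))
df.groupby('Invoice').agg(sum('qty')\
                      .alias('basket_size'))\
                      .select(median('basket_size')).show()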

+-------------------+
|median(basket_size)|
+-------------------+
|               93.0|
+-------------------+



In [40]:

df = df.withColumn("price_edited",regexp_replace(col('Price'),',','.').cast('float'))
df.show(5)

+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+------------+
|Invoice|StockCode|         Description|Quantity|    InvoiceDate|Price|Customer ID|       Country|price_edited|
+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|1.12.2009 07:45| 6,95|      13085|United Kingdom|        6.95|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|1.12.2009 07:45| 6,75|      13085|United Kingdom|        6.75|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|1.12.2009 07:45| 6,75|      13085|United Kingdom|        6.75|
| 489434|    22041|"RECORD FRAME 7""...|      48|1.12.2009 07:45|  2,1|      13085|United Kingdom|         2.1|
| 489434|    21232|STRAWBERRY CERAMI...|      24|1.12.2009 07:45| 1,25|      13085|United Kingdom|        1.25|
+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+------------+
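The `regexp_replace` plus `cast` step turns decimal-comma strings into numbers. To see what it does to individual values, here is a plain-Python equivalent applied to a few strings that appear in the `Price` column. The helper name `parse_decimal_comma` is hypothetical and only illustrates the conversion; it is not part of the notebook or of PySpark.

```python
def parse_decimal_comma(text):
    """Convert a decimal-comma string such as '6,95' to a float (None stays None)."""
    if text is None:
        return None
    # Same idea as regexp_replace(col('Price'), ',', '.') followed by a numeric cast.
    return float(text.replace(",", "."))

# Sample values taken from the Price column in the notebook outputs.
samples = ["6,95", "2,1", "-53594,36", "0"]
parsed = [parse_decimal_comma(s) for s in samples]
print(parsed)
```

Spark's `float` type is single precision, so for monetary values a cast to `double` or a decimal type may be a safer choice.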

In [37]:
df.dtypes

[('Invoice', 'string'),
 ('StockCode', 'string'),
 ('Description', 'string'),
 ('Quantity', 'string'),
 ('InvoiceDate', 'string'),
 ('Price', 'string'),
 ('Customer ID', 'string'),
 ('Country', 'string'),
 ('price_edited', 'float')]

In [39]:
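# Note: this sums unit prices per invoice and ignores Quantity,
# so it is not the invoice total (Quantity x Price)
df.groupby('Invoice').agg(sum('price_edited')\
                      .alias('avg_ord_val'))\
                      .select(mean('avg_ord_val')).show()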

+-----------------+
| avg(avg_ord_val)|
+-----------------+
|91.61683989018914|
+-----------------+
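Summing `price_edited` per invoice adds up unit prices only. A transaction value would normally weight each line by its quantity. The plain-Python sketch below contrasts the two on the first five lines of invoice 489434 as shown in the earlier output. The invoice has more lines than these, so the totals are illustrative only. In the notebook this would correspond, as an assumption, to aggregating `col('qty') * col('price_edited')` rather than `price_edited`.

```python
from decimal import Decimal

# (Quantity, Price) for the first five lines of invoice 489434, copied from the output.
lines = [
    (12, Decimal("6.95")),
    (12, Decimal("6.75")),
    (12, Decimal("6.75")),
    (48, Decimal("2.10")),
    (24, Decimal("1.25")),
]

unit_price_sum = Decimal("0")   # what summing price_edited adds up
line_total_sum = Decimal("0")   # Quantity x Price, summed over lines
for qty, price in lines:
    unit_price_sum += price
    line_total_sum += qty * price

print(unit_price_sum, line_total_sum)
```

The explicit loop avoids the built-in `sum`, which is shadowed in the notebook by `from pyspark.sql.functions import *`.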



* rows with negative price, quantity
* rows with null customer_id

In [42]:
# rows with negative price
df.filter(col('price_edited')<0).show()

+-------+---------+---------------+--------+----------------+---------+-----------+--------------+------------+
|Invoice|StockCode|    Description|Quantity|     InvoiceDate|    Price|Customer ID|       Country|price_edited|
+-------+---------+---------------+--------+----------------+---------+-----------+--------------+------------+
|A506401|        B|Adjust bad debt|       1|29.04.2010 13:36|-53594,36|       NULL|United Kingdom|   -53594.36|
|A516228|        B|Adjust bad debt|       1|19.07.2010 11:24|-44031,79|       NULL|United Kingdom|   -44031.79|
|A528059|        B|Adjust bad debt|       1|20.10.2010 12:04|-38925,87|       NULL|United Kingdom|   -38925.87|
|A563186|        B|Adjust bad debt|       1|12.08.2011 14:51|-11062,06|       NULL|United Kingdom|   -11062.06|
|A563187|        B|Adjust bad debt|       1|12.08.2011 14:52|-11062,06|       NULL|United Kingdom|   -11062.06|
+-------+---------+---------------+--------+----------------+---------+-----------+--------------+------------+

In [57]:
print("rows with negative quantity",df.filter(col('qty')<0).count())
df.filter(col('qty')<0).show()
## most of these rows have an invoice starting with C, but not all of them

rows with negative quantity 22697
+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+------------+-----+
|Invoice|StockCode|         Description|Quantity|    InvoiceDate|Price|Customer ID|       Country|price_edited|  qty|
+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+------------+-----+
|C489449|    22087|PAPER BUNTING WHI...|     -12|1.12.2009 10:33| 2,95|      16321|     Australia|        2.95|-12.0|
|C489449|   85206A|CREAM FELT EASTER...|      -6|1.12.2009 10:33| 1,65|      16321|     Australia|        1.65| -6.0|
|C489449|    21895|POTTING SHED SOW ...|      -4|1.12.2009 10:33| 4,25|      16321|     Australia|        4.25| -4.0|
|C489449|    21896|  POTTING SHED TWINE|      -6|1.12.2009 10:33|  2,1|      16321|     Australia|         2.1| -6.0|
|C489449|    22083|PAPER CHAIN KIT R...|     -12|1.12.2009 10:33| 2,95|      16321|     Australia|        2.95|-12.0|
|C489449|    21871| SA

In [44]:
df.filter(col('Description')=='Adjust bad debt').count()

6

In [50]:
# rows with null customer_id
print("count of rows with null customer id",df.filter(col('Customer ID').isNull()).count())
df.filter(col('Customer ID').isNull()).show()

count of rows with null customer id 236682
+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+------------+
|Invoice|StockCode|         Description|Quantity|    InvoiceDate|Price|Customer ID|       Country|price_edited|
+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+------------+
| 489464|    21733|        85123a mixed|     -96|1.12.2009 10:52|    0|       NULL|United Kingdom|         0.0|
| 489463|    71477|               short|    -240|1.12.2009 10:52|    0|       NULL|United Kingdom|         0.0|
| 489467|   85123A|         21733 mixed|    -192|1.12.2009 10:53|    0|       NULL|United Kingdom|         0.0|
| 489521|    21646|                NULL|     -50|1.12.2009 11:44|    0|       NULL|United Kingdom|         0.0|
| 489525|   85226C|BLUE PULL BACK RA...|       1|1.12.2009 11:49| 0,55|       NULL|United Kingdom|        0.55|
| 489525|    85227|SET/6 3D KIT CARD...|       1|1.12.2009 11

In [58]:
df.filter(col('Invoice').like('C%')).count()

19261

In [61]:
# negative quantity on invoices that do not start with C: about 3.4k rows
df.filter((col('qty')<0) & (~col('Invoice').like('C%'))).show()

+-------+---------+---------------+--------+---------------+-----+-----------+--------------+------------+-------+
|Invoice|StockCode|    Description|Quantity|    InvoiceDate|Price|Customer ID|       Country|price_edited|    qty|
+-------+---------+---------------+--------+---------------+-----+-----------+--------------+------------+-------+
| 489464|    21733|   85123a mixed|     -96|1.12.2009 10:52|    0|       NULL|United Kingdom|         0.0|  -96.0|
| 489463|    71477|          short|    -240|1.12.2009 10:52|    0|       NULL|United Kingdom|         0.0| -240.0|
| 489467|   85123A|    21733 mixed|    -192|1.12.2009 10:53|    0|       NULL|United Kingdom|         0.0| -192.0|
| 489521|    21646|           NULL|     -50|1.12.2009 11:44|    0|       NULL|United Kingdom|         0.0|  -50.0|
| 489655|    20683|           NULL|     -44|1.12.2009 17:26|    0|       NULL|United Kingdom|         0.0|  -44.0|
| 489660|    35956|           lost|   -1043|1.12.2009 17:43|    0|       NULL|Un
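The checks on negative quantities and invoice prefixes suggest that rows fall into a few groups. The sketch below mirrors the `like('C%')` and `qty < 0` conditions in plain Python on rows copied from the outputs. The function `classify_row` and its label names are assumptions made for illustration; the dataset itself does not define these categories.

```python
def classify_row(invoice, quantity):
    """Label a row from its invoice prefix and the sign of its quantity.

    The labels are illustrative assumptions, not categories defined by the dataset.
    """
    if invoice.startswith("C"):
        return "cancellation"
    if invoice.startswith("A"):
        return "accounting_adjustment"
    if quantity < 0:
        return "negative_qty_non_cancellation"
    return "sale"

# (Invoice, Quantity) pairs copied from the notebook outputs.
rows = [("489434", 12), ("C489449", -12), ("489464", -96), ("A506401", 1)]
labels = [classify_row(invoice, qty) for invoice, qty in rows]
print(labels)
```

Flagging rows this way, rather than dropping them immediately, keeps the decision about how to treat each group for the preprocessing step.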

* Min date, max date
* month with highest number of transactions

In [71]:
# InvoiceDate has non-padded days (e.g. "1.12.2009"); without the legacy parser policy, to_timestamp throws an error
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
df = df.withColumn('date',to_timestamp(col('InvoiceDate'),"dd.MM.yyyy HH:mm"))
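The `InvoiceDate` strings use a day.month.year layout with days that are not zero-padded (for example `1.12.2009`), which is a plausible reason a strict parser rejects the `dd` pattern. As a quick check of the field order, the sketch below parses two sample values from the outputs with Python's `datetime.strptime`. It validates the layout only and says nothing about how Spark's own parser behaves.

```python
from datetime import datetime

# Python layout for day.month.year hour:minute.
# %d accepts both "1" and "01", so single-digit days parse.
LAYOUT = "%d.%m.%Y %H:%M"

# Sample InvoiceDate values copied from the notebook outputs.
samples = ["1.12.2009 07:45", "29.04.2010 13:36"]
parsed = [datetime.strptime(s, LAYOUT) for s in samples]
print(parsed)
```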

In [73]:
df.select(min('date'), max('date')).show()

+-------------------+-------------------+
|          min(date)|          max(date)|
+-------------------+-------------------+
|2009-12-01 07:45:00|2011-12-04 13:15:00|
+-------------------+-------------------+
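The list of questions also asks for the month with the highest number of transactions, which the cells shown here do not compute. The sketch below illustrates the counting logic in plain Python on a handful of (Invoice, InvoiceDate) pairs copied from the outputs. It is a toy sample, so its result says nothing about the full dataset. On the Spark DataFrame the same grouping could presumably be written with `date_format(col('date'), 'yyyy-MM')` and `countDistinct('Invoice')`; that is an assumption about the approach, not something taken from the notebook.

```python
from collections import Counter
from datetime import datetime

# (Invoice, InvoiceDate) pairs copied from the notebook outputs; not the full data.
rows = [
    ("489434", "1.12.2009 07:45"),
    ("489434", "1.12.2009 07:45"),   # second line of the same invoice
    ("C489449", "1.12.2009 10:33"),
    ("489464", "1.12.2009 10:52"),
    ("A506401", "29.04.2010 13:36"),
    ("A563186", "12.08.2011 14:51"),
    ("A563187", "12.08.2011 14:52"),
]

# Map each invoice to its month once, so multi-line invoices are counted a single time.
invoice_month = {
    invoice: datetime.strptime(date, "%d.%m.%Y %H:%M").strftime("%Y-%m")
    for invoice, date in rows
}

per_month = Counter(invoice_month.values())
busiest_month, n_invoices = per_month.most_common(1)[0]
print(busiest_month, n_invoices)
```

Counting distinct invoices rather than rows matters here because each invoice spans several line items.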