<a href="https://colab.research.google.com/github/mohammadreza-mohammadi94/PySpark-Analytics-Hub/blob/main/E_Commerce%20Data%20Analysis%20PySpark/E_Commerce_Data_Analysis_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Download Dataset

In [2]:
%%bash
#!/bin/bash
kaggle datasets download carrie1/ecommerce-data
unzip  ecommerce-data.zip

Dataset URL: https://www.kaggle.com/datasets/carrie1/ecommerce-data
License(s): unknown
ecommerce-data.zip: Skipping, found more recently modified local copy (use --force to force download)
Archive:  ecommerce-data.zip
  inflating: data.csv                


# Import Libraries

In [39]:
# spark.stop()

In [12]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t

In [40]:
# Initialize SparkSession
spark = SparkSession.builder.appName("Ecommerce").getOrCreate()

# Import Dataset

In [41]:
df = (
    spark.read.csv(
        path=r"/content/data.csv",
        header=True,
        inferSchema=True,
        quote='"')
)

df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [42]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



## Getting to Know the DaraFrame

In [43]:
df.summary().show()

+-------+------------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|summary|         InvoiceNo|         StockCode|         Description|          Quantity|    InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+------------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|  count|            541909|            541909|              540455|            541909|         541909|           541909|            406829|     541909|
|   mean|  559965.752026781|27623.240210938104|             20713.0|  9.55224954743324|           NULL|4.611113626082972|15287.690570239585|       NULL|
| stddev|13428.417280800133| 16799.73762842775|                NULL|218.08115785023486|           NULL| 96.7598530611797| 1713.600303321594|       NULL|
|    min|            536365|             10002| 4 PURPLE FLOCK D...|            -8

In [44]:
# Unique values of Country
df.select('Country').distinct().show()

+------------------+
|           Country|
+------------------+
|            Sweden|
|         Singapore|
|           Germany|
|            France|
|            Greece|
|European Community|
|           Belgium|
|           Finland|
|             Malta|
|       Unspecified|
|             Italy|
|              EIRE|
|         Lithuania|
|            Norway|
|             Spain|
|           Denmark|
|         Hong Kong|
|           Iceland|
|            Israel|
|   Channel Islands|
+------------------+
only showing top 20 rows



In [11]:
df.select(f.count_distinct('Country')).show()

+-----------------------+
|count(DISTINCT Country)|
+-----------------------+
|                     38|
+-----------------------+



In [69]:
def check_count(df):
    """
    This function takes a PySpark DataFrame as input and returns the count of non-null values
    in each column of the DataFrame.

    Args:
        df (pyspark.sql.dataframe.DataFrame): The input PySpark DataFrame.

    Returns:
        None: The function prints the count of non-null values in each column of the DataFrame.
    """
    return df.select([f.count(f.col(c)) for c in df.columns]).show()

check_count(df)

+----------------+----------------+------------------+---------------+------------------+----------------+-----------------+--------------+
|count(InvoiceNo)|count(StockCode)|count(Description)|count(Quantity)|count(InvoiceDate)|count(UnitPrice)|count(CustomerID)|count(Country)|
+----------------+----------------+------------------+---------------+------------------+----------------+-----------------+--------------+
|          541909|          541909|            540455|         541909|            541909|          541909|           406829|        541909|
+----------------+----------------+------------------+---------------+------------------+----------------+-----------------+--------------+



## Preprocessing

***Correct Datatype***

In [49]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [54]:
df_fixed = (
    df
    .withColumn('InvoiceNo', f.col('InvoiceNo').cast(t.IntegerType()))
    .withColumn("StockCode", f.col("StockCode").cast(t.StringType()))
    .withColumn("Description", f.col("Description").cast(t.StringType()))
    .withColumn("InvoiceDate", f.col("InvoiceDate").cast(t.StringType()))
    .withColumn('Quantity', f.col('Quantity').cast(t.IntegerType()))
    .withColumn('UnitPrice', f.col('UnitPrice').cast(t.FloatType()))
    .withColumn('CustomerID', f.col('CustomerID').cast(t.IntegerType()))
    .withColumn("Country", f.col("Country").cast(t.StringType()))
)

In [55]:
df_fixed.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [60]:
df_fixed.printSchema()

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [70]:
def check_nans(df):
    """
    This function takes a PySpark DataFrame as input and returns the count of null (NaN) values
    in each column of the DataFrame.

    Args:
        df (pyspark.sql.dataframe.DataFrame): The input PySpark DataFrame.

    Returns:
        None: The function prints the count of NaN values in each column of the DataFrame.

    Example:
        >>> check_nans(df)
        +--------+--------+-----------+--------+
        |InvoiceNo|StockCode|Description|Quantity|
        +--------+--------+-----------+--------+
        |        0|       2|          0|       0|
        +--------+--------+-----------+--------+
    """
    return df.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [71]:
check_nans(df_fixed)

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|     9291|        0|       1454|       0|          0|        0|    135080|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



# Data Analysis

In [59]:
countries_grouped = df_fixed.groupby('Country').agg(
    f.count('Country').alias('Count'),
    f.min('UnitPrice').alias('min_UnitPrice'),
    f.max('UnitPrice').alias('max_UnitPrice'),
    f.avg('UnitPrice').alias('avg_UnitPrice'),
    f.min('Quantity').alias('min_Quantity'),
    f.max('Quantity').alias('max_Quantity'),
    f.avg('Quantity').alias('avg_Quantity'),
)

countries_grouped.show()

+------------------+-----+-------------+-------------+------------------+------------+------------+------------------+
|           Country|Count|min_UnitPrice|max_UnitPrice|     avg_UnitPrice|min_Quantity|max_Quantity|      avg_Quantity|
+------------------+-----+-------------+-------------+------------------+------------+------------+------------------+
|            Sweden|  462|         0.19|         40.0| 3.910887426628179|        -240|         768| 77.13636363636364|
|         Singapore|  229|         0.19|      3949.32|109.64580793073604|          -1|         288| 22.85589519650655|
|           Germany| 9495|          0.0|        599.5| 3.966929951925743|        -288|         600|12.369457609268036|
|            France| 8557|          0.0|      4161.06| 5.028864104887589|        -250|         912| 12.91106696272058|
|            Greece|  146|         0.14|         50.0| 4.885547924021336|          -1|          48|10.657534246575343|
|European Community|   61|         0.55|        

***DataFrame Filtered on Specific Country***

In [72]:
def get_countries_df(df, country):
    """
    Filters the input PySpark DataFrame to return rows where the 'Country' column matches the specified country.

    Args:
        df (pyspark.sql.dataframe.DataFrame): The input PySpark DataFrame.
        country (str): The country to filter the DataFrame by.

    Returns:
        pyspark.sql.dataframe.DataFrame: A new DataFrame containing only the rows where the 'Country' column matches the specified country.

    Example:
        >>> get_countries_df(df, "United Kingdom")
        +---------+---------+--------------------+--------+--------------+---------+----------+--------------+
        |InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
        +---------+---------+--------------------+--------+--------------+---------+----------+--------------+
        |   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
        +---------+---------+--------------------+--------+--------------+---------+----------+--------------+
    """
    return df.filter(f.col('Country') == country)

In [73]:
get_countries_df(df_fixed, 'United Kingdom').show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [74]:
get_countries_df(df_fixed, 'Sweden').show()

+---------+---------+--------------------+--------+----------------+---------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+--------------------+--------+----------------+---------+----------+-------+
|     NULL|   85232B|SET OF 3 BABUSHKA...|    -240|12/14/2010 13:27|     4.95|     17404| Sweden|
|   538848|   85232B|SET OF 3 BABUSHKA...|     240|12/14/2010 13:28|     4.95|     17404| Sweden|
|   539338|    84077|WORLD WAR 2 GLIDE...|     576|12/17/2010 10:54|     0.29|     17404| Sweden|
|   539338|    22951|60 CAKE CASES DOL...|     240|12/17/2010 10:54|     0.55|     17404| Sweden|
|   539338|    22417|PACK OF 60 SPACEB...|     240|12/17/2010 10:54|     0.55|     17404| Sweden|
|   539338|    21977|PACK OF 60 PINK P...|     240|12/17/2010 10:54|     0.55|     17404| Sweden|
|   539338|    84992|72 SWEETHEART FAI...|     240|12/17/2010 10:54|     0.55|     17404| Sweden|
|   539338|    21212

***Group by Invoice Number and Calculate Total Amount***

In [77]:
df_fixed = df_fixed.withColumn("TotalAmount", f.col("UnitPrice") * f.col("Quantity"))

df_fixed.groupby('InvoiceNo').agg(
    f.sum("TotalAmount").alias("InvoiceTotal")
).show()

+---------+------------------+
|InvoiceNo|      InvoiceTotal|
+---------+------------------+
|   536532|1919.1399841308594|
|   537632|   13541.330078125|
|   538708| 365.2699947357178|
|   538877|12681.579971313477|
|   538993|101.20000076293945|
|   539735| 596.3399975299835|
|   539958| 5262.059963583946|
|   540562|337.39000034332275|
|   540563| 175.1999969482422|
|   541008| 72.26000130176544|
|   542058|461.64000844955444|
|   542239|114.84999975562096|
|   542694| 317.9999694824219|
|   542723|127.89999902248383|
|   543751|11.199999809265137|
|   544198| 84.00000190734863|
|   544205| 746.4899958968163|
|   544702|284.96999168395996|
|   545638|235.91000080108643|
|   545649|119.30999997258186|
+---------+------------------+
only showing top 20 rows



***Find the top 5 invoices by total amount.***

In [79]:
from pyspark.sql.window import Window

window_spec = Window.orderBy(f.desc("TotalAmount"))
df_ranked = df_fixed.withColumn("Rank", f.rank().over(window_spec))
df_top_5_invoices = df_ranked.filter(f.col("Rank") <= 5)
df_top_5_invoices.show()

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+-----------+----+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|TotalAmount|Rank|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+-----------+----+
|   581483|    23843|PAPER CRAFT , LIT...|   80995| 12/9/2011 9:15|     2.08|     16446|United Kingdom|   168469.6|   1|
|   541431|    23166|MEDIUM CERAMIC TO...|   74215|1/18/2011 10:01|     1.04|     12346|United Kingdom|   77183.59|   2|
|   556444|    22502|PICNIC BASKET WIC...|      60|6/10/2011 15:28|    649.5|     15098|United Kingdom|    38970.0|   3|
|   537632|AMAZONFEE|          AMAZON FEE|       1|12/7/2010 15:08| 13541.33|      NULL|United Kingdom|   13541.33|   4|
|     NULL|        B|     Adjust bad debt|       1|8/12/2011 14:50| 11062.06|      NULL|United Kingdom|   11062.06|   5|
+---------+---------+-----------

***Calculating Average Unit Price by Product***

In [80]:
df_avg_unit_price = df_fixed.groupby("StockCode").agg(
    f.avg("UnitPrice").alias("AvgUnitPrice")
)

df_avg_unit_price.show()

+---------+------------------+
|StockCode|      AvgUnitPrice|
+---------+------------------+
|    22728|4.5603703728428595|
|    21889| 1.572586493201468|
|   90210B| 2.151428597314017|
|    21259| 6.950776883073755|
|    21894|1.8020000122211597|
|    21452|3.5895000290870667|
|    22121| 6.612198430595669|
|    90022| 3.759523800441197|
|    21249| 3.870336135896314|
|    90143| 7.660909089175138|
|    84881| 6.352499961853027|
|    21248|1.8095588508774252|
|    22254|1.6237705027470823|
|    20868|0.7071739072385042|
|    21331| 14.34249997138977|
|   90197B| 4.994074097386113|
|    22596| 1.295583944686138|
|   90026D|               8.5|
|   90177A|2.9100000858306885|
|   84899F|               0.0|
+---------+------------------+
only showing top 20 rows



***Calculating Average Total Price by Country***

In [86]:
get_countries_df(df_fixed, 'United Kingdom').agg(
    f.avg('TotalAmount').alias('AverageTotalAmount'),
).show()

+------------------+
|AverageTotalAmount|
+------------------+
|16.525065371515485|
+------------------+



# Save PySpark DataFrame

In [91]:
df_fixed.write.parquet("parquet")
df_fixed.write.csv("csv")

In [92]:
!zip -r parquet.zip parquet
!zip -r csv.zip csv

  adding: parquet/ (stored 0%)
  adding: parquet/._SUCCESS.crc (stored 0%)
  adding: parquet/part-00000-e28ec433-7af3-4c53-b82a-7d94f61c817d-c000.snappy.parquet (deflated 15%)
  adding: parquet/.part-00001-e28ec433-7af3-4c53-b82a-7d94f61c817d-c000.snappy.parquet.crc (stored 0%)
  adding: parquet/_SUCCESS (stored 0%)
  adding: parquet/part-00001-e28ec433-7af3-4c53-b82a-7d94f61c817d-c000.snappy.parquet (deflated 15%)
  adding: parquet/.part-00000-e28ec433-7af3-4c53-b82a-7d94f61c817d-c000.snappy.parquet.crc (stored 0%)
  adding: csv/ (stored 0%)
  adding: csv/._SUCCESS.crc (stored 0%)
  adding: csv/_SUCCESS (stored 0%)
  adding: csv/.part-00001-7e0fdd42-3d2f-4c26-b95e-9c53a164d29b-c000.csv.crc (deflated 0%)
  adding: csv/.part-00000-7e0fdd42-3d2f-4c26-b95e-9c53a164d29b-c000.csv.crc (deflated 0%)
  adding: csv/part-00000-7e0fdd42-3d2f-4c26-b95e-9c53a164d29b-c000.csv (deflated 82%)
  adding: csv/part-00001-7e0fdd42-3d2f-4c26-b95e-9c53a164d29b-c000.csv (deflated 82%)
