In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CustomerBehaviour").getOrCreate()
spark

In [59]:
# import sys
# import os

# # Add src directory to path
# cwd = os.getcwd()
# sys.path.append(os.path.join(cwd, '..', 'src'))

In [3]:
from data_cleaning import CleanData
from data_ingestion import Session
from data_transformation import TransformData

In [5]:
# Load data
retail_data = Session().load_data(spark)
retail_data.show(n=5)

CSV Data is at c:\nikhil_pyspark\customerBehaviour\src\..\data\Online_Retail.csv
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01-12-2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+----------------+---------+-

In [6]:
# Transform data
customers_df = TransformData().transform_data(retail_data)
customers_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|    15.300|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|    22.000|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
+---------+---------+--------------------+------

                                    DATA ANALYSIS

In [8]:
from pyspark.sql.functions import sum, avg, when, col
total_revenue = customers_df.agg(
        sum(col("TotalPrice")).alias("Revenue")
    )
total_revenue.show()

+-----------------+
|          Revenue|
+-----------------+
|9394843.593998538|
+-----------------+



In [11]:
# Calculate the total revenue generated per country.
from pyspark.sql.functions import format_number
country_revenue = customers_df.groupby(col("Country")).agg(
    format_number(sum(col("TotalPrice")), 4).alias("Revenue")
)
country_revenue.show()


+--------------------+------------+
|             Country|     Revenue|
+--------------------+------------+
|              Sweden| 36,595.9100|
|           Singapore|  9,120.3900|
|             Germany|221,698.2100|
|              France|195,771.4200|
|              Greece|  4,710.5200|
|             Belgium| 40,910.9600|
|             Finland| 22,326.7400|
|               Italy| 16,890.5100|
|                EIRE|254,432.0000|
|           Lithuania|  1,661.0600|
|              Norway| 35,163.4600|
|               Spain| 50,045.6300|
|             Denmark| 18,768.1400|
|           Hong Kong| 10,117.0400|
|             Iceland|  4,310.0000|
|              Israel|  7,907.8200|
|     Channel Islands| 20,086.2900|
|              Cyprus| 12,946.2900|
|         Switzerland| 56,385.3500|
|United Arab Emirates|  1,902.2800|
+--------------------+------------+
only showing top 20 rows



In [12]:
# Determine the average order value per customer.
order_avg = customers_df.groupby(col("CustomerId")).agg(
    format_number(avg(col("TotalPrice")),4).alias("AverageOrderValue")
)
order_avg.show()

+----------+-----------------+
|CustomerId|AverageOrderValue|
+----------+-----------------+
|     16250|          16.2267|
|     15574|           4.1801|
|     15555|           5.1440|
|     15271|           9.0393|
|     17714|          15.3000|
|     17686|          20.0680|
|     17757|           7.5276|
|     17551|           7.1358|
|     13187|           6.3789|
|     16549|           4.2351|
|     12637|          15.1098|
|     15052|           7.1927|
|     15448|          17.6657|
|     14525|          14.1657|
|     18283|           2.7710|
|     13107|          25.4012|
|     16303|          31.7714|
|     17256|          15.0000|
|     16027|          50.1247|
|     13174|           8.9714|
+----------+-----------------+
only showing top 20 rows



In [13]:
# Identify the top 10 products by total sales revenue.
top_10_products_by_revenue = customers_df.orderBy(col("TotalPrice").desc()).head(10)
for product in top_10_products_by_revenue:
    print(product[2])

RABBIT NIGHT LIGHT
CREAM HEART CARD HOLDER
SMALL ZINC HEART WALL ORGANISER
HOT WATER BOTTLE KEEP CALM
SET OF 3 REGENCY CAKE TINS
SET OF 3 REGENCY CAKE TINS
SET OF TEA COFFEE SUGAR TINS PANTRY
SMALL ZINC HEART WALL ORGANISER
MISELTOE HEART WREATH CREAM
JUMBO BAG TOYS 


In [14]:
customers_df.describe("Quantity").show()

+-------+------------------+
|summary|          Quantity|
+-------+------------------+
|  count|            541909|
|   mean|  9.55224954743324|
| stddev|218.08115785023443|
|    min|                -1|
|    max|               992|
+-------+------------------+



In [15]:
# Calculate the total number of unique customers.
unique_customers_cnt = customers_df.select(col("CustomerId")).distinct().count()
print(unique_customers_cnt)

4373


In [16]:
# Identify customers with the highest total spend.
highly_spend_customers = customers_df.groupby(col("CustomerId")).agg(
    (sum(col("TotalPrice"))).alias("Total")
).orderBy(col("Total").desc())
highly_spend_customers.show()

+----------+------------------+
|CustomerId|             Total|
+----------+------------------+
|      NULL|1574435.4400000807|
|     14646| 267171.2599999999|
|     14911|132572.62000000014|
|     18102|128397.31000000001|
|     12415|117614.04999999996|
|     14156|104539.32000000004|
|     17511| 88125.37999999998|
|     17450|          73748.83|
|     13694| 62653.09999999998|
|     16684|          60715.28|
|     15311|59419.340000000004|
|     13089|57385.880000000005|
|     14096|  53363.6599999998|
|     14298| 50862.43999999998|
|     14088|          50415.49|
|     15061|46957.939999999995|
|     16029|          46816.85|
|     15769|          43478.32|
|     17841| 40340.77999999992|
|     13798| 36351.41999999999|
+----------+------------------+
only showing top 20 rows



In [17]:
# Analyze the return rate by identifying invoices starting with 'C'.
cancelled_orders = customers_df.filter(col("InvoiceNo").startswith("C"))
total = customers_df.select(col("InvoiceNo")).count()
cancelled = cancelled_orders.select(col("InvoiceNo")).count()
print(cancelled/total)

0.017139409015166755


In [18]:
# Determine the peak hours for transactions.
from pyspark.sql.functions import hour
hours = customers_df.withColumn("Hour", hour(col("InvoiceDate")))
peak_hours = hours.groupby(col("Hour")).count().orderBy(col("count").desc()).head(5)
print("Peak Hours:")
for _hour in peak_hours:
    print("\t", _hour[0])

Peak Hours:
	 12
	 15
	 13
	 14
	 11


In [19]:
# Calculate the average quantity purchased per transaction.
avg_quantity_per_txn = customers_df.groupby(col("InvoiceNo")).agg(
    avg(col("Quantity")).alias("Average")
).filter(~ col("InvoiceNo").startswith("C"))

avg_quantity_per_txn.show()

+---------+------------------+
|InvoiceNo|           Average|
+---------+------------------+
|   536596|               1.5|
|   536938|33.142857142857146|
|   537252|              31.0|
|   537691|              8.15|
|   538041|              30.0|
|   538184|12.076923076923077|
|   538517|3.0377358490566038|
|   538879|21.157894736842106|
|   539275|              26.0|
|   539630|20.333333333333332|
|   540499|              3.75|
|   540540|2.1363636363636362|
|   540976|10.520833333333334|
|   541432|             12.25|
|   541518| 23.10891089108911|
|   541783|11.314285714285715|
|   542026| 7.666666666666667|
|   542375|               8.0|
|   543641|             6.125|
|   544303|1.3902439024390243|
+---------+------------------+
only showing top 20 rows



In [21]:
# Find the top 10 customers based on purchase frequency.
distinct_txn = customers_df.select(
        col("InvoiceNo"), col("CustomerId")
    ).distinct().orderBy(col("CustomerId"))
# distinct_txn.show(n=10)
high_purchase_frequency = distinct_txn.groupby(col("CustomerId")).count()\
    .orderBy(col("count").desc()).limit(10).withColumnRenamed("count", "Frequency")
high_purchase_frequency.show()

+----------+---------+
|CustomerId|Frequency|
+----------+---------+
|      NULL|     3710|
|     14911|      248|
|     12748|      224|
|     17841|      169|
|     14606|      128|
|     15311|      118|
|     13089|      118|
|     12971|       89|
|     14527|       86|
|     13408|       81|
+----------+---------+



In [22]:
customers_df.filter(col("InvoiceNo")=="536365").show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|    15.300|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|    22.000|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|    22752|SET 7 BABUSHKA NE...|      

In [23]:
# Identify the top 10 customers by revenue.
top_10_customers_by_revenue = customers_df.groupby(col("CustomerId")).agg(
    sum(col("TotalPrice")).alias("Total")
).orderBy(col("Total").desc())
top_10_customers_by_revenue = top_10_customers_by_revenue.withColumn("Total", format_number(col("Total"), 3))
top_10_customers_by_revenue.show()

+----------+-------------+
|CustomerId|        Total|
+----------+-------------+
|      NULL|1,574,435.440|
|     14646|  267,171.260|
|     14911|  132,572.620|
|     18102|  128,397.310|
|     12415|  117,614.050|
|     14156|  104,539.320|
|     17511|   88,125.380|
|     17450|   73,748.830|
|     13694|   62,653.100|
|     16684|   60,715.280|
|     15311|   59,419.340|
|     13089|   57,385.880|
|     14096|   53,363.660|
|     14298|   50,862.440|
|     14088|   50,415.490|
|     15061|   46,957.940|
|     16029|   46,816.850|
|     15769|   43,478.320|
|     17841|   40,340.780|
|     13798|   36,351.420|
+----------+-------------+
only showing top 20 rows



In [24]:
# Analyze monthly revenue trends.
from pyspark.sql.functions import month
month_col = customers_df.withColumn("Month", month(col("InvoiceDate")))
month_col.groupby(col("Month")).agg(
    sum(col("TotalPrice")).alias("MonthRevenue")
).orderBy(col("Month")).show()

+-----+------------------+
|Month|      MonthRevenue|
+-----+------------------+
|    1| 550614.0500000254|
|    2|493296.09000001464|
|    3|  665236.480000026|
|    4| 488074.9610000249|
|    5| 711173.3700000129|
|    6| 657166.2100000235|
|    7| 659527.3710000303|
|    8|  687150.190000014|
|    9| 946110.0320000099|
|   10| 1017958.390000046|
|   11|1371579.8500000269|
|   12|1146956.5999999961|
+-----+------------------+



In [25]:
# Determine the most popular products by quantity sold.
popular_products_by_quantity = customers_df.groupBy(col("StockCode")).agg(
    sum(col("Quantity")).alias("total_quantity")
).limit(5)
popular_products_by_quantity.show()

+---------+--------------+
|StockCode|total_quantity|
+---------+--------------+
|    22728|        5323.0|
|    21889|        6377.0|
|   90210B|          41.0|
|    21259|        1117.0|
|    21894|         557.0|
+---------+--------------+



In [26]:
# Calculate the total number of orders per month.
distinct_order = month_col.select(col("Month"), col("InvoiceNo")).distinct()
orders_per_month = distinct_order.groupby(col("Month")).count().orderBy(col("Month"))
orders_per_month.show()

+-----+-----+
|Month|count|
+-----+-----+
|    1| 1476|
|    2| 1393|
|    3| 1983|
|    4| 1744|
|    5| 2162|
|    6| 2012|
|    7| 1927|
|    8| 1737|
|    9| 2327|
|   10| 2637|
|   11| 3462|
|   12| 3040|
+-----+-----+



In [27]:
# Find the most frequent purchase day of the week.
from pyspark.sql.functions import dayofweek
days = {
    1:"Sunday", 
    2: "Monday", 
    3: "Tuesday", 
    4: "Wednesday",
    5: "Thursday",
    6: "Friday",
    7: "Saturday"
    }
day_col = customers_df.withColumn("Day", dayofweek(col("InvoiceDate")))
peak_day = day_col.groupby(col("Day")).count().orderBy(col("count").desc()).first()
peak_day = days[peak_day[0]]
print(peak_day)

Thursday


In [28]:
# Identify seasonal trends in purchases.

def get_season(month):
    return (
        when(month.isin({3, 4, 5}), "Spring").
        when(month.isin({6, 7, 8}), "Summer").
        when(month.isin({9, 10, 11}), "Autumn").
        otherwise("Winter")
    )

month_revenue = month_col.groupby(col("Month")).agg(
    sum(col("TotalPrice")).alias("Revenue")
)

season_revenue = month_revenue.withColumn("Month", get_season(col("Month"))).\
    withColumnRenamed("Month", "Season")

season_revenue = season_revenue.groupby(col("Season")).agg(
        format_number(sum("Revenue"), 4).alias("Revenue")
    )
season_revenue.show()


+------+--------------+
|Season|       Revenue|
+------+--------------+
|Spring|1,864,484.8110|
|Summer|2,003,843.7710|
|Autumn|3,335,648.2720|
|Winter|2,190,866.7400|
+------+--------------+



In [29]:
# Calculate the average unit price per product.
customers_df.agg(avg(col("UnitPrice"))).show()

+------------------+
|    avg(UnitPrice)|
+------------------+
|4.6111136260901056|
+------------------+



                            DATA FILTERING

In [30]:
# Filter transactions that involve a specific product/Stock.
StockCode = "85123A"
specific_txn = customers_df.filter(col("StockCode")==StockCode)
specific_txn.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|    15.300|
|   536373|   85123A|WHITE HANGING HEA...|       6|2010-12-01 09:02:00|     2.55|     17850|United Kingdom|    15.300|
|   536375|   85123A|WHITE HANGING HEA...|       6|2010-12-01 09:32:00|     2.55|     17850|United Kingdom|    15.300|
|   536390|   85123A|WHITE HANGING HEA...|      64|2010-12-01 10:19:00|     2.55|     17511|United Kingdom|   163.200|
|   536394|   85123A|WHITE HANGING HEA...|      32|2010-12-01 10:39:00|     2.55|     13408|United Kingdom|    81.600|
|   536396|   85123A|WHITE HANGING HEA...|      

In [31]:
# Filter transactions above a certain total price threshold.
_threshold = 950
customers_df.filter(col("TotalPrice")>_threshold).show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   538351|    22423|REGENCY CAKESTAND...|      38|2010-12-10 15:17:00|    25.49|      NULL|United Kingdom|   968.620|
|   538351|   84029E|RED WOOLLY HOTTIE...|     115|2010-12-10 15:17:00|     8.47|      NULL|United Kingdom|   974.050|
|   538877|    22114|HOT WATER BOTTLE ...|     104|2010-12-14 15:29:00|     9.32|      NULL|United Kingdom|   969.280|
|   543379|    82484|WOOD BLACK BOARD ...|     204|2011-02-07 15:37:00|     4.77|     18102|United Kingdom|   973.080|
|   546789|    20711|     JUMBO BAG TOYS |     600|2011-03-17 10:17:00|     1.65|     15769|United Kingdom|   990.000|
|   547812|    22189|CREAM HEART CARD ...|     4

In [32]:
# Filter transactions from a specific country.
country = "Switzerland"
country_txn = customers_df.filter(col("Country")==country)
country_txn.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+-----------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|    Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+-----------+----------+
|   536858|    22326|ROUND SNACK BOXES...|      30|2010-12-03 10:36:00|     2.95|     13520|Switzerland|    88.500|
|   536858|    22554|PLASTERS IN TIN W...|      36|2010-12-03 10:36:00|     1.65|     13520|Switzerland|    59.400|
|   536858|    21731|RED TOADSTOOL LED...|      24|2010-12-03 10:36:00|     1.65|     13520|Switzerland|    39.600|
|   536858|    20677|  PINK POLKADOT BOWL|      16|2010-12-03 10:36:00|     1.25|     13520|Switzerland|    20.000|
|   536858|    20750|RED RETROSPOT MIN...|       2|2010-12-03 10:36:00|     7.95|     13520|Switzerland|    15.900|
|   536858|     POST|             POSTAGE|       2|2010-12-03 10:36:00| 

In [33]:
# Find transactions made by a specific customer.
customer_id = "17850"
customer_txn = customers_df.filter(col("CustomerID")==customer_id)
customer_txn.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|    15.300|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|    22.000|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|    22752|SET 7 BABUSHKA NE...|      

In [34]:
# Filter transactions that occurred within a specific date range.
start_date = "2011-01-04"
end_date = "2012-01-04"

filtered_df = customers_df.filter(
    (col("InvoiceDate") >= start_date) & (col("InvoiceDate") < end_date)
)

filtered_df.show()
# customers_df.filter(col("InvoiceDate")>"2011-01-04").show()


+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   539993|    22386|JUMBO BAG PINK PO...|      10|2011-01-04 10:00:00|     1.95|     13313|United Kingdom|    19.500|
|   539993|    21499|  BLUE POLKADOT WRAP|      25|2011-01-04 10:00:00|     0.42|     13313|United Kingdom|    10.500|
|   539993|    21498| RED RETROSPOT WRAP |      25|2011-01-04 10:00:00|     0.42|     13313|United Kingdom|    10.500|
|   539993|    22379|RECYCLING BAG RET...|       5|2011-01-04 10:00:00|      2.1|     13313|United Kingdom|    10.500|
|   539993|    20718|RED RETROSPOT SHO...|      10|2011-01-04 10:00:00|     1.25|     13313|United Kingdom|    12.500|
|   539993|   85099B|JUMBO BAG RED RET...|      

In [35]:
# Identify transactions with a quantity greater than a certain amount.
_amount = 990
customers_df.filter(col("TotalPrice")>_amount).show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   547812|    22189|CREAM HEART CARD ...|     432|2011-03-25 14:06:00|     2.31|     18102|United Kingdom|   997.920|
|   556726|    23135|SMALL ZINC HEART ...|     300|2011-06-14 11:31:00|     3.32|     18102|United Kingdom|   996.000|
|   556917|    23243|SET OF TEA COFFEE...|     240|2011-06-15 13:37:00|     4.15|     12415|     Australia|   996.000|
|   556917|    23245|SET OF 3 REGENCY ...|     240|2011-06-15 13:37:00|     4.15|     12415|     Australia|   996.000|
|   561655|    23135|SMALL ZINC HEART ...|     300|2011-07-28 16:00:00|     3.32|     18102|United Kingdom|   996.000|
|   561901|    23131|MISELTOE HEART WR...|     2

In [36]:
# Filter transactions that include products with a specific keyword in the description.
_keyword = "HEART"
customers_df.filter(col("Description").contains(_keyword)).show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|    15.300|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|    22.000|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536367|    21777|RECIPE BOX WITH M...|       4|2010-12-01 08:34:00|     7.95|     13047|United Kingdom|    31.800|
|   536373|   85123A|WHITE HANGING HEA...|       6|2010-12-01 09:02:00|     2.55|     17850|United Kingdom|    15.300|
|   536373|   84406B|CREAM CUPID HEART...|      

In [37]:
# Filter out canceled transactions (invoices starting with 'C').
customers_df.filter(col("InvoiceNo").startswith("C")).show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|  C536379|        D|            Discount|      -1|2010-12-01 09:41:00|     27.5|     14527|United Kingdom|   -27.500|
|  C536383|   35004C|SET OF 3 COLOURED...|      -1|2010-12-01 09:49:00|     4.65|     15311|United Kingdom|    -4.650|
|  C536391|    22556|PLASTERS IN TIN C...|     -12|2010-12-01 10:24:00|     1.65|     17548|United Kingdom|   -19.800|
|  C536391|    21984|PACK OF 12 PINK P...|     -24|2010-12-01 10:24:00|     0.29|     17548|United Kingdom|    -6.960|
|  C536391|    21983|PACK OF 12 BLUE P...|     -24|2010-12-01 10:24:00|     0.29|     17548|United Kingdom|    -6.960|
|  C536391|    21980|PACK OF 12 RED RE...|     -

In [38]:
# Identify customers who made purchases in more than one country.

from pyspark.sql.functions import countDistinct
multi_country_purchases = customers_df.groupby(col("CustomerId")).agg(
    countDistinct(col("Country")).alias("CountryCnt")
).filter(col("CountryCnt")>1)
multi_country_purchases.show()

# customers_df.filter(col("CustomerId")=="12394").select(col("Country")).distinct().show()

+----------+----------+
|CustomerId|CountryCnt|
+----------+----------+
|     12394|         2|
|     12429|         2|
|      NULL|         9|
|     12457|         2|
|     12431|         2|
|     12370|         2|
|     12417|         2|
|     12455|         2|
|     12422|         2|
+----------+----------+



In [39]:
# Filter transactions with a unit price above or below a certain value.
_value = 20000
unit_price_beyond_value = customers_df.filter(col("UnitPrice")>_value)
unit_price_beyond_value.show()

+---------+---------+-----------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country| TotalPrice|
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+-----------+
|  C556445|        M|     Manual|      -1|2011-06-10 15:31:00|    38970|     15098|United Kingdom|-38,970.000|
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+-----------+



In [40]:
customers_df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|    15.300|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|    22.000|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    20.340|
|   536365|    22752|SET 7 BABUSHKA NE...|      

In [41]:
spark.stop()

Description of the Dataset

The dataset contains the following columns:

    InvoiceNo: Invoice number. Nominal, a 6-digit integral number uniquely assigned to each transaction. If this code starts with the letter 'c', it indicates a cancellation.
    
    StockCode: Product (item) code. Nominal, a 5-digit integral number uniquely assigned to each distinct product.
    
    Description: Product (item) name. Nominal.
    
    Quantity: The quantities of each product (item) per transaction. Numeric.
    
    InvoiceDate: Invoice Date and time. Numeric, the day and time when each transaction was generated.
    
    UnitPrice: Unit price. Numeric, Product price per unit in sterling.
    
    CustomerID: Customer number. Nominal, a 5-digit integral number uniquely assigned to each customer.
    
    Country: Country name. Nominal, the name of the country where each customer resides.

In [42]:
spark.stop()