In [0]:
%python
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DataType
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, datediff, lit, countDistinct, sum
from pyspark.sql.functions import count, sum,current_timestamp

In [0]:
%python
data = spark.read.csv('/FileStore/tables/Online_Retail-2.csv', header=True, inferSchema=True)

In [0]:
%python
data.show()

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United

In [0]:
%python
df = data.select("CustomerID").distinct()

# Display unique CustomerIDs
df.show()

+----------+
|CustomerID|
+----------+
|     17420|
|     15100|
|     17809|
|     12583|
|     12431|
|     18074|
|     16098|
|     13047|
|     15311|
|     13408|
|     14527|
|     17850|
|     13748|
|     16029|
|     17548|
|     13705|
|     15291|
|     17511|
|     13747|
|     14688|
+----------+
only showing top 20 rows



In [0]:
%python
data.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
%python

# Calculate and print the percentage of missing data in the 'CustomerID' column
missing_percentage = (data.filter(data["CustomerID"].isNull()).count() / data.count()) * 100
print(f'Percentage of missing data from CustomerID column is: {round(missing_percentage, 2)}%')


Percentage of missing data from CustomerID column is: 24.93%


In [0]:
%python
# Fill missing values in the 'Description' column with 'No Description'
data = data.na.fill('No Description', subset=['Description'])

# Check again for missing values in the 'Description' column
data.select("Description").na.drop()

# Drop rows with any missing values
df = data.na.drop()




In [0]:
%python
# Ensure 'Quantity' is non-negative


# Corrected line to take absolute values of 'Quantity'
df = df.withColumn("Quantity", expr("abs(Quantity)"))

# Drop rows where 'UnitPrice' is 0
df = df.filter(col("UnitPrice") != 0)

In [0]:
%python
df.show()

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United

In [0]:
%python
df = df.withColumn("TotalPrice", col("Quantity") * col("UnitPrice"))
df.show(5)

+---------+---------+--------------------+--------+------------+---------+----------+--------------+------------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|        TotalPrice|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|15.299999999999999|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|             20.34|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|              22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|             20.34|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|             20.34|
+---------+---------+-------------------

In [0]:
%python
# Check the schema again
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = false)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- TotalPrice: double (nullable = true)



In [0]:
%python
most_recent_date = data.agg({"InvoiceDate": "max"}).collect()[0][0]

# Calculate Recency, Frequency, and Monetary for each customer
customer_df = data.groupBy('CustomerID').agg(
    (datediff(current_timestamp(), F.max("InvoiceDate"))).alias("Recency"),
    countDistinct("InvoiceNo").alias("Frequency"),
    sum("TotalPrice").alias("Monetary")
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2763636824497793>:4[0m
[1;32m      1[0m most_recent_date [38;5;241m=[39m data[38;5;241m.[39magg({[38;5;124m"[39m[38;5;124mInvoiceDate[39m[38;5;124m"[39m: [38;5;124m"[39m[38;5;124mmax[39m[38;5;124m"[39m})[38;5;241m.[39mcollect()[[38;5;241m0[39m][[38;5;241m0[39m]
[1;32m      3[0m [38;5;66;03m# Calculate Recency, Frequency, and Monetary for each customer[39;00m
[0;32m----> 4[0m customer_df [38;5;241m=[39m data[38;5;241m.[39mgroupBy([38;5;124m'[39m[38;5;124mCustomerID[39m[38;5;124m'[39m)[38;5;241m.[39magg(
[1;32m      5[0m     (datediff(current_timestamp(), F[38;5;241m.[39mmax([38;5;124m"[39m[38;5;124mInvoiceDate[39m[38;5;124m"[39m)))[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mRecency[39m[38;5;124m"[39m),
[1;32m      6[0m     countDis

In [0]:
%python
customer_df = df.groupBy('CustomerID').agg(
    (datediff(current_timestamp(), F.max("InvoiceDate"))).alias("Recency"),
    countDistinct('InvoiceNo').alias('Frequency'),
    sum('TotalPrice').alias('Monetary')
)



In [0]:
%python

customer_df.show()

+----------+-------+---------+------------------+
|CustomerID|Recency|Frequency|          Monetary|
+----------+-------+---------+------------------+
|     12346|   null|        2|          154367.2|
|     12347|   null|        7|            4310.0|
|     12348|   null|        4|           1797.24|
|     12349|   null|        1|           1757.55|
|     12350|   null|        1|334.40000000000003|
|     12352|   null|       11|3466.6699999999996|
|     12353|   null|        1|              89.0|
|     12354|   null|        1|            1079.4|
|     12355|   null|        1|             459.4|
|     12356|   null|        3|2811.4300000000003|
|     12357|   null|        1| 6207.669999999996|
|     12358|   null|        2|           1168.06|
|     12359|   null|        6|           6499.63|
|     12360|   null|        3|           2662.06|
|     12361|   null|        1|189.89999999999998|
|     12362|   null|       13|           5297.88|
|     12363|   null|        2|             552.0|


In [0]:
%python
customer_df = spark.read.csv('/FileStore/tables/customer_df.csv', header=True, inferSchema=True)


In [0]:
%python
customer_df.show()

+----------+-------+---------+-----------------+
|CustomerID|Recency|Frequency|         Monetary|
+----------+-------+---------+-----------------+
|   12346.0|    325|        2|         154367.2|
|   12347.0|      1|      182|           4310.0|
|   12348.0|     74|       31|          1797.24|
|   12349.0|     18|       73|          1757.55|
|   12350.0|    309|       17|            334.4|
|   12352.0|     35|       95|          3466.67|
|   12353.0|    203|        4|             89.0|
|   12354.0|    231|       58|           1079.4|
|   12355.0|    213|       13|            459.4|
|   12356.0|     22|       59|          2811.43|
|   12357.0|     32|      131|          6207.67|
|   12358.0|      1|       19|          1168.06|
|   12359.0|      7|      254|          6499.63|
|   12360.0|     51|      129|          2662.06|
|   12361.0|    286|       10|            189.9|
|   12362.0|      2|      274|          5297.88|
|   12363.0|    109|       23|            552.0|
|   12364.0|      7|

In [0]:
%python
customer_df.describe().show()

+-------+------------------+------------------+------------------+------------------+
|summary|        CustomerID|           Recency|         Frequency|          Monetary|
+-------+------------------+------------------+------------------+------------------+
|  count|              4371|              4371|              4371|              4371|
|   mean|15300.145275680623| 91.06497369023107| 93.06543125142989|2178.6204516129023|
| stddev|  1722.31026187441|100.77004606913796|232.46377699995563|10503.405397270177|
|    min|           12346.0|                 0|                 1|              1.25|
|    max|           18287.0|               373|              7983|          336942.1|
+-------+------------------+------------------+------------------+------------------+



In [0]:
%python
display(customer_df)

CustomerID,Recency,Frequency,Monetary
12346.0,325,2,154367.2
12347.0,1,182,4310.0
12348.0,74,31,1797.24
12349.0,18,73,1757.55
12350.0,309,17,334.4
12352.0,35,95,3466.67
12353.0,203,4,89.0
12354.0,231,58,1079.4
12355.0,213,13,459.4
12356.0,22,59,2811.43


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import col

# Assuming 'customer_df' is a PySpark DataFrame
# Replace 'Monetary', 'Frequency', and 'Recency' with your actual column names

# Feature scaling using StandardScaler
features = ['Monetary', 'Frequency', 'Recency']
feature_assembler = VectorAssembler(inputCols=features, outputCol='features')
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

# K-Means model
kmeans = KMeans(featuresCol='scaled_features', predictionCol='Cluster', k=3, seed=42, maxIter=50, initSteps=10)

# **Option 1: Use the "Cluster" column for evaluation**
pipeline = Pipeline(stages=[feature_assembler, scaler, kmeans])
model = pipeline.fit(customer_df)
result_df = model.transform(customer_df)
evaluator = ClusteringEvaluator(predictionCol='Cluster')
silhouette = evaluator.evaluate(result_df)
print(f"Silhouette score: {silhouette}")


# Displaying the clusters
result_df.show(10)

# Aggregating statistics for each cluster
result_df.groupBy('Cluster').agg({'Monetary':'mean', 'Frequency':'mean', 'Recency':'mean'}).show()


Silhouette score: -0.2911551343612216
+----------+-------+---------+--------+--------------------+--------------------+-------+
|CustomerID|Recency|Frequency|Monetary|            features|     scaled_features|Cluster|
+----------+-------+---------+--------+--------------------+--------------------+-------+
|   12346.0|    325|        2|154367.2|[154367.2,2.0,325.0]|[14.6968715536886...|      1|
|   12347.0|      1|      182|  4310.0|  [4310.0,182.0,1.0]|[0.41034310654334...|      0|
|   12348.0|     74|       31| 1797.24| [1797.24,31.0,74.0]|[0.17111021921205...|      0|
|   12349.0|     18|       73| 1757.55| [1757.55,73.0,18.0]|[0.16733144475760...|      0|
|   12350.0|    309|       17|   334.4|  [334.4,17.0,309.0]|[0.03183729346359...|      2|
|   12352.0|     35|       95| 3466.67| [3466.67,95.0,35.0]|[0.33005200398158...|      0|
|   12353.0|    203|        4|    89.0|    [89.0,4.0,203.0]|[0.00847344233929...|      2|
|   12354.0|    231|       58|  1079.4| [1079.4,58.0,231.0]|[0

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator



In [0]:
from operator import add



In [0]:
data = spark.read.csv("/FileStore/tables/final.csv", header=True, inferSchema=True)

# Map: Extract (Country, TotalPrice) pairs
mapped_data = data.rdd.map(lambda row: (row["Country"], row["TotalPrice"]))

# Reduce: Sum TotalPrice by Country
reduced_data = mapped_data.reduceByKey(add)

result = reduced_data.collect()
for country, total_price in result:
    print(f"{country}: {total_price}")



In [0]:
display(result)



In [0]:

filtered_df=data

counts_list = filtered_df.select("Description").distinct().rdd.flatMap(lambda x: x).collect()

# Map phase: Count occurrences of each item in filtered_df
mapped_counts = filtered_df.rdd.map(lambda row: (row['Description'], 1)).reduceByKey(lambda count1, count2: count1 + count2)

# Convert the result to a DataFrame
counts_df = mapped_counts.toDF(['Description', 'Count'])

# Sort the DataFrame by 'Count' in descending order
counts_df = counts_df.sort("Count", ascending=False)

# Show the top 5 items
c_df = counts_df.head(5)
display(c_df)



In [0]:
mapped_data = filtered_df.rdd.map(lambda row: (row['CustomerID'], float(row['TotalPrice'])))

# Reduce phase: Sum the 'TotalPrice' for each 'CustomerID'
reduced_data = mapped_data.reduceByKey(lambda total1, total2: total1 + total2)

# Convert the result to a DataFrame
result_df = reduced_data.toDF(['CustomerID', 'TotalPrice'])
display(result_df)



In [0]:
monthly_total_sales = (
    filtered_df
    .groupBy('YearMonth')
    .agg(sum('TotalPrice').alias('MonthlyTotalSales'))
    .orderBy('YearMonth')
)



In [0]:
display(monthly_total_sales)

