# Customer Segmentation for a Retail Chain - Part-1 - Preprocessing & EDA 

### - Team 3: 
- Sardar Rohan Singh - G01453457
- Bhargav Patel - G01448937
- Saketh - G01425738
- Monisha Jaganathan - G01412067
- Sai Saketh - G01444348

### -  Professor: Eddy Zhang
### -  Course: AIT-614-001 - Big Data Essentials

In [0]:
# pip install pyspark

In [0]:
from pyspark.sql import SparkSession

In [0]:
retailDF = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/rsardar3@gmu.edu/retail_chain.csv")

In [0]:
retailDF.printSchema()

In [0]:
retailDF.show(5)

In [0]:
retailDF.columns

In [0]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Example") \
    .getOrCreate()

# To get the number of rows
num_rows = retailDF.count()

# To get the number of variables (columns)
num_variables = len(retailDF.columns)

print("Number of rows:", num_rows)
print("Number of variables:", num_variables)

In [0]:
# Copy the Dataframe in case we mess up something
df = retailDF.select(*retailDF.columns)

In [0]:
df.printSchema()

In [0]:
# Drop Duplicated rows of data from the DataFrame, It happened because of Human Error..!! :-p
print("*"*75)
print("Length of DataFrame with duplicate values : ", df.count())
df = df.dropDuplicates(subset=df.columns)
print("*"*75)
print("Length of DataFrame after dropping all the duplicate values : ", df.count())
print("*"*75)

In [0]:
from pyspark.sql.functions import col, count, when
import pandas as pd

def mdF(df):
    total_rows = df.count()
    total_no = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
    missing_percentage = {col: (val / total_rows) * 100 for col, val in total_no.items()}
    mdF = pd.DataFrame({"Missing_Values": total_no, "Missing_Percentage": missing_percentage})
    return mdF[mdF['Missing_Values'] > 1]

mdF(df)


In [0]:
from pyspark.sql.functions import col

# Filter the DataFrame to drop rows where both 'CustomerID' and 'Description' are null
df = df.filter(~(col('CustomerID').isNull() & col('Description').isNull()))


In [0]:
mdF(df)

In [0]:
# Drop null values in the 'CustomerID' field
df = df.na.drop()

In [0]:
# df.printSchema()

In [0]:
# df.count()

In [0]:
# Import necessary libraries
from pyspark.sql.functions import col
import seaborn as sns
import matplotlib.pyplot as plt

# Filter out null values in Quantity and UnitPrice columns
filtered_df = df.filter(~col("Quantity").isNull() & ~col("UnitPrice").isNull())

# Convert Spark DataFrame to Pandas DataFrame
pandas_df = filtered_df.toPandas()

In [0]:
# Convert 'Quantity' column to numeric type, coercing errors to NaN
pandas_df['Quantity_numeric'] = pd.to_numeric(pandas_df['Quantity'], errors='coerce')

# Check for NaN values in the 'Quantity_numeric' column
non_numeric_values = pandas_df[pandas_df['Quantity_numeric'].isnull()]

# Display non-numeric values
if len(non_numeric_values) > 0:
    print("Non-numeric values found in the 'Quantity' column:")
    print(non_numeric_values)
else:
    print("No non-numeric values found in the 'Quantity' column.")

In [0]:
import plotly.express as px

# Create box plots for Quantity and UnitPrice using Plotly Express
fig = px.box(pandas_df, y="Quantity", title="Distribution of Quantity")
fig.update_layout(height=400, width=600, yaxis_title="Quantity")

fig.show()

In [0]:
fig = px.box(pandas_df, y="UnitPrice", title="Distribution of Unit Price")
fig.update_layout(height=400, width=600, yaxis_title="Unit Price")

fig.show()

In [0]:
# Filter out rows where Quantity and UnitPrice are positive
filtered_df = df.filter((col("Quantity") > 0) & (col("UnitPrice") > 0))

# Count the length of the filtered DataFrame
filtered_length = filtered_df.count()
print("Length of DataFrame is:", filtered_length)

In [0]:
# Convert Spark DataFrame to Pandas DataFrame
pandas_filtered_df = filtered_df.toPandas()

# Create box plots for Quantity and UnitPrice using Plotly Express
fig = px.box(pandas_filtered_df, y="Quantity", title="Distribution of Quantity")
fig.update_layout(height=400, width=600, yaxis_title="Quantity")

fig.show()

In [0]:
fig = px.box(pandas_filtered_df, y="UnitPrice", title="Distribution of Unit Price")
fig.update_layout(height=400, width=600, yaxis_title="Unit Price")

fig.show()

In [0]:
# Convert the Data Type of CustomerID to Integer using Lambda Function
df = df.withColumn("CustomerID", col("CustomerID").cast("int"))

In [0]:
# Draw a new column with the total price paid by the customer
df = df.withColumn("Total_Price", col("Quantity") * col("UnitPrice"))

In [0]:
# df.show(5)

In [0]:
from pyspark.sql import functions as F

DM_data = df.select('CustomerID', 'Quantity', 'Total_Price', 'InvoiceNo', 'InvoiceDate')

# Convert 'InvoiceDate' to datetime
DM_data = DM_data.withColumn('InvoiceDate', F.to_timestamp('InvoiceDate'))

In [0]:
# Ensure 'InvoiceDate' is a timestamp
df = df.withColumn('InvoiceDate', F.to_timestamp('InvoiceDate'))

# Extracting date parts
df = df.withColumn('Year', F.year('InvoiceDate'))
df = df.withColumn('Month', F.month('InvoiceDate'))
df = df.withColumn('WeekDay', F.dayofweek('InvoiceDate'))  # Starts at 1 (Sunday) to 7 (Saturday)
df = df.withColumn('TimePeriod', F.hour('InvoiceDate'))

# Mapping for Month and WeekDay
monthDict = {
    1: 'January', 2: 'February', 3: 'March', 4: 'April', 5: 'May', 6: 'June',
    7: 'July', 8: 'August', 9: 'September', 10: 'October', 11: 'November', 12: 'December'
}
weekdayDict = {
    1: 'Sunday', 2: 'Monday', 3: 'Tuesday', 4: 'Wednesday', 5: 'Thursday', 6: 'Friday', 7: 'Saturday'
}


In [0]:
# Creating mapping expressions
month_mapping_expr = F.create_map([F.lit(x) for x in sum(monthDict.items(), ())])
weekday_mapping_expr = F.create_map([F.lit(x) for x in sum(weekdayDict.items(), ())])

# Applying mappings
df = df.withColumn('Month', month_mapping_expr[df['Month']])
df = df.withColumn('WeekDay', weekday_mapping_expr[df['WeekDay']])

In [0]:
# Define and apply the Period categorization
df = df.withColumn(
    'TimePeriod',
    F.when((F.col('TimePeriod') >= 6) & (F.col('TimePeriod') < 12), 'Morning')
     .when((F.col('TimePeriod') >= 12) & (F.col('TimePeriod') < 16), 'Afternoon')
     .when((F.col('TimePeriod') >= 16) & (F.col('TimePeriod') <= 20), 'Evening')
     .otherwise('Night')
)

# Now df includes all transformed and categorized data

In [0]:
df = df.select(
    "InvoiceNo", "StockCode", "Description", "Quantity", "InvoiceDate", "UnitPrice", 
    "CustomerID", "Country", "Region", "Reviews", "Total_Price", 
    "Year", "Month", "WeekDay", "TimePeriod"
)

# Show the DataFrame to verify the structure
# df.show(truncate=False)


In [0]:
df = df.drop('InvoiceDate')

In [0]:
df = df[['CustomerID', 'InvoiceNo', 'Year', 'Month', 'WeekDay','TimePeriod',
         'StockCode', 'Country', 'Description', 'Quantity', 'UnitPrice', 'Total_Price', 'Reviews']]

In [0]:
# Save the Dataframe into another CSV for safety
# df.write.csv("CustomersDF.csv", header=True, mode="overwrite")


Exploratory Data Analysis

In [0]:
customerDF = df

In [0]:
customerDF.show(3)

In [0]:
# customerDF.printSchema()

# 01 - Best Customers

- Best Customers are who have paid more to the company/service and continuously purchasing the products.

Top 30 Customers of our Retail Chain are

In [0]:
best_customers = customerDF.groupBy("CustomerID") \
                           .agg(F.round(F.sum("Total_Price"), 2).alias("Total_Spent")) \
                           .orderBy(F.col("Total_Spent").desc()) \
                           .limit(30)
best_customers.show(5)

In [0]:
# Import necessary libraries

# Group by CustomerID and sum the Total_Price, then sort in descending order and take the top 30
best_customers = customerDF.groupBy('CustomerID').sum('Total_Price') \
                           .orderBy('sum(Total_Price)', ascending=False) \
                           .limit(30)

# Convert Spark DataFrame to Pandas DataFrame for plotting
best_customers_pd = best_customers.toPandas()

In [0]:
# Plot the graph using Seaborn
plt.figure(figsize=(16, 6))
plt.xticks(rotation=90)
plt.title("Best Customers", fontsize=20)
sns.barplot(x='CustomerID', y='sum(Total_Price)', data=best_customers_pd)

# Show the plot
plt.show()

# 02 - Worst Customers

- Every Customer is a Good Customer, the worst could be measured in terms of loyalty.

- Every penny recieved from the customer is an achievement and the hardwork of the company.

# 03 - Periodical Purchasing Stats

In [0]:
import pandas as pd

# Convert PySpark DataFrame to Pandas DataFrame
customer_df_pd = customerDF.toPandas()

plt.figure(figsize=(16, 12))
plt.subplots_adjust(hspace=0.5)

# Plot for Year
plt.subplot(221)
sns.countplot(data=customer_df_pd, x='Year', palette='Set2')
plt.title('Number of Transactions by Year')
plt.xlabel('Year')
plt.ylabel('Count')
for i, count in enumerate(customer_df_pd['Year'].value_counts()):
    plt.text(i, count, str(count), ha='center', va='bottom')

# Plot for Month with hue Year
plt.subplot(222)
plt.xticks(rotation=90)
sns.countplot(data=customer_df_pd, x='Month', order=customer_df_pd['Month'].value_counts().index, hue='Year', palette='Set2')
plt.title('Number of Transactions by Month')
plt.xlabel('Month')
plt.ylabel('Count')
plt.legend(title='Year')
for i, count in enumerate(customer_df_pd['Month'].value_counts()):
    plt.text(i, count, str(count), ha='center', va='bottom')

# Plot for WeekDay
plt.subplot(223)
sns.countplot(data=customer_df_pd, x='WeekDay', order=customer_df_pd['WeekDay'].value_counts().index, palette='Set2')
plt.title('Number of Transactions by Weekday')
plt.xlabel('Weekday')
plt.ylabel('Count')
for i, count in enumerate(customer_df_pd['WeekDay'].value_counts()):
    plt.text(i, count, str(count), ha='center', va='bottom')

# Plot for TimePeriod
plt.subplot(224)
sns.countplot(data=customer_df_pd, x='TimePeriod', order=customer_df_pd['TimePeriod'].value_counts().index, palette='Set2')
plt.title('Number of Transactions by Time Period')
plt.xlabel('Time Period')
plt.ylabel('Count')
for i, count in enumerate(customer_df_pd['TimePeriod'].value_counts()):
    plt.text(i, count, str(count), ha='center', va='bottom')

plt.tight_layout()
plt.show()

#### Insights from the Periodical Plots
- Sales are very high in October, November & December
- We have only December sales data from 2010
- The Retail Store is Closed on Saturday as per the Information available
- More number of sales are happening during Middle of the week(Tuesday, Wednesday and Thursday in ascending order respectively)
- People are usually visiting the store during the afternoon hours when compared to Mornng & Evening Times
- The total no of sales in December 2010 is higher then December 2011
- Management has to concentrate on the decrease of sales in December

# 04 Most Revenue Generated WeekDay

In [0]:
weekly_sales = customerDF.groupBy("WeekDay") \
                         .agg(F.round(F.sum("Total_Price"), 2).alias("Total_Sales")) \
                         .orderBy(F.col("Total_Sales").desc())
weekly_sales.show()

In [0]:
import matplotlib.ticker as ticker

# Convert Spark DataFrame to Pandas DataFrame
weekly_sales_pandas = weekly_sales.toPandas()

plt.figure(figsize=(10, 6))

# Create the bar plot
sns.barplot(x="WeekDay", y="Total_Sales", data=weekly_sales_pandas, order=weekly_sales_pandas["WeekDay"], palette='viridis')

# Add title and labels
plt.title('Weekly Sales')
plt.xlabel('Weekday')
plt.ylabel('Total Sales')

# Add data labels on top of the bars
for i, value in enumerate(weekly_sales_pandas["Total_Sales"]):
    plt.text(i, value, f'{value:.2f}', ha='center', va='bottom')

# Format y-axis tick labels in thousands [K]
plt.gca().yaxis.set_major_formatter(ticker.FuncFormatter(lambda x, _: '{:.0f}K'.format(x / 1000)))

# Show plot
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

- On Thursday Company is generating the highest Revenue
- On Sunday company is generating less revenue

# 05 Regional Purchase Information

In [0]:
country_sales = customerDF.groupBy("Country") \
                          .agg(F.round(F.sum("Total_Price"), 2).alias("Total_Sales")) \
                          .orderBy(F.col("Total_Sales").desc())

# Show the results
country_sales.show(10)

In [0]:
# Convert Spark DataFrame to Pandas DataFrame
country_sales_pandas = country_sales.toPandas()

# Select the top 10 countries by total sales
top_countries = country_sales_pandas.head(10)

# Plot the bar chart
plt.figure(figsize=(12, 8))
sns.barplot(x="Total_Sales", y="Country", data=top_countries, palette="viridis")

# Add labels and title
plt.title('Top 10 Countries by Total Sales')
plt.xlabel('Total Sales')
plt.ylabel('Country')

# Add data labels on top of the bars
for i, sales in enumerate(top_countries["Total_Sales"]):
    plt.text(sales, i, f'{sales:.2f}', va='center')

# Show the plot
plt.tight_layout()
plt.show()

## 5.1 Top Purchasing Countries

In [0]:
top_purchasing_countries = country_sales.orderBy(F.col("Total_Sales").desc()).limit(10)
top_purchasing_countries.show()

## 5.2 Lease Purchasing Countries

In [0]:
least_purchasing_countries = country_sales.orderBy(F.col("Total_Sales")).limit(10)
least_purchasing_countries.show()


# 06 - Product Sales Categorization

In [0]:
product_sales = customerDF.groupBy("Description") \
                          .agg(F.sum("Quantity").alias("Total_Quantity")) \
                          .orderBy(F.col("Total_Quantity").desc())
product_sales.show(10)

## 6.1 Top 20 Sold Products

In [0]:
# Select the top 20 sold products
top_products = product_sales.orderBy(F.col("Total_Quantity").desc()).limit(20)

# Convert Spark DataFrame to Pandas DataFrame
top_products_pandas = top_products.toPandas()

# Plot the bar chart
plt.figure(figsize=(16, 10))
plt.title("Top Sold Products", fontsize=20)
sns.barplot(x="Total_Quantity", y="Description", data=top_products_pandas, palette="viridis")

# Add labels and title
plt.xlabel('Total Quantity')
plt.ylabel('Product Description')

# Show the plot
plt.tight_layout()
plt.show()

## 6.2 Most Revenue Generated Products

In [0]:
product_revenue = customerDF.groupBy("Description") \
                            .agg(F.round(F.sum("Total_Price"), 2).alias("Total_Revenue")) \
                            .orderBy(F.col("Total_Revenue").desc())

# Show the results
product_revenue.show(10)

In [0]:
# Select the top revenue generated products
top_revenue_products = product_revenue.orderBy(F.col("Total_Revenue").desc()).limit(15)

# Convert Spark DataFrame to Pandas DataFrame
top_revenue_products_pandas = top_revenue_products.toPandas()

# Plot the bar chart
plt.figure(figsize=(16, 10))
plt.title("Top Revenue Generated Products", fontsize=20)
sns.barplot(x="Total_Revenue", y="Description", data=top_revenue_products_pandas, palette="viridis")

# Add labels and title
plt.xlabel('Total Revenue')
plt.ylabel('Product Description')

# Show the plot
plt.tight_layout()
plt.show()

# 07 - Customers Stats (Loyalty, Daily Buyer)

- Customer loyalty is the act of choosing one company's products and services consistently over their competitors. When a customer is loyal to one company, they aren't easily swayed by price or availability. They would rather pay more and ensure the same quality service & product they know.

- Since we don't have the purchase stats of our customers with the other companies, Let us seggregate them into 3 Categories based on thier purchases as Bronze, Silver and Gold.

In [0]:
loyal_customers = customerDF.groupBy("CustomerID") \
                            .agg(F.sum("Quantity").alias("Total_Quantity")) \
                            .orderBy(F.col("Total_Quantity").desc())

# Show the results
loyal_customers.show(10)

## 7.1 Customers Who are willing to purchase most from the store

In [0]:
# Select the top loyal customers
top_loyal_customers = loyal_customers.orderBy(F.col("Total_Quantity").desc()).limit(30)

# Convert Spark DataFrame to Pandas DataFrame
top_loyal_customers_pandas = top_loyal_customers.toPandas()

# Plot the bar chart
plt.figure(figsize=(16, 5))
plt.xticks(rotation=90)
plt.title("Highest Quantity of Products Purchased by Customers", fontsize=20)
sns.barplot(x=top_loyal_customers_pandas["CustomerID"], y=top_loyal_customers_pandas["Total_Quantity"], order=top_loyal_customers_pandas["CustomerID"])

# Add labels and title
plt.xlabel('Customer ID')
plt.ylabel('Total Quantity Purchased')

# Show the plot
plt.tight_layout()
plt.show()

## 08 - Customers Who Buy Often, But Spend Very Little

## 8.1 Purchases Generated by Customers

In [0]:
daily_customers = customerDF.groupBy("CustomerID") \
                            .agg(
                                F.countDistinct("InvoiceNo").alias("Unique_Invoices"),
                                F.sum("Total_Price").alias("Total_Sales")
                            )

# Order by 'Unique_Invoices' descending and then by 'Total_Sales' ascending
daily_customers = daily_customers.orderBy(F.col("Unique_Invoices").desc(), F.col("Total_Sales").asc())

# Filter for customers with 'Total_Sales' less than 20,000
daily_customers = daily_customers.filter(F.col("Total_Sales") < 20000)

# Show the top records (e.g., equivalent to .head() in pandas)
daily_customers.show(10)

In [0]:
# Select the customers who buy often but spend little
top_daily_customers = daily_customers.orderBy(F.col("Unique_Invoices").desc(), F.col("Total_Sales").asc()).limit(15)

# Convert Spark DataFrame to Pandas DataFrame
top_daily_customers_pandas = top_daily_customers.toPandas()

# Plot the bar chart
plt.figure(figsize=(16, 5))
plt.title("Customers Who Buy Often but Spend Little", fontsize=20)
sns.barplot(x=top_daily_customers_pandas["CustomerID"], y=top_daily_customers_pandas["Total_Sales"])

# Add labels and title
plt.xlabel('Customer ID')
plt.ylabel('Total Sales')

# Show the plot
plt.xticks(rotation=90)
plt.tight_layout()
plt.show()

# 09 - Customers We have Lost

In [0]:
lost_customers = customerDF.filter(customerDF['Year'] == 2011) \
                           .groupBy("Month") \
                           .agg(F.countDistinct("CustomerID").alias("Unique_Customers"))
lost_customers.show(10)

In [0]:
# Define the order of months
months = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']

# Convert Spark DataFrame to Pandas DataFrame
lost_customers_pandas = lost_customers.toPandas()

# Plot the bar chart
plt.figure(figsize=(16, 5))
plt.title('Customers We Have Lost', fontsize=20)
sns.barplot(x='Month', y='Unique_Customers', data=lost_customers_pandas, order=months)

# Add labels and title
plt.xlabel('Month')
plt.ylabel('Unique Customers')

# Show the plot
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

- We can observe that very number of customers are lost visiting the store. At the end of the year many customers are lost.

## Part 1 Ends here. Due to Large exceeding file size of Jupyter Notebook. Unable to export in Ipynb or HTML because of exceeding size

## Part 2 Continuation is all about Data Modelling, Machine Learning Algorithm and Visualization

- Please find the attached AIT-614-Team3- Project-PySpark-Part2