In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import col, to_date, datediff, current_date, count, avg, sum, max
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import numpy as np
import os
# --- Environment / Spark bootstrap --------------------------------------
# Pin PySpark to a local JDK 11 install. This overwrites any JAVA_HOME that
# the process inherited, and it has to happen before getOrCreate() starts
# the JVM: if a session already exists in this kernel, getOrCreate() simply
# reuses it and the new value is not picked up.
os.environ.update(JAVA_HOME=r"C:\Program Files\Java\jdk-11.0.24")

# One session for the whole notebook, with a 4 GB driver memory setting.
spark = (
    SparkSession.builder
    .appName("RFM Customer Segmentation")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Load and prepare data (machine-specific absolute paths).
transactions = spark.read.csv(
    'D:/tatu/Project/Project 1/product_recommendation-main/data/Processed_transactions.csv',
    header=True,
    inferSchema=True,
)
# NOTE(review): `products` is not used anywhere in this notebook as shown;
# it is kept so any later cell that relies on it keeps working.
products = spark.read.csv(
    'D:/tatu/Project/Project 1/product_recommendation-main/data/Products_with_Categories.csv',
    header=True,
    inferSchema=True,
)

# Calculate RFM metrics.
# to_date() without a format behaves like a cast to date, so only ISO-like
# strings (yyyy-MM-dd...) parse; other layouts become null.
# TODO confirm the CSV's Date format, and pass an explicit pattern if needed.
transactions = transactions.withColumn('Date', to_date(col('Date')))

# Recency is measured against the latest purchase date present in the data,
# not the wall clock. With current_date(), every Recency value (and the
# saved CSV / cluster statistics) shifted by one day per day the notebook
# was re-run, and depended on how long ago the data was collected.
from pyspark.sql.functions import lit

reference_date = transactions.agg(max('Date')).first()[0]

rfm = transactions.groupBy('Member_number').agg(
    # Days between this customer's last purchase and the dataset's last day
    # (0 = purchased on the most recent day in the data).
    datediff(lit(reference_date), max('Date')).alias('Recency'),
    # NOTE(review): counts rows. If each row is one purchased item, this is
    # "items bought", not "visits"; consider countDistinct on a
    # transaction/visit key if visits are intended.
    count('*').alias('Frequency'),
    sum('total_spent').cast(DoubleType()).alias('Monetary'),
)

# Prepare features for clustering: pack the three RFM columns into a single
# vector column. VectorAssembler's default handleInvalid='error' means a
# null Recency/Frequency/Monetary value raises here instead of being dropped.
assembler = VectorAssembler(
    inputCols=['Recency', 'Frequency', 'Monetary'],
    outputCol='features',
)
rfm_vector = assembler.transform(rfm)

# Scale the features to zero mean / unit variance. This keeps the feature
# with the largest raw scale (likely Monetary) from dominating the Euclidean
# distances used by KMeans.
scaler = StandardScaler(
    inputCol='features',
    outputCol='scaledFeatures',
    withStd=True,
    withMean=True,
)
scaler_model = scaler.fit(rfm_vector)

# Cache the scaled data. It is fitted by KMeans once per candidate k and once
# more for the final model, and each fit makes many passes over it. Without
# caching, every pass re-reads the CSVs and re-runs the RFM aggregation.
rfm_scaled = scaler_model.transform(rfm_vector).cache()

# Find optimal K using the elbow method. Fit KMeans for k = 2..10 and
# record each model's training cost (sum of squared distances of points to
# their nearest centre).
cost_values = []
K = range(2, 11)

for k in K:
    # Explicit seed: KMeans initialisation is randomised. Pinning it makes
    # the curve (and hence the chosen k) reproducible across runs/kernels
    # instead of depending on the library's default seed.
    kmeans = KMeans(featuresCol='scaledFeatures', k=k, seed=42)
    model = kmeans.fit(rfm_scaled)
    cost = model.summary.trainingCost
    cost_values.append(cost)
    print(f"k={k}, cost={cost}")

# Plot elbow curve; integer ticks so each point lines up with its k.
plt.figure(figsize=(10, 6))
plt.plot(np.array(K), np.array(cost_values), 'bx-')
plt.xticks(list(K))
plt.xlabel('k')
plt.ylabel('Sum of Squared Errors')
plt.title('Elbow Method For Optimal k')
plt.show()

# Apply K-means with the chosen number of clusters. The value was presumably
# read off the elbow plot; re-check it if the input data changes.
N_CLUSTERS = 5
# Fixed seed so the cluster assignment (and cluster ids) are reproducible.
kmeans = KMeans(featuresCol='scaledFeatures', k=N_CLUSTERS, seed=42)
model = kmeans.fit(rfm_scaled)
# predictions feeds several separate Spark actions: the two aggregations
# below, plus the CSV write and toPandas() further down the notebook.
# Cache it so the whole pipeline is not recomputed for each action.
predictions = model.transform(rfm_scaled).cache()

# Analyze clusters: number of customers per cluster id.
cluster_sizes = predictions.groupBy('prediction').count().orderBy('prediction')
print("\nCluster Sizes:")
cluster_sizes.show()

# Calculate cluster statistics: mean raw (unscaled) RFM values per cluster.
cluster_stats = predictions.groupBy('prediction').agg(
    avg('Recency').alias('avg_recency'),
    avg('Frequency').alias('avg_frequency'),
    avg('Monetary').alias('avg_monetary'),
).orderBy('prediction')

print("\nCluster Statistics:")
cluster_stats.show()

# Save results: one row per customer, with raw RFM values and cluster id.
# Spark writes a *directory* named customer_segments/ that contains one
# part-*.csv file per partition, not a single CSV file; reruns replace it.
segment_columns = ['Member_number', 'Recency', 'Frequency', 'Monetary', 'prediction']
(
    predictions.select(*segment_columns)
    .write.mode('overwrite')
    .option('header', True)
    .csv('customer_segments')
)

# Visualization: 3-D scatter of the raw (unscaled) RFM values, coloured by
# cluster id. toPandas() collects every customer row onto the driver.
axis_names = ('Recency', 'Frequency', 'Monetary')
pdf = predictions.select(*axis_names, 'prediction').toPandas()

fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')

scatter = ax.scatter(
    *(pdf[name] for name in axis_names),
    c=pdf['prediction'],
    cmap='viridis',
)
ax.set(xlabel='Recency', ylabel='Frequency', zlabel='Monetary')

plt.colorbar(scatter)
plt.title('Customer Segments')
plt.show()

# Stop Spark session
# Shuts down the session created at the top of this cell, including its
# underlying SparkContext. Any later cell that uses `spark` must create a
# session again first.
spark.stop()

In [2]:
import os
# Diagnostic: show the JAVA_HOME visible to this kernel. It prints None when
# the variable is unset, as in the recorded output of this cell.
print(os.getenv("JAVA_HOME"))


None
