In [None]:
# Import the required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Global plot appearance used by every chart in this notebook:
# matplotlib's white-grid style sheet plus seaborn's HUSL colour palette.
STYLE_SHEET = 'seaborn-v0_8-whitegrid'
PALETTE = 'husl'

plt.style.use(STYLE_SHEET)
sns.set_palette(PALETTE)

print('‚úÖ Libraries imported successfully!')

In [None]:
# Create (or reuse) the Spark session: standalone cluster master plus Hive metastore support.
SPARK_MASTER_URL = "spark://spark-master:7077"
HIVE_METASTORE_URI = "thrift://hive-metastore:9083"

session_builder = (
    SparkSession.builder
    .appName("RetailAnalysis")
    .master(SPARK_MASTER_URL)
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .config("hive.metastore.uris", HIVE_METASTORE_URI)
    .enableHiveSupport()
)
spark = session_builder.getOrCreate()

# Only show WARN and above from Spark's own logging.
sc = spark.sparkContext
sc.setLogLevel("WARN")

print('‚úÖ Spark Session created!')
print(f'üìç Spark Version: {spark.version}')
print(f'üìç Application ID: {sc.applicationId}')

## 1. Load và Khám phá Dữ liệu

In [None]:
# Read the raw transactions CSV: the first row supplies column names and Spark infers column types.
RAW_DATA_PATH = "/home/jovyan/data/online_retail.csv"
df = spark.read.csv(RAW_DATA_PATH, header=True, inferSchema=True)

raw_row_count = df.count()
print(f'üìä Total records: {raw_row_count:,}')
print(f'üìã Columns: {df.columns}')
df.printSchema()

In [None]:
# Preview the first 10 raw rows without truncating long cell values.
df.show(n=10, truncate=False)

In [None]:
# Descriptive statistics (count, mean, stddev, min, max) for the raw columns.
descriptive_stats = df.describe()
descriptive_stats.show()

## 2. Làm sạch và Xử lý Dữ liệu

In [None]:
# Clean the raw transactions:
#   - drop rows without a CustomerID,
#   - keep only positive Quantity and UnitPrice,
#   - drop invoices whose number starts with "C" (presumably cancellations),
# then derive the line total and the calendar features used by later sections.
df_cleaned = (
    df
    .filter(col("CustomerID").isNotNull())
    .filter(col("Quantity") > 0)
    .filter(col("UnitPrice") > 0)
    .filter(~col("InvoiceNo").startswith("C"))
    .withColumn("InvoiceDate", to_timestamp(col("InvoiceDate")))
    .withColumn("CustomerID", col("CustomerID").cast("integer"))
    # `round` is pyspark.sql.functions.round (from the star import), not the Python builtin.
    .withColumn("TotalAmount", round(col("Quantity") * col("UnitPrice"), 2))
    .withColumn("Year", year(col("InvoiceDate")))
    .withColumn("Month", month(col("InvoiceDate")))
    .withColumn("DayOfWeek", dayofweek(col("InvoiceDate")))
    .withColumn("Hour", hour(col("InvoiceDate")))
    # Cache before the counts below. Without it, each count re-reads and re-parses the CSV.
    # Calling cache() again later on the same DataFrame is harmless (Spark just logs a warning).
    .cache()
)

print(f'üìä Records after cleaning: {df_cleaned.count():,}')
print(f'üìä Unique customers: {df_cleaned.select("CustomerID").distinct().count():,}')
print(f'üìä Unique products: {df_cleaned.select("StockCode").distinct().count():,}')

In [None]:
# Keep the cleaned data in memory and expose it to Spark SQL as the "transactions" view,
# which the queries in the following sections read from.
transactions_df = df_cleaned.cache()
transactions_df.createOrReplaceTempView("transactions")
print('‚úÖ Data cached and temp view created!')

## 3. Phân tích Doanh thu theo Thời gian

In [None]:
# Monthly totals: distinct orders, distinct customers and revenue per (Year, Month).
monthly_revenue = spark.sql("""
    SELECT 
        Year,
        Month,
        COUNT(DISTINCT InvoiceNo) as TotalOrders,
        COUNT(DISTINCT CustomerID) as TotalCustomers,
        ROUND(SUM(TotalAmount), 2) as TotalRevenue
    FROM transactions
    GROUP BY Year, Month
    ORDER BY Year, Month
""")

# "YYYY-MM" x-axis label; the month is zero-padded so the labels read chronologically.
monthly_revenue_pd = monthly_revenue.toPandas().assign(
    Period=lambda frame: frame['Year'].astype(str) + '-' + frame['Month'].astype(str).str.zfill(2)
)

# Two stacked panels: revenue line on top, order-count bars below.
fig, axes = plt.subplots(2, 1, figsize=(14, 10))
periods = monthly_revenue_pd['Period']

axes[0].plot(periods, monthly_revenue_pd['TotalRevenue'], marker='o', linewidth=2, markersize=8)
axes[1].bar(periods, monthly_revenue_pd['TotalOrders'], color='steelblue', alpha=0.7)

# Shared decoration for both panels: (title, y-axis label) per panel.
panel_labels = [
    ('üìà Doanh thu theo Th√°ng', 'Doanh thu ($)'),
    ('üì¶ S·ªë ƒë∆°n h√†ng theo Th√°ng', 'S·ªë ƒë∆°n h√†ng'),
]
for panel_ax, (panel_title, panel_ylabel) in zip(axes, panel_labels):
    panel_ax.set_title(panel_title, fontsize=14, fontweight='bold')
    panel_ax.set_xlabel('Th·ªùi gian')
    panel_ax.set_ylabel(panel_ylabel)
    panel_ax.tick_params(axis='x', rotation=45)
    panel_ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Revenue and distinct orders per day of week (Spark's dayofweek: 1 = Sunday ... 7 = Saturday).
daily_revenue = spark.sql("""
    SELECT
        DayOfWeek,
        CASE DayOfWeek
            WHEN 1 THEN 'Sunday'
            WHEN 2 THEN 'Monday'
            WHEN 3 THEN 'Tuesday'
            WHEN 4 THEN 'Wednesday'
            WHEN 5 THEN 'Thursday'
            WHEN 6 THEN 'Friday'
            WHEN 7 THEN 'Saturday'
        END as DayName,
        COUNT(DISTINCT InvoiceNo) as TotalOrders,
        ROUND(SUM(TotalAmount), 2) as TotalRevenue
    FROM transactions
    GROUP BY DayOfWeek
    ORDER BY DayOfWeek
""")

daily_revenue_pd = daily_revenue.toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
bars = ax.bar(daily_revenue_pd['DayName'], daily_revenue_pd['TotalRevenue'], color='coral', alpha=0.8)
ax.set_title('üìÖ Doanh thu theo Ng√†y trong Tu·∫ßn', fontsize=14, fontweight='bold')
ax.set_xlabel('Ng√†y')
ax.set_ylabel('Doanh thu ($)')
ax.grid(True, alpha=0.3, axis='y')

# Revenue label above each bar. bar_label offsets by `padding` points, so the gap is the
# same whatever the revenue scale; a fixed data-unit offset would drift with the data.
ax.bar_label(
    bars,
    labels=[f'${val:,.0f}' for val in daily_revenue_pd['TotalRevenue']],
    padding=3,
    fontsize=9,
)

plt.tight_layout()
plt.show()

In [None]:
# Revenue and distinct orders per hour of the day.
hourly_revenue = spark.sql("""
    SELECT 
        Hour,
        COUNT(DISTINCT InvoiceNo) as TotalOrders,
        ROUND(SUM(TotalAmount), 2) as TotalRevenue
    FROM transactions
    GROUP BY Hour
    ORDER BY Hour
""")

hourly_revenue_pd = hourly_revenue.toPandas()
hours = hourly_revenue_pd['Hour']
hourly_totals = hourly_revenue_pd['TotalRevenue']

fig, ax = plt.subplots(figsize=(12, 6))
# Shaded area under the curve first, then the marked line drawn on top of it.
ax.fill_between(hours, hourly_totals, alpha=0.3)
ax.plot(hours, hourly_totals, marker='o', linewidth=2)
ax.set_title('‚è∞ Doanh thu theo Gi·ªù trong Ng√†y', fontsize=14, fontweight='bold')
ax.set_xlabel('Gi·ªù')
ax.set_ylabel('Doanh thu ($)')
ax.set_xticks(list(range(24)))  # one tick per hour, 0..23
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## 4. Top Sản phẩm Bán chạy

In [None]:
# Top 15 products by revenue.
# MIN(Description) gives a deterministic label per StockCode; FIRST() is non-deterministic
# in Spark because it depends on row order after the shuffle. StockCode breaks revenue ties,
# so the LIMIT returns the same 15 rows on every run.
top_products = spark.sql("""
    SELECT
        StockCode,
        MIN(Description) as Description,
        SUM(Quantity) as TotalQuantity,
        ROUND(SUM(TotalAmount), 2) as TotalRevenue,
        COUNT(DISTINCT CustomerID) as UniqueCustomers
    FROM transactions
    GROUP BY StockCode
    ORDER BY TotalRevenue DESC, StockCode
    LIMIT 15
""")

top_products_pd = top_products.toPandas()

# Y-axis labels: stock code on the first line and the description, cut at 30 characters,
# on the second. A missing description renders as an empty line rather than "None".
descriptions = top_products_pd['Description'].fillna('').astype(str)
product_labels = [
    f"{code}\n{desc[:30]}..." if len(desc) > 30 else f"{code}\n{desc}"
    for code, desc in zip(top_products_pd['StockCode'], descriptions)
]

fig, ax = plt.subplots(figsize=(12, 8))
bars = ax.barh(range(len(top_products_pd)), top_products_pd['TotalRevenue'], color='teal', alpha=0.8)
ax.set_yticks(range(len(top_products_pd)))
ax.set_yticklabels(product_labels)
ax.set_title('üèÜ Top 15 S·∫£n ph·∫©m theo Doanh thu', fontsize=14, fontweight='bold')
ax.set_xlabel('Doanh thu ($)')
ax.invert_yaxis()  # highest revenue at the top
ax.grid(True, alpha=0.3, axis='x')

# Revenue label at the end of each bar. The padding is in points, so it does not depend on the revenue scale.
ax.bar_label(
    bars,
    labels=[f'${val:,.0f}' for val in top_products_pd['TotalRevenue']],
    padding=3,
    fontsize=9,
)

plt.tight_layout()
plt.show()

## 5. Phân tích Khách hàng - RFM Analysis

In [None]:
# Recency / Frequency / Monetary per customer.
# Recency = days between the customer's last InvoiceDate and the latest InvoiceDate in the data.
rfm_query = """
    WITH max_date AS (
        SELECT MAX(InvoiceDate) as MaxDate FROM transactions
    )
    SELECT 
        CustomerID,
        DATEDIFF((SELECT MaxDate FROM max_date), MAX(InvoiceDate)) as Recency,
        COUNT(DISTINCT InvoiceNo) as Frequency,
        ROUND(SUM(TotalAmount), 2) as Monetary
    FROM transactions
    GROUP BY CustomerID
"""
rfm_df = spark.sql(rfm_query)

rfm_pd = rfm_df.toPandas()

# One histogram per RFM metric: (column, colour, title, x-axis label).
rfm_panels = [
    ('Recency', 'steelblue', 'üìÖ Ph√¢n ph·ªëi Recency', 'S·ªë ng√†y t·ª´ l·∫ßn mua cu·ªëi'),
    ('Frequency', 'coral', 'üîÑ Ph√¢n ph·ªëi Frequency', 'S·ªë l·∫ßn mua h√†ng'),
    ('Monetary', 'teal', 'üí∞ Ph√¢n ph·ªëi Monetary', 'T·ªïng chi ti√™u ($)'),
]

fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for panel_ax, (metric, colour, panel_title, panel_xlabel) in zip(axes, rfm_panels):
    panel_ax.hist(rfm_pd[metric], bins=50, color=colour, alpha=0.7, edgecolor='white')
    panel_ax.set_title(panel_title, fontsize=12, fontweight='bold')
    panel_ax.set_xlabel(panel_xlabel)
    panel_ax.set_ylabel('S·ªë kh√°ch h√†ng')

plt.tight_layout()
plt.show()

In [None]:
# Segment customers by RFM score.
#   - R/F/M scores are quintiles (1-5) from NTILE(5). Recency is ordered DESC, so the most
#     recent buyers get R_Score 5.
#   - CustomerID breaks ties in every window. Without it, customers with equal values
#     (very common for Frequency) land in arbitrary tiles and segment sizes vary between runs.
#   - The segment CASE is defined once (rfm_segments) and grouped by name, instead of being
#     duplicated in SELECT and GROUP BY where the two copies could drift apart.
customer_segments = spark.sql("""
    WITH rfm_base AS (
        SELECT
            CustomerID,
            DATEDIFF(
                (SELECT MAX(InvoiceDate) FROM transactions),
                MAX(InvoiceDate)
            ) as Recency,
            COUNT(DISTINCT InvoiceNo) as Frequency,
            SUM(TotalAmount) as Monetary
        FROM transactions
        GROUP BY CustomerID
    ),
    rfm_scores AS (
        SELECT
            *,
            NTILE(5) OVER (ORDER BY Recency DESC, CustomerID) as R_Score,
            NTILE(5) OVER (ORDER BY Frequency, CustomerID) as F_Score,
            NTILE(5) OVER (ORDER BY Monetary, CustomerID) as M_Score
        FROM rfm_base
    ),
    rfm_segments AS (
        SELECT
            *,
            CASE
                WHEN R_Score >= 4 AND F_Score >= 4 AND M_Score >= 4 THEN 'Champions'
                WHEN R_Score >= 3 AND F_Score >= 3 AND M_Score >= 3 THEN 'Loyal Customers'
                WHEN R_Score >= 4 AND F_Score <= 2 THEN 'New Customers'
                WHEN R_Score <= 2 AND F_Score >= 3 THEN 'At Risk'
                WHEN R_Score <= 2 AND F_Score <= 2 AND M_Score <= 2 THEN 'Lost'
                ELSE 'Regular'
            END as Segment
        FROM rfm_scores
    )
    SELECT
        Segment,
        COUNT(*) as CustomerCount,
        ROUND(AVG(Monetary), 2) as AvgMonetary,
        ROUND(SUM(Monetary), 2) as TotalMonetary
    FROM rfm_segments
    GROUP BY Segment
    ORDER BY TotalMonetary DESC
""")

segments_pd = customer_segments.toPandas()
print('üìä Customer Segments:')
print(segments_pd)

# Side-by-side pies: share of customers vs share of revenue per segment.
# Both pies take rows in the same order, so a given colour means the same segment in both.
fig, axes = plt.subplots(1, 2, figsize=(14, 6))

colors = ['#2ecc71', '#3498db', '#9b59b6', '#e74c3c', '#95a5a6', '#f39c12']
axes[0].pie(segments_pd['CustomerCount'], labels=segments_pd['Segment'], autopct='%1.1f%%',
            colors=colors, startangle=90)
axes[0].set_title('üë• Ph√¢n b·ªë Kh√°ch h√†ng theo Segment', fontsize=12, fontweight='bold')

axes[1].pie(segments_pd['TotalMonetary'], labels=segments_pd['Segment'], autopct='%1.1f%%',
            colors=colors, startangle=90)
axes[1].set_title('üí∞ Doanh thu theo Segment', fontsize=12, fontweight='bold')

plt.tight_layout()
plt.show()

## 6. Customer Clustering với K-Means

In [None]:
# Per-customer clustering features (recency, frequency, spend, product variety, mean quantity).
# Only customers with at least two distinct invoices are kept.
customer_features_sql = """
    SELECT 
        CustomerID,
        DATEDIFF(
            (SELECT MAX(InvoiceDate) FROM transactions),
            MAX(InvoiceDate)
        ) as Recency,
        COUNT(DISTINCT InvoiceNo) as Frequency,
        ROUND(SUM(TotalAmount), 2) as Monetary,
        COUNT(DISTINCT StockCode) as UniqueProducts,
        ROUND(AVG(Quantity), 2) as AvgQuantity
    FROM transactions
    GROUP BY CustomerID
    HAVING COUNT(DISTINCT InvoiceNo) >= 2
"""
customer_features = spark.sql(customer_features_sql)

n_clustering_customers = customer_features.count()
print(f'‚úÖ Customers for clustering: {n_clustering_customers:,}')

In [None]:
# Pack the numeric feature columns into one vector column ("features")...
feature_cols = ['Recency', 'Frequency', 'Monetary', 'UniqueProducts', 'AvgQuantity']
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
df_vector = assembler.transform(customer_features)

# ...then standardise it (zero mean, unit variance) into "scaled_features" for K-Means.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True,
)
scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

print('‚úÖ Features assembled and scaled!')

In [None]:
# Choose the number of clusters. For each candidate K, fit K-Means on the scaled features and
# record the training cost (WSSSE, for the elbow plot) and the silhouette score.
K_CANDIDATES = range(2, 11)

# The evaluator must read the same column K-Means was trained on. Its default featuresCol is
# "features", the *unscaled* vector, which would score the clusters in a different space
# from the one they were built in.
evaluator = ClusteringEvaluator(featuresCol="scaled_features")

# Every K re-reads df_scaled for fitting and for evaluation, so keep it in memory for the sweep.
df_scaled.cache()

costs = []
silhouettes = []
for k in K_CANDIDATES:
    kmeans = KMeans().setK(k).setSeed(42).setFeaturesCol("scaled_features")
    model = kmeans.fit(df_scaled)

    cost = model.summary.trainingCost
    predictions = model.transform(df_scaled)
    silhouette = evaluator.evaluate(predictions)

    costs.append(cost)
    silhouettes.append(silhouette)
    print(f'K={k}: Cost={cost:.2f}, Silhouette={silhouette:.4f}')

# Elbow (cost) and silhouette curves side by side.
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

axes[0].plot(K_CANDIDATES, costs, marker='o', linewidth=2)
axes[0].set_title('üìâ Elbow Method', fontsize=12, fontweight='bold')
axes[0].set_xlabel('Number of Clusters (K)')
axes[0].set_ylabel('Cost (WSSSE)')
axes[0].grid(True, alpha=0.3)

axes[1].plot(K_CANDIDATES, silhouettes, marker='o', linewidth=2, color='coral')
axes[1].set_title('üìä Silhouette Score', fontsize=12, fontweight='bold')
axes[1].set_xlabel('Number of Clusters (K)')
axes[1].set_ylabel('Silhouette Score')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Train the final K-Means model.
# NOTE(review): K=5 is fixed here; confirm it against the elbow/silhouette output above.
final_k = 5
kmeans = KMeans().setK(final_k).setSeed(42).setFeaturesCol("scaled_features").setPredictionCol("Cluster")
model = kmeans.fit(df_scaled)
clustered = model.transform(df_scaled)

# Per-cluster profile: size and mean feature values.
# count/avg/round are the pyspark.sql.functions versions brought in by the star import.
cluster_stats = clustered.groupBy('Cluster').agg(
    count('*').alias('CustomerCount'),
    round(avg('Recency'), 1).alias('AvgRecency'),
    round(avg('Frequency'), 1).alias('AvgFrequency'),
    round(avg('Monetary'), 2).alias('AvgMonetary'),
    round(avg('UniqueProducts'), 1).alias('AvgProducts')
).orderBy('Cluster')

cluster_stats.show()

# Bar chart of customers per cluster.
cluster_pd = cluster_stats.toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
bars = ax.bar(cluster_pd['Cluster'].astype(str), cluster_pd['CustomerCount'], color='steelblue', alpha=0.8)
ax.set_title('üë• S·ªë l∆∞·ª£ng Kh√°ch h√†ng theo Cluster', fontsize=14, fontweight='bold')
ax.set_xlabel('Cluster')
ax.set_ylabel('S·ªë kh√°ch h√†ng')
ax.grid(True, alpha=0.3, axis='y')

# Count label above each bar. The padding is in points, so spacing does not depend on cluster size.
ax.bar_label(
    bars,
    labels=[f'{val:,}' for val in cluster_pd['CustomerCount']],
    padding=3,
    fontsize=10,
)

plt.tight_layout()
plt.show()

## 7. Phân tích theo Quốc gia

In [None]:
# Top 10 countries by revenue. Country breaks revenue ties, so the LIMIT is reproducible.
country_stats = spark.sql("""
    SELECT
        Country,
        COUNT(DISTINCT CustomerID) as TotalCustomers,
        COUNT(DISTINCT InvoiceNo) as TotalOrders,
        ROUND(SUM(TotalAmount), 2) as TotalRevenue
    FROM transactions
    GROUP BY Country
    ORDER BY TotalRevenue DESC, Country
    LIMIT 10
""")

country_pd = country_stats.toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
bars = ax.barh(range(len(country_pd)), country_pd['TotalRevenue'], color='teal', alpha=0.8)
ax.set_yticks(range(len(country_pd)))
ax.set_yticklabels(country_pd['Country'])
ax.set_title('üåç Top 10 Qu·ªëc gia theo Doanh thu', fontsize=14, fontweight='bold')
ax.set_xlabel('Doanh thu ($)')
ax.invert_yaxis()  # highest revenue at the top
ax.grid(True, alpha=0.3, axis='x')

# Revenue label at the end of each bar. The padding is in points, so the gap is the same for
# the largest and the smallest countries.
ax.bar_label(
    bars,
    labels=[f'${val:,.0f}' for val in country_pd['TotalRevenue']],
    padding=3,
    fontsize=9,
)

plt.tight_layout()
plt.show()

## 8. Tổng kết và Lưu kết quả

In [None]:
# Overall summary of the cleaned dataset.
print('=' * 60)
print('üìã T·ªîNG K·∫æT PH√ÇN T√çCH')
print('=' * 60)

# All headline figures in one aggregation pass (one Spark job instead of five).
# Note: COUNT(DISTINCT ...) ignores NULL values, whereas select().distinct().count() counts NULL as one value.
summary_row = df_cleaned.selectExpr(
    "COUNT(*) AS total_records",
    "COUNT(DISTINCT CustomerID) AS total_customers",
    "COUNT(DISTINCT StockCode) AS total_products",
    "SUM(TotalAmount) AS total_revenue",
    "COUNT(DISTINCT InvoiceNo) AS total_orders",
).first()

total_records = summary_row['total_records']
total_customers = summary_row['total_customers']
total_products = summary_row['total_products']
# SUM over zero rows is NULL (None in Python); use 0 so the currency formatting below cannot fail.
total_revenue = summary_row['total_revenue'] or 0.0
total_orders = summary_row['total_orders']
# Average order value, guarded against division by zero when no orders survive cleaning.
avg_order_value = total_revenue / total_orders if total_orders else 0.0

print(f'üìä T·ªïng s·ªë giao d·ªãch: {total_records:,}')
print(f'üë• T·ªïng s·ªë kh√°ch h√†ng: {total_customers:,}')
print(f'üì¶ T·ªïng s·ªë s·∫£n ph·∫©m: {total_products:,}')
print(f'üßæ T·ªïng s·ªë ƒë∆°n h√†ng: {total_orders:,}')
print(f'üí∞ T·ªïng doanh thu: ${total_revenue:,.2f}')
print(f'üíµ Gi√° tr·ªã ƒë∆°n h√†ng TB: ${avg_order_value:,.2f}')
print('=' * 60)

In [None]:
# Shut down the Spark session (releasing its cluster resources), then report completion.
spark.stop()
for closing_message in ('‚úÖ Spark Session stopped!', 'üéâ Analysis completed successfully!'):
    print(closing_message)