# DAMO-630-29 Assignment 01
Public repository: https://github.com/prumucena1979/UNFTERM4

# Business Challenge 01

In [None]:
# Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import f1_score, accuracy_score, roc_auc_score
from sklearn.metrics import r2_score, mean_absolute_error
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor

from sdv.metadata import Metadata
from sdv.single_table import GaussianCopulaSynthesizer, CTGANSynthesizer
from sdmetrics.reports.single_table import QualityReport, DiagnosticReport

# TASK I - Exploratory Data Analysis

The EDA offers an initial overview of the dataset by inspecting its structure, detecting missing values or outliers, 
and applying descriptive statistics with visualizations. These insights provide the foundation for subsequent synthetic data generation and evaluation.

In [None]:
# 1.1. Load dataset
# Forward slashes work on every OS (Windows included) and avoid the invalid
# "\H" escape sequence of a backslash path, which Python 3.12+ reports as a
# SyntaxWarning.
df = pd.read_csv("Datasets/HealthInsurance.csv")  # adjust file name as needed

In [None]:
# 1.2. Shape: (number of rows, number of columns)
print("Shape:", (len(df), len(df.columns)))

In [None]:
# 1.3. Preview: render the first five rows as a table
display(df.head(n=5))

In [None]:
# 1.4. Info: column dtypes and non-null counts, printed to stdout
pd.DataFrame.info(df)

In [None]:
# 1.5. Descriptive statistics (pandas' default column selection)
display(df.describe(include=None))

In [None]:
# 1.6. Missing values: number of null entries in every column
null_counts = df.isnull().sum()
print(null_counts)

# 1.7. Distribution plots for the first three numeric columns (illustrative only)
import matplotlib.pyplot as plt
import numpy as np

numeric_cols = df.select_dtypes(include=np.number).columns[:3]
for column_name in numeric_cols:
    non_missing = df[column_name].dropna()
    plt.figure(figsize=(6, 4))
    plt.hist(non_missing, bins=30, color='skyblue', edgecolor='black')
    plt.title(f"Distribution: {column_name}")
    plt.xlabel(column_name)
    plt.ylabel("Frequency")
    plt.grid(alpha=0.3)
    plt.show()


# Task II — Baseline Synthetic Data

In [None]:
# 2.1 Random noise baseline
# A deliberately naive reference: every column (numeric or not) is replaced by
# i.i.d. standard-normal noise with the same shape as the real data.
# A seeded generator makes the baseline reproducible across notebook runs.
baseline_rng = np.random.default_rng(42)
synthetic_baseline = pd.DataFrame(
    baseline_rng.standard_normal(size=df.shape),
    columns=df.columns,
)
display(synthetic_baseline.head())

# Task III — Advanced Synthetic Data (SDV)

In [None]:
# 3.1 Infer table metadata (column sdtypes and keys) from the real dataframe.
# NOTE(review): cell 3.2 rebinds `metadata` to a SingleTableMetadata object.
metadata = Metadata.detect_from_dataframe(table_name="my_table", data=df)

In [None]:
# 3.2 Synthetic Data Generation — GaussianCopula (SingleTableMetadata API)

import os

from sdv.single_table import GaussianCopulaSynthesizer
from sdv.metadata import SingleTableMetadata  # older single-table metadata class (newer SDV favours `Metadata`)
import pandas as pd

METADATA_PATH = "metadata.json"

# --- Step 1: Create metadata from the dataframe ---
# NOTE: this replaces the `metadata` inferred in 3.1; the CTGAN cell and the
# sdmetrics conversion that follow use this object.
metadata = SingleTableMetadata()
metadata.detect_from_dataframe(df)

# ✅ Save metadata for reproducibility.
# save_to_json() refuses to overwrite an existing file, so re-running this cell
# would raise; remove the copy left by a previous run first.
if os.path.exists(METADATA_PATH):
    os.remove(METADATA_PATH)
metadata.save_to_json(METADATA_PATH)
print("✅ Metadata saved as metadata.json")

# --- Step 2: Fit the GaussianCopula model ---
gc = GaussianCopulaSynthesizer(metadata)
gc.fit(df)
print("✅ GaussianCopula model trained successfully!")

# --- Step 3: Generate synthetic data (same number of rows as the real table) ---
synthetic_gc = gc.sample(num_rows=len(df))
display(synthetic_gc.head())

# --- Step 4: Optional — Save the trained model itself ---
gc.save("gaussian_copula_synth.pkl")
print("💾 Model saved as gaussian_copula_synth.pkl")

# --- Step 5 (optional) — Reload later without retraining ---
# gc_loaded = GaussianCopulaSynthesizer.load("gaussian_copula_synth.pkl")
# synthetic_gc = gc_loaded.sample(num_rows=len(df))


In [None]:
# 3.3 CTGAN
# Full-quality configuration: epochs=200, batch_size=100.
# For a quick (video) run only the epochs are cut. batch_size must be even and
# divisible by CTGAN's `pac` (default 10); the previous value of 15 violates
# that and is rejected when fit() builds the model. A larger batch also means
# fewer optimizer steps per epoch, so 100 is not slower than 15.
ctgan = CTGANSynthesizer(metadata, epochs=15, batch_size=100, verbose=True)
ctgan.fit(df)
synthetic_ctgan = ctgan.sample(num_rows=len(df))
display(synthetic_ctgan.head())

# Task IV — Evaluation

Convert metadata for sdmetrics (single table)

In [None]:
# Convert metadata for sdmetrics (single table).
# sdmetrics' single-table reports take one table's metadata as a plain dict.
# A multi-table-style dict nests tables under "tables" — take the first entry;
# otherwise the dict already describes a single table.
_meta_dict = metadata.to_dict()
single_table_meta = _meta_dict
if "tables" in _meta_dict:
    _table_name = next(iter(_meta_dict["tables"]))
    single_table_meta = _meta_dict["tables"][_table_name]

In [None]:
# 4.1 Quality and Diagnostics
# Each report compares the real table with one synthetic table using the
# single-table metadata dict built above.
qr_gc = QualityReport()
qr_gc.generate(df, synthetic_gc, single_table_meta)
qr_ct = QualityReport()
qr_ct.generate(df, synthetic_ctgan, single_table_meta)

print("Quality — GC:", qr_gc.get_score())
print("Quality — CTGAN:", qr_ct.get_score())

dr_gc = DiagnosticReport()
dr_gc.generate(df, synthetic_gc, single_table_meta)
dr_ct = DiagnosticReport()
dr_ct.generate(df, synthetic_ctgan, single_table_meta)

# Surface the diagnostic results too; otherwise the reports are computed and discarded.
print("Diagnostic — GC:", dr_gc.get_score())
print("Diagnostic — CTGAN:", dr_ct.get_score())

In [None]:
# 4.2 Correlation Preservation
def corr_rmse(a, b):
    """Root-mean-square difference of pairwise Pearson correlations.

    Only numeric columns present in both ``a`` and ``b`` are compared, and only
    the strictly upper triangle of the correlation matrices (each column pair
    once, diagonal excluded). Pairs whose correlation is NaN (e.g. a constant
    column) are ignored.

    Returns ``np.nan`` when fewer than two shared numeric columns exist or no
    comparable pair remains.
    """
    shared = a.select_dtypes(include=np.number).columns.intersection(
        b.select_dtypes(include=np.number).columns
    )
    if len(shared) < 2:
        return np.nan
    delta = (a[shared].corr() - b[shared].corr()).to_numpy()
    upper = delta[np.triu_indices(delta.shape[0], k=1)]
    upper = upper[~np.isnan(upper)]
    if upper.size == 0:
        return np.nan
    return np.sqrt(np.mean(np.square(upper)))

# 0 means the synthetic data reproduces the real pairwise correlations exactly.
rmse_gc = corr_rmse(df, synthetic_gc)
rmse_ctgan = corr_rmse(df, synthetic_ctgan)
print("Correlation RMSE — GC:", rmse_gc)
print("Correlation RMSE — CTGAN:", rmse_ctgan)

In [None]:
# 4.3 Utility — TSTR (Train on Synthetic, Test on Real)
def tstr_classification(real_df, synth_df, target):
    Xs, ys = synth_df.drop(columns=[target]), synth_df[target]
    Xr, yr = real_df.drop(columns=[target]), real_df[target]
    Xs = Xs.select_dtypes(include=np.number).fillna(Xs.median(numeric_only=True))
    Xr = Xr.select_dtypes(include=np.number).fillna(Xr.median(numeric_only=True))
    clf = RandomForestClassifier(n_estimators=300, random_state=42)
    clf.fit(Xs, ys)
    pred = clf.predict(Xr)
    out = {
        "accuracy": accuracy_score(yr, pred),
        "f1_macro": f1_score(yr, pred, average="macro")
    }
    if len(clf.classes_) == 2:
        out["roc_auc"] = roc_auc_score(yr, clf.predict_proba(Xr)[:, 1])
    return out

def tstr_regression(real_df, synth_df, target):
    Xs, ys = synth_df.drop(columns=[target]), synth_df[target]
    Xr, yr = real_df.drop(columns=[target]), real_df[target]
    Xs = Xs.select_dtypes(include=np.number).fillna(Xs.median(numeric_only=True))
    Xr = Xr.select_dtypes(include=np.number).fillna(Xr.median(numeric_only=True))
    reg = RandomForestRegressor(n_estimators=400, random_state=42)
    reg.fit(Xs, ys)
    pred = reg.predict(Xr)
    return {"r2": r2_score(yr, pred), "mae": mean_absolute_error(yr, pred)}

# Example (uncomment and set target column)
# print(tstr_classification(df, synthetic_gc, "your_target"))
# print(tstr_classification(df, synthetic_ctgan, "your_target"))

In [None]:
# 4.4 Privacy — exact duplicates
def exact_dup_rate(real_df, synth_df):
    """Fraction of synthetic rows that exactly copy some real row.

    Rows are compared by the string form of their values joined with "|".
    When both frames have the same column set, ``synth_df`` is first put into
    ``real_df``'s column order so a column-reordered copy is still detected.
    Every matching synthetic row counts, so several copies of the same real row
    are not collapsed into one. Returns 0.0 when either frame is empty.
    """
    # Guard empties: DataFrame.agg(..., axis=1) on an empty frame returns a
    # DataFrame rather than a Series, which would break the comparison below.
    if len(real_df) == 0 or len(synth_df) == 0:
        return 0.0
    if set(synth_df.columns) == set(real_df.columns):
        synth_df = synth_df[list(real_df.columns)]
    real_keys = set(real_df.astype(str).agg("|".join, axis=1))
    synth_keys = synth_df.astype(str).agg("|".join, axis=1)
    return float(synth_keys.isin(real_keys).mean())

# Share of synthetic rows that are verbatim copies of real rows (lower means less leakage).
dup_gc = exact_dup_rate(df, synthetic_gc)
dup_ctgan = exact_dup_rate(df, synthetic_ctgan)
print("Duplication rate — GC:", dup_gc)
print("Duplication rate — CTGAN:", dup_ctgan)

## BUSINESS CHALLENGE #02: Mining NYC Taxi Trip Data (DAMO630 Learning Outcomes 3 & 4)
- **HDFS URI (RPC):** `hdfs://hadoop-VirtualBox:9000`
- **Task I (Data Prep):** load parquet from HDFS and create CSV for MapReduce
- **Task II (MapReduce):** total fare per pickup zone (HDFS → Hadoop Streaming)
- **Task III (PySpark):** frequent pickup→dropoff pair mining (support, confidence, lift — a simplified alternative to FPGrowth)
- **Task IV (PySpark):** K-Means (clustering)

In [None]:
# Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import f1_score, accuracy_score, roc_auc_score
from sklearn.metrics import r2_score, mean_absolute_error
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor

from sdv.metadata import Metadata
from sdv.single_table import GaussianCopulaSynthesizer, CTGANSynthesizer
from sdmetrics.reports.single_table import QualityReport, DiagnosticReport

## BUSINESS CHALLENGE #02

### Task I — Data Preparation and Exploration
The cells below configure HDFS/Spark, verify the dataset path, load the parquet file from HDFS, show schema and basic stats, and provide an optional CSV export containing (PULocationID, fare_amount) for Hadoop Streaming.

In [None]:
# --- BC2.I - Environment constants and Spark setup (idempotent) ---
import os
import time
from pyspark.sql import SparkSession

# ------------------------------------------------------------------
# 1. Enforce HDFS user = hadoop BEFORE Spark starts
# ------------------------------------------------------------------
# Stop a session left over from an earlier run of this cell. On the very first
# run no `spark` name exists yet, so there is nothing to stop.
if "spark" in globals():
    try:
        spark.stop()
    except Exception as exc:  # best effort: a broken old session must not block a new one
        print("Warning: previous SparkSession did not stop cleanly:", exc)
os.environ["HADOOP_USER_NAME"] = "hadoop"   # critical: ensures permissions ok
# NOTE(review): presumably only a JVM started after this point sees the variable;
# if the effective user printed below is not 'hadoop', restart the kernel.
time.sleep(1)

# ------------------------------------------------------------------
# 2. Define Hadoop / HDFS endpoints and paths
# ------------------------------------------------------------------
NN_HOST = "hadoop-VirtualBox"   # or the IP if you prefer (e.g. "192.168.0.149")
NN_PORT = "9000"
HDFS_URI = f"hdfs://{NN_HOST}:{NN_PORT}"
DATA_DIR = "/data/tlc/trips"
HDFS_PARQUET = f"{DATA_DIR}/yellow_tripdata_2023-05.parquet"
CSV_BASE_DIR = "/user/hadoop/taxi"   # hadoop’s own writable home

# ------------------------------------------------------------------
# 3. Start SparkSession configured for HDFS
# ------------------------------------------------------------------
spark = (SparkSession.builder
         .appName("BC2_Env")
         .config("spark.hadoop.fs.defaultFS", HDFS_URI)
         .getOrCreate())

sc = spark.sparkContext

# ------------------------------------------------------------------
# 4. Verify connection and effective user
# ------------------------------------------------------------------
print("Using HDFS URI:", HDFS_URI)
print("fs.defaultFS =", spark._jsc.hadoopConfiguration().get("fs.defaultFS"))
ugi = spark._jvm.org.apache.hadoop.security.UserGroupInformation.getCurrentUser().getShortUserName()
print("HDFS effective user:", ugi)


In [None]:
# HDFS JVM probe: ensure DATA_DIR exists and list its contents
from py4j.java_gateway import java_import
jvm = sc._jvm
java_import(jvm, "org.apache.hadoop.fs.FileSystem")
java_import(jvm, "org.apache.hadoop.fs.Path")
fs = jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
p = jvm.org.apache.hadoop.fs.Path(DATA_DIR)
# FileSystem.mkdirs() also returns true when the directory already exists, so
# its return value cannot distinguish "created" from "already present".
existed = fs.exists(p)
if not existed and not fs.mkdirs(p):
    raise RuntimeError(f"Could not create HDFS directory {DATA_DIR} on {HDFS_URI}")
print("HDFS directory already present:" if existed else "Created HDFS directory:", DATA_DIR)
print("Listing contents of:", DATA_DIR)
for s in fs.listStatus(p):
    print(' -', s.getPath().toString())


In [None]:
# Parquet load and basic exploration (schema, counts, averages, sample)
from pyspark.sql import functions as F
if not fs.exists(jvm.org.apache.hadoop.fs.Path(HDFS_PARQUET)):
    raise FileNotFoundError(f"Parquet not found: {HDFS_PARQUET} on {HDFS_URI}")
# NOTE: rebinds `df` (the Business Challenge 01 pandas frame) to the Spark trips
# DataFrame used by every BC2 cell below.
df = spark.read.parquet(f"{HDFS_URI}{HDFS_PARQUET}")
print("Schema:")
df.printSchema()
print("Row count:", df.count())
print("Basic stats:")
df.selectExpr(
    "avg(fare_amount) as avg_fare",
    "avg(trip_distance) as avg_distance",
    "avg(passenger_count) as avg_passengers",
).show(1, truncate=False)
print("Sample 5 rows:")
# display() of a Spark DataFrame in a plain Jupyter kernel renders only its
# schema repr; collect the five rows to pandas so the values are shown.
display(df.select("PULocationID", "DOLocationID", "fare_amount", "trip_distance", "passenger_count").limit(5).toPandas())


In [None]:
# BC2.I.opt — Minimal CSV for Hadoop Streaming (fixed to user=hadoop, no fallbacks)

from pyspark.sql import functions as F
from py4j.java_gateway import java_import

# Build the output location from the BC2.I setup constants (HDFS_URI,
# CSV_BASE_DIR) instead of re-hard-coding the NameNode here, so changing
# NN_HOST/NN_PORT in the setup cell is enough.
# With the defaults this is hdfs://hadoop-VirtualBox:9000/user/hadoop/taxi/yellow_2023-05_mincsv
OUT_DIR  = f"{HDFS_URI}{CSV_BASE_DIR}/yellow_2023-05_mincsv"

# 1) Sanity check: effective HDFS user must be 'hadoop'
ugi = spark._jvm.org.apache.hadoop.security.UserGroupInformation.getCurrentUser().getShortUserName()
print("HDFS effective user:", ugi)
if ugi != "hadoop":
    raise RuntimeError("Effective HDFS user is not 'hadoop'. Restart kernel, set HADOOP_USER_NAME='hadoop' BEFORE creating SparkSession.")

# 2) Build minimal DF and write a single-shard CSV (no header)
min_df = (df
    .select("PULocationID", "fare_amount")
    .where(F.col("PULocationID").isNotNull() & F.col("fare_amount").isNotNull())
)

(min_df.coalesce(1)
      .write.mode("overwrite")
      .option("header", "false")
      .csv(OUT_DIR))

print("✅ CSV written to:", OUT_DIR)

# 3) List the output with Hadoop FS API
sc, jvm = spark.sparkContext, spark.sparkContext._jvm
java_import(jvm, "org.apache.hadoop.fs.FileSystem")
java_import(jvm, "org.apache.hadoop.fs.Path")
fs   = jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
pout = jvm.org.apache.hadoop.fs.Path(OUT_DIR)

print("Contents:")
for s in fs.listStatus(pout):
    print(" -", s.getPath().toString())

# 4) Quick read-back to verify data shape (show a few rows)
print("\nSample rows (read-back):")
preview = (spark.read
    .option("header", "false")
    .csv(OUT_DIR)
    .toDF("PULocationID","fare_amount_raw")
    .select(F.col("PULocationID").cast("int"),
            F.col("fare_amount_raw").cast("double").alias("fare_amount"))
)
preview.show(5, truncate=False)


### Task II — MapReduce: Total Fare by Pickup Zone
The cells below generate minimal mapper and reducer scripts for Hadoop Streaming and write them to `mapper.py` and `reducer.py` in the notebook's working directory. Copy both files to the Hadoop VM (if the notebook runs elsewhere) and run the job there.

In [None]:
# BC2.II.mapper — create mapper.py for Hadoop Streaming
#
# The generated script reads "PULocationID,fare_amount" lines (the header-less
# CSV written in BC2.I.opt) and emits "PULocationID<TAB>fare". Blank lines,
# lines with fewer than two fields and non-numeric fares are skipped.

mapper = r"""#!/usr/bin/env python3
import sys

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    parts = line.split(',')
    if len(parts) < 2:
        continue
    pul, fare = parts[0].strip(), parts[1].strip()
    try:
        f = float(fare)
    except ValueError:  # unparsable fare (e.g. a stray header line)
        continue
    print(f"{pul}\t{f}")
"""

with open("mapper.py", "w", encoding="utf-8") as f:
    f.write(mapper)

print("✅ mapper.py created successfully.\nPreview:\n")
print(mapper[:400])


In [None]:
# BC2.II.reducer — create reducer.py for Hadoop Streaming
#
# The generated script reads "key<TAB>value" lines (mapper output), sums the
# values per key in a dict and prints one "key<TAB>total" line per key. Totals
# are accumulated in memory, so it does not depend on the input being sorted.
# Blank or malformed lines are skipped.

reducer = r"""#!/usr/bin/env python3
import sys
from collections import defaultdict

acc = defaultdict(float)

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        key, val = line.split('\t')
        acc[key] += float(val)
    except ValueError:  # wrong number of fields or non-numeric value
        continue

for k, v in acc.items():
    print(f"{k}\t{v}")
"""

with open("reducer.py", "w", encoding="utf-8") as f:
    f.write(reducer)

print("✅ reducer.py created successfully.\nPreview:\n")
print(reducer[:400])


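Quick check of the same logic without Hadoop Streaming (run on the Hadoop VM). A plain shell pipeline feeds the HDFS data through the scripts, with `sort` standing in for the shuffle phase:
```
hdfs dfs -cat /user/hadoop/taxi/yellow_2023-05_mincsv/part-* | \
  python3 mapper.py | sort | python3 reducer.py > total_fare_by_pu.txt
```
After the run, inspect `total_fare_by_pu.txt` for total fares per pickup zone.

The equivalent Hadoop Streaming job is below. Its output goes to HDFS, and the `-output` directory must not already exist:
```
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -files mapper.py,reducer.py \
  -mapper "python3 mapper.py" -reducer "python3 reducer.py" \
  -input /user/hadoop/taxi/yellow_2023-05_mincsv \
  -output /user/hadoop/taxi/total_fare_by_pu
```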

### Task III — Frequent Pattern Mining (FPGrowth)
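Treat each trip as a two-item basket (pickup zone → dropoff zone). Compute support, confidence and lift for every directed location pair directly with DataFrame aggregations, as a simplified alternative to running FPGrowth, to find frequent location pairs.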

In [None]:
# BC2.III — Frequent Pair Analysis (Simplified alternative to FPGrowth)
# Computes support, confidence, and lift directly for PU→DO pairs, treating
# each trip as a basket {pickup zone A, dropoff zone B}:
#   support(A→B)    = trips(A, B) / total trips
#   confidence(A→B) = trips(A, B) / trips picked up at A
#   lift(A→B)       = confidence(A→B) / (trips dropped off at B / total trips)

from pyspark.sql import functions as F

# Base: trips dataframe already loaded as `df`
print("Dataset rows:", df.count())

# 1) Prepare clean PU–DO pairs
pairs = (df
    .select("PULocationID","DOLocationID")
    .where(F.col("PULocationID").isNotNull() & F.col("DOLocationID").isNotNull())
    .select(F.col("PULocationID").cast("string").alias("A"),
            F.col("DOLocationID").cast("string").alias("B"))
)
total_baskets = pairs.count()
print(f"Total valid trips (baskets): {total_baskets:,}")

# 2) Role-specific item frequencies (for confidence and lift). The same zone ID
#    occurs both as a pickup and as a dropoff; pooling the two roles would
#    inflate the denominators, so antecedents (A as pickup) and consequents
#    (B as dropoff) are counted separately.
pu_counts = pairs.groupBy("A").count().withColumnRenamed("count", "A_count")
do_counts = pairs.groupBy("B").count().withColumnRenamed("count", "B_count")

# 3) Count co-occurrences of A→B pairs
pair_counts = pairs.groupBy("A","B").count().withColumnRenamed("count","pair_count")

# 4) Join and compute association metrics
stats = (pair_counts
    .join(pu_counts, on="A", how="left")
    .join(do_counts, on="B", how="left")
    .withColumn("support",    F.col("pair_count") / F.lit(total_baskets))
    .withColumn("confidence", F.col("pair_count") / F.col("A_count"))
    .withColumn("lift",       F.col("confidence") / (F.col("B_count") / F.lit(total_baskets)))
)

# 5) Show strongest directional relations PU→DO
top_rules = (stats
    .where(F.col("support") >= 0.005)   # >= 0.5% of all trips
    .orderBy(F.desc("confidence"), F.desc("support"))
)

print("\nTop PU→DO pairs by confidence (support≥0.5%):")
top_rules.select("A","B","pair_count","support","confidence","lift").show(20, truncate=False)


### Task IV — K-Means Clustering
Use numerical features (fare_amount, trip_distance, passenger_count) to cluster trips and inspect cluster centers.

In [None]:
# BC2.IV — KMeans on numeric features (robust + interpretable)

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import DenseVector, Vectors

# 0) Select & clean numeric columns
NUM_COLS = ["fare_amount", "trip_distance", "passenger_count"]

df_num = (
    df.select(*[F.col(c).cast("double").alias(c) for c in NUM_COLS])
      .where(F.col("fare_amount").isNotNull() & F.col("trip_distance").isNotNull() & F.col("passenger_count").isNotNull())
)

# Count once up front. Clipping below never drops rows, so this is also the
# size of the clipped frame. Fail with a clear message on empty input:
# approxQuantile returns empty lists for an empty frame, which would otherwise
# surface as a cryptic unpacking error.
n_rows = df_num.count()
if n_rows == 0:
    raise ValueError(f"No rows with non-null {NUM_COLS}; nothing to cluster.")

# (Optional but helpful) clip extreme outliers to the approximate 1st/99th
# percentiles — keeps computation stable for KMeans on a laptop. A single
# multi-column approxQuantile call gets every bound in one pass over the data.
quantiles = df_num.approxQuantile(NUM_COLS, [0.01, 0.99], 0.01)
bounds = {c: (q[0], q[1]) for c, q in zip(NUM_COLS, quantiles)}

df_clipped = df_num
for c in NUM_COLS:
    lo, hi = bounds[c]
    df_clipped = df_clipped.withColumn(c, F.when(F.col(c) < lo, lo).when(F.col(c) > hi, hi).otherwise(F.col(c)))

# (Optional) sample for speed if the dataset is very large
if n_rows > 1_000_000:
    frac = 1_000_000 / float(n_rows)
    print(f"Sampling to ~1,000,000 rows for clustering (fraction={frac:.4f})")
    df_clipped = df_clipped.sample(False, frac, seed=42)

# 1) Assemble & scale (mean-centering helps KMeans)
assembler = VectorAssembler(inputCols=NUM_COLS, outputCol="features_raw")
df_feat = assembler.transform(df_clipped).select(*NUM_COLS, "features_raw")

scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)
scaler_model = scaler.fit(df_feat)
df_scaled = scaler_model.transform(df_feat).select(*NUM_COLS, "features")

# 2) Fit KMeans
k = 4
kmeans = (KMeans()
          .setK(k)
          .setSeed(42)
          .setFeaturesCol("features")
          .setPredictionCol("prediction")
          .setInitMode("k-means||")
          .setMaxIter(50)
          .setTol(1e-4))

model_k = kmeans.fit(df_scaled)

# 3) Evaluate & report
pred_scaled = model_k.transform(df_scaled)

evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction", metricName="silhouette")
sil = evaluator.evaluate(pred_scaled)

sizes = model_k.summary.clusterSizes
centers_scaled = model_k.clusterCenters()

# Unscale centers back to original units for interpretability
mu: DenseVector = scaler_model.mean  # per-feature means
sd: DenseVector = scaler_model.std   # per-feature stds
def unscale(center_vec):
    """Map a cluster centre from standardized space back to feature units.

    Inverts the StandardScaler transform (value * std + mean) for each of the
    first ``len(NUM_COLS)`` coordinates, using the module-level ``sd`` and
    ``mu`` vectors.
    """
    restored = []
    for idx in range(len(NUM_COLS)):
        restored.append(mu[idx] + sd[idx] * center_vec[idx])
    return restored

centers_orig = [unscale(c) for c in centers_scaled]

print(f"\nKMeans k={k}")
# The ClusteringEvaluator above uses its default distanceMeasure
# ("squaredEuclidean"), so the silhouette is labelled accordingly.
print(f"Silhouette (squared euclidean): {sil:.4f}")
print("Cluster sizes:", sizes)
print("\nCluster centers (original units: fare_amount, trip_distance, passenger_count):")
for i, co in enumerate(centers_orig):
    print(f"  {i}: fare=${co[0]:.2f}, dist={co[1]:.2f} mi, pax={co[2]:.2f}")

# 4) Peek at a few assignments. The numeric columns shown are the clipped
#    (and, for very large inputs, sampled) values that were clustered.
print("\nSample predictions:")
(pred_scaled
   .select(*NUM_COLS, "prediction")
   .limit(10)
   .show(truncate=False))



## Conclusion and Business Insights
- Total fare by pickup zone identifies high-revenue origins for targeted offers and dynamic pricing adjustments.
- Frequent pickup/dropoff pairs reveal corridors and hotspots useful for rebalancing drivers and scheduling surge pricing windows.
- Clustering of trips (by fare, distance, passengers) can help create service personas (short cheap trips vs long high-fare trips) and optimize vehicle allocation.
- Next steps: validate results on multiple months, integrate time-of-day features, and operationalize MapReduce outputs into BI dashboards.