In [None]:
# --- Big Data Setup ---
import os

import numpy as np
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType

# Start (or, via getOrCreate, reuse) the Spark session for this notebook.
# NOTE(review): builder configs such as driver memory only take effect when
# this call actually launches the session; an existing session is returned as-is.
spark = (
    SparkSession.builder
    .appName("Vehicles_PySpark_Cleaning")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Local output folders; exist_ok makes re-running this cell harmless.
os.makedirs("data_raw", exist_ok=True)
os.makedirs("reports", exist_ok=True)

print(" Spark Session ready.")

In [None]:
# --- Data Load & Initial Schema ---
# Load 'vehicles.csv' into a distributed DataFrame.
# The location is configurable via the VEHICLES_CSV environment variable; the
# default is the Colab upload path. When running locally, place the file at
# e.g. 'data_raw/vehicles.csv' and point VEHICLES_CSV at it.
path = os.environ.get("VEHICLES_CSV", "/content/vehicles.csv")

# multiLine=True / escape='"': quoted free-text fields (such as 'description')
# may contain embedded newlines and doubled quotes. Without these options Spark
# splits such records across rows, shifting values into the wrong columns.
# NOTE(review): assumes RFC-4180 style quoting ("" inside quotes) — confirm
# against the file. multiLine parsing is not split within a file, so it is slower.
df_raw = spark.read.csv(
    path,
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
)

print(f"Total rows loaded: {df_raw.count():,}")

In [None]:
# --- Drop irrelevant columns ---
# Identifier, URL, free-text and location fields are excluded from the cleaned
# frame. DataFrame.drop ignores names that are not in the schema.
columns_to_drop = [
    'id', 'url', 'region_url', 'VIN', 'image_url', 'description',
    'state', 'lat', 'long', 'posting_date', 'county',
]

df_clean = df_raw.drop(*columns_to_drop)

In [None]:
# --- Drop columns with more than 25% nulls ---
threshold = 0.25

# Count all rows and the nulls of every column in ONE aggregation job.
# (The previous version ran a separate filter().count() scan per column.)
# F.count(F.when(cond, True)) counts only the rows where cond holds.
column_names = df_clean.columns
total_count, *per_column_nulls = df_clean.agg(
    F.count(F.lit(1)),
    *[F.count(F.when(F.col(name).isNull(), True)) for name in column_names],
).first()
null_counts = list(zip(column_names, per_column_nulls))

if total_count == 0:
    # Fail with a clear message instead of a ZeroDivisionError below.
    raise ValueError("df_clean is empty; cannot compute per-column null fractions.")

cols_to_drop_25p = [name for name, count in null_counts if count / total_count > threshold]

# Per the earlier version of this analysis these were expected to be
# 'size', 'cylinders' and 'condition' — verify against the printed list.
df_clean = df_clean.drop(*cols_to_drop_25p)

In [None]:
# --- Drop rows with nulls in key columns ---
# A row missing any of these fields is unusable for the visualisations and the
# analysis. Every name must still exist in df_clean (i.e. not have been removed
# by the >25%-null step), otherwise Spark rejects the subset.
key_columns = [
    'price', 'odometer', 'type', 'title_status', 'year',
    'transmission', 'manufacturer', 'model', 'fuel',
]
df_clean = df_clean.dropna(how='any', subset=key_columns)

In [None]:
# --- Type casting (fix the data types) ---
# Cast the numeric fields to explicit types. Under Spark's default (non-ANSI)
# cast semantics, any value that cannot be parsed becomes null. That would
# silently reintroduce nulls into key columns that the previous cell had
# already cleaned, so those rows are dropped again here.
# NOTE(review): with spark.sql.ansi.enabled=true an unparseable value raises
# instead of producing null.
NUMERIC_CASTS = {
    "year": IntegerType(),
    "price": DoubleType(),
    "odometer": DoubleType(),
}
for column_name, spark_type in NUMERIC_CASTS.items():
    df_clean = df_clean.withColumn(column_name, F.col(column_name).cast(spark_type))

df_clean = df_clean.na.drop(subset=list(NUMERIC_CASTS))

In [None]:
# --- Outlier handling (price) ---
# Keep listings priced strictly between the two bounds; both bound values are
# themselves excluded. Null prices also drop out, since a comparison with null
# is never true.
PRICE_LOWER_EXCLUSIVE = 100
PRICE_UPPER_EXCLUSIVE = 249999

price_col = F.col("price")
df_clean = df_clean.where((price_col > PRICE_LOWER_EXCLUSIVE) & (price_col < PRICE_UPPER_EXCLUSIVE))
print(f"Rows after all cleaning steps: {df_clean.count():,}")

In [None]:
# --- Save the final visualisation dataset (text + numeric columns) ---
VIZ_COLUMNS = [
    "price", "odometer", "year",
    "manufacturer", "model", "type",
    "transmission", "title_status", "fuel",
]
df_viz_final = df_clean.select(*VIZ_COLUMNS)

# Spark writes a *directory* named 'craigslist_data.csv'. coalesce(1) collapses
# the data into one partition, so that directory holds a single part-*.csv
# file alongside Spark's bookkeeping files. Routing all rows through one task
# is fine for this output size but does not scale indefinitely.
(
    df_viz_final
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("craigslist_data.csv")
)


In [None]:
# --- Overall linear regression: depreciation rate (PySpark MLlib) ---
# Fits price ~ odometer on all cleaned listings after a sigma-based outlier cut.
print("\n Overall Linear Regression: Depreciation Rate Calculation (PySpark MLlib)")

# 1. Outlier filtering (Spark equivalent of keeping |z-score| < SIGMA_CUTOFF).
SIGMA_CUTOFF = 3

# Mean and sample standard deviation of both columns in a single pass.
stats_df = df_clean.agg(
    F.mean("price").alias("price_mean"),
    F.stddev("price").alias("price_std"),
    F.mean("odometer").alias("odometer_mean"),
    F.stddev("odometer").alias("odometer_std"),
).collect()[0]


def _sigma_bounds(stats, column, k=SIGMA_CUTOFF):
    """Return the open interval (mean - k*std, mean + k*std) for ``column``.

    Args:
        stats: the aggregated Row holding ``<column>_mean`` / ``<column>_std``.
        column: base column name, e.g. ``"price"``.
        k: number of standard deviations on either side of the mean.

    Raises:
        ValueError: if the mean or std is null (too few rows to measure spread),
            which would otherwise surface as an opaque TypeError.
    """
    mean, std = stats[f"{column}_mean"], stats[f"{column}_std"]
    if mean is None or std is None:
        raise ValueError(
            f"Cannot compute sigma bounds for '{column}': need at least 2 non-null rows."
        )
    return mean - k * std, mean + k * std


price_lo, price_hi = _sigma_bounds(stats_df, "price")
odometer_lo, odometer_hi = _sigma_bounds(stats_df, "odometer")

df_depr_filtered = (
    df_clean
    .filter(
        (F.col("price") > price_lo) & (F.col("price") < price_hi)
        & (F.col("odometer") > odometer_lo) & (F.col("odometer") < odometer_hi)
    )
    .select("price", "odometer")
    # Cached because both count() and the model fit below consume it; without
    # caching, each action re-runs the full lineage back to the CSV.
    .cache()
)

filtered_rows = df_depr_filtered.count()
print(f"Rows after 3-sigma outlier filtering: {filtered_rows:,}")
if filtered_rows < 2:
    # E.g. zero spread in a column makes the open interval empty.
    raise ValueError("Need at least 2 rows after outlier filtering to fit a regression.")

# 2. VectorAssembler: Spark ML expects the features packed into one vector column.
assembler = VectorAssembler(inputCols=['odometer'], outputCol="features")
df_lr_ready = assembler.transform(df_depr_filtered)

# 3. Fit the distributed linear regression model (PySpark MLlib).
lr = LinearRegression(featuresCol="features", labelCol="price")
lr_model = lr.fit(df_lr_ready)

# 4. The odometer coefficient is the price change per mile (the depreciation rate).
depreciation_per_mile = lr_model.coefficients[0]
loss_per_10k_miles = depreciation_per_mile * 10000

# Final results
print(f"\n Odometer Coefficient (Depreciation Rate per 1 Mile): ${depreciation_per_mile:,.4f}")
print(f" Loss Per 10,000 Miles: ${loss_per_10k_miles:,.2f}")

In [None]:
# --- Per-model depreciation (distributed grouped-map regression) ---
# Fits price ~ odometer separately for every (manufacturer, model) group.
# FIXME(review): the cells that defined `calculate_depreciation_udf` (steps 1-2)
# are missing from this notebook, so this cell failed with NameError on a fresh
# kernel. The grouped function is re-created below from the columns this cell
# consumes. Confirm it matches the original, especially any minimum-sample threshold.
# applyInPandas requires pyarrow on the cluster.

# Minimum rows a group needs to be fitted (2 is the mathematical minimum);
# raise this to suppress noisy, low-support models.
MIN_ROWS_PER_MODEL = 2

_DEPRECIATION_COLUMNS = ["manufacturer", "model", "Model_Count", "Loss_Per_10k_Miles", "R2_Score"]
DEPRECIATION_SCHEMA = (
    "manufacturer string, model string, Model_Count long, "
    "Loss_Per_10k_Miles double, R2_Score double"
)


def calculate_depreciation(group: pd.DataFrame) -> pd.DataFrame:
    """Fit ``price = a + b * odometer`` for one (manufacturer, model) group.

    Args:
        group: all rows of one group, with at least ``manufacturer``, ``model``,
            ``odometer`` and ``price`` columns.

    Returns:
        A one-row frame with the group keys, the number of rows used
        (``Model_Count``), the slope scaled to 10,000 miles
        (``Loss_Per_10k_Miles``, negative when price falls with mileage) and the
        fit's R^2 (``R2_Score``; NaN when all prices are equal).
        The frame is empty, with the same columns, when fewer than
        ``MIN_ROWS_PER_MODEL`` usable rows remain or the odometer is constant.
    """
    valid = group[["odometer", "price"]].dropna()
    n_rows = len(valid)
    if n_rows < max(MIN_ROWS_PER_MODEL, 2):
        return pd.DataFrame(columns=_DEPRECIATION_COLUMNS)

    x = valid["odometer"].to_numpy(dtype=float)
    y = valid["price"].to_numpy(dtype=float)
    x_c = x - x.mean()
    y_c = y - y.mean()
    sxx = float(x_c @ x_c)
    if sxx == 0.0:
        # All odometer readings identical: the slope is undefined.
        return pd.DataFrame(columns=_DEPRECIATION_COLUMNS)
    sxy = float(x_c @ y_c)
    syy = float(y_c @ y_c)

    slope = sxy / sxx  # ordinary least squares slope for one feature + intercept
    r2 = (sxy * sxy) / (sxx * syy) if syy > 0 else float("nan")
    return pd.DataFrame({
        "manufacturer": [group["manufacturer"].iloc[0]],
        "model": [group["model"].iloc[0]],
        "Model_Count": [n_rows],
        "Loss_Per_10k_Miles": [slope * 10000],
        "R2_Score": [r2],
    })


# 3. Apply the grouped map (distributed). Only the needed columns are shipped
#    to the Python workers.
master_depreciation_results_spark = (
    df_clean
    .select("manufacturer", "model", "odometer", "price")
    .groupby("manufacturer", "model")
    .applyInPandas(calculate_depreciation, schema=DEPRECIATION_SCHEMA)
)

# 4. Final cleanup and save (master table).
# Flip the (negative) depreciation slope into a positive loss figure.
master_depreciation_results_spark = master_depreciation_results_spark.withColumn(
    'Est_Loss_Per_10k_Miles_USD', F.col('Loss_Per_10k_Miles') * -1)
master_depreciation_results_spark = master_depreciation_results_spark.withColumn(
    'Model_Reliability_R2', F.round(F.col('R2_Score'), 3))


# Select the final columns and save them.
df_final_master = master_depreciation_results_spark.select(
    "manufacturer",
    "model",
    "Model_Count",
    "Est_Loss_Per_10k_Miles_USD",
    "Model_Reliability_R2"
)

# As with the other Spark CSV writes, this path becomes a directory holding a
# single part-*.csv file (because of coalesce(1)).
df_final_master.coalesce(1).write.csv(
    "reports/MASTER_DEPRECIATION_RATES.csv",
    mode="overwrite",
    header=True
)

print(" MASTER_DEPRECIATION_RATES.csv file successfully created (Power BI Master Table).")

In [None]:
# --- Save overall key metrics ---
# A tiny local (pandas) table with the headline regression numbers, one row per metric.
key_metric_rows = [
    ('Overall Depreciation per 1 Mile', depreciation_per_mile, 'USD'),
    ('Overall Loss Per 10k Miles', loss_per_10k_miles, 'USD'),
]
results_df = pd.DataFrame(key_metric_rows, columns=['Metric', 'Value', 'Unit'])

results_df.to_csv('reports/analysis_key_metrics.csv', index=False)