# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when, lower, expr, count,isnan,hour, dayofweek, month, year, dayofmonth,approx_count_distinct
from pyspark.sql.functions import lower as lower_func
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler

import seaborn as sns
import matplotlib.pyplot as plt

import time
# Start the wall-clock timer used by the "Time taken" report at the end.
start_time = time.time()

# Obtain a Spark session (getOrCreate returns an existing one when available).
spark = (
    SparkSession.builder
    .appName("sparksqlexample")
    .getOrCreate()
)
print("Session created")

# Load the CSV from the local filesystem: the first row is the header and
# column types are inferred by Spark.
df = spark.read.csv(
    "file:///home/hduser/Downloads/archive(7)/data.csv",
    header=True,
    inferSchema=True,
    encoding="utf-8",
)

# Column data types.
# A bare `df.dtypes` expression is only rendered when it is the last statement
# of a notebook cell; print it so the (name, type) pairs are always shown.
print(df.dtypes)

# Summary statistics: count, mean, stddev, min, max
df.describe().show()

# Median (approximate 50th percentile of fare_amount)
df.select(expr("percentile_approx(fare_amount, 0.5)").alias("median_fare")).show()

# Mode (most frequent fare_amount value)
df.groupBy("fare_amount").count().orderBy(col("count").desc()).show(1)

# Derive calendar features from the pickup and dropoff timestamps, then drop
# the raw timestamp columns.
pickup_ts = col("tpep_pickup_datetime")
dropoff_ts = col("tpep_dropoff_datetime")

# (new column name, expression) pairs, listed in the order they are added.
datetime_features = [
    ("pickup_hour", hour(pickup_ts)),
    ("pickup_day", dayofmonth(pickup_ts)),
    ("pickup_month", month(pickup_ts)),
    ("pickup_weekday", dayofweek(pickup_ts)),
    ("pickup_year", year(pickup_ts)),
    ("dropoff_hour", hour(dropoff_ts)),
    ("dropoff_day", dayofmonth(dropoff_ts)),
    ("dropoff_month", month(dropoff_ts)),
    ("dropoff_weekday", dayofweek(dropoff_ts)),
    ("dropoff_year", year(dropoff_ts)),
]
for feature_name, feature_expr in datetime_features:
    df = df.withColumn(feature_name, feature_expr)

df = df.drop("tpep_pickup_datetime", "tpep_dropoff_datetime")



# Missing values
# Columns typed double/float/int are checked for NaN as well as NULL; every
# other column is checked for NULL only.
numeric_types = ('double', 'float', 'int')
numeric_cols = [name for name, dtype in df.dtypes if dtype in numeric_types]

# df.dtypes yields (column name, simple type string) pairs in column order, so
# one pass builds every expression without rescanning the schema per column.
exprs = []
for c, dtype in df.dtypes:
    is_missing = col(c).isNull()
    if dtype in numeric_types:
        is_missing = is_missing | isnan(c)
    # count() ignores NULLs, and when() yields NULL for rows that do not match,
    # so this counts exactly the rows flagged as missing.
    exprs.append(count(when(is_missing, c)).alias(c))

# Single-row frame: one missing-value count per column.
null_counts = df.select(exprs)
null_counts_pd = null_counts.toPandas()

# Heatmap of the per-column missing counts
plt.figure(figsize=(12,6))
sns.heatmap(null_counts_pd, annot=True, cmap="YlGnBu", cbar=False)
plt.title("Missing Values Heatmap")
plt.show()

# Duplicates: rows that repeat the same combination of the selected columns.
cols_to_check = ["VendorID", "passenger_count", "trip_distance", "fare_amount"]
total_rows = df.count()
unique_rows = df.dropDuplicates(subset=cols_to_check).count()
duplicate_count = total_rows - unique_rows
print(f"Duplicate rows based on selected columns: {duplicate_count}")

# Outliers (numeric columns)
# IQR method: a value is an outlier when it lies more than 1.5 * IQR below the
# first quartile or above the third quartile.
for col_name in numeric_cols:
    quantiles = df.approxQuantile(col_name, [0.25, 0.75], 0.05)
    if len(quantiles) < 2:
        # approxQuantile can return an empty list (e.g. a column holding only
        # nulls); unpacking it into Q1, Q3 would raise ValueError.
        print(f"Outliers in {col_name}: skipped (no quantiles available)")
        continue
    Q1, Q3 = quantiles
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5*IQR
    upper_bound = Q3 + 1.5*IQR
    outliers = df.filter((col(col_name) < lower_bound) | (col(col_name) > upper_bound)).count()
    print(f"Outliers in {col_name}: {outliers}")

# Structural errors: look for inconsistent labels by reporting how many
# distinct values each non-numeric column holds (exact and approximate).
numeric_cols = [
    field.name
    for field in df.schema.fields
    if field.dataType.simpleString() in ('double', 'float', 'int')
]
non_numeric_cols = [name for name in df.columns if name not in numeric_cols]
for c in non_numeric_cols:
    single_col = df.select(c)
    distinct_values = single_col.distinct().count()
    approx_values = single_col.select(approx_count_distinct(c)).collect()[0][0]
    print(f"Column '{c}' has {distinct_values} distinct values (approx: {approx_values})")

# Remove unwanted rows: keep only rows where both fare_amount and
# trip_distance are strictly positive.
initial_count = df.count()
print(f"Initial number of rows: {initial_count}")
has_positive_fare = col("fare_amount") > 0
has_positive_distance = col("trip_distance") > 0
df_clean = df.where(has_positive_fare & has_positive_distance)
after_unwanted_removal = df_clean.count()
print(f"Rows removed due to unwanted values: {initial_count - after_unwanted_removal}")

# Handle missing values: discard any row that lacks a value in one of the
# essential columns.
essential_cols = ["fare_amount", "trip_distance", "passenger_count"]
df_clean2 = df_clean.na.drop(subset=essential_cols)
after_missing_removal = df_clean2.count()
rows_missing_essential = after_unwanted_removal - after_missing_removal
print(f"Rows removed due to missing essential values: {rows_missing_essential}")

# Outlier handling: drop (not clip) rows outside the 1.5 * IQR fences, one
# column at a time. Each filter is applied to the already-filtered frame, so
# the quartiles for later columns are estimated on the surviving rows.
numeric_cols = ["fare_amount", "trip_distance", "passenger_count"]

# Count once up front; after that, each iteration's "after" count is reused as
# the next iteration's "before" count instead of triggering another Spark job.
before_outlier_removal = df_clean2.count()
for c in numeric_cols:
    quantiles = df_clean2.approxQuantile(c, [0.25, 0.75], 0.01)
    if len(quantiles) < 2:
        # approxQuantile can return an empty list (e.g. no non-null values
        # left); unpacking it into Q1, Q3 would raise ValueError.
        print(f"Rows removed from {c} outliers: skipped (no quantiles available)")
        continue
    Q1, Q3 = quantiles
    IQR = Q3 - Q1
    # Named *_bound rather than lower/upper so the `lower` function imported
    # from pyspark.sql.functions is not overwritten by a float.
    lower_bound = Q1 - 1.5*IQR
    upper_bound = Q3 + 1.5*IQR
    df_clean2 = df_clean2.filter((col(c) >= lower_bound) & (col(c) <= upper_bound))
    after_outlier_removal = df_clean2.count()
    print(f"Rows removed from {c} outliers: {before_outlier_removal - after_outlier_removal}")
    before_outlier_removal = after_outlier_removal


# Correct inconsistent values: lowercase the categorical (string) columns and
# show the distinct values of every non-numeric column.
# Import `lower` here so this loop calls the Spark function even if the name
# has been rebound earlier in the script.
from pyspark.sql.functions import lower

# Only string-typed columns are lowercased: applying lower() to any other type
# (e.g. bigint or boolean) would silently turn that column into a string.
string_cols = {name for name, dtype in df_clean2.dtypes if dtype == "string"}

for c in non_numeric_cols:
    if c in string_cols:
        df_clean2 = df_clean2.withColumn(c, lower(col(c)))
    print(f"Column: {c}")
    df_clean2.select(c).distinct().show(truncate=False)

from pyspark.ml.feature import StringIndexer

# Indexers that map each label to a numeric index; handleInvalid="keep" is set
# on both so invalid labels are kept rather than raising an error.
store_indexer = StringIndexer(
    inputCol="store_and_fwd_flag",
    outputCol="store_and_fwd_flag_encoded",
    handleInvalid="keep",
)
payment_indexer = StringIndexer(
    inputCol="payment_type",
    outputCol="payment_type_encoded",
    handleInvalid="keep",
)

# Encode store_and_fwd_flag first, then payment_type on the resulting frame.
store_model = store_indexer.fit(df_clean2)
df_final = store_model.transform(df_clean2)

payment_model = payment_indexer.fit(df_final)
df_final = payment_model.transform(df_final)

# The raw store_and_fwd_flag column is dropped after encoding.
df_final = df_final.drop("store_and_fwd_flag")

# NOTE: from here on `min` and `max` are the Spark column aggregates, not the
# Python builtins.
from pyspark.sql.functions import min, max, col, lit, when

# Columns rescaled to the [0, 1] range; each result is stored in "<name>_scaled".
scale_cols = [
    "fare_amount",
    "trip_distance",
    "passenger_count",
    "pickup_hour",
    "pickup_day",
    "pickup_month",
    "pickup_year"
]

# Gather every column's min and max in ONE aggregation (a single Spark job)
# instead of launching a separate job per column.
bound_exprs = []
for c in scale_cols:
    bound_exprs.append(min(col(c)).alias(f"{c}_min_val"))
    bound_exprs.append(max(col(c)).alias(f"{c}_max_val"))
stats = df_final.agg(*bound_exprs).collect()[0]

for c in scale_cols:
    min_val = stats[f"{c}_min_val"]
    max_val = stats[f"{c}_max_val"]

    if min_val is None or max_val is None:
        # No non-null values to derive a range from: emit a null column rather
        # than crashing on `None - None`.
        scaled = lit(None).cast("double")
    elif max_val == min_val:
        # Constant column: the range is zero, so the usual formula would divide
        # by zero. Map every non-null value to 0.0; nulls stay null.
        scaled = when(col(c).isNotNull(), lit(0.0))
    else:
        scaled = (col(c) - min_val) / (max_val - min_val)

    df_final = df_final.withColumn(f"{c}_scaled", scaled)

# Write the processed frame as CSV (with a header row) to both output
# directories, overwriting whatever is already there.
for output_path in (
    "file:///home/hduser/23processed_data_csv",
    "file:///home/hduser/processed_data_final",
):
    df_final.write.mode("overwrite").option("header", True).csv(output_path)

# Stop the timer and report the elapsed wall-clock time.
end_time = time.time()
elapsed_seconds = end_time - start_time
print(f"Time taken: {elapsed_seconds:.2f} seconds")
