In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, when, count, isnan
from pyspark.sql.types import IntegerType, FloatType

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("DataCleaning_Housing_Tips") \
    .getOrCreate()

print("Spark Session Created Successfully")

In [None]:
# Load Housing Data
housing_df = spark.read.csv("housing (1).csv", header=True, inferSchema=True)

# Load Tips Data
tips_df = spark.read.csv("tips.csv", header=True, inferSchema=True)

print("--- Housing Schema ---")
housing_df.printSchema()

print("\n--- Tips Schema ---")
tips_df.printSchema()

In [None]:
# 1. Check for missing values
print("Missing values in Housing Data:")
housing_df.select([count(when(col(c).isNull(), c)).alias(c) for c in housing_df.columns]).show()

# 2. Impute missing 'total_bedrooms' with the mean value
mean_bedrooms = housing_df.select(mean(col('total_bedrooms'))).collect()[0][0]
housing_cleaned = housing_df.na.fill({'total_bedrooms': mean_bedrooms})

# 3. Cast 'housing_median_age' from double to integer
housing_cleaned = housing_cleaned.withColumn("housing_median_age", col("housing_median_age").cast(IntegerType()))

# 4. Feature Engineering: Create 'rooms_per_household'
housing_cleaned = housing_cleaned.withColumn("rooms_per_household", col("total_rooms") / col("households"))

# Show the results
print("Cleaned Housing Data Sample:")
housing_cleaned.select("housing_median_age", "total_bedrooms", "rooms_per_household").show(5)

In [None]:
# 1. Check for duplicates
duplicate_count = tips_df.count() - tips_df.dropDuplicates().count()
print(f"Number of duplicate rows in Tips: {duplicate_count}")

# 2. Drop duplicates
tips_cleaned = tips_df.dropDuplicates()

# 3. Feature Engineering: Calculate 'tip_percentage'
tips_cleaned = tips_cleaned.withColumn("tip_percentage", (col("tip") / col("total_bill")) * 100)

# 4. Filter: Remove any records where total_bill is 0 or negative (sanity check)
tips_cleaned = tips_cleaned.filter(col("total_bill") > 0)

# Show the results
print("Cleaned Tips Data Sample:")
tips_cleaned.select("total_bill", "tip", "tip_percentage", "day").show(5)

In [None]:
# Save Housing Data
# coalesce(1) is used to save as a single file for small datasets
housing_cleaned.coalesce(1).write.mode('overwrite').csv("cleaned_housing_data", header=True)

# Save Tips Data
tips_cleaned.coalesce(1).write.mode('overwrite').csv("cleaned_tips_data", header=True)

print("Data cleaning complete. Files saved.")