In [0]:
df_properties = spark.read.table("df_properties")
df_config = spark.read.table("df_config")
df_tags = spark.read.table("df_tags")
df_amenities = spark.read.table("df_amenities")
df_places = spark.read.table("df_places")


In [0]:
# %restart_python

In [0]:
# df_properties.show(1)

##Clean Properties

In [0]:
# df_properties_cleaned.count()

In [0]:
# df_config.show()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, DateType, TimestampType, BooleanType, StructType, StructField
import re

# 3.1 Helper functions for parsing
def parse_amount_indian(s: str):
    """
    Parse a string like '₹7.2 K', '76.75 L', '1.44 Cr', or plain numbers with commas.
    Returns a float (absolute rupees), or None if unparsable.
    """
    if not s:
        return None
    # Remove currency symbol and any whitespace
    s_clean = s.replace('₹', '').replace(',', '').strip()
    # Regex to capture number and optional suffix
    m = re.match(r'([\d\.]+)\s*([KkLl]|Lakh|lakh|Cr|cr|Crore|crore)?', s_clean)
    if not m:
        # Try parse plain float
        try:
            return float(s_clean)
        except:
            return None
    num = float(m.group(1))
    suffix = m.group(2)
    if not suffix:
        return num
    suffix = suffix.lower()
    if suffix in ('k',):
        return num * 1e3
    if suffix in ('l', 'lakh'):
        return num * 1e5
    if suffix in ('cr', 'crore'):
        return num * 1e7
    # default
    return num

def parse_price_range(s: str):
    """
    Parse a range string like '₹76.75 L - 1.44 Cr' or '76.75 L - 1.44 Cr'.
    Returns tuple (min_val, max_val) in rupees, or (None, None) if unparsable.
    """
    if not s:
        return (None, None)
    # Remove currency symbols globally; but keep suffixes
    s_clean = s.replace('₹', '').strip()
    # Split on hyphen
    parts = [p.strip() for p in re.split(r'-', s_clean)]
    if len(parts) != 2:
        return (None, None)
    low, high = parts
    low_val = parse_amount_indian(low)
    high_val = parse_amount_indian(high)
    return (low_val, high_val)

# Register UDFs with corrected StructType import
parse_amount_udf = F.udf(lambda x: parse_amount_indian(x) if x else None, DoubleType())
parse_price_range_udf = F.udf(lambda x: parse_price_range(x) if x else (None, None), 
                              StructType([
                                  StructField("min_val", DoubleType()),
                                  StructField("max_val", DoubleType())
                              ]))

# Rest of your code remains the same...

# 3.2 Start cleaning pipeline
df = df_properties  # your input DataFrame

# 3.2.1 Remove duplicate IDs
df = df.dropDuplicates(["id"])

# 3.2.2 Trim string columns globally (optional helper)
string_cols = [f.name for f in df.schema.fields if f.dataType.simpleString() == "string"]
for col in string_cols:
    df = df.withColumn(col, F.when(F.col(col).isNotNull(), F.trim(F.col(col))).otherwise(F.col(col)))

# 3.2.3 Clean description: remove HTML tags
df = df.withColumn(
    "description_clean",
    F.when(
        F.col("description").isNotNull(),
        F.trim(F.regexp_replace(F.regexp_replace(F.col("description"), "<.*?>", ""), "\\s+", " "))
    ).otherwise(None)
)

# 3.2.4 Parse numeric fields

# avg_price_per_sqft: remove "/sq.ft" suffix, parse amount
df = df.withColumn(
    "avg_price_per_sqft_val",
    parse_amount_udf(
        F.when(
            F.col("avg_price_per_sqft").isNotNull(),
            F.trim(F.regexp_replace(F.col("avg_price_per_sqft"), r"/.*$", ""))
        ).otherwise(None)
    )
)


# price_display_value into min_price_val, max_price_val
df = df.withColumn("price_range_struct", parse_price_range_udf(F.col("price_display_value"))) \
       .withColumn("min_price_val", F.col("price_range_struct.min_val")) \
       .withColumn("max_price_val", F.col("price_range_struct.max_val")) \
       .drop("price_range_struct")

# property_price: parse string values like '76.75 L'
df = df.withColumn("property_price_val", parse_amount_udf(F.col("property_price")))

# property_area: e.g., '1066 sq.ft' → numeric
df = df.withColumn("property_area_val",
                   F.when(F.col("property_area").isNotNull(),
                          F.regexp_extract(F.col("property_area"), r"([\d\.]+)", 1).cast(DoubleType())
                         ).otherwise(None)
                  )

# size_range: '1066 - 1999 sq.ft.' → min_size, max_size
df = df.withColumn("size_range_clean", F.regexp_replace(F.col("size_range"), "sq\\.ft\\.?", "")) \
       .withColumn("size_parts", F.split(F.col("size_range_clean"), r"\s*-\s*")) \
       .withColumn("min_size", 
                   F.when(F.size(F.col("size_parts")) >= 1, 
                          F.col("size_parts").getItem(0).cast(DoubleType())).otherwise(None)) \
       .withColumn("max_size", 
                   F.when(F.size(F.col("size_parts")) >= 2, 
                          F.col("size_parts").getItem(1).cast(DoubleType())).otherwise(None)) \
       .drop("size_range_clean", "size_parts")

# latitude, longitude: cast to Double
df = df.withColumn("latitude", F.col("latitude").cast(DoubleType())) \
       .withColumn("longitude", F.col("longitude").cast(DoubleType()))

# bathrooms, bedrooms: cast to Integer
df = df.withColumn("bedrooms", F.col("bedrooms").cast(IntegerType())) 
    #    .withColumn("bathrooms", F.col("bathrooms").cast(IntegerType()))

# 3.2.5 Parse dates

# posted_date: ISO format "2019-08-13T00:00:33Z"
df = df.withColumn(
    "posted_timestamp", 
    F.to_timestamp(F.col("posted_date"), "yyyy-MM-dd'T'HH:mm:ss'Z'")
)


df = df.withColumn(
    "possession_date_parsed",
    F.when(
        F.col("possession_date").isNotNull(),
        F.to_date(
            F.concat(F.lit("01 "), F.col("possession_date")),
            "dd MMM, yyyy"
        )
    ).otherwise(F.to_date(F.col("posted_timestamp")))
)


# 3.2.6 Cast boolean-like columns
for bool_col in ["is_active_property", "is_most_contacted", "seller_is_paid"]:
    df = df.withColumn(bool_col,
                       F.when(F.col(bool_col).isin(True, False), F.col(bool_col).cast(BooleanType()))
                        .when(F.lower(F.col(bool_col)) == "true", F.lit(True))
                        .when(F.lower(F.col(bool_col)) == "false", F.lit(False))
                        .otherwise(None)
                      )

# 3.2.7 Trim/clean categorical columns
for cat_col in ["region", "city", "state", "seller_type", "seller_name", "locality"]:
    df = df.withColumn(cat_col, 
                       F.when(F.col(cat_col).isNotNull(), F.initcap(F.col(cat_col))).otherwise(F.col(cat_col))
                      )

# 3.2.8 Logical consistency: drop or flag rows where min_price > max_price
df = df.withColumn(
    "price_consistent", 
    F.when(
        (F.col("min_price_val").isNotNull() & 
         F.col("max_price_val").isNotNull() &
         (F.col("min_price_val") <= F.col("max_price_val"))),
        True
    ).otherwise(False)
)

# Option: drop inconsistent, or keep but flag
df = df.filter(F.col("price_consistent") == True).drop("price_consistent")

# 3.2.9 Handle missing critical values
df = df.filter(F.col("id").isNotNull() & 
              F.col("latitude").isNotNull() & 
              F.col("longitude").isNotNull())


# For bedrooms/bathrooms null: fill with -1
df = df.withColumn("bathrooms", F.coalesce(F.col("bathrooms"), F.lit(-1))) \
       .withColumn("bedrooms", F.coalesce(F.col("bedrooms"), F.lit(-1)))

# 3.2.10 Drop intermediate helper columns if desired
df_clean = df.drop("avg_price_per_sqft", "price_display_value", 
                   "property_price", "property_area", "description",
                   "size_range", "posted_date", "possession_date")

# Rename parsed columns
df_clean = df_clean.withColumnRenamed("avg_price_per_sqft_val", "avg_price_per_sqft") \
                   .withColumnRenamed("property_area_val", "property_area_sqft") \
                   .withColumnRenamed("property_price_val", "property_price") \
                   .withColumnRenamed("posted_timestamp", "posted_date") \
                   .withColumnRenamed("possession_date_parsed", "possession_date")

df_clean = df_clean.withColumn("avg_price_per_sqft", F.col("avg_price_per_sqft").cast(DoubleType()))
df_clean = df_clean.withColumn("property_price", F.col("property_price").cast(DoubleType()))
df_clean = df_clean.withColumn("min_size", F.col("min_size").cast(DoubleType()))
df_clean = df_clean.withColumn("max_size", F.col("max_size").cast(DoubleType()))
median_avg_price = df_clean.approxQuantile("avg_price_per_sqft", [0.5], 0.01)[0]
median_property_price = df_clean.approxQuantile("property_price", [0.5], 0.01)[0]
median_min_size = df_clean.approxQuantile("min_size", [0.5], 0.01)[0]
median_max_size = df_clean.approxQuantile("max_size", [0.5], 0.01)[0]

# Treating null values after thorough observing inside data.
df_clean = df_clean.fillna({
    "avg_price_per_sqft": median_avg_price,
    "property_price": median_property_price,
    "min_size": median_min_size,
    "max_size": median_max_size,
    "brochure_url": "",
    "image_url": "",
    "description_clean": ""
})

# 3.2.11 Final type enforcement
target_schema = {
    "id": "string",
    "address": "string",
    "long_address": "string",
    "locality": "string",
    "city": "string",
    "region": "string",
    "state": "string",
    "latitude": "double",
    "longitude": "double",
    "bedrooms": "int",
    "bathrooms": "int",
    "avg_price_per_sqft": "double",
    "min_price_val": "double",
    "max_price_val": "double",
    "property_price": "double",
    "property_area_sqft": "double",
    "min_size": "double",
    "max_size": "double",
    "posted_date": "timestamp",
    "possession_date": "date",
    "description_clean": "string",
    "brochure_url": "string",
    "from_url": "string",
    "url": "string",
    "is_active_property": "boolean",
    "is_most_contacted": "boolean",
    "seller_is_paid": "boolean",
    "seller_name": "string",
    "seller_phone": "string",
    "seller_type": "string",
    "subtitle": "string"
}

for col_name, dtype in target_schema.items():
    if col_name in df_clean.columns:
        df_clean = df_clean.withColumn(col_name, F.col(col_name).cast(dtype))


# 3.2.12 Show cleaned schema & sample
df_clean.printSchema()
# df_clean.show(5, truncate=False)

df_properties_cleaned = df_clean

root
 |-- address: string (nullable = true)
 |-- bathrooms: integer (nullable = false)
 |-- bedrooms: integer (nullable = false)
 |-- brochure_url: string (nullable = false)
 |-- city: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- emi: string (nullable = true)
 |-- from_url: string (nullable = true)
 |-- id: string (nullable = true)
 |-- image_url: string (nullable = false)
 |-- is_active_property: boolean (nullable = true)
 |-- is_most_contacted: boolean (nullable = true)
 |-- latitude: double (nullable = true)
 |-- locality: string (nullable = true)
 |-- long_address: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- max_price: long (nullable = true)
 |-- min_price: long (nullable = true)
 |-- name: string (nullable = true)
 |-- region: string (nullable = true)
 |-- seller_is_paid: boolean (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- seller_phone: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |--

In [0]:
from pyspark.sql.functions import (
    split, col, array_position, size, concat_ws, when,
    initcap, lit, slice as spark_slice, trim, coalesce
)
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

# UDF to convert 'L' and 'Cr' values to float Lakhs (as before)
def parse_price_to_lakhs(value):
    try:
        value = value.strip()
        if "Cr" in value:
            return float(value.replace("Cr", "").strip()) * 100
        elif "L" in value:
            return float(value.replace("L", "").strip())
        else:
            return None
    except:
        return None

parse_price_udf = udf(parse_price_to_lakhs, FloatType())

df = df_config

# Drop duplicates and price parsing as before
df = df.dropDuplicates(["id", "config_type", "price_range"])
df = df.withColumn("price_parts", split(col("price_range"), "-"))
df = df.withColumn("min_price_lakhs", parse_price_udf(trim(col("price_parts")[0])))
df = df.withColumn(
    "max_price_lakhs",
    when(col("price_parts").getItem(1).isNotNull(),
         parse_price_udf(trim(col("price_parts")[1])))
    .otherwise(col("min_price_lakhs"))
)

# --- NEW: parse config_type into size-format and category ---

# 1. split on underscore
df = df.withColumn("config_parts", split(col("config_type"), "_"))

# 2. find position of "bhk" in array (1-based index). If absent, returns 0.
df = df.withColumn("pos_bhk", array_position(col("config_parts"), "bhk"))

# 3. derive parsed config_type (size/format)
#    - if pos_bhk > 1: take element at pos_bhk-1 as size, append "BHK"
#    - elif first part is "studio": use "Studio"
#    - else: no size-format → null
df = df.withColumn(
    "config_type_parsed",
    when(col("pos_bhk") > 1,
         concat_ws(" ", col("config_parts")[col("pos_bhk") - 2], lit("BHK"))
    ).when(col("config_parts")[0] == "studio",
         initcap(col("config_parts")[0])
    ).otherwise(lit(None))
)

# 4. derive parsed config_category
#    - if pos_bhk > 0 and there are parts after "bhk": slice those for category
#    - elif pos_bhk > 0 but bhk is last part: no category (null)
#    - elif first part is "studio" and there are parts after: those form category
#    - else: treat entire joined parts as category
# Note: spark_slice is 1-based: spark_slice(array, startPos, length)
# For slicing after bhk: startPos = pos_bhk + 1, length = size(config_parts) - pos_bhk
df = df.withColumn(
    "config_category_parsed",
    when(
        (col("pos_bhk") > 0) & (size(col("config_parts")) > col("pos_bhk")),
        initcap(
            concat_ws(" ",
                      spark_slice(
                          col("config_parts"),
                          col("pos_bhk") + 1,
                          size(col("config_parts")) - col("pos_bhk")
                      )
            )
        )
    ).when(
        (col("pos_bhk") > 0) & (size(col("config_parts")) == col("pos_bhk")),
        lit(None)
    ).when(
        (col("config_parts")[0] == "studio") & (size(col("config_parts")) > 1),
        initcap(
            concat_ws(" ",
                      spark_slice(col("config_parts"), 2, size(col("config_parts")) - 1)
            )
        )
    ).otherwise(
        # join all parts e.g. ["plot"] or ["builder","floor"] or any other single/multi-part
        initcap(concat_ws(" ", col("config_parts")))
    )
)

# 5. replace original columns: overwrite config_type and add config_category
df = df.withColumnRenamed("config_type", "config_type_original") \
       .withColumnRenamed("config_type_parsed", "config_type") \
       .withColumnRenamed("config_category_parsed", "config_category")

# 6. drop helpers
df = df.drop("config_parts", "pos_bhk")

df = df.withColumn("config_type", coalesce(col("config_type"), lit("Other")))
df = df.withColumn("config_category", coalesce(col("config_category"), lit("Unknown")))

# Final cleaned DataFrame
df_config_cleaned = df.select("id", "config_type", "config_category", "min_price_lakhs", "max_price_lakhs")

df_config_cleaned.show(10, truncate=False)


+------+-----------+---------------+---------------+---------------+
|id    |config_type|config_category|min_price_lakhs|max_price_lakhs|
+------+-----------+---------------+---------------+---------------+
|269627|3 BHK      |Apartment      |61.88          |83.66          |
|284804|3 BHK      |Apartment      |123.0          |134.0          |
|182229|3 BHK      |Apartment      |225.0          |296.0          |
|325921|3 BHK      |Apartment      |150.0          |244.0          |
|227035|3 BHK      |Apartment      |158.0          |193.0          |
|125374|3 BHK      |Apartment      |154.0          |194.0          |
|228309|3 BHK      |Apartment      |238.0          |316.0          |
|284131|3 BHK      |Apartment      |90.0           |90.0           |
|251066|3 BHK      |Apartment      |89.6           |92.4           |
|290575|3 BHK      |Apartment      |77.12          |77.12          |
+------+-----------+---------------+---------------+---------------+
only showing top 10 rows



In [0]:
df_config.count()

9704

In [0]:
# %sql
# SELECT * FROM df_config WHERE ID = 269574;

In [0]:
# df_tags.show()

In [0]:
from pyspark.sql.functions import col, when, lit, trim, regexp_extract, max as spark_max
from pyspark.sql import functions as F

# Step 1: Standardize tag values (trim spaces)
df_tags = df_tags.withColumn("tag", trim(col("tag")))

# Step 2: Define features using conditional aggregation
df_tags_cleaned = df_tags.groupBy("id").agg(
    F.max(when(col("tag") == "RERA Approved", lit(True)).otherwise(lit(False))).alias("is_rera_approved"),
    F.max(when(col("tag") == "New Booking", lit(True)).otherwise(lit(False))).alias("is_new_booking"),
    F.max(when(col("tag") == "Under Construction", lit(True)).otherwise(lit(False))).alias("is_under_construction"),
    F.max(when(col("tag") == "Ready to Move", lit(True)).otherwise(lit(False))).alias("is_ready_to_move"),
    F.max(when(col("tag") == "Project", lit(True)).otherwise(lit(False))).alias("has_project_tag"),
    
    # Extract possession date from tag like: "Possession: Mar 2026"
    # F.max(
    #     when(col("tag").rlike("Possession:.*"),
    #          F.to_date(regexp_extract(col("tag"), "Possession: (\\w+ \\d{4})", 1), "MMM yyyy"))
    # ).alias("possession_date")
)

# df_tags_cleaned = df_tags_cleaned.fillna({"possession_date" : " "})

df_tags_cleaned.show(2)


+------+----------------+--------------+---------------------+----------------+---------------+
|    id|is_rera_approved|is_new_booking|is_under_construction|is_ready_to_move|has_project_tag|
+------+----------------+--------------+---------------------+----------------+---------------+
|265095|            true|          true|                false|            true|          false|
|250487|            true|         false|                false|            true|           true|
+------+----------------+--------------+---------------------+----------------+---------------+
only showing top 2 rows



In [0]:
df_tags.count()

43579

In [0]:
# df_amenities.show()

In [0]:
from pyspark.sql.functions import col, lit, trim, lower, when, max as spark_max

# Step 1: Clean 'amenity' column (strip whitespace)
df_amenities = df_amenities.withColumn("amenity", trim(col("amenity")))

# Step 2: Normalize amenity names if needed (optional)
# Example: standardize casing (e.g., lower case then map back if needed)
# Step 3: Create binary flags
df_amenity_clean = df_amenities.withColumn("value", lit(True))

# Step 4: Pivot the amenity values
df_pivoted = df_amenity_clean.groupBy("id").pivot("amenity").agg(spark_max("value"))

# Step 5: Rename columns to meaningful boolean feature names
df_pivoted = df_pivoted \
    .withColumnRenamed("Pool", "has_pool") \
    .withColumnRenamed("Gym", "has_gym") \
    .withColumnRenamed("Lift", "has_lift") \
    .withColumnRenamed("Parking", "has_parking") \
    .withColumnRenamed("Gated Community", "has_gated_community")

# Fill nulls with False
df_amenities_cleaned = df_pivoted.fillna(False)

df_amenities_cleaned.show(2)


+------+------------+-------------------+-------+--------+-----------+--------+
|    id|Gas Pipeline|has_gated_community|has_gym|has_lift|has_parking|has_pool|
+------+------------+-------------------+-------+--------+-----------+--------+
|201828|       false|              false|  false|   false|       true|   false|
|201103|       false|              false|  false|   false|       true|   false|
+------+------------+-------------------+-------+--------+-----------+--------+
only showing top 2 rows



In [0]:
df_amenities.count()

19741

In [0]:
# df_places.show()

In [0]:
from pyspark.sql.functions import col, trim, lower, round, when


df_places_cleaned = df_places

# Step 1: Trim and lower case place_type and place_name
df_places_cleaned = df_places_cleaned \
    .withColumn("place_type", trim(lower(col("place_type")))) \
    .withColumn("place_name", trim(col("place_name")))

# Step 2: Cast numeric columns
df_places_cleaned = df_places_cleaned \
    .withColumn("distance_km", round(col("distance_km").cast("double"), 2)) \
    .withColumn("travel_time_min", col("travel_time_min").cast("int"))

# Step 3: Handle missing travel_time_min
df_places_cleaned = df_places_cleaned.withColumn(
    "travel_time_min",
    when(col("travel_time_min").isNull(), -1).otherwise(col("travel_time_min"))
)

# Step 4: Optional deduplication
df_places_cleaned = df_places_cleaned.dropDuplicates(["id", "place_type", "place_name", "distance_km"])

df_places_cleaned.show(5)


+------+-------------+--------------------+-----------+---------------+
|    id|   place_type|          place_name|distance_km|travel_time_min|
+------+-------------+--------------------+-----------+---------------+
|103088|       school|The Hyderabad Pub...|        1.8|             -1|
|285010|   restaurant|  Peacock Restaurant|       2.24|             -1|
|343119|     hospital|    Surekha Hospital|       0.39|              1|
|287751|shopping_mall|Shopping Complex ...|       3.38|             90|
|281814|       school|Sadhu Vaswani Int...|       0.66|             -1|
+------+-------------+--------------------+-----------+---------------+
only showing top 5 rows



In [0]:
# df_properties_cleaned.printSchema()

In [0]:
# df_places_cleaned.printSchema()

In [0]:
sfOptions = {
    "sfURL": "YQRWNWT-DR42610.snowflakecomputing.com",
    "sfUser": "MASTER",
    "sfPassword": "6305524392Pavan@",
    "sfDatabase": "REALESTATE",
    "sfSchema": "SILVER",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "ACCOUNTADMIN"  # Optional
}

In [0]:
df_properties_cleaned.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "properties") \
    .mode("overwrite") \
    .save()

df_amenities_cleaned.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "property_amenities") \
    .mode("overwrite") \
    .save()


df_config_cleaned.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "property_configurations") \
    .mode("overwrite") \
    .save()


df_tags_cleaned.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "property_status") \
    .mode("overwrite") \
    .save()


df_places_cleaned.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "property_nearby_places") \
    .mode("overwrite") \
    .save()




In [0]:
# from pyspark.sql import functions as F
# from pyspark.sql.types import DoubleType, IntegerType, DateType, TimestampType, BooleanType, StructType, StructField
# import re

# # -------------------- Helper Functions --------------------

# def parse_amount_indian(s: str):
#     if not s:
#         return None
#     s_clean = s.replace('₹', '').replace(',', '').strip()
#     m = re.match(r'([\d\.]+)\s*([KkLl]|Lakh|lakh|Cr|cr|Crore|crore)?', s_clean)
#     if not m:
#         try:
#             return float(s_clean)
#         except:
#             return None
#     num = float(m.group(1))
#     suffix = m.group(2)
#     if not suffix:
#         return num
#     suffix = suffix.lower()
#     if suffix in ('k',):
#         return num * 1e3
#     if suffix in ('l', 'lakh'):
#         return num * 1e5
#     if suffix in ('cr', 'crore'):
#         return num * 1e7
#     return num

# def parse_price_range(s: str):
#     if not s:
#         return (None, None)
#     s_clean = s.replace('₹', '').strip()
#     parts = [p.strip() for p in re.split(r'-', s_clean)]
#     if len(parts) != 2:
#         return (None, None)
#     low_val = parse_amount_indian(parts[0])
#     high_val = parse_amount_indian(parts[1])
#     return (low_val, high_val)

# # Register UDFs
# parse_amount_udf = F.udf(lambda x: parse_amount_indian(x) if x else None, DoubleType())
# parse_price_range_udf = F.udf(
#     lambda x: parse_price_range(x) if x else (None, None),
#     StructType([
#         StructField("min_val", DoubleType()),
#         StructField("max_val", DoubleType())
#     ])
# )

# # -------------------- Data Cleaning Pipeline --------------------

# df = df_properties
# print(f"Initial count: {df.count()}")

# # 1. Drop duplicate IDs
# df = df.dropDuplicates(["id"])
# print(f"After dropDuplicates: {df.count()}")

# # 2. Trim all string columns
# string_cols = [f.name for f in df.schema.fields if f.dataType.simpleString() == "string"]
# for col in string_cols:
#     df = df.withColumn(col, F.when(F.col(col).isNotNull(), F.trim(F.col(col))).otherwise(F.col(col)))

# # 3. Clean HTML from description
# df = df.withColumn(
#     "description_clean",
#     F.when(
#         F.col("description").isNotNull(),
#         F.trim(F.regexp_replace(F.regexp_replace(F.col("description"), "<.*?>", ""), "\\s+", " "))
#     ).otherwise(None)
# )

# # 4. Parse avg_price_per_sqft
# df = df.withColumn(
#     "avg_price_per_sqft_val",
#     parse_amount_udf(
#         F.when(
#             F.col("avg_price_per_sqft").isNotNull(),
#             F.trim(F.regexp_replace(F.col("avg_price_per_sqft"), r"/.*$", ""))
#         ).otherwise(None)
#     )
# )

# # 5. Parse price_display_value into min/max
# df = df.withColumn("price_range_struct", parse_price_range_udf(F.col("price_display_value"))) \
#        .withColumn("min_price_val", F.col("price_range_struct.min_val")) \
#        .withColumn("max_price_val", F.col("price_range_struct.max_val")) \
#        .drop("price_range_struct")
# print(f"After parsing price_display_value: {df.count()}")

# # --- After parsing price_display_value and filtering price consistency ---

# # Log how many nulls before imputation
# null_min = df.filter(F.col("min_price_val").isNull()).count()
# null_max = df.filter(F.col("max_price_val").isNull()).count()
# print(f"Before price imputation: null min_price_val = {null_min}, null max_price_val = {null_max}, total rows = {df.count()}")

# # 1. Compute global median ratio (max/min) among rows where both exist
# ratio_df = df.filter(F.col("min_price_val").isNotNull() & F.col("max_price_val").isNotNull()) \
#              .withColumn("price_ratio", F.col("max_price_val") / F.col("min_price_val"))
# # If there are enough rows:
# if ratio_df.count() > 0:
#     ratio_median_row = ratio_df.selectExpr("percentile_approx(price_ratio, 0.5) as med_ratio").collect()[0]
#     ratio_median = ratio_median_row["med_ratio"]
# else:
#     ratio_median = None
# print(f"Global median price ratio (max/min): {ratio_median}")

# # 2. Compute group-level medians by locality (or another grouping column you prefer)
# #    If locality has many distinct values, ensure performance is acceptable.
# group_medians = df.groupBy("locality").agg(
#     F.expr("percentile_approx(min_price_val, 0.5)").alias("med_min_loc"),
#     F.expr("percentile_approx(max_price_val, 0.5)").alias("med_max_loc")
# )
# df = df.join(group_medians, on="locality", how="left")

# # 3. Impute min_price_val:
# #    - If min is null but max exists and ratio_median is known: set min = max / ratio_median
# #    - Else if min is null: use med_min_loc
# df = df.withColumn(
#     "min_price_val_imputed",
#     F.when(
#         F.col("min_price_val").isNull() & F.col("max_price_val").isNotNull() & F.lit(ratio_median).isNotNull(),
#         F.col("max_price_val") / F.lit(ratio_median)
#     ).when(
#         F.col("min_price_val").isNull(),
#         F.col("med_min_loc")
#     ).otherwise(F.col("min_price_val"))
# )

# # 4. Impute max_price_val:
# #    - If max is null but min exists and ratio_median is known: set max = min * ratio_median
# #    - Else if max is null: use med_max_loc
# df = df.withColumn(
#     "max_price_val_imputed",
#     F.when(
#         F.col("max_price_val").isNull() & F.col("min_price_val").isNotNull() & F.lit(ratio_median).isNotNull(),
#         F.col("min_price_val") * F.lit(ratio_median)
#     ).when(
#         F.col("max_price_val").isNull(),
#         F.col("med_max_loc")
#     ).otherwise(F.col("max_price_val"))
# )

# # 5. For any remaining nulls (e.g., group median was null or ratio_median was None), fallback to global medians:
# global_medians = df.selectExpr(
#     "percentile_approx(min_price_val, 0.5) as global_med_min",
#     "percentile_approx(max_price_val, 0.5) as global_med_max"
# ).collect()[0]
# global_med_min = global_medians["global_med_min"]
# global_med_max = global_medians["global_med_max"]
# print(f"Global medians fallback: min = {global_med_min}, max = {global_med_max}")

# df = df.withColumn(
#     "min_price_val_imputed",
#     F.when(F.col("min_price_val_imputed").isNull(), F.lit(global_med_min)).otherwise(F.col("min_price_val_imputed"))
# ).withColumn(
#     "max_price_val_imputed",
#     F.when(F.col("max_price_val_imputed").isNull(), F.lit(global_med_max)).otherwise(F.col("max_price_val_imputed"))
# )

# # 6. Replace original columns and drop helper columns
# df = df.drop("min_price_val", "max_price_val") \
#        .withColumnRenamed("min_price_val_imputed", "min_price_val") \
#        .withColumnRenamed("max_price_val_imputed", "max_price_val") \
#        .drop("med_min_loc", "med_max_loc")

# # Log after imputation
# null_min2 = df.filter(F.col("min_price_val").isNull()).count()
# null_max2 = df.filter(F.col("max_price_val").isNull()).count()
# print(f"After price imputation: null min_price_val = {null_min2}, null max_price_val = {null_max2}, total rows = {df.count()}")


# # Log invalid price range parsing
# invalid_price = df.filter(F.col("price_display_value").isNotNull() &
#                           F.col("min_price_val").isNull() & F.col("max_price_val").isNull())
# print(f"Invalid price ranges (remain in DF for now): {invalid_price.count()}")
# # You can df.select("price_display_value").distinct().show(20,False) here to inspect formats.

# # 6. Parse property_price
# df = df.withColumn("property_price_val", parse_amount_udf(F.col("property_price")))

# # 7. Parse property_area
# df = df.withColumn("property_area_val",
#                    F.when(F.col("property_area").isNotNull(),
#                           F.regexp_extract(F.col("property_area"), r"([\d\.]+)", 1).cast(DoubleType())
#                          ).otherwise(None)
# )

# # 8. Parse size_range into min_size / max_size
# df = df.withColumn("size_range_clean", F.regexp_replace(F.col("size_range"), "sq\\.ft\\.?", "")) \
#        .withColumn("size_parts", F.split(F.col("size_range_clean"), r"\s*-\s*")) \
#        .withColumn("min_size", 
#                    F.when(F.size(F.col("size_parts")) >= 1, 
#                           F.col("size_parts").getItem(0).cast(DoubleType())).otherwise(None)) \
#        .withColumn("max_size", 
#                    F.when(F.size(F.col("size_parts")) >= 2, 
#                           F.col("size_parts").getItem(1).cast(DoubleType())).otherwise(None)) \
#        .drop("size_range_clean", "size_parts")

# # 9. Cast latitude/longitude to Double
# df = df.withColumn("latitude", F.col("latitude").cast(DoubleType())) \
#        .withColumn("longitude", F.col("longitude").cast(DoubleType()))

# # 10. Cast bedrooms
# df = df.withColumn("bedrooms", F.col("bedrooms").cast(IntegerType()))

# # 11. Parse posted_date timestamp
# df = df.withColumn(
#     "posted_timestamp", 
#     F.to_timestamp(F.col("posted_date"), "yyyy-MM-dd'T'HH:mm:ss'Z'")
# )

# # 12. Parse possession_date: if present, parse “01 <month-year>”, else fallback to posted_timestamp date
# df = df.withColumn(
#     "possession_date_parsed",
#     F.when(
#         F.col("possession_date").isNotNull(),
#         F.to_date(F.concat(F.lit("01 "), F.col("possession_date")), "dd MMM, yyyy")
#     ).otherwise(F.to_date(F.col("posted_timestamp")))
# )

# # 13. Cast boolean-like columns
# for bool_col in ["is_active_property", "is_most_contacted", "seller_is_paid"]:
#     df = df.withColumn(bool_col,
#                        F.when(F.col(bool_col).isin(True, False), F.col(bool_col).cast(BooleanType()))
#                         .when(F.lower(F.col(bool_col)) == "true", F.lit(True))
#                         .when(F.lower(F.col(bool_col)) == "false", F.lit(False))
#                         .otherwise(None)
#                       )

# # 14. Trim/initcap for categorical columns
# for cat_col in ["region", "city", "state", "seller_type", "seller_name", "locality"]:
#     df = df.withColumn(cat_col, 
#                        F.when(F.col(cat_col).isNotNull(), F.initcap(F.col(cat_col))).otherwise(F.col(cat_col)))

# # 15. Logical price consistency: only drop rows where both min/max exist and min > max
# df = df.withColumn(
#     "price_consistent",
#     F.when(
#         (F.col("min_price_val").isNotNull() & F.col("max_price_val").isNotNull()) &
#         (F.col("min_price_val") > F.col("max_price_val")),
#         False
#     ).otherwise(True)
# )
# df = df.filter(F.col("price_consistent")).drop("price_consistent")
# print(f"After price consistency filter: {df.count()}")

# # 16. Check & drop null lat/long if truly critical
# missing_geo = df.filter(F.col("latitude").isNull() | F.col("longitude").isNull()).count()
# print(f"Rows missing lat/long: {missing_geo}")
# # If geo is critical, drop; otherwise you may choose to impute or keep with nulls.
# df = df.filter(F.col("latitude").isNotNull() & F.col("longitude").isNotNull())
# print(f"After dropping rows with null lat/long: {df.count()}")

# # 17. Fill bathrooms/bedrooms if needed
# df = df.withColumn("bathrooms", F.coalesce(F.col("bathrooms"), F.lit(-1))) \
#        .withColumn("bedrooms", F.coalesce(F.col("bedrooms"), F.lit(-1)))

# # 18. Drop intermediate raw fields
# df_clean = df.drop("avg_price_per_sqft", "price_display_value", 
#                    "property_price", "property_area", "description",
#                    "size_range", "posted_date", "possession_date")

# # 19. Rename parsed columns
# df_clean = df_clean.withColumnRenamed("avg_price_per_sqft_val", "avg_price_per_sqft") \
#                    .withColumnRenamed("property_area_val", "property_area_sqft") \
#                    .withColumnRenamed("property_price_val", "property_price") \
#                    .withColumnRenamed("posted_timestamp", "posted_date") \
#                    .withColumnRenamed("possession_date_parsed", "possession_date")

# # 20. Final schema cast
# target_schema = {
#     "id": "string", "address": "string", "long_address": "string", "locality": "string", 
#     "city": "string", "region": "string", "state": "string", "latitude": "double", 
#     "longitude": "double", "bedrooms": "int", "bathrooms": "int", "avg_price_per_sqft": "double", 
#     "min_price_val": "double", "max_price_val": "double", "property_price": "double", 
#     "property_area_sqft": "double", "min_size": "double", "max_size": "double", 
#     "posted_date": "timestamp", "possession_date": "date", "description_clean": "string", 
#     "brochure_url": "string", "from_url": "string", "url": "string", 
#     "is_active_property": "boolean", "is_most_contacted": "boolean", 
#     "seller_is_paid": "boolean", "seller_name": "string", "seller_phone": "string", 
#     "seller_type": "string", "subtitle": "string"
# }
# for col_name, dtype in target_schema.items():
#     if col_name in df_clean.columns:
#         df_clean = df_clean.withColumn(col_name, F.col(col_name).cast(dtype))

# print(f"Final cleaned count before null-imputation: {df_clean.count()}")
# df_clean.printSchema()

# # -------------------- Null Counts Logging --------------------
# # Compute null counts for each column
# null_counts = df_clean.select([
#     F.count(F.when(F.col(c).isNull(), c)).alias(c) 
#     for c in df_clean.columns
# ])
# print("Null counts per column:")
# null_counts.show(truncate=False)

# # -------------------- Logical Null Handling / Imputation --------------------
# # Example strategies: adjust as per your needs.

# # 1. String / categorical columns: fill null with "Unknown" (or "")
# str_cols = [c for c, t in df_clean.dtypes if t == 'string']
# for c in str_cols:
#     # You may choose empty string "" instead of "Unknown"
#     df_clean = df_clean.withColumn(c, F.when(F.col(c).isNull(), F.lit("Unknown")).otherwise(F.col(c)))

# # 2. Numeric columns: fill null with sentinel (-1) or median
# # Identify numeric columns (double or int)
# num_cols = [c for c, t in df_clean.dtypes if t in ('double','int') and c not in ("latitude","longitude")]  # example: maybe keep lat/long nulls? but we dropped earlier
# for c in num_cols:
#     # Example: fill with median for that column. 
#     # Only if there are nulls; else skip.
#     null_count = df_clean.filter(F.col(c).isNull()).count()
#     if null_count > 0:
#         # Compute median via approxQuantile
#         try:
#             median = df_clean.approxQuantile(c, [0.5], 0.01)[0]
#         except:
#             median = None
#         if median is not None:
#             df_clean = df_clean.withColumn(c, F.when(F.col(c).isNull(), F.lit(median)).otherwise(F.col(c)))
#         else:
#             # fallback sentinel
#             df_clean = df_clean.withColumn(c, F.when(F.col(c).isNull(), F.lit(-1)).otherwise(F.col(c)))

# # 3. Boolean columns: fill null with False (or leave null if you prefer)
# bool_cols = [c for c, t in df_clean.dtypes if t == 'boolean']
# for c in bool_cols:
#     df_clean = df_clean.withColumn(c, F.coalesce(F.col(c), F.lit(False)))

# # 4. Date/Timestamp columns: fill null with a default date or current date, or leave null.
# # Example: fill posted_date nulls with current timestamp; possession_date nulls with posted_date
# if "posted_date" in df_clean.columns:
#     df_clean = df_clean.withColumn(
#         "posted_date",
#         F.when(F.col("posted_date").isNull(), F.current_timestamp()).otherwise(F.col("posted_date"))
#     )
# if "possession_date" in df_clean.columns:
#     df_clean = df_clean.withColumn(
#         "possession_date",
#         F.when(F.col("possession_date").isNull(), F.to_date(F.col("posted_date"))).otherwise(F.col("possession_date"))
#     )

# # After imputation, re-log null counts
# null_counts_after = df_clean.select([
#     F.count(F.when(F.col(c).isNull(), c)).alias(c) 
#     for c in df_clean.columns
# ])
# print("Null counts per column after imputation:")
# null_counts_after.show(truncate=False)

# # -------------------- Final Output --------------------
# df_properties_cleaned = df_clean
# print(f"Final cleaned count after imputation: {df_properties_cleaned.count()}")
# # df_properties_cleaned.show(5, truncate=False)
