In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [21]:
# Single Spark session for the whole notebook; getOrCreate() hands back the
# already-active session if one exists (e.g. when this cell is re-run).
spark = (
    SparkSession.builder
    .appName("merging_all_datasets")
    .getOrCreate()
)

In [22]:
# All four sources live under the same gold-layer root; keeping it in one
# constant means relocating the lake needs a single edit.
GOLD_BASE_PATH = "hdfs://namenode:9000/datalake/gold"


def read_gold(dataset_name, base_path=GOLD_BASE_PATH):
    """Read one transformed gold-layer parquet dataset.

    Args:
        dataset_name: Source suffix; "bayut" reads ``<base_path>/transformed_bayut``.
        base_path: Root directory of the gold layer.

    Returns:
        Spark DataFrame loaded from the parquet files at that path.
    """
    return spark.read.parquet(f"{base_path}/transformed_{dataset_name}")


df_bayut = read_gold("bayut")
df_dubbizle = read_gold("dubbizle")
df_fazwaz = read_gold("fazwaz")
df_propertyfinder = read_gold("propertyfinder")

In [23]:
# Show each source's schema so column/type differences are visible before merging.
for source_df in (df_bayut, df_dubbizle, df_fazwaz, df_propertyfinder):
    source_df.printSchema()

root
 |-- title: string (nullable = true)
 |-- link: string (nullable = true)
 |-- price: double (nullable = true)
 |-- location: string (nullable = true)
 |-- region: string (nullable = true)
 |-- area: double (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- source: string (nullable = true)
 |-- price_per_sqm: double (nullable = true)
 |-- Jacuzzi: integer (nullable = true)
 |-- Garden: integer (nullable = true)
 |-- Balcony: integer (nullable = true)
 |-- Pool: integer (nullable = true)
 |-- Parking: integer (nullable = true)
 |-- Gym: integer (nullable = true)
 |-- Maids_Quarters: integer (nullable = true)
 |-- Spa: integer (nullable = true)
 |-- description: string (nullable = true)

root
 |-- description: string (nullable = true)
 |-- link: string (nullable = true)
 |-- price: double (nullable = true)

In [24]:
# --- UPDATED LOGIC: FAZWAZ & PROPERTYFINDER DEEP EXTRACTION ---

from pyspark.sql.functions import split, trim, col, when, size, regexp_replace, lit, element_at, slice, array_join, lower

# 1. df_bayut processing:
print("Standardizing Bayut...")
# Rename Bayut's original `region` column to `city`, then expose the raw
# `location` string as the new `region` column.
df_bayut = (
    df_bayut
    .withColumnRenamed("region", "city")
    .withColumn("region", col("location"))
)

# 2. df_dubbizle processing:
def split_dubbizle_location(df):
    """Derive `city` and `region` columns for Dubbizle listings.

    `city` comes from the `source` column: "dubbizle_Cairo" maps to "Cairo" and
    every other value (including NULL) maps to "Alexandria".

    `region` is the last comma-separated token of `location`, after stripping
    any trailing run of dots/bullets. When that token is itself a city/country
    name (cairo, alexandria, alex, egypt; case-insensitive), the token before
    it is used instead. If there is no earlier token, `region` is NULL.

    Args:
        df: Dubbizle DataFrame with at least `location` and `source` columns.

    Returns:
        `df` with `city` and `region` columns added (existing columns with
        those names are replaced).
    """
    print("Standardizing Dubbizle...")
    df_clean = df.withColumn("clean_loc", regexp_replace(col("location"), "[.•]+$", ""))

    # NOTE(review): every non-"dubbizle_Cairo" source is labelled Alexandria;
    # this assumes Dubbizle only ships Cairo and Alexandria feeds — confirm.
    df_with_city = df_clean.withColumn(
        "city",
        when(col("source") == "dubbizle_Cairo", "Cairo")
        .otherwise("Alexandria")
    )

    df_split = df_with_city.withColumn("split_arr", split(col("clean_loc"), ","))
    last_token = trim(element_at(col("split_arr"), -1))
    last_is_city = lower(last_token).isin("cairo", "alexandria", "alex", "egypt")

    df_processed = df_split.withColumn(
        "region",
        # element_at(arr, -2) on a one-element array raises when
        # spark.sql.ansi.enabled is true (and returns NULL otherwise), so only
        # look it up when a second-to-last token exists; keep NULL otherwise.
        when(last_is_city & (size(col("split_arr")) >= 2),
             trim(element_at(col("split_arr"), -2)))
        .when(last_is_city, lit(None).cast("string"))
        .otherwise(last_token)
    )
    return df_processed.drop("split_arr", "clean_loc")

df_dubbizle = split_dubbizle_location(df_dubbizle)

# 3. df_fazwaz (Deep Extraction):
def split_fazwaz_location(df):
    """Derive `city` and `region` columns for Fazwaz listings.

    A trailing ", Egypt" is removed from `location`, then the remaining string
    is split on commas:
      * `city` is the last token.
      * `region` is the second-to-last token, or NULL if there is none. When
        that token is "New Cairo" / "New Cairo City" and a third-to-last token
        exists, the more specific third-to-last token is used instead.

    Args:
        df: Fazwaz DataFrame with a string `location` column.

    Returns:
        `df` with `city` and `region` columns added (existing columns with
        those names are replaced).
    """
    print("Standardizing Fazwaz...")
    df_cleaned = df.withColumn("location_cleaned", regexp_replace(col("location"), ", Egypt$", ""))
    df_split = (
        df_cleaned
        .withColumn("split_arr", split(col("location_cleaned"), ","))
        .withColumn("split_size", size(col("split_arr")))
    )

    df_split = df_split.withColumn("city", trim(element_at(col("split_arr"), -1)))
    # element_at(arr, -2) on a one-element array raises when
    # spark.sql.ansi.enabled is true (and returns NULL otherwise); guard it so
    # single-token locations yield a NULL region in both modes.
    df_split = df_split.withColumn(
        "temp_region",
        when(col("split_size") >= 2, trim(element_at(col("split_arr"), -2)))
    )

    df_processed = df_split.withColumn(
        "region",
        when(
            (col("temp_region").isin("New Cairo", "New Cairo City")) & (col("split_size") >= 3),
            trim(element_at(col("split_arr"), -3))
        ).otherwise(col("temp_region"))
    )

    return df_processed.drop("location_cleaned", "split_arr", "split_size", "temp_region")

df_fazwaz = split_fazwaz_location(df_fazwaz)

# 4. df_propertyfinder (Deep Extraction):
def split_propertyfinder_location(df):
    """Derive `city` and `region` columns for PropertyFinder listings.

    `city` is chosen by regex keyword matching on the lowercased `location`,
    checked in order (Alexandria, Cairo, North Coast, Sokhna, Red Sea,
    South Sinai). The first match wins; anything unmatched, including a NULL
    `location`, becomes "Other".

    `region` is the second-to-last comma-separated token of `location`, or
    NULL if there is none. When that token is "New Cairo" / "New Cairo City"
    and a third-to-last token exists, the more specific third-to-last token
    is used instead.

    Args:
        df: PropertyFinder DataFrame with a string `location` column.

    Returns:
        `df` with `city` and `region` columns added (existing columns with
        those names are replaced).
    """
    print("Standardizing PropertyFinder...")
    df = df.withColumn("location_lower", lower(col("location")))

    df = df.withColumn(
        "city",
        when(col("location_lower").rlike("alex|alexandria|alex governorate|elx"), "Alexandria")
        .when(col("location_lower").rlike(
            "cairo|giza|nasr city|heliopolis|maadi|rehab|tagamo3|new cairo|madinaty|badr|shorouk|obour|sheikh zayed|zayed|6th october|6 october|october|hadayek october|october gardens|new giza|palm hills|gardenia|zayed dunes|dahshour|الشيخ زايد|٦ أكتوبر|اكتوبر|حدائق اكتوبر"
        ), "Cairo")
        .when(col("location_lower").rlike("north coast|sidi abdel rahman|al alamein|ras al hekm|dabaa"), "North Coast")
        .when(col("location_lower").rlike("ain sokhna|sokhna|suez|galala"), "Sokhna")
        .when(col("location_lower").rlike("hurghada|red sea|soma bay|safaga|gouna"), "Red Sea")
        .when(col("location_lower").rlike("ras sedr|south sinai|dahab|sharm el sheikh"), "South Sinai")
        .otherwise("Other")
    )

    df = df.withColumn("split_pf", split(col("location"), ","))
    df = df.withColumn("split_size", size(col("split_pf")))
    # element_at(arr, -2) on a one-element array raises when
    # spark.sql.ansi.enabled is true (and returns NULL otherwise); guard it so
    # comma-free locations yield a NULL region in both modes.
    df = df.withColumn(
        "temp_region",
        when(col("split_size") >= 2, trim(element_at(col("split_pf"), -2)))
    )

    df = df.withColumn(
        "region",
        when(
            (col("temp_region").isin("New Cairo City", "New Cairo")) & (col("split_size") >= 3),
            trim(element_at(col("split_pf"), -3))
        )
        .otherwise(col("temp_region"))
    )

    return df.drop("location_lower", "split_pf", "split_size", "temp_region")


df_propertyfinder = split_propertyfinder_location(df_propertyfinder)

print("Location standardization complete for all sources.")

Standardizing Bayut...
Standardizing Dubbizle...
Standardizing Fazwaz...
Standardizing PropertyFinder...
Location standardization complete for all sources.


In [25]:
# Merge all DataFrames
final_columns = [
    "title", "link", "price", "location", "region", "city", "area", "bedrooms", "bathrooms",
    "latitude", "longitude", "property_type", "source", "price_per_sqm",
    "Jacuzzi", "Garden", "Balcony", "Pool", "Parking", "Gym",
    "Maids_Quarters", "Spa", "description"
]


def safe_select(df, columns, column_types=None):
    """Project `df` onto `columns` (in order), null-padding any missing column.

    Args:
        df: Source DataFrame.
        columns: Output column names, in the desired order.
        column_types: Optional mapping of column name -> Spark DataType used
            to type the NULL padding for columns `df` lacks. Columns not in
            the mapping are padded as StringType (the original behaviour).

    Returns:
        DataFrame containing exactly `columns`.
    """
    existing_cols = set(df.columns)
    column_types = column_types or {}
    select_expr = [
        col(c) if c in existing_cols
        else lit(None).cast(column_types.get(c, StringType())).alias(c)
        for c in columns
    ]
    return df.select(select_expr)


# Type each NULL padding with the type some other source declares for that
# column. A StringType padding can make unionByName widen e.g. an integer
# amenity flag or a double price to string for the whole merged frame.
reference_types = {}
for source_df in (df_bayut, df_dubbizle, df_fazwaz, df_propertyfinder):
    for field in source_df.schema.fields:
        reference_types.setdefault(field.name, field.dataType)

df_bayut_ordered = safe_select(df_bayut, final_columns, reference_types)
df_dubbizle_ordered = safe_select(df_dubbizle, final_columns, reference_types)
df_fazwaz_ordered = safe_select(df_fazwaz, final_columns, reference_types)
df_propertyfinder_ordered = safe_select(df_propertyfinder, final_columns, reference_types)

df_all = (
    df_bayut_ordered
    .unionByName(df_dubbizle_ordered)
    .unionByName(df_fazwaz_ordered)
    .unionByName(df_propertyfinder_ordered)
)

# Removes only rows identical in every column; listings sharing a `link`
# but differing elsewhere are kept.
df_all = df_all.dropDuplicates()

print("--- Merged DataFrame Schema ---")
df_all.printSchema()


--- Merged DataFrame Schema ---
root
 |-- title: string (nullable = true)
 |-- link: string (nullable = true)
 |-- price: double (nullable = true)
 |-- location: string (nullable = true)
 |-- region: string (nullable = true)
 |-- city: string (nullable = true)
 |-- area: double (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- source: string (nullable = true)
 |-- price_per_sqm: double (nullable = true)
 |-- Jacuzzi: integer (nullable = true)
 |-- Garden: integer (nullable = true)
 |-- Balcony: integer (nullable = true)
 |-- Pool: integer (nullable = true)
 |-- Parking: integer (nullable = true)
 |-- Gym: integer (nullable = true)
 |-- Maids_Quarters: integer (nullable = true)
 |-- Spa: integer (nullable = true)
 |-- description: string (nullable = true)



In [26]:
# --- UPDATED CLEANING & NAME STANDARDIZATION ---

from pyspark.sql.functions import initcap, trim, regexp_replace

print("Cleaning 'city' column and standardizing 'region'...")


def _clean_label(column_name):
    """Normalise a free-text location label column.

    Removes any trailing run of dots, bullets and whitespace (so "Cairo ."
    and "Cairo.." both become "Cairo"), trims spaces at both ends, then
    title-cases each word with initcap.

    Args:
        column_name: Name of the string column to clean.

    Returns:
        The cleaned Column expression.
    """
    return initcap(trim(regexp_replace(col(column_name), r"[\s.•]+$", "")))


# 1. Basic Trim & Formatting (strip trailing punctuation *before* trimming so
#    no trailing space is left behind)
for label_col in ("city", "region"):
    df_all = df_all.withColumn(label_col, _clean_label(label_col))

# 2. Expand the "Alex" abbreviation in both columns
for label_col in ("city", "region"):
    df_all = df_all.withColumn(
        label_col,
        when(col(label_col) == "Alex", "Alexandria").otherwise(col(label_col)),
    )

# 3. --- NAME STANDARDIZATION STEP ---
# Applied in order as regex substring replacements on `region`; e.g. this
# turns 'The 5th Settlement' into '5th Settlement' and 'New Cairo City' into
# 'New Cairo'.
region_renames = {
    "The 5th Settlement": "5th Settlement",
    "New Cairo City": "New Cairo",
    "6 October City": "6 October",
    "Sheikh Zayed City": "Sheikh Zayed",
}
for old_name, new_name in region_renames.items():
    df_all = df_all.withColumn("region", regexp_replace(col("region"), old_name, new_name))

# 4. Apply Constraint: region != city
print("Applying constraint: region cannot be the same as city...")
df_all = df_all.withColumn(
    "region",
    when(col("region") == col("city"), None).otherwise(col("region")),
)

print("City and Region columns cleaned. Showing new distinct counts:")
city_counts_cleaned = df_all.groupBy("city").count().orderBy("count", ascending=False)
city_counts_cleaned.show(truncate=False)

print("--- Region Counts (non-null) ---")
region_counts_cleaned = (
    df_all.filter(col("region").isNotNull())
    .groupBy("region")
    .count()
    .orderBy("count", ascending=False)
)
region_counts_cleaned.show(truncate=False)

print("--- Sample of all three location columns ---")
df_all.select("location", "region", "city", "source").show(20, truncate=False)

Cleaning 'city' column and standardizing 'region'...
Applying constraint: region cannot be the same as city...
City and Region columns cleaned. Showing new distinct counts:
+-----------+-----+
|city       |count|
+-----------+-----+
|Cairo      |10096|
|Alexandria |1823 |
|Matruh     |459  |
|Giza       |262  |
|Red Sea    |255  |
|Suez       |181  |
|North Coast|106  |
|Sokhna     |73   |
|South Sinai|3    |
|Other      |2    |
|Aswan      |1    |
|Damietta   |1    |
+-----------+-----+

--- Region Counts (non-null) ---
+----------------------------+-----+
|region                      |count|
+----------------------------+-----+
|5th Settlement              |5068 |
|New Cairo                   |686  |
|Al Rehab                    |496  |
|North Coast                 |459  |
|Sheikh Zayed                |399  |
|Madinaty                    |367  |
|Smoha                       |359  |
|New Capital City            |348  |
|6 October                   |285  |
|Shorouk City                

In [27]:
# Lowercase property types so labels that differ only in case group together.
df_all = df_all.withColumn("property_type", lower("property_type"))

In [28]:
!pip install psycopg2-binary



In [29]:
# Load the final merged DataFrame into PostgreSQL

import os

import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

# Convert Spark DataFrame to Pandas
print("Converting Spark DataFrame to Pandas DataFrame...")
pdf = df_all.toPandas()
print(f"Conversion complete. {len(pdf)} rows to insert.")

# PostgreSQL connection parameters, overridable from the environment so
# credentials need not be committed with the notebook. The fallbacks are the
# previous local-dev values.
# TODO: drop the password fallback once DWH_DB_PASSWORD is always provided.
conn_params = {
    "dbname": os.environ.get("DWH_DB_NAME", "real_state_dwh"),
    "user": os.environ.get("DWH_DB_USER", "admin"),
    "password": os.environ.get("DWH_DB_PASSWORD", "admin"),
    "host": os.environ.get("DWH_DB_HOST", "postgres_general"),
    "port": int(os.environ.get("DWH_DB_PORT", "5432")),
}

# Destination columns. This one list fixes the order of both the INSERT
# column list and every row tuple, so the two cannot drift apart.
insert_columns = [
    "title", "link", "price", "location", "region", "city", "area", "bedrooms", "bathrooms",
    "latitude", "longitude", "property_type", "source", "price_per_sqm",
    "Jacuzzi", "Garden", "Balcony", "Pool",
    "Parking", "Gym", "Maids_Quarters", "Spa", "description",
]
column_list_sql = ", ".join(insert_columns)

insert_sql = f"""
INSERT INTO real_estate_one_big_table (
    {column_list_sql}
) VALUES %s
ON CONFLICT (link) DO NOTHING;
"""


def _to_db_tuples(frame, columns):
    """Return one tuple per row of `frame[columns]`, with missing values as None.

    toPandas() represents Spark NULLs in numeric columns as NaN. psycopg2
    would send NaN as the float 'NaN' instead of SQL NULL, so every
    pandas-missing scalar (NaN/None/NaT/NA) is mapped to None.

    Args:
        frame: pandas DataFrame holding at least `columns`.
        columns: Column names, in INSERT order.

    Returns:
        List of row tuples ready for execute_values.
    """
    return [
        tuple(None if pd.isna(value) else value for value in row)
        for row in frame[columns].itertuples(index=False, name=None)
    ]


conn = None
try:
    print("Connecting to PostgreSQL...")
    conn = psycopg2.connect(**conn_params)
    # `with conn` commits when the block succeeds and rolls back (then
    # re-raises) on an exception; TRUNCATE is transactional in PostgreSQL, so
    # a failed insert leaves the previous table contents in place.
    with conn, conn.cursor() as cur:
        print("Truncating destination table for a fresh load...")
        cur.execute("TRUNCATE TABLE real_estate_one_big_table;")

        print("Preparing data tuples for insertion...")
        data_tuples = _to_db_tuples(pdf, insert_columns)

        print(f"Executing batch insert of {len(data_tuples)} rows...")
        # Larger pages than the default of 100 mean fewer round trips.
        execute_values(cur, insert_sql, data_tuples, page_size=1000)

    print("Data loaded successfully!")

except Exception as e:
    print("Error:", e)
    # Re-raise so a failed load stops "Run All" instead of looking successful.
    raise

finally:
    print("Closing connection.")
    if conn is not None:
        conn.close()


Converting Spark DataFrame to Pandas DataFrame...
Conversion complete. 13262 rows to insert.
Connecting to PostgreSQL...
Truncating destination table for a fresh load...
Preparing data tuples for insertion...
Executing batch insert of 13262 rows...
Data loaded successfully!
Closing connection.
