## Medallion architecture notebook

#### Install geopy library

In [0]:
pip install geopy

Python interpreter will be restarted.
Collecting geopy
  Downloading geopy-2.4.1-py3-none-any.whl (125 kB)
Collecting geographiclib<3,>=1.52
  Downloading geographiclib-2.1-py3-none-any.whl (40 kB)
Installing collected packages: geographiclib, geopy
Successfully installed geographiclib-2.1 geopy-2.4.1
Python interpreter will be restarted.


### Mouting the storage to Databricks

In [0]:
dbutils.fs.mount(
    source="wasbs://containername@storageaccoutname.blob.core.windows.net",
    mount_point="/mnt/project01",
    extra_configs={
        "fs.azure.account.key.storageaccountname.blob.core.windows.net": "connectionstring"
    }
)


#### Reading the file from the bronze folder into a dataframe

In [0]:
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("quote", "\"")   # handle values inside quotes
      .option("escape", "\"")  # escape quotes properly
      .csv("/mnt/project01/bronze/raw_data.csv"))


In [0]:
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/pallavisaxenacounsellor@gmail.com/goa_sorted__1_.csv")

#### Raw Data

In [0]:
display(df)

"title,price,price_numeric,location,details,link,property_type"
"Apartment for Sale in Provident Adora De Goa, Dabolim Goa,Call for Price,0.0,N/A,N/A,https://www.magicbricks.com/provident-adora-de-goa-dabolim-goa-pdpid-4d4235313134313633,N/A"
"3 BHK Villa for Sale in Dona Paula Goa,Call for Price,0.0,N/A,N/A,N/A,N/A"
"Apartment for Sale in Shivneri Apartments, Canconna Goa,Call for Price,0.0,N/A,N/A,https://www.magicbricks.com/shivneri-apartments-canconna-goa-pdpid-4d4235343135373231,N/A"
"1 BHK Apartment for Sale in Saipem Goa,Call for Price,0.0,N/A,N/A,N/A,N/A"
"2 BHK Apartment for Sale in Saipem Goa,Call for Price,0.0,N/A,N/A,N/A,N/A"
"3 BHK Apartment for Sale in Candolim Goa,Call for Price,0.0,N/A,N/A,N/A,N/A"
"2 BHK Apartment for Sale in Chamunda Residency, Caranzalem Goa,Call for Price,0.0,N/A,N/A,https://www.magicbricks.com/chamunda-residency-caranzalem-goa-pdpid-4d4235303939313335,N/A"
"4 BHK Villa for Sale in Siolim Goa,Call for Price,0.0,N/A,N/A,N/A,N/A"
"5 BHK Villa for Sale in Reis Magos Goa,Call for Price,0.0,N/A,N/A,N/A,N/A"
"3 BHK Villa for Sale in Benaulim Goa,Call for Price,0.0,N/A,N/A,N/A,N/A"


#### Total Records in the raw df

In [0]:
display(df.count())

3702

#### Splitting columns from the extracted data

In [0]:
from pyspark.sql.functions import regexp_extract, regexp_replace, when, col, lit
from pyspark.sql import functions as F

df = (
    df
    # Rename column correctly (example: renaming "title" -> "first")
    .withColumnRenamed("title,price,price_numeric,location,details,link,property_type", "first")
    
    # Filter out unwanted rows
    .filter(~col("first").startswith("N/A"))
    
    # Extract BHK info
    .withColumn(
        "BHK",
        when(col("first").rlike(r"^\s*>?\s*\d+\s*BHK"),
             regexp_extract(col("first"), r"^\s*>?\s*(\d+\s*BHK)", 1))
        .when(col("first").rlike(r"^(Apartment|Plot/Land|Villa|Industrial Building|Office Space|Studio Apartment|Land|Agricultural Land|Warehouse|Industrial Shed)"),
              regexp_extract(col("first"), r"^(Apartment|Plot/Land|Villa|Industrial Building|Office Space|Studio Apartment|Land|Agricultural Land|Warehouse|Industrial Shed)", 1))
        .otherwise(None)
    )
    
    # Extract area info
    .withColumn(
        "area2",
        regexp_extract(col("first"), r"(\w+(?:\s+\w+)?)\s+Goa", 1)
    )
    
    # Clean up rupee symbol
    .withColumn("first", regexp_replace(col("first"), "â‚¹", ""))
    
    # Refine area2 with special cases
    .withColumn(
        "area2",
        when(col("first").contains("Adora De Goa"),
             regexp_extract(col("first"), r"Adora De Goa\s+(\w+)", 1))
        .when(col("first").contains("Provident Adora De Goa"),
              regexp_extract(col("first"), r"Adora De Goa[,\s]+(\w+)", 1))
        .when(col("first").contains("Rio De Goa"),
              regexp_extract(col("first"), r"Rio De Goa\s+(\w+)", 1))
        .when(col("first").contains("Industrial Estate"),
              regexp_extract(col("first"), r"(\w+)\s+Industrial Estate", 1))
        .when(col("first").contains("Velha"), lit("Velha"))
        .when(col("first").contains("Maharashtra Border"), lit("Maharashtra Border"))
        .otherwise(col("area2"))
    )
    
    # Convert price to numeric
    .withColumn(
        "price_numeric",
        when(col("first").rlike("Cr"),
             regexp_extract(col("first"), r"([\d\.]+)\s*Cr", 1).cast("double") * 10000000)
        .when(col("first").rlike("Lac"),
              regexp_extract(col("first"), r"([\d\.]+)\s*Lac", 1).cast("double") * 100000)
        .otherwise(None)
    )
    
    # Extract square feet
    .withColumn(
        "sq.feet",
        F.regexp_extract(F.col("first"), r"(?:Lac|Cr)\s*(\d+)", 1).cast("double")
    )
)

display(df)


first,BHK,area2,price_numeric,sq.feet
"Apartment for Sale in Provident Adora De Goa, Dabolim Goa,Call for Price,0.0,N/A,N/A,https://www.magicbricks.com/provident-adora-de-goa-dabolim-goa-pdpid-4d4235313134313633,N/A",Apartment,,,
"3 BHK Villa for Sale in Dona Paula Goa,Call for Price,0.0,N/A,N/A,N/A,N/A",3 BHK,Dona Paula,,
"Apartment for Sale in Shivneri Apartments, Canconna Goa,Call for Price,0.0,N/A,N/A,https://www.magicbricks.com/shivneri-apartments-canconna-goa-pdpid-4d4235343135373231,N/A",Apartment,Canconna,,
"1 BHK Apartment for Sale in Saipem Goa,Call for Price,0.0,N/A,N/A,N/A,N/A",1 BHK,in Saipem,,
"2 BHK Apartment for Sale in Saipem Goa,Call for Price,0.0,N/A,N/A,N/A,N/A",2 BHK,in Saipem,,
"3 BHK Apartment for Sale in Candolim Goa,Call for Price,0.0,N/A,N/A,N/A,N/A",3 BHK,in Candolim,,
"2 BHK Apartment for Sale in Chamunda Residency, Caranzalem Goa,Call for Price,0.0,N/A,N/A,https://www.magicbricks.com/chamunda-residency-caranzalem-goa-pdpid-4d4235303939313335,N/A",2 BHK,Caranzalem,,
"4 BHK Villa for Sale in Siolim Goa,Call for Price,0.0,N/A,N/A,N/A,N/A",4 BHK,in Siolim,,
"5 BHK Villa for Sale in Reis Magos Goa,Call for Price,0.0,N/A,N/A,N/A,N/A",5 BHK,Reis Magos,,
"3 BHK Villa for Sale in Benaulim Goa,Call for Price,0.0,N/A,N/A,N/A,N/A",3 BHK,in Benaulim,,


#### Number of nulls in each column 

In [0]:
from pyspark.sql.functions import col, sum, when, trim

null_counts = df.select([
    sum(
        when(col(f"`{c}`").isNull() | (trim(col(f"`{c}`")) == ""), 1).otherwise(0)
    ).alias(c)
    for c in df.columns
])

display(null_counts)


first,BHK,area2,price_numeric,sq.feet
0,0,57,139,447


#### Dropping the "first" column since we extracted all other columns

In [0]:
df=df.drop("first")

#### Mapping values in the "BHK" column to common categories 

In [0]:
from pyspark.sql.functions import when, col

df = df.withColumn(
    "BHK",
    when(col("BHK").isin("Agricultural Land", "Plot/Land"), "Land")
    .when(col("BHK") == "Industrial Shed", "Industrial Building")
    .otherwise(col("BHK"))
)


#### Filling the empty string values in "area2" column with "null"

In [0]:
from pyspark.sql.functions import regexp_extract, when, col, trim, lit

# Convert empty/whitespace results to real null
df = df.withColumn("area2", when(trim(col("area2")) == "", None).otherwise(col("area2")))
df = df.withColumn("price_numeric", when(trim(col("price_numeric")) == "", None).otherwise(col("price_numeric")))


#### Displaying the records in the "area2" column as "null"

In [0]:
from pyspark.sql.functions import col, trim

display(
    df.filter(
        col("area2").isNull() | (trim(col("area2")) == "")
    )
)


BHK,area2,price_numeric,sq.feet
Apartment,,,
3 BHK,,,
3 BHK,,10000000.0,
3 BHK,,10900000.0,9561.0
3 BHK,,11000000.0,9649.0
3 BHK,,11200000.000000002,9825.0
3 BHK,,11200000.000000002,9833.0
3 BHK,,11200000.000000002,9825.0
3 BHK,,11200000.000000002,9825.0
3 BHK,,11500000.0,10088.0


#### Filling the null values with the corresponding locations

In [0]:
from pyspark.sql.functions import when, col

df = df.withColumn(
    "area2",
    when(col("area2").rlike("(?i)Vidhyanagar Colony"), "Vidhyanagar Colony")
    .when(col("area2").rlike("(?i)Dabolim"), "Dabolim")
    .otherwise(col("area2"))
)



#### Cleaning the area2 column removing the word "in"

In [0]:
from pyspark.sql.functions import trim
df = df.withColumn(
    "area2",
    F.regexp_replace(F.col("area2"), r"\s*in\s*", " ")
)
df = df.withColumn("area2", trim(df["area2"]))
display(df.select("area2").distinct())

area2
Karapur
Samaya Retreat
Meadows Arossim
Oxel
Dona Paula
CARASWADA
Residency Torsem
Bogmalo
Marna
Mulgao


#### Removing unwanted areas from area2 column which dont provide a specific location

In [0]:
import re
unwanted = ["bhk flat", "Overall", "sale", "Sale", "Medical College", "nort", "guest house", "Villa Plot", "Agricultural", "goa plots"]

pattern = "|".join([re.escape(w) for w in unwanted])

# Filter rows
df = df.filter(~col("area2").rlike(pattern))
display(df)


BHK,area2,price_numeric,sq.feet
3 BHK,Dona Paula,,
Apartment,Canconna,,
1 BHK,Saipem,,
2 BHK,Saipem,,
3 BHK,Candolim,,
2 BHK,Caranzalem,,
4 BHK,Siolim,,
5 BHK,Reis Magos,,
3 BHK,Benaulim,,
Apartment,Dabolim,,


#### mapping the small localities to a common location to keep it uniform

In [0]:
mapping = {
    "Samaya Retreat": "Dodamarg",
    "Meadows Arossim": "Arossim",
    "Residency Torsem": "Torsem",
    "Brizen enclave": "Majorda",
    "Alt ho": "Panjim",
    "Santa ez": "Panjim",
    "Patto": "Panjim",
    "Maria Resort": "Calangute",
    "Barro": "Mandrem",
    "golf city": "Mopa",
    "GHD Skypark Goa": "Margao",
    "villa Riveria": "Candolim",
    "Susheela seawinds": "Vasco",
    "Sale in Chodankar Harmony Goa": "Tiswadi",
    "St Inez": "Panjim",
    "Acoi Village": "Mapusa",
    "Levelup Dreamz": "Shiroda",
    "Caserio": "Assagao",
    "Mulgao": "Bicholim",
    "gogol": "Margao",
    "Classic Apartment": "Vasco",
    "gogal lake": "Arambol",
    "HOA One": "Bicholim",
    "Campal": "Panjim",
    "do Mundo": "Porvorim",
    "Vidhyanagar Colony": "Vasco",
    "sapana vista": "Margao",
    "Quitla": "Mapsa",
    "Fatorda Margao": "Margao",
    "Braganza resort": "Calangute",
    "yog city": "Dodamarg",
    "Bus Stand": "Kadamba",
    "Borim Ponda": "Ponda",
    "Ba gu im": "Bainguinim",
    "of Joy": "Panjim",
    "Aero Cidade": "Dabolim",
    "VILLA COLVA": "Colva",
    "apartments comba": "Salcete",
    "Mia Casa": "Arpora",
    "New Vaddem": "Vasco",
    "Sheikh Construction": "Porvorim",
    "Arolkar Apartment": "Vasco",
    "barreto Appartments": "Benaulim",
    "Anjuna Road": "Anjuna",
    "view residency": "Margao",
    "Faria Mist": "Salcete",
    "STARLITE HOMES": "Carmona",
    "Yugen Infra": "Mopa",
    "Dhargalim VP": "Dhargalim",
    "Lodha Imperial": "Bainguinim",
    "Abh andan Lodha":"Bainguinim",
    "Mathias Hills": "Dona Paula",
    "in Ishta": "Dabolim",
    "Jairam Nagar": "Dabolim",
    "Bhobe Pride": "Nerul",
    "goa 2": "Nerul",
    "parish property": "Old Goa",
    "Nest Residency": "Porvorim",
    "f meadows": "Revora",
    "Ch ch im": "Chicalim",
    "garden zorit": "Sancole",
    "Airpot Road": "Dabolim",
    "Calungate": "Calangute",
    "Vidya Nagar": "Margao",
    "De Braganza": "Calangute",
    "Mauzekar": "Ponda",
    "bank kaijwada": "Ponda",
    "Tivim": "Thivim",
    "Da Gama": "Vasco",
    "keshava": "Candolim",
    "nessai": "Margao",
    "La Mer": "Calangute",
    "Superluxury Villa": "Candolim",
    "Osborne Apartment": "Calangute",
    "Sai Enclave": "Orgao",
    "Paradise Palms": "Nerul",
    "cansa Thivim": "Thivim",
    "Old": "Old Goa",
    "Feira Alta": "Mapsa",
    "Aldeia De": "Dona Paula",
    "Aqua Eden": "Sancole",
    "GHARSE TOWER": "Panjim",
    "Sukhshanti Gardens": "Ponda",
    "Queeny Vision": "Sancoale",
    "Gulf of": "Dabolim",
    "nest mspusa": "Mapsa",
    "Carmona Panchayat" : "Carmona",
    "Abhinandan Lodha" : "Baingini",
    "Seawinds Orion" : "Vasco",
    "Pilerne" : "Porvorim",
    "Gogal Lake" : "Arambol",
    "Chodankar Harmony" : "Cujira",
    "GHD Skypark" : "Margao",
    "casa vino" : "Nachinola",
    "Anokha Estates":"Porvorim",
    "Alto Porvorim" :"Porvorim",
    "vaddem" :"Vasco",
    "Honda":"onda",
    "TISWADI goa":"Tiswadi",
    "Dukle heavan":"Karaswada",
    "Bicholim North":"Bicholim",
    "giri mapsa":"Mapsa",
    "Da Praia":"Calangute",
    "Santainez": "Panjim",
    "housing society" :"Karaswada",
    "Gogol":"Margao",
    "Mumbai":"Mumbai Highway",
    "Khobra Waddo":"Calangute",
    "BARRO":"Mandrem",
    "pvreal" :"Mapsa",
    "TATA":"Dabolim",
    "tata":"Dalbolim",
    "PVReal":"Mapsa",
    "Nach ola":"Nachinola",
    "Susheela seaw ds":"Dabolim",
    "hous g society":"Karaswada",
    "Casa V o":"Casa Vino",
    "Nuvem Margao":"Margao",
    "Madgaon":"Margao",
    "Aradi Socorro":"Margao",
    "casa v o":"Casa Vino",
    "Seaw ds Orion":"Vasco",
    "Honda":"Onda",
    "2":"Chicalim",
    "goa 2":"Sindhudurg",
    "Chogm Road":"Porvorim",
    "Ch ch im":"Chicalim",
    "moul gem":"Morjim",
    "CARASWADA":"Karaswada",
    "thivim":"Thivim",
    "Pvreal":"Mapsa",
    "Tata":"Dabolim",
    "dabolim":"Dabolim",
    "corgao":"Corgao",
    "tivim":"Thivim",
    "Salcate":"Salcete"

}


#### We create a new "location" column after mapping

In [0]:
from pyspark.sql import functions as F

# convert dictionary to PySpark map
mapping_expr = F.create_map([F.lit(x) for kv in mapping.items() for x in kv])

df = df.withColumn(
    "Location",
    F.coalesce(mapping_expr[F.col("area2")], F.col("area2"))
)


#### Clean the "Location" column

In [0]:
from pyspark.sql import functions as F

df = df.withColumn(
    "Location",
    F.regexp_replace(F.col("Location"), r"\bin\b", "")
)

# Remove extra spaces that may appear after dropping "in"
df = df.withColumn(
    "Location",
    F.trim(F.regexp_replace(F.col("Location"), r"\s+", " "))
)
display(df)


BHK,area2,price_numeric,sq.feet,Location
3 BHK,Dona Paula,,,Dona Paula
Apartment,Canconna,,,Canconna
1 BHK,Saipem,,,Saipem
2 BHK,Saipem,,,Saipem
3 BHK,Candolim,,,Candolim
2 BHK,Caranzalem,,,Caranzalem
4 BHK,Siolim,,,Siolim
5 BHK,Reis Magos,,,Reis Magos
3 BHK,Benaulim,,,Benaulim
Apartment,Dabolim,,,Dabolim


#### Dropping the 'area2' column

In [0]:
df=df.drop("area2")

#### Filling the null values in the 'sq.feet' column by filling them with average sq.feet per area

In [0]:
from pyspark.sql import functions as F


# Step 1: Compute average sq.feet per area
area_avg = (
    df.filter(F.col("price_numeric").isNotNull() & F.col("`sq.feet`").isNotNull())
      .groupBy("Location")
      .agg(F.round(F.avg("`sq.feet`"), 0).alias("avg_sq_feet"))
)

# Step 2: Join back with original dataframe
df= (
    df.join(area_avg, on="Location", how="left")
      .withColumn(
          "sq_feet",
          F.when(F.col("`sq.feet`").isNotNull(), F.col("`sq.feet`"))
           .otherwise(F.col(("avg_sq_feet")))
      )
      .drop("avg_sq_feet")  # optional cleanup
)
df= df.drop("sq.feet")
display(df)


Location,BHK,price_numeric,sq_feet
Dona Paula,3 BHK,,19580.0
Canconna,Apartment,,8281.0
Saipem,1 BHK,,18414.0
Saipem,2 BHK,,18414.0
Candolim,3 BHK,,14287.0
Caranzalem,2 BHK,,14042.0
Siolim,4 BHK,,17747.0
Reis Magos,5 BHK,,13846.0
Benaulim,3 BHK,,8994.0
Dabolim,Apartment,,8444.0


#### Dropping nulls in the 'price_numeric' and 'sq.feet' columns

In [0]:
df = df.filter(~F.col("price_numeric").isNull())
df = df.filter(~F.col("sq_feet").isNull())

df=df.drop("sq.feet")
display(df.count())

2473

#### Rounding the 'price_numeric' column

In [0]:
from pyspark.sql.functions import round

df = df.withColumn("price_numeric", round("price_numeric", 0))

#### Renaming the 'price_numeric column' to 'price'

In [0]:
df = df.withColumnRenamed("price_numeric", "price")

In [0]:
from pyspark.sql.functions import col, sum, when, trim

null_counts = df.select([
    sum(
        when(col(f"`{c}`").isNull() | (trim(col(f"`{c}`")) == ""), 1).otherwise(0)
    ).alias(c)
    for c in df.columns
])

display(null_counts)

Location,BHK,price,sq_feet
0,0,0,0


#### Defining the silver path

In [0]:
silver_path ="/mnt/project01/silver/"
df.write.mode("overwrite").format("delta").save(silver_path)

#### Checkpoint : if the _delta_log directory exists and has files


In [0]:
import os
silver_path = "/mnt/project01/silver/"
delta_log_path = os.path.join(silver_path, "_delta_log")

# List contents of the delta log
dbutils.fs.ls(silver_path + "_delta_log/")

#### Writing the data to a cleaned data to a silver folder

In [0]:
dbutils.fs.rm(silver_path, True)  # True enables recursive deletion

# Then write your data
df.write.mode("overwrite").format("delta").save(silver_path)

#### Enriching the data with the latitude and longitude information

In [0]:
unique_areas = [row["Location"] for row in df.select("Location").distinct().collect()]
from geopy.geocoders import Nominatim
import time
import pandas as pd

geolocator = Nominatim(user_agent="goa_project")
lookup_data = []

for area in unique_areas:
    try:
        location = geolocator.geocode(area + ", Goa, India")
        if location:
            lookup_data.append((area, location.latitude, location.longitude))
        else:
            lookup_data.append((area, None, None))
    except:
        lookup_data.append((area, None, None))
    time.sleep(1)  # to avoid being blocked
lookup_df = pd.DataFrame(lookup_data, columns=["Location", "latitude", "longitude"])
print(lookup_df)


       Location   latitude  longitude
0       Karapur  15.563510  73.988360
1          Oxel  15.286317  74.251226
2    Dona Paula  15.458200  73.803867
3       Bogmalo  15.369632  73.835917
4         Marna  15.613281  73.771954
..          ...        ...        ...
162  Betalbatim  15.300881  73.920300
163     Britona  15.520529  73.847523
164        Onda        NaN        NaN
165      Agonda  15.041104  73.988242
166    Canacona  15.006842  74.039489

[167 rows x 3 columns]


In [0]:
# Convert Pandas DataFrame to PySpark DataFrame
spark_df = spark.createDataFrame(lookup_df)

# Show first few rows
display(spark_df)


Location,latitude,longitude
Karapur,15.5635097,73.9883604
Oxel,15.2863172,74.2512259
Dona Paula,15.4581997,73.8038671
Bogmalo,15.3696321,73.8359166
Marna,15.6132808,73.7719541
Chandor,15.2614307,74.0440025
Bastora,15.5813889,73.8256926
Taleigao,15.4702665,73.8225674
Old Goa,15.5023329,73.9117432
Taligao,15.477811,73.82054


In [0]:
# Save in driver local path
local_path = "/databricks/driver/goa_area_lookup.csv"
lookup_df.to_csv(local_path, index=False)

# Copy from driver to DBFS
dbutils.fs.cp(f"file:{local_path}", "dbfs:/FileStore/goa_area_lookup.csv")

Out[408]: True

#### Save the location , latitude and longitude to a separate dataframe

In [0]:
locs = spark.read.format("csv").option("header", "true").load("filepath")

In [0]:
display(locs)

Location,latitude,longitude
Karapur,15.5635097,73.9883604
Oxel,15.2863172,74.2512259
Dona Paula,15.4581997,73.8038671
Bogmalo,15.3696321,73.8359166
Marna,15.6132808,73.7719541
Chandor,15.2614307,74.0440025
Bastora,15.5813889,73.8256926
Taleigao,15.4702665,73.8225674
Old Goa,15.5023329,73.9117432
Taligao,15.477811,73.82054


#### Join it with the original dataframe

In [0]:
# Join with original df
df_final = df.join(locs, on="Location", how="left")

In [0]:
display(df_final)

Location,BHK,price,sq_feet,latitude,longitude
Mapusa,Office Space,100000.0,77.0,15.590853,73.8102146
Taleigao,2 BHK,10000000.0,11905.0,15.4702665,73.8225674
Calangute,2 BHK,10000000.0,12500.0,15.545594,73.7646182
Tuem,Land,10000000.0,2714.0,15.6665677,73.795665
Bainguinim,2 BHK,10000000.0,7492.0,15.5003488,73.9022881
Arpora,1 BHK,10000000.0,13333.0,15.5630974,73.7640368
Ponda,Land,100000.0,0.0,15.3979351,74.0079788
Mapusa,1 BHK,10000000.0,11806.0,15.590853,73.8102146
Porvorim,2 BHK,10000000.0,7943.0,15.5251795,73.8282988
Siolim,1 BHK,10000000.0,15408.0,15.6270222,73.7656851


#### Writing the final dataframe to the gold folder

In [0]:
gold_path ="/mnt/project01/gold/"
df_final.write.mode("overwrite").format("delta").save(gold_path)
