In [47]:
import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

In [48]:
# Obtain the Spark session for this notebook (an existing session is reused, otherwise one is created).
spark = (
    SparkSession.builder
    .appName("Real_Estate_Processing")
    .getOrCreate()
)

In [49]:
# Example input locations (one dated folder per run):
# Real estate data : project-bucket-for-pipeline/real_estate_data/real_estate_files/2025-01-07/real_estate_data.json
# Owner data       : project-bucket-for-pipeline/real_estate_data/owners_data_files/2025-01-07/owner_data.json

In [54]:
# Run date as YYYY-MM-DD (local clock); it selects the dated folder the input files are read from.
today_date = datetime.date.today().isoformat()

# Fully-qualified GCS locations of today's two JSON inputs.
owner_data = f"gs://project-bucket-for-pipeline/real_estate_data/owners_data_files/{today_date}/owner_data.json"
real_estate_data = f"gs://project-bucket-for-pipeline/real_estate_data/real_estate_files/{today_date}/real_estate_data.json"

In [56]:
# Load today's real-estate JSON file into a DataFrame and preview the first two rows.
# NOTE(review): `inferschema` does not appear among the documented JSON reader
# options - confirm whether it has any effect here.
real_estate_df = (
    spark.read
    .format("json")
    .option("inferschema", True)
    .option("mode", "PERMISSIVE")
    .load(real_estate_data)
)
real_estate_df.show(n=2)

                                                                                

+---------+--------+---------+--------+-----------+---------+-----+----------+
|Bathrooms|Bedrooms| Location|   Price|Property_ID|Size_SqFt| Type|Year_Built|
+---------+--------+---------+--------+-----------+---------+-----+----------+
|        2|       1|  Chennai|29865958|    P-00001|     2643|Villa|      2004|
|        3|       4|Hyderabad|42386305|    P-00002|     1566|House|      2007|
+---------+--------+---------+--------+-----------+---------+-----+----------+
only showing top 2 rows



In [57]:
# Load today's owner JSON file into a DataFrame and preview the first two rows.
# NOTE(review): `inferschema` does not appear among the documented JSON reader
# options - confirm whether it has any effect here.
owner_df = (
    spark.read
    .format("json")
    .option("inferschema", True)
    .option("mode", "PERMISSIVE")
    .load(owner_data)
)
owner_df.show(n=2)

+--------------+--------+-----------+-----------+
|Contact_Number|Owner_ID| Owner_Name|Property_ID|
+--------------+--------+-----------+-----------+
| +918228668084| O-00001| Meera Iyer|    P-00001|
| +917049457261| O-00002|Anjali Iyer|    P-00002|
+--------------+--------+-----------+-----------+
only showing top 2 rows



In [58]:
# Add Price_Per_Sqft = Price / Size_SqFt, rounded to 2 decimals, to real_estate_df.
#
# Spark functions are referenced through the explicit `F` namespace so it is clear
# that `round` here is the Spark column function, not the Python builtin that the
# wildcard import of pyspark.sql.functions shadows.
size_sqft = F.col("Size_SqFt")

# Divide only when the size is non-zero. Rows with a zero size get NULL, which is
# what a plain division yields with ANSI mode off; with ANSI mode on, an unguarded
# division by zero would fail the whole job instead.
price_per_sqft = F.when(size_sqft != 0, F.round(F.col("Price") / size_sqft, 2))

real_estate_with_price_per_sqft = real_estate_df.withColumn("Price_Per_Sqft", price_per_sqft)
real_estate_with_price_per_sqft.show(2)

+---------+--------+---------+--------+-----------+---------+-----+----------+--------------+
|Bathrooms|Bedrooms| Location|   Price|Property_ID|Size_SqFt| Type|Year_Built|Price_Per_Sqft|
+---------+--------+---------+--------+-----------+---------+-----+----------+--------------+
|        2|       1|  Chennai|29865958|    P-00001|     2643|Villa|      2004|      11300.02|
|        3|       4|Hyderabad|42386305|    P-00002|     1566|House|      2007|      27066.61|
+---------+--------+---------+--------+-----------+---------+-----+----------+--------------+
only showing top 2 rows



In [59]:
# Give the Property_ID column a frame-specific name on each side so the two key
# columns can be told apart after the frames are combined.
owner_renamed = owner_df.withColumnRenamed(
    existing="Property_ID",
    new="Owner_Property_ID",
)
real_estate_renamed = real_estate_with_price_per_sqft.withColumnRenamed(
    existing="Property_ID",
    new="Real_Estate_Property_ID",
)

In [60]:
# Preview two rows of the property frame after the key column rename.
real_estate_renamed.show(n=2)

+---------+--------+---------+--------+-----------------------+---------+-----+----------+--------------+
|Bathrooms|Bedrooms| Location|   Price|Real_Estate_Property_ID|Size_SqFt| Type|Year_Built|Price_Per_Sqft|
+---------+--------+---------+--------+-----------------------+---------+-----+----------+--------------+
|        2|       1|  Chennai|29865958|                P-00001|     2643|Villa|      2004|      11300.02|
|        3|       4|Hyderabad|42386305|                P-00002|     1566|House|      2007|      27066.61|
+---------+--------+---------+--------+-----------------------+---------+-----+----------+--------------+
only showing top 2 rows



In [61]:
# Preview two rows of the owner frame after the key column rename.
owner_renamed.show(n=2)

+--------------+--------+-----------+-----------------+
|Contact_Number|Owner_ID| Owner_Name|Owner_Property_ID|
+--------------+--------+-----------+-----------------+
| +918228668084| O-00001| Meera Iyer|          P-00001|
| +917049457261| O-00002|Anjali Iyer|          P-00002|
+--------------+--------+-----------+-----------------+
only showing top 2 rows



In [62]:
# Inner-join properties to owners on the property id, then keep only the columns
# (in this order) that make up the final output.
output_columns = [
    "Owner_Property_ID",
    "Owner_ID",
    "Owner_Name",
    "Contact_Number",
    "Location",
    "Type",
    "Size_SqFt",
    "Price",
    "Price_Per_Sqft",
    "Bathrooms",
    "Bedrooms",
]
property_id_matches = (
    real_estate_renamed["Real_Estate_Property_ID"] == owner_renamed["Owner_Property_ID"]
)
joined_df = (
    real_estate_renamed
    .join(owner_renamed, on=property_id_matches, how="inner")
    .selectExpr(*output_columns)
)
joined_df.show(n=2)

+-----------------+--------+-----------+--------------+---------+-----+---------+--------+--------------+---------+--------+
|Owner_Property_ID|Owner_ID| Owner_Name|Contact_Number| Location| Type|Size_SqFt|   Price|Price_Per_Sqft|Bathrooms|Bedrooms|
+-----------------+--------+-----------+--------------+---------+-----+---------+--------+--------------+---------+--------+
|          P-00001| O-00001| Meera Iyer| +918228668084|  Chennai|Villa|     2643|29865958|      11300.02|        2|       1|
|          P-00002| O-00002|Anjali Iyer| +917049457261|Hyderabad|House|     1566|42386305|      27066.61|        3|       4|
+-----------------+--------+-----------+--------------+---------+-----+---------+--------+--------------+---------+--------+
only showing top 2 rows



In [63]:
# Append joined_df to the BigQuery table <dataset_id>.<table_name>.
dataset_id = "linen-age-447106-e3.Real_estate_dataset"
table_name = "real_estate_owner_data"
table_id = dataset_id + "." + table_name

# GCS location handed to the BigQuery writer through its `temporaryGcsBucket` option.
bq_temp_bucket = "gs://project-bucket-for-pipeline/real_estate_data/gcs_temp_bucket"

# "append" mode: rows are added to the table on every run of this cell.
(
    joined_df.write
    .format("bigquery")
    .option("table", table_id)
    .option("temporaryGcsBucket", bq_temp_bucket)
    .mode("append")
    .save()
)

print("Dataframe loaded successfully")

                                                                                

Dataframe loaded successfully


In [45]:
# Dated GCS folder that receives the joined output for this run.
final_df_bucket = f"gs://project-bucket-for-pipeline/real_estate_data/joined_real_estate_owner/{today_date}"

# Write joined_df as CSV with a header row; "append" mode adds to whatever is
# already under the folder instead of replacing it.
(
    joined_df.write
    .format("csv")
    .option("header", True)
    .mode("append")
    .save(final_df_bucket)
)

print(f"Data written to {final_df_bucket}")


                                                                                

Data written to gs://project-bucket-for-pipeline/real_estate_data/joined_real_estate_owner/2025-01-07
