In [1]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!ls -lh /content/drive/MyDrive/dld_transactions


total 968M
-rw------- 1 root root 968M Jan 28 10:27 dld_transactions.csv


In [3]:
!apt-get update -qq
!apt-get install openjdk-11-jdk-headless -qq


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [4]:
!rm -f spark-3.5.0-bin-hadoop3.tgz
!wget -O spark-3.5.0-bin-hadoop3.tgz https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!ls -lh spark-3.5.0-bin-hadoop3.tgz


--2026-01-28 10:57:56--  https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400395283 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.0-bin-hadoop3.tgz’


2026-01-28 10:58:22 (15.0 MB/s) - ‘spark-3.5.0-bin-hadoop3.tgz’ saved [400395283/400395283]

-rw-r--r-- 1 root root 382M Sep  9  2023 spark-3.5.0-bin-hadoop3.tgz


In [5]:
!tar -xzf spark-3.5.0-bin-hadoop3.tgz
!ls -lh


total 382M
drwx------  5 root root 4.0K Jan 28 10:47 drive
drwxr-xr-x  1 root root 4.0K Jan 16 14:24 sample_data
drwxr-xr-x 13 1000 1000 4.0K Sep  9  2023 spark-3.5.0-bin-hadoop3
-rw-r--r--  1 root root 382M Sep  9  2023 spark-3.5.0-bin-hadoop3.tgz


Colab already has a Google package (dataproc-spark-connect) that prefers PySpark 4.x, but you’re installing PySpark 3.5.0
run this to execute that----

In [7]:
!pip -q uninstall -y dataproc-spark-connect
!pip -q install pyspark==3.5.0 //



In [6]:
import os, sys

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ["PATH"] += ":/content/spark-3.5.0-bin-hadoop3/bin"

sys.path.insert(0, "/content/spark-3.5.0-bin-hadoop3/python")
sys.path.insert(0, "/content/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip")

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DLD Ingestion").getOrCreate()
spark.range(5).show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [8]:
csv_path = "/content/drive/MyDrive/dld_transactions/dld_transactions.csv"
csv_path


'/content/drive/MyDrive/dld_transactions/dld_transactions.csv'

In [11]:
# Print the CSV header

!head -n 1 "$csv_path"


"transaction_id","procedure_id","trans_group_id","trans_group_ar","trans_group_en","procedure_name_ar","procedure_name_en","instance_date","property_type_id","property_type_ar","property_type_en","property_sub_type_id","property_sub_type_ar","property_sub_type_en","property_usage_ar","property_usage_en","reg_type_id","reg_type_ar","reg_type_en","area_id","area_name_ar","area_name_en","building_name_ar","building_name_en","project_number","project_name_ar","project_name_en","master_project_en","master_project_ar","nearest_landmark_ar","nearest_landmark_en","nearest_metro_ar","nearest_metro_en","nearest_mall_ar","nearest_mall_en","rooms_ar","rooms_en","has_parking","procedure_area","actual_worth","meter_sale_price","rent_value","meter_rent_price","no_of_parties_role_1","no_of_parties_role_2","no_of_parties_role_3"


In [12]:
# Define an explicit schema
# load everything as strings first to avoid failures


from pyspark.sql.types import StructType, StructField, StringType

header_cols = open(csv_path, "r", encoding="utf-8", errors="ignore") \
    .readline() \
    .strip() \
    .split(",")

schema = StructType([StructField(col, StringType(), True) for col in header_cols])

schema


StructType([StructField('"transaction_id"', StringType(), True), StructField('"procedure_id"', StringType(), True), StructField('"trans_group_id"', StringType(), True), StructField('"trans_group_ar"', StringType(), True), StructField('"trans_group_en"', StringType(), True), StructField('"procedure_name_ar"', StringType(), True), StructField('"procedure_name_en"', StringType(), True), StructField('"instance_date"', StringType(), True), StructField('"property_type_id"', StringType(), True), StructField('"property_type_ar"', StringType(), True), StructField('"property_type_en"', StringType(), True), StructField('"property_sub_type_id"', StringType(), True), StructField('"property_sub_type_ar"', StringType(), True), StructField('"property_sub_type_en"', StringType(), True), StructField('"property_usage_ar"', StringType(), True), StructField('"property_usage_en"', StringType(), True), StructField('"reg_type_id"', StringType(), True), StructField('"reg_type_ar"', StringType(), True), StructF

In [13]:
#Load the CSV using Spark

df_raw = spark.read.csv(
    csv_path,
    header=True,
    schema=schema,
    mode="DROPMALFORMED"
)


In [14]:
#Validate ingestion

df_raw.printSchema()
print("Row count:", df_raw.count())
df_raw.show(5)
print("Partitions:", df_raw.rdd.getNumPartitions())


root
 |-- "transaction_id": string (nullable = true)
 |-- "procedure_id": string (nullable = true)
 |-- "trans_group_id": string (nullable = true)
 |-- "trans_group_ar": string (nullable = true)
 |-- "trans_group_en": string (nullable = true)
 |-- "procedure_name_ar": string (nullable = true)
 |-- "procedure_name_en": string (nullable = true)
 |-- "instance_date": string (nullable = true)
 |-- "property_type_id": string (nullable = true)
 |-- "property_type_ar": string (nullable = true)
 |-- "property_type_en": string (nullable = true)
 |-- "property_sub_type_id": string (nullable = true)
 |-- "property_sub_type_ar": string (nullable = true)
 |-- "property_sub_type_en": string (nullable = true)
 |-- "property_usage_ar": string (nullable = true)
 |-- "property_usage_en": string (nullable = true)
 |-- "reg_type_id": string (nullable = true)
 |-- "reg_type_ar": string (nullable = true)
 |-- "reg_type_en": string (nullable = true)
 |-- "area_id": string (nullable = true)
 |-- "area_name_ar

In [15]:
#Basic casting for key columns
#convert important fields to correct types

from pyspark.sql.functions import col, to_date

df = df_raw

if "transaction_value" in df.columns:
    df = df.withColumn("transaction_value", col("transaction_value").cast("double"))

if "area_sqm" in df.columns:
    df = df.withColumn("area_sqm", col("area_sqm").cast("double"))

if "transaction_date" in df.columns:
    df = df.withColumn("transaction_date", to_date(col("transaction_date")))


In [17]:
#cleaning

if "transaction_value" in df.columns:
    df = df.dropna(subset=["transaction_value"]) \
           .filter(col("transaction_value") > 0)


In [18]:
#Show partitioning evidence

df.rdd.getNumPartitions()
df = df.repartition(200)
df.rdd.getNumPartitions()


200

In [19]:
out_path = "/content/drive/MyDrive/dld_transactions/cleaned_parquet"
df.write.mode("overwrite").parquet(out_path)

print("Parquet saved to:", out_path)


Parquet saved to: /content/drive/MyDrive/dld_transactions/cleaned_parquet


In [21]:
#Build the correct explicit schema (recommended)

#This cell defines a proper StructType (IDs as integers/strings, numerics as doubles, date as timestamp-ready)

from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, DoubleType
)

schema = StructType([
    StructField("transaction_id", StringType(), True),

    StructField("procedure_id", IntegerType(), True),
    StructField("trans_group_id", IntegerType(), True),
    StructField("trans_group_ar", StringType(), True),
    StructField("trans_group_en", StringType(), True),

    StructField("procedure_name_ar", StringType(), True),
    StructField("procedure_name_en", StringType(), True),

    StructField("instance_date", StringType(), True),  # parse after load

    StructField("property_type_id", IntegerType(), True),
    StructField("property_type_ar", StringType(), True),
    StructField("property_type_en", StringType(), True),

    StructField("property_sub_type_id", IntegerType(), True),
    StructField("property_sub_type_ar", StringType(), True),
    StructField("property_sub_type_en", StringType(), True),

    StructField("property_usage_ar", StringType(), True),
    StructField("property_usage_en", StringType(), True),

    StructField("reg_type_id", IntegerType(), True),
    StructField("reg_type_ar", StringType(), True),
    StructField("reg_type_en", StringType(), True),

    StructField("area_id", IntegerType(), True),
    StructField("area_name_ar", StringType(), True),
    StructField("area_name_en", StringType(), True),

    StructField("building_name_ar", StringType(), True),
    StructField("building_name_en", StringType(), True),

    StructField("project_number", StringType(), True),
    StructField("project_name_ar", StringType(), True),
    StructField("project_name_en", StringType(), True),

    StructField("master_project_en", StringType(), True),
    StructField("master_project_ar", StringType(), True),

    StructField("nearest_landmark_ar", StringType(), True),
    StructField("nearest_landmark_en", StringType(), True),

    StructField("nearest_metro_ar", StringType(), True),
    StructField("nearest_metro_en", StringType(), True),

    StructField("nearest_mall_ar", StringType(), True),
    StructField("nearest_mall_en", StringType(), True),

    StructField("rooms_ar", StringType(), True),
    StructField("rooms_en", StringType(), True),

    StructField("has_parking", IntegerType(), True),

    StructField("procedure_area", DoubleType(), True),
    StructField("actual_worth", DoubleType(), True),
    StructField("meter_sale_price", DoubleType(), True),
    StructField("rent_value", DoubleType(), True),
    StructField("meter_rent_price", DoubleType(), True),

    StructField("no_of_parties_role_1", IntegerType(), True),
    StructField("no_of_parties_role_2", IntegerType(), True),
    StructField("no_of_parties_role_3", IntegerType(), True),
])


In [23]:
#Reload the CSV using this schema (clean, official ingestion)

csv_path = "/content/drive/MyDrive/dld_transactions/dld_transactions.csv"

df = spark.read.csv(
    csv_path,
    header=True,
    schema=schema,
    mode="DROPMALFORMED"
)


In [24]:
#Parse the date + create time features
# parse instance_date and create year/month/day-of-week

from pyspark.sql.functions import col, to_timestamp, year, month, dayofweek

df = df.withColumn("instance_ts", to_timestamp(col("instance_date")))
df = df.withColumn("year", year(col("instance_ts"))) \
       .withColumn("month", month(col("instance_ts"))) \
       .withColumn("dow", dayofweek(col("instance_ts")))


In [25]:
#Missing value & sanity cleaning

from pyspark.sql.functions import col

# Drop rows missing key numeric targets (pick one later; for now keep both)
df = df.dropna(subset=["actual_worth", "procedure_area"])

# Filter impossible values
df = df.filter(col("procedure_area") > 0)
df = df.filter(col("actual_worth") > 0)

# Optional: ensure parking is 0/1-ish (some datasets use null/1)
df = df.fillna({"has_parking": 0})


In [26]:
#Partitioning evidence

print("Before repartition:", df.rdd.getNumPartitions())
df = df.repartition(200)
print("After repartition:", df.rdd.getNumPartitions())


Before repartition: 8
After repartition: 200


In [28]:
#Save the handoff dataset as Parquet

out_path = "/content/drive/MyDrive/dld_transactions/cleaned_parquet"
df.write.mode("overwrite").parquet(out_path)
print("Saved Parquet to:", out_path)


Saved Parquet to: /content/drive/MyDrive/dld_transactions/cleaned_parquet


In [29]:
#screenshots for ingestion

df.printSchema()
print("Row count:", df.count())
df.show(5, truncate=False)
print("Partitions:", df.rdd.getNumPartitions())


root
 |-- transaction_id: string (nullable = true)
 |-- procedure_id: integer (nullable = true)
 |-- trans_group_id: integer (nullable = true)
 |-- trans_group_ar: string (nullable = true)
 |-- trans_group_en: string (nullable = true)
 |-- procedure_name_ar: string (nullable = true)
 |-- procedure_name_en: string (nullable = true)
 |-- instance_date: string (nullable = true)
 |-- property_type_id: integer (nullable = true)
 |-- property_type_ar: string (nullable = true)
 |-- property_type_en: string (nullable = true)
 |-- property_sub_type_id: integer (nullable = true)
 |-- property_sub_type_ar: string (nullable = true)
 |-- property_sub_type_en: string (nullable = true)
 |-- property_usage_ar: string (nullable = true)
 |-- property_usage_en: string (nullable = true)
 |-- reg_type_id: integer (nullable = true)
 |-- reg_type_ar: string (nullable = true)
 |-- reg_type_en: string (nullable = true)
 |-- area_id: integer (nullable = true)
 |-- area_name_ar: string (nullable = true)
 |-- are

In [33]:
df.select("instance_date").show(5, truncate=False)


+-------------+
|instance_date|
+-------------+
|14-11-2013   |
|27-01-2020   |
|10-09-2014   |
|01-06-2020   |
|09-11-2016   |
+-------------+
only showing top 5 rows



In [34]:
!ls -lh /content/drive/MyDrive/dld_transactions/cleaned_parquet


total 6.8M
-rw------- 1 root root 34K Jan 28 11:26 part-00000-6d0dc871-b810-4b46-bcea-9a48fda3725a-c000.snappy.parquet
-rw------- 1 root root 35K Jan 28 11:26 part-00001-6d0dc871-b810-4b46-bcea-9a48fda3725a-c000.snappy.parquet
-rw------- 1 root root 35K Jan 28 11:26 part-00002-6d0dc871-b810-4b46-bcea-9a48fda3725a-c000.snappy.parquet
-rw------- 1 root root 34K Jan 28 11:26 part-00003-6d0dc871-b810-4b46-bcea-9a48fda3725a-c000.snappy.parquet
-rw------- 1 root root 35K Jan 28 11:26 part-00004-6d0dc871-b810-4b46-bcea-9a48fda3725a-c000.snappy.parquet
-rw------- 1 root root 34K Jan 28 11:26 part-00005-6d0dc871-b810-4b46-bcea-9a48fda3725a-c000.snappy.parquet
-rw------- 1 root root 35K Jan 28 11:26 part-00006-6d0dc871-b810-4b46-bcea-9a48fda3725a-c000.snappy.parquet
-rw------- 1 root root 35K Jan 28 11:26 part-00007-6d0dc871-b810-4b46-bcea-9a48fda3725a-c000.snappy.parquet
-rw------- 1 root root 34K Jan 28 11:26 part-00008-6d0dc871-b810-4b46-bcea-9a48fda3725a-c000.snappy.parquet
-rw------- 1 root