## Data Loading & Exploration
### Load Data:
- Load the dataset into PySpark from a CSV or Parquet file.
- Inspect the schema and ensure column types are correctly inferred.
### Basic Statistics:
- Calculate the average Median_Price and Median_PSF across all properties.
- Identify the state with the highest and lowest median property prices.
### Filter & Grouping:
- Filter properties with Median_Price > 1 million MYR and count them by Type.
- Group data by State and calculate:
    - Total Transactions.
    - Average Median_PSF.
### Data Transformation:
- Create a new column, Price_to_PSF_Ratio, as Median_Price / Median_PSF.
- Categorize properties into price ranges (e.g., <500k, 500k-1M, >1M) and count them.
### Insights by Property Type:
- Compare the average Median_Price for Freehold vs. Leasehold properties by Type.
- Find the Township with the highest number of Transactions for condominiums.
### Area Insights:
- Determine the Area with the highest Median_PSF.
- Identify the most popular Type of property in each Area.
- Rank Townships by Median_Price within each Area.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Explicit schema: column types are declared up front instead of being inferred.
# Columns: Township, Area, State, Tenure, Type, Median_Price, Median_PSF, Transactions
pricing_schema = StructType([
    StructField("Township", StringType(), True),
    StructField("Area", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Tenure", StringType(), True),
    StructField("Type", StringType(), True),
    StructField("Median_Price", DoubleType(), True),
    StructField("Median_PSF", DoubleType(), True),
    StructField("Transactions", IntegerType(), True)
])

spark = SparkSession.builder.appName("MY-housing-price").getOrCreate()
df = spark.read.csv("malaysia_house_price_data_2025.csv", header=True, schema=pricing_schema)

In [2]:
df.show(3)

+--------------------+-------------+------+--------+--------------------+------------+----------+------------+
|            Township|         Area| State|  Tenure|                Type|Median_Price|Median_PSF|Transactions|
+--------------------+-------------+------+--------+--------------------+------------+----------+------------+
| SCIENTEX SUNGAI DUA|Tasek Gelugor|Penang|Freehold|       Terrace House|    331800.0|     304.0|         593|
|        BANDAR PUTRA|        Kulai| Johor|Freehold|Cluster House, Te...|    590900.0|     322.0|         519|
|TAMAN LAGENDA TRO...|  Chenderiang| Perak|Freehold|       Terrace House|    229954.0|     130.0|         414|
+--------------------+-------------+------+--------+--------------------+------------+----------+------------+
only showing top 3 rows



- Calculate the average `Median_Price` and `Median_PSF` across all properties.

In [9]:
avg_prices = df.select('Median_Price', 'Median_PSF').agg({
    "Median_Price": "avg",
    'Median_PSF': "avg"
}).withColumnRenamed("avg(Median_Price)", "avg_median_price") \
.withColumnRenamed("avg(Median_PSF)", "avg_median_psf")
avg_prices.show()

+--------------+----------------+
|avg_median_psf|avg_median_price|
+--------------+----------------+
|      328.8625|     490685.4105|
+--------------+----------------+



- Identify the state with the highest and lowest median property prices.

In [29]:
group_by_states = df.select("State", "Median_Price") \
.groupBy("State").agg(F.round(F.avg("Median_Price"), 2).alias("Average_Median_Price"))
group_by_states.orderBy("Average_Median_Price", ascending=False).limit(1).show()
group_by_states.orderBy("Average_Median_Price", ascending=True).limit(1).show()

+------------+--------------------+
|       State|Average_Median_Price|
+------------+--------------------+
|Kuala Lumpur|           853342.09|
+------------+--------------------+

+------+--------------------+
| State|Average_Median_Price|
+------+--------------------+
|Labuan|            130000.0|
+------+--------------------+



- Filter properties with Median_Price > 1 million MYR and count them by Type.

In [3]:
# Keep only properties priced above 1 million MYR, then count rows per Type.
count_by_type = df.filter(df.Median_Price > 1000000) \
.groupBy('Type') \
.agg(F.count('Median_Price'))
count_by_type.show()

+--------------------+-------------------+
|                Type|count(Median_Price)|
+--------------------+-------------------+
|Town House, Semi ...|                  1|
|Bungalow, Terrace...|                  1|
|Terrace House, Bu...|                  4|
|   Service Residence|                 14|
|Terrace House, Bu...|                  1|
|Semi D, Cluster H...|                  1|
|Terrace House, Cl...|                  1|
|Cluster House, Te...|                  1|
|       Terrace House|                 35|
|Bungalow, Terrace...|                  9|
|Cluster House, Se...|                  2|
|Semi D, Bungalow,...|                  1|
|         Condominium|                 22|
|              Semi D|                  6|
|Town House, Terra...|                  1|
|Terrace House, Se...|                  2|
|    Bungalow, Semi D|                  3|
|       Cluster House|                  5|
|    Semi D, Bungalow|                  3|
|Town House, Bunga...|                  1|
+----------

- Group data by State and calculate:
    - Total Transactions.
    - Average Median_PSF.

In [19]:
# Note: the filter keeps only properties priced above 1 million MYR, so the
# totals and averages below describe that subset, not every property in a State.
high_value = df.filter(df.Median_Price > 1000000)
total_transactions = high_value.groupBy('State').agg(F.sum('Transactions').alias('total_transactions'))
total_transactions.show()
average_psf = high_value.groupBy('State').agg(F.round(F.avg('Median_PSF'), 2).alias('Average_Median_PSF'))
average_psf.show()

+---------------+------------------+
|          State|total_transactions|
+---------------+------------------+
|       Selangor|              1483|
|Negeri Sembilan|                10|
|   Kuala Lumpur|               777|
|          Johor|               795|
|         Penang|               188|
|          Sabah|                16|
|      Putrajaya|                18|
+---------------+------------------+

+---------------+------------------+
|          State|Average_Median_PSF|
+---------------+------------------+
|       Selangor|            602.32|
|Negeri Sembilan|             335.0|
|   Kuala Lumpur|           1033.22|
|          Johor|            564.13|
|         Penang|            778.17|
|          Sabah|             537.0|
|      Putrajaya|             569.0|
+---------------+------------------+



- Create a new column, Price_to_PSF_Ratio, as Median_Price / Median_PSF.
- Categorize properties into price ranges (e.g., <500k, 500k-1M, >1M) and count them.

In [21]:
# Median_Price / Median_PSF, rounded to the nearest whole number.
df.withColumn("Price_to_PSF_Ratio", F.round(df.Median_Price / df.Median_PSF)).select('Price_to_PSF_Ratio').show(5)

+------------------+
|Price_to_PSF_Ratio|
+------------------+
|            1091.0|
|            1835.0|
|            1769.0|
|            1172.0|
|            1305.0|
+------------------+
only showing top 5 rows



In [10]:
from pyspark.sql.functions import when
# Buckets: <500k, 500k-1M (both bounds inclusive), >1M.
# Caveat: a null Median_Price matches neither condition and would fall through to ">1M".
df_with_category = df.withColumn(
    "Price_Range",
    when(df["Median_Price"] < 500000, "<500k")
    .when((df["Median_Price"] >= 500000) & (df["Median_Price"] <= 1000000), "500k-1M")
    .otherwise(">1M")
)
result = df_with_category.select('Price_Range').groupBy('Price_Range').agg(F.count('Price_Range').alias("property_count"))
result.show()

+-----------+--------------+
|Price_Range|property_count|
+-----------+--------------+
|        >1M|           124|
|    500k-1M|           515|
|      <500k|          1361|
+-----------+--------------+



Insights by Property Type:

- Compare the average Median_Price for Freehold vs. Leasehold properties by Type.
- Find the Township with the highest number of Transactions for condominiums.

In [23]:
avg_by_Tenure = df.select('Tenure', 'Type', 'Median_Price') \
.filter(df.Tenure.isin("Freehold", "Leasehold")) \
.groupBy('Tenure', 'Type') \
.agg(F.round(F.avg('Median_Price'), 2).alias("avg_by_type"))  \
.orderBy('avg_by_type')
avg_by_Tenure.show()

+---------+--------------------+-----------+
|   Tenure|                Type|avg_by_type|
+---------+--------------------+-----------+
| Freehold|Condominium, Serv...|    84000.0|
| Freehold|                Flat|  149855.07|
|Leasehold|                Flat|  163909.68|
|Leasehold|     Apartment, Flat|   170000.0|
|Leasehold|     Flat, Apartment|   192750.0|
| Freehold|  Semi D, Town House|   200000.0|
| Freehold|Cluster House, To...|   205000.0|
| Freehold|Cluster House, Bu...|   240000.0|
|Leasehold|           Apartment|  277900.64|
| Freehold|           Apartment|  292200.85|
|Leasehold|    Semi D, Bungalow|   294400.0|
|Leasehold|Cluster House, Te...|   329000.0|
|Leasehold|Terrace House, Cl...|  332622.75|
| Freehold|Terrace House, Bu...|   335000.0|
|Leasehold|   Flat, Condominium|   350000.0|
|Leasehold|Semi D, Terrace H...|   350000.0|
| Freehold|Semi D, Terrace H...|   350000.0|
| Freehold|          Town House|   381500.0|
|Leasehold|       Terrace House|   392742.4|
|Leasehold

In [14]:
# Find the Township with the highest number of Transactions for condominiums.
df.select("Township", "Transactions", "Type") \
.filter(df.Type.rlike('.*Condominium.*')).groupBy("Township") \
.agg(F.sum('Transactions').alias('Total_Transactions')) \
.orderBy('Total_Transactions', ascending=False) \
.limit(5).show()

+--------------------+------------------+
|            Township|Total_Transactions|
+--------------------+------------------+
|        Sri Putramas|                60|
|Royal Domain Sri ...|                59|
|Palm Spring @ Dam...|                57|
|Angkasa Condominiums|                50|
|    Medini Signature|                50|
+--------------------+------------------+



- Determine the Area with the highest Median_PSF.
- Identify the most popular Type of property in each Area.

In [15]:
df.select('Area', 'Median_PSF').groupBy('Area').agg(F.avg('Median_PSF').alias("avg_PSF")) \
.orderBy('avg_PSF', ascending=False) \
.limit(5).show()

+-------------+-------+
|         Area|avg_PSF|
+-------------+-------+
|   KL Sentral| 2264.0|
|         KLCC| 1394.5|
| Ampang Hilir| 1224.0|
|Bukit Bintang| 1221.5|
|Desa ParkCity| 976.75|
+-------------+-------+



In [16]:
from pyspark.sql.window import Window
transactions_per_type_per_area = df.select('Type', 'Area', 'Transactions') \
.groupBy('Area', 'Type').agg(F.sum('Transactions').alias('Total_Transactions'))
window = Window.partitionBy('Area').orderBy(F.desc('Total_Transactions'))
ranked = transactions_per_type_per_area.withColumn('rank', F.row_number().over(window))
most_popular = ranked.filter(ranked.rank == 1).drop('rank')
most_popular.show()

+----------------+--------------------+------------------+
|            Area|                Type|Total_Transactions|
+----------------+--------------------+------------------+
|      Alor Gajah|       Terrace House|                23|
|      Alor Setar|              Semi D|                30|
|          Ampang|                Flat|                97|
|    Ampang Hilir|         Condominium|                11|
|        Ampangan|       Terrace House|               111|
|   Ara Damansara|         Condominium|                46|
|            Arau|Semi D, Terrace H...|                29|
|       Ayer Itam|                Flat|               262|
|      Ayer Molek|       Terrace House|                37|
|         Bachang|       Terrace House|                41|
|     Bagan Serai|Terrace House, Se...|                19|
|           Bahau|       Terrace House|               116|
|           Bakri|       Terrace House|                32|
|   Balai Panjang|       Terrace House|                6

- Rank Townships by Median_Price within each Area.

In [17]:
grouped_price = df.select('Area', 'Township', 'Median_Price').groupBy('Area', 'Township') \
.agg(F.avg("Median_Price").alias('Average_Median_Price'))
window = Window.partitionBy('Area').orderBy(F.desc('Average_Median_Price'))
ranked = grouped_price.withColumn('rank', F.row_number().over(window)).filter(col('rank') <= 3) # only check top 3 townships
ranked.show()

+-------------+--------------------+--------------------+----+
|         Area|            Township|Average_Median_Price|rank|
+-------------+--------------------+--------------------+----+
|   Alor Gajah| TAMAN KELEMAK UTAMA|            398760.0|   1|
|   Alor Gajah|  TAMAN KASA HEIGHTS|            205000.0|   2|
|   Alor Setar|       TAMAN PULASAN|            561200.0|   1|
|   Alor Setar|  TAMAN CAHAYA SURIA|            311111.0|   2|
|   Alor Setar|        TAMAN CASSIA|            300000.0|   3|
|       Ampang|TAMAN PANDAN PERDANA|            722500.0|   1|
|       Ampang|              M City|            530000.0|   2|
|       Ampang|   TAMAN PANDAN JAYA|            530000.0|   3|
| Ampang Hilir|            18 Madge|           3200728.0|   1|
|     Ampangan|OSKAR RESIDENCE @...|            669000.0|   1|
|     Ampangan|SEREMBAN FOREST H...|            637500.0|   2|
|     Ampangan|ARDEN HILL @SEREM...|            614500.0|   3|
|Ara Damansara|           Aragreens|           1672200.