## Analyzing California Housing Data


In [1]:
pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, max, min

In [3]:
# Create the SparkSession used by every later cell (or reuse one that already exists)
spark = (
    SparkSession.builder
    .appName("California Housing Analysis")
    .getOrCreate()
)

In [4]:
# Read the train and test CSV files; column names come from the header row and
# column types are inferred by Spark.
train_df, test_df = (
    spark.read.csv(csv_path, header=True, inferSchema=True)
    for csv_path in (
        "sample_data/california_housing_train.csv",
        "sample_data/california_housing_test.csv",
    )
)

First, we consider only the training dataset for our analysis.

In [5]:
# Preview the first five rows of the training data
train_df.show(n=5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

In [6]:
# Report the DataFrame's dimensions as (row count, column count)
num_rows, num_cols = train_df.count(), len(train_df.columns)
print(f"shape : ({num_rows},{num_cols})")

shape : (17000,9)


In [7]:
# Compute summary statistics for the training data, then print them
train_summary = train_df.describe()
train_summary.show()

+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|      total_rooms|   total_bedrooms|        population|       households|     median_income|median_house_value|
+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              17000|             17000|             17000|            17000|            17000|             17000|            17000|             17000|             17000|
|   mean|-119.56210823529375|  35.6252247058827| 28.58935294117647|2643.664411764706|539.4108235294118|1429.5739411764705|501.2219411764706| 3.883578100000021|207300.91235294117|
| stddev| 2.0051664084260357|2.1373397946570867|12.586936981660406|2179.947071452777|421.4994515798648| 1

In [8]:
# Print the schema: each column's name, data type, and nullable flag.
# (Non-null counts are not part of the schema output.)
train_df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [9]:
# Register the training DataFrame as a temporary view named "california_housing"
# (created or replaced), so that spark.sql() queries can refer to it by that name.
train_df.createOrReplaceTempView("california_housing")

In [10]:
# Example Analysis 1: average house value for each distinct median_income value,
# ordered by ascending median_income.
# NOTE(review): the grouping key is the raw median_income column, so rows are not
# binned into income brackets; every distinct income value forms its own group.
Analysis1= spark.sql("""
  SELECT median_income, AVG(median_house_value) as avg_house_value
  FROM california_housing
  GROUP BY median_income
  ORDER BY median_income
""")

In [11]:
# Print the leading rows of Analysis1 (show()'s defaults, written out explicitly)
Analysis1.show(n=20, truncate=True)

+-------------+------------------+
|median_income|   avg_house_value|
+-------------+------------------+
|       0.4999|133027.36363636365|
|        0.536| 163571.2857142857|
|       0.6433|          111300.0|
|       0.6775|          350000.0|
|       0.6825|          187500.0|
|       0.6831|           87500.0|
|        0.696|           42500.0|
|       0.6991|           89500.0|
|       0.7007|          134400.0|
|       0.7025|          500001.0|
|       0.7068|          200000.0|
|       0.7069|           70300.0|
|       0.7075|           78800.0|
|        0.716|          104200.0|
|       0.7286|           95200.0|
|       0.7445|          112500.0|
|       0.7473|          168800.0|
|       0.7526|          162500.0|
|       0.7591|          350000.0|
|         0.76|          162500.0|
+-------------+------------------+
only showing top 20 rows



In [12]:
# Example Analysis 2: highest and lowest median_house_value for each
# (longitude, latitude) pair, keeping the 10 pairs with the largest maximum.
analysis2_sql = """
  SELECT longitude, latitude, MAX(median_house_value) as max_house_value,
        MIN(median_house_value) as min_house_value
  FROM california_housing
  GROUP BY longitude, latitude
  ORDER BY max_house_value DESC
  LIMIT 10
"""
Analysis2 = spark.sql(analysis2_sql)
Analysis2.show()

+---------+--------+---------------+---------------+
|longitude|latitude|max_house_value|min_house_value|
+---------+--------+---------------+---------------+
|  -119.23|   34.19|       500001.0|       500001.0|
|   -118.4|   34.13|       500001.0|       500001.0|
|  -118.12|   33.76|       500001.0|       461500.0|
|  -118.07|   33.72|       500001.0|       485000.0|
|  -118.43|   34.02|       500001.0|       500001.0|
|  -117.31|   32.82|       500001.0|       500001.0|
|  -118.57|   34.27|       500001.0|       500001.0|
|  -118.48|   33.96|       500001.0|       500001.0|
|  -122.06|    37.3|       500001.0|       500001.0|
|  -118.13|   33.79|       500001.0|       234800.0|
+---------+--------+---------------+---------------+



In [13]:
# Analysis 3: mean median_house_value over the rows whose housing_median_age exceeds 50
analysis3_sql = """
  SELECT AVG(median_house_value) as avg_old_house_value
  FROM california_housing
  WHERE housing_median_age > 50
"""
Analysis3 = spark.sql(analysis3_sql)

In [14]:
# Print the Analysis3 result (show()'s defaults, written out explicitly)
Analysis3.show(n=20, truncate=True)

+-------------------+
|avg_old_house_value|
+-------------------+
|  276408.2573800738|
+-------------------+



In [15]:
# 1. Average House Value by Proximity to Ocean (Based on Longitude)
# NOTE(review): a single longitude cut-off (-122) is only a rough stand-in for
# distance to the coast -- confirm it is adequate for the intended analysis.
# String literals use single quotes (standard SQL, and consistent with the other
# queries in this notebook); the ORDER BY makes the row order deterministic.
result1 = spark.sql("""
    SELECT CASE
               WHEN longitude < -122 THEN 'Near Ocean'
               ELSE 'Far from Ocean'
           END as proximity_to_ocean,
           AVG(median_house_value) as avg_house_value
    FROM california_housing
    GROUP BY proximity_to_ocean
    ORDER BY avg_house_value DESC
""")
result1.show()

+------------------+------------------+
|proximity_to_ocean|   avg_house_value|
+------------------+------------------+
|        Near Ocean|  245589.432132964|
|    Far from Ocean|198254.34113882628|
+------------------+------------------+



In [16]:
# 2. Mean total_rooms for each housing_median_age value, in ascending age order
rooms_by_age_sql = """
    SELECT housing_median_age,
           AVG(total_rooms) as avg_rooms
    FROM california_housing
    GROUP BY housing_median_age
    ORDER BY housing_median_age
"""
result2 = spark.sql(rooms_by_age_sql)
result2.show()

+------------------+------------------+
|housing_median_age|         avg_rooms|
+------------------+------------------+
|               1.0|            2158.0|
|               2.0| 5237.102040816327|
|               3.0| 6920.326086956522|
|               4.0| 6065.614906832298|
|               5.0| 4926.261306532663|
|               6.0| 4365.062015503876|
|               7.0| 5672.907284768212|
|               8.0|4076.7865168539324|
|               9.0|3813.4651162790697|
|              10.0|3518.4823008849557|
|              11.0| 3971.105769230769|
|              12.0| 4216.661458333333|
|              13.0|3692.5060240963853|
|              14.0| 3720.348703170029|
|              15.0|3283.3173076923076|
|              16.0| 3047.636220472441|
|              17.0|3133.9670138888887|
|              18.0|3098.4435146443516|
|              19.0| 3005.140776699029|
|              20.0|2852.3342036553527|
+------------------+------------------+
only showing top 20 rows



In [17]:
# 3. The 10 rows with the largest median_income, shown with their coordinates
top_income_sql = """
    SELECT longitude, latitude, median_income
    FROM california_housing
    ORDER BY median_income DESC
    LIMIT 10
"""
result3 = spark.sql(top_income_sql)
result3.show()

+---------+--------+-------------+
|longitude|latitude|median_income|
+---------+--------+-------------+
|  -118.04|   34.13|      15.0001|
|  -118.34|   34.08|      15.0001|
|  -118.06|   33.72|      15.0001|
|  -118.19|   34.19|      15.0001|
|  -118.33|   34.06|      15.0001|
|  -118.32|   34.06|      15.0001|
|  -117.23|   32.99|      15.0001|
|  -118.33|   34.07|      15.0001|
|  -118.18|   34.19|      15.0001|
|  -118.12|   34.12|      15.0001|
+---------+--------+-------------+



In [18]:
# 4. Relationship Between Population and House Value (Correlation Analysis)
# Column names are written in the schema's lowercase form so the query and the
# corr() call use the same name and do not rely on case-insensitive resolution.
result4 = spark.sql("""
    SELECT population, median_house_value
    FROM california_housing
""")
# Correlation coefficient between the two columns; as the last expression of the
# cell, the value is displayed as the cell output.
result4.corr("population", "median_house_value")

-0.02785006112089839

In [19]:
# 5. Mean house value per income level:
#    Low (< 2.5), Medium (2.5 to 4.5, bounds included), High (everything else)
income_level_sql = """
    SELECT CASE
               WHEN median_income < 2.5 THEN 'Low Income'
               WHEN median_income BETWEEN 2.5 AND 4.5 THEN 'Medium Income'
               ELSE 'High Income'
           END as income_level,
           AVG(median_house_value) as avg_house_value
    FROM california_housing
    GROUP BY income_level
"""
result5 = spark.sql(income_level_sql)
result5.show()

+-------------+------------------+
| income_level|   avg_house_value|
+-------------+------------------+
|  High Income| 306535.0667466027|
|   Low Income|120705.26480304956|
|Medium Income|187971.56916015383|
+-------------+------------------+



In [20]:
# 6. Row count (COUNT(*)) per housing-age group, largest group first:
#    New (< 20), Middle-aged (20 to 40, bounds included), Old (everything else)
age_group_sql = """
    SELECT CASE
               WHEN housing_median_age < 20 THEN 'New'
               WHEN housing_median_age BETWEEN 20 AND 40 THEN 'Middle-aged'
               ELSE 'Old'
           END as age_group,
           COUNT(*) as house_count
    FROM california_housing
    GROUP BY age_group
    ORDER BY house_count DESC
"""
result6 = spark.sql(age_group_sql)
result6.show()

+-----------+-----------+
|  age_group|house_count|
+-----------+-----------+
|Middle-aged|       9004|
|        New|       4826|
|        Old|       3170|
+-----------+-----------+



In [21]:
# 7. Average Number of Bedrooms Per House
# The query averages the per-row ratio total_bedrooms / households over all rows.
# NOTE(review): the divisor is households, so this is bedrooms per household --
# confirm that this matches the intended meaning of "per house".
result7 = spark.sql("""
    SELECT AVG(total_bedrooms / households) as avg_bedrooms_per_house
    FROM california_housing
""")
result7.show()

+----------------------+
|avg_bedrooms_per_house|
+----------------------+
|    1.0972809315785848|
+----------------------+



In [22]:
# 8. Approximate median (PERCENTILE_APPROX at 0.5) of median_house_value for each
#    (latitude, longitude) pair, keeping the 10 pairs with the largest median
median_by_location_sql = """
    SELECT latitude, longitude,
           PERCENTILE_APPROX(median_house_value, 0.5) as median_value
    FROM california_housing
    GROUP BY latitude, longitude
    ORDER BY median_value DESC
    LIMIT 10
"""
result8 = spark.sql(median_by_location_sql)
result8.show()

+--------+---------+------------+
|latitude|longitude|median_value|
+--------+---------+------------+
|   32.95|  -117.26|    500001.0|
|   32.83|  -117.26|    500001.0|
|   32.83|  -117.31|    500001.0|
|   32.84|  -117.27|    500001.0|
|   32.76|  -117.19|    500001.0|
|   32.84|  -117.25|    500001.0|
|   32.72|  -117.23|    500001.0|
|   32.85|  -117.26|    500001.0|
|   32.81|  -117.29|    500001.0|
|   32.85|  -117.24|    500001.0|
+--------+---------+------------+



In [23]:
# 9. Households with More Than 4 Persons Per Household
# COUNT(*) counts the rows whose population / households ratio exceeds 4, so the
# result is a number of matching rows rather than a number of individual households.
result9 = spark.sql("""
    SELECT COUNT(*) as large_households
    FROM california_housing
    WHERE population / households > 4
""")
result9.show()

+----------------+
|large_households|
+----------------+
|            1437|
+----------------+



In [24]:
# 10. Areas with the Highest House Density (Rooms per Area)
# NOTE(review): rooms_density is computed as total_rooms / housing_median_age,
# i.e. rooms divided by median housing age; no area measure appears in the query,
# so confirm this is the intended definition of "density".
# Returns the 10 rows with the largest value of that ratio.
result10 = spark.sql("""
    SELECT latitude, longitude, (total_rooms / housing_median_age) as rooms_density
    FROM california_housing
    ORDER BY rooms_density DESC
    LIMIT 10
""")
result10.show()

+--------+---------+-----------------+
|latitude|longitude|    rooms_density|
+--------+---------+-----------------+
|   38.72|  -121.35|          10948.5|
|   33.57|  -117.16|          10195.5|
|   33.89|  -117.74|          9484.25|
|   33.89|  -117.52|           8989.0|
|   33.52|  -117.12|          7600.25|
|    33.9|  -117.19|           7020.0|
|   38.77|  -121.33|           6738.0|
|    33.9|  -117.33|           6418.5|
|   38.66|  -121.13|           6180.0|
|   33.97|  -117.21|6118.666666666667|
+--------+---------+-----------------+



Now we perform predictive analysis.

In [25]:
# Input columns for the model: every column of the dataset except median_house_value
feature_cols = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
    'population',
    'households',
    'median_income',
]

In [26]:
# Build the assembler that packs the feature columns into one vector column, "features"
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
)

In [27]:
# Append the assembled feature-vector column to both datasets.
# Any existing output column is dropped first (drop() is a no-op when the column
# is absent), so re-running this cell does not fail because the assembler's
# output column already exists on the rebound DataFrames.
train_df = assembler.transform(train_df.drop(assembler.getOutputCol()))
test_df = assembler.transform(test_df.drop(assembler.getOutputCol()))

In [28]:
# Randomly divide the training DataFrame into training and validation parts
# using weights 0.8 / 0.2 and a fixed seed
train_data, validation_data = train_df.randomSplit(weights=[0.8, 0.2], seed=1234)

In [29]:
# Configure the Gradient Boosted Trees regressor (maxIter=100).
# The seed is fixed (same value as the train/validation split) so that repeated
# runs do not depend on the estimator's default seed.
# NOTE(review): metrics printed by earlier, unseeded runs may differ slightly
# from what a seeded re-run produces.
from pyspark.ml.regression import GBTRegressor
gbt_model = GBTRegressor(
    featuresCol="features",
    labelCol="median_house_value",
    maxIter=100,
    seed=1234,
)


In [30]:
# Fit the regressor on the training part of the split
gbt_fitted_model = gbt_model.fit(dataset=train_data)

In [31]:
# Apply the fitted model to the validation part of the split
validation_predictions = gbt_fitted_model.transform(dataset=validation_data)

In [32]:
# Evaluator that scores the "prediction" column against median_house_value using R²
from pyspark.ml.evaluation import RegressionEvaluator
r2_evaluator = RegressionEvaluator(
    labelCol="median_house_value",
    predictionCol="prediction",
    metricName="r2",
)

In [33]:

# Score the validation predictions with the R² evaluator
r2 = r2_evaluator.evaluate(dataset=validation_predictions)

In [34]:
# Report the R² obtained on the validation split
print(f"Validation R²: {r2}")

Validation R²: 0.7951325591336172


In [35]:
# Apply the fitted model to the test dataset
test_predictions = gbt_fitted_model.transform(dataset=test_df)

In [36]:
# Score the test-set predictions with the same R² evaluator
r2_test = r2_evaluator.evaluate(dataset=test_predictions)

In [38]:
# Report the R² obtained on the test dataset
print(f"Test R²: {r2_test}")

Test R²: 0.7738505979617136
