Notes:
* using the select statement to see the avg and do data exploration
* using with column to add a column




In [53]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable

os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [54]:
!pip install pyspark



In [55]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("camnugent/california-housing-prices")

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/california-housing-prices


In [56]:
from pyspark.sql import SparkSession

# get acess to spark variable using sparksession
spark = SparkSession.builder.master("local").appName("Colab").config("spark.ui.port","4050").getOrCreate()

In [57]:
df = spark.read.csv("/kaggle/input/california-housing-prices", header= True, inferSchema=True)

In [58]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [59]:
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [60]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [61]:
# adding a column using with column and then the first variable is the columnn name
df = df.withColumn("id", monotonically_increasing_id())

# now rearrange the column
df = df[["id"]+ df.columns[:-1]] # take id columns and also all except the last one "id" +"longitude" + "total_rooms" or "id" + df.columns[:-1]

In [62]:
df.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 2 rows



In [63]:
df.count()

20640

In [64]:
# using the select statement to see the avg and do data exploration
df.select(avg("total_rooms").alias("the avg num of rooms")).show()

+--------------------+
|the avg num of rooms|
+--------------------+
|  2635.7630813953488|
+--------------------+



In [65]:
df.select(avg("population").alias("avg population")).show()

+------------------+
|    avg population|
+------------------+
|1425.4767441860465|
+------------------+



In [66]:
df.select(median("households").alias("mediean households")).show()

+------------------+
|mediean households|
+------------------+
|             409.0|
+------------------+



In [67]:
df.groupBy("ocean_proximity").count().show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|         ISLAND|    5|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|      <1H OCEAN| 9136|
|         INLAND| 6551|
+---------------+-----+



In [68]:
df.groupBy("ocean_proximity").avg("median_income").show()

+---------------+------------------+
|ocean_proximity|avg(median_income)|
+---------------+------------------+
|         ISLAND|2.7444200000000003|
|     NEAR OCEAN| 4.005784800601957|
|       NEAR BAY| 4.172884759825336|
|      <1H OCEAN|4.2306819176882655|
|         INLAND| 3.208996382231716|
+---------------+------------------+



 # UDF user defined function in pysprk

In [69]:
def squared(x):
  return x*x

squared_udf = udf(squared, FloatType())

In [70]:
df.withColumn("squared_salary", squared_udf("median_income")).show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|squared_salary|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|      69.30895|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|      68.91324|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|     52.669853|
|  3|  -122.25|   37.85|              52.0|     1274.0|   

# ML Lib

In [71]:
# Step 1 : divide the dataset into train and test

In [72]:
train, test = df.randomSplit([0.7,0.3],3)

In [73]:
train.dropDuplicates()

DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string]

In [74]:
# Categorical Features: ocean_proximity , Numberical/Continious Features: Rest of Number cols

In [75]:
# Step 2: Deal with Continious variables

In [76]:
print(train.columns)

['id', 'longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity']


In [77]:
numerical_features = train.columns.copy()  # make a copy just in case

numerical_features.remove("id")
numerical_features.remove("median_house_value")
numerical_features.remove("ocean_proximity")


In [78]:
numerical_features

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

# Imputer in spark

In [79]:
from pyspark.ml.feature import *

In [80]:
imputer = Imputer(inputCols=numerical_features, outputCols= numerical_features).setStrategy("mean")

# Fit the imputer on the training data (learn the means) # .fit(train) will compute: mean_total_rooms = (100 + 200 + 300) / 3 = 200.0
imputer = imputer.fit(train)

# Apply the learned imputer model to training data: replaces the None in total_rooms with 200.0.

train = imputer.transform(train)
test =imputer.transform(test)

In [81]:
from pyspark.ml.feature import VectorAssembler

In [82]:
train = train.drop("numerical_feature_vector")
test = test.drop("numerical_feature_vector")

In [83]:
numerical_vector_assembler = VectorAssembler(inputCols=numerical_features,
                                             outputCol="numerical_feature_vector")

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

In [84]:
train.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|
|  4|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|          342200.0|       NEAR BAY|    [-122.25,37.85,52...

In [85]:
train.select("numerical_feature_vector").take(2)

[Row(numerical_feature_vector=DenseVector([-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252])),
 Row(numerical_feature_vector=DenseVector([-122.22, 37.86, 21.0, 7099.0, 1106.0, 2401.0, 1138.0, 8.3014]))]

# Standardizing our data



In [86]:
scaler = StandardScaler(
    inputCol="numerical_feature_vector",   # ✅ this column exists
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

In [87]:
train =train.drop("numerical_feature_vector_")

In [88]:
scaler_model = scaler.fit(train)
train = scaler_model.transform(train)
test = scaler_model.transform(test)


In [89]:
train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|     scaled_features|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|[-1.3367306965298...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|[-1.3317245824612...|
+---+---------+--------+------------------+-----------+-----

In [90]:
train.select("scaled_features").take(2)

[Row(scaled_features=DenseVector([-1.3367, 1.058, 0.9811, -0.8161, -0.9827, -0.9985, -0.9867, 2.3587])),
 Row(scaled_features=DenseVector([-1.3317, 1.0486, -0.6087, 2.0975, 1.3765, 0.8918, 1.704, 2.3461]))]

# Now for the Categorical varaible - this is ordinal ranking
StringIndexer → Label Encoding
Converts strings like "low", "medium", "high" → numeric indices 2.0, 0.0, 1.0

In [91]:
index = StringIndexer(inputCol="ocean_proximity",
                      outputCol="ocean_category_index")
indexer = index.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)


In [92]:
train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+--------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|     scaled_features|ocean_category_index|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+--------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|[-1.3367306965298...|                 3.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21..

In [93]:
train.select("ocean_category_index").distinct().show()

+--------------------+
|ocean_category_index|
+--------------------+
|                 0.0|
|                 1.0|
|                 4.0|
|                 3.0|
|                 2.0|
+--------------------+



# this is one hot encoding


- OneHotEncoder in PySpark expects numeric indices (like 0, 1, 2)
- not strings like "New York" or "Medium".
- ❌ Can't one-hot encode raw strings in PySpark.
- Must use StringIndexer first, then OneHotEncoder.

In [94]:
one_hot_encoer = OneHotEncoder(inputCol = "ocean_category_index",
                                    outputCol = "ocean_category_one_hot")
one_hot_encoer = one_hot_encoer.fit(train)
train =one_hot_encoer.transform(train)
test =one_hot_encoer.transform(test)


In [95]:
# @title Default title text
train.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+--------------------+--------------------+----------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|     scaled_features|ocean_category_index|ocean_category_one_hot|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+--------------------+--------------------+----------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|[-1.3367306965298...|                 3.0|         (4,[3],[1.0])|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|  

# differnece in one hot encoding and ordinal
## 🎯 When to Use Ordinal vs One-Hot Encoding
# but in spark its always String indexer --> one hot (becasue one hot doesnt recognize strings)

- **Ordinal Encoding** ✅  
  Use when categories have a **natural order**  
  _Examples: Low < Medium < High, Beginner < Expert_

- **One-Hot Encoding** ✅  
  Use when categories have **no order**  
  _Examples: City names, Colors, Product types_

❗️**Tip:**  
Don’t use ordinal encoding for unordered data — it creates fake rankings!


In [96]:
assembler = VectorAssembler(inputCols=['scaled_features',
                                       'ocean_category_one_hot'],
                            outputCol='final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)

# applying linear regreassion

In [97]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='final_feature_vector',
                      labelCol='median_house_value')

In [98]:
lr = lr.fit(train)

In [99]:
train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+--------------------+--------------------+----------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|     scaled_features|ocean_category_index|ocean_category_one_hot|final_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+--------------------+--------------------+----------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|[-1.3367306965298...|                 3.0|         (4,[3],[1.0])|[-1.336730

In [100]:
pred_train_df = lr.transform(train)

In [101]:
pred_train_df.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+--------------------+--------------------+----------------------+--------------------+------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|     scaled_features|ocean_category_index|ocean_category_one_hot|final_feature_vector|        prediction|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+--------------------+--------------------+----------------------+--------------------+------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|[-1.3367306965298.

In [102]:
pred_train_df.withColumnRenamed('prediction', 'predicted_median_house_value')

DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string, numerical_feature_vector: vector, scaled_features: vector, ocean_category_index: double, ocean_category_one_hot: vector, final_feature_vector: vector, predicted_median_house_value: double]

In [103]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction', 'predicted_median_house_value')

# using custom row by row logic hence using rdd






In [105]:
predictions_and_actuals = pred_test_df[['predicted_median_house_value',
                                        'median_house_value']]

predictions_and_actuals_rdd = predictions_and_actuals.rdd

predictions_and_actuals_rdd.take(2)

[Row(predicted_median_house_value=379503.97897680313, median_house_value=352100.0),
 Row(predicted_median_house_value=322403.2335844542, median_house_value=341300.0)]

RDDs let you use regular Python functions (map, reduce, etc.) on rows.

For example: computing RMSE, MAE, or MSE manually using .map() and .reduce().

✅ 2. Access row-level tuples:
.rdd turns your DataFrame into something like:a

[(250000.0, 230000.0), (180000.0, 185000.0), ...]



In [106]:
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)

predictions_and_actuals_rdd.take( 2)

[(379503.97897680313, 352100.0), (322403.2335844542, 341300.0)]

# You used .rdd because RegressionMetrics only works with RDDs — and specifically tuple-based RDDs — not DataFrames. You converted the DataFrame to an RDD because RegressionMetrics requires an RDD of (prediction, actual) tuples as input. The .map(tuple) step ensures the format matches what the evaluator expects.

In [107]:
from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''
Mean Squared Error:      {0}
Root Mean Squared Error: {1}
Mean Absolute Error:     {2}
R**2:                    {3}
'''.format(metrics.meanSquaredError,
           metrics.rootMeanSquaredError,
           metrics.meanAbsoluteError,
           metrics.r2
           )

print(s)




Mean Squared Error:      4952770209.614867
Root Mean Squared Error: 70375.9206661971
Mean Absolute Error:     50257.610313496014
R**2:                    0.6314275906655809



so we are taking rdd so