In [1]:
!pip install pyspark py4j
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basics").getOrCreate()

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=5e295c84befd4d04a225a75f6dae9b7f78e18f482f475f62f7d6fc4634c1a680
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
# Colab-only step: mount Google Drive at /content/drive so the dataset stored
# under MyDrive can be read through an ordinary filesystem path.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:

# Location of the housing CSV on the mounted Google Drive.
file_path = "/content/drive/MyDrive/Project Dataset/modified_data1.csv"

# Read the CSV: the first row supplies the column names and the column types
# are inferred from the data rather than declared up front.
housing = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

# Preview the first rows of the loaded DataFrame.
housing.show()

+--------+--------+---+----+-------------+-----------+----------+
|  status|   price|bed|bath|         city|      state|house_size|
+--------+--------+---+----+-------------+-----------+----------+
|for_sale|105000.0|  3|   2|     Adjuntas|Puerto Rico|     920.0|
|for_sale| 80000.0|  4|   2|     Adjuntas|Puerto Rico|    1527.0|
|for_sale| 67000.0|  2|   1|   Juana Diaz|Puerto Rico|     748.0|
|for_sale|145000.0|  4|   2|        Ponce|Puerto Rico|    1800.0|
|for_sale|179000.0|  4|   3|San Sebastian|Puerto Rico|    2520.0|
|for_sale| 50000.0|  3|   1|       Ciales|Puerto Rico|    2040.0|
|for_sale| 71600.0|  3|   2|        Ponce|Puerto Rico|    1050.0|
|for_sale|100000.0|  2|   1|        Ponce|Puerto Rico|    1092.0|
|for_sale|150000.0|  3|   2|   Juana Diaz|Puerto Rico|    1045.0|
|for_sale| 79000.0|  5|   2|       Utuado|Puerto Rico|    1620.0|
|for_sale|649000.0|  5|   5|        Ponce|Puerto Rico|    2677.0|
|for_sale|120000.0|  3|   2|        Yauco|Puerto Rico|    1100.0|
|for_sale|

In [4]:
# Total number of rows in the loaded dataset (displayed as the cell output).
housing.count()

978906

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Categorical columns to index and one-hot encode.
# NOTE(review): 'status' is encoded here, but the VectorAssembler cell only
# consumes city_encoded and state_encoded -- confirm whether status was meant
# to be a model feature or can be dropped from this list.
categorical_cols = ['city', 'state', 'status']
indexed_cols = ["{0}_indexed".format(c) for c in categorical_cols]
encoded_cols = ["{0}_encoded".format(c) for c in categorical_cols]

# Use a single multi-column StringIndexer / OneHotEncoder instead of one stage
# per column: the output columns are the same (<col>_indexed, <col>_encoded),
# but the label counting for all columns is done by one estimator rather than
# three separate fits over the (uncached) CSV.
# The names `indexers` / `encoders` are kept as lists for any later cell that
# refers to them.
indexers = [StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols)]
encoders = [OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols)]

# NOTE(review): the pipeline is fitted on the full dataset, i.e. before the
# train/test split performed later in the notebook, so the category
# vocabularies are learned from rows that end up in the test set as well.
pipeline = Pipeline(stages=indexers + encoders)
df_encoded = pipeline.fit(housing).transform(housing)

# Show the encoded features
df_encoded.select('city', 'city_indexed', 'city_encoded', 'state', 'state_indexed', 'state_encoded').show(5)


+-------------+------------+--------------------+-----------+-------------+---------------+
|         city|city_indexed|        city_encoded|      state|state_indexed|  state_encoded|
+-------------+------------+--------------------+-----------+-------------+---------------+
|     Adjuntas|     10316.0|(14116,[10316],[1...|Puerto Rico|         46.0|(53,[46],[1.0])|
|     Adjuntas|     10316.0|(14116,[10316],[1...|Puerto Rico|         46.0|(53,[46],[1.0])|
|   Juana Diaz|      6341.0|(14116,[6341],[1.0])|Puerto Rico|         46.0|(53,[46],[1.0])|
|        Ponce|      3611.0|(14116,[3611],[1.0])|Puerto Rico|         46.0|(53,[46],[1.0])|
|San Sebastian|      7243.0|(14116,[7243],[1.0])|Puerto Rico|         46.0|(53,[46],[1.0])|
+-------------+------------+--------------------+-----------+-------------+---------------+
only showing top 5 rows



# Train-test split

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Combine the numeric columns and the one-hot vectors into a single vector
# column named 'feature'; keep only that column and the 'price' target.
assembler = VectorAssembler(
    inputCols=['bed', 'bath', 'house_size', 'city_encoded', 'state_encoded'],
    outputCol='feature',
)
df_assembled = assembler.transform(df_encoded).select('feature', 'price')

# 80/20 split with a fixed seed so the partition is repeatable.
train_data, test_data = df_assembled.randomSplit([0.8, 0.2], seed=42)

# Both splits are reused by every model cell below (and by the 5-fold
# cross-validation over 9 parameter combinations). Spark DataFrames are lazy,
# so without caching each fit/evaluate would replay the CSV read, the encoding
# pipeline, the assembly and the split. Caching materialises each split once
# on first use and also pins the exact rows both splits contain.
train_data = train_data.cache()
test_data = test_data.cache()

# Linear Regression

In [None]:
# Baseline: linear regression on the assembled 'feature' vector with the
# estimator's default hyperparameters.
lr = LinearRegression(labelCol='price', featuresCol='feature')
lr_model = lr.fit(train_data)

# Score the held-out split; the evaluators below read the 'prediction' column.
predictions = lr_model.transform(test_data)

# RMSE on the test split.
rmse = RegressionEvaluator(
    metricName='rmse',
    labelCol='price',
    predictionCol='prediction',
).evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)

# R-squared on the same predictions; `evaluator` is left bound to this
# r2 evaluator at the end of the cell.
evaluator = RegressionEvaluator(
    metricName='r2',
    labelCol='price',
    predictionCol='prediction',
)
r2 = evaluator.evaluate(predictions)
print("R-squared linear regression:", r2)

Root Mean Squared Error (RMSE): 109706.69606634983
R-squared linear regression: 0.7552070632319697


# Lasso Regression

In [7]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression

# Lasso = linear regression with a pure L1 penalty (elasticNetParam=1.0).
# NOTE(review): with regParam=0.1 the recorded test metrics are almost
# identical to the unregularised fit -- confirm this penalty strength is
# meaningful for a target on the scale of house prices.
lasso = LinearRegression(
    featuresCol="feature",
    labelCol="price",
    regParam=0.1,
    elasticNetParam=1.0,
)
lasso_model = lasso.fit(train_data)

# Predict on the held-out split.
predictions = lasso_model.transform(test_data)

# Test-set RMSE.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="price",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data:", rmse)

# Test-set R-squared.
evaluator_r2 = RegressionEvaluator(
    metricName="r2",
    labelCol="price",
    predictionCol="prediction",
)
r2 = evaluator_r2.evaluate(predictions)
print("R-squared on test data:", r2)


Root Mean Squared Error (RMSE) on test data: 109708.43407395305
R-squared on test data: 0.7551993069988383


# Ridge Regression

In [8]:
# Ridge = linear regression with a pure L2 penalty (elasticNetParam=0.0).
ridge = LinearRegression(
    featuresCol="feature",
    labelCol="price",
    regParam=0.1,
    elasticNetParam=0.0,
)
ridge_model = ridge.fit(train_data)

# Predict on the held-out split.
predictions = ridge_model.transform(test_data)

# Test-set RMSE.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="price",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data:", rmse)

# Test-set R-squared.
evaluator_r2 = RegressionEvaluator(
    metricName="r2",
    labelCol="price",
    predictionCol="prediction",
)
r2 = evaluator_r2.evaluate(predictions)
print("R-squared on test data:", r2)


Root Mean Squared Error (RMSE) on test data: 109706.69715721073
R-squared on test data: 0.7552070583638067


# Hyperparameter Tuning

In [9]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Base estimator whose regParam / elasticNetParam are tuned below.
plr = LinearRegression(featuresCol="feature", labelCol="price")

# 3 x 3 grid: penalty strength crossed with the L1/L2 mix
# (0.0 = ridge, 1.0 = lasso, 0.5 = elastic net).
param_grid = (
    ParamGridBuilder()
    .addGrid(plr.regParam, [0.01, 0.1, 1.0])
    .addGrid(plr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# Model selection is driven by RMSE (lower is better).
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

# 5-fold cross-validation. The seed fixes the fold assignment so the selected
# model and its metrics are reproducible across runs.
crossval = CrossValidator(
    estimator=plr,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

# Fit on the training split produced by the train-test split cell.
cv_model = crossval.fit(train_data)

# Model refit with the best-scoring parameter combination.
best_model = cv_model.bestModel

# Report which grid entry won: avgMetrics is aligned with param_grid, and for
# RMSE the smallest average is the best one.
best_index = min(range(len(param_grid)), key=lambda i: cv_model.avgMetrics[i])
best_params = {param.name: value for param, value in param_grid[best_index].items()}
print("Best hyperparameters:", best_params)

# Score the best model on the held-out test split.
predictions = best_model.transform(test_data)

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) of best model:", rmse)

# Evaluate the model using R-squared
evaluator_r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print("R-squared:", r2)

Root Mean Squared Error (RMSE) of best model: 109703.78228553546
R-squared: 0.7552200663305235
R-squared: 0.7552200663305235
