In [1]:
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr-serverless connect --application-id 00fo38nsirh68609 --language python --emr-execution-role-arn arn:aws:iam::442426877041:role/service-role/AmazonEMRStudio_RuntimeRole_1731016215290

Initiating EMR Serverless connection..
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,00fo66g7dfntg90a,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


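There is no need to run `from pyspark.sql import SparkSession` in this notebook.

The connection cell above starts the Spark application automatically and exposes the session as `spark`.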

In [4]:
import boto3
import json
from urllib.parse import quote_plus
from pyspark.sql.functions import col

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Parquet location of the preprocessed apartment data. Despite the "output"
# in its name, the training cell reads its input from this path.
output_s3_path = "s3://apartment-pricing/preprocessed/data/parquet/"
# Temporary S3 prefix.
# NOTE(review): not referenced by any cell visible in this notebook - confirm it is still needed.
s3_temp_dir = "s3://apartment-pricing/temp/"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Train, evaluate and inspect a linear regression rent model with pyspark.ml.
#
# Notes carried over from the original cell:
#   * The input is the preprocessed Parquet data on S3. pyspark.ml is built for
#     cluster execution, so the remaining processing runs on the EMR cluster.
#   * Covers model training, prediction and evaluation. Distributed execution
#     was probably not necessary here, but it works.
#   * TODO: check training from the console.
#   * TODO: run the training job using the SDK.

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Read the preprocessed data set (Parquet on S3).
preprocessed_data = spark.read.parquet(output_s3_path)

# Model inputs. 'rent' is deliberately absent: it is the prediction target
# and becomes the 'label' column below.
feature_columns = [
    "squarefootage",
    "numrooms",
    "numbathrooms",
    "hasbalcony",
    "hasgymaccess",
    "hasparking",
    "neighborhoodsafetyindex",
    "walkscore",
    "schoolrating",
]

# Cast the has* flag columns to int so they are numeric before assembly.
# Columns are processed in the same order as before: balcony, gym, parking.
for _flag_column in ("hasbalcony", "hasgymaccess", "hasparking"):
    preprocessed_data = preprocessed_data.withColumn(
        _flag_column, col(_flag_column).cast("int")
    )

# Pack the individual feature columns into the single 'features' vector column.
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)
assembled_data = assembler.transform(preprocessed_data)

# Keep only the feature vector and the target, renaming 'rent' to 'label'.
final_data = assembled_data.select("features", col("rent").alias("label"))

# 80/20 train/test split with a fixed seed.
# NOTE(review): the sample output of this cell shows the same test row repeated
# many times. If the source data contains duplicate rows, identical rows can
# land in both splits and make the reported RMSE optimistic - confirm.
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

# Fit the linear regression on the training split.
lr = LinearRegression(labelCol="label", featuresCol="features")
model = lr.fit(train_data)

# Score the held-out split; this adds a 'prediction' column.
predictions = model.transform(test_data)

# Report the root mean squared error of the predictions on the test split.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Fitted parameters; coefficients are listed in feature_columns order.
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")

# Preview a few scored rows.
predictions.select("features", "label", "prediction").show()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Root Mean Squared Error (RMSE): 250.32739887349678
Coefficients: [1.5029013947563223,149.1467512957859,197.3680103676724,99.88583351752915,79.90852853641567,43.724228576573914,100.11736460990436,99.92473243385216,49.83787050411398]
Intercept: 6.2358234630259615
+--------------------+-------+------------------+
|            features|  label|        prediction|
+--------------------+-------+------------------+
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.4826178283765|
|[400.0,2.0,3.0,0....|2492.11|2146.482

In [6]:
# S3 destinations for the persisted artifacts.
# Target for the fitted feature assembler ("pipeline" here is only the
# VectorAssembler stage, as the trailing path segment indicates).
pipeline_path = "s3://apartment-pricing/pipeline/linear_regression/parquet_1/vector_assembler/"
# Target for the trained linear regression model.
model_path = "s3://apartment-pricing/model/linear_regression/parquet_1/"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Persist both artifacts, overwriting whatever is already stored at each path.
# The assembler is saved first so inference can rebuild the same feature
# vector; the trained model is saved second.
for _artifact, _target_path in (
    (assembler, pipeline_path),
    (model, model_path),
):
    _artifact.write().overwrite().save(_target_path)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
## Batch predictions from the saved model

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegressionModel

# Restore the persisted artifacts. These rebind the `model` and `assembler`
# names created by the training cell.
model = LinearRegressionModel.load(model_path)
assembler = VectorAssembler.load(pipeline_path)

# Two hand-written example apartments standing in for real inference input
# (replace with your actual data). Tuple values follow the column order below.
new_data = spark.createDataFrame(
    [
        (750, 2, 1, 1, 1, 1, 8.5, 7.5, 8),  # Example row
        (1200, 3, 2, 0, 1, 1, 9.0, 8.0, 9),  # Example row
    ],
    [
        "squarefootage",
        "numrooms",
        "numbathrooms",
        "hasbalcony",
        "hasgymaccess",
        "hasparking",
        "neighborhoodsafetyindex",
        "walkscore",
        "schoolrating",
    ],
)

# Apply the same int cast to the has* flag columns as the training cell does,
# so the assembler receives numeric inputs.
for _flag_column in ("hasbalcony", "hasgymaccess", "hasparking"):
    new_data = new_data.withColumn(_flag_column, col(_flag_column).cast("int"))

# Build the 'features' vector with the restored assembler.
new_data_assembled = assembler.transform(new_data)

# Predict rent with the restored model. This rebinds `predictions`, replacing
# the test-set predictions from the training cell.
predictions = model.transform(new_data_assembled)

# Display the assembled features next to the predicted rent.
predictions.select("features", "prediction").show()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----------------+
|            features|       prediction|
+--------------------+-----------------+
|[750.0,2.0,1.0,1....|3851.728029591021|
|[1200.0,3.0,2.0,0...|4924.521504403287|
+--------------------+-----------------+