**Note**: Make sure the `hdfs` package is installed in your Jupyter environment. We've installed it using:
```bash
pip install hdfs
```

# Housing Price Prediction with PySpark and HDFS

This notebook demonstrates an end-to-end machine learning pipeline using PySpark and HDFS for housing price prediction. We'll cover:

1. Setting up PySpark with Jupyter
2. Working with HDFS
3. Data preparation and feature engineering
4. Training a Linear Regression model
5. Model evaluation
6. Model persistence
7. Batch inference

## 1. Set Up PySpark with Jupyter

First, let's initialize our PySpark session and verify the setup:

In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HousingPricePrediction") \
    .config("spark.master", "spark://spark-spark-1:7077") \
    .getOrCreate()

# Test the connection
print("Spark WebUI URL:", spark.sparkContext.uiWebUrl)
print("Spark Application ID:", spark.sparkContext.applicationId)
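# defaultParallelism defaults to the total executor cores (minimum 2), so it approximates the available cores
print("Available cores:", spark.sparkContext.defaultParallelism)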

Spark WebUI URL: http://ff41454abd16:4040
Spark Application ID: app-20251026153120-0000
Available cores: 2


## 2. Connect to HDFS and Load Data

Now we'll create a small sample housing dataset and upload it to HDFS. We'll use the `hdfs` Python library to interact with HDFS:

In [6]:
import os
import pandas as pd
from hdfs import InsecureClient

# Initialize HDFS client
hdfs_client = InsecureClient('http://namenode:9870')

# First, let's create our HDFS directory structure
hdfs_base_path = '/user/spark/housing_data'
try:
    hdfs_client.makedirs(hdfs_base_path)  # creates parent directories too; an existing directory is fine
except Exception as e:
    print(f"Could not create {hdfs_base_path}: {e}")

# For this example, we'll use a sample of the housing dataset
# In practice, you would download from Kaggle using their API
sample_data = {
    'price': [100000, 200000, 150000, 300000],
    'bedrooms': [2, 3, 2, 4],
    'bathrooms': [1, 2, 1, 3],
    'sqft_living': [1000, 1500, 1200, 2000],
    'sqft_lot': [2000, 3000, 2500, 4000],
    'floors': [1, 2, 1, 2],
    'waterfront': [0, 0, 1, 0],
    'condition': [3, 4, 3, 5]
}

# Create a pandas DataFrame
df = pd.DataFrame(sample_data)

# Save to a temporary CSV file
temp_file = 'temp_housing.csv'
df.to_csv(temp_file, index=False)

# Upload to HDFS; overwrite=True lets the cell be re-run without failing on an existing file
hdfs_path = hdfs_base_path + '/housing.csv'
hdfs_client.upload(hdfs_path, temp_file, overwrite=True)

# Clean up temporary file
os.remove(temp_file)

print("Data uploaded to HDFS successfully!")

Data uploaded to HDFS successfully!
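`InsecureClient` does not speak the HDFS RPC protocol. It sends HTTP requests to the NameNode's WebHDFS REST API (port 9870 in this setup), whereas Spark in the next section reads through the RPC address `hdfs://namenode:9000`. The helper below only builds the WebHDFS URLs for the two operations used above, to show roughly what the client does; it sends nothing. `webhdfs_url` is a hypothetical helper, and `spark` is an assumed user name (`InsecureClient` uses the current OS user unless you pass `user=`).

```python
from urllib.parse import quote, urlencode


def webhdfs_url(namenode_http, hdfs_path, op, user, **params):
    """Build <namenode>/webhdfs/v1/<path>?op=<OP>&user.name=<user>[&extra params]."""
    query = {"op": op, "user.name": user}
    for key, value in params.items():
        # WebHDFS expects lowercase booleans such as overwrite=true
        query[key] = str(value).lower() if isinstance(value, bool) else value
    return f"{namenode_http.rstrip('/')}/webhdfs/v1{quote(hdfs_path)}?{urlencode(query)}"


base = "/user/spark/housing_data"
# Roughly the request behind hdfs_client.makedirs(base): an HTTP PUT to the NameNode
print(webhdfs_url("http://namenode:9870", base, "MKDIRS", user="spark"))
# Writing housing.csv starts with a PUT ...?op=CREATE, which the NameNode answers with a
# redirect to a DataNode that then receives the file contents
print(webhdfs_url("http://namenode:9870", base + "/housing.csv", "CREATE", user="spark", overwrite=True))
```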


## 3. Prepare Housing Price Dataset

Now let's load the data from HDFS into a PySpark DataFrame and prepare it for training:

In [1]:
# Read data from HDFS
df = spark.read.csv(f"hdfs://namenode:9000{hdfs_base_path}/housing.csv", header=True, inferSchema=True)

# Display schema and sample data
print("DataFrame Schema:")
df.printSchema()
print("\nSample Data:")
df.show(5)

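With `inferSchema=True`, Spark makes an extra pass over the CSV to guess each column's type, widening toward a more general type when a value does not fit. The plain-Python sketch below mimics that idea in a simplified form: it only distinguishes integer, double and string, whereas Spark also considers long, decimal, timestamp and boolean types. `infer_schema` is a hypothetical helper, not a Spark API.

```python
import csv
import io


def infer_column_type(values):
    """Simplified type inference: integer, then double, then string (Spark has more types)."""
    def parses(cast, value):
        try:
            cast(value)
            return True
        except ValueError:
            return False

    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"  # an all-empty column carries no type information
    if all(parses(int, v) for v in non_empty):
        return "integer"
    if all(parses(float, v) for v in non_empty):
        return "double"
    return "string"


def infer_schema(csv_text):
    """Read CSV text with a header row and return {column_name: inferred_type}."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, body = rows[0], rows[1:]
    columns = zip(*body)  # transpose data rows into per-column tuples
    return {name: infer_column_type(list(values)) for name, values in zip(header, columns)}


# The first two data rows of housing.csv as written in Section 2
sample_csv = (
    "price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,condition\n"
    "100000,2,1,1000,2000,1,0,3\n"
    "200000,3,2,1500,3000,2,0,4\n"
)
print(infer_schema(sample_csv))
```

Every sample value is a whole number, so the sketch labels all columns `integer`; the schema Spark prints for this file should likewise show integer columns.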

In [None]:
# Prepare features
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col  # needed for col("price") below

# Select features for the model
feature_columns = ['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'condition']

# Create feature vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df)

# Select features and label for modeling
data_for_ml = data.select("features", col("price").alias("label"))

print("Prepared data sample:")
data_for_ml.show(5)
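`VectorAssembler` turns the seven numeric columns of each row into a single vector column, in the order given by `inputCols`, because Spark ML estimators expect one `features` column. A plain-Python sketch of that per-row step for the first sample house is below. `assemble` is a hypothetical helper, and unlike Spark it always returns a dense list (Spark may store a row as a sparse vector when that is more compact).

```python
def assemble(row, input_cols):
    """Concatenate the named numeric columns of one row into a single feature vector."""
    return [float(row[name]) for name in input_cols]


feature_columns = ['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'condition']
first_house = {'price': 100000, 'bedrooms': 2, 'bathrooms': 1, 'sqft_living': 1000,
               'sqft_lot': 2000, 'floors': 1, 'waterfront': 0, 'condition': 3}

features = assemble(first_house, feature_columns)
label = float(first_house['price'])  # mirrors col("price").alias("label")
print(features, label)  # [2.0, 1.0, 1000.0, 2000.0, 1.0, 0.0, 3.0] 100000.0
```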

## 4. Build Linear Regression Model

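Now we'll split our data into training and testing sets and train a Linear Regression model. With only four sample rows, the 80/20 random split is tiny (the test set may end up with zero or one row), so treat the results as a smoke test of the pipeline rather than a meaningful evaluation: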

In [None]:
# Split the data
train_data, test_data = data_for_ml.randomSplit([0.8, 0.2], seed=42)

# Import and create the model
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8
)

# Fit the model
model = lr.fit(train_data)

# Print model coefficients
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
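`regParam` (λ) and `elasticNetParam` (α) control the penalty in the objective Spark minimizes. Per the MLlib documentation, that objective is the squared-error loss divided by 2n plus λ(α‖w‖₁ + (1 − α)/2 ‖w‖₂²), so `elasticNetParam=0.8` is mostly an L1 (lasso-like) penalty with some L2. The function below evaluates that objective in plain Python for given coefficients. It is illustrative only and ignores the feature standardization Spark applies by default (`standardization=True`); the toy data is hypothetical.

```python
def elastic_net_objective(X, y, w, b, reg_param, elastic_net_param):
    """Squared-error loss / (2n) plus lambda * (alpha * L1 + (1 - alpha) / 2 * L2^2)."""
    n = len(y)
    # Residual of each row: prediction (w . x + b) minus the label
    residuals = [sum(wj * xj for wj, xj in zip(w, x)) + b - yi for x, yi in zip(X, y)]
    loss = sum(r * r for r in residuals) / (2 * n)
    l1 = sum(abs(wj) for wj in w)
    l2_squared = sum(wj * wj for wj in w)
    # The intercept b is not penalized
    penalty = reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared)
    return loss + penalty


# Toy one-feature data (hypothetical), with the notebook's regParam=0.3, elasticNetParam=0.8
X = [[1.0], [2.0]]
y = [1.0, 2.0]
print(elastic_net_objective(X, y, w=[1.0], b=0.0, reg_param=0.3, elastic_net_param=0.8))
print(elastic_net_objective(X, y, w=[0.5], b=0.0, reg_param=0.3, elastic_net_param=0.8))
```

Even the perfect fit (w = 1) pays a penalty of 0.3 × (0.8 + 0.1) = 0.27, which is how a larger `regParam` pushes coefficients toward zero.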

## 5. Model Evaluation and Predictions

Let's evaluate our model's performance on the test dataset:

In [None]:
# Make predictions on test data
predictions = model.transform(test_data)

# Select prediction and label columns
predictions.select("prediction", "label", "features").show(5)

# Import evaluation metrics
from pyspark.ml.evaluation import RegressionEvaluator

# Create evaluator
evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse"
)

# Calculate RMSE and R2
rmse = evaluator.evaluate(predictions)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"Root Mean Squared Error (RMSE) = {rmse}")
print(f"R² = {r2}")
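`RegressionEvaluator` with `metricName="rmse"` or `"r2"` reports the two standard formulas. The plain-Python versions below make them explicit; they are illustrative helpers, not Spark code, and the labels and predictions are hypothetical. They also show why R² on this four-row sample can be meaningless: if the random split leaves a single test row, SS_tot is zero and R² is undefined (returned here as NaN).

```python
import math


def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((label - prediction)^2))."""
    if not labels:
        raise ValueError("cannot evaluate an empty set of predictions")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def r2(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot (undefined when SS_tot is 0)."""
    if not labels:
        raise ValueError("cannot evaluate an empty set of predictions")
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return float("nan") if ss_tot == 0 else 1 - ss_res / ss_tot


# Hypothetical labels and predictions, chosen only to make the arithmetic easy to follow
labels = [100000.0, 200000.0, 300000.0]
predictions = [110000.0, 190000.0, 300000.0]
print(rmse(labels, predictions))  # sqrt(2e8 / 3), roughly 8165
print(r2(labels, predictions))
```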

## 6. Save and Load Model

We'll save our trained model to HDFS for future use:

In [None]:
# Save model to HDFS; overwrite() replaces an existing model so the cell can be re-run
model_path = f"hdfs://namenode:9000{hdfs_base_path}/housing_model"
model.write().overwrite().save(model_path)

# Load the model back (for demonstration)
from pyspark.ml.regression import LinearRegressionModel
loaded_model = LinearRegressionModel.load(model_path)

print("Model successfully saved and loaded from HDFS")

## 7. Batch Inference from HDFS

Finally, let's demonstrate how to use our saved model for batch inference on new data:

In [None]:
# Create some new sample data for prediction
new_houses = spark.createDataFrame([
    (3, 2, 1500, 3000, 1, 0, 4),  # House 1
    (4, 3, 2200, 4000, 2, 1, 5),  # House 2
], feature_columns)

# Prepare features
new_data = assembler.transform(new_houses)

# Make predictions using the loaded model
predictions = loaded_model.transform(new_data)

# Show predictions
predictions.select(feature_columns + ["prediction"]).show()

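# Save predictions back to HDFS; CSV cannot store the vector-typed "features" column,
# so write only the original input columns and the prediction
predictions_path = f"hdfs://namenode:9000{hdfs_base_path}/predictions"
predictions.select(feature_columns + ["prediction"]) \
    .write.mode("overwrite").option("header", True).csv(predictions_path)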

print("Predictions saved to HDFS at:", predictions_path)
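A `LinearRegressionModel` prediction is the intercept plus the dot product of the coefficients with the assembled feature vector, in `feature_columns` order. The sketch below reproduces that arithmetic for House 1 with made-up coefficients (they are not the values this notebook's model learns), which can help sanity-check a few rows of the saved predictions by hand. `predict` is a hypothetical helper.

```python
def predict(features, coefficients, intercept):
    """Linear regression prediction: intercept + sum(coefficient_i * feature_i)."""
    if len(features) != len(coefficients):
        raise ValueError("expected one coefficient per feature")
    return intercept + sum(c * x for c, x in zip(coefficients, features))


# Hypothetical model parameters for illustration only; with a fitted model you could use
# list(loaded_model.coefficients.toArray()) and loaded_model.intercept instead.
coefficients = [0.0, 0.0, 100.0, 0.0, 0.0, 0.0, 0.0]  # only sqft_living has a weight here
intercept = 10000.0

house_1 = [3, 2, 1500, 3000, 1, 0, 4]  # same feature order as feature_columns
print(predict(house_1, coefficients, intercept))  # 10000 + 100 * 1500 = 160000.0
```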

## Cleanup

Finally, let's clean up our Spark session:

In [None]:
# Stop the Spark session
spark.stop()