## House price prediction

# **NLP**

**SPARK SESSION**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install PySpark
!pip install pyspark



In [None]:
# Import SparkSession from PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.ml.feature import StopWordsRemover, NGram, Tokenizer, IDF, CountVectorizer
from pyspark.ml.feature import StringIndexer, VectorAssembler, Imputer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StringType, DoubleType, StructType, StructField
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from xgboost import XGBRegressor
import xgboost as xgb
from xgboost.spark import SparkXGBRegressor

In [None]:

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("AustinHousingData") \
    .getOrCreate()

# Read the CSV file
df = spark.read.csv('/content/drive/My Drive/423BigDataAnalytics/3.14-sparkers Final project/archive/austinHousingData.csv', header=True, inferSchema=True)

# Show the first few rows
df.show()


+--------------------+--------------------+--------------------+------------------+--------------------+------------------+--------------------+--------------------+------------------+-----------------+----------+-------------+-------------+-------------+--------+-------------+-------------+----------+-------------+---------------+---------------+--------------------+---------------+-----------------+-----------+--------------------------+-------------------+--------------------+--------------------------+---------------------+-----------------------+-------------------+----------------------+-----------+--------------+-------------------+----------------------+------------------+------------------+------------------+------------------+-------------+------------------------+--------------------+--------------------+------------+--------------------+
|                zpid|                city|       streetAddress|           zipcode|         description|          latitude|           long

In [None]:
# Describe the 'latestPrice' column
df.select("latestPrice").describe().show()

+-------+--------------------+
|summary|         latestPrice|
+-------+--------------------+
|  count|               15012|
|   mean|  468959.24565499066|
| stddev|   439251.9324367748|
|    min| ""Best Architect...|
|    max|                True|
+-------+--------------------+



In [None]:
# Calculate quartiles and IQR using PySpark functions
quartiles = df.selectExpr('percentile_approx(latestPrice, array(0.25, 0.75)) as quartiles').collect()[0].quartiles
Q1, Q3 = quartiles[0], quartiles[1]
IQR = Q3 - Q1

# Calculate lower and upper bounds
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# Filter outliers using PySpark DataFrame API
outliers_df = df.filter((col('latestPrice') < lower_bound) | (col('latestPrice') > upper_bound))

# Count the number of low and high price outliers
low_price_outliers = outliers_df.filter(col('latestPrice') < lower_bound).count()
high_price_outliers = outliers_df.filter(col('latestPrice') > upper_bound).count()

# Print the number of outliers and proportion
print(f"Number of low price outliers: {low_price_outliers}")
print(f"Number of high price outliers: {high_price_outliers}")
print(f'Proportion of outliers using IQR method: {(low_price_outliers + high_price_outliers) / df.count() * 100:.2f}%')


Number of low price outliers: 0
Number of high price outliers: 784
Proportion of outliers using IQR method: 3.64%


In [None]:
df = df.withColumn('pricePerSqFt', F.col('latestPrice') / F.col('livingAreaSqFt'))


In [None]:


# Combine the number of schools into a new column numOfSchools
df = df.withColumn('numOfSchools',
                   F.col('numOfPrimarySchools') +
                   F.col('numOfElementarySchools') +
                   F.col('numOfMiddleSchools') +
                   F.col('numOfHighSchools'))

# Drop the individual school type columns
df = df.drop('numOfPrimarySchools', 'numOfElementarySchools', 'numOfMiddleSchools', 'numOfHighSchools')


In [None]:
df.printSchema()

# Show the column names
print("Columns in the DataFrame:")
for col in df.columns:
    print(col)

root
 |-- zpid: string (nullable = true)
 |-- city: string (nullable = true)
 |-- streetAddress: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- description: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- propertyTaxRate: string (nullable = true)
 |-- garageSpaces: string (nullable = true)
 |-- hasAssociation: string (nullable = true)
 |-- hasCooling: string (nullable = true)
 |-- hasGarage: string (nullable = true)
 |-- hasHeating: string (nullable = true)
 |-- hasSpa: string (nullable = true)
 |-- hasView: string (nullable = true)
 |-- homeType: string (nullable = true)
 |-- parkingSpaces: string (nullable = true)
 |-- yearBuilt: string (nullable = true)
 |-- latestPrice: string (nullable = true)
 |-- numPriceChanges: string (nullable = true)
 |-- latest_saledate: string (nullable = true)
 |-- latest_salemonth: string (nullable = true)
 |-- latest_saleyear: string (nullable = true)
 |-- latestPriceSo

In [None]:

# Drop columns from the DataFrame
df_combined = df.drop('streetAddress', 'description', 'homeImage', 'latest_saledate')

# Cast the 'latestPrice' column to DoubleType
df_combined = df_combined.withColumn("latestPrice", F.col("latestPrice").cast("double"))

# Define categorical columns
categorical_cols = ['city', 'homeType', 'latestPriceSource']

# Create StringIndexers for encoding categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid='keep') for col in categorical_cols]

# Create a VectorAssembler to combine features into a single vector
assembler_inputs = [col + "_index" for col in categorical_cols]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# Define the target variable
target = 'latestPrice'

# Impute missing values in 'latestPrice' column with a default value
imputer = Imputer(strategy='mean', inputCols=[target], outputCols=[target + "_imputed"])
df_combined = imputer.fit(df_combined).transform(df_combined)

# Create a GBTRegressor model with increased maxBins parameter
gbt = GBTRegressor(labelCol=target + "_imputed", featuresCol="features", maxBins=4000)

# Define the pipeline
pipeline = Pipeline(stages=indexers + [assembler, gbt])

# Fit the pipeline to the data
pipeline_model = pipeline.fit(df_combined)

# Get feature importances from the GBT model
importances = pipeline_model.stages[-1].featureImportances.toArray()

# Create a DataFrame for feature importances with explicit schema definition
schema = StructType([
    StructField("feature", StringType(), nullable=False),
    StructField("importance", DoubleType(), nullable=False)
])


In [None]:
# Convert importances to Python float type
importances_float = [float(val) for val in importances]

# Create a DataFrame for feature importances with explicit schema definition
feature_importances_df = spark.createDataFrame(zip(assembler_inputs, importances_float), schema)

# Sort by importance
feature_importances_df = feature_importances_df.orderBy(F.col("importance").desc())

# Display the top 10 important features
feature_importances_df.show(3)


+--------------------+-------------------+
|             feature|         importance|
+--------------------+-------------------+
|latestPriceSource...|0.49492727705107686|
|      homeType_index|0.42813118163104463|
|          city_index|0.07694154131787839|
+--------------------+-------------------+



Inference: Price source, home type, city are some important features of the dataset

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from xgboost import XGBRegressor

selected_features = ['livingAreaSqFt', 'propertyTaxRate', 'numOfAppliances', 'lotSizeSqFt',
                     'numOfBathrooms', 'longitude', 'latestPriceSource', 'numOfWindowFeatures',
                     'avgSchoolDistance', 'latest_saleyear']

df_selected = df.select(selected_features + ['latestPrice'])

# Convert selected features to numeric type
for feature in selected_features:
    df_selected = df_selected.withColumn(feature, col(feature).cast(FloatType()))

# Convert target to numeric type and handle NaN/infinite values
df_selected = df_selected.withColumn('latestPrice', col('latestPrice').cast(FloatType()))
df_selected = df_selected.filter(~col('latestPrice').isNull() & ~col('latestPrice').isin([float('nan'), float('inf'), -float('inf')]))

# Split the data into features (X) and target (y)
X = df_selected.select(selected_features).toPandas()
y = df_selected.select('latestPrice').toPandas()

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train XGBoost with the selected features
model = XGBRegressor(eval_metric='rmse')
model.fit(X_train, y_train)

# Make predictions on the test set
y_pred = model.predict(X_test)

# Evaluate the model's performance
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

print(f"Mean Squared Error: {mse}")
print(f"R-squared Score: {r2}")


Mean Squared Error: 65985339392.0
R-squared Score: 0.6438405427785324


In [None]:
from xgboost.spark import SparkXGBClassifier

In [None]:


# Initialize Spark session
spark = SparkSession.builder.appName("XGBoostExample").getOrCreate()

selected_features = ['livingAreaSqFt', 'propertyTaxRate', 'numOfAppliances', 'lotSizeSqFt',
                     'numOfBathrooms', 'longitude', 'latestPriceSource', 'numOfWindowFeatures',
                     'avgSchoolDistance', 'latest_saleyear']

# Assuming df_selected is the DataFrame containing your selected features and target
df_selected = df.select(selected_features + ['latestPrice'])

# Convert selected features to numeric type and handle NaN/infinite values
for feature in selected_features:
    df_selected = df_selected.withColumn(feature, df_selected[feature].cast("float"))

# Convert target to numeric type and handle NaN/infinite values
df_selected = df_selected.filter(~df_selected['latestPrice'].isNull() &
                                 ~df_selected['latestPrice'].isin([float('nan'), float('inf'), -float('inf')]))
df_selected = df_selected.withColumn('latestPrice', df_selected['latestPrice'].cast("float"))

# Remove rows with null values in any of the selected features
df_selected = df_selected.dropna(subset=selected_features + ['latestPrice'])

# Define the features vector
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
df_assembled = assembler.transform(df_selected)

# Split the data into training and testing sets
X_train, X_test = df_assembled.randomSplit([0.8, 0.2], seed=42)

# Initialize and train XGBoost with the selected features
xgb_regressor = SparkXGBRegressor(label_col="latestPrice", features_col="features", eval_metric='rmse')
model = xgb_regressor.fit(X_train)

# Make predictions on the test set
predictions = model.transform(X_test)

# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol="latestPrice", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)

evaluator = RegressionEvaluator(labelCol="latestPrice", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

print(f"Mean Squared Error: {mse}")
print(f"R-squared Score:-{r2}")

# Stop Spark session
spark.stop()


INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'eval_metric': 'rmse', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


Mean Squared Error: 770.8353686727582
R-squared Score:--0.572660002698689


An MSE of approximately 770.84 suggests that the model's predictions are, on average, off by that amount squared. An R-squared score of about 0.573 indicates that the model explains approximately 57.3% of the variance in the dependent variable, which is a decent level of explanatory power