In [1]:
from pyspark.sql import SparkSession
import tqdm

# Add here your team number teamx
team = "team38"

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()


In [2]:
spark.sql("USE team38_projectdb")

# Execute the optimized SQL query and convert the result to a DataFrame
df = spark.sql("""
    SELECT 
        price, 
        minutes_to_metro, 
        number_of_rooms, 
        area, 
        living_area, 
        kitchen_area, 
        apartment_floor, 
        number_of_floors, 
        apartment_type_name, 
        region_name, 
        renovation_name, 
        metro_station_latitude, 
        metro_station_longitude 
    FROM 
        housing_data_part_buck
""")

# Show the DataFrame
df.show()

+------------+----------------+---------------+-----+-----------+------------+---------------+----------------+-------------------+-----------+------------------+----------------------+-----------------------+
|       price|minutes_to_metro|number_of_rooms| area|living_area|kitchen_area|apartment_floor|number_of_floors|apartment_type_name|region_name|   renovation_name|metro_station_latitude|metro_station_longitude|
+------------+----------------+---------------+-----+-----------+------------+---------------+----------------+-------------------+-----------+------------------+----------------------+-----------------------+
| 1.5528071E7|            16.0|            2.0|66.73|       33.9|        15.2|            3.0|            25.0|       New building|     Moscow|          Cosmetic|             55.649857|               37.70144|
|   5950000.0|             3.0|            0.0| 20.0|       12.2|         5.1|            9.0|             9.0|          Secondary|     Moscow|          Cosmeti

In [4]:
from pyspark.sql.functions import col, mean as _mean, stddev as _stddev, abs as abs_func
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql.functions import rand
import time

# Assuming df is your DataFrame with the required columns

In [17]:

# Step 1: Split the data into train and test sets (80/20) with random state = 42
train_data, test_data = df.orderBy(rand(seed=42)).randomSplit([0.8, 0.2], seed=42)

# Step 2: Identify categorical columns
categorical_columns = [
    'apartment_type_name', 
    'region_name', 
    'renovation_name', 
]

# Step 3: Identify numeric columns
numeric_columns = [
    'minutes_to_metro', 
    'number_of_rooms', 
    'area', 
    'living_area', 
    'kitchen_area', 
    'apartment_floor', 
    'number_of_floors', 
    'metro_station_latitude', 
    'metro_station_longitude'
]

# Step 4: Create StringIndexer and OneHotEncoder stages
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_encoded") for col in categorical_columns]

# Step 5: Create a feature for the ratio of living area to total area
train_data = train_data.withColumn("living_area_ratio", col("living_area") / col("area"))
test_data = test_data.withColumn("living_area_ratio", col("living_area") / col("area"))

# Update numeric columns to include the new feature
numeric_columns.append("living_area_ratio")

# Step 6: Create a VectorAssembler to combine all features into a single vector column
assembler_inputs = [col + "_encoded" for col in categorical_columns] + numeric_columns
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# Step 7: Feature scaling
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

# Step 8: Create the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])

# Step 9: Fit the pipeline on the training data
model = pipeline.fit(train_data)

# Step 10: Transform the training and test data
train_data = model.transform(train_data)
test_data = model.transform(test_data)

# Step 11: Select only the features and label columns
train_data = train_data.select("scaled_features", "price")
test_data = test_data.select("scaled_features", "price")

# Step 12: Rename the label column to 'label'
train_data = train_data.withColumnRenamed("price", "label")
test_data = test_data.withColumnRenamed("price", "label")

In [19]:
train_data.show()

+--------------------+---------+
|     scaled_features|    label|
+--------------------+---------+
|[0.85555047711956...|1150000.0|
|[0.85555047711956...|1420000.0|
|[0.85555047711956...|1750000.0|
|[-1.1687738602762...|1939125.0|
|[-1.1687738602762...|1939125.0|
|[-1.1687738602762...|1939125.0|
|[-1.1687738602762...|1941650.0|
|[-1.1687738602762...|1963875.0|
|[-1.1687738602762...|1972470.0|
|[-1.1687738602762...|1984900.0|
|[-1.1687738602762...|2261523.0|
|[-1.1687738602762...|2278998.0|
|[-1.1687738602762...|2278998.0|
|[0.85555047711956...|2390000.0|
|[-1.1687738602762...|2477950.0|
|[0.85555047711956...|2540000.0|
|[-1.1687738602762...|2583125.0|
|[-1.1687738602762...|2583125.0|
|[-1.1687738602762...|2583125.0|
|[-1.1687738602762...|2595000.0|
+--------------------+---------+
only showing top 20 rows



In [20]:
# A function to run commands
import os
def run(command):
    return os.popen(command).read()

# Save train_data
train_data.select("scaled_features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/train/*.json > data/train.json")

# Save test_data
test_data.select("scaled_features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/test/*.json > data/test.json")

''

In [30]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql.functions import abs, col, mean

# Step 1: Initialize the Linear Regression model
lr = LinearRegression(featuresCol="scaled_features", labelCol="label")

# Step 2: Create a parameter grid for grid search
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Step 3: Set up the TrainValidationSplit for cross-validation
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse"),
                           trainRatio=0.8)

# Step 4: Fit the model on the training data
lr_model = tvs.fit(train_data)

# Step 5: Save the best Linear Regression model to HDFS
lr_model.bestModel.write().overwrite().save("project/models/model1")

# Step 6: Make predictions on the test data
predictions = lr_model.transform(test_data)

# Step 7: Save the prediction results to HDFS
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("header", "true")\
    .save("project/output/model1_predictions")

# Step 8: Evaluate the model
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
evaluator_mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")

rmse1 = evaluator_rmse.evaluate(predictions)
r21 = evaluator_r2.evaluate(predictions)
mae1 = evaluator_mae.evaluate(predictions)

# Step 9: Calculate MAPE manually
predictions = predictions.withColumn("APE", abs((col("label") - col("prediction")) / col("label")))
mape1 = predictions.select(mean("APE")).collect()[0][0] * 100

print(f"Linear Regression Model Evaluation:")
print(f"Root Mean Squared Error (RMSE): {rmse1}")
print(f"R^2: {r21}")
print(f"Mean Absolute Error (MAE): {mae1}")
print(f"Mean Absolute Percentage Error (MAPE): {mape1}")

# Step 10: Show some predictions
predictions.select("label", "prediction").show(10)

Linear Regression Model Evaluation:
Root Mean Squared Error (RMSE): 58172214.72593775
R^2: 0.6167264731353712
Mean Absolute Error (MAE): 21030107.34068811
Mean Absolute Percentage Error (MAPE): 99.37805144138096
+---------+--------------------+
|    label|          prediction|
+---------+--------------------+
|1560000.0| -1717532.5268334672|
|1939125.0|-1.758145106925191E7|
|1941650.0|-1.64189609519939...|
|2218450.0|-1.977625844864788E7|
|2540000.0|-1.53238440084085...|
|2583125.0|  -9290651.840316996|
|2600000.0|  -4228712.566564754|
|2607300.0|-1.05631503843554...|
|2630625.0|   -9575456.72697375|
|2630640.0|  -9037473.706536181|
+---------+--------------------+
only showing top 10 rows



In [31]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql.functions import abs, col, mean

# Step 1: Initialize the GBT model
gbt = GBTRegressor(featuresCol="scaled_features", labelCol="label")

# Step 2: Create a parameter grid for grid search
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [2, 4]) \
    .addGrid(gbt.maxIter, [3, 5]) \
    .build()

# Step 3: Set up the TrainValidationSplit for cross-validation
tvs = TrainValidationSplit(estimator=gbt,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse"),
                           trainRatio=0.8)

# Step 4: Fit the model on the training data
gbt_model = tvs.fit(train_data)

# Step 5: Save the best GBT model to HDFS
gbt_model.bestModel.write().overwrite().save("project/models/model2")

# Step 6: Make predictions on the test data
predictions = gbt_model.transform(test_data)

# Step 7: Save the prediction results to HDFS
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("header", "true")\
    .save("project/output/model2_predictions")

# Step 8: Evaluate the model
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
evaluator_mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")

rmse2 = evaluator_rmse.evaluate(predictions)
r22 = evaluator_r2.evaluate(predictions)
mae2 = evaluator_mae.evaluate(predictions)

# Step 9: Calculate MAPE manually
predictions = predictions.withColumn("APE", abs((col("label") - col("prediction")) / col("label")))
mape2 = predictions.select(mean("APE")).collect()[0][0] * 100

print(f"GBT Model Evaluation:")
print(f"Root Mean Squared Error (RMSE): {rmse2}")
print(f"R^2: {r22}")
print(f"Mean Absolute Error (MAE): {mae2}")
print(f"Mean Absolute Percentage Error (MAPE): {mape2}")

# Step 10: Show some predictions
predictions.select("label", "prediction").show(10)

GBT Model Evaluation:
Root Mean Squared Error (RMSE): 60683249.613260634
R^2: 0.5829239183171425
Mean Absolute Error (MAE): 11437773.092258744
Mean Absolute Percentage Error (MAPE): 18.154998573806175
+---------+------------------+
|    label|        prediction|
+---------+------------------+
|1560000.0|3044949.5050197914|
|1939125.0|2697327.7442573826|
|1941650.0| 2552123.407877258|
|2218450.0|2672443.3557320675|
|2540000.0| 4755423.864178333|
|2583125.0| 2620496.707959361|
|2600000.0| 4349562.082554642|
|2607300.0|2703614.6421105964|
|2630625.0|2680754.6625601375|
|2630640.0|2680754.6625601375|
+---------+------------------+
only showing top 10 rows



In [32]:
# Create data frame to report performance of the models
models = [
    ["Linear Regression", rmse1, r21, mae1, mape1],
    ["GBT", rmse2, r22, mae2, mape2]
]

df = spark.createDataFrame(models, ["model", "RMSE", "R2", "MAE", "MAPE"])
df.show(truncate=False)

# Save it to HDFS
df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/evaluation.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")

+-----------------+--------------------+------------------+--------------------+------------------+
|model            |RMSE                |R2                |MAE                 |MAPE              |
+-----------------+--------------------+------------------+--------------------+------------------+
|Linear Regression|5.817221472593775E7 |0.6167264731353712|2.103010734068811E7 |99.37805144138096 |
|GBT              |6.0683249613260634E7|0.5829239183171425|1.1437773092258744E7|18.154998573806175|
+-----------------+--------------------+------------------+--------------------+------------------+



''