In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import time

# Train and save two rental-price regressors (linear regression and random
# forest) with 5-fold cross-validated hyper-parameter search.
#
# Flow: read CSV -> keep modelling columns -> drop incomplete rows ->
# 80/20 split -> index + one-hot encode categoricals, min-max scale numerics,
# assemble features -> cross-validate each model -> show sample predictions
# on the test split -> save the best estimator of each family.

# Bound before the try so the finally clause can tell whether a session was
# ever created; otherwise a failure in getOrCreate() would raise NameError
# from spark.stop() and hide the real error.
spark = None

try:
    # create a SparkSession
    spark = SparkSession.builder.master("local[*]").getOrCreate()

    # read the dataset
    datapath = 'CLUSTER_URL_HERE'
    df = spark.read.csv(datapath, sep=',', inferSchema=True, header=True)
    print("Data read!")
    print("Preparing data..")

    # keep only the label ("price") and the columns used as predictors
    df = df.select(
        "price", "type", "sqfeet", "beds", "baths",
        "cats_allowed", "dogs_allowed", "smoking_allowed",
        "wheelchair_access", "electric_vehicle_charge", "comes_furnished",
        "laundry_options", "parking_options",
    )

    # drop rows with missing values
    df = df.na.drop()

    # split the dataset into train and test sets (fixed seed for repeatability)
    train_data, test_data = df.randomSplit([.8, .2], seed=1234)

    print("Creating pipeline..")

    # create a string indexer to convert categorical variables to numeric indices
    # NOTE(review): with the default handleInvalid="error", a category that
    # appears only in the test split will make transform() fail; consider
    # handleInvalid="keep" if that happens with this dataset.
    str_idx = StringIndexer(
        inputCols=["type", "laundry_options", "parking_options"],
        outputCols=["type_idx", "laundry_options_idx", "parking_options_idx"],
    )

    # create a one-hot encoder to convert the indexed categorical variables to binary vectors
    ohe = OneHotEncoder(
        inputCols=["type_idx", "laundry_options_idx", "parking_options_idx"],
        outputCols=["type_vec", "laundry_options_vec", "parking_options_vec"],
    )

    # numeric inputs that get min-max scaled
    inputs = ["sqfeet", "beds", "baths"]

    # MinMaxScaler works on a single vector column, so assemble first, then scale
    assembler1 = VectorAssembler(inputCols=inputs, outputCol="features_scaled1")
    scaler = MinMaxScaler(inputCol="features_scaled1", outputCol="features_scaled")

    # combine the encoded categoricals and the scaled numerics into the final feature vector
    assembler2 = VectorAssembler(
        inputCols=["type_vec", "laundry_options_vec", "parking_options_vec", "features_scaled"],
        outputCol="features",
    )

    # create the models (these initial hyper-parameters are overridden by the grids below)
    lr = LinearRegression(featuresCol="features", labelCol="price", maxIter=10, regParam=0.01)
    rf = RandomForestRegressor(featuresCol="features", labelCol="price", maxDepth=10, numTrees=20)

    # Create ParamGrid for Cross Validation
    lrparamGrid = (ParamGridBuilder()
                   .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0])
                   .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
                   .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
                   .build())

    rf_param_grid = (ParamGridBuilder()
                     .addGrid(rf.numTrees, [10, 50, 100])
                     .addGrid(rf.maxDepth, [5, 10, 20])
                     .build())

    # Define the evaluator to use for model selection (RMSE on the price label)
    lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")
    rf_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")

    # Create 5-fold CrossValidator
    lr_cv = CrossValidator(estimator=lr,
                           estimatorParamMaps=lrparamGrid,
                           evaluator=lr_evaluator,
                           numFolds=5)

    rf_cv = CrossValidator(estimator=rf,
                           estimatorParamMaps=rf_param_grid,
                           evaluator=rf_evaluator,
                           numFolds=5)

    # stage list for the pipeline: shared preprocessing followed by the cross-validated model
    stages_lr = [str_idx, ohe, assembler1, scaler, assembler2, lr_cv]
    stages_rf = [str_idx, ohe, assembler1, scaler, assembler2, rf_cv]

    # pipeline
    pipeline_lr = Pipeline(stages=stages_lr)
    pipeline_rf = Pipeline(stages=stages_rf)

    print("Training Linear Regression Model..")

    # fit the pipeline to the training data
    pipeline_model_lr = pipeline_lr.fit(train_data)

    # best estimator found by cross-validation (kept for saving below)
    lr_best_model = pipeline_model_lr.stages[-1].bestModel

    # Predict through the whole fitted pipeline: the raw test split has no
    # "features" column, so the bare best model cannot transform it directly.
    # The final CrossValidatorModel stage applies its best model.
    pred_lr = pipeline_model_lr.transform(test_data)

    # show the actual label next to the prediction
    print("Results for Linear Regression Model:")
    pred_lr.select("price", "prediction").show(5)

    print("Training Random Forest Model..")
    # fit the pipeline to the training data
    pipeline_model_rf = pipeline_rf.fit(train_data)

    # best estimator found by cross-validation (kept for saving below)
    rf_best_model = pipeline_model_rf.stages[-1].bestModel

    # same as above: run the test split through the full fitted pipeline
    pred_rf = pipeline_model_rf.transform(test_data)

    # rf results
    print("Results for Random Forest Model:")
    pred_rf.select("price", "prediction").show(5)

    print("Saving models..")

    # one timestamp shared by both models so a training run's artifacts pair up
    timestamp = int(time.time())

    # NOTE: only the best regressor is saved, not the preprocessing stages;
    # a loaded model expects an already-assembled "features" column.
    if lr_best_model is not None:
        model_path_lr = f"CLUSTER_URL_HERE/model/model_lr_{timestamp}"
        lr_best_model.write().overwrite().save(model_path_lr)
        print(f"Linear regression model saved at {model_path_lr}")

    if rf_best_model is not None:
        model_path_rf = f"CLUSTER_URL_HERE/model/model_rf_{timestamp}"
        rf_best_model.write().overwrite().save(model_path_rf)
        print(f"Random forest model saved at {model_path_rf}")
    print("Models saved successfully!")

except Exception as e:
    # top-level boundary of the script: report the failure and fall through
    # to cleanup instead of propagating
    print("An error occurred: ", e)

finally:
    # stop the SparkSession, but only if one was actually created
    if spark is not None:
        spark.stop()
    print("Finished!")