In [630]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [631]:
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
# Unzip the file
!tar xf spark-3.3.2-bin-hadoop3.tgz

In [632]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = '/content/spark-3.3.2-bin-hadoop3'

In [633]:
# Install library for finding Spark
!pip install -q findspark
# Import the libary
import findspark
# Initiate findspark
findspark.init()
# Check the location for Spark
findspark.find()

'/content/spark-3.3.2-bin-hadoop3'

In [634]:
# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark

In [635]:
#Creating a dataframe with joined dataset
df = spark.read.csv("/content/Joined-data-final.csv", inferSchema=True, header=True)

In [636]:
df.show() #Displaying the data

+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----------+-----------+-----------+-----------+-----------+------+------+------+------+------+------+------+------+------+------+------+------+---------+---------+---------+---------+---------+
|year|tavg_jan|tavg_feb|tavg_mar|tavg_apr|tavg_may|tavg_jun|tavg_jul|tavg_aug|tavg_sep|tavg_oct|tavg_nov|tavg_dec|tavg_annual|tavg_janfeb|tavg_marmay|tavg_junsep|tavg_octdec|rf_jan|rf_feb|rf_mar|rf_apr|rf_may|rf_jun|rf_jul|rf_aug|rf_sep|rf_oct|rf_nov|rf_dec|rf_annual|rf_janfeb|rf_marmay|rf_junsep|rf_octdec|
+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----------+-----------+-----------+-----------+-----------+------+------+------+------+------+------+------+------+------+------+------+------+---------+---------+---------+---------+---------+
|1990|   24.36|   26.83|   28.66|   30.43|   29.63|    30.9|   29.48|   2

In [637]:
#Displaying the schema of the data
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- tavg_jan: double (nullable = true)
 |-- tavg_feb: double (nullable = true)
 |-- tavg_mar: double (nullable = true)
 |-- tavg_apr: double (nullable = true)
 |-- tavg_may: double (nullable = true)
 |-- tavg_jun: double (nullable = true)
 |-- tavg_jul: double (nullable = true)
 |-- tavg_aug: double (nullable = true)
 |-- tavg_sep: double (nullable = true)
 |-- tavg_oct: double (nullable = true)
 |-- tavg_nov: double (nullable = true)
 |-- tavg_dec: double (nullable = true)
 |-- tavg_annual: double (nullable = true)
 |-- tavg_janfeb: double (nullable = true)
 |-- tavg_marmay: double (nullable = true)
 |-- tavg_junsep: double (nullable = true)
 |-- tavg_octdec: double (nullable = true)
 |-- rf_jan: double (nullable = true)
 |-- rf_feb: double (nullable = true)
 |-- rf_mar: double (nullable = true)
 |-- rf_apr: double (nullable = true)
 |-- rf_may: double (nullable = true)
 |-- rf_jun: double (nullable = true)
 |-- rf_jul: double (nullable = tru

In [638]:
#Displaying data types of the data
df.dtypes

[('year', 'int'),
 ('tavg_jan', 'double'),
 ('tavg_feb', 'double'),
 ('tavg_mar', 'double'),
 ('tavg_apr', 'double'),
 ('tavg_may', 'double'),
 ('tavg_jun', 'double'),
 ('tavg_jul', 'double'),
 ('tavg_aug', 'double'),
 ('tavg_sep', 'double'),
 ('tavg_oct', 'double'),
 ('tavg_nov', 'double'),
 ('tavg_dec', 'double'),
 ('tavg_annual', 'double'),
 ('tavg_janfeb', 'double'),
 ('tavg_marmay', 'double'),
 ('tavg_junsep', 'double'),
 ('tavg_octdec', 'double'),
 ('rf_jan', 'double'),
 ('rf_feb', 'double'),
 ('rf_mar', 'double'),
 ('rf_apr', 'double'),
 ('rf_may', 'double'),
 ('rf_jun', 'double'),
 ('rf_jul', 'double'),
 ('rf_aug', 'double'),
 ('rf_sep', 'double'),
 ('rf_oct', 'double'),
 ('rf_nov', 'double'),
 ('rf_dec', 'double'),
 ('rf_annual', 'double'),
 ('rf_janfeb', 'double'),
 ('rf_marmay', 'double'),
 ('rf_junsep', 'double'),
 ('rf_octdec', 'double')]

In [639]:
#Importing VectorAssembler and Type cast
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import IntegerType

In [640]:
#Displaying columns present in joined dataset
df.columns

['year',
 'tavg_jan',
 'tavg_feb',
 'tavg_mar',
 'tavg_apr',
 'tavg_may',
 'tavg_jun',
 'tavg_jul',
 'tavg_aug',
 'tavg_sep',
 'tavg_oct',
 'tavg_nov',
 'tavg_dec',
 'tavg_annual',
 'tavg_janfeb',
 'tavg_marmay',
 'tavg_junsep',
 'tavg_octdec',
 'rf_jan',
 'rf_feb',
 'rf_mar',
 'rf_apr',
 'rf_may',
 'rf_jun',
 'rf_jul',
 'rf_aug',
 'rf_sep',
 'rf_oct',
 'rf_nov',
 'rf_dec',
 'rf_annual',
 'rf_janfeb',
 'rf_marmay',
 'rf_junsep',
 'rf_octdec']

In [641]:
#Transforming rainfall columns from millimeters to centimeters
df = df.withColumn("rf_junsep", df["rf_junsep"] * 0.01)
df = df.withColumn("rf_octdec", df["rf_octdec"] * 0.01)
df = df.withColumn("rf_janfeb", df["rf_janfeb"] * 0.01)
df = df.withColumn("rf_marmay", df["rf_marmay"]*0.01)

#Changing the type of each column to integer
for col in df.columns:
  df = df.withColumn(col, df[col].cast(IntegerType()))



In [642]:
df.show() #Displaying transformed data

+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----------+-----------+-----------+-----------+-----------+------+------+------+------+------+------+------+------+------+------+------+------+---------+---------+---------+---------+---------+
|year|tavg_jan|tavg_feb|tavg_mar|tavg_apr|tavg_may|tavg_jun|tavg_jul|tavg_aug|tavg_sep|tavg_oct|tavg_nov|tavg_dec|tavg_annual|tavg_janfeb|tavg_marmay|tavg_junsep|tavg_octdec|rf_jan|rf_feb|rf_mar|rf_apr|rf_may|rf_jun|rf_jul|rf_aug|rf_sep|rf_oct|rf_nov|rf_dec|rf_annual|rf_janfeb|rf_marmay|rf_junsep|rf_octdec|
+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----------+-----------+-----------+-----------+-----------+------+------+------+------+------+------+------+------+------+------+------+------+---------+---------+---------+---------+---------+
|1990|      24|      26|      28|      30|      29|      30|      29|    

In [643]:
#Generating vector assembler with year as input column and output will be stored in features
assembler = VectorAssembler(inputCols=['year'], outputCol="features")

In [644]:
assembler

VectorAssembler_b17cab5a23ca

In [645]:
#Adding features vector to dataframe
output = assembler.transform(df)

In [646]:
output.show(30)

+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----------+-----------+-----------+-----------+-----------+------+------+------+------+------+------+------+------+------+------+------+------+---------+---------+---------+---------+---------+--------+
|year|tavg_jan|tavg_feb|tavg_mar|tavg_apr|tavg_may|tavg_jun|tavg_jul|tavg_aug|tavg_sep|tavg_oct|tavg_nov|tavg_dec|tavg_annual|tavg_janfeb|tavg_marmay|tavg_junsep|tavg_octdec|rf_jan|rf_feb|rf_mar|rf_apr|rf_may|rf_jun|rf_jul|rf_aug|rf_sep|rf_oct|rf_nov|rf_dec|rf_annual|rf_janfeb|rf_marmay|rf_junsep|rf_octdec|features|
+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----------+-----------+-----------+-----------+-----------+------+------+------+------+------+------+------+------+------+------+------+------+---------+---------+---------+---------+---------+--------+
|1990|      24|      26|      28|      30|    

In [647]:
#Creating dataframe for each target column which will be used to train and test
tavg_janfeb_model_df = output.select("tavg_janfeb","features")
tavg_marmay_model_df = output.select("tavg_marmay","features")
tavg_junsep_model_df = output.select("tavg_junsep","features")
tavg_octdec_model_df = output.select("tavg_octdec","features")
rf_janfeb_model_df = output.select("rf_janfeb","features")
rf_marmay_model_df = output.select("rf_marmay", "features")
rf_junsep_model_df = output.select("rf_junsep","features")
rf_octdec_model_df = output.select("rf_octdec","features")

In [648]:
#Displaying dataframes created above
tavg_janfeb_model_df.show()
tavg_marmay_model_df.show()
tavg_junsep_model_df.show()
tavg_octdec_model_df.show()
rf_janfeb_model_df.show()
rf_marmay_model_df.show()
rf_junsep_model_df.show()
rf_octdec_model_df.show()

+-----------+--------+
|tavg_janfeb|features|
+-----------+--------+
|         25|[1990.0]|
|         25|[1991.0]|
|         24|[1992.0]|
|         24|[1993.0]|
|         25|[1994.0]|
|         25|[1995.0]|
|         25|[1996.0]|
|         24|[1997.0]|
|         26|[1998.0]|
|         25|[1999.0]|
|         26|[2000.0]|
|         26|[2001.0]|
|         25|[2002.0]|
|         25|[2003.0]|
|         25|[2004.0]|
|         25|[2005.0]|
|         25|[2006.0]|
|         25|[2007.0]|
|         25|[2008.0]|
|         25|[2009.0]|
+-----------+--------+
only showing top 20 rows

+-----------+--------+
|tavg_marmay|features|
+-----------+--------+
|         29|[1990.0]|
|         28|[1991.0]|
|         29|[1992.0]|
|         29|[1993.0]|
|         29|[1994.0]|
|         29|[1995.0]|
|         29|[1996.0]|
|         29|[1997.0]|
|         30|[1998.0]|
|         30|[1999.0]|
|         30|[2000.0]|
|         30|[2001.0]|
|         30|[2002.0]|
|         30|[2003.0]|
|         29|[2004.0]|
|       

In [649]:
#Displaying data type of created training dataframes
tavg_janfeb_model_df.dtypes 
tavg_marmay_model_df.dtypes
tavg_junsep_model_df.dtypes
tavg_octdec_model_df.dtypes
rf_janfeb_model_df.dtypes
rf_marmay_model_df.dtypes
rf_junsep_model_df.dtypes
rf_octdec_model_df.dtypes

[('rf_octdec', 'int'), ('features', 'vector')]

In [650]:
#Splitting data in each training dataframe into taining and test data
tavg_janfeb_training_df, tavg_janfeb_test_df = tavg_janfeb_model_df.randomSplit([0.7, 0.3])
tavg_marmay_training_df, tavg_marmay_test_df = tavg_marmay_model_df.randomSplit([0.7, 0.3])
tavg_junsep_training_df, tavg_junsep_test_df = tavg_junsep_model_df.randomSplit([0.6, 0.4])
tavg_octdec_training_df, tavg_octdec_test_df = tavg_octdec_model_df.randomSplit([0.6, 0.4])
training_rf_janfeb, test_rf_janfeb = rf_janfeb_model_df.randomSplit([0.7, 0.3])
training_rf_marmay, test_rf_marmay = rf_marmay_model_df.randomSplit([0.7, 0.3])
rf_junsep_training_df, rf_junsep_test_df = rf_junsep_model_df.randomSplit([0.7, 0.3])
rf_octdec_training_df, rf_octdec_test_df = rf_octdec_model_df.randomSplit([0.7, 0.3])

In [651]:
#Displaying cout of training data
tavg_janfeb_training_df.count()
tavg_marmay_training_df.count()
tavg_junsep_training_df.count()
tavg_octdec_training_df.count()
training_rf_janfeb.count()
training_rf_marmay.count()
rf_junsep_training_df.count()
rf_octdec_training_df.count()

20

In [652]:
#displaying count of test data
tavg_janfeb_test_df.count()
tavg_marmay_test_df.count()
tavg_junsep_test_df.count()
tavg_octdec_test_df.count()
test_rf_janfeb.count()
test_rf_marmay.count()
rf_junsep_test_df.count()
rf_octdec_test_df.count()

6

In [653]:
#Importing Ml models
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.regression import LinearRegression

# **Linear Regression**

In [654]:
#Creating instance of linear regression model for each target column
tavg_janfeb_lr = LinearRegression(featuresCol= "features", labelCol="tavg_janfeb")
tavg_marmay_lr = LinearRegression(featuresCol= "features", labelCol="tavg_marmay")
tavg_junsep_lr = LinearRegression(featuresCol= "features", labelCol="tavg_junsep")
tavg_octdec_lr = LinearRegression(featuresCol= "features", labelCol="tavg_octdec")
lr_janfeb = LinearRegression(featuresCol= "features", labelCol="rf_janfeb")
lr_marmay = LinearRegression(featuresCol= "features", labelCol="rf_marmay")
rf_junsep_lr = LinearRegression(featuresCol= "features", labelCol="rf_junsep")
rf_octdec_lr = LinearRegression(featuresCol= "features", labelCol="rf_octdec")

In [None]:
#Fitting training data on models created above
tavg_janfeb_trained_model = tavg_janfeb_lr.fit(tavg_janfeb_training_df)
tavg_marmay_trained_model = tavg_marmay_lr.fit(tavg_marmay_training_df)
tavg_junsep_trained_model = tavg_junsep_lr.fit(tavg_junsep_training_df)
tavg_octdec_trained_model = tavg_octdec_lr.fit(tavg_octdec_training_df)
trained_model_janfeb = lr_janfeb.fit(training_rf_janfeb)
trained_model_marmay = lr_marmay.fit(training_rf_marmay)
rf_junsep_trained_model = rf_junsep_lr.fit(rf_junsep_training_df)
rf_octdec_trained_model = rf_octdec_lr.fit(rf_octdec_training_df)

In [None]:
#Evaluating linear regression models
tavg_janfeb_results = tavg_janfeb_trained_model.evaluate(tavg_janfeb_training_df)
tavg_marmay_results = tavg_marmay_trained_model.evaluate(tavg_marmay_training_df)
results_tavg_junsep = tavg_junsep_trained_model.evaluate(tavg_junsep_training_df)
results_tavg_octdec = tavg_octdec_trained_model.evaluate(tavg_octdec_training_df)
results_rf_janfeb = trained_model_janfeb.evaluate(training_rf_janfeb)
results_rf_marmay = trained_model_marmay .evaluate(training_rf_marmay)
rf_junsep_results = rf_junsep_trained_model.evaluate(rf_junsep_training_df)
rf_octdec_results = rf_octdec_trained_model.evaluate(rf_octdec_training_df)

In [None]:
#Displaying R squared value
print("R squared for tavg janfeb",tavg_janfeb_results.r2)
print("R squared for tavg marmay",tavg_marmay_results.r2)
print("R squared for tavg junsep",results_tavg_junsep.r2)
print("R squared for tavg octdec",results_tavg_octdec.r2)
print(rf_junsep_results.r2)
print(rf_octdec_results.r2)
print(results_rf_janfeb.r2)
print(results_rf_marmay.r2)

In [None]:
#Displaying mean square error for linear regression models created
print("Mean square error for tavg_janfeb: ", tavg_janfeb_results.meanSquaredError)
print("Mean square error for tavg_marmay: ", tavg_marmay_results.meanSquaredError)
print("Mean square error for tavg_junsep: ", results_tavg_junsep.meanSquaredError)
print("Mean square error for tavg_octdec: ", results_tavg_octdec.meanSquaredError)
print("Mean square error of rf_janfeb: ", results_rf_janfeb.meanSquaredError)
print("Mean square error of rf_marmay: ", results_rf_marmay.meanSquaredError)
print("Mean square error for rf_junsep: ", rf_junsep_results.meanSquaredError)
print("Mean square error for rf_octdec: ", rf_octdec_results.meanSquaredError)

In [None]:
#Selecting features of each test data
tavg_janfeb_unlabeled_data = tavg_janfeb_test_df.select("features")
tavg_janfeb_unlabeled_data.show()
tavg_marmay_unlabeled_data = tavg_marmay_test_df.select("features")
tavg_marmay_unlabeled_data.show()
tavg_junsep_unlabeled_data = tavg_junsep_test_df.select("features")
tavg_junsep_unlabeled_data.show()
tavg_octdec_unlabeled_data = tavg_octdec_test_df.select("features")
tavg_octdec_unlabeled_data.show()
rf_janfeb_unlabeled_data = test_rf_janfeb.select("features")
rf_marmay_unlabeled_data = test_rf_marmay.select("features")
rf_janfeb_unlabeled_data.show()
rf_marmay_unlabeled_data.show()
rf_junsep_unlabeled_data = rf_junsep_test_df.select("features")
rf_junsep_unlabeled_data.show()
rf_octdec_unlabeled_data = rf_octdec_test_df.select("features")
rf_octdec_unlabeled_data.show()

In [None]:
#Trasforming test data on trained models to get predictions
tavg_janfeb_predictions = tavg_janfeb_trained_model.transform(tavg_janfeb_unlabeled_data)
tavg_marmay_predictions = tavg_marmay_trained_model.transform(tavg_marmay_unlabeled_data)
tavg_junsep_predictions = tavg_junsep_trained_model.transform(tavg_junsep_unlabeled_data)
tavg_octdec_predictions = tavg_octdec_trained_model.transform(tavg_octdec_unlabeled_data)
rf_janfeb_predictions = trained_model_janfeb.transform(rf_janfeb_unlabeled_data)
rf_marmay_predictions = trained_model_marmay.transform(rf_marmay_unlabeled_data)
rf_junsep_predictions = rf_junsep_trained_model.transform(rf_junsep_unlabeled_data)
rf_octdec_predictions = rf_octdec_trained_model.transform(rf_octdec_unlabeled_data)

In [None]:
#Displaying predictions
tavg_janfeb_predictions.show()
tavg_marmay_predictions.show()
tavg_junsep_predictions.show()
tavg_octdec_predictions.show()
rf_janfeb_predictions.show()
rf_marmay_predictions.show()
rf_junsep_predictions.show()
rf_octdec_predictions.show()

# **Decision Tree**

In [None]:
#Creating instances of decision tree classifier for targeted columns training data
tavg_janfeb_df_classifier = DecisionTreeClassifier(labelCol="tavg_janfeb").fit(tavg_janfeb_training_df)
tavg_marmay_df_classifier = DecisionTreeClassifier(labelCol="tavg_marmay").fit(tavg_marmay_training_df)
tavg_junsep_df_classifier = DecisionTreeClassifier(labelCol="tavg_junsep").fit(tavg_junsep_training_df)
tavg_octdec_df_classifier = DecisionTreeClassifier(labelCol="tavg_octdec").fit(tavg_octdec_training_df)
rf_janfeb_df_classifier = DecisionTreeClassifier(labelCol="rf_janfeb").fit(training_rf_janfeb)
rf_marmay_df_classifier = DecisionTreeClassifier(labelCol="rf_marmay").fit(training_rf_marmay)
rf_junsep_classifier = DecisionTreeClassifier(labelCol="rf_junsep").fit(rf_junsep_training_df)
rf_octdec_classifier = DecisionTreeClassifier(labelCol="rf_octdec").fit(rf_octdec_training_df)

In [None]:
#Transforming decision tree models on respective test data
tavg_janfeb_df_predictions = tavg_janfeb_df_classifier.transform(tavg_janfeb_test_df)
tavg_marmay_df_predictions = tavg_marmay_df_classifier.transform(tavg_marmay_test_df)
tavg_junsep_df_predictions = tavg_junsep_df_classifier.transform(tavg_junsep_test_df)
tavg_octdec_df_predictions = tavg_octdec_df_classifier.transform(tavg_octdec_test_df)
df_predictions_janfeb = rf_janfeb_df_classifier.transform(test_rf_janfeb)
df_predictions_marmay = rf_marmay_df_classifier.transform(test_rf_marmay)
rf_junsep_predictions = rf_junsep_classifier.transform(rf_junsep_test_df)
rf_octdec_predictions = rf_octdec_classifier.transform(rf_octdec_test_df)

In [None]:
#Displaying predictions
tavg_janfeb_df_predictions.show()
tavg_marmay_df_predictions.show()
tavg_junsep_df_predictions.show()
tavg_octdec_df_predictions.show()
df_predictions_janfeb.show()
df_predictions_marmay.show()
rf_junsep_predictions.show()
rf_octdec_predictions.show()

In [None]:
#Evaluating accuracy of decision tree models
tavg_janfeb_df_accuracy = MulticlassClassificationEvaluator(labelCol="tavg_janfeb", metricName="accuracy").evaluate(tavg_janfeb_df_predictions)
print("tavg_janfeb_df_accuracy: ", tavg_janfeb_df_accuracy)
tavg_marmay_df_accuracy = MulticlassClassificationEvaluator(labelCol="tavg_marmay", metricName="accuracy").evaluate(tavg_marmay_df_predictions)
print("tavg_marmay_df_accuracy: ", tavg_marmay_df_accuracy)
tavg_junsep_df_accuracy = MulticlassClassificationEvaluator(labelCol="tavg_junsep", metricName="accuracy").evaluate(tavg_junsep_df_predictions)
print("tavg_junsep Accuracy: ", tavg_junsep_df_accuracy)
tavg_octdec_df_accuracy = MulticlassClassificationEvaluator(labelCol="tavg_octdec", metricName="accuracy").evaluate(tavg_octdec_df_predictions)
print("tavg_octdec Accuracy: ", tavg_octdec_df_accuracy)
df_accuracy_janfeb = MulticlassClassificationEvaluator(labelCol="rf_janfeb", metricName="accuracy").evaluate(df_predictions_janfeb)
df_accuracy_marmay = MulticlassClassificationEvaluator(labelCol="rf_marmay", metricName="accuracy").evaluate(df_predictions_marmay)
print("Accuracy: ", df_accuracy_janfeb)
print("Accuracy: ", df_accuracy_marmay)
rf_junsep_accuracy = MulticlassClassificationEvaluator(labelCol="rf_junsep", metricName="accuracy").evaluate(rf_junsep_predictions)
print("Accuracy: ", rf_junsep_accuracy)
rf_octdec_accuracy = MulticlassClassificationEvaluator(labelCol="rf_octdec", metricName="accuracy").evaluate(rf_octdec_predictions)
print("Accuracy: ", rf_octdec_accuracy)

In [None]:
#Displaying precision for decision tree models
tavg_janfeb_dt_precision = MulticlassClassificationEvaluator(labelCol="tavg_janfeb", metricName="weightedPrecision").evaluate(tavg_janfeb_df_predictions)
print("Precision: ",tavg_janfeb_dt_precision)
tavg_marmay_dt_precision = MulticlassClassificationEvaluator(labelCol="tavg_marmay", metricName="weightedPrecision").evaluate(tavg_marmay_df_predictions)
print("Precision: ",tavg_marmay_dt_precision)
tavg_junsep_dt_precision = MulticlassClassificationEvaluator(labelCol="tavg_junsep", metricName="weightedPrecision").evaluate(tavg_junsep_df_predictions)
print("Precision: ", tavg_junsep_dt_precision)
tavg_octdec_dt_precision = MulticlassClassificationEvaluator(labelCol="tavg_octdec", metricName="weightedPrecision").evaluate(tavg_octdec_df_predictions)
print("Precision: ", tavg_octdec_dt_precision)
rf_junsep_dt_precision = MulticlassClassificationEvaluator(labelCol="rf_junsep", metricName="weightedPrecision").evaluate(rf_junsep_predictions)
print("Precision: ", rf_junsep_dt_precision)
dt_precision_janfeb = MulticlassClassificationEvaluator(labelCol="rf_janfeb", metricName="weightedPrecision").evaluate(df_predictions_janfeb)
dt_precision_marmay = MulticlassClassificationEvaluator(labelCol="rf_marmay", metricName="weightedPrecision").evaluate(df_predictions_marmay)
print("Precision: ", dt_precision_janfeb)
print("Precision: ", dt_precision_marmay)
rf_octdec_dt_precision = MulticlassClassificationEvaluator(labelCol="rf_octdec", metricName="weightedPrecision").evaluate(rf_octdec_predictions)
print("Precision: ", rf_octdec_dt_precision)

## Random Forest

In [None]:
#Importing Random forest regressor and its evaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
#Creating random forest regressor models for each targeted column and fitting on their respective training data
Tavg_janfeb = RandomForestRegressor(labelCol="tavg_janfeb")
Tavg_janfeb_model = Tavg_janfeb.fit(tavg_janfeb_training_df) 
Tavg_marmay = RandomForestRegressor(labelCol="tavg_marmay")
Tavg_marmay_model = Tavg_marmay.fit(tavg_marmay_training_df)  
tavg_junsep_rf = RandomForestRegressor(numTrees=5, labelCol="tavg_junsep")
tavg_junsep_model = tavg_junsep_rf.fit(tavg_junsep_training_df)

tavg_octdec_rf = RandomForestRegressor(numTrees=5, labelCol="tavg_octdec")
tavg_octdec_model = tavg_octdec_rf.fit(tavg_octdec_training_df)
rf_janfeb_1 = RandomForestRegressor(numTrees=5, labelCol="rf_janfeb")
rf_marmay_1 = RandomForestRegressor(numTrees=5, labelCol="rf_marmay")

model_janfeb = rf_janfeb_1.fit(training_rf_janfeb)
model_marmay = rf_marmay_1.fit(training_rf_marmay)
rf_junsep = RandomForestRegressor(numTrees=5, labelCol="rf_junsep")
rf_junsep_model = rf_junsep.fit(rf_junsep_training_df)

rf_octdec = RandomForestRegressor(numTrees=5, labelCol="rf_octdec")
rf_junsep_model = rf_octdec.fit(rf_octdec_training_df)

In [None]:
#Applying the test data on trained models
tavg_janfeb_df_predictions = Tavg_janfeb_model.transform(tavg_janfeb_test_df)
tavg_janfeb_df_predictions.show()
tavg_marmay_df_predictions = Tavg_janfeb_model.transform(tavg_marmay_test_df)
tavg_marmay_df_predictions.show()
tavg_junsep_predictions = tavg_junsep_model.transform(tavg_junsep_test_df)
tavg_junsep_predictions.show()
tavg_octdec_predictions = tavg_octdec_model.transform(tavg_octdec_test_df)
tavg_octdec_predictions.show()
predictions_rf_janfeb = model_janfeb.transform(test_rf_janfeb)
predictions_rf_janfeb.show()
predictions_rf_marmay = model_marmay.transform(test_rf_marmay)
predictions_rf_marmay.show()
rf_junsep_predictions = rf_junsep_model.transform(rf_junsep_test_df)
rf_junsep_predictions.show()
rf_octdec_predictions = rf_junsep_model.transform(rf_octdec_test_df)
rf_octdec_predictions.show()

In [None]:
#Displaying RMSE value for each RF model
tavg_janfeb_model_evaluator = RegressionEvaluator(
    labelCol="tavg_janfeb", metricName="rmse")
print("Root Mean Squared Error for tavg_janfeb_model = %g" % tavg_janfeb_model_evaluator.evaluate(tavg_janfeb_df_predictions))
tavg_marmay_model_evaluator = RegressionEvaluator(
    labelCol="tavg_marmay", metricName="rmse")
print("Root Mean Squared Error for tavg_marmay_model = %g" % tavg_marmay_model_evaluator.evaluate(tavg_marmay_df_predictions))
tavg_junsep_model_evaluator = RegressionEvaluator(
    labelCol="tavg_junsep", metricName="rmse")
print("Root Mean Squared Error for tavg_junsep_model = %g" % tavg_junsep_model_evaluator.evaluate(tavg_junsep_predictions))
tavg_octdec_model_evaluator = RegressionEvaluator(
    labelCol="tavg_octdec", metricName="rmse")
print("Root Mean Squared Error for tavg_octdec_model = %g" % tavg_octdec_model_evaluator.evaluate(tavg_octdec_predictions))
rf_janfeb_evaluator = RegressionEvaluator(labelCol="rf_janfeb", metricName="rmse")
print("Root Mean Squared Error for rf_janfeb_evaluator = %g" % rf_janfeb_evaluator.evaluate(predictions_rf_janfeb))
rf_marmay_evaluator = RegressionEvaluator(labelCol="rf_marmay", metricName="rmse")
print("Root Mean Squared Error for rf_marmay_evaluator = %g" % rf_marmay_evaluator.evaluate(predictions_rf_marmay))
rf_junsep_model_evaluator = RegressionEvaluator(
    labelCol="rf_junsep", metricName="rmse")
print("Root Mean Squared Error (RMSE) on rf_junsep_model_evaluator= %g" %rf_junsep_model_evaluator.evaluate(rf_junsep_predictions) )
rf_octdec_model_evaluator = RegressionEvaluator(
    labelCol="rf_octdec", metricName="rmse")
print("Root Mean Squared Error (RMSE) on rf_octdec_model_evaluator= %g" %rf_octdec_model_evaluator.evaluate(rf_octdec_predictions) )