In [19]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from sklearn.model_selection import train_test_split
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report, precision_recall_curve, \
auc

In [2]:
spark=SparkSession.builder.appName('Australian_rain_prediction').getOrCreate()
sc = spark.sparkContext

22/11/26 17:17:14 WARN Utils: Your hostname, bigdata2022-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
22/11/26 17:17:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/26 17:17:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df=spark.read.csv('hdfs://localhost:9000/user/bigdata2022/datasets/big_data_project/australian_rain_dataset.csv',inferSchema=True,header=True)

                                                                                

In [4]:
print(df.columns)

['_c0', 'MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'RainToday', 'RainTomorrow', 'Month', 'WindGustDir_N', 'WindGustDir_S', 'WindGustDir_W', 'WindDir3pm_N', 'WindDir3pm_S', 'WindDir3pm_W']


In [5]:
df_1 = df.drop('RainTomorrow')

In [6]:
assembler = VectorAssembler(inputCols=df_1.columns, outputCol="features_to_scale")

In [7]:
scaler = MinMaxScaler(inputCol="features_to_scale", outputCol="features_scaled")

In [8]:
model = GBTClassifier(labelCol="RainTomorrow", featuresCol="features_scaled", maxIter=10) 

In [9]:
pipeline = Pipeline(stages=[assembler, scaler, model])

In [10]:
training_data, test_data = df.randomSplit([0.75, 0.25])

In [51]:
paramGrid = ParamGridBuilder().addGrid(model.stepSize, [0.01, 0.1, 0.5, 1.0])\
                              .addGrid(model.minWeightFractionPerNode, [0.01, 0.1, 0.25, 0.45])\
                              .addGrid(model.minWeightFractionPerNode, [0.01, 0.1, 0.25, 0.45])\
                              .addGrid(model.maxDepth, [3, 4, 5, 6, 7, 8, 9, 10, 11, 12])\
                              .addGrid(model.subsamplingRate, [0.01, 0.1, 0.5, 1.0]).build() 

In [52]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)

In [53]:
cvModel = crossval.fit(training_data)
#pipeline_model = pipeline.fit(training_data)

22/11/26 18:10:44 WARN CacheManager: Asked to cache already cached data.
22/11/26 18:10:44 WARN CacheManager: Asked to cache already cached data.
22/11/26 18:10:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustSpeed, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, RainToday, RainTomorrow, Month, WindGustDir_N, WindGustDir_S, WindGustDir_W, WindDir3pm_N, WindDir3pm_S, WindDir3pm_W
 Schema: _c0, MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustSpeed, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, RainToday, RainTomorrow, Month, WindGustDir_N, WindGustDir_S, WindGustDir_W, WindDir3pm_N, WindDir3pm_S, WindDir3pm_W
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/user/bigdata2022/datasets/big_data_project/australian_rain_dataset.csv
                                                                                

IllegalArgumentException: label does not exist. Available: _c0, MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustSpeed, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, RainToday, RainTomorrow, Month, WindGustDir_N, WindGustDir_S, WindGustDir_W, WindDir3pm_N, WindDir3pm_S, WindDir3pm_W, CrossValidator_24dc94588ab1_rand, features_to_scale, features_scaled, rawPrediction, probability, prediction

                                                                                

In [12]:
predictions = pipeline_model.transform(test_data)

In [13]:
predictions.select("prediction", "RainTomorrow").show(100)

22/11/26 17:17:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustSpeed, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, RainToday, RainTomorrow, Month, WindGustDir_N, WindGustDir_S, WindGustDir_W, WindDir3pm_N, WindDir3pm_S, WindDir3pm_W
 Schema: _c0, MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustSpeed, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, RainToday, RainTomorrow, Month, WindGustDir_N, WindGustDir_S, WindGustDir_W, WindDir3pm_N, WindDir3pm_S, WindDir3pm_W
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/user/bigdata2022/datasets/big_data_project/australian_rain_dataset.csv


+----------+------------+
|prediction|RainTomorrow|
+----------+------------+
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           1|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           1|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           1|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           1|
|       1.0|           1|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           0|
|       0.0|           1|
|       0.0|           0|
|       0.0|           0|
|       0.0|           1|
|       0.0|

22/11/26 17:17:49 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/11/26 17:17:49 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


In [14]:
t_test=predictions.select('RainTomorrow').toPandas()
t_hat=predictions.select('prediction').toPandas()

22/11/26 17:17:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustSpeed, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, RainToday, RainTomorrow, Month, WindGustDir_N, WindGustDir_S, WindGustDir_W, WindDir3pm_N, WindDir3pm_S, WindDir3pm_W
 Schema: _c0, MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustSpeed, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, RainToday, RainTomorrow, Month, WindGustDir_N, WindGustDir_S, WindGustDir_W, WindDir3pm_N, WindDir3pm_S, WindDir3pm_W
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/user/bigdata2022/datasets/big_data_project/australian_rain_dataset.csv
22/11/26 17:17:51 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustSpeed, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressu

In [15]:
print("\nAccuracy score on the test set: ", accuracy_score(t_test, t_hat))


Accuracy score on the test set:  0.8616399399636622
