In [2]:
## Import the  pyspark libraries adn create the pyspark session
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Rainfall-predictions-ML-project').getOrCreate()
spark


In [3]:
## Read the train set
df_train_set=spark.read.csv('train.csv',header=True,inferSchema=True)
df_train_set.show()

+---+---+--------+-------+-----------+-------+--------+--------+-----+--------+-------------+---------+--------+
| id|day|pressure|maxtemp|temparature|mintemp|dewpoint|humidity|cloud|sunshine|winddirection|windspeed|rainfall|
+---+---+--------+-------+-----------+-------+--------+--------+-----+--------+-------------+---------+--------+
|  0|  1|  1017.4|   21.2|       20.6|   19.9|    19.4|    87.0| 88.0|     1.1|         60.0|     17.2|       1|
|  1|  2|  1019.5|   16.2|       16.9|   15.8|    15.4|    95.0| 91.0|     0.0|         50.0|     21.9|       1|
|  2|  3|  1024.1|   19.4|       16.1|   14.6|     9.3|    75.0| 47.0|     8.3|         70.0|     18.1|       1|
|  3|  4|  1013.4|   18.1|       17.8|   16.9|    16.8|    95.0| 95.0|     0.0|         60.0|     35.6|       1|
|  4|  5|  1021.8|   21.3|       18.4|   15.2|     9.6|    52.0| 45.0|     3.6|         40.0|     24.8|       0|
|  5|  6|  1022.7|   20.6|       18.6|   16.5|    12.5|    79.0| 81.0|     0.0|         20.0|   

In [4]:
## check the data types of each column in dataset
df_train_set.printSchema()

root
 |-- id: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- pressure: double (nullable = true)
 |-- maxtemp: double (nullable = true)
 |-- temparature: double (nullable = true)
 |-- mintemp: double (nullable = true)
 |-- dewpoint: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- cloud: double (nullable = true)
 |-- sunshine: double (nullable = true)
 |-- winddirection: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- rainfall: integer (nullable = true)



In [5]:
## check the column names
df_train_set.columns

['id',
 'day',
 'pressure',
 'maxtemp',
 'temparature',
 'mintemp',
 'dewpoint',
 'humidity',
 'cloud',
 'sunshine',
 'winddirection',
 'windspeed',
 'rainfall']

## Data Preprocessing

In [6]:
## chech the null values
from pyspark.sql.functions import col,isnan, when, count
df_train_set.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_train_set.columns]
   ).show()


+---+---+--------+-------+-----------+-------+--------+--------+-----+--------+-------------+---------+--------+
| id|day|pressure|maxtemp|temparature|mintemp|dewpoint|humidity|cloud|sunshine|winddirection|windspeed|rainfall|
+---+---+--------+-------+-----------+-------+--------+--------+-----+--------+-------------+---------+--------+
|  0|  0|       0|      0|          0|      0|       0|       0|    0|       0|            0|        0|       0|
+---+---+--------+-------+-----------+-------+--------+--------+-----+--------+-------------+---------+--------+



In [14]:
## seperate the independent features and target lables
independent_features=df_train_set.select(['id','day','pressure','maxtemp','temparature','mintemp','dewpoint','humidity','cloud','sunshine','winddirection','windspeed'])
target_feature=df_train_set.select(['rainfall'])
independent_features.show(),target_feature.show()

+---+---+--------+-------+-----------+-------+--------+--------+-----+--------+-------------+---------+
| id|day|pressure|maxtemp|temparature|mintemp|dewpoint|humidity|cloud|sunshine|winddirection|windspeed|
+---+---+--------+-------+-----------+-------+--------+--------+-----+--------+-------------+---------+
|  0|  1|  1017.4|   21.2|       20.6|   19.9|    19.4|    87.0| 88.0|     1.1|         60.0|     17.2|
|  1|  2|  1019.5|   16.2|       16.9|   15.8|    15.4|    95.0| 91.0|     0.0|         50.0|     21.9|
|  2|  3|  1024.1|   19.4|       16.1|   14.6|     9.3|    75.0| 47.0|     8.3|         70.0|     18.1|
|  3|  4|  1013.4|   18.1|       17.8|   16.9|    16.8|    95.0| 95.0|     0.0|         60.0|     35.6|
|  4|  5|  1021.8|   21.3|       18.4|   15.2|     9.6|    52.0| 45.0|     3.6|         40.0|     24.8|
|  5|  6|  1022.7|   20.6|       18.6|   16.5|    12.5|    79.0| 81.0|     0.0|         20.0|     15.7|
|  6|  7|  1022.8|   19.5|       18.4|   15.3|    11.3|    56.0|

(None, None)

In [None]:
df_train_set.show(100)

+---+---+--------+-------+-----------+-------+--------+--------+-----+--------+-------------+---------+--------+
| id|day|pressure|maxtemp|temparature|mintemp|dewpoint|humidity|cloud|sunshine|winddirection|windspeed|rainfall|
+---+---+--------+-------+-----------+-------+--------+--------+-----+--------+-------------+---------+--------+
|  0|  1|  1017.4|   21.2|       20.6|   19.9|    19.4|    87.0| 88.0|     1.1|         60.0|     17.2|       1|
|  1|  2|  1019.5|   16.2|       16.9|   15.8|    15.4|    95.0| 91.0|     0.0|         50.0|     21.9|       1|
|  2|  3|  1024.1|   19.4|       16.1|   14.6|     9.3|    75.0| 47.0|     8.3|         70.0|     18.1|       1|
|  3|  4|  1013.4|   18.1|       17.8|   16.9|    16.8|    95.0| 95.0|     0.0|         60.0|     35.6|       1|
|  4|  5|  1021.8|   21.3|       18.4|   15.2|     9.6|    52.0| 45.0|     3.6|         40.0|     24.8|       0|
|  5|  6|  1022.7|   20.6|       18.6|   16.5|    12.5|    79.0| 81.0|     0.0|         20.0|   

In [None]:
## scale the dataset usinf standard scaling
from pyspark.ml.feature import StandardScaler
scaler=StandardScaler(inputCol='features', outputCol='scaled_features',withStd=True, withMean=False)
scaler_model=scaler.fit(df_train_set)
scaled_train_data = scaler_model.transform(df_train_set)

IllegalArgumentException: features does not exist. Available: id, day, pressure, maxtemp, temparature, mintemp, dewpoint, humidity, cloud, sunshine, winddirection, windspeed, rainfall

## Explotary Data Analysis