# Data Preparation

## Creazione features Region, Month, Season


In [None]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 63.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=3a3da56a2f7d6f2afb05b99ed5d9de8a22ba194c317b43f21c8c724106bcf887
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
from pyspark.sql.functions import when, col, isnan, count,  regexp_replace, udf
import numpy as np
import pyspark
from pyspark.sql.types import FloatType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler,MinMaxScaler, RobustScaler, StandardScaler, MaxAbsScaler, PCA
import pyspark.sql.functions as F
from pyspark.ml.clustering import KMeans, BisectingKMeansModel, KMeansSummary
from  pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import ClusteringEvaluator
from matplotlib.pyplot import figure
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from yellowbrick.cluster import KElbowVisualizer
import matplotlib.pyplot as plt
from pyspark.sql import SQLContext
from pyspark.ml.feature import StringIndexer, VectorIndexer, StandardScaler, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, ClusteringEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics
from pyspark.sql.functions import monotonically_increasing_id 
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
from pyspark import SparkContext
# initialize a new Spark Context to use for the execution of the script
sc = SparkContext(appName="MY-APP-NAME", master="local[*]")

In [None]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)



In [None]:
# load the dataset

rain_path = 'drive/MyDrive/DDAM/Australia Rain/australia_rain_tomorrow_raw.csv'

In [None]:
# carico il dataframe
df = sqlCtx.read.load(rain_path, format="csv", sep=",", inferSchema="true", header="true")

AnalysisException: ignored

In [None]:
# Creazione colonna Month
df = df.withColumn("Month", F.regexp_extract(col("Date"), r'(\d{1,2})/(\d{1,2})/(\d{4})', 2))

# Creazione colonna Season
df = df.withColumn("Season", .when((df.Month == 12.0) & (df.Month <=2.0),"Winter")\
                               .when((df.Month > 2.0) & (df.Month <=5.0),"Spring") \
                               .when((df.Month > 5.0) & (df.Month <=8.0),"Summer")
                                .otherwise("Fall"))

In [None]:
print("**************Statistiche per variabile Season ********************")
df.createOrReplaceTempView("tmp")
tot = df.count()
query = sqlCtx.sql("SELECT Season, count(*)/"+str(tot)+" * 100 as percentage FROM tmp GROUP BY Season ORDER BY percentage DESC")
query.show()

In [None]:
print("**************Statistiche per variabile Month ********************")
tot = df.count()
query = sqlCtx.sql("SELECT Month, count(*)/"+str(tot)+" * 100 as percentage FROM tmp GROUP BY Month ORDER BY percentage DESC")
query.show()

In [None]:
query = sqlCtx.sql("SELECT Month, RainToday, count(*) as cnt FROM tmp GROUP BY Month, RainToday ORDER BY cnt DESC")
query.show(24)

In [None]:
query = sqlCtx.sql("SELECT Month, location, count(*) as cnt FROM tmp GROUP BY Month, location ORDER BY location, Month DESC")
query.show(24)

In [None]:
query = sqlCtx.sql("SELECT Date, location FROM tmp GROUP BY Date, location ORDER BY Date")
query.show(24)

In [None]:
query = sqlCtx.sql("SELECT Month, RainTomorrow, count(*) as cnt FROM tmp GROUP BY Month, RainTomorrow ORDER BY cnt DESC")
query.show(25)

In [None]:
query = sqlCtx.sql("SELECT Season, RainTomorrow, count(*) as cnt FROM tmp GROUP BY Season, RainTomorrow ORDER BY cnt DESC")
query.show()

In [None]:
query = sqlCtx.sql("SELECT Season, RainToday, count(*) as cnt FROM tmp GROUP BY Season, RainToday ORDER BY cnt DESC")
query.show()

In [None]:
df.show()

In [None]:
# Creazione colonna Region

# Percorso file con associazioni città-regione
filename_cities = 'drive/MyDrive/DDAM/Australia Rain/Mario/cities_australia.csv'

# Creazione rdd (problemi usando il dataframe)
cities_rdd = sc.textFile(filename_cities)
cities_rdd.take(3)

In [None]:
# create a dictionary (k: location, v: region)

cities_rdd_dict = cities_rdd.map(lambda line: line.strip().split(";")).collectAsMap()

cities_rdd_dict['Sydney']

In [None]:
func_name = udf(
    lambda val: cities_rdd_dict[val], 
    StringType()
)

df = df.withColumn('Region', func_name(df.Location))

In [None]:
df.show(10)

In [None]:
df.groupBy('Region').count().show()

In [None]:
print("**************Statistiche per variabile Region ********************")
df.createOrReplaceTempView("tmp")
tot = df.count()
query = sqlCtx.sql("SELECT Region, count(*)/"+str(tot)+" * 100 as percentage FROM tmp GROUP BY Region ORDER BY percentage DESC")
query.show()

## Tentativi di sostituzione missing values (groupBy)

In [None]:
# Sostituisco 'NA' con null values 
df = df.replace('NA', None)
df.show(5)

In [None]:
# Colonne con più null values

print('Totale osservazioni: 142193')
print()

Dict_Null = {col:df.filter(df[col].isNull()).count() for col in df.columns}
Dict_Null = sorted(Dict_Null.items(), key=lambda x: x[1], reverse=True)

for i in Dict_Null:
	print(i[0], i[1])

In [None]:
# Analisi dei null values sulle colonne con più valori mancanti, raggruppati per città

from pyspark.ml.stat import Correlation

cols = ["Evaporation", "Sunshine", "Cloud3pm", "Cloud9am"]

for c in cols:
    x = df.filter(df[c].isNull()).groupby(df.Location).count()

    print('Null values for column {}'.format(c),'(grouped by city)')
    x.sort("count", ascending=False).show(10)  # con print dà un None di troppo

In [None]:
# Analisi dei null values sulle colonne con più valori mancanti, raggruppati per RainTomorrow

cols = ["Evaporation", "Sunshine", "Cloud3pm", "Cloud9am"]

for c in cols:
    x = df.filter(df[c].isNull()).groupby(df.RainTomorrow).count()

    print('Null values for column {}'.format(c),'(grouped by RainTomorrow)')
    x.sort("count", ascending=False).show(10)

In [None]:
# Analisi dei null values sulle colonne con più valori mancanti, raggruppati per RainToday
# L'attributo RainToday, a differenza dei precedenti, presenta null values

cols = ["Evaporation", "Sunshine", "Cloud3pm", "Cloud9am"]

for c in cols:
    x = df.filter(df[c].isNull()).groupby(df.RainToday).count()

    print('Null values for column {}'.format(c),'(grouped by RainToday)')
    x.sort("count", ascending=False).show(10) 

In [None]:
# Visualizzo la distribuzione di Yes e No in RainToday e RainTomorrow

df.groupBy("RainToday").count().show()
df.groupBy("RainTomorrow").count().show()

In [None]:
# Media dell'attributo Evaporation (groupby RainToday e RainTomorrow)

df.groupBy("RainToday").agg({'Evaporation':'avg'}).show()
df.groupBy("RainTomorrow").agg({'Evaporation':'avg'}).show()

In [None]:
# Media dell'attributo Sunshine (groupby RainToday e RainTomorrow)

df.groupBy("RainToday").agg({'Sunshine':'avg'}).show()
df.groupBy("RainTomorrow").agg({'Sunshine':'avg'}).show()

In [None]:
# Media dell'attributo Cloud9am (groupby RainToday e RainTomorrow)

df.groupBy("RainToday").agg({'Cloud9am':'avg'}).show()
df.groupBy("RainTomorrow").agg({'Cloud9am':'avg'}).show()

In [None]:
# Media dell'attributo Cloud3pm (groupby RainToday e RainTomorrow)

df.groupBy("RainToday").agg({'Cloud3pm':'avg'}).show()
df.groupBy("RainTomorrow").agg({'Cloud3pm':'avg'}).show()

#### GroupBy Location

In [None]:
# Media dell'attributo Evaporation 
# groupby Sunshine, ossia l'attributo con la correlazione più forte con Evaporation
# groupby Location

df.groupBy("Location","Sunshine").agg({'Evaporation':'avg'}).sort('Location').show(5)

In [None]:
# Media dell'attributo Sunshine 
# groupby Evaporation, ossia l'attributo con la correlazione più forte con Sunshine
# groupby Location

df.groupBy("Location","Evaporation").agg({'Sunshine':'avg'}).sort('Location').show(5)

In [None]:
# Media dell'attributo Cloud9am 
# groupby Humidity3pm, ossia l'attributo con la correlazione più forte con Cloud9am
# groupby Location

df.groupBy("Location","Humidity3pm").agg({'Cloud9am':'avg'}).sort('Location', ascending=True).show(5)

In [None]:
# Media dell'attributo Cloud3pm 
# groupby Humidity3pm, ossia l'attributo con la correlazione più forte con Cloud3pm
# groupby Location

df.groupBy("Location","Humidity3pm").agg({'Cloud3pm':'avg'}).sort('Location', ascending=True).show(5)

Per diverse città è impossibile fare la sostituzione dei null values con il groubby perché presentano troppi null values. Raggruppiamo quindi per l'attributo con la correlazione più forte e per periodo dell'anno.

#### GroupBy Month

In [None]:
df.groupBy("Month").count().sort("Month").show()

In [None]:
# Media dell'attributo Evaporation 
# groupby Sunshine, ossia l'attributo con la correlazione più forte con Evaporation
# groupby Month

df.groupBy("Month","Sunshine").agg({'Evaporation':'avg'}).sort('Month').show(5)

In [None]:
# Media dell'attributo Sunshine 
# groupby Evaporation, ossia l'attributo con la correlazione più forte con Sunshine
# groupby Date

df.groupBy("Month","Evaporation").agg({'Sunshine':'avg'}).sort('Month').show(5)

In [None]:
# Media dell'attributo Cloud9am 
# groupby Humidity3pm, ossia l'attributo con la correlazione più forte con Cloud9am
# groupby Month

df.groupBy("Month","Humidity3pm").agg({'Cloud9am':'avg'}).sort('Month').show(5)

In [None]:
# Media dell'attributo Cloud3pm 
# groupby Humidity3pm, ossia l'attributo con la correlazione più forte con Cloud3pm
# groupby Month

df.groupBy("Month","Humidity3pm").agg({'Cloud3pm':'avg'}).sort('Month').show(5)

## Sostituzione missing values con mediana

In [None]:
df = df.fillna('NA')
df.show(5)

In [None]:
cols_to_impute = ["Evaporation", "Sunshine", "Pressure9am", "Pressure3pm", "Cloud9am", "Cloud3pm"]

In [None]:
# verifichiamo se a seconda di quando sta per piovere o se sta piovendo consecutivamente
# i valori possano seguire distribuzioni diverse
df = df.withColumn( 'it-will-rain' , when( (df.RainToday == 'No') & ( df.RainTomorrow == 'Yes'), 'Yes' ).otherwise('No') )   # oggi non piove e domani si
df = df.withColumn( 'no-rain' , when( (df.RainToday == 'No') & ( df.RainTomorrow == 'No'), 'Yes' ).otherwise('No') )         # oggi non piove e neanche domani
df = df.withColumn( 'it-rain' , when( (df.RainToday == 'Yes') & ( df.RainTomorrow == 'Yes'), 'Yes' ).otherwise('No') )       # oggi piove e pure domani
df = df.withColumn( 'end-rain' , when( (df.RainToday == 'Yes') & ( df.RainTomorrow == 'No'), 'Yes' ).otherwise('No') )       # oggi non piove e neanche domani

df.show(5)

In [None]:
df_no_rain = df.where(df['no-rain']== 'Yes')
df_will_rain = df.where(df['it-will-rain'] == 'Yes')
df_it_rain = df.where(df['it-rain'] == 'Yes')
df_end_rain = df.where(df['end-rain'] == 'Yes')

In [None]:
# questa funzione mi serve per prendere un campione dell'attributo scelto
# che verra ritornato come array di numpy
def get_sample_(df, attr):
  df_filtered=df.filter(col(attr) != 'NA')
  sample = np.array( df_filtered.select(col(attr)).rdd.takeSample(False, 3000, seed= 42) ).flatten()
  return sample

In [None]:
def check_distribution(attr):
  # ottengo un campione dei dati dell'attributo specificato
  sample_no_rain = get_sample_(df_no_rain, attr).astype(np.float)
  sample_will_rain = get_sample_(df_will_rain, attr).astype(np.float)
  sample_rain= get_sample_(df_it_rain, attr).astype(np.float)
  sample_end_rain = get_sample_(df_end_rain, attr).astype(np.float)

  # faccio un test per vedere se seguono la stessa distribuzione
  from scipy.stats import median_test

  alfa = 0.01    # alfa e il livello di confidenza con il quale vogliamo essere sicuri se accettare o 
                # rifiutare l'ipotesi nulla

  print("Median Test for attribute <"+ attr + ">")
  stat, p, med, tbl = median_test(sample_no_rain, sample_will_rain)
  print('   no-rain and will-rain ',p < alfa)

  stat, p, med, tbl = median_test(sample_will_rain, sample_rain)
  print('   will-rain and it-rain ',p < alfa)

  stat, p, med, tbl = median_test(sample_rain, sample_end_rain)
  print('   it-rain and end-rain', p < alfa)
  print("")

In [None]:
for attr in cols_to_impute:
  check_distribution(attr)

In [None]:
def get_median(df, attr):
  # pulisco dai NA values
  df_filtered= df.filter(col(attr) != 'NA')
  # converto la colonna in float
  df_float = df_filtered.withColumn(attr, col(attr).cast(FloatType()))
  return df_float.approxQuantile(attr, [0.5], 0.25)  # ritorna la mediana dell'attributo scelto


def get_median_(df, attr, dict_median):
  # pulisco dai NA values
  df_filtered= df.filter(col(attr) != 'NA')
  # converto la colonna in float
  df_float = df_filtered.withColumn(attr, col(attr).cast(FloatType()))
  median = df_float.approxQuantile(attr, [0.5], 0.25)
  dict_median[attr] = median
  return dict_median 

In [None]:
# sostituiamo i valori mancanti con la mediana
dict_median = {}
for attr in cols_to_impute:
  if attr!="Evaporation":
    dict_median = get_median_(df,attr, dict_median)
    median_attr = str(dict_median[attr][0])
    df = df.withColumn(attr, regexp_replace(attr, 'NA', median_attr)).withColumn(attr, col(attr).cast(FloatType()))

Poiché no-rain, will-rain e it-rain hanno dei p-value molto piccoli possiamo assumere che abbiano la stessa mediana, ma per quanto riguarda it-rain e end-rain, non seguono la stessa mediana

In [None]:
# imputiamo quindi i valori, ricordandoci che siccome
# no-rain, will-rain e it-rain hanno la stessa mediana, per loro 
# sara identica, mentre lo stesso non avverra per end-rain

median_rain = get_median(df.where(col('end-rain') == 'No' ), 'Evaporation')
median_rain= str(median_rain[0])

median_noRain = get_median(df.where(col('end-rain') == 'Yes'), 'Evaporation')
median_noRain= str(median_noRain[0])

# sostituiamo i valori mancanti con la mediana
df = df.withColumn('Evaporation', when( (col('end-rain') == 'No') & (col('Evaporation') == 'NA') , regexp_replace('Evaporation', 'NA', median_rain))\
                            .otherwise( when(   (col('end-rain') == 'Yes') & (col('Evaporation') == 'NA') , regexp_replace('Evaporation', 'NA', median_noRain))\
                            .otherwise( col('Evaporation')))).withColumn('Evaporation', col('Evaporation').cast(FloatType()))

In [None]:
for attr in cols_to_impute:
  df.select([count(when(isnan(attr),True))]).show()

In [None]:
df = df.drop("it-will-rain", "no-rain", "it-rain", "end-rain")
df.show(10)

In [None]:
cat_cols = ["Date", "Location", "WindGustDir", "WindDir9am", "WindDir3pm",
            "RainTomorrow", "RainToday", "Evaporation", "Sunshine", "Pressure9am",
            "Pressure3pm", "Cloud9am", "Cloud3pm", 'Month', 'Season', 'Region']

col_to_float = [i for i in df.columns if i not in cat_cols]

In [None]:
col_to_float

In [None]:
for attr in col_to_float:
  df = df.withColumn(attr, col(attr).cast(FloatType()))

In [None]:
df.show()

In [None]:
df

In [None]:
# Salvataggio nuovo dataframe 'rain_tomorrow_australia'

df.toPandas().to_csv('drive/MyDrive/DDAM/Australia Rain/rain_tomorrow_australia_PROVA.csv', header=True)

## Confronto tra mediana e regressione per la sostituzione dei missing values

In [None]:
# Caricamento nuovo dataset senza missing values

filename = 'drive/MyDrive/DDAM/Australia Rain/rain_tomorrow_australia.csv'
df_median = sqlCtx.read.load(filename, format="csv", sep=",", inferSchema="true", header="true")

# Caricamento dataset originale

filename2 = 'drive/MyDrive/DDAM/Australia Rain/australia_rain_tomorrow_raw.csv'
df_original = sqlCtx.read.load(filename2, format="csv", sep=",", inferSchema="true", header="true")

Prendo un campione di osservazioni del dataframe. Per capire quali righe non avevano missing values utilizzo direttamente il vecchio dataframe.

In [None]:
# Creo una colonna id nel dataframe originale
df_original = df_original.select("*").withColumn("_c0", monotonically_increasing_id())

# Riordino le colonne
df_original = df_original.select('_c0','Date','Location','MinTemp','MaxTemp','Rainfall','Evaporation','Sunshine',
 'WindGustDir','WindGustSpeed','WindDir9am','WindDir3pm','WindSpeed9am','WindSpeed3pm','Humidity9am',
 'Humidity3pm','Pressure9am','Pressure3pm','Cloud9am','Cloud3pm','Temp9am','Temp3pm','RainToday','RISK_MM',
 'RainTomorrow') 

Nel nuovo dataframe sono stati sostituiti i missing values per le colonne Evaporation, Sunshine, Cloud9am, Cloud3pm, Pressure9am, Pressure3pm. 

Per fare la predizione con il vecchio dataframe devo escludere tutte queste colonne (se i valori sono stati sostituiti è perché erano missing values -> impossibile fare regressione).

Tengo tuttavia tutti i valori di Sunshine, per cui devo fare la predizione.

In [None]:
cols = ['_c0','Sunshine','MinTemp','MaxTemp','Rainfall','WindGustSpeed','WindSpeed9am',
        'WindSpeed3pm','Humidity9am','Humidity3pm','Temp3pm']

df_original = df_original.select(*cols)
df_median = df_median.select(*cols)

In [None]:
# Rimuovo osservazioni con missing values (ad esclusione di Sunshine) dal df originale

df_original = df_original.replace('NA', None)
df_original = df_original.na.drop(subset=['_c0','MinTemp','MaxTemp','Rainfall','WindGustSpeed','WindSpeed9am','WindSpeed3pm','Humidity9am','Humidity3pm','Temp3pm'])

In [None]:
print('Totale osservazioni:', df_original.count())
print()
print('Null values per column:')

Dict_Null = {col:df_original.filter(df_original[col].isNull()).count() for col in df_original.columns}
Dict_Null = sorted(Dict_Null.items(), key=lambda x: x[1], reverse=True)

for i in Dict_Null:
	print(i[0], i[1])

In [None]:
# Del nuovo dataset tengo solo le righe che nel vecchio dataset non avevano null values (escluso Sunshine)
# Faccio una join sulla colonna id

df_median = df_median.join(df_original, df_median._c0 == df_original._c0, "semi")

df_median.join(df_original, df_median._c0 == df_original._c0, 'semi').show(5)
df_median.join(df_original, df_median._c0 == df_original._c0, 'semi').count()

df_median.filter(df_median['Sunshine'].isNull()).count()

print('N° osservazioni rimaste', df_median.count())

#### Regressione per Sunshine

Faccio la regressione per Sunshine. Ottengo una colonna di predizioni e la confronto con la colonna Sunshine del nuovo dataset.

In [None]:
# Seleziono colonne
df_regression = df_median

# Trasformo in float
df_regression = df_regression.select(*(col(c).cast("float").alias(c) for c in df_regression.columns))

In [None]:
df_regression.printSchema()

In [None]:
# Creo il vettore di valori

in_cols= [col for col in df_regression.columns if col != 'Sunshine']

vecAssembler = VectorAssembler(inputCols=in_cols, outputCol="features")
vec_df = vecAssembler.transform(df_regression)
vec_df = vec_df.select(['features', 'Sunshine'])
vec_df.show(3)

In [None]:
vec_df.printSchema()

In [None]:
# Creo modello e ottengo le predizioni

lr = LinearRegression(featuresCol = 'features', labelCol='Sunshine')
lr_model = lr.fit(vec_df)
lr_predictions = lr_model.transform(vec_df)

print('N° predizioni:',lr_predictions.count())

In [None]:
lr_predictions.show(5)
lr_predictions.printSchema()

#### Evaluation

Ora abbiamo la colonna Sunshine e la colonna con le predizioni. Misuriamo quanto sono diverse.

In [None]:
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Sunshine",metricName="r2")
lr_evaluator2 = RegressionEvaluator(predictionCol="prediction", labelCol="Sunshine",metricName="rmse")

print("R Squared = %g" % lr_evaluator.evaluate(lr_predictions))
print("RMSE = %g" % lr_evaluator2.evaluate(lr_predictions))

In [None]:
# Aggiungo la colonna differenza (in valore assoluto)
from  pyspark.sql.functions import abs

lr_predictions = lr_predictions.withColumn('difference', abs(lr_predictions.Sunshine - lr_predictions.prediction))

lr_predictions.show(30)

In [None]:
lr_predictions.select(lr_predictions['difference']).summary().show()