In [1]:
import pandas as pd
import numpy as np
import databricks.koalas as ks
from pyspark.sql import SparkSession
from datetime import datetime



In [2]:
%set_env JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk-amd64/

env: JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/


## YouGov - Wearing Mask in public

In [3]:
start = datetime.now()


##Chargement dataset
df = ks.read_csv(
    "./data/wearing_face_mask_public.csv",
    sep=";"
)

##Transformation du dataset = 1 ligne par date/pays
format = '%Y-%m-%d %H:%M:%S'
df['DateTime'] = ks.to_datetime(df['DateTime'], format=format)
df['DateTime'] = df['DateTime'].dt.normalize()


#### 1er changement : autoriser les opérations sur 2 dataframes différents (ks.set_option('compute.ops_on_diff_frames', True) 
#### ou faire un groupby sur la colonne (comportement légèrement différent de pandas car la colonne de group_by devient un index et disparait de la projection)
# df = df.sort_values('DateTime').groupby(df['DateTime']).max()
df = df.sort_values('DateTime').groupby(['DateTime'], as_index=False).max()
# df = df.set_index(pd.DatetimeIndex(df['DateTime'])).drop(['DateTime'], axis=1)
df = df.set_index('DateTime')

#### 2e changement : The method `pd.DataFrame.resample()` is not implemented yet. (en cours d'étude : https://github.com/databricks/koalas/issues/1562)
#### on est obligé de partir sur Spark directement dans ce cas ou alors de passer par pandas ...
df = df.to_pandas()
wearing_mask_in_public_data = df.resample('1D').pad()

#### Retours au dataframe Koalas
wearing_mask_in_public_data = ks.from_pandas(wearing_mask_in_public_data)
wearing_mask_in_public_data = wearing_mask_in_public_data.fillna(0)
wearing_mask_in_public_data = wearing_mask_in_public_data.reset_index().melt(
                                id_vars=['DateTime'], 
                                var_name='country', 
                                value_name='percent_wearing_mask')

print(f"Le dataset contient {len(df)} enregistrements")

print("Sample dataset final:")
##### 3e changement : Function sample currently does not support specifying exact number of items to return. Use frac instead.
print(wearing_mask_in_public_data.head(5))

stop = datetime.now()

print("Temps de chargement et tranformation petit dataset : ", (stop-start).microseconds/1000, "ms")


Le dataset contient 192 enregistrements
Sample dataset final:
    DateTime    country  percent_wearing_mask
0 2020-02-21  Australia                   0.0
1 2020-02-21     Canada                   0.0
2 2020-02-21      China                   0.0
3 2020-02-21    Denmark                   0.0
4 2020-02-21    Finland                   0.0
Temps de chargement et tranformation petit dataset :  654.176 ms


## Google - Covid 19 Open Data

In [4]:
start = datetime.now()

#Chargement dataset
covid19_opendata = ks.read_csv(
    "./data/latest.csv",
    keep_default_na=False,
    na_values=[""])


# Jointure entre open data covid 19 et yougo
format = '%Y-%m-%d %H:%M:%S'
##### 4e changement : la transformation de datetime en date ne se fait pas directement
covid19_opendata['date'] = ks.to_datetime(covid19_opendata['date'], format=format)
covid19_opendata['date'] = covid19_opendata['date'].dt.normalize()


covid19_merge1 = covid19_opendata.merge(wearing_mask_in_public_data, 
                                      left_on = ['country_name','date'],
                                      right_on = ['country','DateTime'], how = 'left')


remove_cols = ['key', 'country','aggregation_level','locality_code', 'wikidata', 'datacommons', 'country_code', 'subregion1_code', 'subregion1_name', 'subregion2_code', 'subregion2_name', 'locality_name', '3166-1-alpha-2', '3166-1-alpha-3', 'DateTime']

covid19_merge1 = covid19_merge1.drop(remove_cols, axis=1)

prepared_data =  covid19_merge1.copy()

#### 5e changement, Les fonctions de préprocessing de scikit learn ne sont pas accessibles avec les dataframes Koalas 
#### Si on repasse en dataframe pandas, tout est remonté au driver donc on va plutôt utiliser get_dummies
prepared_data = ks.get_dummies(prepared_data, ['country_name', 'date'])

prepared_data = prepared_data.fillna(0)

print(f"Le dataset contient {len(prepared_data)} enregistrements")

print("Sample dataset final:")
print(prepared_data.head(5))

stop = datetime.now()

print("Temps de chargement et tranformation grand dataset : ", (stop-start).microseconds/1000, "ms")



Le dataset contient 21505 enregistrements
Sample dataset final:
   date  country_name  new_confirmed  new_deceased  new_recovered  new_tested  total_confirmed  total_deceased  total_recovered  total_tested  new_hospitalized  total_hospitalized  current_hospitalized  new_intensive_care  total_intensive_care  current_intensive_care  new_ventilator  total_ventilator  current_ventilator  population  population_male  population_female  rural_population  urban_population  largest_city_population  clustered_population  population_density  human_development_index  population_age_00_09  population_age_10_19  population_age_20_29  population_age_30_39  population_age_40_49  population_age_50_59  population_age_60_69  population_age_70_79  population_age_80_89  population_age_90_99  population_age_80_and_older           gdp  gdp_per_capita  human_capital_index  open_street_maps   latitude   longitude  elevation       area  rural_area  urban_area  life_expectancy  smoking_prevalence  diabetes_prev

## Entrainement et inférence

In [5]:
start = datetime.now()

##### 6e changement : Pour utiliser scikit learn avec koalas, il faut utiliser Mlflow
##### Mais l'entrainement restera sur des dataframe pandas, seule la prédiction peut être faite avec koalas
prepared_data = prepared_data.to_pandas()

#### On prépare donc l'environnement
from mlflow.tracking import MlflowClient, set_tracking_uri
import mlflow.sklearn

from tempfile import mkdtemp
d = mkdtemp("koalas_mlflow")
set_tracking_uri("file:%s"%d)
client = MlflowClient()
exp = mlflow.create_experiment("my_experiment")
mlflow.set_experiment("my_experiment")

# Split Train/Test
from sklearn.model_selection import train_test_split
X = prepared_data.loc[:, prepared_data.columns != 'new_confirmed']
y = prepared_data['new_confirmed'].ravel()

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)


# Scale des valeurs
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
scaler.fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

# Entraintement MLP
from sklearn.neural_network import MLPRegressor

with mlflow.start_run():
    regr = MLPRegressor(max_iter=10, hidden_layer_sizes=(100, 50, 25, 10, 5), verbose=True)
   
    regr.fit(X_train, y_train)

    mlflow.sklearn.log_model(regr, "model")


#### Notre modèle est entrainé, on peut donc l'utiliser sur des datafames Koalas
from databricks.koalas.mlflow import load_model
run_info = client.list_run_infos(exp)[-1]

model = load_model("runs:/{run_id}/model".format(run_id=run_info.run_uuid))

# Prédiction et Score
df = ks.DataFrame(X_test)
df["prediction"] = model.predict(df)

stop = datetime.now()

print("Temps préparation et inférence (ML) : ", (stop-start).microseconds/1000, "ms")

Iteration 1, loss = 1972657.14203192
Iteration 2, loss = 1971911.05644309
Iteration 3, loss = 1962543.06857397
Iteration 4, loss = 1896294.60472285
Iteration 5, loss = 1782673.71609958
Iteration 6, loss = 1676519.86651913
Iteration 7, loss = 1603786.99658003
Iteration 8, loss = 1551900.85074224
Iteration 9, loss = 1463254.81699981
Iteration 10, loss = 1377386.36486305
Temps préparation et inférence (ML) :  24.672 ms


In [6]:
##### 7e changement : Il faut donc recalculer le score nous même

from databricks.koalas.config import set_option, reset_option

set_option("compute.ops_on_diff_frames", True)

# Score : The coefficient R^2 is defined as (1 - u/v), where u is the residual sum of squares ((y_true - y_pred) ** 2).sum() and v is the total sum of squares ((y_true - y_true.mean()) ** 2).sum()

reel = ks.Series(y_test).to_frame().rename(columns = {0:'Reel'})
result = ks.concat([df,reel],axis=1)

result['square_diff_true_pred'] = (result['Reel'] - result['prediction']) ** 2
u = result['square_diff_true_pred'].sum()
v = ((result['Reel'] - result['Reel'].mean()) ** 2).sum()

score = (1 - u/v)
print(f"score: {score}")

  and should_run_async(code)
score: -0.8649982812600598


## Entrainement et inférence avec Pipeline

Seuls les modèles entrainés et les prédictions peuvent être utilisés avec koalas