In [1]:
import pandas as pd
import numpy as np

import mlflow

In [2]:
filepath = '../data/power-plant/power_plant.csv'
df_master = pd.read_csv(filepath, index_col=None)

In [3]:
df_master.head()

Unnamed: 0,AT,V,AP,RH,PE
0,14.96,41.76,1024.07,73.17,463.26
1,25.18,62.96,1020.04,59.08,444.37
2,5.11,39.4,1012.16,92.14,488.56
3,20.86,57.32,1010.24,76.64,446.48
4,10.82,37.5,1009.23,96.62,473.9


In [4]:
df = df_master.copy()
df.shape

(47840, 5)

## Add column "device_id" for an IoT device Id

In [5]:
df['device_id'] = [np.random.randint(1, 5) for _ in range(df.shape[0])]

In [6]:
df.head()

Unnamed: 0,AT,V,AP,RH,PE,device_id
0,14.96,41.76,1024.07,73.17,463.26,4
1,25.18,62.96,1020.04,59.08,444.37,2
2,5.11,39.4,1012.16,92.14,488.56,3
3,20.86,57.32,1010.24,76.64,446.48,1
4,10.82,37.5,1009.23,96.62,473.9,2


## Model training

In [7]:
mlflow.set_experiment('power_plant_experiment')
mlflow.autolog()
mlflow.set_tracking_uri('http://127.0.0.1:5000')

2021/06/12 18:14:05 INFO mlflow.pyspark.ml: No SparkSession detected. Autologging will log pyspark.ml models contained in the default allowlist. To specify a custom allowlist, initialize a SparkSession prior to calling mlflow.pyspark.ml.autolog() and specify the path to your allowlist file via the spark.mlflow.pysparkml.autolog.logModelAllowlistFile conf.


In [8]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

2021/06/12 18:14:07 INFO mlflow.tracking.fluent: Autologging successfully enabled for sklearn.


In [9]:
def train_model(device_df: pd.DataFrame) -> float:
    y = device_df['PE']
    X = device_df[['AT', 'V', 'AP', 'RH']]
    
    id_ = device_df['device_id'].iloc[0]
    with mlflow.start_run(run_name=str(id_), nested=True):
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        model = RandomForestRegressor(max_depth=5, random_state=0)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)

        rmse = np.sqrt(mean_squared_error(y_test, y_pred))
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)

        metrics = {'mse': rmse, 'mae': mae, 'r2': r2}
    
    return metrics

In [10]:
with mlflow.start_run(run_name="June", nested=True):
    for i in range(1, 5):
        train_model(df[df['device_id']==i])

In [11]:
df.groupby("device_id").apply(train_model)

device_id
1    {'mse': 4.036486776940315, 'mae': 3.1672547058...
2    {'mse': 4.2687640557373, 'mae': 3.264479984861...
3    {'mse': 4.240552568181494, 'mae': 3.1835286145...
4    {'mse': 4.174091169451231, 'mae': 3.2212229014...
dtype: object

In [12]:
from multiprocessing import Pool
pool = Pool()

In [13]:
device_df_list = [df[df['device_id'] == i] for i in df['device_id'].unique()]

In [14]:
# pool.map(train_model, device_df_list)

In [15]:
def mapped_training(month_df: pd.DataFrame, month: str):
    pool = Pool()
    
    device_df_list = [df[df['device_id'] == i] for i in df['device_id'].unique()]
#     pool.map(train_model, device_df_list)