/
flow.py
117 lines (100 loc) · 3.4 KB
/
flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from metaflow import FlowSpec, step, batch, conda_base, pypi_base, card, current
from metaflow.cards import Image
# user packages
from dependencies import *
from ops import ModelStore
from fraud_detection_logic import FeatureEngineering, ModelTraining
@pypi_base(
    python=python_version,
    packages={**pypi_common_pkgs, **pypi_feature_eng_pkgs, **pypi_xgb_pkg},
)
class FraudClassifierTreeSelection(
    FlowSpec,
    FeatureEngineering,
    ModelTraining,
    ModelStore,  # introduces required param "model-repo" expecting s3 uri the flow's task execution role can write to
):
    """Metaflow flow that trains tree-based fraud classifiers in parallel,
    selects the best one by AUC, and publishes it to an S3 model repo.

    Pipeline: start -> preprocess (feature engineering + model grid)
    -> foreach train (one task per model grid entry) -> eval (join:
    score, pick best, optional learning-curve card) -> deploy (store
    artifacts for Triton) -> end.

    Feature/training/storage logic comes from the mixins
    (FeatureEngineering, ModelTraining, ModelStore); this class only
    orchestrates the steps.
    """

    # Toggle for rendering the learning-curve figure into the eval card.
    _plot_learning_curves = True

    @step
    def start(self):
        """Entry point; hand off to preprocessing."""
        self.next(self.preprocess)

    @batch(cpu=1, memory=8000)
    @card
    @step
    def preprocess(self):
        """Compute features and build the model grid for the foreach fan-out.

        compute_features / setup_model_grid are provided by the mixins;
        setup_model_grid populates self.model_grid, which drives the
        foreach split below.
        """
        self.compute_features()
        self.setup_model_grid(model_list=["Random Forest"])
        self.next(self.train, foreach="model_grid")

    @batch(cpu=4, memory=16000)
    @card
    @step
    def train(self):
        """Fit one (model name, parameter grid) pair from the foreach split.

        smote_pipe (from ModelTraining) handles resampling + search and
        returns the fitted estimator, stored as self.best_model for the
        join step to compare.
        """
        self.model_name, self.model_grid = self.input
        self.best_model = self.smote_pipe(
            self.model_grid, self.X_train_full, self.y_train_full
        )
        self.next(self.eval)

    @batch(cpu=1, memory=8000)
    @card
    @step
    def eval(self, inputs):
        """Join step: score each trained model on the test split, keep the
        best by AUC, and optionally render learning curves into the card.

        Artifacts produced: self.scores (DataFrame of per-model metrics),
        self.best_model, self.best_model_type.
        """
        # Propagate data artifacts from an arbitrary branch (identical
        # across branches since they were set before the split).
        self.columns = inputs[0].columns
        self.X_train_full = inputs[0].X_train_full
        self.X_test_full = inputs[0].X_test_full
        self.y_train_full = inputs[0].y_train_full
        self.y_test_full = inputs[0].y_test_full

        # Task-local imports keep heavy deps inside the batch container.
        from fraud_detection_logic import score_trained_model
        import pandas as pd

        best_score = -1  # AUC is in [0, 1], so any real score beats this
        self.best_model = None
        self.best_model_type = None
        scores = []
        best_models = []
        # NOTE: loop variable renamed from `input` to avoid shadowing the builtin.
        for inp in inputs:
            row = {
                "model name": inp.model_name,
                **score_trained_model(
                    inp.best_model, self.X_test_full, self.y_test_full
                ),
            }
            scores.append(row)
            best_models.append((inp.best_model, inp.model_name))
            if row["auc"] > best_score:
                best_score = row["auc"]
                self.best_model = inp.best_model
                self.best_model_type = inp.model_name
        self.scores = pd.DataFrame(scores)

        # Learning-curve figure: a fixed 2x2 grid of axes is passed to the
        # mixin's plot_learning_curves; unused axes stay blank when fewer
        # than four models were trained.
        if self._plot_learning_curves:
            import matplotlib.pyplot as plt

            fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(
                2, 2, figsize=(18, 10), sharey=True
            )
            self.plot_learning_curves(
                best_models, [ax1, ax2, ax3, ax4], self.X_train_full, self.y_train_full
            )
            fig.tight_layout()
            current.card.append(Image.from_matplotlib(fig))
        self.next(self.deploy)

    @batch(cpu=1, memory=8000)
    @step
    def deploy(self):
        """Publish the winning estimator to the S3 model repo.

        store_sklearn_estimator (from ModelStore) lays out the artifacts
        in the directory structure Triton expects.
        """
        # put artifacts triton needs in the format it expects in the s3 model repo
        self.store_sklearn_estimator(model=self.best_model)
        self.next(self.end)

    @step
    def end(self):
        """Print a snippet showing how to retrieve evaluation results."""
        print(
            f"""
    Access evaluation results:
    from metaflow import Flow
    f = Flow('{current.flow_name}')
    r = f.latest_successful_run
    scores = r.data.scores
    """
        )
# Script entry point: instantiating the FlowSpec subclass hands control
# to Metaflow's CLI machinery (run/resume/etc.).
if __name__ == "__main__":
    FraudClassifierTreeSelection()