-
Notifications
You must be signed in to change notification settings - Fork 5.4k
/
predictors.py
100 lines (83 loc) · 2.84 KB
/
predictors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# flake8: noqa
# isort: skip_file
# __use_predictor_start__
import numpy as np
import tensorflow as tf
import ray
from ray.train.batch_predictor import BatchPredictor
from ray.train.tensorflow import (
TensorflowCheckpoint,
TensorflowPredictor,
)
def build_model() -> tf.keras.Model:
    """Assemble the small Keras model used by the predictor examples.

    The network takes one scalar per example, adds a feature dimension,
    and feeds the result to a dense layer with a single unit. The model
    is returned as constructed; nothing here compiles or trains it.

    Returns:
        The assembled ``tf.keras.Sequential`` model.
    """
    layer_stack = [
        # Each example is a bare scalar, so the per-example shape is empty.
        tf.keras.layers.InputLayer(input_shape=()),
        # Add feature dimension, expanding (batch_size,) to (batch_size, 1).
        tf.keras.layers.Flatten(),
        # One output unit per example.
        tf.keras.layers.Dense(1),
    ]
    return tf.keras.Sequential(layer_stack)
# Build the Keras model and wrap it in a checkpoint object.
model = build_model()
checkpoint = TensorflowCheckpoint.from_model(model)
# Create a predictor from that checkpoint. `build_model` is passed as the
# model definition -- presumably so the predictor can rebuild the
# architecture before restoring the saved weights (confirm in the Ray docs).
predictor = TensorflowPredictor.from_checkpoint(
    checkpoint, model_definition=build_model
)

# A 1-D batch of four scalar inputs, matching the model's scalar input shape.
data = np.array([1, 2, 3, 4])
predictions = predictor.predict(data)
print(predictions)
# Sample output (one prediction row per input value):
# [[-1.6930283]
# [-3.3860567]
# [-5.079085 ]
# [-6.7721133]]
# __use_predictor_end__
# __batch_prediction_start__
import pandas as pd
from ray.train.batch_predictor import BatchPredictor
# Build a BatchPredictor from the checkpoint, the predictor class to use,
# and the function that defines the model.
batch_predictor = BatchPredictor(
    checkpoint, TensorflowPredictor, model_definition=build_model
)
# Create a dummy dataset.
ds = ray.data.from_pandas(pd.DataFrame({"feature_1": [1, 2, 3], "label": [1, 2, 3]}))
# Use `feature_columns` to specify the input columns to your model.
predictions = batch_predictor.predict(ds, feature_columns=["feature_1"])
# `show()` prints the records itself and returns nothing, so wrapping it in
# `print()` would emit an extra "None" line after the records.
predictions.show()
# Sample output:
# {'predictions': array([-1.2789773], dtype=float32)}
# {'predictions': array([-2.5579545], dtype=float32)}
# {'predictions': array([-3.8369317], dtype=float32)}
# __compute_accuracy_start__
def calculate_accuracy(df):
    """Flag, row by row, whether the truncated prediction equals the label.

    Args:
        df: A pandas batch with a "predictions" column, where each entry
            holds a single value (for example a length-1 array), and a
            "label" column.

    Returns:
        A DataFrame with one boolean "correct" column that shares the
        index of ``df``.
    """
    # Convert every row's own prediction. Looking the column up with `[0]`
    # would reuse one row's prediction for the whole batch and fail when
    # the batch index has no label 0.
    predicted = df["predictions"].map(lambda pred: int(np.ravel(pred)[0]))
    return pd.DataFrame({"correct": predicted == df["label"]})
# Predict again, this time carrying the "label" column through to the
# output so each prediction can be compared with its label.
predictions = batch_predictor.predict(
    ds, feature_columns=["feature_1"], keep_columns=["label"]
)
# `show()` prints the records itself and returns nothing, so wrapping it in
# `print()` would emit an extra "None" line after the records.
predictions.show()
# Sample output (labels are the 1, 2, 3 stored in the dataset):
# {'predictions': array([-1.2789773], dtype=float32), 'label': 1}
# {'predictions': array([-2.5579545], dtype=float32), 'label': 2}
# {'predictions': array([-3.8369317], dtype=float32), 'label': 3}

# Score each batch, then average the boolean "correct" column.
correct = predictions.map_batches(calculate_accuracy)
print("Final accuracy: ", correct.mean(on="correct"))
# For the sample predictions above, none matches its label:
# Final accuracy:  0.0
# __pipelined_prediction_start__
import pandas as pd
import ray
from ray.air import Checkpoint
from ray.train.predictor import Predictor
from ray.train.batch_predictor import BatchPredictor
# Create a BatchPredictor that always returns `42` for each input.
# The UDF ignores the input values and builds a DataFrame with a single
# column "a" containing one 42 per input row.
batch_pred = BatchPredictor.from_pandas_udf(
    lambda data: pd.DataFrame({"a": [42] * len(data)})
)

# Create a dummy dataset.
# NOTE(review): presumably 200 rows spread over 4 blocks -- confirm against
# the `range_tensor` documentation.
ds = ray.data.range_tensor(200, parallelism=4)

# Set up a prediction pipeline.
# NOTE(review): `blocks_per_window=1` presumably makes each pipeline window
# cover a single block -- confirm against the `predict_pipelined` docs.
pipeline = batch_pred.predict_pipelined(ds, blocks_per_window=1)

# Print every batch the pipeline yields.
for batch in pipeline.iter_batches():
    print("Pipeline result", batch)
    # 0 42
    # 1 42
    # ...