Prediction issue using Keras and TransformSpec with PySpark #784

Closed · sdaza opened this issue Dec 6, 2022 · 0 comments
sdaza commented Dec 6, 2022

Hello,
I am trying to get predictions from a Keras model with two inputs: sequence info and a regular covariate.

Using a TransformSpec, I preprocess the sequences so that they all have the same length, left-padding them with zeros so the padded positions can be masked.

The model fits fine, but I have issues getting predictions.

import numpy as np
import pandas as pd

import pyspark.sql.functions as F
import pyspark.sql.types as T

import tensorflow as tf

from petastorm import TransformSpec
from petastorm.spark import SparkDatasetConverter, make_spark_converter

# petastorm cache directory on DBFS
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///dbfs/...')

# create data
sequence = [[1, 1, 1, 1, 1], [2, 2, 2, 2], [3, 2, 2], [3, 3, 3], [3, 3, 3, 3, 3], [2, 2, 2, 2, 2]]
y = [0, 1, 1, 2, 2, 1]
x = [0.3, 0.1, 0.3, 0.5, 0.5, 0.1]
df = pd.DataFrame({'y':y, 'x':x, 'sequence':sequence})
sdf = spark.createDataFrame(df)
target='y'
all_features = ['sequence', 'x']

# functions
def preprocess(v, max_length=5):
    # left-pad each sequence with zeros up to max_length
    vv = list(v)
    vv = [0] * (max_length - len(vv)) + vv
    return np.array(vv)

def format_sequence_data(pd_batch):
    # pad every sequence to the same length and keep only the model columns
    pd_batch['sequence'] = pd_batch['sequence'].map(lambda x: preprocess(x))
    return pd_batch.loc[:, ['sequence', 'x', 'y']]
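# For illustration: preprocess([3, 2, 2]) returns array([0, 0, 3, 2, 2]), so the
# zero padding lines up with mask_zero=True in the Embedding layer below.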
    
transform_spec_fn = TransformSpec(
    format_sequence_data,
    edit_fields=[
        ('sequence', np.float32, (5,), False),
        ('x', np.float32, (), False),
        ('y', np.int32, (), False)],
    selected_fields=['sequence', 'x', 'y'])

# petastorm
df_converter = make_spark_converter(sdf)

# model
def createModel():

    seq_vec = tf.keras.Input(shape=(5,), name='sequence')
    e = tf.keras.layers.Embedding(input_dim=5, output_dim=5,
        input_length=5, mask_zero=True, name='sequence_embedding')(seq_vec)

    # keep separate names for the Input and the normalized tensor; reusing `x`
    # for both would pass an intermediate tensor to tf.keras.Model below
    x_in = tf.keras.Input(shape=(1,), name='x', dtype='float')
    x_norm = tf.keras.layers.Normalization()(x_in)

    ml = tf.keras.layers.LSTM(10, return_sequences=True)(e)
    ml = tf.keras.layers.LSTM(5)(ml)

    combined = tf.keras.layers.Concatenate()([ml, x_norm])

    mlp = tf.keras.layers.Dense(10)(combined)
    mlp = tf.keras.layers.Dense(5)(mlp)
    mlp = tf.keras.layers.Dense(3, activation='softmax')(mlp)

    model = tf.keras.Model([seq_vec, x_in], mlp)
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

model = createModel()

# training
batch_size = 1

def train_and_evaluate():
    with df_converter.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=batch_size) as data:
        # map petastorm's named-tuple batches to the ((inputs, ...), target) structure Keras expects
        data = data.map(lambda x: (tuple(getattr(x, col) for col in ['sequence', 'x']), getattr(x, target)))
        steps_per_epoch = int(len(df_converter) / batch_size)

        history = model.fit(data, 
            steps_per_epoch=steps_per_epoch,
            epochs=10,
            shuffle=False,
            verbose=2)

    return history

history = train_and_evaluate()

For prediction, I use:

# udf function for prediction (pyspark)
def model_prediction_prob_udf(model):
    def predict(input_batch_iter):
        for input_batch in input_batch_iter:
            input_batch['sequence'] = input_batch['sequence'].map(lambda x: preprocess(x))
            preds = model.predict([input_batch.loc[:, c] for c in all_features], batch_size=1000)
            yield pd.Series(preds.tolist())
    return_type = T.ArrayType(T.DoubleType())
    return F.pandas_udf(return_type, F.PandasUDFType.SCALAR_ITER)(predict)

pred_prob_udf = model_prediction_prob_udf(model)

pred = sdf.withColumn('features', F.struct(all_features))
pred = pred.withColumn('prediction_prob', pred_prob_udf(F.col('features')))
display(pred)

I get the error:

'ValueError: Failed to convert a NumPy array to a Tensor (Unsupported object type numpy.ndarray).'
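I suspect that after the map, input_batch['sequence'] is an object-dtype pandas Series holding one NumPy array per row, which TensorFlow cannot convert into a single tensor. If that is the cause, stacking the padded rows into one 2-D float array before calling predict might work (untested sketch, variable names are mine):

# untested sketch: stack the per-row padded arrays into a single 2-D float
# array so model.predict receives plain tensors instead of an object column
seq_2d = np.stack(input_batch['sequence'].map(preprocess)).astype(np.float32)
x_col = input_batch['x'].to_numpy().astype(np.float32).reshape(-1, 1)
preds = model.predict([seq_2d, x_col], batch_size=1000)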

If I use something like the code below, prediction takes forever:

with df_converter.make_tf_dataset(transform_spec=transform_spec_fn, batch_size=1) as data:
    data = data.map(lambda x: ((x.sequence, x.x),))
    tt = model.predict(data)
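If I read the petastorm docs correctly, make_tf_dataset repeats the data indefinitely by default (num_epochs=None), so model.predict on the raw dataset never reaches an end. Limiting the reader to a single pass and using a larger batch might make this finish (again untested):

# untested sketch: read the dataset exactly once (num_epochs=1) and use a
# larger batch so predict() terminates instead of looping over repeated data
with df_converter.make_tf_dataset(transform_spec=transform_spec_fn,
                                  batch_size=64, num_epochs=1) as data:
    data = data.map(lambda x: ((x.sequence, x.x),))
    tt = model.predict(data)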

Any ideas or suggestions on how to fix this?
Thanks!

sdaza closed this as completed Jan 12, 2023