## Section 3: Spark ML

In [0]:
%run "./Includes/Classroom-Setup"

### Distributed ML Concepts

In [0]:
# Describe some of the difficulties associated with distributing machine learning models.

# 例：max_bins
# Sparkでは、データは行で分割されます。そのため、分割を行う必要がある場合、各Workerは分割点ごとに各特徴の要約統計量を計算する必要があります。そして、分割するためにこれらの統計情報を（tree reduceによって）集約する必要があります。
#考えてみてください。Worker1が値 32 を持っているが、他のどのWorkerもその値を持っていなかったとしたらどうなるでしょうか。どれだけ良い分割になるのかどうやって分かりますか。そこで、Sparkには連続変数を離散化してバケットにするためのmaxBinsパラメータを使います。しかし、バケット数は最も基数の多いカテゴリ型変数と同じ大きさでなければなりません。

In [0]:
# Identify Spark ML as a key library for distributing traditional machine learning work.
# Identify scikit-learn as a single-node solution relative to Spark ML.

# 例：sklearnのソースコードをDatabricksに移行する際
# マルチノードのMLクラスターでsklearnのソースコードをそのまま実行しても処理速度向上は見込めない
# ⇒sklearnはシングルノード前提のため、分散処理されない
# ⇒sparkML + spark dataframeなどにリファクタリングする必要あり

### Spark ML Modeling APIs

In [0]:
# Split data using Spark ML.

file_path = f"{DA.paths.datasets}/airbnb/sf-listings/sf-listings-2019-03-06-clean.delta/"
df = spark.read.format("delta").load(file_path)

# SparkMLの場合
train_df, test_df = df.randomSplit([.8, .2], seed=42)

# sklearnの場合
from sklearn.model_selection import train_test_split
X = df.select([pair[0] for pair in df.dtypes if pair[0] != 'price']).toPandas()
y = df.select(['price']).toPandas()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

In [0]:
# Identify key gotchas when splitting distributed data using Spark ML.

# 再現性を担保するためには、seed値を固定するのは前提(セッションが変わっても乱数分布を固定できる)
# また、repartitionを変えると再現性が担保されない可能性あり

print(f'count before repartition: {train_df.cache().count()}')

train_df_repartition, test_df_repartition = df.repartition(24).randomSplit([.8, .2], seed=42)

print(f'count after repartition: {train_df_repartition.cache().count()}')

In [0]:
# Train / evaluate a machine learning model using Spark ML.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

vec_assembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
vec_train_df = vec_assembler.transform(train_df)

# train
lr = LinearRegression(featuresCol="features", labelCol="price")
lr_model = lr.fit(vec_train_df)

# evaluate
vec_test_df = vec_assembler.transform(test_df)
pred_df = lr_model.transform(vec_test_df)

regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")
rmse = regression_evaluator.evaluate(pred_df)

In [0]:
# Describe Spark ML estimator and Spark ML transformer.

# 推定器(estimator): DataFrameが持つデータにフィットして、Transformerを生成することができるアルゴリズムです。例えば、学習アルゴリズムはDataFrameから学習し、モデルを生成するestimatorです。Estimatorは .fit() メソッドを持っており、DataFrameからパラメータを学習（または「フィット」）します。

# 変換器(transformer): DataFrameを別のDataFrameに変換します。DataFrameを入力として受け取り、1つまたは複数の列が追加された新しいDataFrameを返します。Transformerはデータからパラメータを学習せず、単純にルールベースの変換を適用します。Transformerは .transform() メソッドを持ちます。

In [0]:
# Develop a Pipeline using Spark ML.
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer

file_path = f"{DA.paths.datasets}/airbnb/sf-listings/sf-listings-2019-03-06-clean.delta/"
df = spark.read.format("delta").load(file_path)
train_df, test_df = df.randomSplit([.8, .2], seed=42)

categorical_cols = [field for (field, dataType) in train_df.dtypes if dataType == "string"]
index_output_cols = [x + "Index" for x in categorical_cols]
ohe_output_cols = [x + "OHE" for x in categorical_cols]

string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")
ohe_encoder = OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)

stages = [string_indexer, ohe_encoder, vec_assembler, lr] #estimatorをリスト化する
pipeline = Pipeline(stages=stages)

pipeline_model = pipeline.fit(train_df) #まとめてfitして、transformerを作成する

# optional
pipeline_model.write().overwrite().save(DA.paths.working_dir) #transformerに変換したパイプラインをまるごと保存
saved_pipeline_model = PipelineModel.load(DA.paths.working_dir) #読み込み

pred_df = saved_pipeline_model.transform(test_df) #指定した順番通りにまとめてtransform（transformerなのでtransformメソッド持っている）

In [0]:
# Identify key gotchas when developing a Spark ML Pipeline.

# cross validatorと組み合わせるときに、場合によてはデータの漏洩が発生すること注意

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

file_path = f"{DA.paths.datasets}/airbnb/sf-listings/sf-listings-2019-03-06-clean.delta/"
df = spark.read.format("delta").load(file_path)
train_df, test_df = df.randomSplit([.8, .2], seed=42)

categorical_cols = [field for (field, dataType) in train_df.dtypes if dataType == "string"]
index_output_cols = [x + "Index" for x in categorical_cols]

string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")

numeric_cols = [field for (field, dataType) in train_df.dtypes if ((dataType == "double") & (field != "price"))]
assembler_inputs = index_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

rf = RandomForestRegressor(labelCol="price", maxBins=40)

rf_1 = RandomForestRegressor(
  labelCol="price",
  maxBins=40,
  maxDepth=2,
  numTrees=5,
)

param_grid = (ParamGridBuilder()
              .addGrid(rf.maxDepth, [2, 5])
              .addGrid(rf.numTrees, [5, 10])
              .build())

In [0]:
import time

# cvにpipelineを含める場合
# pros: データ漏洩の可能性が低い
# cons: string indexerのようなestimator/transformerがある場合、foldのdatasetに対して毎回変換をかけることになる
stages = [string_indexer, vec_assembler, rf]
pipeline = Pipeline(stages=stages)
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")
cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=param_grid, 
                    numFolds=3, seed=42)
start = time.time()
cv_model = cv.fit(train_df)
end = time.time()
print(f'cvにpipelineを含める場合: {end - start}')

# pipelineにcvを含める場合
# pros: 変換後にfoldのdatasetに分割するため、処理速度向上が見込める
# cons: データ漏洩の可能性がある
cv = CrossValidator(estimator=rf, evaluator=evaluator, estimatorParamMaps=param_grid, 
                    numFolds=3, seed=42)
stages_with_cv = [string_indexer, vec_assembler, cv]
pipeline = Pipeline(stages=stages_with_cv)
start = time.time()
pipeline_model = pipeline.fit(train_df)
end = time.time()
print(f'pipelineにcvを含める場合: {end - start}')

### Hyperopt

In [0]:
# Identify Hyperopt as a solution for parallelizing the tuning of single-node models.

# Hyperoptは、「実数値、離散、条件付き次元を含む、厄介な探索空間上でのシリアルおよびパラレル最適化」のためのPythonライブラリです。
# 機械学習ワークフローにおいて、hyperoptは、他のライブラリで利用可能なものより高度な最適化戦略を用いてハイパーパラメータ最適化プロセスを分散/並列化するために使用することができます。
# Apache Sparkでhyperoptをスケールさせるには、2つの方法があります。

# シングルマシンのhyperoptで、分散学習アルゴリズム（MLlibなど）を使う
# 分散hyperoptで、SparkTrialsクラスと一緒にシングルマシンの学習アルゴリズム（scikit-learnなど）を使う。←これ！SparkTrialsクラスを使うのは押さえておく

# SparkTrialsは、1つのSpark Executorで一つのモデルを適合・評価するため、チューニングのための大規模なスケールアウトが可能になります。HyperoptでSparkTrialsを使うには、Hyperoptのfmin()関数にSparkTrialsオブジェクトを渡します。

In [0]:
# Identify Hyperopt as a solution for Bayesian hyperparameter inference for distributed models.

# 残念ながら現時点では、hyperoptを使用して分散型の学習アルゴリズムとともにハイパーパラメータ最適化を分散させることはできません。しかし、Spark MLを使ってより高度なハイパーパラメータ探索アルゴリズム（ランダム探索、TPEなど）を使用する利点があります。

In [0]:
# Parallelize the tuning of hyperparameters for Spark ML models using Hyperopt and Trials.

# sparkMLは分散型のMLなので、Trialsクラスを使って、TPEで探索する

file_path = f"{DA.paths.datasets}/airbnb/sf-listings/sf-listings-2019-03-06-clean.delta/"
airbnb_df = spark.read.format("delta").load(file_path)
train_df, val_df, test_df = airbnb_df.randomSplit([.6, .2, .2], seed=42)

# 目的関数（この戻り値が最小化するようにハイパーパラメータをチューニングする）
def objective_function(params):    
    # set the hyperparameters that we want to tune
    max_depth = params["max_depth"]
    num_trees = params["num_trees"]

    with mlflow.start_run():
        estimator = pipeline.copy({rf.maxDepth: max_depth, rf.numTrees: num_trees})
        model = estimator.fit(train_df)

        preds = model.transform(val_df)
        rmse = regression_evaluator.evaluate(preds)
        mlflow.log_metric("rmse", rmse)

    return rmse
  
# 探索範囲を指定する
from hyperopt import hp
search_space = {
    "max_depth": hp.quniform("max_depth", 2, 5, 1),
    "num_trees": hp.quniform("num_trees", 10, 100, 1)
}

# 探索、bestなモデルをlogging
from hyperopt import fmin, tpe, Trials
import numpy as np
import mlflow
import mlflow.spark
mlflow.pyspark.ml.autolog(log_models=False)
num_evals = 4
trials = Trials()
best_hyperparam = fmin(fn=objective_function, 
                       space=search_space,
                       algo=tpe.suggest, 
                       max_evals=num_evals,
                       trials=trials,
                       rstate=np.random.default_rng(42))

with mlflow.start_run():
    best_max_depth = best_hyperparam["max_depth"]
    best_num_trees = best_hyperparam["num_trees"]
    estimator = pipeline.copy({rf.maxDepth: best_max_depth, rf.numTrees: best_num_trees})
    combined_df = train_df.union(val_df) # Combine train & validation together

    pipeline_model = estimator.fit(combined_df)
    pred_df = pipeline_model.transform(test_df)
    rmse = regression_evaluator.evaluate(pred_df)

    # Log param and metrics for the final model
    mlflow.log_param("maxDepth", best_max_depth)
    mlflow.log_param("numTrees", best_num_trees)
    mlflow.log_metric("rmse", rmse)
    mlflow.spark.log_model(pipeline_model, "model")

In [0]:
# Identify the relationship between the number of trials and model accuracy.

# parallelismも同様だが、基本的に処理速度と精度がトレードオフの関係にある
# https://hyperopt.github.io/hyperopt/scaleout/spark/

### Pandas API on Spark

In [0]:
# Describe key differences between Spark DataFrames and Pandas on Spark DataFrames.

# Dataframeは以下の3通り

# 1: pandas dataframe
# データサイエンティストの間では最も一般的
# ミュータブル(変更可能) 、即時実行、行の順序を維持
# メリット：データセットが小さい場合に非常に高い性能
# デメリット：単一のノードで動く前提のため、サイズが大きいとOOME起きる
# 一般的にはデータサイエンティストがpandasで作って、エンジニアが実運用のためにsparkにリファクタリングする

# 2: spark dataframe
# 分散的、遅延評価され、不変であり、行の順序を維持しない
# メリット：大規模データの場合の性能は非常に高い
# デメリット：pandasのメソッドと互換性がない

# 3: pandas API on spark
# 性能はsparkに近く(厳密にはspark > pandas api on spark)、お作法はpandasに近い、いいとこどりのイメージ

In [0]:
# Identify the usage of an InternalFrame making Pandas API on Spark not quite as fast as native Spark.

# pandas api on sparkは裏でinternal frame(Spark dataframeとメタデータ)を管理する

# メタデータのみ更新する場合
# カラムをindexに指定するような場合、裏のspark dataframeを更新する必要はなく、メタデータの更新のみでOK
# その場合、internal frameのメタデータのstateを更新するだけ

# spark dataframeを更新する場合
# カラム追加をする場合(例えば、psdf['x2'] = psdf.x * psdf.x)、メタデータの更新とデータの更新が必要
# その場合、internal frameのメタデータのstateと、dataframe事態を更新する

# inplaceで更新する場合、新しいdataframeを返すのではなく、内部のデータのstateを更新する

In [0]:
# Identify Pandas API on Spark as a solution for scaling data pipelines without much refactoring.

# pandasのお作法と似ているため、ソースコードの修正は最小限で分散処理の恩恵を受けることができる

In [0]:
# Identify how to import and use the Pandas on Spark APIs
# Convert data between a PySpark DataFrame and a Pandas on Spark DataFrame.

# 読み込み方法
# spark df
spark_df = spark.read.parquet(f"{DA.paths.datasets}/airbnb/sf-listings/sf-listings-2019-03-06-clean.parquet/")

# pandas df
import pandas as pd
pandas_df = pd.read_parquet(f"{DA.paths.datasets.replace('dbfs:/', '/dbfs/')}/airbnb/sf-listings/sf-listings-2019-03-06-clean.parquet/")

# pandas api on spark
import pyspark.pandas as ps
psdf = ps.read_parquet(f"{DA.paths.datasets}/airbnb/sf-listings/sf-listings-2019-03-06-clean.parquet/")


# 変換方法
# spark df => pandas df
pandas_df = spark_df.toPandas()
print(f'spark df => pandas df: {type(pandas_df)}')

# spark df <= pandas df
spark_df = spark.createDataFrame(pandas_df)
print(f'spark df <= pandas df: {type(spark_df)}')

# spark df => pandas api on spark
psdf = spark_df.to_pandas_on_spark()
psdf = ps.DataFrame(spark_df)
print(f'spark df => pandas api on spark: {type(psdf)}')

# spark df <= pandas api on spark
spark_df = psdf.to_spark()
print(f'spark df <= pandas api on spark: {type(spark_df)}')

# pandas df => pandas api on spark
from pyspark.pandas import from_pandas
psdf = from_pandas(pandas_df)
print(f'pandas df => pandas api on spark: {type(psdf)}')

# pandas df <= pandas api on spark
pandas_df = psdf.to_pandas()
print(f'pandas df <= pandas api on spark: {type(pandas_df)}')

### Pandas UDFs/Function APIs

In [0]:
# Identify Apache Arrow as the key to Pandas <-> Spark conversions.

# Spark 2.3からは、Pythonで利用できるPandas UDFがあり、UDFの効率を向上させることができます。PandasのUDFは、Apache Arrowを利用して計算を高速化します。
# Apache Arrowは、Sparkで使用されてJVM と Python プロセス間のデータをほぼゼロの（デ）シリアライズコストで効率的に転送するためのインメモリ列型データ形式です。pandasのインスタンスおよびAPIと連携するため、関数内部でpandasを使用します。

In [0]:
# Describe why iterator UDFs are preferred for large data.

# モデルが非常に大きい場合、同じPythonワーカープロセスでバッチごとに同じモデルを繰り返しロードすることは、Pandas UDFにとって高いオーバーヘッドとなります。Spark 3.0では、Pandas UDFはpandas.Seriesまたはpandas.DataFrameのiteratorを受け取ることができるので、iterator内のシリーズごとにモデルを読み込むのではなく、一度だけモデルを読み込むことで済みます。そうすれば、必要なセットアップのコストが発生する回数も少なくなります。扱うレコード数が spark.conf.get('spark.sql.execution.arrow.maxRecordsPerBatch') (デフォルトは 10,000) より多い場合、pandas scalar UDFはpd.Seriesのバッチを反復処理するので、スピードアップが見られるはずです。

In [0]:
# Apply a model in parallel using a Pandas UDF.

from typing import Iterator, Tuple
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from pyspark.sql.functions import pandas_udf

with mlflow.start_run(run_name="sklearn-random-forest") as run:
    # Enable autologging 
    mlflow.sklearn.autolog(log_input_examples=True, log_model_signatures=True, log_models=True)
    # Import the data
    df = pd.read_csv(f"{DA.paths.datasets}/airbnb/sf-listings/airbnb-cleaned-mlflow.csv".replace("dbfs:/", "/dbfs/")).drop(["zipcode"], axis=1)
    X_train, X_test, y_train, y_test = train_test_split(df.drop(["price"], axis=1), df[["price"]].values.ravel(), random_state=42)

    # Create model
    rf = RandomForestRegressor(n_estimators=100, max_depth=10, random_state=42)
    rf.fit(X_train, y_train)
spark_df = spark.createDataFrame(X_test)

@pandas_udf("double")
def predict(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
    model_path = f"runs:/{run.info.run_id}/model" 
    model = mlflow.sklearn.load_model(model_path) # Load model
    for features in iterator:
        pdf = pd.concat(features, axis=1)
        yield pd.Series(model.predict(pdf))

prediction_df = spark_df.withColumn("prediction", predict(*spark_df.columns))
display(prediction_df)

host_total_listings_count,neighbourhood_cleansed,latitude,longitude,property_type,room_type,accommodates,bathrooms,bedrooms,beds,bed_type,minimum_nights,number_of_reviews,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,prediction
1.0,29,37.750853665952526,-122.47896134638864,0,0,4.0,1.0,0.0,4.0,0,2.0,194.0,96.0,10.0,10.0,10.0,10.0,9.0,9.0,138.7427902906762
2.0,12,37.79569442370353,-122.417081972524,1,1,2.0,1.5,1.0,1.0,0,2.0,124.0,99.0,10.0,10.0,10.0,10.0,10.0,10.0,131.24509694021717
2.0,7,37.76393574011793,-122.43001124805248,0,1,2.0,1.0,1.0,1.0,0,5.0,2.0,100.0,10.0,10.0,10.0,10.0,10.0,10.0,129.26194822031346
1.0,7,37.76690648031917,-122.43792377044348,1,0,7.0,2.0,3.0,3.0,0,3.0,3.0,93.0,10.0,9.0,10.0,10.0,10.0,10.0,413.6222277778268
1.0,2,37.77491545710221,-122.44027012206556,6,1,1.0,1.0,1.0,1.0,0,1.0,21.0,100.0,10.0,10.0,10.0,10.0,10.0,10.0,129.3876183458104
39.0,19,37.729883744746296,-122.42672685799468,1,1,2.0,1.0,1.0,1.0,0,30.0,15.0,89.0,8.0,8.0,9.0,9.0,8.0,9.0,48.90435216613397
4.0,30,37.714110738500814,-122.4072828875996,1,1,2.0,1.0,1.0,1.0,0,1.0,20.0,97.0,10.0,10.0,10.0,10.0,10.0,10.0,80.48652556180105
54.0,6,37.78663277349695,-122.4085188120046,17,1,2.0,1.0,0.0,1.0,0,1.0,0.0,97.0,10.0,10.0,10.0,10.0,10.0,10.0,78.87385270408892
1.0,15,37.78294900804669,-122.38856041539098,0,0,10.0,2.0,2.0,8.0,0,1.0,6.0,93.0,9.0,9.0,8.0,9.0,10.0,8.0,359.32030526097446
2.0,7,37.76852191665309,-122.4278718063526,0,1,2.0,1.0,1.0,1.0,0,2.0,127.0,98.0,10.0,10.0,10.0,10.0,10.0,10.0,124.4406137083362


In [0]:
# Train / apply group-specific models using the Pandas Function API.

# Pandas UDFを使う代わりに、Pandas Function APIを使うことができます。Apache Spark 3.0のこの新しい機能では、PySpark DataFrameに対してPandasインスタンスを取得・出力するPythonネイティブ関数を直接適用することができるようになりました。Apache Spark 3.0でサポートされるPandas Functions APIは、grouped map、mapとco-grouped mapです。
# mapInPandas() は pandas.DataFrame のiteratorを入力とし、別の pandas.DataFrame のiteratorを出力する。モデルが入力として全てのカラムを必要とする場合、柔軟で使いやすいですが、DataFrame全体のシリアライズ/デシリアライズが必要です（入力として渡されるため）。iteratorが出力する各pandas.DataFrameのバッチサイズは、 spark.sql.execution.arrow.maxRecordsPerBatch の設定により制御できます。
import pyspark.sql.functions as f

df = (spark
      .range(1000*100)
      .select(f.col("id").alias("record_id"), (f.col("id")%10).alias("device_id"))
      .withColumn("feature_1", f.rand() * 1)
      .withColumn("feature_2", f.rand() * 2)
      .withColumn("feature_3", f.rand() * 3)
      .withColumn("label", (f.col("feature_1") + f.col("feature_2") + f.col("feature_3")) + f.rand())
     )
train_return_schema = "device_id integer, n_used integer, model_path string, mse float"

import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

def train_model(df_pandas: pd.DataFrame) -> pd.DataFrame:
    """
    Trains an sklearn model on grouped instances
    """
    # Pull metadata
    device_id = df_pandas["device_id"].iloc[0]
    n_used = df_pandas.shape[0]
    run_id = df_pandas["run_id"].iloc[0] # Pulls run ID to do a nested run

    # Train the model
    X = df_pandas[["feature_1", "feature_2", "feature_3"]]
    y = df_pandas["label"]
    rf = RandomForestRegressor()
    rf.fit(X, y)

    # Evaluate the model
    predictions = rf.predict(X)
    mse = mean_squared_error(y, predictions) # Note we could add a train/test split

    # Resume the top-level training
    with mlflow.start_run(run_id=run_id) as outer_run:
        # Small hack for running as a job
        experiment_id = outer_run.info.experiment_id
        print(f"Current experiment_id = {experiment_id}")

        # Create a nested run for the specific device
        with mlflow.start_run(run_name=str(device_id), nested=True, experiment_id=experiment_id) as run:
            mlflow.sklearn.log_model(rf, str(device_id))
            mlflow.log_metric("mse", mse)
            mlflow.set_tag("device", str(device_id))

            artifact_uri = f"runs:/{run.info.run_id}/{device_id}"
            # Create a return pandas DataFrame that matches the schema above
            return_df = pd.DataFrame([[device_id, n_used, artifact_uri, mse]], 
                                    columns=["device_id", "n_used", "model_path", "mse"])

    return return_df
  
with mlflow.start_run(run_name="Training session for all devices") as run:
    run_id = run.info.run_id

    model_directories_df = (df
        .withColumn("run_id", f.lit(run_id)) # Add run_id
        .groupby("device_id")
        .applyInPandas(train_model, schema=train_return_schema)
        .cache()
    )

combined_df = df.join(model_directories_df, on="device_id", how="left")

apply_return_schema = "record_id integer, prediction float"

def apply_model(df_pandas: pd.DataFrame) -> pd.DataFrame:
    """
    Applies model to data for a particular device, represented as a pandas DataFrame
    """
    model_path = df_pandas["model_path"].iloc[0]

    input_columns = ["feature_1", "feature_2", "feature_3"]
    X = df_pandas[input_columns]

    model = mlflow.sklearn.load_model(model_path)
    prediction = model.predict(X)

    return_df = pd.DataFrame({
        "record_id": df_pandas["record_id"],
        "prediction": prediction
    })
    return return_df

prediction_df = combined_df.groupby("device_id").applyInPandas(apply_model, schema=apply_return_schema)
display(prediction_df)

record_id,prediction
0,3.3271086
10,3.6622128
20,3.1752758
30,6.402021
40,2.2227569
50,3.6614392
60,4.5180664
70,2.9201493
80,2.1863194
90,2.5247664
