## pandas UDF
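A known drawback of ordinary PySpark UDFs is poor performance: data has to be moved row by row between the JVM that runs Spark and the Python worker.<br>
pandas UDFs address this. They use Apache Arrow to transfer data between the two runtimes efficiently, in columnar batches, and they let the function apply vectorized operations to each batch using pandas, Python's data-frame analysis library.<br>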
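
To make the contrast concrete, here is a minimal sketch in plain pandas and NumPy, with no Spark involved. It places a function that is called once per value, the way a row-wise UDF is, next to a function that is called once per batch, the way a pandas UDF is. The function names and the toy data are illustrative assumptions and are not part of this notebook's dataset.

```python
import numpy as np
import pandas as pd

def scale_row(value: float) -> float:
    """Row-wise style: invoked once for every single value."""
    return value * 2.0 + 1.0

def scale_batch(values: pd.Series) -> pd.Series:
    """pandas-UDF style: invoked once per batch, as one vectorized expression."""
    return values * 2.0 + 1.0

batch = pd.Series(np.arange(5, dtype=float))

row_result = pd.Series([scale_row(v) for v in batch])  # five Python calls
batch_result = scale_batch(batch)                      # one Python call

print(row_result.equals(batch_result))
```

In Spark, the batch-style function would additionally carry the `@pandas_udf` decorator, and each batch would arrive as a `pandas.Series`.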

This notebook uses the waveform data of detector signals as a worked example.<br>
SciPy is used in the second half, so install it beforehand:
```
conda install scipy
```

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("./data/sample_waveform.parquet").select("event_id","waveform")
df.show(5)


+--------+--------------------+
|event_id|            waveform|
+--------+--------------------+
|       0|[-10.649999999999...|
|       1|[4.29999999999927...|
|       2|[-6.4500000000007...|
|       3|[6.95000000000072...|
|       4|[-18.700000000000...|
+--------+--------------------+
only showing top 5 rows


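In this data, the "waveform" column stores each waveform as an array.<br>
As a first example, we define a pandas UDF that treats the first 20 samples of each waveform as the baseline and returns their standard deviation.

To define a pandas UDF, put the decorator `@pandas_udf(return type)` in front of the function.<br>
When both the argument and the return value are DataFrame columns, annotate them as `pandas.Series`.<br>
The UDF definition is shown below.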

In [6]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
import numpy as np

@pandas_udf(DoubleType())
def baseline_std_udf(waveform: pd.Series) -> pd.Series:
    return waveform.apply(lambda x: float(np.std(x[:20])))

With this definition, `baseline_std_udf` can be used inside PySpark column expressions.

In [7]:
df.withColumn("baseline_std", baseline_std_udf(df.waveform)).show(10)

+--------+--------------------+------------------+
|event_id|            waveform|      baseline_std|
+--------+--------------------+------------------+
|       0|[-10.649999999999...|12.146089905809195|
|       1|[4.29999999999927...|13.697079980784226|
|       2|[-6.4500000000007...|   8.3694384518915|
|       3|[6.95000000000072...|10.552132485900659|
|       4|[-18.700000000000...|11.243220179290274|
|       5|[7.04999999999927...|12.395462879618492|
|       6|[5.75, -7.25, -2....|  6.94172168845741|
|       7|[-22.399999999999...|23.438003327928765|
|       8|[-2.25, -6.25, 7....|  9.33742469849155|
|       9|[0.70000000000072...|12.284543133547945|
+--------+--------------------+------------------+
only showing top 10 rows


                                                                                

A new "baseline_std" column now holds the standard deviation of the baseline region of each waveform.
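
The body of a Series-to-Series pandas UDF is ordinary pandas code, so it can be checked locally on a plain `pandas.Series` before Spark is involved. The sketch below does that with synthetic waveforms and also shows a stacked NumPy variant that avoids the per-row `apply`. The names `baseline_std` and `baseline_std_stacked` and the random data are assumptions made for illustration. The stacked variant further assumes that every waveform has at least 20 samples, which the source does not state.

```python
import numpy as np
import pandas as pd

N_BASELINE = 20  # number of leading samples treated as the baseline

def baseline_std(waveform: pd.Series) -> pd.Series:
    """Same per-row logic as baseline_std_udf, without the Spark decorator."""
    return waveform.apply(lambda x: float(np.std(x[:N_BASELINE])))

def baseline_std_stacked(waveform: pd.Series) -> pd.Series:
    """Vectorized variant: stack the baseline regions into one 2-D array."""
    head = np.stack([np.asarray(w[:N_BASELINE], dtype=float) for w in waveform])
    return pd.Series(head.std(axis=1), index=waveform.index)

# Synthetic batch: four noise-only waveforms of 100 samples each (illustrative)
rng = np.random.default_rng(0)
batch = pd.Series([rng.normal(0.0, 10.0, size=100) for _ in range(4)])

print(baseline_std(batch))
print(baseline_std_stacked(batch))
```

Both functions use the population standard deviation (`ddof=0`), NumPy's default, so they agree up to floating-point rounding.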


A pandas UDF can also return several values per row. The next UDF fits each waveform with SciPy's `curve_fit` and returns the amplitude and the onset time `t0` as a struct column.

In [12]:
from scipy.optimize import curve_fit
from scipy.special import erf  # NumPy has no erf; SciPy provides it
from pyspark.sql.types import DoubleType, StructType, StructField

RISETIME = 1.8
DECAYTIME = 84
TOFFSET = 30

# Model function for the fit: an erf-shaped rising edge times an exponential decay
def model(x, A, t0):
    return A * (1 + erf((x - t0) / RISETIME)) * 0.5 * np.exp(-(x - t0) / DECAYTIME)

# Schema of the struct column returned by the UDF
schema = StructType([
    StructField("amplitude", DoubleType()),
    StructField("t0", DoubleType()),
])

# pandas UDF that runs SciPy's curve_fit on every waveform
@pandas_udf(schema)
def fit_waveforms(wave: pd.Series) -> pd.DataFrame:
    def fit_wave(y):
        y = np.asarray(y, dtype=float)
        x = np.arange(len(y), dtype=float)  # sample index serves as the time axis
        popt, _ = curve_fit(
            model, x, y,
            p0=[y.max(), TOFFSET],  # initial guess: peak height and expected onset
            bounds=([1e-6, 0], [np.inf, np.inf]),
        )
        return [float(popt[0]), float(popt[1])]

    return pd.DataFrame(wave.apply(fit_wave).tolist(), columns=["amplitude", "t0"])

In [13]:
df2 = df.withColumn("fit_params", fit_waveforms(F.col("waveform")))
df2.select("fit_params.*").show(10, truncate=False)

Spark evaluates lazily, so an error raised inside a pandas UDF surfaces only when an action such as `show()` runs, and it arrives as a `PythonException` wrapping the Python worker's traceback.<br>
For example, running this cell without the `curve_fit` import fails with `NameError: name 'curve_fit' is not defined`.
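
The fitting step can also be checked on its own, outside Spark, before it is wrapped in a pandas UDF. The sketch below imports `curve_fit` from `scipy.optimize` and `erf` from `scipy.special` (NumPy does not provide `erf`) and fits a synthetic pulse. The true amplitude, onset, and noise level are made-up values chosen only for this check.

```python
import numpy as np
from scipy.optimize import curve_fit
from scipy.special import erf

RISETIME = 1.8
DECAYTIME = 84
TOFFSET = 30

def model(x, A, t0):
    """erf-shaped rising edge multiplied by an exponential decay."""
    return A * (1 + erf((x - t0) / RISETIME)) * 0.5 * np.exp(-(x - t0) / DECAYTIME)

# Synthetic waveform: hypothetical amplitude and onset plus Gaussian noise
rng = np.random.default_rng(1)
x = np.arange(200, dtype=float)
true_A, true_t0 = 100.0, 31.0
y = model(x, true_A, true_t0) + rng.normal(0.0, 1.0, size=x.size)

popt, pcov = curve_fit(
    model, x, y,
    p0=[y.max(), TOFFSET],                 # initial guess inside the bounds
    bounds=([1e-6, 0], [np.inf, np.inf]),  # A > 0, t0 >= 0
)
amplitude, t0 = popt
print(f"amplitude={amplitude:.2f}, t0={t0:.2f}")
```

If the recovered parameters land close to the values used to generate the pulse, the same `model` and `curve_fit` call can be moved into the UDF with more confidence.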
