In [1]:
from datafusion import SessionContext, lit, col, udf
import pyarrow as pa

In [2]:
# Session context used below to turn the in-memory record batch into a DataFrame.
ctx = SessionContext()

In [3]:
import example_intermediate_data

In [4]:
# Data store instance whose methods are wrapped as UDFs further down.
# NOTE(review): ['a', 'c'] look like the col_1 keys the store treats specially
# (the replacement step later leaves those rows unchanged) — confirm against
# the MyDataStore implementation, which is not part of this notebook.
store = example_intermediate_data.MyDataStore(['a', 'c'])

In [5]:
# Wrap the data store's methods as DataFusion scalar UDFs. Both are declared
# with (string, uint64) arguments, which is how they are called with
# col_1 and col_2 later in the notebook.
#
# NOTE(review): initialize is invoked later specifically to populate the store,
# i.e. for its side effect — confirm that "stable" (rather than "volatile") is
# the intended volatility for these two functions.

# store.initialize, declared to return a boolean.
initialize_udf = udf(
    store.initialize, [pa.string(), pa.uint64()], pa.bool_(), "stable"
)

# store.replace_from_store, declared to return a uint64.
replace_udf = udf(
    store.replace_from_store, [pa.string(), pa.uint64()], pa.uint64(), "stable"
)

In [6]:
# Sample input: six rows with a string key column (col_1) and a uint64
# value column (col_2). Keys 'a' and 'b' each appear twice.
batch = pa.RecordBatch.from_arrays(
    [
        pa.array(['a', 'b', 'a', 'b', 'c', 'd'], type=pa.string()),
        pa.array([1, 2, 3, 4, 5, 6], type=pa.uint64()),
    ],
    names=["col_1", "col_2"],
)

# Hand the batch to the session as a nested list ([[batch]]) to get a DataFrame.
df = ctx.create_dataframe([[batch]])

In [7]:
# Just show the original data, before any UDF has been applied to it.

df

col_1,col_2
a,1
b,2
a,3
b,4
c,5
d,6


In [8]:
# Now we make this call to initialize the data store so we can use it later.
# col_3 holds the boolean that store.initialize returned for each row.
# NOTE(review): the result is neither assigned nor explicitly collected, so the
# UDF presumably runs only because the notebook renders the DataFrame — confirm
# that rendering evaluates every row before relying on the store being populated.

df.with_column("col_3", initialize_udf(col("col_1"), col("col_2")))

col_1,col_2,col_3
a,1,True
b,2,False
a,3,True
b,4,False
c,5,True
d,6,False


In [9]:
# Use the data structure. It should ignore the replacement for col_1 values a and c;
# for all other keys it should show the max value of col_2 for that col_1.
df.with_column("max_by_col1", replace_udf(col("col_1"), col("col_2")))

col_1,col_2,max_by_col1
a,1,1
b,2,4
a,3,3
b,4,4
c,5,5
d,6,6
