# Pandas UDFs & Structured Streaming with PySpark

In this notebook you will:
1. Compare plain Python UDFs, built-in functions, and Pandas UDFs
2. Use Pandas UDFs on `samples.nyctaxi.trips`
3. Build a basic Structured Streaming pipeline (rate source)


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import pandas_udf
import pandas as pd

nyc_taxi_df = spark.read.table("samples.nyctaxi.trips")
display(nyc_taxi_df.limit(5))


## 1. Normal Python UDF vs Built-in vs Pandas UDF

We'll compute a simple custom metric, where `tip_rate` is the tip as a fraction of the fare:

```python
tip_rate = tip_amount / fare_amount
score = log(1 + fare_amount) * (1 + tip_rate)
```


In [None]:
import math

# Normal Python UDF (row-by-row, slower)
def tip_score_py(fare_amount, tip_amount):
    if fare_amount is None or fare_amount <= 0:
        return None
    tip_rate = tip_amount / fare_amount if tip_amount is not None else 0.0
    return math.log1p(fare_amount) * (1 + tip_rate)

tip_score_udf = F.udf(tip_score_py, DoubleType())

sample_df = nyc_taxi_df.select("fare_amount", "tip_amount").limit(10000)

py_udf_df = sample_df.withColumn("tip_score_py_udf", tip_score_udf("fare_amount", "tip_amount"))
display(py_udf_df.limit(10))
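
Because `tip_score_py` is an ordinary Python function, its edge cases can be checked without a Spark session. The sketch below repeats the function so that it runs on its own; the sample inputs are made up for illustration.

```python
import math

def tip_score_py(fare_amount, tip_amount):
    # Null or non-positive fares have no meaningful tip rate
    if fare_amount is None or fare_amount <= 0:
        return None
    tip_rate = tip_amount / fare_amount if tip_amount is not None else 0.0
    return math.log1p(fare_amount) * (1 + tip_rate)

typical = tip_score_py(10.0, 2.0)       # log(11) * 1.2, roughly 2.877
no_tip = tip_score_py(10.0, None)       # a missing tip is treated as a 0% tip rate
zero_fare = tip_score_py(0.0, 1.0)      # None
missing_fare = tip_score_py(None, 1.0)  # None
```

Inside Spark, a returned `None` becomes a null in the output column.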


### 1.1 Pandas UDF (Vectorized)

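- Operates on **Pandas Series** (one batch of rows at a time) instead of single rows
- Data moves between the JVM and Python in Apache Arrow batches, so it is often much faster than a row-by-row UDF when the Python logic itself is vectorized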
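
To make the Series-in, Series-out shape concrete, here is a minimal sketch in plain pandas and NumPy with no Spark involved. The function name `tip_score_series` and the sample values are assumptions made for illustration; the point is that every operation acts on a whole column at once.

```python
import numpy as np
import pandas as pd

def tip_score_series(fare: pd.Series, tip: pd.Series) -> pd.Series:
    # Non-positive or missing fares become NaN, so the score is missing too
    fare = fare.where(fare > 0)
    # A missing tip counts as a 0% tip rate
    tip_rate = tip.fillna(0.0) / fare
    # np.log1p is applied to the entire Series in one call
    return np.log1p(fare) * (1 + tip_rate)

fares = pd.Series([10.0, 0.0, None, 25.0])
tips = pd.Series([2.0, 1.0, 3.0, None])
scores = tip_score_series(fares, tips)
```

Only the first and last entries of `scores` are numbers; the zero and missing fares yield `NaN`.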


In [None]:
import numpy as np

@pandas_udf("double")
def tip_score_pandas(fare_amount: pd.Series, tip_amount: pd.Series) -> pd.Series:
    # Mirror tip_score_py: null or non-positive fares become NaN (returned to Spark as null)
    fare = fare_amount.where(fare_amount > 0)
    # A missing tip counts as a 0% tip rate
    tip_rate = tip_amount.fillna(0.0) / fare
    # np.log1p works on the whole Series at once, unlike a per-row math.log1p loop
    return np.log1p(fare) * (1 + tip_rate)

pandas_udf_df = sample_df.withColumn("tip_score_pandas", tip_score_pandas("fare_amount", "tip_amount"))

display(pandas_udf_df.limit(10))


### 1.2 Comparing with Built-in Functions

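Prefer built-in functions whenever they can express the logic: they run inside Spark's JVM engine, avoid moving data to a Python worker, and can be optimized by Catalyst.

We'll implement an **approximate** version of the score using built-ins only. It is approximate because it handles edge cases differently from the Python UDF: a zero fare yields a score of 0 rather than null, and a null tip yields a null score rather than a 0% tip rate: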


In [None]:
builtin_df = (
    sample_df
    .withColumn(
        "tip_rate",
        F.when(F.col("fare_amount") > 0,
               F.col("tip_amount") / F.col("fare_amount")).otherwise(0.0)
    )
    .withColumn(
        "tip_score_builtin",
        F.log1p("fare_amount") * (1 + F.col("tip_rate"))
    )
)

display(builtin_df.limit(10))
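
The cells above show each approach's output but never time them. One possible way to compare them is sketched below; `time_action` and `benchmark` are hypothetical helpers, not part of PySpark. The sketch assumes Spark 3.0 or later, where the `noop` write format evaluates every row without storing anything. On a 10,000-row sample, fixed overhead may dominate the timings, so treat the numbers as rough.

```python
import time

def time_action(action):
    """Run a zero-argument callable once and return elapsed wall-clock seconds."""
    start = time.perf_counter()
    action()
    return time.perf_counter() - start

def benchmark(named_dfs):
    """Time a full evaluation of each DataFrame in a {label: DataFrame} dict."""
    results = {}
    for label, df in named_dfs.items():
        # The "noop" sink forces every row to be computed but writes nothing
        results[label] = time_action(
            lambda df=df: df.write.format("noop").mode("overwrite").save()
        )
    return results

# In the notebook (requires the DataFrames defined in the cells above):
# benchmark({"python_udf": py_udf_df, "pandas_udf": pandas_udf_df, "builtin": builtin_df})
```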


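## 2. Grouped Map Pandas UDF Example

We'll:
- Use a **grouped map** Pandas function to compute custom stats per `passenger_count`: Spark hands each group to Python as one pandas DataFrame, and the function returns a DataFrame of results for that group.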
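
The grouped-map pattern is split, apply, combine. The sketch below imitates it in plain pandas on a made-up three-row table, so the per-group function can be tried without Spark. The name `group_stats` and the toy data are assumptions made for illustration.

```python
import pandas as pd

def group_stats(pdf: pd.DataFrame) -> pd.DataFrame:
    # Receives all rows of one group and returns a single summary row
    valid = pdf[pdf["fare_amount"] > 0]
    return pd.DataFrame([{
        "passenger_count": int(pdf["passenger_count"].iloc[0]),
        "avg_fare": pdf["fare_amount"].mean(),
        "avg_tip_rate": (valid["tip_amount"] / valid["fare_amount"]).mean(),
    }])

toy = pd.DataFrame({
    "passenger_count": [1, 1, 2],
    "fare_amount": [10.0, 20.0, 8.0],
    "tip_amount": [2.0, 2.0, 0.0],
})

# Split by key, apply the function to each group, then concatenate the results
result = pd.concat(
    [group_stats(group) for _, group in toy.groupby("passenger_count")],
    ignore_index=True,
)
```

Spark does the same thing at scale: it shuffles rows so that each key's rows land together, calls the function once per group, and unions the returned DataFrames.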


In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Schema for grouped-map Pandas UDF
schema = StructType([
    StructField("passenger_count", IntegerType()),
    StructField("avg_fare", DoubleType()),
    StructField("std_fare", DoubleType()),
    StructField("avg_tip_rate", DoubleType())
])

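# Plain Python function: receives all rows of one group as a pandas DataFrame
# and returns a one-row DataFrame matching `schema`
def passenger_stats(pdf: pd.DataFrame) -> pd.DataFrame:
    pc = int(pdf["passenger_count"].iloc[0])
    avg_fare = pdf["fare_amount"].mean()
    std_fare = pdf["fare_amount"].std()
    # Avoid division by zero: only rows with a positive fare contribute to the tip rate
    valid = pdf["fare_amount"] > 0
    tip_rate = (pdf.loc[valid, "tip_amount"] / pdf.loc[valid, "fare_amount"]).fillna(0.0)
    avg_tip_rate = tip_rate.mean()
    return pd.DataFrame([{
        "passenger_count": pc,
        "avg_fare": avg_fare,
        "std_fare": std_fare,
        "avg_tip_rate": avg_tip_rate
    }])

stats_df = (
    nyc_taxi_df
    .select("passenger_count", "fare_amount", "tip_amount")
    # int() in passenger_stats cannot convert a null group key
    .where(F.col("passenger_count").isNotNull())
    .groupBy("passenger_count")
    # applyInPandas (Spark 3.0+) replaces the deprecated grouped-map pandas_udf + apply()
    .applyInPandas(passenger_stats, schema=schema)
)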

display(stats_df.orderBy("passenger_count"))


## 3. Structured Streaming Example (Rate Source)

We'll:
- Use the `rate` source, which emits rows with a `timestamp` and a steadily increasing `value`
- Compute a count and an average of `value` per fixed 10-second window


In [None]:
# Create a streaming DataFrame with 10 rows per second
stream_df = (
    spark.readStream
         .format("rate")
         .option("rowsPerSecond", 10)
         .load()
)

display(stream_df)


### 3.1 Transform the Stream

We'll:
- Group by 10-second windows
- Compute count and average of `value`
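
A 10-second window without a slide interval is a tumbling window: each timestamp falls into exactly one fixed, non-overlapping bucket, aligned to the Unix epoch by default. The sketch below reproduces that bucketing in plain Python; `tumbling_window` is a hypothetical helper for illustration, not a Spark API.

```python
from datetime import datetime, timedelta, timezone

def tumbling_window(ts: datetime, seconds: int = 10):
    """Return the (start, end) of the fixed window containing ts."""
    epoch = int(ts.timestamp())
    # Round down to the nearest multiple of the window length
    start = epoch - (epoch % seconds)
    start_dt = datetime.fromtimestamp(start, tz=timezone.utc)
    return start_dt, start_dt + timedelta(seconds=seconds)

# 12:00:17 falls into the window [12:00:10, 12:00:20)
start, end = tumbling_window(datetime(2024, 1, 1, 12, 0, 17, tzinfo=timezone.utc))
```

The window start is inclusive and the end is exclusive, so a row stamped exactly 12:00:20 belongs to the next window.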


In [None]:
agg_stream_df = (
    stream_df
    .groupBy(
        F.window(F.col("timestamp"), "10 seconds").alias("time_window")
    )
    .agg(
        F.count("*").alias("rows_in_window"),
        F.avg("value").alias("avg_value")
    )
)

# For debugging in Databricks, write to the memory sink
query = (
    agg_stream_df.writeStream
    .format("memory")
    .queryName("rate_agg")
    .outputMode("complete")
    .trigger(processingTime="5 seconds")  # micro-batch every 5s
    .start()
)


### 3.2 Query the In-Memory Table

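The memory sink exposes its results as a temporary table named after `queryName`. Run this cell multiple times to see the table update as each micro-batch completes.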


In [None]:
display(spark.sql("SELECT * FROM rate_agg ORDER BY time_window"))
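
If the table looks empty, it can help to check whether the query has processed a batch yet. The helper below is a sketch: `summarize_query` is a hypothetical name, and it assumes that `lastProgress` supports dictionary-style access to `batchId` and `numInputRows`, which may vary between PySpark versions.

```python
def summarize_query(query):
    """Return a small dict describing a StreamingQuery's current state."""
    progress = query.lastProgress  # None until the first micro-batch finishes
    return {
        "active": query.isActive,
        "message": query.status["message"],
        "last_batch_id": None if progress is None else progress["batchId"],
        "last_input_rows": None if progress is None else progress["numInputRows"],
    }

# In the notebook (requires the running `query` from the previous section):
# summarize_query(query)
```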


### 3.3 Stop the Stream When Done


In [None]:
# Stop the query to avoid leaving a stream running
query.stop()
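
If the `query` variable has been lost, for example because the cell that started the stream was re-run, the session still tracks every running query through `spark.streams.active`. A minimal sketch of a cleanup helper follows; `stop_active_streams` is a hypothetical name.

```python
def stop_active_streams(spark_session):
    """Stop every streaming query still running in the session; return their names."""
    stopped = []
    for q in spark_session.streams.active:
        stopped.append(q.name)  # name is None for queries started without queryName
        q.stop()
    return stopped

# In the notebook:
# stop_active_streams(spark)
```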
