In [0]:
%sql

-- Raw input: every column of the sample NYC taxi trips table, before any validation.
select *
from samples.nyctaxi.trips

tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,fare_amount,pickup_zip,dropoff_zip
2016-02-13T21:47:53Z,2016-02-13T21:57:15Z,1.4,8.0,10103,10110
2016-02-13T18:29:09Z,2016-02-13T18:37:23Z,1.31,7.5,10023,10023
2016-02-06T19:40:58Z,2016-02-06T19:52:32Z,1.8,9.5,10001,10018
2016-02-12T19:06:43Z,2016-02-12T19:20:54Z,2.3,11.5,10044,10111
2016-02-23T10:27:56Z,2016-02-23T10:58:33Z,2.6,18.5,10199,10022
2016-02-13T00:41:43Z,2016-02-13T00:46:52Z,1.4,6.5,10023,10069
2016-02-18T23:49:53Z,2016-02-19T00:12:53Z,10.4,31.0,11371,10003
2016-02-18T20:21:45Z,2016-02-18T20:38:23Z,10.15,28.5,11371,11201
2016-02-03T10:47:50Z,2016-02-03T11:07:06Z,3.27,15.0,10014,10023
2016-02-19T01:26:39Z,2016-02-19T01:40:01Z,4.42,15.0,10003,11222


In [0]:
from pydantic_workflow.trip import Trip

In [0]:
from pydantic_workflow.trip import Trip

import json
import pydantic

Error_Message = str

def is_trip_valid(json_str: str | None) -> tuple[bool, Error_Message | None, Trip | None]:
    """
    Validate a JSON string as a Trip.

    Uses Pydantic's validation to ensure the JSON is a valid Trip. The function
    is wrapped as a Spark UDF below, so every failure is reported in the return
    value rather than raised: an uncaught exception inside a Python UDF fails
    the whole Spark task instead of marking a single row invalid.

    Args:
        json_str: JSON text for one record. ``None`` (how Spark hands a NULL to
            a Python UDF) and non-string values are reported as invalid.

    Returns:
        ``(is_valid, error_message, trip)``:
        ``(True, None, Trip)`` when the text parses to a JSON object that
        validates as a Trip, otherwise ``(False, <error text>, None)``.
    """
    if json_str is None:
        return False, "Input is None; expected a JSON string", None

    try:
        data = json.loads(json_str)
    except json.JSONDecodeError as e:
        return False, str(e), None
    except TypeError as e:
        # json.loads raises TypeError for inputs that are not str/bytes/bytearray.
        return False, str(e), None

    # Valid JSON may still be a list, number, string or null; `Trip(**data)`
    # would raise TypeError for those, so reject them explicitly.
    if not isinstance(data, dict):
        return False, f"Expected a JSON object, got {type(data).__name__}", None

    try:
        return True, None, Trip(**data)
    except pydantic.ValidationError as e:
        return False, str(e), None

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType, StringType, StructField, StructType

# FIXME Use sparkdantic to generate the schema
# Until then this nested struct has zero fields, so it cannot hold any of the
# validated Trip's values in the output `trip` column.
trip_schema = StructType(fields=[])

# Return type of the UDF: one struct per row whose fields line up, in order,
# with the (is_valid, error_message, trip) tuple that is_trip_valid returns.
is_trip_valid_schema = StructType(
    fields=[
        StructField(name="is_valid", dataType=BooleanType(), nullable=False),
        StructField(name="error_message", dataType=StringType(), nullable=True),
        StructField(name="trip", dataType=trip_schema, nullable=True),
    ]
)

is_trip_valid_udf = udf(f=is_trip_valid, returnType=is_trip_valid_schema)

In [0]:
from pyspark.sql.functions import col, struct, to_json

# Validate each source row by serialising all of its columns into one JSON
# document, then promote the UDF's struct fields (is_valid, error_message,
# trip) to top-level columns alongside the original ones.
trips = (
    spark.table("samples.nyctaxi.trips")
    .withColumn("qc", is_trip_valid_udf(to_json(struct("*"))))
    .select("*", "qc.*")
    .drop("qc")
)

In [0]:
# Render the validated trips: the source columns plus is_valid, error_message and trip.
# NOTE(review): this displays the whole table; Databricks `display` presumably
# truncates large results — confirm, or add `.limit(n)` for a bounded preview.
display(trips)

tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,fare_amount,pickup_zip,dropoff_zip,is_valid,error_message,trip
2016-02-13T21:47:53Z,2016-02-13T21:57:15Z,1.4,8.0,10103,10110,False,"1 validation error for Trip id  Field required [type=missing, input_value={'tpep_pickup_datetime': ...3, 'dropoff_zip': 10110}, input_type=dict]  For further information visit https://errors.pydantic.dev/2.10/v/missing",
2016-02-13T18:29:09Z,2016-02-13T18:37:23Z,1.31,7.5,10023,10023,False,"1 validation error for Trip id  Field required [type=missing, input_value={'tpep_pickup_datetime': ...3, 'dropoff_zip': 10023}, input_type=dict]  For further information visit https://errors.pydantic.dev/2.10/v/missing",
2016-02-06T19:40:58Z,2016-02-06T19:52:32Z,1.8,9.5,10001,10018,False,"1 validation error for Trip id  Field required [type=missing, input_value={'tpep_pickup_datetime': ...1, 'dropoff_zip': 10018}, input_type=dict]  For further information visit https://errors.pydantic.dev/2.10/v/missing",
2016-02-12T19:06:43Z,2016-02-12T19:20:54Z,2.3,11.5,10044,10111,False,"1 validation error for Trip id  Field required [type=missing, input_value={'tpep_pickup_datetime': ...4, 'dropoff_zip': 10111}, input_type=dict]  For further information visit https://errors.pydantic.dev/2.10/v/missing",
2016-02-23T10:27:56Z,2016-02-23T10:58:33Z,2.6,18.5,10199,10022,False,"1 validation error for Trip id  Field required [type=missing, input_value={'tpep_pickup_datetime': ...9, 'dropoff_zip': 10022}, input_type=dict]  For further information visit https://errors.pydantic.dev/2.10/v/missing",
2016-02-13T00:41:43Z,2016-02-13T00:46:52Z,1.4,6.5,10023,10069,False,"1 validation error for Trip id  Field required [type=missing, input_value={'tpep_pickup_datetime': ...3, 'dropoff_zip': 10069}, input_type=dict]  For further information visit https://errors.pydantic.dev/2.10/v/missing",
2016-02-18T23:49:53Z,2016-02-19T00:12:53Z,10.4,31.0,11371,10003,False,"1 validation error for Trip id  Field required [type=missing, input_value={'tpep_pickup_datetime': ...1, 'dropoff_zip': 10003}, input_type=dict]  For further information visit https://errors.pydantic.dev/2.10/v/missing",
2016-02-18T20:21:45Z,2016-02-18T20:38:23Z,10.15,28.5,11371,11201,False,"1 validation error for Trip id  Field required [type=missing, input_value={'tpep_pickup_datetime': ...1, 'dropoff_zip': 11201}, input_type=dict]  For further information visit https://errors.pydantic.dev/2.10/v/missing",
2016-02-03T10:47:50Z,2016-02-03T11:07:06Z,3.27,15.0,10014,10023,False,"1 validation error for Trip id  Field required [type=missing, input_value={'tpep_pickup_datetime': ...4, 'dropoff_zip': 10023}, input_type=dict]  For further information visit https://errors.pydantic.dev/2.10/v/missing",
2016-02-19T01:26:39Z,2016-02-19T01:40:01Z,4.42,15.0,10003,11222,False,"1 validation error for Trip id  Field required [type=missing, input_value={'tpep_pickup_datetime': ...3, 'dropoff_zip': 11222}, input_type=dict]  For further information visit https://errors.pydantic.dev/2.10/v/missing",
