In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime as dt
import dlt



In [None]:


# Catalog name published as a task value by the upstream `task_start` job task.
# `debugValue` is the value Databricks returns when the notebook runs outside a job.
# NOTE(review): `catalog` is not referenced by any other visible cell — confirm
# whether table names (e.g. the change-feed source) should be qualified with it.
catalog = dbutils.jobs.taskValues.get(
    key="catalog",
    taskKey="task_start",
    debugValue="dev",
)


In [None]:
@dlt.view
def taxi_raw():
    """Batch view over the JSON files in the NYC taxi sample directory."""
    sample_dir = "/databricks-datasets/nyctaxi/sample/json/"
    # DataFrameReader.json(path) is shorthand for format("json").load(path).
    return spark.read.json(sample_dir)


In [None]:
# Destination streaming table that append flows write into. Its schema is
# declared explicitly, one "<column> <type>" entry per line for readability.
dlt.create_streaming_table(
    name="filtered_taxis",
    comment="taxi data",
    # NOTE(review): `snapshot_dt` is not a column in the schema below, so this
    # Z-order setting cannot take effect as written — confirm the intended
    # column (VendorID is not an option: Z-ordering a partition column is rejected).
    table_properties={"pipelines.autoOptimize.zOrderCols": "snapshot_dt"},
    partition_cols=["VendorID"],
    schema=", ".join(
        [
            "DOLocationID BIGINT",
            "PULocationID BIGINT",
            "RatecodeID BIGINT",
            "VendorID BIGINT",
            "congestion_surcharge DOUBLE",
            "extra DOUBLE",
            "fare_amount DOUBLE",
            "improvement_surcharge DOUBLE",
            "mta_tax DOUBLE",
            "passenger_count BIGINT",
            "payment_type BIGINT",
            "store_and_fwd_flag STRING",
            "tip_amount DOUBLE",
            "tolls_amount DOUBLE",
            "total_amount DOUBLE",
            "tpep_dropoff_datetime STRING",
            "tpep_pickup_datetime STRING",
            "trip_distance DOUBLE",
            "pep_pickup_date_txt DATE",
        ]
    ),
    # Rows failing any of these expectations are dropped, not written.
    expect_all_or_drop={
        "PULocationID present": "PULocationID > 0",
        "fare_amount greater than 0": "fare_amount > 0",
        # ">=" (not ">") so trips on 2020-01-01 itself are kept, as the label says.
        "pep_pickup_date_txt from 2020 onwards": "pep_pickup_date_txt >= date'2020-01-01'",
    },
)

In [None]:
# Append flow(s) into the `filtered_taxis` streaming table. Because append_flow is a decorator,
# creating one flow per table and segment ID requires a different function name each time, which
# was done with exec(). Once the `name=` parameter of dlt.append_flow is available (it is already
# in the documentation), the exec() workaround can be removed.
@dlt.append_flow(target="filtered_taxis")
def flow_filtered_taxis():
    """Stream inserted rows of ``dojo_gold.filtered_taxis`` into ``filtered_taxis``.

    Reads the source table's Delta change data feed and keeps only ``insert``
    events, so update and delete events from the source are not appended.
    """
    # NOTE(review): the module-level `catalog` task value is unused; if the source
    # should be catalog-qualified, use f"{catalog}.dojo_gold.filtered_taxis" — confirm.
    source_table = "dojo_gold.filtered_taxis"
    return (
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .table(source_table)
        .filter(F.col("_change_type") == "insert")
        # The change feed adds metadata columns that are not part of the
        # target table's explicitly declared schema; drop them before appending.
        .drop("_change_type", "_commit_version", "_commit_timestamp")
    )


In [None]:
@dlt.table(
    # Distinct dataset name: "filtered_taxis" is already declared by
    # dlt.create_streaming_table in this notebook, and DLT does not allow two
    # datasets with the same name in one pipeline.
    name="filtered_taxis_fare_under_30",
)
def filtered_taxis():
    """Batch table of ``taxi_raw`` trips whose fare_amount is below 30."""
    return dlt.read("taxi_raw").filter(F.col("fare_amount") < 30)