In [1]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

In [2]:
dlt.__version__

'1.6.1'

In [3]:
BASE_API_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"

@dlt.resource(name="ny_taxi_data")
def ny_taxi():
    client = RESTClient(
        base_url=BASE_API_URL,
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page

pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data"
)

In [4]:
load_info = pipeline.run(ny_taxi)
print(load_info)



Pipeline ny_taxi_pipeline load step completed in 2.98 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:///c:\Users\49162\Desktop\zoomcamps\de_zoomcamp\de_zoomcamp\dlt\ny_taxi_pipeline.duckdb location to store data
Load package 1739374340.4431148 is LOADED and contains no failed jobs


In [5]:
import duckdb

conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
conn.sql("DESCRIBE").df()

Unnamed: 0,database,schema,name,column_names,column_types,temporary
0,ny_taxi_pipeline,ny_taxi_data,_dlt_loads,"[load_id, schema_name, status, inserted_at, sc...","[VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...",False
1,ny_taxi_pipeline,ny_taxi_data,_dlt_pipeline_state,"[version, engine_version, pipeline_name, state...","[BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...",False
2,ny_taxi_pipeline,ny_taxi_data,_dlt_version,"[version, engine_version, inserted_at, schema_...","[BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...",False
3,ny_taxi_pipeline,ny_taxi_data,ny_taxi_data,"[end_lat, end_lon, fare_amt, passenger_count, ...","[DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB...",False


In [6]:
df = pipeline.dataset(dataset_type="default").ny_taxi_data.df()
df.shape

(10000, 18)

In [7]:
with pipeline.sql_client() as client:
    res = client.execute_sql(
            """
            SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
            FROM ny_taxi_data;
            """
        )
    # Prints column values of the first row
    print(res)

[(12.3049,)]
