# Dlt Exploration Notebook

## Loading data into duckdb

- This section is heavily commented to make the code, and how it works, easier to understand.

In [20]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator
import json


In [21]:
# API location and pagination settings used by the cells in this section
BASE_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net"
ENDPOINT = "data_engineering_zoomcamp_api"
STARTING_PAGE = 1


def paginated_getter():
    """Yield the pages returned by ``ENDPOINT``, one page per iteration.

    The REST client is configured with a page-number paginator that starts at
    ``STARTING_PAGE`` and has no total-count path (``total_path=None``), so
    pagination continues until there are no more pages.
    """
    # Page-number pagination: first page is STARTING_PAGE, no total count path
    page_paginator = PageNumberPaginator(base_page=STARTING_PAGE, total_path=None)
    api_client = RESTClient(base_url=BASE_URL, paginator=page_paginator)

    # Delegate to the client's page iterator; each page is handed on as it arrives
    yield from api_client.paginate(ENDPOINT)

In [28]:
# Peek at the API: report the size of each fetched page and keep one record
# as a sample for reference.
sample_record = None  # stays None when the API returns no records at all
for page_num, page_data in enumerate(paginated_getter(), start=STARTING_PAGE):
    # Only the first page supplies the sample; skip it if that page is empty
    if page_num == STARTING_PAGE and page_data:
        # Store the first record as a pretty-printed JSON string
        sample_record = json.dumps(page_data[0], indent=2)

    # Print the total records on each page
    print(f"Page {page_num} - Total records: {len(page_data)}")
    break  # For a quick test: stop after the first page

Page 1 - Total records: 1000


In [29]:
# Print the sample record (the JSON string stored from the first page)
print("Sample Record: \n", sample_record)

Sample Record: 
 {
  "End_Lat": 40.742963,
  "End_Lon": -73.980072,
  "Fare_Amt": 45.0,
  "Passenger_Count": 1,
  "Payment_Type": "Credit",
  "Rate_Code": null,
  "Start_Lat": 40.641525,
  "Start_Lon": -73.787442,
  "Tip_Amt": 9.0,
  "Tolls_Amt": 4.15,
  "Total_Amt": 58.15,
  "Trip_Distance": 17.52,
  "Trip_Dropoff_DateTime": "2009-06-14 23:48:00",
  "Trip_Pickup_DateTime": "2009-06-14 23:23:00",
  "mta_tax": null,
  "store_and_forward": null,
  "surcharge": 0.0,
  "vendor_name": "VTS"
}


In [30]:
# dlt pipeline: DuckDB destination, tables are written to the "ny_taxi_data" dataset
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data",
)


In [None]:
# Load the data into the "rides" table in DuckDB, replacing any existing data.
load_info = pipeline.run(
  paginated_getter(),  # Generator object that yields one page of records at a time
  table_name="rides",  # Target table name
  write_disposition="replace",  # Replace the existing data in the table
)

# Show the load summary returned by the run
print(load_info)

In [None]:
# Streamlit info: show the "ny_taxi_pipeline" pipeline via the dlt CLI
!dlt pipeline ny_taxi_pipeline show

### Data in duckdb

As expected, there are 10,000 rows in DuckDB.

![](assets/images/image.png)

And the JSON records were loaded into DuckDB with the following schema:

| Index | name                   | data_type | nullable |
|-------|------------------------|-----------|----------|
| 0     | end_lat                | double    | true     |
| 1     | end_lon                | double    | true     |
| 2     | fare_amt               | double    | true     |
| 3     | passenger_count        | bigint    | true     |
| 4     | payment_type           | text      | true     |
| 5     | start_lat              | double    | true     |
| 6     | start_lon              | double    | true     |
| 7     | tip_amt                | double    | true     |
| 8     | tolls_amt              | double    | true     |
| 9     | total_amt              | double    | true     |
| 10    | trip_distance          | double    | true     |
| 11    | trip_dropoff_date_time | timestamp | true     |
| 12    | trip_pickup_date_time  | timestamp | true     |
| 13    | surcharge              | double    | true     |
| 14    | vendor_name            | text      | true     |
| 15    | _dlt_load_id           | text      | false    |
| 16    | _dlt_id                | text      | false    |
| 17    | store_and_forward      | double    | true     |


## dlt decorator


In [37]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

# API location and pagination settings
BASE_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net"
ENDPOINT = "data_engineering_zoomcamp_api"
STARTING_PAGE = 1


# API resource for the NYC taxi data. The resource name will be used as the
# table name if none is provided when the pipeline runs.
@dlt.resource(name="decorated_rides")
def ny_taxi():
    """Yield pages of taxi ride data from ``ENDPOINT``, one page per iteration.

    Uses a page-number paginator that starts at ``STARTING_PAGE`` and has no
    total-count path, so pagination continues until there are no more pages.
    """
    page_paginator = PageNumberPaginator(base_page=STARTING_PAGE, total_path=None)
    api_client = RESTClient(base_url=BASE_URL, paginator=page_paginator)

    # Hand pages on one at a time instead of collecting them all in memory
    yield from api_client.paginate(ENDPOINT)

In [38]:
# Define a new dlt pipeline (DuckDB destination) with its own name and dataset
pipeline = dlt.pipeline(
    pipeline_name="decorated_ny_taxi_pipeline",  # distinct from "ny_taxi_pipeline" used earlier
    destination="duckdb",
    dataset_name="decorated_ny_taxi_data"
)

In [None]:
# Run the pipeline with the decorated resource, replacing any existing data
load_info = pipeline.run(
  ny_taxi,
  write_disposition="replace",
  # table_name="decorated_rides"  # Optional: the resource name is used when omitted
)

# Show the load summary returned by the run
print(load_info)

In [None]:
# Streamlit info: show the "decorated_ny_taxi_pipeline" pipeline via the dlt CLI
!dlt pipeline decorated_ny_taxi_pipeline show

In [42]:

# Optionally, explore the loaded "decorated_rides" table.
# Alternative that shows a summary instead of the rows:
# pipeline.dataset(dataset_type="default").decorated_rides.df().info()
pipeline.dataset(dataset_type="default").decorated_rides.df()

Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00+00:00,2009-06-14 23:23:00+00:00,0.0,VTS,1739553950.1613045,XJrLSClDEwajoQ,
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00+00:00,2009-06-18 17:35:00+00:00,1.0,VTS,1739553950.1613045,DiF290VyRZe5xA,
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00+00:00,2009-06-10 18:08:00+00:00,1.0,VTS,1739553950.1613045,0Cgm+EQcDKtziw,
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00+00:00,2009-06-14 23:54:00+00:00,0.5,VTS,1739553950.1613045,/8hM0nihcXj6pA,
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00+00:00,2009-06-13 13:01:00+00:00,0.0,VTS,1739553950.1613045,eZ0GVT4KfsuakA,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,40.783522,-73.970690,5.7,1,CASH,40.778560,-73.953660,0.0,0.00,5.70,1.16,2009-06-19 11:28:00+00:00,2009-06-19 11:22:00+00:00,0.0,VTS,1739553950.1613045,FwZY/ECCDFiRyg,
9996,40.777200,-73.964197,4.1,1,CASH,40.779800,-73.974297,0.0,0.00,4.10,0.89,2009-06-17 07:43:00+00:00,2009-06-17 07:41:00+00:00,0.0,VTS,1739553950.1613045,2oqiFRUCIauBRA,
9997,40.780172,-73.957617,6.1,1,CASH,40.788388,-73.976758,0.0,0.00,6.10,1.30,2009-06-19 11:46:00+00:00,2009-06-19 11:39:00+00:00,0.0,VTS,1739553950.1613045,5Lr9pCI2CNcHhg,
9998,40.777342,-73.957242,5.7,1,CASH,40.773828,-73.956690,0.0,0.00,6.20,0.97,2009-06-17 04:19:00+00:00,2009-06-17 04:13:00+00:00,0.5,VTS,1739553950.1613045,0/CXu6RtYCuZ4Q,


## Incremental load

In [44]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

# API location and pagination settings
BASE_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net"
ENDPOINT = "data_engineering_zoomcamp_api"
STARTING_PAGE = 1


# API resource for the NYC taxi data with an incremental cursor.
# The resource name is used as the table name; rows are appended on each run.
@dlt.resource(name="rides", write_disposition="append")
def ny_taxi(
    cursor_date=dlt.sources.incremental(
        "Trip_Dropoff_DateTime",   # <--- field to track, our timestamp
        initial_value="2009-06-15",   # <--- start date June 15, 2009
    )
):
    """Yield every page of taxi ride data from ``ENDPOINT``.

    Args:
        cursor_date: dlt incremental cursor on ``Trip_Dropoff_DateTime`` with
            an initial value of ``"2009-06-15"``.

    Yields:
        One page of records per iteration, for all pages the API returns.
    """
    client = RESTClient(
        base_url=BASE_URL,
        paginator=PageNumberPaginator(
            base_page=STARTING_PAGE,  # Start pagination from page 1
            total_path=None,  # No total count path, paginate until no more pages
        ),
    )

    # Iterate over every page (not only the first one) so the whole dataset is
    # considered by the incremental load.
    for page in client.paginate(ENDPOINT):    # <--- API endpoint for retrieving taxi ride data
        yield page   # <--- yield data to manage memory

In [45]:
# Define a new dlt pipeline for the incremental load (DuckDB destination)
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_incremental",
    destination="duckdb",
    dataset_name="ny_taxi_data"
)

In [None]:
# Run the pipeline with the incremental resource
load_info = pipeline.run(ny_taxi)

# Print the trace of the last run
print(pipeline.last_trace)

In [None]:
# Show the "ny_taxi_incremental" pipeline via the dlt CLI
!dlt pipeline ny_taxi_incremental show

In [16]:
# Look up the earliest drop-off timestamp stored in the "rides" table
with pipeline.sql_client() as sql:
    rows = sql.execute_sql(
            """
            SELECT
            MIN(trip_dropoff_date_time)
            FROM rides;
            """
        )
    # The query selects a single aggregate, read here as first row / first column
    earliest_dropoff = rows[0][0]
    print(f"Minimum trip_dropoff_date_time: {earliest_dropoff}")

    

Minimum trip_dropoff_date_time: 2009-06-01 11:48:00+00:00


In [36]:
# Re-create the pipeline object with the same name, destination and dataset as before
pipeline = dlt.pipeline(pipeline_name="ny_taxi_incremental", destination="duckdb", dataset_name="ny_taxi_data")


In [None]:
# Run the pipeline again with the same incremental resource
load_info = pipeline.run(ny_taxi)

# Print the trace of this run
print(pipeline.last_trace)

## Load to Postgres

1. Make sure to have `dlt[postgres]` in your environment
2. **(Optional)** Check your [docker-compose.yml](docker-compose.yml) file and adjust it accordingly if necessary.
3. Run `docker compose up` for `postgres` and `pgadmin` services.
4. Check pgAdmin at `localhost:8080` 
    - `admin@admin.com`, `admin`
5. For postgres connection:
     - host: `postgres`
     - port: `5432`
     - database: `ny_taxi_data`
     - user: `postgres`
     - password: `postgres`

![alt text](assets/images/image-2.png)


In [6]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

# API location and pagination settings
BASE_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net"
ENDPOINT = "data_engineering_zoomcamp_api"
STARTING_PAGE = 1


# API resource for the NYC taxi data with an incremental cursor.
# The resource name is used as the table name; rows are appended on each run.
@dlt.resource(name="rides", write_disposition="append")
def ny_taxi(
    cursor_date=dlt.sources.incremental(
        "Trip_Dropoff_DateTime",   # <--- field to track, our timestamp
        initial_value="2009-06-15",   # <--- start date June 15, 2009
    )
):
    """Yield pages of taxi ride data from ``ENDPOINT``, one page per iteration.

    Args:
        cursor_date: dlt incremental cursor on ``Trip_Dropoff_DateTime`` with
            an initial value of ``"2009-06-15"``.
    """
    page_paginator = PageNumberPaginator(base_page=STARTING_PAGE, total_path=None)
    api_client = RESTClient(base_url=BASE_URL, paginator=page_paginator)

    # Hand pages on one at a time instead of collecting them all in memory
    yield from api_client.paginate(ENDPOINT)

In [7]:
# Pipeline that loads into the Postgres destination
pipeline = dlt.pipeline(
    pipeline_name="postgres_ny_taxi",
    # Credentials are not set here: provide them through secrets.toml or
    # environment variables.
    destination="postgres",
    # For testing purposes only, the connection string can be passed explicitly, e.g.:
    # destination= dlt.destinations.postgres("postgresql://postgres:postgres@localhost:5432/ny_taxi_data"),
    dataset_name="postgres_ny_taxi_data"
)

In [None]:
# Run the Postgres pipeline with the incremental resource
load_info = pipeline.run(
    ny_taxi()
)

# Print the trace of the last run
print(pipeline.last_trace)

#### Postgres Query Results
![alt text](assets/images/image-3.png)

## Load into BigQuery

Things are pretty similar for BigQuery, so let's speed things up a bit.
- Make sure to have `dlt[bigquery]` in your environment
- Properly configure your `ENVs` or `secrets.toml` with your GCP credentials

In [9]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

# API resource for the NYC taxi data; rows are appended to the "rides" table.
@dlt.resource(name="rides", write_disposition="append")
def ny_taxi():
    """Yield pages of taxi ride data from the API, one page per iteration.

    Uses a page-number paginator starting at page 1 with no total-count path.
    """
    page_paginator = PageNumberPaginator(base_page=1, total_path=None)
    api_client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=page_paginator,
    )

    # Hand pages on one at a time instead of collecting them all in memory
    yield from api_client.paginate("data_engineering_zoomcamp_api")

In [None]:
# Two alternative definitions of the same pipeline that differ only in the destination.
# NOTE: both assign to `pipeline`, so when the whole cell runs, the second
# (BigQuery) definition is the one that remains bound to the name.
pipeline = dlt.pipeline(
    pipeline_name='taxi_data',
    destination='duckdb', # <--- to test pipeline locally
    dataset_name='taxi_rides',
)

pipeline = dlt.pipeline(
    pipeline_name='taxi_data',
    destination='bigquery', # <--- to run pipeline in production
    dataset_name='taxi_rides',
)

In [12]:
# BigQuery pipeline used for the actual load
pipeline = dlt.pipeline(
    pipeline_name="taxi_data",
    destination="bigquery",
    dataset_name="taxi_rides",
    progress="enlighten",  # <--- Install enlighten for better visualization; otherwise comment this line out
    dev_mode=True,  # NOTE(review): presumably loads into a fresh dataset on each run — confirm in the dlt docs
)

# Run the pipeline with the ny_taxi resource
info = pipeline.run(ny_taxi)
# print(info)  # uncomment to show the load summary

#### Check results


|![alt text](assets/images/image-4.png)|![alt text](assets/images/image-5.png)|
|---|---|