# **Install `dlt`⏳**

What is dlt?

* dlt is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets.
* You can install it using pip and there's no need to start any backends or containers. You can simply import dlt in your Python script and write a simple pipeline to load data from sources like APIs, databases, files, etc. into a destination of your choice.

Here are a few reasons why you should use dlt:

* Automated maintenance: Schema inference, schema evolution, and alerts, combined with short declarative code, keep maintenance simple.
* Run it where Python runs: You can use dlt on Airflow, serverless functions, and notebooks. It doesn't require external APIs, backends or containers, and scales on both micro and large infrastructures.
* User-friendly, declarative interface: dlt provides a user-friendly interface that removes knowledge obstacles for beginners while empowering senior professionals.

As a data engineer, you get several benefits from dlt:

* Efficient Data Extraction and Loading: dlt simplifies the process of extracting and loading data. It allows you to decorate your data-producing functions with loading or incremental extraction metadata, enabling dlt to extract and load data according to your custom logic. This is particularly useful when dealing with large datasets, as dlt supports scalability through iterators, chunking, and parallelization.
* Automated Schema Management: dlt automatically infers a schema from data and loads the data to the destination. It can easily adapt and structure data as it evolves, reducing the time spent on maintenance and development. This ensures data consistency and quality.
* Data Governance Support: dlt pipelines offer robust governance support through pipeline metadata utilization, schema enforcement and curation, and schema change alerts. This promotes data consistency, traceability, and control throughout the data processing lifecycle.
* Flexibility and Scalability: dlt can be used on Airflow, serverless functions, and notebooks, and scales on both micro and large infrastructures. It also offers several mechanisms and configuration options to scale up and fine-tune pipelines.
* Post-Loading Transformations: dlt provides several options for transformations after loading the data, including using dbt, the dlt SQL client, or Pandas. This lets you shape and manipulate the data before or after loading it, so you can meet specific requirements and ensure data quality and consistency.



In [4]:
%%capture
!pip install dlt[duckdb] # Install dlt with all the necessary DuckDB dependencies

# Part 1: Data Extraction


## Example 1: Extracting API data with a generator

Premise:

For this example, we created a simple HTTP API that returns JSON page by page, 1000 records per page.

It accepts a parameter called `page`, representing the page number.
If we request a larger page number than there is data, we get an empty response.

To get the pages, we write a loop that requests pages starting from 1 and increases the page number until we receive an empty page.

We do not know ahead of time how many pages contain data or whether they all fit in memory. Yielding the data so it can be handled page by page therefore scales better than first collecting all pages in memory and then returning them.
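To make the difference concrete, here is a minimal sketch that contrasts the two approaches. It does not call the real API: `FAKE_PAGES` and `fetch_page` are hypothetical stand-ins invented for this illustration.

```python
from typing import Dict, Iterator, List

# Hypothetical stand-in for the paginated API: three pages, then empty pages.
FAKE_PAGES: Dict[int, List[dict]] = {
    1: [{"id": 1}, {"id": 2}],
    2: [{"id": 3}, {"id": 4}],
    3: [{"id": 5}],
}


def fetch_page(page: int) -> List[dict]:
    """Pretend HTTP call: returns the records of a page, or [] past the end."""
    return FAKE_PAGES.get(page, [])


def collect_all_pages() -> List[dict]:
    """Eager version: every record is held in memory before returning."""
    records: List[dict] = []
    page = 1
    while True:
        batch = fetch_page(page)
        if not batch:
            return records
        records.extend(batch)
        page += 1


def iter_pages() -> Iterator[List[dict]]:
    """Lazy version: only the current page is held by the generator."""
    page = 1
    while True:
        batch = fetch_page(page)
        if not batch:
            return
        yield batch
        page += 1


total = 0
for batch in iter_pages():
    total += len(batch)  # handle one page, then move on to the next
print(total, len(collect_all_pages()))
```

Both versions see the same records; the generator simply never needs more than one page at a time, which is what matters once the data no longer fits in memory.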

In [1]:
import requests



BASE_API_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"

# I call this a paginated getter
# as it's a function that gets data
# and also paginates until there is no more data
# by yielding pages, we "microbatch", which speeds up downstream processing

def paginated_getter():
    page_number = 1

    while True:
        # Set the query parameters
        params = {'page': page_number}

        # Make the GET request to the API
        response = requests.get(BASE_API_URL, params=params)
        response.raise_for_status()  # Raise an HTTPError for bad responses
        page_json = response.json()
        print(f'got page number {page_number} with {len(page_json)} records')

        # if the page has records, yield it; otherwise stop iterating
        if page_json:
            yield page_json
            page_number += 1
        else:
            # No more data, break the loop
            break


if __name__ == '__main__':
    # Use the generator to iterate over pages
    for page_data in paginated_getter():
        # Process each page as needed
        print(page_data)


got page number 1 with 1000 records
[{'End_Lat': 40.742963, 'End_Lon': -73.980072, 'Fare_Amt': 45.0, 'Passenger_Count': 1, 'Payment_Type': 'Credit', 'Rate_Code': None, 'Start_Lat': 40.641525, 'Start_Lon': -73.787442, 'Tip_Amt': 9.0, 'Tolls_Amt': 4.15, 'Total_Amt': 58.15, 'Trip_Distance': 17.52, 'Trip_Dropoff_DateTime': '2009-06-14 23:48:00', 'Trip_Pickup_DateTime': '2009-06-14 23:23:00', 'mta_tax': None, 'store_and_forward': None, 'surcharge': 0.0, 'vendor_name': 'VTS'}, {'End_Lat': 40.740187, 'End_Lon': -74.005698, 'Fare_Amt': 6.5, 'Passenger_Count': 1, 'Payment_Type': 'Credit', 'Rate_Code': None, 'Start_Lat': 40.722065, 'Start_Lon': -74.009767, 'Tip_Amt': 1.0, 'Tolls_Amt': 0.0, 'Total_Amt': 8.5, 'Trip_Distance': 1.56, 'Trip_Dropoff_DateTime': '2009-06-18 17:43:00', 'Trip_Pickup_DateTime': '2009-06-18 17:35:00', 'mta_tax': None, 'store_and_forward': None, 'surcharge': 1.0, 'vendor_name': 'VTS'}, {'End_Lat': 40.718043, 'End_Lon': -74.004745, 'Fare_Amt': 12.5, 'Passenger_Count': 5, 'Pay

## Example 2: The "bad" way to download a file

In this example we download a JSON Lines file.

Since the download is text but we want to work with iterable data structures for loading, we convert the contents to a list of JSON records.

This is a less than ideal approach because if the file size is unknown, we run the risk of running out of memory. On machines that run multiple jobs, an out-of-memory error can kill not just the current job but also anything else running on the machine at the time - a situation most data engineers **really** like to avoid.

In [None]:
import requests
import json


def download_and_read_jsonl(url):
    response = requests.get(url)
    response.raise_for_status()  # Raise an HTTPError for bad responses
    data = response.text.splitlines()
    parsed_data = [json.loads(line) for line in data]
    return parsed_data


# time the download
import time
start = time.time()

url = "https://storage.googleapis.com/dtc_zoomcamp_api/yellow_tripdata_2009-06.jsonl"
downloaded_data = download_and_read_jsonl(url)

if downloaded_data:
    # Process or print the downloaded data as needed
    print(downloaded_data[:5])  # Print the first 5 entries as an example

# print the elapsed time in seconds
end = time.time()
print(end - start)

[{'vendor_name': 'VTS', 'Trip_Pickup_DateTime': '2009-06-14 23:23:00', 'Trip_Dropoff_DateTime': '2009-06-14 23:48:00', 'Passenger_Count': 1, 'Trip_Distance': 17.52, 'Start_Lon': -73.787442, 'Start_Lat': 40.641525, 'Rate_Code': None, 'store_and_forward': None, 'End_Lon': -73.980072, 'End_Lat': 40.742963, 'Payment_Type': 'Credit', 'Fare_Amt': 45.0, 'surcharge': 0.0, 'mta_tax': None, 'Tip_Amt': 9.0, 'Tolls_Amt': 4.15, 'Total_Amt': 58.15}, {'vendor_name': 'VTS', 'Trip_Pickup_DateTime': '2009-06-18 17:35:00', 'Trip_Dropoff_DateTime': '2009-06-18 17:43:00', 'Passenger_Count': 1, 'Trip_Distance': 1.56, 'Start_Lon': -74.009767, 'Start_Lat': 40.722065, 'Rate_Code': None, 'store_and_forward': None, 'End_Lon': -74.005698, 'End_Lat': 40.740187, 'Payment_Type': 'Credit', 'Fare_Amt': 6.5, 'surcharge': 1.0, 'mta_tax': None, 'Tip_Amt': 1.0, 'Tolls_Amt': 0.0, 'Total_Amt': 8.5}, {'vendor_name': 'VTS', 'Trip_Pickup_DateTime': '2009-06-10 18:08:00', 'Trip_Dropoff_DateTime': '2009-06-10 18:27:00', 'Passeng

## Example 3: Extracting file data with a generator "the best practice way"

"The best practice way" here refers to the most scalable way to do it, but if you are confident scale will not be an issue, then the right way might be the simplest :)

In this example we download a JSONL file (JSON Lines: one JSON document per line).
Because each line is a complete record, we can process the file line by line.

We stream the download and yield each row as it arrives.

If this file were a single JSON document and not JSONL, we could use the `ijson` library to parse it incrementally without loading the whole file into memory.
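The line-by-line idea can be tried without any network access. The sketch below is an illustration only: `iter_jsonl` is a hypothetical helper, and an in-memory `io.StringIO` stands in for the streamed download.

```python
import io
import json
from typing import IO, Iterator


def iter_jsonl(lines: IO[str]) -> Iterator[dict]:
    """Yield one parsed record per non-empty line, never building a full list."""
    for line in lines:
        line = line.strip()
        if line:  # skip blank lines
            yield json.loads(line)


# Stand-in for a streamed file: two records separated by a blank line.
fake_file = io.StringIO(
    '{"vendor_name": "VTS", "Passenger_Count": 1}\n'
    "\n"
    '{"vendor_name": "VTS", "Passenger_Count": 5}\n'
)

rows = iter_jsonl(fake_file)
first = next(rows)  # only the first line has been read and parsed so far
print(first["Passenger_Count"])
```

Nothing beyond the first record is parsed until the consumer asks for the next row, which is the same property the streaming download relies on.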



In [2]:
import requests
import json

url = "https://storage.googleapis.com/dtc_zoomcamp_api/yellow_tripdata_2009-06.jsonl"

def stream_download_jsonl(url):
    response = requests.get(url, stream=True)
    response.raise_for_status()  # Raise an HTTPError for bad responses
    for line in response.iter_lines():
        if line:
            yield json.loads(line)

# time the download
import time
start = time.time()

# Use the generator to iterate over rows with minimal memory usage
row_counter = 0
for row in stream_download_jsonl(url):
    print(row)
    row_counter += 1
    if row_counter >= 5:
        break

# print the elapsed time in seconds
end = time.time()
print(end - start)

{'vendor_name': 'VTS', 'Trip_Pickup_DateTime': '2009-06-14 23:23:00', 'Trip_Dropoff_DateTime': '2009-06-14 23:48:00', 'Passenger_Count': 1, 'Trip_Distance': 17.52, 'Start_Lon': -73.787442, 'Start_Lat': 40.641525, 'Rate_Code': None, 'store_and_forward': None, 'End_Lon': -73.980072, 'End_Lat': 40.742963, 'Payment_Type': 'Credit', 'Fare_Amt': 45.0, 'surcharge': 0.0, 'mta_tax': None, 'Tip_Amt': 9.0, 'Tolls_Amt': 4.15, 'Total_Amt': 58.15}
{'vendor_name': 'VTS', 'Trip_Pickup_DateTime': '2009-06-18 17:35:00', 'Trip_Dropoff_DateTime': '2009-06-18 17:43:00', 'Passenger_Count': 1, 'Trip_Distance': 1.56, 'Start_Lon': -74.009767, 'Start_Lat': 40.722065, 'Rate_Code': None, 'store_and_forward': None, 'End_Lon': -74.005698, 'End_Lat': 40.740187, 'Payment_Type': 'Credit', 'Fare_Amt': 6.5, 'surcharge': 1.0, 'mta_tax': None, 'Tip_Amt': 1.0, 'Tolls_Amt': 0.0, 'Total_Amt': 8.5}
{'vendor_name': 'VTS', 'Trip_Pickup_DateTime': '2009-06-10 18:08:00', 'Trip_Dropoff_DateTime': '2009-06-10 18:27:00', 'Passenger_

### Loading the generator (any of the above)

We have 3 ways to download the same data. Let's use the fast and reliable way to load some data and inspect it in DuckDB.

In this example, we use the `dlt` library to do the loading. It processes data from the generators incrementally, following the same memory management paradigm.

We will discuss more details about `dlt` or "data load tool" later.
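As a rough picture of what "incrementally" means for a consumer of a generator, the sketch below pulls rows in fixed-size batches. It is not how `dlt` is implemented internally; `in_batches` is a hypothetical helper written for this illustration.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def in_batches(rows: Iterable, batch_size: int) -> Iterator[List]:
    """Pull rows from any iterable in fixed-size batches."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# range(10) stands in for a generator of rows.
batch_sizes = [len(batch) for batch in in_batches(range(10), 4)]
print(batch_sizes)
```

At any moment only one batch is materialised, so memory use depends on the batch size and not on the total number of rows.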

In [5]:
import dlt

# define the connection to load to.
# We now use DuckDB, but you can switch to BigQuery later
generators_pipeline = dlt.pipeline(destination='duckdb', dataset_name='generators')


# we can load any generator to a table at the pipeline destination as follows:
info = generators_pipeline.run(paginated_getter(),
                               table_name="http_download",
                               write_disposition="replace")

# the outcome metadata is returned by the load and we can inspect it by printing it.
print(info)

# we can load the next generator to the same or to a different table.
info = generators_pipeline.run(stream_download_jsonl(url),
                               table_name="stream_download",
                               write_disposition="replace")

print(info)


got page number 1 with 1000 records
got page number 2 with 1000 records
got page number 3 with 1000 records
got page number 4 with 1000 records
got page number 5 with 1000 records
got page number 6 with 1000 records
got page number 7 with 1000 records
got page number 8 with 1000 records
got page number 9 with 1000 records
got page number 10 with 1000 records
got page number 11 with 0 records
Pipeline dlt_colab_kernel_launcher load step completed in 2.36 seconds
1 load package(s) were loaded to destination duckdb and into dataset generators
The duckdb destination used duckdb:////content/dlt_colab_kernel_launcher.duckdb location to store data
Load package 1707276994.556395 is LOADED and contains no failed jobs
Pipeline dlt_colab_kernel_launcher load step completed in 2.88 seconds
1 load package(s) were loaded to destination duckdb and into dataset generators
The duckdb destination used duckdb:////content/dlt_colab_kernel_launcher.duckdb location to store data
Load package 1707277023.9241

In [6]:
# show outcome

import duckdb

conn = duckdb.connect(f"{generators_pipeline.pipeline_name}.duckdb")

# let's see the tables
conn.sql(f"SET search_path = '{generators_pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))

# and the data

print("\n\n\n http_download table below:")

rides = conn.sql("SELECT * FROM http_download").df()
display(rides)

print("\n\n\n stream_download table below:")

passengers = conn.sql("SELECT * FROM stream_download").df()
display(passengers)

# As you can see, the same data was loaded in both cases.

Loaded tables: 


┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ http_download       │
│ stream_download     │
└─────────────────────┘




 http_download table below:


Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00+00:00,2009-06-14 23:23:00+00:00,0.0,VTS,1707276994.556395,24SqFiDUKmiMyA,
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00+00:00,2009-06-18 17:35:00+00:00,1.0,VTS,1707276994.556395,dmZUtxDwR4IvPQ,
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00+00:00,2009-06-10 18:08:00+00:00,1.0,VTS,1707276994.556395,XA6XE0EIivTC+g,
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00+00:00,2009-06-14 23:54:00+00:00,0.5,VTS,1707276994.556395,8TgSImltplnfww,
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00+00:00,2009-06-13 13:01:00+00:00,0.0,VTS,1707276994.556395,ozN4Joxai+Bnhw,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,40.783522,-73.970690,5.7,1,CASH,40.778560,-73.953660,0.0,0.00,5.70,1.16,2009-06-19 11:28:00+00:00,2009-06-19 11:22:00+00:00,0.0,VTS,1707276994.556395,D5+PYi5zyrV5wQ,
9996,40.777200,-73.964197,4.1,1,CASH,40.779800,-73.974297,0.0,0.00,4.10,0.89,2009-06-17 07:43:00+00:00,2009-06-17 07:41:00+00:00,0.0,VTS,1707276994.556395,BaENRjYsczAvoQ,
9997,40.780172,-73.957617,6.1,1,CASH,40.788388,-73.976758,0.0,0.00,6.10,1.30,2009-06-19 11:46:00+00:00,2009-06-19 11:39:00+00:00,0.0,VTS,1707276994.556395,aQAPX8cGtd4cFA,
9998,40.777342,-73.957242,5.7,1,CASH,40.773828,-73.956690,0.0,0.00,6.20,0.97,2009-06-17 04:19:00+00:00,2009-06-17 04:13:00+00:00,0.5,VTS,1707276994.556395,BO8DZn2xvV1FPw,





 stream_download table below:


Unnamed: 0,vendor_name,trip_pickup_date_time,trip_dropoff_date_time,passenger_count,trip_distance,start_lon,start_lat,end_lon,end_lat,payment_type,fare_amt,surcharge,tip_amt,tolls_amt,total_amt,_dlt_load_id,_dlt_id,store_and_forward
0,VTS,2009-06-14 23:23:00+00:00,2009-06-14 23:48:00+00:00,1,17.52,-73.787442,40.641525,-73.980072,40.742963,Credit,45.0,0.0,9.0,4.15,58.15,1707277023.9241588,i3Ll0pkdT1CO/Q,
1,VTS,2009-06-18 17:35:00+00:00,2009-06-18 17:43:00+00:00,1,1.56,-74.009767,40.722065,-74.005698,40.740187,Credit,6.5,1.0,1.0,0.00,8.50,1707277023.9241588,5gUINtvcCvDlpg,
2,VTS,2009-06-10 18:08:00+00:00,2009-06-10 18:27:00+00:00,5,3.37,-73.983038,40.761945,-74.004745,40.718043,Credit,12.5,1.0,2.0,0.00,15.50,1707277023.9241588,jZ4ny5sFpKd3ig,
3,VTS,2009-06-14 23:54:00+00:00,2009-06-14 23:58:00+00:00,1,1.11,-73.992247,40.749802,-73.985233,40.739637,CASH,4.9,0.5,0.0,0.00,5.40,1707277023.9241588,ZhrG+D0pjwBhQQ,
4,VTS,2009-06-13 13:01:00+00:00,2009-06-13 13:23:00+00:00,1,11.09,-73.949233,40.776825,-73.852693,40.730032,CASH,25.7,0.0,0.0,4.15,29.85,1707277023.9241588,gawRXqcHLJ3ayQ,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,VTS,2009-06-19 11:22:00+00:00,2009-06-19 11:28:00+00:00,1,1.16,-73.953660,40.778560,-73.970690,40.783522,CASH,5.7,0.0,0.0,0.00,5.70,1707277023.9241588,kyRpTkL4IFNp9Q,
9996,VTS,2009-06-17 07:41:00+00:00,2009-06-17 07:43:00+00:00,1,0.89,-73.974297,40.779800,-73.964197,40.777200,CASH,4.1,0.0,0.0,0.00,4.10,1707277023.9241588,qFXKcELDmIenFg,
9997,VTS,2009-06-19 11:39:00+00:00,2009-06-19 11:46:00+00:00,1,1.30,-73.976758,40.788388,-73.957617,40.780172,CASH,6.1,0.0,0.0,0.00,6.10,1707277023.9241588,bq58XwYMxTyTVQ,
9998,VTS,2009-06-17 04:13:00+00:00,2009-06-17 04:19:00+00:00,1,0.97,-73.956690,40.773828,-73.957242,40.777342,CASH,5.7,0.5,0.0,0.00,6.20,1707277023.9241588,O8J+bgObzK8QCQ,


# Part 2: Normalisation

## Load nested data with auto normalisation

When converting nested data to tabular formats, we want to keep fragmentation minimal:
* Nested dictionaries can be flattened into the parent row
* Nested lists, however, need to be expressed as separate tables due to the different granularity (1:n relationship)

And of course, when going from JSON to DB, we want some things standardised:
* Data types such as timestamps should be detected correctly
* Column names should be converted to db-compatible names
* Unnested sub-tables should be linked to parent tables via auto generated keys
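The first two standardisation points can be sketched in a few lines. This is a rough approximation for illustration, not dlt's actual naming or type-detection code: `to_snake_case` and `detect_value` are hypothetical helpers, and the timestamp format is assumed from the sample data.

```python
import re
from datetime import datetime
from typing import Any


def to_snake_case(name: str) -> str:
    """Approximate a db-friendly column name (assumed rules, not dlt's own)."""
    # Add an underscore at each lowercase/digit -> uppercase boundary.
    with_breaks = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Replace unsupported characters, collapse repeats, then lowercase.
    cleaned = re.sub(r"[^A-Za-z0-9_]+", "_", with_breaks)
    return re.sub(r"_+", "_", cleaned).lower()


def detect_value(value: Any) -> Any:
    """Turn 'YYYY-MM-DD HH:MM:SS' strings into datetimes; leave the rest as-is."""
    if isinstance(value, str):
        try:
            return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return value
    return value


row = {
    "Trip_Pickup_DateTime": "2009-06-14 23:23:00",
    "vendor_name": "VTS",
    "Fare_Amt": 45.0,
}
clean = {to_snake_case(key): detect_value(value) for key, value in row.items()}
print(clean)
```

A real loader also has to handle time zones, other timestamp formats, and name collisions, which is the kind of work we would rather delegate to a library.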


For this work, we will use the `dlt` library, which is purpose-built to solve such tasks in a scalable way, for example by using generators.



### Introducing dlt

dlt is a Python library created to help data engineers build simpler, faster and more robust pipelines with minimal effort.

dlt automates much of the tedious work a data engineer would do, and does it in a way that is robust.

dlt can handle things like:

- Schema: Inferring and evolving schema, alerting on changes, using schemas as data contracts.
- Typing data, flattening structures, renaming columns to fit database standards.
- Processing a stream of events/rows without filling memory. This includes extraction from generators. In our example we will pass the `data` list defined in the next cell.
- Loading to a variety of databases or file formats.

Read more about dlt [here](https://dlthub.com/docs/intro).

Now let’s use it to load our nested JSON to DuckDB:

In [7]:
import dlt
import duckdb

data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        # nested dictionaries could be flattened
        "coordinates": { # coordinates__start__lon
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
            "status": "booked"
        },
        "Passenger_Count": 2,
        # nested lists need to be expressed as separate tables
        "passengers": [
            {"name": "John", "rating": 4.9},
            {"name": "Jack", "rating": 3.9}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
    # ... more data
]


# define the connection to load to.
# We now use DuckDB, but you can switch to BigQuery later
pipeline = dlt.pipeline(destination='duckdb', dataset_name='taxi_rides')



# run with merge write disposition.
# This is so scaffolding is created for the next example,
# where we look at merging data

info = pipeline.run(data,
                    table_name="rides",
                    write_disposition="merge",
                    primary_key="record_hash")

print(info)

Pipeline dlt_colab_kernel_launcher load step completed in 1.00 seconds
1 load package(s) were loaded to destination duckdb and into dataset taxi_rides
The duckdb destination used duckdb:////content/dlt_colab_kernel_launcher.duckdb location to store data
Load package 1707277048.769128 is LOADED and contains no failed jobs


### Inspecting the nested structure, joining the child tables

Let's look at what happened during the load:
- Looking at the loaded tables, we can see that our JSON document was flattened and its nested lists were split into separate tables.
- We can re-join those child tables to the parent table by using the generated keys: `on parent_table._dlt_id = child_table._dlt_parent_id`.
- Data types: the timestamps, which are strings in the JSON, are now of timestamp type in the database.
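To see why joining two child tables to the same parent multiplies rows, here is a self-contained sketch using Python's built-in `sqlite3` module instead of DuckDB. The key values (`ride-1`) are made up for illustration; real `_dlt_id` values are generated by dlt.

```python
import sqlite3

# In-memory database with hand-written keys (hypothetical values).
demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript(
    """
    CREATE TABLE rides (_dlt_id TEXT, vendor_name TEXT);
    CREATE TABLE rides__passengers (_dlt_parent_id TEXT, name TEXT);
    CREATE TABLE rides__stops (_dlt_parent_id TEXT, lon REAL);
    INSERT INTO rides VALUES ('ride-1', 'VTS');
    INSERT INTO rides__passengers VALUES ('ride-1', 'John'), ('ride-1', 'Jack');
    INSERT INTO rides__stops VALUES ('ride-1', -73.6), ('ride-1', -73.5);
    """
)

# One 1:n join: one row per passenger.
passengers_only = demo_conn.execute(
    """
    SELECT r.vendor_name, rp.name
    FROM rides AS r
    LEFT JOIN rides__passengers AS rp ON r._dlt_id = rp._dlt_parent_id
    """
).fetchall()

# Two independent 1:n joins: every passenger is paired with every stop.
joined = demo_conn.execute(
    """
    SELECT r.vendor_name, rp.name, rs.lon
    FROM rides AS r
    LEFT JOIN rides__passengers AS rp ON r._dlt_id = rp._dlt_parent_id
    LEFT JOIN rides__stops AS rs ON r._dlt_id = rs._dlt_parent_id
    """
).fetchall()

print(len(passengers_only), len(joined))
```

Joining each child table to the parent separately avoids this multiplication when passengers and stops are analysed independently.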


In [8]:
# show the outcome

conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# let's see the tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))


print("\n\n\n Rides table below: Note the times are properly typed")
rides = conn.sql("SELECT * FROM rides").df()
display(rides)

print("\n\n\n Passengers table")
passengers = conn.sql("SELECT * FROM rides__passengers").df()
display(passengers)
print("\n\n\n Stops table")
stops = conn.sql("SELECT * FROM rides__stops").df()
display(stops)


# to reflect the relationships between parent and child rows, let's join them
# this returns 4 rows: the two 1:n joins pair each of the 2 passengers with each of the 2 stops

print("\n\n\n joined table")

joined = conn.sql("""
SELECT *
FROM rides as r
left join rides__passengers as rp
  on r._dlt_id = rp._dlt_parent_id
left join rides__stops as rs
  on r._dlt_id = rs._dlt_parent_id
""").df()
display(joined)

Loaded tables: 


┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ rides               │
│ rides__passengers   │
│ rides__stops        │
└─────────────────────┘




 Rides table below: Note the times are properly typed


Unnamed: 0,record_hash,vendor_name,time__pickup,time__dropoff,trip_distance,coordinates__start__lon,coordinates__start__lat,coordinates__end__lon,coordinates__end__lat,payment__type,payment__amt,payment__surcharge,payment__tip,payment__tolls,payment__status,passenger_count,_dlt_load_id,_dlt_id
0,b00361a396177a9cb410ff61f20015ad,VTS,2009-06-14 23:23:00+00:00,2009-06-14 23:48:00+00:00,17.52,-73.787442,40.641525,-73.980072,40.742963,Credit,20.5,0,9,4.15,booked,2,1707277048.769128,yhPNchvYO6Tyug





 Passengers table


Unnamed: 0,name,rating,_dlt_root_id,_dlt_parent_id,_dlt_list_idx,_dlt_id
0,John,4.9,yhPNchvYO6Tyug,yhPNchvYO6Tyug,0,bmeAeLFG4y5PhQ
1,Jack,3.9,yhPNchvYO6Tyug,yhPNchvYO6Tyug,1,5icAD4nc518Oaw





 Stops table


Unnamed: 0,lon,lat,_dlt_root_id,_dlt_parent_id,_dlt_list_idx,_dlt_id
0,-73.6,40.6,yhPNchvYO6Tyug,yhPNchvYO6Tyug,0,6Wi5Eub/DUbHbA
1,-73.5,40.5,yhPNchvYO6Tyug,yhPNchvYO6Tyug,1,zbxtXB2vjAajUQ





 joined table


Unnamed: 0,record_hash,vendor_name,time__pickup,time__dropoff,trip_distance,coordinates__start__lon,coordinates__start__lat,coordinates__end__lon,coordinates__end__lat,payment__type,...,_dlt_root_id,_dlt_parent_id,_dlt_list_idx,_dlt_id_2,lon,lat,_dlt_root_id_2,_dlt_parent_id_2,_dlt_list_idx_2,_dlt_id_3
0,b00361a396177a9cb410ff61f20015ad,VTS,2009-06-14 23:23:00+00:00,2009-06-14 23:48:00+00:00,17.52,-73.787442,40.641525,-73.980072,40.742963,Credit,...,yhPNchvYO6Tyug,yhPNchvYO6Tyug,1,5icAD4nc518Oaw,-73.5,40.5,yhPNchvYO6Tyug,yhPNchvYO6Tyug,1,zbxtXB2vjAajUQ
1,b00361a396177a9cb410ff61f20015ad,VTS,2009-06-14 23:23:00+00:00,2009-06-14 23:48:00+00:00,17.52,-73.787442,40.641525,-73.980072,40.742963,Credit,...,yhPNchvYO6Tyug,yhPNchvYO6Tyug,0,bmeAeLFG4y5PhQ,-73.5,40.5,yhPNchvYO6Tyug,yhPNchvYO6Tyug,1,zbxtXB2vjAajUQ
2,b00361a396177a9cb410ff61f20015ad,VTS,2009-06-14 23:23:00+00:00,2009-06-14 23:48:00+00:00,17.52,-73.787442,40.641525,-73.980072,40.742963,Credit,...,yhPNchvYO6Tyug,yhPNchvYO6Tyug,1,5icAD4nc518Oaw,-73.6,40.6,yhPNchvYO6Tyug,yhPNchvYO6Tyug,0,6Wi5Eub/DUbHbA
3,b00361a396177a9cb410ff61f20015ad,VTS,2009-06-14 23:23:00+00:00,2009-06-14 23:48:00+00:00,17.52,-73.787442,40.641525,-73.980072,40.742963,Credit,...,yhPNchvYO6Tyug,yhPNchvYO6Tyug,0,bmeAeLFG4y5PhQ,-73.6,40.6,yhPNchvYO6Tyug,yhPNchvYO6Tyug,0,6Wi5Eub/DUbHbA


What are we looking at?
- Nested dicts got flattened into the parent row: the structure `{"coordinates":{"start": {"lat": ...}}}` became `coordinates__start__lat`.

- Nested lists got broken out into separate tables with generated columns that would allow us to join the data back when needed.
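The two rules can be imitated with a short recursive function. This is a simplified sketch, not dlt's normaliser: `normalize` and `_fill` are hypothetical helpers, the `_id` and `_parent_id` keys are simple counters standing in for dlt's generated `_dlt_id` and `_dlt_parent_id`, and list items are assumed to be dictionaries.

```python
from typing import Dict, List, Optional


def _fill(obj: dict, row: dict, prefix: str, table: str,
          row_id: str, tables: Dict[str, List[dict]]) -> None:
    for key, value in obj.items():
        column = f"{prefix}{key}".lower()
        if isinstance(value, dict):
            # Nested dict: flatten into the same row with a double underscore.
            _fill(value, row, f"{column}__", table, row_id, tables)
        elif isinstance(value, list):
            # Nested list: each item becomes a row in a child table.
            for item in value:
                normalize(item, f"{table}__{column}", tables, parent_id=row_id)
        else:
            row[column] = value


def normalize(record: dict, table: str, tables: Dict[str, List[dict]],
              parent_id: Optional[str] = None) -> None:
    """Flatten one record into `tables`, linking child rows to their parent."""
    rows = tables.setdefault(table, [])
    row_id = f"{table}-{len(rows)}"  # hypothetical key, not a real _dlt_id
    row = {"_id": row_id}
    if parent_id is not None:
        row["_parent_id"] = parent_id
    _fill(record, row, "", table, row_id, tables)
    rows.append(row)


ride = {
    "vendor_name": "VTS",
    "coordinates": {"start": {"lat": 40.641525}},
    "passengers": [{"name": "John"}, {"name": "Jack"}],
}
tables: Dict[str, List[dict]] = {}
normalize(ride, "rides", tables)
print(tables)
```

The output has one `rides` row with a `coordinates__start__lat` column and two `rides__passengers` rows that point back to it, mirroring the shape of the tables shown above.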

# Part 3: Incremental loading
## Update nested data

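In this example the ratings of the 2 passengers changed. It turns out their payment for the previous ride didn't go through and they got a bad rating from the driver, so now we have to update their ratings.

Because we load with `write_disposition="merge"` and `primary_key="record_hash"`, the existing ride is updated instead of duplicated. As you can see after running the code, their ratings are now lower.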
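Conceptually, a merge on a primary key behaves like an upsert: incoming rows replace stored rows with the same key, and new keys are added. The sketch below shows that idea with plain dictionaries. It is not how dlt performs the merge at the destination; `merge_by_key` and the `hash-a` / `hash-b` values are hypothetical.

```python
from typing import Dict, List


def merge_by_key(existing: Dict[str, dict], incoming: List[dict],
                 primary_key: str) -> Dict[str, dict]:
    """Return a new table where incoming rows replace rows with the same key."""
    merged = dict(existing)
    for record in incoming:
        merged[record[primary_key]] = record
    return merged


first_load = [{"record_hash": "hash-a", "status": "booked"}]
second_load = [
    {"record_hash": "hash-a", "status": "cancelled"},  # updates the stored row
    {"record_hash": "hash-b", "status": "booked"},     # new row, gets added
]

table = merge_by_key({}, first_load, "record_hash")
table = merge_by_key(table, second_load, "record_hash")
print(table)
```

After the second load there are two rows, not three: the row with key `hash-a` was replaced, which matches what happens to the ride in the example that follows.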

In [9]:
import dlt
import duckdb

data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        "coordinates": {
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
            "status": "cancelled"
        },
        "Passenger_Count": 2,
        "passengers": [
            {"name": "John", "rating": 4.4},
            {"name": "Jack", "rating": 3.6}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
]

# define the connection to load to.
# We now use DuckDB, but you can switch to BigQuery later
pipeline = dlt.pipeline(destination='duckdb', dataset_name='taxi_rides')

# run the pipeline with the merge write disposition so that the row with
# the same record_hash is updated, and capture the outcome
info = pipeline.run(data,
                    table_name="rides",
                    write_disposition="merge",
                    primary_key='record_hash')

# show the outcome

conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# let's see the tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))


print("\n\n\n Rides table below: Note the times are properly typed")
rides = conn.sql("SELECT * FROM rides").df()
display(rides)

print("\n\n\n Passengers table")
passengers = conn.sql("SELECT * FROM rides__passengers").df()
display(passengers)
print("\n\n\n Stops table")
stops = conn.sql("SELECT * FROM rides__stops").df()
display(stops)


Loaded tables: 


┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ rides               │
│ rides__passengers   │
│ rides__stops        │
└─────────────────────┘




 Rides table below: Note the times are properly typed


Unnamed: 0,record_hash,vendor_name,time__pickup,time__dropoff,trip_distance,coordinates__start__lon,coordinates__start__lat,coordinates__end__lon,coordinates__end__lat,payment__type,payment__amt,payment__surcharge,payment__tip,payment__tolls,payment__status,passenger_count,_dlt_load_id,_dlt_id
0,b00361a396177a9cb410ff61f20015ad,VTS,2009-06-14 23:23:00+00:00,2009-06-14 23:48:00+00:00,17.52,-73.787442,40.641525,-73.980072,40.742963,Credit,20.5,0,9,4.15,cancelled,2,1707277075.1631584,ZgZSOiq8pdY1lA





 Passengers table


Unnamed: 0,name,rating,_dlt_root_id,_dlt_parent_id,_dlt_list_idx,_dlt_id
0,John,4.4,ZgZSOiq8pdY1lA,ZgZSOiq8pdY1lA,0,fW+i0MQSs45ZuA
1,Jack,3.6,ZgZSOiq8pdY1lA,ZgZSOiq8pdY1lA,1,xTcy8dFQjuQt5A





 Stops table


Unnamed: 0,lon,lat,_dlt_root_id,_dlt_parent_id,_dlt_list_idx,_dlt_id
0,-73.6,40.6,ZgZSOiq8pdY1lA,ZgZSOiq8pdY1lA,0,oOhWVmk/ICik6w
1,-73.5,40.5,ZgZSOiq8pdY1lA,ZgZSOiq8pdY1lA,1,nAReuMPo1PdiBw


# Bonus snippets



## Load to parquet file

In [10]:
%%capture
!pip install dlt[parquet] # Install dlt with all the necessary Parquet dependencies
!pip install parquet
!mkdir .dlt

In [11]:
import os
import dlt
import parquet
import json
import glob

# Set the bucket_url. We can also use a local folder
os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] = 'file:///content/.dlt/my_folder'

url = "https://storage.googleapis.com/dtc_zoomcamp_api/yellow_tripdata_2009-06.jsonl"
# Define your pipeline
pipeline = dlt.pipeline(
    pipeline_name='my_pipeline',
    destination='filesystem',
    dataset_name='mydata'
)



# Run the pipeline with the generator we created earlier.
load_info = pipeline.run(stream_download_jsonl(url), table_name="users", loader_file_format="parquet")

print(load_info)

# Get a list of all Parquet files in the specified folder
parquet_files = glob.glob('/content/.dlt/my_folder/mydata/users/*.parquet')

# show parquet files
print("Loaded files: ")
for file in parquet_files:
  print(file)


Pipeline my_pipeline load step completed in 0.16 seconds
1 load package(s) were loaded to destination filesystem and into dataset mydata
The filesystem destination used file:///content/.dlt/my_folder location to store data
Load package 1707277176.3060365 is LOADED and contains no failed jobs
Loaded files: 
/content/.dlt/my_folder/mydata/users/1707277176.3060365.1584d7ea36.parquet


## Load to BigQuery
To load to BigQuery, we need BigQuery credentials.
- dlt looks for credentials in several places as described in the [credential docs.](https://dlthub.com/docs/general-usage/credentials/configuration)
- For BigQuery specifically, you can read the docs [here](https://dlthub.com/docs/dlt-ecosystem/destinations/bigquery) for how to do it.
- If you are running from Colab or a GCP machine, or you are authenticated with the gcloud CLI, you can use these already-available local credentials. We will use the Colab OAuth here.

In [12]:
%%capture
!pip install dlt[bigquery]

In [13]:
# Authenticate to Google BigQuery
from google.colab import auth
auth.authenticate_user()

In [15]:
import os
import dlt

os.environ['GOOGLE_CLOUD_PROJECT'] = 'semiotic-plexus-412804'  # replace with your own GCP project ID


# Define your pipeline
pipeline = dlt.pipeline(
    pipeline_name='my_pipeline',
    destination='bigquery',
    dataset_name='dtc'
)

# Run the pipeline
load_info = pipeline.run(stream_download_jsonl(url), table_name="users")

print(load_info)

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

query = """
    SELECT *
    FROM `dtc.users`
"""

query_job = client.query(query)  # Make an API request.

print("The query data:")
for row in query_job:
    # Row values can be accessed by field name or index.
    print(row)

Pipeline my_pipeline load step completed in 17.52 seconds
1 load package(s) were loaded to destination bigquery and into dataset dtc
The bigquery destination used None@semiotic-plexus-412804 location to store data
Load package 1707277590.3888998 is LOADED and contains no failed jobs
The query data:
Row(('VTS', datetime.datetime(2009, 6, 17, 13, 7, tzinfo=datetime.timezone.utc), datetime.datetime(2009, 6, 17, 13, 7, tzinfo=datetime.timezone.utc), 208, 0.0, -73.937825, 40.758278, -73.937818, 40.758275, 'CASH', 2.5, 0.0, 0.0, 0.0, 2.5, '1707277590.3888998', 'kGiMj9bqqlyofQ', None), {'vendor_name': 0, 'trip_pickup_date_time': 1, 'trip_dropoff_date_time': 2, 'passenger_count': 3, 'trip_distance': 4, 'start_lon': 5, 'start_lat': 6, 'end_lon': 7, 'end_lat': 8, 'payment_type': 9, 'fare_amt': 10, 'surcharge': 11, 'tip_amt': 12, 'tolls_amt': 13, 'total_amt': 14, '_dlt_load_id': 15, '_dlt_id': 16, 'store_and_forward': 17})
Row(('VTS', datetime.datetime(2009, 6, 11, 6, 50, tzinfo=datetime.timezone

Row(('VTS', datetime.datetime(2009, 6, 4, 20, 17, tzinfo=datetime.timezone.utc), datetime.datetime(2009, 6, 4, 20, 38, tzinfo=datetime.timezone.utc), 1, 12.89, -73.777082, 40.645033, -73.926328, 40.6136, 'CASH', 29.3, 0.5, 0.0, 0.0, 29.8, '1707277590.3888998', 'AxruP5aJ5mAtpg', None), {'vendor_name': 0, 'trip_pickup_date_time': 1, 'trip_dropoff_date_time': 2, 'passenger_count': 3, 'trip_distance': 4, 'start_lon': 5, 'start_lat': 6, 'end_lon': 7, 'end_lat': 8, 'payment_type': 9, 'fare_amt': 10, 'surcharge': 11, 'tip_amt': 12, 'tolls_amt': 13, 'total_amt': 14, '_dlt_load_id': 15, '_dlt_id': 16, 'store_and_forward': 17})
Row(('VTS', datetime.datetime(2009, 6, 29, 22, 2, tzinfo=datetime.timezone.utc), datetime.datetime(2009, 6, 29, 22, 39, tzinfo=datetime.timezone.utc), 1, 12.11, -73.981228, 40.737887, -73.9218, 40.625458, 'CASH', 31.3, 0.5, 0.0, 0.0, 31.8, '1707277590.3888998', 'SQ3F1moZEXA3oQ', None), {'vendor_name': 0, 'trip_pickup_date_time': 1, 'trip_dropoff_date_time': 2, 'passenger_

## Other demos
Find more demos in this repo, or look on our blog for multiple community demos
* https://github.com/dlt-hub/dlt_demos
* https://dlthub.com/docs/blog

## Docs Links

This course was tailored so that the whole cohort could complete it successfully, so more complex topics were left out. We strongly encourage you to keep learning on your own.


You will find more info about advanced capabilities of dlt here: https://dlthub.com/docs/build-a-pipeline-tutorial



Don't miss the GPT-4 docs helper button - it will help with simple questions.

If you get stuck, consider joining our community for help: https://dlthub.com/community