# Transforming and filtering the data

In this lesson, we look at several ways to transform and filter data, both during ingestion and after the load.

dlt provides several ways to do this:
1. With a custom query (applicable to the `sql_database` source).
2. With dlt's special functions (`add_map` and `add_filter`).
3. Via `@dlt.transformer`.
4. With `pipeline.dataset()`, after the data has been loaded.

Let's review and compare those methods.

## What you’ll learn:

- How to limit rows at the source with SQL queries.
- How to apply custom Python logic per record.
- How to write transformations using functional and declarative APIs.
- How to access and query your loaded data using `.dataset()`.

## Setup and initial load

We will use the `sql_database` source as an example and connect to the public [MySQL Rfam](https://docs.rfam.org/en/latest/database.html) database. The Rfam database contains publicly accessible scientific data on RNA structures.

Let's perform an initial load:

In [1]:
%%capture
!pip install "dlt[sql_database, duckdb]"
!pip install pymysql

In [2]:
import dlt

from dlt.sources.sql_database import sql_database

source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["family","genome"]
)

pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline",
    destination="duckdb",
    dataset_name="sql_data",
)
load_info = pipeline.run(source)
print(load_info)

Pipeline sql_database_pipeline load step completed in 10.13 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline.duckdb location to store data
Load package 1746871155.2125173 is LOADED and contains no failed jobs


In [3]:
with pipeline.sql_client() as client:
  with client.execute_query("SELECT * FROM genome") as table:
      genome = table.df()
genome

Unnamed: 0,upid,assembly_acc,assembly_version,wgs_acc,wgs_version,assembly_name,assembly_level,study_ref,description,total_length,...,common_name,kingdom,num_rfam_regions,num_families,is_reference,is_representative,created,updated,_dlt_load_id,_dlt_id
0,RG000000001,,,,,,,,Potato spindle tuber viroid,4591,...,,viroids,0,0,1,0,2017-06-06 15:11:02+00:00,2020-04-23 11:46:08+00:00,1746870338.0995858,H9gGlZfDnwGJNw
1,RG000000002,,,,,,,,Columnea latent viroid,370,...,,viroids,0,0,1,0,2017-06-06 15:11:07+00:00,2020-04-23 11:46:08+00:00,1746870338.0995858,axdQXOJYhvDWnw
2,RG000000003,,,,,,,,Tomato apical stunt viroid-S,360,...,,viroids,0,0,1,0,2017-06-06 15:11:12+00:00,2020-04-23 11:47:10+00:00,1746870338.0995858,wxN/7zvjfM+xzg
3,RG000000004,,,,,,,,Tomato apical stunt viroid,360,...,,viroids,0,0,1,0,2017-06-06 15:11:17+00:00,2020-04-23 11:46:08+00:00,1746870338.0995858,aIoLRtZOn8Thag
4,RG000000005,,,,,,,,Cucumber yellows virus,7899,...,,viruses,0,0,1,0,2017-06-06 15:11:21+00:00,2020-04-23 11:46:28+00:00,1746870338.0995858,XmeoWhZ7heNLlg
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
96502,UP001295980,GCA_963583275.1,1.0,,,,,,,44107,...,,viruses,0,0,0,0,2024-06-27 09:26:33+00:00,2024-09-02 20:32:58+00:00,1746871155.2125173,FuHnU16vOUGpHQ
96503,UP001296009,GCA_963583145.1,1.0,,,,,,,43831,...,,viruses,0,0,0,0,2024-06-28 19:29:45+00:00,2024-09-02 20:32:58+00:00,1746871155.2125173,Opyr+QQOgMayMA
96504,UP001296230,GCA_963583065.1,1.0,,,,,,,40906,...,,viruses,0,0,0,0,2024-06-27 09:19:44+00:00,2024-09-02 20:32:58+00:00,1746871155.2125173,hh+uCqGoxcm8Kg
96505,UP001296237,GCA_963583445.1,1.0,,,,,,,42982,...,,viruses,0,0,0,0,2024-06-28 19:34:50+00:00,2024-09-02 20:32:58+00:00,1746871155.2125173,FuNFFCUQSKHyBA


You can check your data count using `sql_client`:

In [4]:
with pipeline.sql_client() as client:
  with client.execute_query("SELECT COUNT(*) AS total_rows FROM genome") as table:
      print(table.df())

   total_rows
0       96507


## **1. Filtering the data during the ingestion with `query_adapter_callback`**

Imagine a use case where we are only interested in the genome data for bacteria. In this case, ingesting the whole `genome` table would be an unnecessary use of time and compute resources.

In [5]:
with pipeline.sql_client() as client:
  with client.execute_query("SELECT COUNT(*) AS total_rows FROM genome WHERE kingdom='bacteria'") as table:
      print(table.df())

   total_rows
0       41763


When ingesting data with the `sql_database` source, dlt runs a `SELECT` statement behind the scenes. The `query_adapter_callback` parameter lets you modify that statement, for example by adding a `WHERE` clause.

In this example, only the table `genome` is filtered, on the column `kingdom`:

In [6]:
def query_adapter_callback(query, table):
    if table.name == "genome":
        # Only select rows where the column kingdom has value "bacteria"
        return query.where(table.c.kingdom=="bacteria")
    # Use the original query for other tables
    return query
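
To see what the callback does to the generated statement, here is a minimal sketch that applies it to hand-built SQLAlchemy tables and prints the resulting SQL. The two-column `genome` table and the one-column `family` table below are simplified stand-ins for illustration, not the real Rfam schema, and dlt itself is not involved.

```python
import sqlalchemy as sa

# Simplified stand-in tables (assumption: not the full Rfam schema)
metadata = sa.MetaData()
genome = sa.Table(
    "genome", metadata,
    sa.Column("upid", sa.String),
    sa.Column("kingdom", sa.String),
)
family = sa.Table("family", metadata, sa.Column("rfam_acc", sa.String))


def query_adapter_callback(query, table):
    if table.name == "genome":
        # Only select rows where the column kingdom has value "bacteria"
        return query.where(table.c.kingdom == "bacteria")
    # Use the original query for other tables
    return query


filtered = query_adapter_callback(sa.select(genome), genome)
untouched = query_adapter_callback(sa.select(family), family)

# Render the statements as SQL text, inlining the literal value
filtered_sql = str(filtered.compile(compile_kwargs={"literal_binds": True}))
untouched_sql = str(untouched)

print(filtered_sql)
print(untouched_sql)
```

The `genome` statement gains a `WHERE` clause, while the `family` statement is returned unchanged.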


Attach it:

In [7]:
import dlt
from dlt.sources.sql_database import sql_database


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome"],
    query_adapter_callback=query_adapter_callback
)


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data"
)

load_info = pipeline.run(source, write_disposition="replace")

print(pipeline.last_trace)

Run started at 2025-05-10 09:59:46.747678+00:00 and COMPLETED in 12.64 seconds with 4 steps.
Step extract COMPLETED in 6.23 seconds.

Load package 1746871186.894474 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 1.83 seconds.
Normalized data for the following tables:
- genome: 13921 row(s)

Load package 1746871186.894474 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 4.47 seconds.
Pipeline sql_database_pipeline_filtered load step completed in 4.33 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline_filtered.duckdb location to store data
Load package 1746871186.894474 is LOADED and contains no failed jobs

Step run COMPLETED in 12.64 seconds.

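In the snippet above, the callback added a `WHERE` clause to the `SELECT` statement that dlt generates for the `genome` table, so only matching rows were extracted. Another option is to create an SQL VIEW in your source database and extract data from it. In that case, dlt will infer all column types and read data in the shape you define in the view without any further customization.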

If creating a view is not feasible, you can fully rewrite the automatically generated query with an extended version of `query_adapter_callback`:

In [8]:
import dlt
import sqlalchemy as sa

from dlt.sources.sql_database import sql_database


def query_adapter_callback(query, table):
  if table.name == "genome":
    return sa.text(f"SELECT * FROM {table.fullname} WHERE kingdom='bacteria'")
  return query


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome", "clan"],
    query_adapter_callback=query_adapter_callback
)


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data"
)

load_info = pipeline.run(source, write_disposition="replace")

print(load_info)

Pipeline sql_database_pipeline_filtered load step completed in 3.55 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline_filtered.duckdb location to store data
Load package 1746871201.6816118 is LOADED and contains no failed jobs


In [9]:
with pipeline.sql_client() as client:

    with client.execute_query("SELECT COUNT(*) AS total_rows, MAX(_dlt_load_id) as latest_load_id FROM clan") as table:
        print("Table clan:")
        print(table.df())
        print("\n")

    with client.execute_query("SELECT COUNT(*) AS total_rows, MAX(_dlt_load_id) as latest_load_id FROM genome") as table:
        print("Table genome:")
        print(table.df())

Table clan:
   total_rows      latest_load_id
0         147  1746871201.6816118


Table genome:
   total_rows      latest_load_id
0       13921  1746871201.6816118


## **2. Transforming the data after extract and before load**

Since dlt is a Python library, it gives you a lot of control over the extracted data.  

You can attach any number of transformations that are evaluated on an item-per-item basis to your resource. The available transformation types:

* `map` - transform the data item (`resource.add_map`).
* `filter` - filter the data item (`resource.add_filter`).
* `yield map` - a map that returns an iterator, so a single row may generate many rows (`resource.add_yield_map`).
* `limit` - limit the number of records processed by a resource (`resource.add_limit`). Useful for testing or reducing data volume during development.
  
For example, to anonymize sensitive data before it is loaded into the destination, we can write a Python function and apply it to a source or resource using the `.add_map()` method.

See the [dlt documentation](https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data) for details.

### Using `add_map`

In the table `clan`, we notice that there is a column `author` that we would like to anonymize.

In [10]:
with pipeline.sql_client() as client:

    with client.execute_query("SELECT DISTINCT author FROM clan LIMIT 5") as table:
        print("Table clan:")
        print(table.df())

Table clan:
                            author
0                       Gardner PP
1  Gardner PP; 0000-0002-7808-1213
2                           Daub J
3                          Brown C
4                         Burge SW


We write a function in Python that pseudonymizes the `author` value of each row:

In [11]:
import hashlib

def pseudonymize_name(row):
    '''
    Pseudonymization is a deterministic type of PII-obscuring.
    Its role is to allow identifying users by their hash,
    without revealing the underlying info.
    '''
    # add a constant salt to generate a deterministic hash
    salt = 'WI@N57%zZrmk#88c'
    salted_string = row['author'] + salt
    sh = hashlib.sha256()
    sh.update(salted_string.encode())
    hashed_string = sh.digest().hex()
    row['author'] = hashed_string
    return row
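
Because the salt is constant, the same author always maps to the same hash, which is what makes the result a pseudonym rather than random noise. The sketch below checks this on a few hand-written rows without running a pipeline. It uses `hexdigest()`, which is equivalent to `digest().hex()`, and the sample rows reuse author names from the query output above.

```python
import hashlib

SALT = "WI@N57%zZrmk#88c"  # the same constant salt as in the lesson


def pseudonymize_name(row):
    # Hash the salted author name and overwrite the original value
    salted_string = row["author"] + SALT
    row["author"] = hashlib.sha256(salted_string.encode()).hexdigest()
    return row


# Hand-written sample rows (not read from the database)
first = pseudonymize_name({"author": "Gardner PP"})
second = pseudonymize_name({"author": "Gardner PP"})
other = pseudonymize_name({"author": "Daub J"})

print(first["author"] == second["author"])  # same input, same pseudonym
print(first["author"] == other["author"])   # different input, different pseudonym
print(len(first["author"]))                 # SHA-256 hex digest length
```

Note that the function assumes `author` is a string. Rows with a missing author would need an explicit check before concatenating the salt.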

In [12]:
import dlt
import hashlib
from dlt.sources.sql_database import sql_database


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_anonymized",
    destination="duckdb",
    dataset_name="sql_data"
)


source = sql_database("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", table_names=["clan"])

source.clan.add_map(pseudonymize_name) # Apply the anonymization function to the extracted data

info = pipeline.run(source)
print(info)

Pipeline sql_database_pipeline_anonymized load step completed in 0.11 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline_anonymized.duckdb location to store data
Load package 1746871217.3990965 is LOADED and contains no failed jobs


After the pipeline has run, we can observe that the author column has been anonymized.

In [13]:
with pipeline.sql_client() as client:

    with client.execute_query("SELECT DISTINCT author FROM clan LIMIT 5") as table:
        print("Table clan:")
        clan = table.df()

clan

Table clan:


                                              author
0  eb00a968f270b436a506949ba46fd70a66591c5acf2b57...
1  c6723bd5737f14a8d4b2e447a063ce547be1f88c7b6450...
2  9766eaa9533fcb3684649b2e4a5db9573f1bf10f103ed4...
3  bee07cbb64f4e771a4bd9327afd9244e9a5dbc3aa957d9...
4  9d281938612b0b9646c75049e817208c236431dee86583...


**Note:** If you're using the `pyarrow` or `connectorx` backend, the data is not processed item by item. Instead, it is processed in batches, so your function should be adjusted accordingly. For example, for PyArrow chunks the function could be changed as follows:

In [14]:
import hashlib
import pyarrow as pa


def pseudonymize_name_pyarrow(table: pa.Table) -> pa.Table:
    """
    Pseudonymizes the 'author' column in a PyArrow Table.
    """
    salt = 'WI@N57%zZrmk#88c'

    # Convert PyArrow Table to Pandas DataFrame for hashing
    df = table.to_pandas()

    # Apply SHA-256 hashing
    df['author'] = df['author'].astype(str).apply(lambda x: hashlib.sha256((x + salt).encode()).hexdigest())

    # Convert back to PyArrow Table
    new_table = pa.Table.from_pandas(df)

    return new_table

pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_anonymized1",
    destination="duckdb",
    dataset_name="sql_data"
)


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["clan"],
    backend="pyarrow",
    )

source.clan.add_map(pseudonymize_name_pyarrow) # Apply the anonymization function to the extracted data

info = pipeline.run(source)
print(info)



Pipeline sql_database_pipeline_anonymized1 load step completed in 0.09 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline_anonymized1.duckdb location to store data
Load package 1746871221.826223 is LOADED and contains no failed jobs


In [15]:
with pipeline.sql_client() as client:

    with client.execute_query("SELECT DISTINCT author FROM clan LIMIT 5") as table:
        print("Table clan:")
        print(table.df())

Table clan:
                                              author
0  eb00a968f270b436a506949ba46fd70a66591c5acf2b57...
1  c6723bd5737f14a8d4b2e447a063ce547be1f88c7b6450...
2  9766eaa9533fcb3684649b2e4a5db9573f1bf10f103ed4...
3  bee07cbb64f4e771a4bd9327afd9244e9a5dbc3aa957d9...
4  9d281938612b0b9646c75049e817208c236431dee86583...


### `add_map` vs `add_yield_map`

The difference between `add_map` and `add_yield_map` matters when a transformation returns multiple records from a single input.

- **`add_map`**: The function should return a list. `dlt` flattens this list into separate records.
- **`add_yield_map`**: The function should yield one record at a time.

`add_yield_map` lets you yield records one at a time, giving better control over processing. This is helpful when your logic requires conditional output, filtering, or more granular handling.
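
As a minimal sketch of the one-row-to-many-rows case, the generator below splits a comma-separated `author` value into one record per author. The sample rows are made up for illustration, and the comma-separated format is an assumption about the data. The generator is exercised with plain Python here. With dlt it would be attached to a resource via `add_yield_map`, for example `source.clan.add_yield_map(split_authors)`.

```python
def split_authors(row):
    """Yield one record per author found in a comma-separated 'author' value."""
    for name in row["author"].split(","):
        name = name.strip()
        if name:  # conditional output: skip empty fragments
            yield {"clan_acc": row["clan_acc"], "author": name}


# Made-up sample rows (assumption: authors are separated by commas)
sample_rows = [
    {"clan_acc": "CL00001", "author": "Gardner PP, Daub J"},
    {"clan_acc": "CL00002", "author": "Brown C"},
]

# Each input row may produce zero, one, or several output records
expanded = [record for row in sample_rows for record in split_authors(row)]
for record in expanded:
    print(record)
```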

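### Using `add_filter`

The `add_filter` method can be used similarly. The difference is that `add_filter` expects a function that returns a boolean value for each item. Unlike the query callback, the filter runs in Python after the rows have been read from the source, so the whole table is still extracted. For example, to implement the same filtering we did with a query callback, we can use: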

In [16]:
import time
import dlt
from dlt.sources.sql_database import sql_database


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome"]
)


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data"
)
source.genome.add_filter(lambda item: item["kingdom"] == "bacteria")

load_info = pipeline.run(source, write_disposition="replace")

print(pipeline.last_trace)

Run started at 2025-05-10 10:00:25.995102+00:00 and COMPLETED in 13.43 seconds with 4 steps.
Step extract COMPLETED in 7.70 seconds.

Load package 1746871226.0582514 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 2.17 seconds.
Normalized data for the following tables:
- genome: 13921 row(s)

Load package 1746871226.0582514 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 3.52 seconds.
Pipeline sql_database_pipeline_filtered load step completed in 3.38 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline_filtered.duckdb location to store data
Load package 1746871226.0582514 is LOADED and contains no failed jobs

Step run COMPLETED in 13.43 seconds.

In [17]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT COUNT(*) AS total_rows, MAX(_dlt_load_id) as latest_load_id FROM genome") as table:
        print("Table genome:")
        genome_count = table.df()
genome_count

Table genome:


   total_rows      latest_load_id
0       13921  1746871226.0582514


### Question 1:

What is the value of `total_rows` in the example above?

**-->13921**
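
The predicate passed to `add_filter` is an ordinary Python function, so it can be written and checked on its own before attaching it to a resource. The sketch below uses a named function instead of a lambda and a few made-up rows. Using `.get()` so that rows without a `kingdom` key are dropped rather than raising an error is a design choice of this sketch, not something the lesson's lambda does.

```python
def is_bacteria(item):
    """Return True for rows that should be kept."""
    return item.get("kingdom") == "bacteria"


# Made-up sample rows for illustration
sample_rows = [
    {"upid": "A", "kingdom": "bacteria"},
    {"upid": "B", "kingdom": "viruses"},
    {"upid": "C", "kingdom": None},
    {"upid": "D"},
]

kept = [row for row in sample_rows if is_bacteria(row)]
print(kept)
```

With dlt, the same function could replace the lambda: `source.genome.add_filter(is_bacteria)`.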

### Using `add_limit`

If your resource loads thousands of pages of data from a REST API or millions of rows from a database table, you may want to sample just a fragment of it in order to quickly see the dataset with example data and test your transformations, etc.

To do this, you limit how many items will be yielded by a resource (or source) by calling the `add_limit` method. This method will close the generator that produces the data after the limit is reached.

In [18]:
import time
import dlt
from dlt.sources.sql_database import sql_database


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome"],
    chunk_size=10,
)


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data"
)
source.genome.add_limit(1)

load_info = pipeline.run(source, write_disposition="replace")

print(pipeline.last_trace)



Run started at 2025-05-10 10:00:41.543970+00:00 and COMPLETED in 5.98 seconds with 4 steps.
Step extract COMPLETED in 5.77 seconds.

Load package 1746871241.6095686 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.03 seconds.
Normalized data for the following tables:
- genome: 10 row(s)

Load package 1746871241.6095686 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 0.13 seconds.
Pipeline sql_database_pipeline_filtered load step completed in 0.12 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline_filtered.duckdb location to store data
Load package 1746871241.6095686 is LOADED and contains no failed jobs

Step run COMPLETED in 5.98 seconds.

In [19]:
with pipeline.sql_client() as client:
  with client.execute_query("SELECT * FROM genome") as table:
      genome_limited = table.df()
genome_limited

Unnamed: 0,upid,assembly_acc,assembly_version,wgs_acc,wgs_version,assembly_name,assembly_level,study_ref,description,total_length,...,common_name,kingdom,num_rfam_regions,num_families,is_reference,is_representative,created,updated,_dlt_load_id,_dlt_id
0,RG000000001,,,,,,,,Potato spindle tuber viroid,4591,...,,viroids,0,0,1,0,2017-06-06 15:11:02+00:00,2020-04-23 11:46:08+00:00,1746871241.6095686,uxLAfj89r1Zu5w
1,RG000000002,,,,,,,,Columnea latent viroid,370,...,,viroids,0,0,1,0,2017-06-06 15:11:07+00:00,2020-04-23 11:46:08+00:00,1746871241.6095686,7QHOolkOytXCag
2,RG000000003,,,,,,,,Tomato apical stunt viroid-S,360,...,,viroids,0,0,1,0,2017-06-06 15:11:12+00:00,2020-04-23 11:47:10+00:00,1746871241.6095686,ddRsKiZ20zvoQg
3,RG000000004,,,,,,,,Tomato apical stunt viroid,360,...,,viroids,0,0,1,0,2017-06-06 15:11:17+00:00,2020-04-23 11:46:08+00:00,1746871241.6095686,q5EijxY86JdE6w
4,RG000000005,,,,,,,,Cucumber yellows virus,7899,...,,viruses,0,0,1,0,2017-06-06 15:11:21+00:00,2020-04-23 11:46:28+00:00,1746871241.6095686,ZptJqod/eBrSzg
5,RG000000006,,,,,,,,Caprine arthritis encephalitis virus Roccaverano,8418,...,,viruses,0,0,1,0,2017-06-06 15:11:24+00:00,2020-04-23 11:52:32+00:00,1746871241.6095686,AqOjM1V52qV+kg
6,RG000000007,GCA_000413255.1,1.0,,,,,,Physarum polycephalum,189961418,...,,eukaryota,0,0,1,0,2017-06-06 15:11:28+00:00,2020-04-23 11:44:55+00:00,1746871241.6095686,iFrv3OcKk7OJXQ
7,RG000000009,GCF_000836845.1,1.0,,,,,,Arabis mosaic virus small satellite RNA,300,...,,viruses,0,0,1,0,2017-06-06 15:11:43+00:00,2020-04-23 11:49:54+00:00,1746871241.6095686,j4JnvCc9zK3Wcg
8,RG000000010,GCF_000840125.1,1.0,,,,,,Tobacco ringspot virus satellite RNA,359,...,,viruses,0,0,1,0,2017-06-06 15:11:48+00:00,2020-04-23 11:46:26+00:00,1746871241.6095686,W7Iqyo6+18EVBA
9,RG000000011,GCF_000847605.1,1.0,,,,,,Equine infectious anemia virus,8359,...,,viruses,0,0,1,0,2017-06-06 15:11:52+00:00,2020-04-23 11:46:01+00:00,1746871241.6095686,7KCXkxVqK1Bo/w
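
Note that the run above loaded 10 rows even though the limit was 1. This is consistent with `add_limit` counting the items a resource yields rather than individual rows: with `chunk_size=10`, each yielded item is a chunk of 10 rows. The sketch below imitates this with a plain generator and `itertools.islice` as a stand-in for the limit. The `genome_chunks` generator and its row contents are hypothetical.

```python
from itertools import islice


def genome_chunks(total_rows, chunk_size):
    """Hypothetical stand-in for a resource that yields rows in chunks."""
    for start in range(0, total_rows, chunk_size):
        stop = min(start + chunk_size, total_rows)
        yield [{"row_number": n} for n in range(start, stop)]


# Take only the first yielded item, similar in spirit to add_limit(1)
limited = list(islice(genome_chunks(total_rows=35, chunk_size=10), 1))
rows_loaded = sum(len(chunk) for chunk in limited)

print(len(limited))  # number of yielded items kept
print(rows_loaded)   # number of rows inside them
```

To sample an exact number of rows, the chunk size and the limit therefore have to be chosen together.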


## **3. Transforming data with `@dlt.transformer`**

The main purpose of transformers is to create child tables with additional data requests, but they can also be used for data transformations, especially if you want to keep the original data as well.

In [20]:
import time

import dlt
import hashlib
from dlt.sources.sql_database import sql_database


@dlt.transformer()
def batch_stats(items):
    '''
    Computes summary statistics for each batch (chunk) of rows
    received from the parent resource.
    '''
    # yield one record per batch: its size and the largest total_length in it
    yield {"batch_length": len(items), "max_length": max([item["total_length"] for item in items])}


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    chunk_size=10000
    ).genome


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_with_transformers1",
    destination="duckdb",
    dataset_name="sql_data",
    dev_mode=True
)

info = pipeline.run([source, source | batch_stats])
print(pipeline.last_trace)

Run started at 2025-05-10 10:00:57.523005+00:00 and COMPLETED in 21.26 seconds with 4 steps.
Step extract COMPLETED in 7.46 seconds.

Load package 1746871257.549375 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 4.96 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- genome: 32169 row(s)
- batch_stats: 4 row(s)

Load package 1746871257.549375 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 8.83 seconds.
Pipeline sql_database_pipeline_with_transformers1 load step completed in 8.80 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data_20250510100057
The duckdb destination used duckdb:////content/sql_database_pipeline_with_transformers1.duckdb location to store data
Load package 1746871257.549375 is LOADED and contains no failed jobs

Step run COMPLETED in 21.26 seconds.

In [21]:
with pipeline.sql_client() as client:
  with client.execute_query("SELECT * FROM batch_stats") as table:
        res = table.df()
res

   batch_length   max_length       _dlt_load_id         _dlt_id
0         10000  13427940695  1746871257.549375  BuduPxp1kz8Y6g
1         10000   6250353185  1746871257.549375  M1ClY4oVSncAhg
2         10000  10237952243  1746871257.549375  d5r6y+im5Y1Ckg
3          2169   3527038434  1746871257.549375  m1oKzjYcpaNCOg
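
The `batch_stats` table has one row per chunk the transformer received: three full chunks of 10000 rows and a final one of 2169, which add up to the 32169 rows in `genome`. The sketch below reproduces the same logic in plain Python on a handful of rows, without the `@dlt.transformer` decorator. The `chunked` helper is hypothetical and only imitates how a source delivers rows in chunks.

```python
def batch_stats(items):
    # One summary record per batch: its size and the largest total_length
    yield {
        "batch_length": len(items),
        "max_length": max(item["total_length"] for item in items),
    }


def chunked(rows, chunk_size):
    """Hypothetical helper that splits a list of rows into chunks."""
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


# A few total_length values taken from the genome output shown earlier
rows = [{"total_length": n} for n in (4591, 370, 360, 360, 7899)]

stats = [record for batch in chunked(rows, 2) for record in batch_stats(batch)]
for record in stats:
    print(record)
```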


## **4. Transforming data after the load**

Another option is to transform the data after the load. dlt provides several ways of doing this:

* using `sql_client`,
* via `.dataset()` and ibis integration,
* via [dbt integration](https://dlthub.com/docs/dlt-ecosystem/transformations/dbt/).


### SQL client

You have already seen examples of using dlt's SQL client. It lets you connect to your destination and execute any SQL query.

In [22]:
# NOTE: this is the duckdb sql dialect, other destinations may use different expressions
with pipeline.sql_client() as client:
    client.execute_sql(
        """ CREATE OR REPLACE TABLE genome_length AS
            SELECT
                SUM(total_length) AS total_total_length,
                AVG(total_length) AS average_total_length
            FROM
                genome
    """)
    with client.execute_query("SELECT * FROM genome_length") as table:
        genome_length = table.df()

genome_length

   total_total_length  average_total_length
0     1519769000000.0            47243290.0


### Accessing loaded data with `pipeline.dataset()`

Use `pipeline.dataset()` to inspect and work with your data in Python after loading.


In [23]:
dataset = pipeline.dataset()

# List tables
dataset.row_counts().df()

    table_name  row_count
0  batch_stats          4
1       genome      32169


Note that `row_counts` didn't return the new table `genome_length`, which was created directly through the SQL client rather than loaded by dlt.

In [24]:
# Access as pandas
df = dataset["genome"].df()
df

Unnamed: 0,upid,assembly_acc,assembly_version,wgs_acc,wgs_version,assembly_name,assembly_level,study_ref,description,total_length,...,common_name,kingdom,num_rfam_regions,num_families,is_reference,is_representative,created,updated,_dlt_load_id,_dlt_id
0,RG000000001,,,,,,,,Potato spindle tuber viroid,4591,...,,viroids,0,0,1,0,2017-06-06 15:11:02+00:00,2020-04-23 11:46:08+00:00,1746871257.549375,eN2bfciLhBRmIQ
1,RG000000002,,,,,,,,Columnea latent viroid,370,...,,viroids,0,0,1,0,2017-06-06 15:11:07+00:00,2020-04-23 11:46:08+00:00,1746871257.549375,8tKbRImcxR0mpg
2,RG000000003,,,,,,,,Tomato apical stunt viroid-S,360,...,,viroids,0,0,1,0,2017-06-06 15:11:12+00:00,2020-04-23 11:47:10+00:00,1746871257.549375,NQDbVdGacV3lKg
3,RG000000004,,,,,,,,Tomato apical stunt viroid,360,...,,viroids,0,0,1,0,2017-06-06 15:11:17+00:00,2020-04-23 11:46:08+00:00,1746871257.549375,G33Oz4bVvIMjVA
4,RG000000005,,,,,,,,Cucumber yellows virus,7899,...,,viruses,0,0,1,0,2017-06-06 15:11:21+00:00,2020-04-23 11:46:28+00:00,1746871257.549375,hwlPZBbHzc9wew
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
32164,UP001295980,GCA_963583275.1,1.0,,,,,,,44107,...,,viruses,0,0,0,0,2024-06-27 09:26:33+00:00,2024-09-02 20:32:58+00:00,1746871257.549375,Wi/ud/wFcUghXw
32165,UP001296009,GCA_963583145.1,1.0,,,,,,,43831,...,,viruses,0,0,0,0,2024-06-28 19:29:45+00:00,2024-09-02 20:32:58+00:00,1746871257.549375,TYFvsNwDKkJ8DQ
32166,UP001296230,GCA_963583065.1,1.0,,,,,,,40906,...,,viruses,0,0,0,0,2024-06-27 09:19:44+00:00,2024-09-02 20:32:58+00:00,1746871257.549375,8V8ATtEPFHP75g
32167,UP001296237,GCA_963583445.1,1.0,,,,,,,42982,...,,viruses,0,0,0,0,2024-06-28 19:34:50+00:00,2024-09-02 20:32:58+00:00,1746871257.549375,+w+9re10Uxpqbw


In [25]:
# Access as Arrow
arrow_table = dataset["genome_length"].arrow()
arrow_table

pyarrow.Table
total_total_length: decimal128(38, 0)
average_total_length: double
----
total_total_length: [[1519769444378]]
average_total_length: [[47243291.50355933]]

You can also filter, limit, and select columns:

In [26]:
df = dataset["genome"].select("kingdom", "ncbi_id").limit(10).df()
df

     kingdom  ncbi_id
0    viroids    12892
1    viroids    12901
2    viroids    53194
3    viroids    12885
4    viruses    32618
5    viruses   507757
6  eukaryota     5791
7    viruses   196399
8    viruses    31504
9    viruses    11665


To iterate over large data:

In [27]:
for chunk in dataset["genome"].iter_df(chunk_size=500):
    print(chunk.head())

          upid assembly_acc  assembly_version wgs_acc  wgs_version  \
0  RG000000001         None               NaN    None          NaN   
1  RG000000002         None               NaN    None          NaN   
2  RG000000003         None               NaN    None          NaN   
3  RG000000004         None               NaN    None          NaN   
4  RG000000005         None               NaN    None          NaN   

  assembly_name assembly_level study_ref                   description  \
0          None           None      None   Potato spindle tuber viroid   
1          None           None      None        Columnea latent viroid   
2          None           None      None  Tomato apical stunt viroid-S   
3          None           None      None    Tomato apical stunt viroid   
4          None           None      None        Cucumber yellows virus   

   total_length  ...  common_name  kingdom  num_rfam_regions num_families  \
0          4591  ...         None  viroids               

For more advanced users, this interface supports **Ibis expressions**, joins, and subqueries.

### Ibis integration

Ibis is a powerful portable Python dataframe library. Learn more about what it is and how to use it in the [official documentation](https://ibis-project.org/).

[dlt provides a way to use Ibis expressions natively](https://dlthub.com/docs/general-usage/dataset-access/ibis-backend) with a lot of destinations. Supported ones are:
* Snowflake
* DuckDB
* MotherDuck
* Postgres
* Redshift
* ClickHouse
* MSSQL (including Synapse)
* BigQuery

In [28]:
!pip install "ibis-framework[duckdb]"



In [29]:
# get the dataset from the pipeline
dataset = pipeline.dataset()
dataset_name = pipeline.dataset_name

# get the native ibis connection from the dataset
ibis_connection = dataset.ibis()

# list all tables in the dataset
# NOTE: You need to provide the dataset name to ibis, in ibis datasets are named databases
print(ibis_connection.list_tables(database=dataset_name))

# get the batch_stats table
table = ibis_connection.table("batch_stats", database=dataset_name)

# print the first 2 rows
print(table.limit(2).execute())

['_dlt_loads', '_dlt_pipeline_state', '_dlt_version', 'batch_stats', 'genome', 'genome_length']
   batch_length   max_length       _dlt_load_id         _dlt_id
0         10000  13427940695  1746871257.549375  BuduPxp1kz8Y6g
1         10000   6250353185  1746871257.549375  M1ClY4oVSncAhg


✅ ▶ Proceed to the [next lesson](https://colab.research.google.com/drive/1XT1xUIQIWj0nPWOmTixThgdXzi4vudce#forceEdit=true&sandboxMode=true)!