<div style="text-align: center;">
  <img src="dlt-pacman.gif" width="1000" height="300">
</div>

# Links

GitHub pessoal: https://github.com/victor2seabra

Documentação dlt: https://dlthub.com/docs/intro

Documentação uv: https://docs.astral.sh/uv/guides/install-python/

# Instalação

A instalação "básica" do dlt pode ser feita com: `pip install dlt`

Em um ambiente virtual uv: `uv pip install dlt`

Podemos usar extensões que facilitam integração como: `pip install "dlt[athena]"` `pip install "dlt[parquet]"`

# O que é o dlt?

Uma ferramenta que nos **auxilia** a fazer **ingestão** de dados estruturados (Parquet), **semi-estruturados** (JSON) e **não-estruturados** (MongoDB) em **formato estruturado**

Pode ser usada tambem em transformações como por exemplo:

+   definição de **schema**; 
+   **filtração** de dados antes de fazer o carregamento;
+   **normalização** de dados aninhados em **tabelas relacionais** (nested tables).

![image.png](attachment:image.png)

# O que não é o dlt?

**Não se trata de um modo diferente de fazer requests**, ainda que alguns sources/resources de algumas fontes permitam uma **estrutura pré-definida**, mas pode conter algumas **limitações** ===> **dlt Core**

# Vantagens do dlt

**1.**  Conversão de dados **não-estruturados em estruturados**;

**2.**  Realiza **inferencia de schemas**;

**3.**  Seu **deploy** pode ser feito em qualquer lugar que aceite Python (**Airflow, Lambda, Glue, etc**);

**4.**  Auxilia na implementação de **incremental**;

**5.**  Permite a **evolução do schema**.

# Desvantagens do dlt

**1.**  Recursos de **providers nativos** dele podem ser um pouco **limitado**, precisando o uso do **dlt Core**

# Estrutura de um projeto dlt

![image.png](attachment:image.png)

**restapi_pipeline.py** o codigo do dlt

**secrets.toml** usado para armazenar **credenciais** que precisam de segurança. **ele é incluso no .gitignore**

**config.toml** usado para armazenar **configurações do projeto** usado dlt como destinations, sources e resources que **não tem necessidade de serem privadas**

# dlt Core

## Modos de carregamento (`write_disposition`)

### Arvore de decisao

![image.png](attachment:image.png)

### `"append"`

Modo padrão, **linhas** seram **anexadas** ao final da tabela

### `"replace"` (full-load)

A **tabela** como um todo é **substituida**

### Incremental

Extrai **somente** dados do timestamp da **ultima execução em diante** ignorando os dados extraídos em execuções anteriores

### `"merge"`

Permite **atualizar** dados de tabelas de acordo com colunas específicas **mantendo o historico**

3 **estratégias** aceitas:

+   `"delete_insert"` (padrão)

+   `"scd2"`

+   `"upsert"`

## Source

Se trata de um **conjunto de resources** que são, **geralmente**, extraidos e carregados ao mesmo tempo

Tem como função:

**1.**  **Armazenar os resources** que temos interesse para usos na extração

**2.**  Passar **credenciais essenciais** no processo de extração

![image-2.png](attachment:image-2.png)

**O SOURCE NÃO TEM COMO FUNÇÃO EXTRAIR OS DADOS, E SIM DEFINIR COMO SERÃO EXTRAÍDOS PELO PIPELINE**

In [None]:
import dlt
from dlt.sources.config import configspec, with_config
from dlt.common.configuration.specs import BaseConfiguration


@configspec
class MySourceConfiguration(BaseConfiguration):
    api_key: str = None  
    page_size: int = 100  


@dlt.source(
    name="example",
    max_table_nesting=3,       # segundo a documentação, 2 ou 3 geralmente ja soluciona o problema
    parallelized=True,
    root_key=True,
    schema_contract="freeze",  # impede a criação de novas tabelas, colunas e tipagem
    spec=MySourceConfiguration,
)
def example_api():

    @dlt.resource()
    def endpoint_1():
        pass

    @dlt.resource(spec=MySourceConfiguration)
    def endpoint_2(config: MySourceConfiguration):
        api_key = config.api_key
        page_size = config.page_size
        pass

    return [endpoint_1, endpoint_2]

podemos setar configurações especiais usando os kwargs do decorator `@dlt.source`

*   ****

`name=` nome que sua fonte terá. caso contrário é usado o nome da função que o decorator decora. **muito importante em codigos com mais de um source**

*   ****

`parallelized=` permite definir se esta fonte tera seus **resources sendo executados em paralelo**

*   ****

`max_table_nesting` permite **transformar campos aninhados** num JSON/Document do MongoDB **em tabelas normalizadas e relacionadas** de modo automático. Seu valor numérico **define o grau de aninhamento que tabelas serão criadas**.

*   ****

`root_key=` cria uma coluna que **permite relacionar** tabelas originadas por aninhamento da estrutura da fonte de dados (**nested tables**) e realizar o modo **incremental**

*   ****

`schema=` aceita um objeto `dlt.Schema` que **permite a definição do schema**. se **ausente** o schema será **definido** de modo **automático**

*   ****

`schema_contract=` **configurações** a respeito da estrutura do **schema** a ser usado, por exemplo, se será possivel a criação automatica de novas tabelas se identificado na response de uma API

*   ****

`section=` define uma seção no secrets.toml

*   ****

`spec=` uma especificação de **configuração e secrets** necessária pelo source

*   ****

Um objeto **dlt.Source** é **instanciado** ao **executar** a **função** que ele decora


Os **atributos** de um dlt.Source são:

*   ****

`name`

*   ****

`max_table_nesting`

*   ****

`root_key`

*   ****

`parallelize`

*   ****

`resources` um dicionário de **todos os resources** dentro da source em questão

*   ****

`schema`

*   ****

`schema_contract`

*   ****

`section`

*   ****

`selected_resources` um dicionário de todos os **resources** que serão **usados**

*   ****

`state`

*   ****

`exhausted` checa se **todos os pipelines selecionados** estão sendo **executados**, se sim, a fonte é considerada exausta

Os **métodos** de um dlt.Source são:

*   ****

`add_limit()` permite **limitar** a fonte de acordo com o **numero de itens e/ou tempo de execução**

*   ****

`clone()` realiza um **deep copy do source** (schema, resources e pipes), podemos renomear esta copia com o parametro `with_name=`

*   ****

`decompose()` **decompõe** um source **em uma lista de resources** de acordo com a estratégia (**"none"** ou **"scc"**) passada a ele como argumento

*   ****

`discover_schema()` **computa o schema para todos os resources selecionados** no source e computa um merge com o atual schema

*   ****

`from_data()` **converte** qualquer **objeto** Python suportado pelo pipeline.run() em um **dlt.Source**

*   ****

`run()` uma **forma de executar o pipeline ativo** para um source. se nenhum pipeline for encontrado pelo dlt, ele **cria um novo para nós**

*   ****

`with_resources()` permite **selecionar resource(s) de uma source** quando executado o processo de carregamento de dados do pipeline

## Resource

Se trata de um função que **gera** dados mas **não extrai**

**Não necessariamente necessita de um source para ser usado**

In [None]:
import dlt
import httpx
from zeep.client import Client
import xml.etree.ElementTree as ET


@dlt.source(
    name="example",
    max_table_nesting=3, 
    parallelized=True,
    root_key=True,
    schema_contract="freeze",
)
async def example_api():

    @dlt.resource(
        name="soap",
        table_name="table_soap",
        file_format="parquet",                                        # formato que o recurso será salvo e carregado se possivel
        table_format="iceberg",                                       # formato tabular que será usado para armazenamento
        write_disposition="merge",
        schema_contract="evolve",                                     # sobrescreve as configurações do dlt.source
        max_table_nesting=2,                                          # sobrescreve as configurações do dlt.source
        parallelized=False,                                           # sobrescreve as configurações do dlt.source
        incremental=dlt.sources.incremental("updated_at"),            # configuração de incremental
        primary_key="",                                               # coluna usada no modo incremental
        merge_key="",                                                 # coluna usada no modo merge
        nested_hints={
            "purchases": dlt.mark.make_nested_hints( # o nome da tabela será table_soap__purchases
                columns=[{"name": "price", "data_type": "decimal", "scale": 2}],
                schema_contract={"columns": "freeze"},
                parent_table_name="", # podemos alterar o nome da tabela-mãe alterando então o nome dessa nova tabela
                primary_key="",       # quando setada torna ela uma tabela normal e nao mais nested table
                merge_key="",         # quando setada torna ela uma tabela normal e nao mais nested table
                references="",
                table_format="",
                write_disposition="",
            ),
            "observations": {},
            ("observations", "good_observation"): { # o nome da tabela será table_soap__purchases__good_observation
                "columns": {"id": {"data_type": "bigint"}}
            }
        },
        columns={"name": {"data_type": "text", "nullable": True}},
        references=[
            {"referenced_table": "other_table", "columns": ["other_col1", "other_col2"], "referenced_columns": ["col1", "col2"]}
        ],
        selected=True,    # permite definir se o resource estará disponivel para a migração por padrão
        # spec="",        # discutido na seção Source
        # section="",     # discutido na seção Source
    )
    async def endpoint_1(): 
        """Exemplo de uso em REST API"""
        response = httpx.post(url="", data={})
        response = [
            {
                "id": 1,
                "name": "simon",
                "city": "berlin",
                "purchases": [{"id": 1, "name": "apple", "price": "1.50"}, {"id": 2, "name": "pear", "price": "2.50"}],
                "observations": {"good_observation": [{"id": "1", "msg": "I liked the purchase"}]}
            },
        ]

        yield response

    @dlt.resource(
        max_table_nesting=1
    )
    def endpoint_2():
        """Exemplo de uso em SOAP API"""
        client = Client(url="")
        response = client.service.OperacaoDesejada()
        root = ET.fromstring(response)
        
        yield root

    return [endpoint_1, endpoint_2]

Podemos parametrizar um resource gerando um **resource dinamico**

In [13]:
@dlt.resource(name='table_name', write_disposition='replace')
def generate_var_rows(nr: int):
    for i in range(nr):
        yield {'id': i, 'example_string': 'abc'}

for row in generate_var_rows(10):
    print(row)

{'id': 0, 'example_string': 'abc'}
{'id': 1, 'example_string': 'abc'}
{'id': 2, 'example_string': 'abc'}
{'id': 3, 'example_string': 'abc'}
{'id': 4, 'example_string': 'abc'}
{'id': 5, 'example_string': 'abc'}
{'id': 6, 'example_string': 'abc'}
{'id': 7, 'example_string': 'abc'}
{'id': 8, 'example_string': 'abc'}
{'id': 9, 'example_string': 'abc'}


Podemos usar **Pydantic** para definição de colunas

In [23]:
from pydantic import BaseModel
from typing import List, Optional, Union

class Address(BaseModel):
    street: str
    city: str
    postal_code: str

class User(BaseModel):
    id: int
    name: str
    tags: List[str]         # define elementos aninhados, por padrão usam o data type "json"
    email: Optional[str]    # equivalente a "nullable": True
    address: Address
    status: Union[int, str] # usa o primeiro tipo não-nulo dentro do Union, no caso, int -> bigint

@dlt.resource(name="user", columns=User)
def get_users():
    ...

Alguns **métodos** de um dlt.resource são:

*   ****

`add_map()`

*   ****

`add_filter()`

*   ****

`add_yield_map()`

*   ****

`add_step()`

*   ****

`add_pipe()`

*   ****

`add_limit()`

*   ****

`apply_hints()`

*   ****

`bind()`

*   ****

`compute_nested_tables_schemas()`

*   ****

`compute_table_schema()`

*   ****

`from_data()`

*   ****

`parallelize()`

*   ****

`merge_hints()`

*   ****

`pipe_data_from()`

*   ****

`select_tables()`

*   ****

`validate_reference_hint()`

*   ****

`validate_dynamic_hints()`

*   ****

`validate_transformer_generator_function()`

*   ****

`validate_write_disposition_hint()`

*   ****

`set_incremental()`

## Transformer

Podemos passar dados de um resource para outro usando um **tipo especial de resource** denominado de `dlt.transformer`

In [None]:
@dlt.resource(write_disposition="replace")
def users(limit=None):
    for u in range(limit):
        yield u


@dlt.transformer(
    name="transformer",
    data_from=users,
    # columns="",
    # file_format="",
    # max_table_nesting="",
    # merge_key="",
    # nested_hints="",
    # parallelized="",
    # primary_key="",
    # references="",
    # schema_contract="",
    # section="",
    # selected="",
    # spec="",
    # table_format="",
    # table_name="",
    # write_disposition=""
)
def users_details(user_item):
    for detail in range(user_item["user_id"]):
        yield detail

pipeline = dlt.pipeline()
pipeline.run(users_details)

Exemplos de transformers nativos do dlt são:

In [21]:
from dlt.sources.filesystem import read_csv, read_csv_duckdb, read_parquet, read_jsonl, filesystem

Transformers podem receber **inputs** via **pipe |** de outros resources/objetos

In [None]:
# Exemplo 1
users = users_details({"teste":1})
pipeline.run(users(limit=100) | users_details)

# Exemplo 2
@dlt.transformer
async def pokemon(id):
    async with httpx.AsyncClient() as client:
        r = await client.get(f"https://pokeapi.co/api/v2/pokemon/{id}")
        return r.json()

print(list([1,2] | pokemon()))

# Exemplo 3
files = filesystem(bucket_url="gs://filesystem-tutorial", file_glob="encounters*.csv")
reader = (files | read_csv()).with_name("encounters")

## Destination

Local onde ocorre a **criação e manutenção** da atual versão do **schema** e onde seus **dados** são **carregados**

Tem uma grande diversidade:

+   databases

+   datalakes

+   vector stores

+   arquivos

Podem ser configurados de 4 maneiras:

**1.**  Via **config.toml**

```toml
config.toml
[destination.filesystem]
bucket_url="s3://..."

[destination.filesystem.credentials]
AWS_ACCESS_KEY_ID=""
AWS_SECRET_ACCESS_KEY=""
AWS_SESSION_TOKEN=""
```
Acessando no código:

```python
import dlt

pipeline = dlt.pipeline("pipeline", destination="dlt.destinations.filesystem")
```

*   ****

**2.**  Via **variáveis de ambiente**

```linux
export DESTINATION__FILESYSTEM__BUCKET_URL=s3://...
export DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID=""
export DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SESSION_TOKEN=""
```

*   ****

**3.**  Diretamente no código usando um **template**

```python
import dlt

aws_bucket = filesystem(bucket_url="s3://...", destination_name="production_s3_bucket")
pipeline = dlt.pipeline("pipeline", destination=aws_bucket)
```

*   ****

**4.**  Criando um **dlt.destination** customizado

In [None]:
@dlt.destination(
    # name="dest",
    naming_convention="",               # 
    batch_size="",                      # define quantos itens por chamada de função serão batelados juntos e enviados como um array
    loader_file_format="",              # define o formato do arquivo a ser armazenado antes de ser enviado para a destination
    loader_parallelism_strategy="",     # define como tabelas irão ser carregadas
    max_parallel_load_jobs="",          # permite definir o grau de paralelismo
    # max_table_nesting="",
    # spec=""
)
def destination():
    pass

**5.** Atribuindo credenciais diretamente no pipeline (**não recomendado**)

```python
import dlt
from dlt.destinations import postgres

pipeline = dlt.pipeline(
    "pipeline",
    destination=postgres(credentials="postgresql://loader:loader@localhost:5432/dlt_data"),
)
```

## Pipeline

**É o coração do dlt**

Nesse objeto é que são feitas as tarefas de **migração, transformação e carregamento** das fontes/sources, recursos/resources e destinos/destinations passados a ele

![image.png](attachment:image.png)

In [None]:
import dlt

pipeline = dlt.pipeline(
    pipeline_name="",           # o nome do pipeline. usado para monitoramento de eventos e restauração de schemas em execuções seguintes
    destination="",             # local onde será armazenado os dados
    dataset_name="",            # nome do dataset no local de armazenamento (pasta onde estarão os arquivos)
    dev_mode="",                # usado em desenvolvimento para que cada execução os dados sejam armazenados num novo dataset com sufixo diferente
    export_schema_path="",      # exporta arquivos YAML com a definição do schema 
    import_schema_path="",      # importa arquivos YAML com a definição do schema 
    pipeline_salt="",           # um valor aleatório de hash para fazer anonimização de dados
    pipelines_dir="",           # permite definir uma pasta onde estarão os metadados do pipeline
    progress="",                # permite incorporar barras de progresso e logs em cada etapa feita pelo pipeline.run()
    staging="",                 # um destination onde o dlt usará de stage antes de fazero load no destination final
)

A **execução** do pipeline de ponta a ponta consiste na execução do método `.run()` que pode ser composto das seguintes etapas

**Cada etapa** tem seu respectivo **método** permitindo seu uso individual

![image-2.png](attachment:image-2.png)

In [None]:
pipeline.run(
    data="",
    columns="",
    credentials="",
    # dataset_name="",
    # destination="",
    loader_file_format="",
    primary_key="",
    refresh="",
    # schema="",
    # schema_contract="",
    # staging="",
    # table_format="",
    # table_name="",
    # write_disposition=""
)

In [None]:
pipeline.extract(
    data="",
    columns="",
    loader_file_format="",
    max_parallel_items="",
    # parent_table_name="",
    # primary_key="",
    # refresh="",
    # schema="",
    # schema_contract="",
    # table_format="",
    # table_name="",
    workers="",
    # write_disposition=""
)

In [None]:
pipeline.normalize(
    workers=""
)

In [None]:
pipeline.load(
    # credentials="",
    # dataset_name="",
    # destination="",
    raise_on_failed_jobs="",
    # workers=""
)

## States

Similar ao state de terraform

Armazena **metadados do pipeline** que podem ser acessados em execuções seguintes, por exemplo

## Schema

Se trata do schema do dado que desejamos, assim como o schema de um database

## Apendice

**Mais funcionalidades** podem ser encontradas em:

In [28]:
dlt.mark

<module 'dlt.pipeline.mark' from '/home/victor/.pyenv/versions/3.13.2/lib/python3.13/site-packages/dlt/pipeline/mark.py'>

In [None]:
dlt.mark.make_hints()
dlt.mark.make_nested_hints()
dlt.mark.materialize_table_schema()
dlt.mark.with_file_import()
dlt.mark.with_hints()
dlt.mark.with_table_name()