# Systematic extraction

One of the main tasks of data engineers is the systematic retrieval of data to populate our analytics systems. Specialized tools exist for this purpose; they can read not only from corporate systems (databases) but also from APIs. Here we look at a next-generation solution that offers a wide range of options.

In [None]:
#!pip install dlt


**Data Load Tool** (dlt) is used to extract data and load it into systems that we control ourselves. As a simple example, let's fetch data from an API (we cap the number of pages so the output stays short).

In this case we query a paginated endpoint of the Pokémon API: https://pokeapi.co/docs/v2#info

In [1]:
from dlt.sources.helpers.rest_client import RESTClient

# Initialize the RESTClient with the Pokémon API base URL
client = RESTClient(base_url="https://pokeapi.co/api/v2")

# Use the paginate method to handle pagination automatically
num = 10  # number of pages to fetch
poke_list = []
for page in client.paginate("/pokemon"):
    poke_list += page
    num -= 1

    # Stop after fetching 10 pages
    if num == 0:
        break
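
To make explicit what following a paginated API involves, here is a minimal sketch of next-link pagination in plain Python. It is not dlt's implementation: `FAKE_PAGES`, `fetch_page`, and this `paginate` function are hypothetical stand-ins for the HTTP calls, and the responses are assumed to carry a `results` list plus a `next` URL, as the PokeAPI list endpoint does.

```python
from typing import Iterator, Optional

# Hypothetical in-memory "API": each URL maps to a response with a `results`
# list and a `next` URL (None on the last page).
FAKE_PAGES = {
    "/pokemon?offset=0": {
        "results": [{"name": "bulbasaur"}, {"name": "ivysaur"}],
        "next": "/pokemon?offset=2",
    },
    "/pokemon?offset=2": {
        "results": [{"name": "venusaur"}, {"name": "charmander"}],
        "next": "/pokemon?offset=4",
    },
    "/pokemon?offset=4": {
        "results": [{"name": "charmeleon"}],
        "next": None,
    },
}


def fetch_page(url: str) -> dict:
    """Stand-in for an HTTP GET that returns the decoded JSON body."""
    return FAKE_PAGES[url]


def paginate(start_url: str, max_pages: Optional[int] = None) -> Iterator[list]:
    """Yield one list of records per page, following `next` until it is None."""
    url = start_url
    pages_seen = 0
    while url is not None and (max_pages is None or pages_seen < max_pages):
        body = fetch_page(url)
        yield body["results"]
        url = body["next"]
        pages_seen += 1


# Walk every page, then repeat with a cap of two pages.
all_names = [row["name"] for page in paginate("/pokemon?offset=0") for row in page]
first_two_pages = list(paginate("/pokemon?offset=0", max_pages=2))
print(all_names)
print(len(first_two_pages))
```

The `max_pages` argument plays the same role as the `num` counter in the cell above: it stops the iteration early instead of draining the whole API.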

With this we can build our DataFrame, operate on it, and load it into a destination system.

In [2]:
import pandas as pd

poke_df = pd.DataFrame(poke_list)
poke_df

Unnamed: 0,name,url
0,bulbasaur,https://pokeapi.co/api/v2/pokemon/1/
1,ivysaur,https://pokeapi.co/api/v2/pokemon/2/
2,venusaur,https://pokeapi.co/api/v2/pokemon/3/
3,charmander,https://pokeapi.co/api/v2/pokemon/4/
4,charmeleon,https://pokeapi.co/api/v2/pokemon/5/
...,...,...
195,espeon,https://pokeapi.co/api/v2/pokemon/196/
196,umbreon,https://pokeapi.co/api/v2/pokemon/197/
197,murkrow,https://pokeapi.co/api/v2/pokemon/198/
198,slowking,https://pokeapi.co/api/v2/pokemon/199/
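
As one possible way of "operating on" the DataFrame before loading it, the sketch below derives a numeric identifier from the `url` column. The column name `pokemon_id` is hypothetical, and the regular expression assumes every URL ends in `/pokemon/<number>/`, as the rows shown above do. A few of those rows are copied in so the example runs on its own.

```python
import pandas as pd

# A few rows copied from the output above, so the example is self-contained.
sample_df = pd.DataFrame(
    [
        {"name": "bulbasaur", "url": "https://pokeapi.co/api/v2/pokemon/1/"},
        {"name": "charmander", "url": "https://pokeapi.co/api/v2/pokemon/4/"},
        {"name": "umbreon", "url": "https://pokeapi.co/api/v2/pokemon/197/"},
    ]
)

# Capture the trailing number of the URL and store it as an integer column.
sample_df["pokemon_id"] = (
    sample_df["url"].str.extract(r"/pokemon/(\d+)/$", expand=False).astype(int)
)

# Keep only the columns we would want to load into the destination.
clean_df = sample_df[["pokemon_id", "name"]].sort_values("pokemon_id")
print(clean_df)
```

The same transformation could be applied to `poke_df` directly; a URL that does not match the assumed pattern would produce a missing value and make the integer cast fail, so a real pipeline should check for that first.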


Pipelines are how we define these data ingestion and storage flows so that they can run on a continuous basis. We have not covered databases yet, but they are a common choice of destination.

In [None]:
#!pip install duckdb



In [5]:
import dlt

pipeline = dlt.pipeline(destination="duckdb", dataset_name="country_data")

data = [
    {'country': 'USA', 'population': 331449281, 'capital': 'Washington, D.C.'},
    {'country': 'Canada', 'population': 38005238, 'capital': 'Ottawa'},
    {'country': 'Germany', 'population': 83019200, 'capital': 'Berlin'}
]

info = pipeline.run(data, table_name="countries")

print(info)

Pipeline dlt_ipykernel_launcher load step completed in 0.20 seconds
1 load package(s) were loaded to destination duckdb and into dataset country_data
The duckdb destination used duckdb:///c:\DS_PT_MAR2025\MI_DSPT2025\2-Data Analysis\5-Fuentes de datos\Web\Teoría\dlt_ipykernel_launcher.duckdb location to store data
Load package 1747420874.8055952 is LOADED and contains no failed jobs


In [6]:
info.dataset_name

'country_data'

In [7]:
import duckdb

db = duckdb.connect(database="dlt_ipykernel_launcher.duckdb")
db.sql("DESCRIBE;")

┌────────────────────────┬──────────────┬─────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────┬─────────────────────────────────────────────────────────────────────────────────────────┬───────────┐
│        database        │    schema    │        name         │                                           column_names                                           │                                      column_types                                       │ temporary │
│        varchar         │   varchar    │       varchar       │                                            varchar[]                                             │                                        varchar[]                                        │  boolean  │
├────────────────────────┼──────────────┼─────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────┼───────────────────────────────────────────

In [8]:
db.sql("SELECT * FROM country_data.countries;")

┌─────────┬────────────┬──────────────────┬────────────────────┬────────────────┐
│ country │ population │     capital      │    _dlt_load_id    │    _dlt_id     │
│ varchar │   int64    │     varchar      │      varchar       │    varchar     │
├─────────┼────────────┼──────────────────┼────────────────────┼────────────────┤
│ USA     │  331449281 │ Washington, D.C. │ 1747420874.8055952 │ j7RaUOn391mmnA │
│ Canada  │   38005238 │ Ottawa           │ 1747420874.8055952 │ SZOFJI5dJuhKAA │
│ Germany │   83019200 │ Berlin           │ 1747420874.8055952 │ i7jqO2X6Zjhz+w │
└─────────┴────────────┴──────────────────┴────────────────────┴────────────────┘

In [9]:
data_df = db.sql("SELECT * FROM country_data.countries;").df()
data_df

Unnamed: 0,country,population,capital,_dlt_load_id,_dlt_id
0,USA,331449281,"Washington, D.C.",1747420874.8055952,j7RaUOn391mmnA
1,Canada,38005238,Ottawa,1747420874.8055952,SZOFJI5dJuhKAA
2,Germany,83019200,Berlin,1747420874.8055952,i7jqO2X6Zjhz+w


In [10]:
type(data_df)

pandas.core.frame.DataFrame
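
The `_dlt_load_id` column added by dlt is the same for all three rows and matches the load package id printed by the pipeline. The value looks like a Unix timestamp stored as text; under that assumption, the sketch below converts it into a readable date using only the standard library.

```python
from datetime import datetime, timezone

# Load id copied from the `_dlt_load_id` column in the output above.
load_id = "1747420874.8055952"

# Assumption: the value is seconds since the Unix epoch, stored as a string.
loaded_at = datetime.fromtimestamp(float(load_id), tz=timezone.utc)
print(loaded_at.isoformat())
```

If the assumption holds, this gives a quick way to tell when each batch of rows reached the destination, which is useful once a pipeline has run more than once.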

We can build more complex structures that take care of fetching the data periodically and building up our database.

In [11]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator

# Client for the Pokémon API: follow the "next" link in each response
# and read the records from the "results" key.
# JSONLinkPaginator replaces the deprecated JSONResponsePaginator.
poke_client = RESTClient(
    base_url="https://pokeapi.co/api/v2",
    paginator=JSONLinkPaginator(next_url_path="next"),
    data_selector="results"
)

# A dlt resource is a generator that yields the data to load, page by page
@dlt.resource
def get_pokemons():
    for page in poke_client.paginate(
        "/pokemon",
        params={
            "limit": 100,
        },
    ):
        yield page

pipeline = dlt.pipeline(
    pipeline_name="get_pokemons",
    destination="duckdb",
    dataset_name="pokemons",
    progress='log'
)
load_info = pipeline.run(get_pokemons)
print(load_info)


----------------------------- Extract get_pokemons -----------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 212.21 MB (82.00%) | CPU usage: 0.00%

----------------------------- Extract get_pokemons -----------------------------
Resources: 0/1 (0.0%) | Time: 0.39s | Rate: 0.00/s
get_pokemons: 100  | Time: 0.00s | Rate: 0.00/s
Memory usage: 213.31 MB (82.10%) | CPU usage: 0.00%

----------------------------- Extract get_pokemons -----------------------------
Resources: 1/1 (100.0%) | Time: 0.99s | Rate: 1.01/s
get_pokemons: 1302  | Time: 0.59s | Rate: 2190.89/s
Memory usage: 215.86 MB (82.10%) | CPU usage: 0.00%

----------------------------- Extract get_pokemons -----------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 215.86 MB (82.10%) | CPU usage: 0.00%

----------------------------- Extract get_pokemons -----------------------------
Resources: 0/1 (0.0%) | Time: 0.01s | Rate: 0.00/s
_dlt_pipeline_state: 1  | 

In [12]:
db = duckdb.connect(database="get_pokemons.duckdb")
db.sql("DESCRIBE;")

┌──────────────┬──────────┬─────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────┬─────────────────────────────────────────────────────────────────────────────────────────┬───────────┐
│   database   │  schema  │        name         │                                           column_names                                           │                                      column_types                                       │ temporary │
│   varchar    │ varchar  │       varchar       │                                            varchar[]                                             │                                        varchar[]                                        │  boolean  │
├──────────────┼──────────┼─────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────┼─────────────────────────────────────────────────────────────────────────────────────────┼─────────

In [13]:
response = db.sql("SELECT * FROM pokemons.get_pokemons;")
response.to_df()

Unnamed: 0,name,url,_dlt_load_id,_dlt_id
0,bulbasaur,https://pokeapi.co/api/v2/pokemon/1/,1747420893.8823023,M/04NYsmWjyUXw
1,ivysaur,https://pokeapi.co/api/v2/pokemon/2/,1747420893.8823023,bUmJ4oFMPDrQtg
2,venusaur,https://pokeapi.co/api/v2/pokemon/3/,1747420893.8823023,njZ8bmmH1V6qfA
3,charmander,https://pokeapi.co/api/v2/pokemon/4/,1747420893.8823023,gVNIsUI/EC/+Iw
4,charmeleon,https://pokeapi.co/api/v2/pokemon/5/,1747420893.8823023,0aMfe6WBq2k3ZQ
...,...,...,...,...
1297,ogerpon-wellspring-mask,https://pokeapi.co/api/v2/pokemon/10273/,1747420893.8823023,HBaZ1k+fMaCc8A
1298,ogerpon-hearthflame-mask,https://pokeapi.co/api/v2/pokemon/10274/,1747420893.8823023,G7C15QB8sok7/A
1299,ogerpon-cornerstone-mask,https://pokeapi.co/api/v2/pokemon/10275/,1747420893.8823023,O0jW/HKaL6LB3A
1300,terapagos-terastal,https://pokeapi.co/api/v2/pokemon/10276/,1747420893.8823023,7nLD7RPBHIsHgA
