In [2]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator
import duckdb

In [7]:
@dlt.resource(name="openaq_locs")
def openaq_locs():
    client = RESTClient(
        base_url="https://api.openaq.org/v3/locations",
        headers={"X-API-Key":"932148dc9fced6a1df5c6d006c2ab3ae249eb6076ad539c693487236ace264dc"},
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate(params={"limit":"1000"}):
        yield page

In [24]:
pipeline_name = "open_aq_locs_load"
dataset_name = "open_aq"
table_name = "locs"

pipeline = dlt.pipeline(destination="filesystem", pipeline_name=pipeline_name, dataset_name=dataset_name)
load_info = pipeline.run(openaq_locs, table_name=table_name, loader_file_format="parquet", write_disposition="replace")

#pipeline = dlt.pipeline(destination="duckdb", pipeline_name=pipeline_name, dataset_name=dataset_name)
#load_info = pipeline.run(openaq_locs, table_name=table_name, loader_file_format="jsonl", write_disposition="replace")

PipelineStepFailed: Pipeline execution failed at stage sync with exception:

<class 'dlt.common.configuration.exceptions.ConfigFieldMissingException'>
Following fields are missing: ['bucket_url'] in configuration with spec FilesystemDestinationClientConfiguration
	for field "bucket_url" config providers and keys were tried in following order:
		In Environment Variables key OPEN_AQ_LOCS_LOAD__DESTINATION__FILESYSTEM__BUCKET_URL was not found.
		In Environment Variables key OPEN_AQ_LOCS_LOAD__DESTINATION__BUCKET_URL was not found.
		In Environment Variables key OPEN_AQ_LOCS_LOAD__BUCKET_URL was not found.
		In Environment Variables key DESTINATION__FILESYSTEM__BUCKET_URL was not found.
		In Environment Variables key DESTINATION__BUCKET_URL was not found.
		In Environment Variables key BUCKET_URL was not found.
WARNING: dlt looks for .dlt folder in your current working directory and your cwd (c:\Users\risen\DataEng\Project\airquality) is different from directory of your pipeline script (c:\Users\risen\DataEng\Project\airquality\.venv\Lib\site-packages).
If you keep your secret files in the same folder as your pipeline script but run your script from some other folder, secrets/configs will not be found
Please refer to https://dlthub.com/docs/general-usage/credentials/ for more information


In [10]:
db_conn = duckdb.connect(f'{pipeline_name}.duckdb')
db_conn.sql(f'SET search_path = {dataset_name}')

In [13]:
print(f'Number of tables: {db_conn.sql("DESCRIBE").df().shape[0]}')

print(f'Number of records extracted: {pipeline.dataset(dataset_type="default").locs.df().shape[0]}')

Number of tables: 8
Number of records extracted: 19986


In [21]:
with pipeline.sql_client() as db_client:
    res = db_client.execute_sql(
        """
        SELECT *
        FROM locs
        limit 1;
        """
    )
    print(res)

[(3, 'NMA - Nima', 'Africa/Accra', 152, 'GH', 'Ghana', 4, 'Unknown Governmental Organization', 209, 'Dr. Raphael E. Arku and Colleagues', False, True, 5.58389, -0.19968, '1743198912.5263681', 'lq/3xKbxXQQI5w', None, None, None, None, None)]


In [18]:
with pipeline.sql_client() as db_client:
    res = db_client.execute_sql(
        """
SHOW TABLES;
        """
    )
    print(res)

[('_dlt_loads',), ('_dlt_pipeline_state',), ('_dlt_version',), ('locs',), ('locs__bounds',), ('locs__instruments',), ('locs__licenses',), ('locs__sensors',)]


In [None]:
with pipeline.sql_client() as db_client:
    res = db_client.execute_sql(
        """
DESCRIBE locs;
        """
    )
    print(res)

[('id', 'BIGINT', 'YES', None, None, None), ('name', 'VARCHAR', 'YES', None, None, None), ('timezone', 'VARCHAR', 'YES', None, None, None), ('country__id', 'BIGINT', 'YES', None, None, None), ('country__code', 'VARCHAR', 'YES', None, None, None), ('country__name', 'VARCHAR', 'YES', None, None, None), ('owner__id', 'BIGINT', 'YES', None, None, None), ('owner__name', 'VARCHAR', 'YES', None, None, None), ('provider__id', 'BIGINT', 'YES', None, None, None), ('provider__name', 'VARCHAR', 'YES', None, None, None), ('is_mobile', 'BOOLEAN', 'YES', None, None, None), ('is_monitor', 'BOOLEAN', 'YES', None, None, None), ('coordinates__latitude', 'DOUBLE', 'YES', None, None, None), ('coordinates__longitude', 'DOUBLE', 'YES', None, None, None), ('_dlt_load_id', 'VARCHAR', 'NO', None, None, None), ('_dlt_id', 'VARCHAR', 'NO', None, None, None), ('datetime_first__utc', 'TIMESTAMP WITH TIME ZONE', 'YES', None, None, None), ('datetime_first__local', 'TIMESTAMP WITH TIME ZONE', 'YES', None, None, None),

In [20]:
with pipeline.sql_client() as db_client:
    res = db_client.execute_sql(
        """
DESCRIBE locs__sensors;
        """
    )
    print(res)

[('id', 'BIGINT', 'YES', None, None, None), ('name', 'VARCHAR', 'YES', None, None, None), ('parameter__id', 'BIGINT', 'YES', None, None, None), ('parameter__name', 'VARCHAR', 'YES', None, None, None), ('parameter__units', 'VARCHAR', 'YES', None, None, None), ('parameter__display_name', 'VARCHAR', 'YES', None, None, None), ('_dlt_parent_id', 'VARCHAR', 'NO', None, None, None), ('_dlt_list_idx', 'BIGINT', 'NO', None, None, None), ('_dlt_id', 'VARCHAR', 'NO', None, None, None)]


In [19]:
with pipeline.sql_client() as db_client:
    res = db_client.execute_sql(
        """
SELECT *
FROM locs__sensors
LIMIT 1;
        """
    )
    print(res)

[(6, 'pm10 µg/m³', 1, 'pm10', 'µg/m³', 'PM10', 'lq/3xKbxXQQI5w', 0, 'LlPu/vTR4H8tRQ')]


In [None]:
import pandas as pd

sensors_df = pd.read_parquet("C:/Users/risen/aq_data/open_aq/locs__sensors/")#1743212940.5629327.d66fe46d42.parquet")
pd.unique(sensors_df.parameter__name)

locs_df = pd.read_parquet("C:/Users/risen/aq_data/open_aq/locs/1743212940.5629327.2f63273e3c.parquet")

[['id_sensor', 'name_sensor', 'parameter__id', 'parameter__name', 'parameter__units', 'parameter__display_name', 'id_loc', 'name_loc', 'timezone', 'country__id', 'country__code', 'country__name', 'coordinates__latitude', 'coordinates__longitude', 'datetime_first__utc', 'datetime_last__utc']]
enr_df = sensors_df.set_index('_dlt_parent_id').join(locs_df.set_index('_dlt_id'), lsuffix="_sensor", rsuffix="_loc")
with pd.option_context('display.max_seq_items', None):
    print(enr_df.columns)

#sensors_final_df = enr_df[['id_sensor', 'name_sensor', 'parameter__id', 'parameter__name', 'parameter__units', 'parameter__display_name', 'id_loc', 'name_loc', 'timezone', 'country__id', 'country__code', 'country__name', 'coordinates__latitude', 'coordinates__longitude', 'datetime_first__utc', 'datetime_last__utc']]
#sensors_final_df.to_parquet()
#enr_df

Index(['id_sensor', 'name_sensor', 'parameter__id', 'parameter__name',
       'parameter__units', 'parameter__display_name', '_dlt_list_idx',
       '_dlt_id', 'id_loc', 'name_loc', 'timezone', 'country__id',
       'country__code', 'country__name', 'owner__id', 'owner__name',
       'provider__id', 'provider__name', 'is_mobile', 'is_monitor',
       'coordinates__latitude', 'coordinates__longitude', '_dlt_load_id',
       'datetime_first__utc', 'datetime_first__local', 'datetime_last__utc',
       'datetime_last__local', 'locality'],
      dtype='object')


In [5]:
df

Unnamed: 0,id,name,parameter__id,parameter__name,parameter__units,parameter__display_name,_dlt_parent_id,_dlt_list_idx,_dlt_id
0,6,pm10 µg/m³,1,pm10,µg/m³,PM10,yzWpoYvCgS02gQ,0,oFgptEyndBBdYQ
1,5,pm25 µg/m³,2,pm25,µg/m³,PM2.5,yzWpoYvCgS02gQ,1,+te7IjPPOku9cQ
2,7,pm10 µg/m³,1,pm10,µg/m³,PM10,4mU3gtxVPvFcMQ,0,F0OK1r9CGMYENA
3,8,pm25 µg/m³,2,pm25,µg/m³,PM2.5,4mU3gtxVPvFcMQ,1,eNY5JbxDx0ISAA
4,10,pm10 µg/m³,1,pm10,µg/m³,PM10,IwqX3OwyTC+Tww,0,u+0QVnvNKDErQQ
...,...,...,...,...,...,...,...,...,...
82373,12613806,pm25 µg/m³,2,pm25,µg/m³,PM2.5,OshPrCQ4lZwckA,0,vv5UhaIE/ipIhw
82374,12613805,pm25 µg/m³,2,pm25,µg/m³,PM2.5,9w+mn4bqyxhhPA,0,Qya5QR/OQ2qVmg
82375,12614084,pm25 µg/m³,2,pm25,µg/m³,PM2.5,1FG/obi7b6WG3g,0,sKVNCG5q6O/Mtg
82376,12615991,pm25 µg/m³,2,pm25,µg/m³,PM2.5,YjRO0nBjErFNoQ,0,oYgJgFkZjFG58w
