In [121]:
import os
import botocore
import boto3
import boto3.s3.transfer as s3transfer
import time
import duckdb
from dotenv import load_dotenv
from scripts.utils import download_object, get_all_objects

# Para probar que podamos conectarnos al bucket correctamente

In [122]:
load_dotenv()  # take environment variables from .env.

True

In [144]:
os.path.isfile('databases/sample.db')

False

In [145]:
con = duckdb.connect('./databases/sample.db')
con.sql('show tables;')
# con.close() # to terminate connection

┌─────────┐
│  name   │
│ varchar │
├─────────┤
│ 0 rows  │
└─────────┘

In [124]:
botocore_config = botocore.config.Config(max_pool_connections=20)
s3 = boto3.client('s3',
    endpoint_url='https://'+os.getenv('DUCKDB_S3_ENDPOINT'),
    config=botocore_config
)

transfer_config = s3transfer.TransferConfig(
    use_threads=True,
    max_concurrency=20,
)

s3t = s3transfer.create_transfer_manager(s3, transfer_config)

In [125]:
print('Probando desde duckdb:')
res = duckdb.sql(
'''
select *
from read_csv_auto('s3://archiva-apagones/penguins.csv')   
''')
print(res)

Probando desde duckdb:
┌─────────┬───────────┬────────────────┬───────────────┬───────────────────┬─────────────┬─────────┐
│ species │  island   │ bill_length_mm │ bill_depth_mm │ flipper_length_mm │ body_mass_g │   sex   │
│ varchar │  varchar  │     double     │    double     │       int64       │    int64    │ varchar │
├─────────┼───────────┼────────────────┼───────────────┼───────────────────┼─────────────┼─────────┤
│ Adelie  │ Torgersen │           39.1 │          18.7 │               181 │        3750 │ MALE    │
│ Adelie  │ Torgersen │           39.5 │          17.4 │               186 │        3800 │ FEMALE  │
│ Adelie  │ Torgersen │           40.3 │          18.0 │               195 │        3250 │ FEMALE  │
│ Adelie  │ Torgersen │           NULL │          NULL │              NULL │        NULL │ NULL    │
│ Adelie  │ Torgersen │           36.7 │          19.3 │               193 │        3450 │ FEMALE  │
│ Adelie  │ Torgersen │           39.3 │          20.6 │            

In [126]:
# Parameters for function
local_bucket_path = './archiva-apagones' # Probs more useful as env var
prefix = 'regions_without_service'

# Create local bucket path, if not already there
os.makedirs(local_bucket_path, exist_ok=True)

# Get list of objects in bucket
obj_list = list(get_all_objects(s3, prefix))
bucket_obj_keys = [obj['Key'] for obj in obj_list]

# Get list of objects locally
local_obj_keys = []
for root, dirs, files in os.walk(local_bucket_path):
    for file in files:
        # Add to local_obj_list but strip off the local_bucket_path
        obj_key = os.path.join(root, file).replace(local_bucket_path, '')[1:]
        local_obj_keys.append(obj_key)

# Determine which (prefixed) objects are missing locally
missing_keys = set(bucket_obj_keys).difference(set(local_obj_keys))
print(f'{len(missing_keys)} objects missing locally')



254 objects missing locally


In [127]:
# Download missing objects
start_time = time.time()
for obj_key in missing_keys:
    local_path = os.path.join(local_bucket_path, obj_key)
    download_object(s3t, obj_key, local_path, verbose=True)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

Downloading regions_without_service/2024-01-24/regions_without_service__2024-01-24T11-50-17-0400.json to ./archiva-apagones/regions_without_service/2024-01-24/regions_without_service__2024-01-24T11-50-17-0400.json
Downloading regions_without_service/2024-01-24/regions_without_service__2024-01-24T06-10-16-0400.json to ./archiva-apagones/regions_without_service/2024-01-24/regions_without_service__2024-01-24T06-10-16-0400.json
Downloading regions_without_service/2024-01-24/regions_without_service__2024-01-24T08-25-16-0400.json to ./archiva-apagones/regions_without_service/2024-01-24/regions_without_service__2024-01-24T08-25-16-0400.json
Downloading regions_without_service/2024-01-24/regions_without_service__2024-01-24T04-15-16-0400.json to ./archiva-apagones/regions_without_service/2024-01-24/regions_without_service__2024-01-24T04-15-16-0400.json
Downloading regions_without_service/2024-01-24/regions_without_service__2024-01-24T05-10-15-0400.json to ./archiva-apagones/regions_without_serv

In [146]:
con.execute(
'''
create or replace TEMP table raw_regions_without_service as (
    select *
    -- from read_json('s3://archiva-apagones/regions_without_service/2023-12-19/*.json', filename=true, auto_detect=true, format='auto') -- globs too much 
    from read_json('./archiva-apagones/regions_without_service/*/*.json', filename=true, auto_detect=true, format='auto') 
)
''')
# select * from read_json('samples/regions_without_service/*.json', filename=true, auto_detect=true, format='auto')

<duckdb.duckdb.DuckDBPyConnection at 0x10e277730>

In [129]:
# con.close()

In [147]:
con.sql(
'''
describe
    select
        *
    from raw_regions_without_service
''')

┌─────────────┬────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │                          column_type                           │  null   │   key   │ default │  extra  │
│   varchar   │                            varchar                             │ varchar │ varchar │ varchar │ varchar │
├─────────────┼────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ regions     │ STRUCT("name" VARCHAR, percentageClientsWithService DOUBLE, …  │ YES     │ NULL    │ NULL    │ NULL    │
│ timestamp   │ VARCHAR                                                        │ YES     │ NULL    │ NULL    │ NULL    │
│ totals      │ STRUCT(totalClients BIGINT, totalClientsWithService BIGINT, …  │ YES     │ NULL    │ NULL    │ NULL    │
│ filename    │ VARCHAR                                                        │ YES     │ NULL    │ NULL    │ NULL    │
└─────────────┴─────────────────

In [131]:
"%Y-%m-%dT%H-%M-%S%z"
con.sql(
f'''
    select
        strptime("timestamp", '%m/%d/%Y %I:%M %p') as "marca_hora_presentada",
        -- totals.totalClients as clientes,
        -- totals.totalClientsWithService as clientes_con_servicio,
        -- totals.totalClientsWithoutService as clientes_sin_servicio,
        filename
            .string_split('__')[2]
            .regexp_extract('(.*).json', 1)
            .strptime('%Y-%m-%dT%H-%M-%S%z')
            ::TIMESTAMP -- drop timezone
            as "marca_hora_accedida",
        regions,
        totals,
        filename
            .string_split('{local_bucket_path}')[2]
            .ltrim('/')
            as object_key
        , -- TODO: Check que sea versatil para local y github actions workflow
        
    from raw_regions_without_service
''')

┌──────────────────────┬─────────────────────┬──────────────────────┬──────────────────────┬───────────────────────────┐
│ marca_hora_present…  │ marca_hora_accedida │       regions        │        totals        │        object_key         │
│      timestamp       │      timestamp      │ struct("name" varc…  │ struct(totalclient…  │          varchar          │
├──────────────────────┼─────────────────────┼──────────────────────┼──────────────────────┼───────────────────────────┤
│ 2023-12-25 11:50:00  │ 2023-12-25 11:50:18 │ [{'name': Arecibo,…  │ {'totalClients': 1…  │ regions_without_service…  │
│ 2023-12-25 05:25:00  │ 2023-12-25 05:25:18 │ [{'name': Arecibo,…  │ {'totalClients': 1…  │ regions_without_service…  │
│ 2023-12-25 13:30:00  │ 2023-12-25 13:30:20 │ [{'name': Arecibo,…  │ {'totalClients': 1…  │ regions_without_service…  │
│ 2023-12-25 21:30:00  │ 2023-12-25 21:30:18 │ [{'name': Arecibo,…  │ {'totalClients': 1…  │ regions_without_service…  │
│ 2023-12-25 02:55:00  │ 2023-12

In [148]:
"%Y-%m-%dT%H-%M-%S%z"
con.execute(
f'''
create or replace table regions_without_service_staging as (
    select
        strptime("timestamp", '%m/%d/%Y %I:%M %p') as "marca_hora_presentada",
        -- totals.totalClients as clientes,
        -- totals.totalClientsWithService as clientes_con_servicio,
        -- totals.totalClientsWithoutService as clientes_sin_servicio,
        filename
            .string_split('__')[2]
            .regexp_extract('(.*).json', 1)
            .strptime('%Y-%m-%dT%H-%M-%S%z')
            ::TIMESTAMP -- drop timezone
            as "marca_hora_accedida",
        regions,
        totals,
        filename
            .string_split('{local_bucket_path}')[2]
            .ltrim('/')
            as object_key
        , -- TODO: Check que sea versatil para local y github actions workflow
        
    from raw_regions_without_service
);
''')

<duckdb.duckdb.DuckDBPyConnection at 0x10e277730>

In [149]:
con.sql(
'''
select 
    distinct object_key
from regions_without_service_staging
'''
)#.fetchall()

┌───────────────────────────────────────────────────────────────────────────────────────────┐
│                                        object_key                                         │
│                                          varchar                                          │
├───────────────────────────────────────────────────────────────────────────────────────────┤
│ regions_without_service/2023-12-25/regions_without_service__2023-12-25T12-15-19-0400.json │
│ regions_without_service/2023-12-25/regions_without_service__2023-12-25T12-40-19-0400.json │
│ regions_without_service/2023-12-25/regions_without_service__2023-12-25T23-05-15-0400.json │
│ regions_without_service/2023-12-25/regions_without_service__2023-12-25T19-50-18-0400.json │
│ regions_without_service/2023-12-25/regions_without_service__2023-12-25T18-00-22-0400.json │
│ regions_without_service/2023-12-25/regions_without_service__2023-12-25T06-35-18-0400.json │
│ regions_without_service/2023-12-25/regions_without_service

In [134]:
con.sql(
'''

(with initial as (
select
    marca_hora_presentada,
    marca_hora_accedida,
    -- unnest(totals),
    totals.totalClients as total_clientes,
    totals.totalClientsWithService as total_clientes_con_servicio,
    totals.totalClientsWithoutService as total_clientes_sin_servicio,
    totals.totalPercentageWithService as porcentaje_clientes_con_servicio,
    totals.totalPercentageWithoutService as porcentaje_clientes_sin_servicio,
from regions_without_service_staging
)

select * from initial
)
'''
)

┌──────────────────────┬─────────────────────┬───┬──────────────────────┬──────────────────────┬──────────────────────┐
│ marca_hora_present…  │ marca_hora_accedida │ … │ total_clientes_sin…  │ porcentaje_cliente…  │ porcentaje_cliente…  │
│      timestamp       │      timestamp      │   │        int64         │        double        │        double        │
├──────────────────────┼─────────────────────┼───┼──────────────────────┼──────────────────────┼──────────────────────┤
│ 2023-12-25 11:50:00  │ 2023-12-25 11:50:18 │ … │                  335 │                99.98 │                 0.02 │
│ 2023-12-25 05:25:00  │ 2023-12-25 05:25:18 │ … │                  847 │                99.94 │                 0.06 │
│ 2023-12-25 13:30:00  │ 2023-12-25 13:30:20 │ … │                  409 │                99.97 │                 0.03 │
│ 2023-12-25 21:30:00  │ 2023-12-25 21:30:18 │ … │                   63 │                100.0 │                  0.0 │
│ 2023-12-25 02:55:00  │ 2023-12-25 02:5

In [135]:
con.sql(
'''
describe
    select
        regions[1]
    from raw_regions_without_service
''')

┌─────────────┬────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │                          column_type                           │  null   │   key   │ default │  extra  │
│   varchar   │                            varchar                             │ varchar │ varchar │ varchar │ varchar │
├─────────────┼────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ regions[1]  │ STRUCT("name" VARCHAR, percentageClientsWithService DOUBLE, …  │ YES     │ NULL    │ NULL    │ NULL    │
└─────────────┴────────────────────────────────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┘

In [136]:
con.sql(
'''
    select
        regions[1],
        "timestamp",
        totals,
        filename
    from raw_regions_without_service
''')

┌──────────────────────┬─────────────────────┬──────────────────────┬──────────────────────────────────────────────────┐
│      regions[1]      │      timestamp      │        totals        │                     filename                     │
│ struct("name" varc…  │       varchar       │ struct(totalclient…  │                     varchar                      │
├──────────────────────┼─────────────────────┼──────────────────────┼──────────────────────────────────────────────────┤
│ {'name': Arecibo, …  │ 12/25/2023 11:50 AM │ {'totalClients': 1…  │ ./archiva-apagones/regions_without_service/202…  │
│ {'name': Arecibo, …  │ 12/25/2023 05:25 AM │ {'totalClients': 1…  │ ./archiva-apagones/regions_without_service/202…  │
│ {'name': Arecibo, …  │ 12/25/2023 01:30 PM │ {'totalClients': 1…  │ ./archiva-apagones/regions_without_service/202…  │
│ {'name': Arecibo, …  │ 12/25/2023 09:30 PM │ {'totalClients': 1…  │ ./archiva-apagones/regions_without_service/202…  │
│ {'name': Arecibo, …  │ 12/25/2

In [150]:
con.close()