In [1]:
import time
import typing as tp
from dataclasses import dataclass

@dataclass
class MockResponse:
    """Tiny response stand-in holding only a ``headers`` dict.

    NOTE(review): presumably mimics an HTTP client response object
    (``headers`` + ``raise_for_status``) — confirm against the code it fakes.
    """

    headers: dict

    def raise_for_status(self):
        """No-op: this mock never signals an error status."""
        return None

class NCEIBackendMock:
    """Fake streaming backend that serves one dummy file of a fixed size.

    Every URI resolves to the same simulated file. ``size`` reports its
    length and ``read_`` streams exactly that many bytes, so the two methods
    always agree regardless of the chunk size requested.

    Args:
        file_size: Length in bytes of the simulated file (default 50 MiB).
        latency: Seconds slept before each chunk to simulate network delay
            (default 0.1). Use 0 to stream without waiting.
    """

    def __init__(self, file_size: int = 50 * 1024 * 1024, latency: float = 0.1):
        self.file_size = file_size
        self.latency = latency
        # Informational only: how many chunks the default 1 MiB chunk size
        # produces (ceil division). The stream itself is driven by file_size.
        self.chunk_count = -(-file_size // (1024 * 1024))

    def size(self, *, uri: str) -> int:
        """Return the simulated file size in bytes (the URI is ignored)."""
        return self.file_size

    def read_(self, *, uri: str, chunk_size: int = 1024 * 1024) -> tp.Iterator[bytes]:
        """Stream the simulated file as chunks of at most ``chunk_size`` bytes.

        The final chunk is shortened so the total number of bytes yielded
        equals ``size(uri=uri)``.

        Raises:
            ValueError: If ``chunk_size`` is not positive. Because this is a
                generator, the error surfaces on the first iteration.
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")

        print(f"\n[Mock] Iniciando stream para: {uri}")
        remaining = self.file_size
        while remaining > 0:
            if self.latency > 0:
                time.sleep(self.latency)  # simulated network latency
            piece = min(chunk_size, remaining)
            yield b"0" * piece  # dummy payload
            remaining -= piece

# Bare-bones datasource stand-in so smart_download has something to talk to
class DatasourceMock:
    """Datasource double exposing only ``backend`` and ``mountpoint``."""

    def __init__(self):
        # Remote prefix that smart_download strips to build local paths.
        self.mountpoint = "/ncei_data"
        # Streaming backend used for size lookups and chunked reads.
        self.backend = NCEIBackendMock()

In [2]:
import pathlib as pl
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

def smart_download(ds, remote_uris: list[str], local_dest: str | pl.Path, max_workers: int = 4):
    """
    Descarga archivos usando streaming y progreso por bytes si está disponible,
    o progreso por archivos como fallback.
    """
    local_root = pl.Path(local_dest)
    
    def _download_one(uri: str, position: int):
        # 1. Intentar obtener el tamaño para la barra de progreso
        try:
            total_size = ds.backend.size(uri=uri)
        except (AttributeError, Exception):
            total_size = None

        # 2. Preparar ruta local
        # (Uso de lstrip para asegurar que sea relativa al root local)
        relative_path = uri.split(ds.mountpoint)[-1].lstrip("/") 
        target_path = local_root / relative_path
        target_path.parent.mkdir(parents=True, exist_ok=True)

        # 3. Descarga con streaming si read_ existe
        if hasattr(ds.backend, 'read_') and total_size:
            with tqdm(
                total=total_size, 
                unit='B', 
                unit_scale=True, 
                desc=uri.split('/')[-1],
                leave=False, # La barra desaparece al terminar para no saturar
                position=position # Evita que las barras se solapen en multihilo
            ) as pbar:
                with open(target_path, "wb") as f:
                    for chunk in ds.backend.read_(uri=uri):
                        f.write(chunk)
                        pbar.update(len(chunk))
        else:
            # Fallback al método tradicional si no hay streaming disponible
            content = ds.load(uri=uri) # Asumiendo que load usa backend.read()
            target_path.write_bytes(content.getvalue())

    # Ejecución
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Asignamos una posición a cada hilo para que las barras de tqdm se apilen
        futures = [
            executor.submit(_download_one, uri, i % max_workers) 
            for i, uri in enumerate(remote_uris)
        ]
        
        # Barra de progreso global (por archivos)
        with tqdm(total=len(remote_uris), desc="Progreso Global", unit="file") as main_pbar:
            for future in as_completed(futures):
                main_pbar.update(1)

In [3]:
from tqdm.notebook import tqdm
import pathlib as pl

# Fake remote files to fetch
archivos_ficticios = [
    "/ncei_data/sensor_v1_2024.nc",
    "/ncei_data/sensor_v2_2024.nc",
    "/ncei_data/metadata_global.nc",
]

# Mock datasource that serves them
ds_mock = DatasourceMock()

# Arguments for the concurrent streaming download
download_kwargs = dict(
    ds=ds_mock,
    remote_uris=archivos_ficticios,
    local_dest="./test_mock_download",
    max_workers=3,
)

print("Iniciando prueba de descarga concurrente con streaming...")
smart_download(**download_kwargs)

Iniciando prueba de descarga concurrente con streaming...


sensor_v1_2024.nc:   0%|          | 0.00/52.4M [00:00<?, ?B/s]

metadata_global.nc:   0%|          | 0.00/52.4M [00:00<?, ?B/s]

sensor_v2_2024.nc:   0%|          | 0.00/52.4M [00:00<?, ?B/s]

Progreso Global:   0%|          | 0/3 [00:00<?, ?file/s]


[Mock] Iniciando stream para: /ncei_data/sensor_v2_2024.nc

[Mock] Iniciando stream para: /ncei_data/sensor_v1_2024.nc

[Mock] Iniciando stream para: /ncei_data/metadata_global.nc


In [None]:
import tfg.storage

In [None]:
# Storage handles: the GCS bucket to read from and the local drive to write to.
gc = tfg.storage.use_gcs_cloud(
    bucket="gcp-public-data-goes-16",
    cache_file="hola.json",
    expire_after=100.0,
)
ls = tfg.storage.use_local_drive(root_path=".")

# List the prefix, load the first entry and save it locally under the same URI.
content = gc.list(prefix="/ABI-L2-CMIPF/2020/319/20")
first_uri = content[0]
data = gc.load(uri=first_uri)
ls.save(uri=first_uri, data=data)

print(type(data))

In [None]:
# AWS bucket handle. NOTE: `ls` (local drive) must already exist from an
# earlier cell — this cell does not create it.
ds = tfg.storage.use_aws_cloud(
    bucket="noaa-goes16",
    cache_file="hola.json",
    expire_after=100.0,
)

# List the prefix, load the second entry and save it locally under the same URI.
content = ds.list(prefix="/ABI-L2-CMIPF/2020/319/20")
second_uri = content[1]
data = ds.load(uri=second_uri)
ls.save(uri=second_uri, data=data)

print(type(data))

In [None]:
# Read back from the local drive: list the prefix and load its first entry.
ls = tfg.storage.use_local_drive(root_path=".")

content = ls.list(prefix="/ABI-L2-CMIPF/2020/319/20")
first_uri = content[0]
data = ls.load(uri=first_uri)

print(type(data))

In [None]:
# NOTE: relies on `content` and `data` left behind by an earlier cell —
# run that cell first.
gd = tfg.storage.use_google_drive()

upload_uri = content[0]
gd.save(uri=upload_uri, data=data)

print(type(data))

In [None]:
# Storage handles: the NCEI archive to read from and the local drive to write to.
nc = tfg.storage.use_ncei_archive(
    base_url="https://www.ncei.noaa.gov/data/gridsat-goes/access/goes",
    cache_file="hola.json",
    expire_after=100.0,
)
ls = tfg.storage.use_local_drive(root_path=".")

# List the prefix, load the first entry and save it locally under the same URI.
content = nc.list(prefix="/2007/08/")
first_uri = content[0]
data = nc.load(uri=first_uri)
ls.save(uri=first_uri, data=data)

print(type(data))