### **Adquisición de Datos**
---
Tiempo estimado: ~100 minutos

Espacio requerido: ~5 GB

In [7]:
import requests
import json
import time
import base64
import zipfile
import io
import pandas as pd
import os
from tqdm import tqdm
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

In [8]:
# Project root: the absolute path of the kernel's current working directory.
WORKSPACE = os.path.abspath(os.getcwd())

# Folder under the project root where every downloaded file is stored.
DATA_FOLDER = os.path.join(
    WORKSPACE,
    'productos/test/precipitation-forecast-co/data',
)

print("Workspace:", WORKSPACE)
print("Data folder:", DATA_FOLDER)

Workspace: /tf
Data folder: /tf/productos/test/precipitation-forecast-co/data


In [3]:
def _build_job_params(group, start_date, end_date, id, label):
    """Build the form payload that requests one group of stations from the server."""
    filter_stations = "~or~".join(
        f"(IdParametro~eq~'{id}'~and~Etiqueta~eq~'{label}'~and~IdEstacion~eq~'{code}')"
        for code in group
    )
    item = {"IdParametro": id, "Etiqueta": label, "EsEjeY1": False, "EsEjeY2": False, "EsTipoLinea": False, "EsTipoBarra": False, "TipoSerie": "Estandard", "Calculo": ""}
    return {
        "Filtro": f"sort=&filter=({filter_stations})&group=&fechaInicio={start_date}T05%3A00%3A00.000Z&fechaFin={end_date}T05%3A00%3A00.000Z&mostrarGrado=true&mostrarCalificador=true&mostrarNivelAprobacion=true",
        # The server receives one item descriptor per requested station.
        "Items": json.dumps([item] * len(group)),
        "f": "pjson",
    }


def _submit_job(url, params, request_timeout):
    """Submit a download job; return its job id, or None when the submission fails."""
    try:
        response = requests.post(url, data=params, timeout=request_timeout)
        if response.status_code != 200:
            return None
        payload = response.json()
    except (requests.RequestException, ValueError):
        # Network failure or a body that is not JSON: report as a failed submission.
        return None
    job_id = payload.get('jobId') if isinstance(payload, dict) else None
    return job_id or None


def _wait_for_job_value(result_url, job_timeout, poll_interval, request_timeout):
    """Poll the job result until it exposes a 'value'; return (ready, value)."""
    started = time.monotonic()
    while True:
        try:
            response = requests.get(result_url, timeout=request_timeout)
            # Parse the body once per poll; a non-200 answer means "not ready yet".
            payload = response.json() if response.status_code == 200 else None
        except (requests.RequestException, ValueError):
            # Transient error or non-JSON body: keep polling until the deadline.
            payload = None

        if isinstance(payload, dict) and 'value' in payload:
            return True, payload['value']
        if time.monotonic() - started > job_timeout:
            return False, None
        time.sleep(poll_interval)


def _append_csv_from_zip(base64_string, csv_path):
    """Decode a base64 ZIP archive and append its CSV members to csv_path.

    Returns True when at least one CSV member was appended, False when the
    payload could not be decoded or held no CSV. Errors while writing to disk
    (OSError) are not swallowed.
    """
    if not isinstance(base64_string, str):
        return False
    try:
        # Pad to a multiple of 4 only when needed (0 to 3 '=' characters).
        padded = base64_string + '=' * (-len(base64_string) % 4)
        archive_bytes = base64.b64decode(padded)
        with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
            csv_payloads = [
                archive.read(member)
                for member in archive.namelist()
                if member.endswith('.csv')
            ]
    except Exception:
        # Deliberate best effort: any payload that is not a readable ZIP is
        # treated as "no data" for this group.
        return False

    if not csv_payloads:
        return False

    os.makedirs(os.path.dirname(csv_path), exist_ok=True)
    # NOTE(review): the file is opened in append mode, so re-running the
    # download adds the same rows again, and each appended CSV presumably
    # carries its own header row; confirm that downstream code handles this.
    with open(csv_path, 'ab') as file:
        for payload in csv_payloads:
            file.write(payload)
    return True


def get_station_data(station_codes, start_date, end_date, id, label, group_size=20,
                     job_timeout=120, poll_interval=1, request_timeout=60):
    """
    Gets station data from IDEAM server.

    Stations are requested in groups. For each group a job is submitted, its
    result is polled until it is ready (or job_timeout elapses), and every CSV
    found in the returned ZIP is appended to DATA_FOLDER/raw/<label>.csv.
    A summary of the outcome per group is printed at the end.

    Args:
        station_codes (list): List of station codes
        start_date (str): Start date, interpolated as-is into the request filter
        end_date (str): End date, interpolated as-is into the request filter
        id (str): Parameter ID to retrieve (name kept for compatibility even
            though it shadows the builtin)
        label (str): Parameter label; also used as the output file name
        group_size (int): Size of station groups to process
        job_timeout (float): Seconds to wait for one job result before giving up
        poll_interval (float): Seconds to sleep between result polls
        request_timeout (float): Seconds to wait on each HTTP request

    Returns:
        None (saves files to DATA_FOLDER)

    Raises:
        ValueError: If group_size is smaller than 1.
        OSError: If the output file cannot be written.
    """
    if group_size < 1:
        raise ValueError("group_size must be a positive integer")

    url = "http://dhime.ideam.gov.co/server/rest/services/AtencionCiudadano/DescargarArchivo/GPServer/DescargarArchivo/submitJob"
    groups = [station_codes[i:i + group_size] for i in range(0, len(station_codes), group_size)]
    csv_path = os.path.join(DATA_FOLDER, 'raw', f"{label}.csv")

    empty_groups = 0
    overtime_groups = 0
    failed_groups = 0
    total_groups = len(groups)

    with tqdm(total=total_groups, desc=f"PROCESSING {label} ({id})", unit="group") as pbar:
        for group in groups:
            params = _build_job_params(group, start_date, end_date, id, label)
            job_id = _submit_job(url, params, request_timeout)

            if job_id is None:
                # A rejected submission must not be counted as a success.
                failed_groups += 1
            else:
                zip_url = f"http://dhime.ideam.gov.co/server/rest/services/AtencionCiudadano/DescargarArchivo/GPServer/DescargarArchivo/jobs/{job_id}/results/Archivo?f=pjson"
                ready, base64_string = _wait_for_job_value(
                    zip_url, job_timeout, poll_interval, request_timeout
                )
                if not ready:
                    overtime_groups += 1
                    # List the stations of the group so it can be retried later.
                    print(f"{label}: {group}")
                elif not _append_csv_from_zip(base64_string, csv_path):
                    empty_groups += 1

            pbar.update(1)

    processed_groups = total_groups - empty_groups - overtime_groups - failed_groups
    print(f"Number of groups processed successfully: {processed_groups}/{total_groups}")
    print(f"Number of groups without data: {empty_groups}/{total_groups}")
    print(f"Number of groups over time limit: {overtime_groups}/{total_groups}")
    print(f"Number of groups with failed requests: {failed_groups}/{total_groups}")

In [4]:
def download_cne(request_timeout=60):
    """
    Downloads the CNE_IDEAM.xls station catalog into DATA_FOLDER.

    When the download fails but a previously downloaded copy exists, a warning
    is printed and the existing copy is left untouched.

    Args:
        request_timeout (float): Seconds to wait on the HTTP request

    Returns:
        None (saves the file to DATA_FOLDER)

    Raises:
        RuntimeError: If the download fails and no local copy exists.
    """
    url = "https://bart.ideam.gov.co/cneideam/CNE_IDEAM.xls"
    os.makedirs(DATA_FOLDER, exist_ok=True)
    file_path = os.path.join(DATA_FOLDER, os.path.basename(url))

    try:
        # NOTE(review): verify=False turns off TLS certificate validation for
        # this request; presumably a workaround for the server's certificate.
        # Confirm it is still required.
        response = requests.get(url, verify=False, timeout=request_timeout)
        failure = None if response.status_code == 200 else f"HTTP {response.status_code}"
    except requests.RequestException as error:
        response = None
        failure = str(error)

    if failure is None:
        with open(file_path, 'wb') as file:
            file.write(response.content)
        return

    # Do not fail silently: either fall back to an existing copy, loudly, or stop.
    if os.path.exists(file_path):
        print(f"Warning: could not download {url} ({failure}); using existing copy at {file_path}")
        return
    raise RuntimeError(f"Could not download {url} ({failure}) and no local copy exists at {file_path}")

In [6]:
def main():
    """
    Downloads the station catalog and then the data of every configured parameter.

    The catalog is read from DATA_FOLDER/CNE_IDEAM.xls and its 'CODIGO' column
    supplies the station codes. An error while processing one parameter is
    printed and does not stop the remaining parameters.
    """
    download_cne()
    catalog_path = os.path.join(DATA_FOLDER, 'CNE_IDEAM.xls')
    stations_catalog = pd.read_excel(catalog_path)

    # Display name -> server parameter id and label to request.
    parameters_labels = {
        "Precipitacion Acumulada": {"IdParameter": "PRECIPITACION", "Label": "PTPM_CON"}
    }

    for parameter_name in parameters_labels:
        spec = parameters_labels[parameter_name]
        try:
            station_codes = stations_catalog['CODIGO'].tolist()
            parameter_id = spec['IdParameter']
            parameter_label = spec['Label']
            get_station_data(station_codes, '2000-01-01', '2024-12-31', parameter_id, parameter_label)
        except Exception as error:
            print(f"Error processing {parameter_name}: {error}")

# Start the full acquisition when this code runs as the main module.
if __name__ == '__main__':
    main()

PROCESSING PTPM_CON (PRECIPITACION): 100%|██████████| 226/226 [1:36:41<00:00, 25.67s/group]

Number of groups processed successfully: 218/226
Number of groups without data: 8/226
Number of groups over time limit: 0/226



