Cuaderno de python para almacenar los datos extraidos de datos abiertos en una base en sql server para su posterior procesamiento.

In [1]:
import pandas as pd
import requests
import pyodbc
from io import StringIO

# Cundinamarca Dic 2025

## Presión

In [None]:
# URL base de la API
# URL sin paginación - la paginación se hará construyendo dinámicamente el query
base_url_api = "https://www.datos.gov.co/resource/62tk-nxj5.csv"

# Parámetros
limit = 1000  # Reducido a 500 para queries más rápidas
offset = 0    # desplazamiento inicial
all_data = [] # lista para almacenar los bloques

# Configuración de los parámetros de conexión
server = 'localhost\\SQLEXPRESS'  # O el punto '.' que usaste en SSMS
database = 'EM_CUN'

# Cadena de conexión usando Autenticación de Windows (Trusted_Connection)
conn_str = (
    f'DRIVER={{ODBC Driver 17 for SQL Server}};'
    f'SERVER={server};'
    f'DATABASE={database};'
    f'Trusted_Connection=yes;'
)


insert_query = """
    INSERT INTO dbo.presion (
        codigo_estacion, codigo_sensor, fecha_observacion, valor_observado,
        nombre_estacion, departamento, municipio, zona_hidrografica,
        latitud, longitud, descripcion_sensor, unidad_medida
    ) 
    VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
"""

# Función para reintentos
import time
def request_with_retries(url, max_retries=3, timeout=60):
    """Intenta hacer la solicitud hasta 3 veces con timeout de 60 segundos"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=timeout)
            return response
        except requests.exceptions.Timeout:
            print(f"Timeout en intento {attempt + 1}/{max_retries}. Esperando 5 segundos...")
            time.sleep(5)
        except requests.exceptions.ConnectionError as e:
            print(f"Error de conexión en intento {attempt + 1}/{max_retries}: {e}")
            time.sleep(5)
    
    raise Exception(f"No se pudo conectar después de {max_retries} intentos")

try:
    # 1. Establecer conexión inicial
    with pyodbc.connect(conn_str) as conn:
        cursor = conn.cursor()
        cursor.fast_executemany = True
        
        print("Iniciando proceso de descarga e ingesta...")

        while True:
            # 2. Construir el query SoQL con LIMIT y OFFSET incluidos
            soql_query = f"""SELECT
  `codigoestacion`,
  `codigosensor`,
  `fechaobservacion`,
  `valorobservado`,
  `nombreestacion`,
  `departamento`,
  `municipio`,
  `zonahidrografica`,
  `latitud`,
  `longitud`,
  `descripcionsensor`,
  `unidadmedida`
WHERE
  `fechaobservacion`
    BETWEEN "2025-12-01T00:00:00" :: floating_timestamp
    AND "2025-12-31T23:59:59" :: floating_timestamp
  AND caseless_eq(`departamento`, "CUNDINAMARCA")
LIMIT {limit}
OFFSET {offset}"""
            
            # URL-encode el query
            import urllib.parse
            encoded_query = urllib.parse.quote(soql_query)
            url = f"{base_url_api}?$query={encoded_query}"
            
            response = request_with_retries(url, max_retries=3, timeout=60)
            
            if response.status_code != 200:
                print(f"Error en la API: {response.status_code}")
                print(f"Respuesta: {response.text}")
                break

            # 3. Leer bloque como DataFrame
            df_chunk = pd.read_csv(StringIO(response.text), dtype=str)
            
            if df_chunk.empty:
                print("Proceso finalizado: No hay más datos para descargar.")
                break
            
            # Convertir tipos de datos
            df_chunk["valorobservado"] = df_chunk["valorobservado"].astype(float)
            df_chunk["latitud"] = df_chunk["latitud"].astype(float)
            df_chunk["longitud"] = df_chunk["longitud"].astype(float)  
            df_chunk["fechaobservacion"] = pd.to_datetime(df_chunk["fechaobservacion"])            
            
            # 4. Cargar bloque actual a la base de datos
            try:
                records = df_chunk.values.tolist()
                cursor.executemany(insert_query, records)
                conn.commit()  # Commit por bloque para asegurar persistencia
                
                print(f"Procesadas {offset + len(df_chunk)} filas...")
                
            except Exception as e_db:
                print(f"Error insertando bloque en offset {offset}: {e_db}")
                conn.rollback()
                break

            # 5. Incrementar offset para el siguiente bloque
            offset += limit

except pyodbc.Error as e_conn:
    print(f"Error de conexión a la base de datos: {e_conn}")

finally:
    print("Conexión cerrada.")


Iniciando proceso de descarga e ingesta...
Descargando registros desde 0...
Procesadas 1000 filas...
Descargando registros desde 1000...
Procesadas 2000 filas...
Descargando registros desde 2000...
Procesadas 3000 filas...
Descargando registros desde 3000...
Procesadas 4000 filas...
Descargando registros desde 4000...
Procesadas 5000 filas...
Descargando registros desde 5000...
Procesadas 6000 filas...
Descargando registros desde 6000...
Procesadas 7000 filas...
Descargando registros desde 7000...
Procesadas 8000 filas...
Descargando registros desde 8000...
Procesadas 9000 filas...
Descargando registros desde 9000...
Procesadas 10000 filas...
Descargando registros desde 10000...
Procesadas 11000 filas...
Descargando registros desde 11000...
Procesadas 12000 filas...
Descargando registros desde 12000...
Procesadas 13000 filas...
Descargando registros desde 13000...
Procesadas 14000 filas...
Descargando registros desde 14000...
Procesadas 15000 filas...
Descargando registros desde 15000.

## Dirección del viento

In [3]:
# URL base de la API
# URL sin paginación - la paginación se hará construyendo dinámicamente el query
base_url_api = "https://www.datos.gov.co/resource/kiw7-v9ta.csv"

# Parámetros
limit = 1000  # número máximo permitido por la API
offset = 0    # desplazamiento inicial
all_data = [] # lista para almacenar los bloques

# Configuración de los parámetros de conexión
server = 'localhost\\SQLEXPRESS'  # O el punto '.' que usaste en SSMS
database = 'EM_CUN'

# Cadena de conexión usando Autenticación de Windows (Trusted_Connection)
conn_str = (
    f'DRIVER={{ODBC Driver 17 for SQL Server}};'
    f'SERVER={server};'
    f'DATABASE={database};'
    f'Trusted_Connection=yes;'
)


insert_query = """
    INSERT INTO dbo.dir_viento (
        codigo_estacion, codigo_sensor, fecha_observacion, valor_observado,
        nombre_estacion, departamento, municipio, zona_hidrografica,
        latitud, longitud, descripcion_sensor, unidad_medida
    ) 
    VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
"""


try:
    # 1. Establecer conexión inicial
    with pyodbc.connect(conn_str) as conn:
        cursor = conn.cursor()
        cursor.fast_executemany = True
        
        print("Iniciando proceso de descarga e ingesta...")

        while True:
            # 2. Construir el query SoQL con LIMIT y OFFSET incluidos
            soql_query = f"""SELECT
  `codigoestacion`,
  `codigosensor`,
  `fechaobservacion`,
  `valorobservado`,
  `nombreestacion`,
  `departamento`,
  `municipio`,
  `zonahidrografica`,
  `latitud`,
  `longitud`,
  `descripcionsensor`,
  `unidadmedida`
WHERE
  `fechaobservacion`
    BETWEEN "2025-12-01T00:00:00" :: floating_timestamp
    AND "2025-12-31T23:59:59" :: floating_timestamp
  AND caseless_eq(`departamento`, "CUNDINAMARCA")
LIMIT {limit}
OFFSET {offset}"""
            
            # URL-encode el query
            import urllib.parse
            encoded_query = urllib.parse.quote(soql_query)
            url = f"{base_url_api}?$query={encoded_query}"
            
            response = requests.get(url, timeout=15)
            
            if response.status_code != 200:
                print(f"Error en la API: {response.status_code}")
                print(f"Respuesta: {response.text}")
                break

            # 3. Leer bloque como DataFrame
            df_chunk = pd.read_csv(StringIO(response.text), dtype=str)
            
            if df_chunk.empty:
                print("Proceso finalizado: No hay más datos para descargar.")
                break
            
            # Convertir tipos de datos
            df_chunk["valorobservado"] = df_chunk["valorobservado"].astype(float)
            df_chunk["latitud"] = df_chunk["latitud"].astype(float)
            df_chunk["longitud"] = df_chunk["longitud"].astype(float)  
            df_chunk["fechaobservacion"] = pd.to_datetime(df_chunk["fechaobservacion"])            
            
            # 4. Cargar bloque actual a la base de datos
            try:
                records = df_chunk.values.tolist()
                cursor.executemany(insert_query, records)
                conn.commit()  # Commit por bloque para asegurar persistencia
                
                print(f"Procesadas {offset + len(df_chunk)} filas...")
                
            except Exception as e_db:
                print(f"Error insertando bloque en offset {offset}: {e_db}")
                conn.rollback()
                break

            # 5. Incrementar offset para el siguiente bloque
            offset += limit

except pyodbc.Error as e_conn:
    print(f"Error de conexión a la base de datos: {e_conn}")

finally:
    print("Conexión cerrada.")


Iniciando proceso de descarga e ingesta...
Procesadas 1000 filas...
Procesadas 2000 filas...
Procesadas 3000 filas...
Procesadas 4000 filas...
Procesadas 5000 filas...
Procesadas 6000 filas...
Procesadas 7000 filas...
Procesadas 8000 filas...
Procesadas 9000 filas...
Procesadas 10000 filas...
Procesadas 11000 filas...
Procesadas 12000 filas...
Procesadas 13000 filas...
Procesadas 14000 filas...
Procesadas 15000 filas...
Procesadas 16000 filas...
Procesadas 17000 filas...
Procesadas 18000 filas...
Procesadas 19000 filas...
Procesadas 20000 filas...
Procesadas 21000 filas...
Procesadas 22000 filas...
Procesadas 23000 filas...
Procesadas 24000 filas...
Procesadas 25000 filas...
Procesadas 26000 filas...
Procesadas 27000 filas...
Procesadas 28000 filas...
Procesadas 29000 filas...
Procesadas 30000 filas...
Procesadas 31000 filas...
Procesadas 32000 filas...
Procesadas 33000 filas...
Procesadas 34000 filas...
Procesadas 35000 filas...
Procesadas 36000 filas...
Procesadas 37000 filas...
Proc

## Velocidad del viento

In [4]:
# URL base de la API
# URL sin paginación - la paginación se hará construyendo dinámicamente el query
base_url_api = "https://www.datos.gov.co/resource/sgfv-3yp8.csv"

# Parámetros
limit = 1000  # número máximo permitido por la API
offset = 0    # desplazamiento inicial
all_data = [] # lista para almacenar los bloques

# Configuración de los parámetros de conexión
server = 'localhost\\SQLEXPRESS'  # O el punto '.' que usaste en SSMS
database = 'EM_CUN'

# Cadena de conexión usando Autenticación de Windows (Trusted_Connection)
conn_str = (
    f'DRIVER={{ODBC Driver 17 for SQL Server}};'
    f'SERVER={server};'
    f'DATABASE={database};'
    f'Trusted_Connection=yes;'
)


insert_query = """
    INSERT INTO dbo.vel_viento (
        codigo_estacion, codigo_sensor, fecha_observacion, valor_observado,
        nombre_estacion, departamento, municipio, zona_hidrografica,
        latitud, longitud, descripcion_sensor, unidad_medida
    ) 
    VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
"""


try:
    # 1. Establecer conexión inicial
    with pyodbc.connect(conn_str) as conn:
        cursor = conn.cursor()
        cursor.fast_executemany = True
        
        print("Iniciando proceso de descarga e ingesta...")

        while True:
            # 2. Construir el query SoQL con LIMIT y OFFSET incluidos
            soql_query = f"""SELECT
  `codigoestacion`,
  `codigosensor`,
  `fechaobservacion`,
  `valorobservado`,
  `nombreestacion`,
  `departamento`,
  `municipio`,
  `zonahidrografica`,
  `latitud`,
  `longitud`,
  `descripcionsensor`,
  `unidadmedida`
WHERE
  `fechaobservacion`
    BETWEEN "2025-12-01T00:00:00" :: floating_timestamp
    AND "2025-12-31T23:59:59" :: floating_timestamp
  AND caseless_eq(`departamento`, "CUNDINAMARCA")
LIMIT {limit}
OFFSET {offset}"""
            
            # URL-encode el query
            import urllib.parse
            encoded_query = urllib.parse.quote(soql_query)
            url = f"{base_url_api}?$query={encoded_query}"
            
            response = requests.get(url, timeout=15)
            
            if response.status_code != 200:
                print(f"Error en la API: {response.status_code}")
                print(f"Respuesta: {response.text}")
                break

            # 3. Leer bloque como DataFrame
            df_chunk = pd.read_csv(StringIO(response.text), dtype=str)
            
            if df_chunk.empty:
                print("Proceso finalizado: No hay más datos para descargar.")
                break
            
            # Convertir tipos de datos
            df_chunk["valorobservado"] = df_chunk["valorobservado"].astype(float)
            df_chunk["latitud"] = df_chunk["latitud"].astype(float)
            df_chunk["longitud"] = df_chunk["longitud"].astype(float)  
            df_chunk["fechaobservacion"] = pd.to_datetime(df_chunk["fechaobservacion"])            
            
            # 4. Cargar bloque actual a la base de datos
            try:
                records = df_chunk.values.tolist()
                cursor.executemany(insert_query, records)
                conn.commit()  # Commit por bloque para asegurar persistencia
                
                print(f"Procesadas {offset + len(df_chunk)} filas...")
                
            except Exception as e_db:
                print(f"Error insertando bloque en offset {offset}: {e_db}")
                conn.rollback()
                break

            # 5. Incrementar offset para el siguiente bloque
            offset += limit

except pyodbc.Error as e_conn:
    print(f"Error de conexión a la base de datos: {e_conn}")

finally:
    print("Conexión cerrada.")


Iniciando proceso de descarga e ingesta...
Procesadas 1000 filas...
Procesadas 2000 filas...
Procesadas 3000 filas...
Procesadas 4000 filas...
Procesadas 5000 filas...
Procesadas 6000 filas...
Procesadas 7000 filas...
Procesadas 8000 filas...
Procesadas 9000 filas...
Procesadas 10000 filas...
Procesadas 11000 filas...
Procesadas 12000 filas...
Procesadas 13000 filas...
Procesadas 14000 filas...
Procesadas 15000 filas...
Procesadas 16000 filas...
Procesadas 17000 filas...
Procesadas 18000 filas...
Procesadas 19000 filas...
Procesadas 20000 filas...
Procesadas 21000 filas...
Procesadas 22000 filas...
Procesadas 23000 filas...
Procesadas 24000 filas...
Procesadas 25000 filas...
Procesadas 26000 filas...
Procesadas 27000 filas...
Procesadas 28000 filas...
Procesadas 29000 filas...
Procesadas 30000 filas...
Procesadas 31000 filas...
Procesadas 32000 filas...
Procesadas 33000 filas...
Procesadas 34000 filas...
Procesadas 35000 filas...
Procesadas 36000 filas...
Procesadas 37000 filas...
Proc

## Temperatura del aire

In [5]:
# URL base de la API
# URL sin paginación - la paginación se hará construyendo dinámicamente el query
base_url_api = "https://www.datos.gov.co/resource/sbwg-7ju4.csv"

# Parámetros
limit = 1000  # número máximo permitido por la API
offset = 0    # desplazamiento inicial
all_data = [] # lista para almacenar los bloques

# Configuración de los parámetros de conexión
server = 'localhost\\SQLEXPRESS'  # O el punto '.' que usaste en SSMS
database = 'EM_CUN'

# Cadena de conexión usando Autenticación de Windows (Trusted_Connection)
conn_str = (
    f'DRIVER={{ODBC Driver 17 for SQL Server}};'
    f'SERVER={server};'
    f'DATABASE={database};'
    f'Trusted_Connection=yes;'
)


insert_query = """
    INSERT INTO dbo.temp_aire (
        codigo_estacion, codigo_sensor, fecha_observacion, valor_observado,
        nombre_estacion, departamento, municipio, zona_hidrografica,
        latitud, longitud, descripcion_sensor, unidad_medida
    ) 
    VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
"""


try:
    # 1. Establecer conexión inicial
    with pyodbc.connect(conn_str) as conn:
        cursor = conn.cursor()
        cursor.fast_executemany = True
        
        print("Iniciando proceso de descarga e ingesta...")

        while True:
            # 2. Construir el query SoQL con LIMIT y OFFSET incluidos
            soql_query = f"""SELECT
  `codigoestacion`,
  `codigosensor`,
  `fechaobservacion`,
  `valorobservado`,
  `nombreestacion`,
  `departamento`,
  `municipio`,
  `zonahidrografica`,
  `latitud`,
  `longitud`,
  `descripcionsensor`,
  `unidadmedida`
WHERE
  `fechaobservacion`
    BETWEEN "2025-12-01T00:00:00" :: floating_timestamp
    AND "2025-12-31T23:59:59" :: floating_timestamp
  AND caseless_eq(`departamento`, "CUNDINAMARCA")
LIMIT {limit}
OFFSET {offset}"""
            
            # URL-encode el query
            import urllib.parse
            encoded_query = urllib.parse.quote(soql_query)
            url = f"{base_url_api}?$query={encoded_query}"
            
            response = requests.get(url, timeout=15)
            
            if response.status_code != 200:
                print(f"Error en la API: {response.status_code}")
                print(f"Respuesta: {response.text}")
                break

            # 3. Leer bloque como DataFrame
            df_chunk = pd.read_csv(StringIO(response.text), dtype=str)
            
            if df_chunk.empty:
                print("Proceso finalizado: No hay más datos para descargar.")
                break
            
            # Convertir tipos de datos
            df_chunk["valorobservado"] = df_chunk["valorobservado"].astype(float)
            df_chunk["latitud"] = df_chunk["latitud"].astype(float)
            df_chunk["longitud"] = df_chunk["longitud"].astype(float)  
            df_chunk["fechaobservacion"] = pd.to_datetime(df_chunk["fechaobservacion"])            
            
            # 4. Cargar bloque actual a la base de datos
            try:
                records = df_chunk.values.tolist()
                cursor.executemany(insert_query, records)
                conn.commit()  # Commit por bloque para asegurar persistencia
                
                print(f"Procesadas {offset + len(df_chunk)} filas...")
                
            except Exception as e_db:
                print(f"Error insertando bloque en offset {offset}: {e_db}")
                conn.rollback()
                break

            # 5. Incrementar offset para el siguiente bloque
            offset += limit

except pyodbc.Error as e_conn:
    print(f"Error de conexión a la base de datos: {e_conn}")

finally:
    print("Conexión cerrada.")


Iniciando proceso de descarga e ingesta...
Procesadas 1000 filas...
Procesadas 2000 filas...
Procesadas 3000 filas...
Procesadas 4000 filas...
Procesadas 5000 filas...
Procesadas 6000 filas...
Procesadas 7000 filas...
Procesadas 8000 filas...
Procesadas 9000 filas...
Procesadas 10000 filas...
Procesadas 11000 filas...
Procesadas 12000 filas...
Procesadas 13000 filas...
Procesadas 14000 filas...
Procesadas 15000 filas...
Procesadas 16000 filas...
Procesadas 17000 filas...
Procesadas 18000 filas...
Procesadas 19000 filas...
Procesadas 20000 filas...
Procesadas 21000 filas...
Procesadas 22000 filas...
Procesadas 23000 filas...
Procesadas 24000 filas...
Procesadas 25000 filas...
Procesadas 26000 filas...
Procesadas 27000 filas...
Procesadas 28000 filas...
Procesadas 29000 filas...
Procesadas 30000 filas...
Procesadas 31000 filas...
Procesadas 32000 filas...
Procesadas 33000 filas...
Procesadas 34000 filas...
Procesadas 35000 filas...
Procesadas 36000 filas...
Procesadas 37000 filas...
Proc