### API CONNECTION  

In [2]:
#importa librerias
import pandas as pd
import requests
import json
import psycopg2
from sqlalchemy import create_engine


In [3]:
# Leer la API key desde el archivo de texto
with open("key.txt",'r') as f:
    pwd= f.read()

In [4]:
# URL de la API
url = 'https://opendata.aemet.es/opendata/api/observacion/convencional/todas'

# Parametros de la solicitud GET
params = {"api_key": pwd}

# Realiza la solicitud GET a la API
response = requests.get(url, params=params)

# Verifica si la solicitud fue exitosa
if response.status_code == 200:
    try:
        # Obtiene la URL de los datos reales del diccionario de respuesta
        data_url = response.json().get('datos')
        metadatos_url = response.json().get('metadatos')
        
        # Realiza una solicitud GET a la URL de los datos reales
        response_data = requests.get(data_url)
        response_metadatos  = requests.get(metadatos_url)
        
        # Carga los datos en un DataFrame si la solicitud fue exitosa
        if response_data.status_code == 200:
            # Obtiene los datos en formato JSON
            data_json = response_data.json()
            
            # Convierte los datos a un DataFrame de Pandas
            df = pd.DataFrame(data_json)
            
        else:
            print("Error al obtener los datos reales:", response_data.status_code)
    except Exception as e:
        print("Error al procesar la respuesta:", e)
else:
    print("Error al obtener la URL de los datos:", response.status_code)


### COLUMN SELECTION 

In [6]:
# Obtiene la URL de los datos reales del diccionario de respuesta
metadatos_json = response_metadatos.json()
# Convierte los datos a un DataFrame de Pandas
df_aux = pd.DataFrame(metadatos_json)
# Imprime resultados del df
df_aux.head()

Unnamed: 0,unidad_generadora,periodicidad,formato,copyright,notaLegal,campos
0,Servicio de Observación,continuamente,application/json,© AEMET. Autorizado el uso de la información y...,https://www.aemet.es/es/nota_legal,"{'id': 'idema', 'descripcion': 'Indicativo cli..."
1,Servicio de Observación,continuamente,application/json,© AEMET. Autorizado el uso de la información y...,https://www.aemet.es/es/nota_legal,"{'id': 'lon', 'descripcion': 'Longitud de la e..."
2,Servicio de Observación,continuamente,application/json,© AEMET. Autorizado el uso de la información y...,https://www.aemet.es/es/nota_legal,"{'id': 'lat', 'descripcion': 'Latitud de la es..."
3,Servicio de Observación,continuamente,application/json,© AEMET. Autorizado el uso de la información y...,https://www.aemet.es/es/nota_legal,"{'id': 'alt', 'descripcion': 'Altitud de la es..."
4,Servicio de Observación,continuamente,application/json,© AEMET. Autorizado el uso de la información y...,https://www.aemet.es/es/nota_legal,"{'id': 'ubi', 'descripcion': 'Ubicación de la ..."


In [7]:
# Normaliza la columna "campos" del diccionario JSON para entender la metadata
df_normalized = pd.json_normalize(df_aux['campos'])

# Imprime resultados
df_normalized

Unnamed: 0,id,descripcion,tipo_datos,requerido
0,idema,Indicativo climatógico de la estación meteorol...,string,True
1,lon,Longitud de la estación meteorológica (grados),float,True
2,lat,Latitud de la estación meteorológica (grados),float,True
3,alt,Altitud de la estación en metros,float,True
4,ubi,Ubicación de la estación. Nombre de la estación,string,True
5,fint,"Fecha hora final del período de observación, s...",string (AAAA-MM-DDTHH:MM:SS),False
6,prec,"Precipitación acumulada, medida por el pluvióm...",float,False
7,pacutp,"Precipitación acumulada, medida por el disdróm...",float,False
8,pliqtp,Precipitación líquida acumulada durante los 60...,float,False
9,psolt,Precipitación sólida acumulada durante los 60 ...,float,False


In [8]:
# Selecciona columnas deseadas para el dataframe final
columnas_seleccionadas = ['ubi','lon', 'lat','alt','fint', 'prec', 'hr', 'ta', 'tamin', 'tamax']
df_final = df[columnas_seleccionadas]
df_final

Unnamed: 0,ubi,lon,lat,alt,fint,prec,hr,ta,tamin,tamax
0,ALFORJA,0.963335,41.213894,406.0,2024-03-10T18:00:00,0.0,86.0,7.5,7.5,8.6
1,REUS/AEROPUERTO,1.178894,41.149720,71.0,2024-03-10T18:00:00,0.9,77.0,10.2,10.0,12.0
2,VALLS,1.260838,41.293053,233.0,2024-03-10T18:00:00,0.0,73.0,11.3,11.3,12.1
3,TARRAGONA FAC. GEOGRAFIA,1.249167,41.123894,55.0,2024-03-10T18:00:00,0.6,71.0,11.7,11.7,13.2
4,PONTONS,1.519269,41.417053,632.0,2024-03-10T18:00:00,0.0,75.0,7.2,7.2,7.8
...,...,...,...,...,...,...,...,...,...,...
19155,LAS PALMAS DE GC. PLAZA DE LA FERIA,-15.421389,28.113056,15.0,2024-03-11T17:00:00,0.0,78.0,19.2,19.2,19.8
19156,MASPALOMAS,-15.595833,27.735832,6.0,2024-03-11T17:00:00,0.0,69.0,20.3,20.3,21.1
19157,TEGUISE LA GRACIOSA-HELIPUERTO,-13.510210,29.229586,19.0,2024-03-11T17:00:00,0.0,51.0,19.4,19.3,20.5
19158,SAN ANDRÉS-DEPÓSITO CABILDO,-17.960417,27.768951,1070.0,2024-03-11T17:00:00,0.0,77.0,13.0,12.7,13.2


### REDSHIFT CONNECTION  

In [10]:
# Crea la conexión a Redshift
url="data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com"
data_base="data-engineer-database"
user="julietagsaez_coderhouse"
with open("redshift_key.txt",'r') as f:
    pwd_rd= f.read()
try:
    conn = psycopg2.connect(
        host='data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com',
        dbname=data_base,
        user=user,
        password=pwd_rd,
        port='5439'
    )
    print("Conectado a Redshift con éxito")
    
except Exception as e:
    print("Error al intentar conectar a Redshift")
    print(e)

Conectado a Redshift con éxito


In [11]:
# Crea la tabla en Redshift con el ID + columnas 
with conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS julietagsaez_coderhouse.estaciones_metereologicas (
            id INT IDENTITY(1,1) PRIMARY KEY,
            ubicacion VARCHAR (50),
            longitud FLOAT,
            latitud FLOAT,
            altitud FLOAT, 
            fecha_obs VARCHAR(50), 
            precipitaciones FLOAT,
            humedad FLOAT,
            temperatura FLOAT, 
            temperatura_min FLOAT,
            temperatura_max FLOAT         
        )
    """)
    conn.commit()

In [12]:
# Trunca tabla para evitar errores
with conn.cursor() as cur:
  cur.execute("Truncate table estaciones_metereologicas")
  count = cur.rowcount

In [11]:
# Consulta la tabla
cur = conn.cursor()
cur.execute("SELECT * FROM estaciones_metereologicas")
resultados = cur.fetchall()
resultados 

[]

In [12]:
# Define el mapeo de columnas
column_mapping = {
    'ubi': 'ubicacion',
    'lon': 'longitud',
    'lat': 'latitud',
    'alt': 'altitud',
    'fint': 'fecha_obs',
    'prec': 'precipitaciones',
    'hr': 'humedad',
    'ta': 'temperatura',
    'tamin': 'temperatura_min',
    'tamax': 'temperatura_max'
}

# Aplica el mapeo de columnas al DataFrame
df_final_mapped = df_final.rename(columns=column_mapping)

df_final_mapped.head()

Unnamed: 0,ubicacion,longitud,latitud,altitud,fecha_obs,precipitaciones,humedad,temperatura,temperatura_min,temperatura_max
0,ALFORJA,0.963335,41.213894,406.0,2024-03-10T18:00:00,0.0,86.0,7.5,7.5,8.6
1,REUS/AEROPUERTO,1.178894,41.14972,71.0,2024-03-10T18:00:00,0.9,77.0,10.2,10.0,12.0
2,VALLS,1.260838,41.293053,233.0,2024-03-10T18:00:00,0.0,73.0,11.3,11.3,12.1
3,TARRAGONA FAC. GEOGRAFIA,1.249167,41.123894,55.0,2024-03-10T18:00:00,0.6,71.0,11.7,11.7,13.2
4,PONTONS,1.519269,41.417053,632.0,2024-03-10T18:00:00,0.0,75.0,7.2,7.2,7.8


In [13]:
# Crea motor SQLAlchemy
engine = create_engine(f'postgresql+psycopg2://{user}:{pwd_rd}@{url}:{5439}/{data_base}')

In [None]:
# Carga DataFrame en la tabla de Redshift
df_final_mapped.to_sql('estaciones_metereologicas', engine, index=False, if_exists='replace')