In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, round, lit
from googletrans import Translator
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import psycopg2
from psycopg2 import sql


# Importar las clases necesarias
import requests
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Crear una instancia de SparkSession
spark = SparkSession.builder.getOrCreate()

# Configurar la API key de OpenWeatherMap
api_key = "cb3c7af6f8a3112d069b2cd42e3d2651"



# Definir las provincias de Argentina
provincias = ["Buenos Aires", "Cordoba", "Santa Fe", "Mendoza", "Tucuman", "Parana", "Salta", "Resistencia", "Corrientes", "Misiones", "Santiago del Estero", "San Juan", "San Salvador de Jujuy", "Viedma", "Formosa", "Neuquen", "Rawson", "San Luis", "Catamarca", "La Rioja, AR", "Santa Rosa, AR", "Río Gallegos", "Ushuaia"]

# Definir el esquema del DataFrame
schema = StructType([
    StructField("provincia", StringType(), nullable=False),
    StructField("coord_lon", DoubleType(), nullable=False),
    StructField("coord_lat", DoubleType(), nullable=False),
    StructField("condiciones_climaticas", StringType(), nullable= True),
    StructField("descripcion_clima", StringType(), nullable=True),
    StructField("temperatura_principal", DoubleType(), nullable=False),
    StructField("sensacion_termica", DoubleType(), nullable=False),
    StructField("temperatura_minima", DoubleType(), nullable=False),
    StructField("temperatura_maxima", DoubleType(), nullable=False),
    StructField("presion", IntegerType(), nullable=False),
    StructField("humedad", IntegerType(), nullable=False),
    StructField("velocidad_viento", DoubleType(), nullable=False),
    StructField("direccion_viento", IntegerType(), nullable=False),
    StructField("ráfaga_viento", DoubleType(), nullable=True),
    StructField("nubosidad", IntegerType(), nullable=False),
    StructField("pais", StringType(), nullable=False)
])

# Crear un DataFrame vacío
df = spark.createDataFrame([], schema)


# Crear una instancia de Translator
translator = Translator(service_urls=['translate.google.com'])


# Obtener el clima para cada provincia y cargarlo en el DataFrame
for provincia in provincias:
    # Realizar la solicitud a la API de clima y obtener los datos en formato JSON
    url = f"https://api.openweathermap.org/data/2.5/weather?q={provincia}&appid={api_key}&units=metric"
    respuesta = requests.get(url)
    data = respuesta.json()


    # Extraer los datos relevantes del JSON
    coord_lon = data['coord']['lon']
    coord_lat = data['coord']['lat']
    condiciones_climaticas_ingles = data['weather'][0]['main']
    descripcion_clima_ingles = data['weather'][0]['description']  

    # Traducción de los textos al español
    if condiciones_climaticas_ingles is not None:
        condiciones_climaticas = translator.translate(condiciones_climaticas_ingles, dest='es').text
    else:
        condiciones_climaticas = "Dato No Encontrado"

    if descripcion_clima_ingles is not None:
        descripcion_clima = translator.translate(descripcion_clima_ingles, dest='es').text
    else:
        descripcion_clima = "Dato No Encontrado"



    temperatura_principal = float(data['main']['temp'])
    sensacion_termica = float(data['main']['feels_like'])
    temperatura_minima = float(data['main']['temp_min'])
    temperatura_maxima = float(data['main']['temp_max'])
    presion = data['main']['pressure']
    humedad = data['main']['humidity']
    velocidad_viento = float(data['wind']['speed'])
    direccion_viento = data['wind']['deg']
    ráfaga_viento = float(data['wind'].get('gust', 'NaN')) if 'gust' in data['wind'] else None
    nubosidad = data['clouds']['all']
    pais = data['sys']['country']

    # Crear un DataFrame temporal con los datos del clima de la provincia actual
    provincia_df = spark.createDataFrame([(provincia, coord_lon, coord_lat, condiciones_climaticas, descripcion_clima, temperatura_principal, sensacion_termica, temperatura_minima, temperatura_maxima, presion, humedad, velocidad_viento, direccion_viento, ráfaga_viento, nubosidad, pais)], schema)

    # Unir el DataFrame temporal con el DataFrame principal
    df = df.union(provincia_df)


# Agregar el símbolo porcentual a la columna "humedad"
df = df.withColumn("humedad", concat(col("humedad"), lit("%")))

# Agregar una columna nueva que sea la amplitud térmica
df = df.withColumn("amplitud_termica", round(col("temperatura_maxima") - col("temperatura_minima"), 2))

# Agregar una columna nueva que sea la temperatura promedio de la provincia
df = df.withColumn("temp_promedio", round((col("temperatura_minima") + col("temperatura_maxima")) / 2, 2))

# Mostrar el DataFrame con las transformaciones aplicadas
df.show(truncate=False)




# Datos de conexión a Amazon Redshift
host = "data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com"
port = 5439
database = "data-engineer-database"
user = "mau_giovanetti_coderhouse"
password = "5K6m1tR3h9"
schema = "mau_giovanetti_coderhouse"

# Crear la conexión a Amazon Redshift
conn = psycopg2.connect(
    host=host,
    port=port,
    database=database,
    user=user,
    password=password
)

# Verificar si la tabla ya existe
def table_exists(table_name):
    cursor = conn.cursor()
    query = sql.SQL("SELECT EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = {} AND tablename = {})").format(
        sql.Literal(schema),
        sql.Literal(table_name)
    )
    cursor.execute(query)
    exists = cursor.fetchone()[0]
    cursor.close()
    return exists

table_name = "clima"
if table_exists(table_name):
    print(f"La tabla {table_name} ya existe en Amazon Redshift.")

    # Verificar si los datos ya fueron ingresados
    cursor = conn.cursor()
    query = sql.SQL("SELECT provincia FROM {}.{}").format(
        sql.Identifier(schema),
        sql.Identifier(table_name)
    )
    cursor.execute(query)
    existing_provincias = {row[0] for row in cursor.fetchall()}
    cursor.close()

    df_to_insert = df.filter(~col("provincia").isin(existing_provincias))
else:
    # Crear la tabla con sortkeys y distkey
    def create_table(table_name):
        cursor = conn.cursor()
        query = sql.SQL("""
            CREATE TABLE {}.{} (
                provincia VARCHAR(100) NOT NULL SORTKEY DISTKEY,
                coord_lon DOUBLE PRECISION NOT NULL,
                coord_lat DOUBLE PRECISION NOT NULL,
                condiciones_climaticas VARCHAR(100),
                descripcion_clima VARCHAR(200),
                temperatura_principal DOUBLE PRECISION NOT NULL,
                sensacion_termica DOUBLE PRECISION NOT NULL,
                temperatura_minima DOUBLE PRECISION NOT NULL,
                temperatura_maxima DOUBLE PRECISION NOT NULL,
                presion INTEGER NOT NULL,
                humedad VARCHAR(10) NOT NULL,
                velocidad_viento DOUBLE PRECISION NOT NULL,
                direccion_viento INTEGER NOT NULL,
                ráfaga_viento DOUBLE PRECISION,
                nubosidad INTEGER NOT NULL,
                pais VARCHAR(100) NOT NULL,
                amplitud_termica DOUBLE PRECISION,
                temp_promedio DOUBLE PRECISION
            )
        """).format(
            sql.Identifier(schema),
            sql.Identifier(table_name)
        )
        cursor.execute(query)
        conn.commit()
        cursor.close()

    create_table(table_name)
    df_to_insert = df
    
    
# Insertar los datos del DataFrame en la tabla
cursor = conn.cursor()
for row in df_to_insert.collect():
    query = sql.SQL("""
        INSERT INTO {}.{} (
            provincia, coord_lon, coord_lat, condiciones_climaticas, descripcion_clima,
            temperatura_principal, sensacion_termica, temperatura_minima, temperatura_maxima,
            presion, humedad, velocidad_viento, direccion_viento, ráfaga_viento,
            nubosidad, pais, amplitud_termica, temp_promedio
        ) VALUES (
            {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}
        )
    """).format(
        sql.Identifier(schema),
        sql.Identifier(table_name),
        *[sql.Literal(value) for value in row]
    )
    cursor.execute(query)

    # Imprimir la provincia agregada
    provincia = row[0]
    print(f"Provincia agregada: {provincia}")

conn.commit()
cursor.close()







+---------------------+---------+---------+----------------------+-----------------------------+---------------------+-----------------+------------------+------------------+-------+-------+----------------+----------------+-------------+---------+----+----------------+-------------+
|provincia            |coord_lon|coord_lat|condiciones_climaticas|descripcion_clima            |temperatura_principal|sensacion_termica|temperatura_minima|temperatura_maxima|presion|humedad|velocidad_viento|direccion_viento|ráfaga_viento|nubosidad|pais|amplitud_termica|temp_promedio|
+---------------------+---------+---------+----------------------+-----------------------------+---------------------+-----------------+------------------+------------------+-------+-------+----------------+----------------+-------------+---------+----+----------------+-------------+
|Buenos Aires         |-58.3772 |-34.6132 |Nubes                 |nubes nubes                  |9.63                 |5.66             |7.77     