# PySpark

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from datetime import datetime
import requests
import psycopg2
import os

# Inicializando a SparkSession
spark = SparkSession.builder \
    .appName("ExchangeRate") \
    .config("spark.driver.extraClassPath", "/path/to/postgresql-connector.jar") \
    .getOrCreate()

# URL da API de câmbio
API_URL = "https://api.exchangerate-api.com/v4/latest/USD"

# Fazendo a requisição à API
response = requests.get(API_URL)
data = response.json()

# Cotação do dólar em relação ao real
usd_to_brl = data["rates"]["BRL"]

# Lista das moedas principais
moedas_principais = {
    "Dólar Americano (USD)": "USD",
    "Euro (EUR)": "EUR",
    "Libra esterlina (GBP)": "GBP",
    "Iene (JPY)": "JPY",
    "Dólar Australiano (AUD)": "AUD",
    "Dólar Canadense (CAD)": "CAD"
}

# Capturando a data atual
data_atual = datetime.now().strftime("%Y-%m-%d")

# Criando lista de dados
dados = [
    (moeda, round(usd_to_brl / data["rates"].get(codigo, 1), 2), data_atual) 
    for moeda, codigo in moedas_principais.items()
]

# Criando um esquema para o DataFrame
schema = StructType([
    StructField("Moeda", StringType(), True),
    StructField("Cotação", FloatType(), True),
    StructField("Data", StringType(), True)
])

# Criando DataFrame usando PySpark
df = spark.createDataFrame(dados, schema=schema)
df.show()

# Configuração do banco PostgreSQL na AWS
host = "postgresql-databricks-vs.c8xy4kcoa4rk.us-east-1.rds.amazonaws.com"
user = "postgres"
database = "postgres"
password = "Vitor#2301"

# Criando conexão com o PostgreSQL
conn = psycopg2.connect(host=host, user=user, database=database, password=password)
cursor = conn.cursor()

# Criando tabela se não existir
cursor.execute("""
    CREATE TABLE IF NOT EXISTS cotacoes_moedas (
        id SERIAL PRIMARY KEY,
        moeda VARCHAR(50),
        cotacao NUMERIC,
        data DATE,
        UNIQUE (moeda, data)
    );
""")
conn.commit()

# Verificando se cada moeda já existe na data atual e inserindo/atualizando os dados
for moeda, cotacao, data in dados:
    cursor.execute("SELECT COUNT(*) FROM cotacoes_moedas WHERE moeda = %s AND data = %s;", (moeda, data))
    existe_data = cursor.fetchone()[0]

    if existe_data == 0:
        cursor.execute("""
            INSERT INTO cotacoes_moedas (moeda, cotacao, data)
            VALUES (%s, %s, %s);
        """, (moeda, cotacao, data))
    else:
        cursor.execute("""
            UPDATE cotacoes_moedas SET cotacao = %s WHERE moeda = %s AND data = %s;
        """, (cotacao, moeda, data))

conn.commit()
print("Dados inseridos ou atualizados com sucesso!")

# Fechando conexão
cursor.close()
conn.close()

+--------------------+-------+----------+
|               Moeda|Cotação|      Data|
+--------------------+-------+----------+
|Dólar Americano (...|   5.71|2025-05-25|
|          Euro (EUR)|   6.47|2025-05-25|
|Libra esterlina (...|   7.71|2025-05-25|
|          Iene (JPY)|   0.04|2025-05-25|
|Dólar Australiano...|   3.68|2025-05-25|
|Dólar Canadense (...|   4.14|2025-05-25|
+--------------------+-------+----------+

Dados inseridos ou atualizados com sucesso!
