In [0]:
from pyspark.sql import SparkSession
import requests
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import lit, current_timestamp
import logging
from datetime import datetime

In [0]:
# Configuração de logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# configuração
spark = SparkSession.builder.appName("BikeSampa-API").getOrCreate()

# pegando a data para colocar no nome do arquivo
data_atual = datetime.now().strftime("%Y%m%d_%H%M%S")

# url api
url = 'https://api.citybik.es/v2/networks/bikesampa'

# caminhos para salvar
caminho_bronze = "/Volumes/projeto/default/bronze"
caminho_erro = "/Volumes/projeto/default/erros/bikesampaerro.parquet"
caminho_bronze_bike_data = f"{caminho_bronze}/bike_{data_atual}.parquet"

# estruturando o dataset
bike_schema = StructType([
    StructField("empty_slots", IntegerType(), True),
    StructField("free_bikes", IntegerType(), True),
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("timestamp", StringType(), True)
])

def process_bikes_data(stations):
    # garantir que dados sao validos
    if not all(isinstance(s, dict) and 'id' in s and 'name' in s for s in stations):
        raise ValueError("Dados da estação estão no formato incorreto ou incompleto.")
    
    # cria o df com dados verificados
    return spark.createDataFrame(stations, schema=bike_schema)

try:
    # requisitando o bikesampa
    logger.info("Iniciando requisição para a API BikeSampa...")
    response = requests.get(url, timeout=10)
    response.raise_for_status()

    # Verifica se a resposta tem dados
    stations = response.json().get('network', {}).get('stations', [])
    if not stations:
        raise ValueError("Nenhum dado de estação foi encontrado na resposta da API.")
    
    logger.info(f"{len(stations)} estações encontradas.")

    # Processando os dados da estação
    df_bikes = process_bikes_data(stations)

    # tenta salvar no bronze e na tabela
    try:
        logger.info(f"Salvando dados no formato Parquet em {caminho_bronze}...")
        df_bikes.write.mode("overwrite").parquet(caminho_bronze_bike_data)

        logger.info("Salvando dados na tabela 'bike_tabela'...")
        df_bikes.write.mode("overwrite").saveAsTable("bike_tabela")
    
    except Exception as e:
        logger.error(f"Erro ao salvar os dados no formato Parquet ou na tabela: {e}")
        raise

except requests.exceptions.RequestException as e:
    logger.error(f"Erro ao fazer requisição para a API: {e}")
    # cria DataFrame de erro
    df_bikes = spark.createDataFrame([], bike_schema).withColumn("erro_carregamento", lit(str(e)))
    df_bikes.write.mode("overwrite").parquet(caminho_erro)

except ValueError as e:
    logger.error(f"Erro de valor: {e}")
    # cria DataFrame de erro
    df_bikes = spark.createDataFrame([], bike_schema).withColumn("erro_carregamento", lit(str(e)))
    df_bikes.write.mode("overwrite").parquet(caminho_erro)

except Exception as e:
    logger.error(f"Erro desconhecido: {e}")
    # cria DataFrame de erro
    df_bikes = spark.createDataFrame([], bike_schema).withColumn("erro_carregamento", lit(str(e)))
    df_bikes.write.mode("overwrite").parquet(caminho_erro)
