In [0]:
%pip install fastapi uvicorn pyspark requests apscheduler


Python interpreter will be restarted.
Collecting fastapi
  Downloading fastapi-0.115.2-py3-none-any.whl (94 kB)
Collecting uvicorn
  Downloading uvicorn-0.32.0-py3-none-any.whl (63 kB)
Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
Collecting apscheduler
  Downloading APScheduler-3.10.4-py3-none-any.whl (59 kB)
Collecting pydantic!=1.8,!=1.8.1,!=2.0.0,!=2.0.1,!=2.1.0,<3.0.0,>=1.7.4
  Downloading pydantic-2.9.2-py3-none-any.whl (434 kB)
Collecting typing-extensions>=4.8.0
  Downloading typing_extensions-4.12.2-py3-none-any.whl (37 kB)
Collecting starlette<0.41.0,>=0.37.2
  Downloading starlette-0.40.0-py3-none-any.whl (73 kB)
Collecting h11>=0.8
  Downloading h11-0.14.0-py3-none-any.whl (58 kB)
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Collecting tzlocal!=3.*,>=2.0
  Downloading tzlocal-5.2-py3-none-any.whl (17 kB)
Collecting annotated-types>=0.6.0
  Downloading annotated_types-0.7.0-py3-none-any.whl (13 kB)
Collecting pydantic

#### Integração com automação de atualização a cada hora

In [0]:
from fastapi import FastAPI, HTTPException
import uvicorn
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
import requests
import os
from apscheduler.schedulers.background import BackgroundScheduler

app = FastAPI()

# Configurando Spark
spark = SparkSession.builder.appName("FastAPI Spark Parquet").getOrCreate()

# Definir o esquema dos artigos
schema = StructType([
    StructField("source", StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True)
    ]), True),
    StructField("author", StringType(), True),
    StructField("title", StringType(), True),
    StructField("url", StringType(), True),
    StructField("urlToImage", StringType(), True),
    StructField("publishedAt", StringType(), True),
    StructField("content", StringType(), True)
])

# Definindo a API Key e o URL
API_KEY = "325474a96fcb4288bb357897851f51ce"
API_URL = "https://newsapi.org/v2/everything"
PARQUET_PATH = "/dbfs/FileStore/dados.parquet"

# Inicializando o agendador
scheduler = BackgroundScheduler()

def fetch_data():
    """
    Função que faz a requisição à API e retorna os dados como DataFrame do Spark.
    """
    params = {
        'q': "genomics or saúde or pesquisa",  # Palavras-chave relevantes
        'language': 'pt',  # Língua do artigo
        'sortBy': 'publishedAt',  # Ordenar por data de publicação
        'apiKey': API_KEY
    }

    # Fazer a requisição
    response = requests.get(API_URL, params=params)

    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code, detail="Erro ao acessar a API")

    data = response.json()
    artigos = data['articles']

    # Converter para DataFrame do Spark
    df = spark.createDataFrame(artigos, schema=schema)
    return df


def update_parquet(df_atual, parquet_path):
    """
    Função que verifica se o arquivo Parquet existe e realiza a atualização.
    """
    # Verifica se o arquivo Parquet existe
    try:
        if os.path.exists(parquet_path):
            # Se o arquivo existir, lê-lo
            df_arquivo = spark.read.parquet(parquet_path)
            print("Arquivo Parquet existente lido com sucesso.")
            
            # Encontrar novos dados que não estão no Parquet
            df_novos_dados = df_atual.join(df_arquivo, on=["title"], how="left_anti")
            
            if df_novos_dados.count() > 0:
                # Gravar apenas novos dados
                df_novos_dados.write.mode("append").parquet(parquet_path)
                return f"{df_novos_dados.count()} novos registros adicionados ao Parquet."
            else:
                return "Nenhum dado novo para atualizar."

        else:
            # Se o arquivo não existir, gravar df_atual como Parquet
            df_atual.write.mode("overwrite").parquet(parquet_path)
            return "Arquivo Parquet não existia. Criado novo arquivo."

    except Exception as e:
        return f"Ocorreu um erro: {str(e)}"


def extrair_dados():
    """
    Função que extrai dados da API e armazena em Parquet, se necessário.
    """
    try:
        # Extrair dados da API
        df_atual = fetch_data()

        # Atualizar o arquivo Parquet
        resultado = update_parquet(df_atual, PARQUET_PATH)

        return {"message": resultado}

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.on_event("startup")
def start_scheduler():
    # Agendar a tarefa para executar a extração de dados a cada hora
    scheduler.add_job(extrair_dados, 'interval', hours=1)
    scheduler.start()


@app.on_event("shutdown")
def shutdown_scheduler():
    scheduler.shutdown()


@app.get("/")
def root():
    return {"message": "API de Atualização de Dados com FastAPI e Spark"}


# Executando o servidor
if __name__ == "__main__":
    config = uvicorn.Config(app)
    server = uvicorn.Server(config)
    await server.serve()    


        on_event is deprecated, use lifespan event handlers instead.

        Read more about it in the
        [FastAPI docs for Lifespan Events](https://fastapi.tiangolo.com/advanced/events/).
        
  @app.on_event("startup")
        on_event is deprecated, use lifespan event handlers instead.

        Read more about it in the
        [FastAPI docs for Lifespan Events](https://fastapi.tiangolo.com/advanced/events/).
        
  @app.on_event("shutdown")
INFO:     Started server process [2264]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:50488 - "GET /extradir_dados HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:38946 - "GET /extrair_dados HTTP/1.1" 404 Not Found


#### Integração com automação por requisição

In [0]:
from fastapi import FastAPI, HTTPException
import uvicorn
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
import requests
import os

app = FastAPI()

# Configurando Spark
spark = SparkSession.builder.appName("FastAPI Spark Parquet").getOrCreate()

# Definir o esquema dos artigos
schema = StructType([
    StructField("source", StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True)
    ]), True),
    StructField("author", StringType(), True),
    StructField("title", StringType(), True),
    StructField("url", StringType(), True),
    StructField("urlToImage", StringType(), True),
    StructField("publishedAt", StringType(), True),
    StructField("content", StringType(), True)
])

# Definindo a API Key e o URL
API_KEY = "325474a96fcb4288bb357897851f51ce"
API_URL = "https://newsapi.org/v2/everything"
PARQUET_PATH = "/dbfs/FileStore/dados.parquet"

def fetch_data():
    """
    Função que faz a requisição à API e retorna os dados como DataFrame do Spark.
    """
    # Parâmetros de consulta
    params = {
        'q': "genomics OR genomic OR 'genetic sequencing' OR 'DNA research' OR 'genetic analysis'",  # Palavras-chave relevantes
        'language': 'pt',  # Língua do artigo
        'sortBy': 'publishedAt',  # Ordenar por data de publicação
        'apiKey': API_KEY
    }

    # Fazer a requisição
    response = requests.get(API_URL, params=params)
    
    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code, detail="Erro ao acessar a API")

    data = response.json()
    artigos = data['articles']

    # Converter para DataFrame do Spark
    df = spark.createDataFrame(artigos, schema=schema)
    return df

def update_parquet(df_atual, parquet_path):
    """
    Função que verifica se o arquivo Parquet existe e realiza a atualização.
    """
    try:
        # Verifica se o arquivo Parquet existe usando dbutils
        files = dbutils.fs.ls(os.path.dirname(parquet_path))
        file_names = [file.name for file in files]

        if os.path.basename(parquet_path) in file_names:
            # Se o arquivo existir, lê-lo
            df_arquivo = spark.read.parquet(parquet_path)
            print("Arquivo Parquet existente lido com sucesso.")
            
            # Encontrar novos dados que não estão no Parquet
            df_novos_dados = df_atual.join(df_arquivo, on=["title"], how="left_anti")
            
            if df_novos_dados.count() > 0:
                # Gravar apenas novos dados
                df_novos_dados.write.mode("append").parquet(parquet_path)
                return f"{df_novos_dados.count()} novos registros adicionados ao Parquet."
            else:
                return "Nenhum dado novo para atualizar."

        else:
            # Se o arquivo não existir, gravar df_atual como Parquet
            df_atual.write.mode("overwrite").parquet(parquet_path)
            return "Arquivo Parquet não existia. Criado novo arquivo."

    except Exception as e:
        return f"Ocorreu um erro: {str(e)}"

@app.get("/extrair_dados")
def extrair_dados():
    """
    Endpoint para extrair dados da API e armazenar em Parquet, se necessário.
    """
    try:
        # Extrair dados da API
        df_atual = fetch_data()

        # Atualizar o arquivo Parquet
        resultado = update_parquet(df_atual, PARQUET_PATH)

        return {"message": resultado}

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/")
def root():
    return {"message": "API de Atualização de Dados com FastAPI e Spark"}

if __name__ == "__main__":
    config = uvicorn.Config(app)
    server = uvicorn.Server(config)
    await server.serve()    
