In [0]:
import requests
from datetime import datetime, timedelta
import pyspark.pandas as ps
import time

In [0]:
def buscar_noticias(dir_escrita):
    try:
        df_final = ps.read_parquet(dir_escrita)
    except:
        df_final = ps.DataFrame()
    data_atual = datetime.now()
    data_inicio = data_atual - timedelta(days=30)
    data_inicio_formatada = data_inicio.strftime('%Y-%m-%d')

    API_KEY = 'b5dee996c6fd4446a025886f2eccd8c2'
    url = 'https://newsapi.org/v2/everything'
    params = {
        'q': ['genoma', 'genetics', 'dna'],
        'apiKey': API_KEY,
        'searchin': 'title',
        'from': data_inicio_formatada,
        'sortBy': 'relevancy'
    }
    response = requests.get(url, params=params)
    if response.status_code == 200:
        noticias_30d = response.json()
        for noticia in noticias_30d['articles']:
            dict_noticia ={
                    'source': noticia['source']['name'],
                    'author': noticia['author'],
                    'title': noticia['title'],
                    'content': noticia['content'],
                    'description': noticia['description'], 
                    'url': noticia['url'],
                    'date': noticia['publishedAt']
                }
    
            if df_final.empty:
                df_noticia = ps.DataFrame([dict_noticia])
                df_final = ps.concat([df_final, df_noticia], ignore_index=True)
            
            if not (df_final['title'].isin([dict_noticia['title']]).any()):
                df_noticia = ps.DataFrame([dict_noticia])
                df_final = ps.concat([df_final, df_noticia], ignore_index=True)
    return df_final

In [0]:
def load_data(df_final):
    try:
        df_final.to_parquet(dir_escrita)

    except Exception as e: #caso o arquivo transformando ainda não exista, quer dizer que é o primeiro processo do pipeline e é preciso criar o arquivo destino
        if 'java.io.FileNotFoundException' in str(e):
            print("Arquivo não encontrado, primeiro processamento")
            df_new.to_parquet(dir_escrita)

    print("resultado carregado com sucesso")

    #realiza a movimentação da pasta de dados brutos para a pasta de dados brutos já carregados
    dbutils.fs.mv("/FileStore/tables/dados_brutos/", "/FileStore/tables/dados_carregados/", True)

In [0]:
def elt(dir_leitura, dir_escrita):
    print("inicializa o ELT")
    try:
        #faz a chamada da extração, da carga e da transformação dos dados
        df_new = buscar_noticias(dir_escrita)
        load_data(df_new)
    except Exception as e: #caso não exista nenhum dado novo, retorna com a mensagem e encerra o processo
        if 'java.io.FileNotFoundException' in str(e):
            print("Nenhum dado novo")
        else:
            print("erro no ELT:", str(e))

In [0]:
def noticias_data(dir_escrita, dir_novo):
    df_geral = ps.read_parquet(dir_escrita)
    df = df_geral.groupby(by=[df_geral.date]).agg(quantidade=('title', 'count'))
    df = df.reset_index(drop=False)
    df.to_parquet(dir_novo)

In [0]:
def noticias_fonte_autor(dir_escrita, dir_novo):
    df_geral = ps.read_parquet(dir_escrita)
    df = df_geral.groupby(by=[df_geral.source, df_geral.author]).agg(quantidade=('title', 'count'))
    df = df.reset_index(drop=False)
    df.to_parquet(dir_novo)


In [0]:
#define o diretório dos arquivos brutos
dir_leitura = "/FileStore/tables/dados_brutos/"
#define o diretório de qual será o arquivo parquet final que irá armazenar os dados carregados
dir_escrita = "/FileStore/tables/dado_carregado/dado_consolidado_noticias_8.parquet"

elt(dir_leitura, dir_escrita)

arquivo_data = "/FileStore/tables/dado_transformado/noticias_dia2.parquet"
arquivo_fonte_autor = "/FileStore/tables/dado_transformado/noticias_fonte_autor2.parquet"
arquivo_palavras_chave = "/FileStore/tables/dado_transformado/noticias_palavra_chave2.parquet"

In [0]:
df_geral = ps.read_parquet(dir_escrita)
df_geral.head(50)

In [0]:
noticias_data(dir_escrita, arquivo_data)
df_noticias_data = ps.read_parquet(arquivo_data)
df_noticias_data.head()


In [0]:
noticias_fonte_autor(dir_escrita, arquivo_fonte_autor)
df_noticias_fonte_autor = ps.read_parquet(arquivo_fonte_autor)
df_noticias_fonte_autor.head()