In [0]:
%run ./api_key

In [0]:
# Packages Importing
import datetime
import json
import requests
import os
import pyspark.pandas as ps
import time

from math import ceil

# ======== FUNÇÃO DE EXTRAÇÃO DOS DADOS DA API ========
API_KEY = m_api_key

In [0]:
# =========================================

# ============ BATCH FUNCTIONS ============

# =========================================

In [0]:
# ======== FUNÇÃO DE EXTRAÇÃO DOS DADOS DA API ========
API_KEY = m_api_key

# ======== FUNÇÃO DE EXTRAÇÃO DOS DADOS DA API ========
def p(string):
    print('\n', string, sep='')

def gerar_query_string():
    # ------------ Filtro de palavras -------------- #
    # Lista de palavras chaves
    key_words = [
        'genomics', 'genetic sequencing', 'dna', 'genes',
        'genomic analysis', 'genetic variations', 'targeted therapy',
        'customized treatment', 'genetic markers', 'molecular diagnosis',
        'pharmacogenomics', 'personalized medicine'
        ]
    
    key_words_3 = [
        'genomics', 'genetic', 'dna',
        ]

    for idx, word in enumerate(key_words_3):
        if ' ' in word:
            word = word.replace(' ', ' AND ')
            key_words_3[idx] = '(' + word + ')' 
    
    return ' OR '.join(key_words_3)


# Função 1
def get_raw_news_files(
        query='genomic', 
        start_date='2023-09-26', 
        end_date='2023-09-26', 
        start_time='00:00:00', 
        end_time='01:00:00') -> None:
    """ 
    Faz a requisição à API e persiste em um arquivo JSON dentro do DBFS 
        As diversas páginas da pesquisa são salvas em arquivos JSON separados
        no diretório [FileStore/proj_4_genomics_sa/raw/queue]. 
    """

    # Variáveis da consulta
    page_n = 1
    page_size = 100
    total_pages = 1

    # Formata as datas e horários
    FROM = f'&from={start_date}T{start_time}'
    TO = f'&to={end_date}T{end_time}'

    # Configurações da API da consulta
    QUERY = f'q={query}'
    PAGE = f'&page={page_n}'
    LANGUAGE = '&language=en'
    SORTBY = '&sortBy=relevancy'

    while True:
        if (page_n > total_pages) or (page_n >= 5):
            break
        
        # ---------- Conexão com a API ------------ #
        # Monta a URL da API
        api_url = f'https://newsapi.org/v2/everything?{QUERY}{FROM}{TO}{SORTBY}{LANGUAGE}{PAGE}&apiKey={API_KEY}'
        # print(f'URI: {api_url}')
        
        # Faz a requisição à API
        response = requests.get(api_url)
        
        # Persistência do conteúdo
        if response.status_code == 200:
            # Arquivo JSON
            now = datetime.datetime.now()
            json_folder_path = '/tmp/proj'
            json_file_path = f'/newsapi-get_{now.strftime("%Y-%m-%dT%H_%M_%S")}P{page_n}.json'
            json_uri = json_folder_path + json_file_path        

            # Verifica se diretório já existe
            if not os.path.exists(json_folder_path):
                os.makedirs(json_folder_path)
            
            # Armazena o json em bloco (driver node)
            data_json = response.json()
            with open(json_uri, 'w') as json_file:
                json.dump(data_json, json_file)
            
            # Move para armazenamento em objeto (DBFS)
            json_folder_dbfs = 'dbfs:/FileStore/proj_4_genomics_sa/raw/queue'
            dbutils.fs.mv(f'file:{json_uri}', f'{json_folder_dbfs}{json_file_path}')

            # Paginação
            resultados = data_json['totalResults']
            total_pages = ceil( resultados / page_size )
            page_n += 1
            PAGE = f'&page={page_n}'

            # log
            if page_n-1 == 1:
                print(f'A requisição possui {resultados} notícias em {total_pages} páginas.')

            print(f'Arquivo JSON salvo em DBFS: {json_folder_dbfs}{json_file_path}')

        else:
            # Falha na persistência do arquivo
            print(f'Falha na requisição! Status code: {response.status_code}')
            resultados = 0
            total_pages = 0
            break

    print(f'Extração por REST API concluída: {resultados} resultados coletados em {page_n} páginas.')

In [0]:
# ---------- sub-ETL: Fluxo de JSON separados para PARQUET unificado ----------

def func_convert_to_df(json_obj: dict) -> ps.DataFrame:
    """ Função de tabulação dos dados do JSON da NewsAPI em um PyPandas """
    # Coleção de registros
    tabular_data = []

    for row in json_obj['articles']: 
        # ----------- Organização dos dados ------------ #
        dict_data = {}

        dict_data['id']      = row.get('source', '').get('id', '')
        dict_data['name']    = row.get('source', '').get('name', '')
        dict_data['author']  = row.get('author', '')
        dict_data['title']   = row.get('title', '')
        dict_data['url']     = row.get('url', '')
        dict_data['content'] = row.get('content', '')
        dict_data['description'] = row.get('description', '')
        dict_data['publishedAt'] = row.get('publishedAt', '')

        # Adicição à coleção de registros
        tabular_data.append(dict_data)
    return ps.DataFrame(tabular_data)


# Função 2
def union_raw_news_pages() -> dict:
    """ Função converte páginas JSON em um único arquivo estruturado Parquet. 
    
    RETORNO
    -------
        Dict[path: str]
        Retorna um dicionário com informação do arquivo Parquet gerado.
    """

    arquivos = dbutils.fs.ls('file:/tmp/proj_raw')
    timestamp = arquivos[0].name.split('_', maxsplit=1)[-1].split('P')[0]

    # DataFrame unido
    df = ps.DataFrame()

    n = 0
    for arquivo in arquivos:
        # Path DBFS para padrão File (Drive Node)
        json_path = arquivo.path.replace('file:', '')

        # Extrai dados de JSON
        with open(json_path, 'r') as json_file:
            json_data = json.load(json_file)

        # Transformação de JSON para DataFrame
        df_temp = func_convert_to_df(json_data)

        # Concatena df do arquivo com o df unificado
        df = ps.concat([df, df_temp], ignore_index=True)
        df['extract_date'] = timestamp

        # Log
        n += 1
        print(f'{n} de {len(arquivos)} arquivos processados')
    
    # Persiste dados em parquet
    file_path = f'dbfs:/FileStore/proj_4_genomics_sa/validated/api_responses/noticias_{timestamp}.parquet'

    # df.reset_index().to_parquet(file_path)
    df.to_parquet(file_path)

    print('Páginas juntadas com sucesso em [./validated/api_responses/]')
    return {'path': file_path}


In [0]:
# Função 3
def load_into_unified_file(news_filepath: str) -> None:
    """
    Função carrega arquivo novo de notícias e uni com o arquivo unificado.

    ARGUMENTOS
    ----------
    news_filepath : str
        Caminho do arquivo das novas notícias.
    """

    # Leitura do arquivo com as novos notícias
    df_news = ps.read_parquet(news_filepath)
    print('Tamanho do arquivo novo:', df_news.shape)

    # Caminho do arquivo unificado
    noticias_filepath = 'dbfs:/FileStore/proj_4_genomics_sa/validated/noticias.parquet'

    try:
        # Leitura da base unificada de notícias
        arquivo = dbutils.fs.ls(noticias_filepath)
        # df_unified = spark.read.parquet(noticias_filepath)
        df_unified = ps.read_parquet(noticias_filepath)
        print('Tamanho arquivo unificado:', df_unified.shape)

        # Concatenação dos dados
        df_unified = ps.concat([df_news, df_unified])

        # Descatas linhas duplicadas com base na url mantendo o artigo mais recente
        df_unified = df_unified.sort_values(by=['extract_date'])

        key = 'url'

        df_unified = df_unified.drop_duplicates(subset=key, keep='last') 

        print('Tamanho novo arquivo unificado:', df_unified.shape)
        
        # Persiste no mesmo arquivo
        df_unified.to_parquet(noticias_filepath)

    except Exception as e:
        # print(e)
        # Caso o arquivo unificado não exista ainda
        if 'java.io.FileNotFoundException' in str(e):
            print("Arquivo não encontrado, primeiro processamento")
            df_news.to_parquet(noticias_filepath)
            print('Tamanho novo arquivo unificado:', df_news.shape)
        else:
            raise e
    finally:
        print("Resultado carregado com sucesso")    

In [0]:
def noticias_por_ano_mes_dia(df):
    # Convert a coluna para datetime
    df['published_data'] = df['publishedAt'].apply(lambda x: x.split('T')[0])
    df['published_data'] = ps.to_datetime(df['published_data'])

    # Criar colunas para ano, mês e dia
    df['year'] = df['published_data'].dt.year
    df['month'] = df['published_data'].dt.month
    df['day'] = df['published_data'].dt.day
    
    # Consolidação 1: Quantidade de notícias por ano, mês e dia de publicação
    noticias_por_dia = df.groupby(['year', 'month', 'day']).size().reset_index(name='news_qtd')

    file_path = f'dbfs:/FileStore/proj_4_genomics_sa/curated/'

    noticias_por_dia.to_parquet(file_path + 'noticias_por_dia.parquet')

    #return noticias_por_ano, noticias_por_mes, noticias_por_dia

def noticias_por_fonte_autor(df):
    # Consolidação 2: Quantidade de notícias por fonte e autor
    noticias_por_fonte = df.groupby('name').size().reset_index(name='news_qtd')
    noticias_por_autor = df.groupby('author').size().reset_index(name='news_qtd')

    file_path = f'dbfs:/FileStore/proj_4_genomics_sa/curated/'

    noticias_por_fonte.to_parquet(file_path + 'noticias_por_fonte.parquet')
    noticias_por_autor.to_parquet(file_path + 'noticias_por_autor.parquet')

    #return noticias_por_fonte, noticias_por_autor

def contagem_palavras_chaves(df):
    # Consolidação 3: Quantidade de aparições de 3 palavras chaves por ano, mês e dia de publicação
    keywords = ['genomics', 'dna', 'genes']

    df['conteudo_lowercase'] = df['content'].str.lower()
    
    for word in keywords:
        df[word] = df['conteudo_lowercase'].str.count(word)
    
    df = df.drop(columns=['conteudo_lowercase'])
    
    aparicoes_por_dia = df.groupby(['year', 'month', 'day'])[keywords].sum()

    file_path = f'dbfs:/FileStore/proj_4_genomics_sa/curated/'

    aparicoes_por_dia.to_parquet(file_path + 'aparicoes_por_dia.parquet')
    
    #return aparicoes_por_ano, aparicoes_por_mes, aparicoes_por_dia

In [0]:
# ===================================

# ============ BATCH ETL ============

# ===================================

In [0]:
def etl_news( end_datetime: datetime.datetime ) -> None:
    """ Função main do ETL de novas notícias sobre o campo da Genômica. 
    
    ARGUMENTS
    ---------
        end_datetime : datetime.datetime
            Data e horário do fim da execução da extração. O horário de início será 
            calculado para 1 hora antes.

    CALL
    ----
    end_time = '2023-10-03T10:55:39'
    etl_news( datetime.datetime.fromisoformat(end_time) )
    """
    
    p('======== REQUISIÇÕES REST =========')
    # Query de busca
    # query = 'genomic'
    query = gerar_query_string()

    # ------------- Filtro de horário -------------- #
    end_date = end_datetime.strftime('%Y-%m-%d')
    end_time = end_datetime.strftime('%H-%M-%S')
    
    start_datetime = end_datetime - datetime.timedelta(hours=1)
    start_date = start_datetime.strftime('%Y-%m-%d')
    start_time = start_datetime.strftime('%H-%M-%S')
    

    # Extrai dados da API e consolida no DBFS
    get_raw_news_files(
        query=query, 
        start_date=start_date,
        end_date=end_date,
        start_time=start_time,
        end_time=end_time)
    
    p('======== UNIÃO DAS PÁGINAS =========')
    # Adaptação para o Community Edition :: copia arquivo para uma pasta do Driver Node
    dbutils.fs.cp('dbfs:/FileStore/proj_4_genomics_sa/raw/queue', 'file:/tmp/proj_raw', True)

    # Transforma JSON da consulta em Parquet
    novo_arquivo = union_raw_news_pages()

    # Movimenta arquivos lidos
    dbutils.fs.mv('dbfs:/FileStore/proj_4_genomics_sa/raw/queue', 'dbfs:/FileStore/proj_4_genomics_sa/raw/readed', True)
    print('Arquivo movidos para [readed]')

    p('======== CARGA DE TODAS NOTÍCIAS EM ARQUIVO UNIFICADO =========')
    # Carrega em arquivo unificado
    load_into_unified_file(novo_arquivo['path'])

In [0]:
def etl_consolidacao(_):
    # Leitura da base unificada de notícias
    print('ETL de consolidação sendo inicializado...')
    noticias_filepath = 'dbfs:/FileStore/proj_4_genomics_sa/validated/noticias.parquet'
    df = ps.read_parquet(noticias_filepath)

    # 4.1 - Quantidade de notícias por ano, mês e dia de publicação
    print('Consolidando as notícias por dia, mês e ano....', end='')
    noticias_por_ano_mes_dia(df)
    print('concluído.')

    # 4.2 - Quantidade de notícias por fonte e autor
    print('Consolidando as notícias por fonte e autor....', end='')
    noticias_por_fonte_autor(df)
    print('concluído.')

    # 4.3 - Quantidade de aparições de 3 palavras chaves por ano, mês e dia de publicação
    print('Consolidando as notícias por 3 palavras e dia, mês e ano....', end='')
    contagem_palavras_chaves(df)
    print('concluído.')

    print('Conclusão de consolidação! Arquivos salvos em [./curated]\n')