# Projeto - Extração de Dados I

## Sistema de Monitoramento de Avanços no Campo da Genômica

### Contexto:

O grupo trabalha no time de engenharia de dados na HealthGen, uma empresa especializada em genômica e pesquisa de medicina personalizada. A genômica é o estudo do conjunto completo de genes de um organismo, desempenha um papel fundamental na medicina personalizada e na pesquisa biomédica. Permite a análise do DNA para identificar variantes genéticas e mutações associadas a doenças e facilita a personalização de tratamentos com base nas características genéticas individuais dos pacientes.

A empresa precisa se manter atualizada sobre os avanços mais recentes na genômica, identificar oportunidades para pesquisa e desenvolvimento de tratamentos personalizados e acompanhar as tendências em genômica que podem influenciar estratégias de pesquisa e desenvolvimento. Pensando nisso, o time de dados apresentou uma proposta de desenvolvimento de um sistema que coleta, analisa e apresenta as últimas notícias relacionadas à genômica e à medicina personalizada, e também estuda o avanço do campo nos últimos anos.

O time de engenharia de dados tem como objetivo desenvolver e garantir um pipeline de dados confiável e estável. As principais atividades são:

1. **Consumo de dados com a News API**:
    - Implementar um mecanismo para consumir dados de notícias de fontes confiáveis e especializadas em genômica e medicina personalizada, a partir da News API:
      [https://newsapi.org/](https://newsapi.org/)

2. **Definir Critérios de Relevância**:
    - Desenvolver critérios precisos de relevância para filtrar as notícias. Por exemplo, o time pode se concentrar em notícias que mencionem avanços em sequenciamento de DNA, terapias genéticas personalizadas ou descobertas relacionadas a doenças genéticas específicas.

3. **Cargas em Batches**:
    - Armazenar as notícias relevantes em um formato estruturado e facilmente acessível para consultas e análises posteriores. Essa carga deve acontecer 1 vez por hora. Se as notícias extraídas já tiverem sido armazenadas na carga anterior, o processo deve ignorar e não armazenar as notícias novamente, os dados carregados não podem ficar duplicados.

4. **Dados transformados para consulta do público final**:
    - A partir dos dados carregados, aplicar as seguintes transformações e armazenar o resultado final para a consulta do público final:
        - Quantidade de notícias por ano, mês e dia de publicação;
        - Quantidade de notícias por fonte e autor;
        - Quantidade de aparições de 3 palavras-chave por ano, mês e dia de publicação (as 3 palavras-chave serão as mesmas usadas para fazer os filtros de relevância do item 2 (2. Definir Critérios de Relevância)).
    - Atualizar os dados transformados 1 vez por dia.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnull, date_format
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark.sql.utils import AnalysisException
import requests, json, time


spark = SparkSession.builder \
    .appName("Tratamento de Dados") \
    .getOrCreate()

def buscar_news():
    api_key = '4f8fba63c2ec4291a45c702ea3858120'
    exemplos = ["Terapia Gênica", "Terapia Celular CAR-T", "CRISPR-Cas9", "Gene therapy", "CAR-T cell therapy"]
    all_articles = []
    for exemplo in exemplos:
        url = f'https://newsapi.org/v2/everything?q={exemplo}&apiKey={api_key}'
        response = requests.get(url)
        data = response.json().get('articles', []) if response.ok else []
        all_articles.extend(data)
    return all_articles

schema = StructType([
    StructField("source", StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True)
    ]), True),
    StructField("author", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("url", StringType(), True),
    StructField("urlToImage", StringType(), True),
    StructField("publishedAt", StringType(), True),
    StructField("content", StringType(), True)
])

def requisitar_news():
    articles = buscar_news()
    if articles:
        df = spark.createDataFrame(articles, schema=schema)
        return df
    else:
        return None

resultado = buscar_news()
print(json.dumps(resultado, indent=2))

#df.write.mode("overwrite").saveAsTable("noticias_bancodedados") #IMPORTANTE

[
  {
    "source": {
      "id": null,
      "name": "Uol.com.br"
    },
    "author": "Renata Turbiani",
    "title": null,
    "description": "Rejany Machado Pena e Jos\u00e9 Carlos da Silva, moradores de Goi\u00e2nia, se tornaram pais no dia 29 de novembro de 2000. Durante toda a gravidez, apesar de a futura mam\u00e3e ter de lidar com um cisto de ov\u00e1rio que s\u00f3 crescia, o beb\u00ea parecia bem.Mas tudo mudou ap",
    "url": "https://www.uol.com.br/vivabem/noticias/redacao/2024/03/10/pais-levam-quase-20-anos-para-descobrir-doenca-ultrarrara-do-filho.htm",
    "urlToImage": "https://conteudo.imguol.com.br/c/entretenimento/41/2024/03/06/joao-pedro-com-a-mae-1709724623447_v2_615x300.jpg",
    "publishedAt": "2024-03-10T08:51:42Z",
    "content": "S\u00f3 que dessa vez foi diferente. Quando chegou o resultado, havia um diagn\u00f3stico: defici\u00eancia da descarboxilase dos amino\u00e1cidos L-arom\u00e1ticos (defici\u00eancia de AADC). Finalmente, depois de 19 anos, soube\u20

In [0]:
from pyspark.sql.utils import AnalysisException

def atualizar_base_dados(df, spark):
    """
    Atualiza a base de dados Delta Lake com novas notícias.
    
    :param df: DataFrame contendo as novas notícias a serem adicionadas.
    :param spark: SparkSession.
    :return: DataFrame atualizado.
    """
    try:
        # Tenta ler o arquivo Delta existente
        base_dados = spark.read.format("delta").load("dbfs:/user/hive/warehouse/noticias_bancodedados")
    except AnalysisException as e:
        print("Erro ao acessar Delta Lake:", str(e))
        # Se houver um erro ao acessar Delta Lake, assume que é a primeira vez e usa o DataFrame fornecido
        print("Criando uma nova tabela Delta com os dados fornecidos.")
        base_dados = df
        base_dados.write.mode("overwrite").saveAsTable("noticias_bancodedados")
        return base_dados
    
    try:
        # Verifica se há novas notícias comparando o número de registros
        novas_noticias_count = df.join(base_dados, ["url"], "left_anti").count()
        
        # Adiciona novas notícias
        if novas_noticias_count > 0:
            base_dados = base_dados.union(df)
            print("Novas notícias adicionadas à tabela de Noticias.")
        else:
            print("Nenhuma nova notícia para adicionar à tabela Noticias.")
    except AnalysisException as e:
        print("Erro ao atualizar tabela Delta Lake:", str(e))
        return base_dados
    
    try:
        # Escreve o DataFrame de volta para a tabela Delta
        base_dados.write.mode("overwrite").format("delta").save("dbfs:/user/hive/warehouse/noticias_bancodedados")
        print("\nDados atualizados com sucesso!")
    except AnalysisException as e:
        print("Erro ao atualizar tabela Delta Lake:", str(e))
    
    return base_dados


In [0]:
# Função para tratar os dados do DataFrame
def tratar_source(df):
    # Requisita as notícias e armazena no DataFrame 'df'
    df = requisitar_news()
    
    # Verifica se há dados no DataFrame
    if df:
        # Cria uma nova coluna 'source_name' com base nos valores de 'source.id' e 'source.name'
        df = df.withColumn("source_name", when(col("source.id").isNull(), col("source.name")).otherwise(col("source.id")))
        
        # Remove a coluna 'source' do DataFrame
        df = df.drop("source")
        
        # Remove linhas com valores nulos na coluna 'author'
        df = df.dropna(subset=["author"])
        
        # Substitui valores nulos na coluna 'author' por 'Desconhecido'
        df = df.withColumn("author", when(col("author").isNull(), "Desconhecido").otherwise(col("author")))
        
        # Substitui valores nulos na coluna 'title' por 'Sem Titulo'
        df = df.withColumn("title", when(col("title").isNull(), "Sem Titulo").otherwise(col("title")))
        
        # Formata a coluna 'publishedAt' para o formato "dd/MM/yyyy"
        df = df.withColumn("publishedAt", date_format(df["publishedAt"], "dd/MM/yyyy"))
        
        # Remove as colunas '_id' e 'urlToImage' do DataFrame
        df = df.drop("_id", "urlToImage")
        
        # Seleciona as colunas desejadas, incluindo 'source_name' como a primeira coluna
        columns = ["source_name"] + [col_name for col_name in df.columns if col_name != "source_name"]
        df = df.select(columns)
        
        # Chama a função para atualizar a base de dados com os dados tratados
        df = atualizar_base_dados(df, spark)

    return df

# Chama a função para tratar os dados
df = tratar_source(df)

df.show()


Nenhuma nova notícia para adicionar à tabela Noticias.

Dados atualizados com sucesso!
+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+
|        source_name|              author|               title|         description|                 url|publishedAt|             content|
+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+
|         Uol.com.br|     Renata Turbiani|          Sem Titulo|Rejany Machado Pe...|https://www.uol.c...| 10/03/2024|Só que dessa vez ...|
|         Uol.com.br|      Agência Fapesp|Pesquisador da US...|Os testes com cél...|https://gizmodo.u...| 15/03/2024|Texto: André Juli...|
|Olhardigital.com.br|Leandro Costa Cri...|SP dá início a es...|81 pacientes deve...|https://olhardigi...| 26/03/2024|Já havíamos falad...|
|Olhardigital.com.br|Alessandro Di Lor...|CAR-T Cell: estud...|USP vai dar iníc

In [0]:
import time

while True:
    # Consulta para contar o número de linhas no banco de dados
    rows_before = spark.sql("SELECT COUNT(*) as count FROM noticias_bancodedados").collect()[0]['count']
    
    print("Iniciando: Processo de atualização.")
    print(f"Atualmente o banco de dados tem {rows_before} notícias.")
    
    # Busca as novas notícias
    buscar_news()
    
    # Requisita as novas notícias e armazena no DataFrame 'df'
    df = requisitar_news()
    
    # Chama a função para tratar os dados
    df = tratar_source(df)
    
    # Atualiza a base de dados com os dados tratados
    df = atualizar_base_dados(df, spark)
    
    # Consulta para contar o número de linhas após a atualização
    rows_after = spark.sql("SELECT COUNT(*) as count FROM noticias_bancodedados").collect()[0]['count']
    
    # Verifica se o número de linhas aumentou
    if rows_after > rows_before:
        print("O número de notícias aumentou!")
    else:
        print("O número de notícias não aumentou.")
    
    print("Repetindo...")
    # Espera 60 minutos
    time.sleep(60 * 1)


Iniciando: Processo de atualização.
Atualmente o banco de dados tem 247 linhas.
O número de linhas não aumentou.
Repetindo...
Iniciando: Processo de atualização.
Atualmente o banco de dados tem 247 linhas.
O número de linhas não aumentou.
Repetindo...


In [0]:
spark.sql("SELECT * FROM noticias_bancodedados LIMIT 10").show()

+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+
|        source_name|              author|               title|         description|                 url|publishedAt|             content|
+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+
|         Uol.com.br|     Renata Turbiani|          Sem Titulo|Rejany Machado Pe...|https://www.uol.c...| 10/03/2024|Só que dessa vez ...|
|         Uol.com.br|      Agência Fapesp|Pesquisador da US...|Os testes com cél...|https://gizmodo.u...| 15/03/2024|Texto: André Juli...|
|Olhardigital.com.br|Leandro Costa Cri...|SP dá início a es...|81 pacientes deve...|https://olhardigi...| 26/03/2024|Já havíamos falad...|
|Olhardigital.com.br|Alessandro Di Lor...|CAR-T Cell: estud...|USP vai dar iníci...|https://olhardigi...| 14/03/2024|Um importante pas...|
|         Www.abc.es|      

In [0]:
quantidades_data = spark.sql("""
    SELECT DAY(to_date(publishedAt, 'dd/MM/yyyy')) AS dia, 
           MONTH(to_date(publishedAt, 'dd/MM/yyyy')) AS mes, 
           YEAR(to_date(publishedAt, 'dd/MM/yyyy')) AS ano,
           COUNT(*) AS quantidade 
    FROM noticias_bancodedados 
    GROUP BY DAY(to_date(publishedAt, 'dd/MM/yyyy')), 
             MONTH(to_date(publishedAt, 'dd/MM/yyyy')), 
             YEAR(to_date(publishedAt, 'dd/MM/yyyy'))
    ORDER BY quantidade DESC
""")


quantidades_data.show()

quantidades_data = quantidades_data.toJSON().collect()

+---+---+----+----------+
|dia|mes| ano|quantidade|
+---+---+----+----------+
| 21|  3|2024|        24|
| 20|  3|2024|        17|
| 15|  3|2024|        17|
| 14|  3|2024|        17|
|  2|  4|2024|        15|
|  4|  4|2024|        14|
| 28|  3|2024|        12|
| 13|  3|2024|        11|
| 27|  3|2024|        10|
|  1|  4|2024|         9|
| 18|  3|2024|         9|
| 25|  3|2024|         8|
| 19|  3|2024|         7|
| 22|  3|2024|         7|
|  7|  3|2024|         7|
| 29|  3|2024|         7|
| 12|  3|2024|         6|
| 30|  3|2024|         6|
|  8|  3|2024|         5|
| 16|  3|2024|         5|
+---+---+----+----------+
only showing top 20 rows



In [0]:
quantidades_fonte_autor = spark.sql("""
    SELECT source_name, 
           author, 
           COUNT(*) AS quantidade 
    FROM noticias_bancodedados 
    GROUP BY source_name, author
    ORDER BY quantidade DESC
""")

quantidades_fonte_autor.show()

quantidades_fonte_autor = quantidades_fonte_autor.toJSON().collect()

+--------------------+--------------------+----------+
|         source_name|              author|quantidade|
+--------------------+--------------------+----------+
|      ETF Daily News|     MarketBeat News|        13|
|       Investing.com|       Investing.com|         9|
|   InvestorsObserver|   InvestorsObserver|         6|
|              Forbes|William A. Haselt...|         5|
|       GlobeNewswire|Research and Markets|         5|
|            Phys.Org|           Science X|         5|
|     Singularity Hub|          Shelly Fan|         5|
|          Biztoc.com|        benzinga.com|         5|
|          Www.abc.es|               (abc)|         4|
|Bengreenfieldlife...|support@bengreenf...|         4|
|               WebMD|https://www.faceb...|         4|
|                  rt|RT en Español\n ,...|         3|
|        ars-technica|           Beth Mole|         3|
|    Sputnikglobe.com|   Chimauchem  Nwosu|         3|
|       Investing.com|             Reuters|         3|
|       Gl

In [0]:
rank_palavras = spark.sql("""
    SELECT palavra, COUNT(*) AS quantidade
    FROM (
        SELECT EXPLODE(SPLIT(content, ' ')) AS palavra
        FROM noticias_bancodedados
    ) palavras
    WHERE LENGTH(palavra) > 3
    GROUP BY palavra
    ORDER BY quantidade DESC
""")

rank_palavras.show()

json_rank_palavras = rank_palavras.toJSON().collect() #Ideia para como armazenar na API

#print(json_rank_palavras)

+------------+----------+
|     palavra|quantidade|
+------------+----------+
|      chars]|       237|
|        with|        33|
|        that|        33|
|        have|        30|
|       March|        24|
|     therapy|        24|
|        cell|        22|
|Therapeutics|        21|
|        gene|        18|
|        from|        17|
|     Report)|        17|
|        been|        17|
|        Free|        17|
|      CRISPR|        17|
|       first|        16|
|      cancer|        16|
|    patients|        15|
|        this|        15|
|        2024|        14|
|        will|        14|
+------------+----------+
only showing top 20 rows



In [0]:
pip install kafka-python

Python interpreter will be restarted.
Collecting kafka-python
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
Python interpreter will be restarted.


In [0]:
from kafka import KafkaProducer, KafkaConsumer
import json

In [0]:

producer = KafkaProducer(bootstrap_servers=["localhost:9092"])

topicos = ["rank_palavras", "quantidades_data", "quantidades_fonte_autor"]

# Dados JSON para cada tópico
json_rank_palavras = json_rank_palavras
json_quantidades_data = quantidades_data
json_quantidades_fonte_autor = quantidades_fonte_autor

# Enviar dados para cada tópico
for topico, json_data in zip(topicos, [json_rank_palavras, json_quantidades_data, json_quantidades_fonte_autor]):
    mensagem = json.dumps(json_data).encode('utf-8')
    producer.send(topico, mensagem)


In [0]:
consumer = KafkaConsumer("rank_palavras", bootstrap_servers=["localhost:9092"],auto_offset_reset='earliest')

for message in consumer:
    print(message.value.decode('utf-8'))


["{\"palavra\":\"chars]\",\"quantidade\":237}", "{\"palavra\":\"with\",\"quantidade\":33}", "{\"palavra\":\"that\",\"quantidade\":33}", "{\"palavra\":\"have\",\"quantidade\":30}", "{\"palavra\":\"March\",\"quantidade\":24}", "{\"palavra\":\"therapy\",\"quantidade\":24}", "{\"palavra\":\"cell\",\"quantidade\":22}", "{\"palavra\":\"Therapeutics\",\"quantidade\":21}", "{\"palavra\":\"gene\",\"quantidade\":18}", "{\"palavra\":\"from\",\"quantidade\":17}", "{\"palavra\":\"Report)\",\"quantidade\":17}", "{\"palavra\":\"been\",\"quantidade\":17}", "{\"palavra\":\"Free\",\"quantidade\":17}", "{\"palavra\":\"CRISPR\",\"quantidade\":17}", "{\"palavra\":\"first\",\"quantidade\":16}", "{\"palavra\":\"cancer\",\"quantidade\":16}", "{\"palavra\":\"patients\",\"quantidade\":15}", "{\"palavra\":\"this\",\"quantidade\":15}", "{\"palavra\":\"2024\",\"quantidade\":14}", "{\"palavra\":\"will\",\"quantidade\":14}", "{\"palavra\":\"treatment\",\"quantidade\":14}", "{\"palavra\":\"brain\",\"quantidade\":14}"

In [0]:
consumer = KafkaConsumer("quantidades_data", bootstrap_servers=["localhost:9092"],auto_offset_reset='earliest')

for message in consumer:
    print(message.value.decode('utf-8'))

["{\"dia\":21,\"mes\":3,\"ano\":2024,\"quantidade\":24}", "{\"dia\":20,\"mes\":3,\"ano\":2024,\"quantidade\":17}", "{\"dia\":15,\"mes\":3,\"ano\":2024,\"quantidade\":17}", "{\"dia\":14,\"mes\":3,\"ano\":2024,\"quantidade\":17}", "{\"dia\":2,\"mes\":4,\"ano\":2024,\"quantidade\":15}", "{\"dia\":4,\"mes\":4,\"ano\":2024,\"quantidade\":14}", "{\"dia\":28,\"mes\":3,\"ano\":2024,\"quantidade\":12}", "{\"dia\":13,\"mes\":3,\"ano\":2024,\"quantidade\":11}", "{\"dia\":27,\"mes\":3,\"ano\":2024,\"quantidade\":10}", "{\"dia\":1,\"mes\":4,\"ano\":2024,\"quantidade\":9}", "{\"dia\":18,\"mes\":3,\"ano\":2024,\"quantidade\":9}", "{\"dia\":25,\"mes\":3,\"ano\":2024,\"quantidade\":8}", "{\"dia\":19,\"mes\":3,\"ano\":2024,\"quantidade\":7}", "{\"dia\":22,\"mes\":3,\"ano\":2024,\"quantidade\":7}", "{\"dia\":7,\"mes\":3,\"ano\":2024,\"quantidade\":7}", "{\"dia\":29,\"mes\":3,\"ano\":2024,\"quantidade\":7}", "{\"dia\":12,\"mes\":3,\"ano\":2024,\"quantidade\":6}", "{\"dia\":30,\"mes\":3,\"ano\":2024,\"quan

In [0]:
consumer = KafkaConsumer("quantidades_fonte_autor", bootstrap_servers=["localhost:9092"],auto_offset_reset='earliest')

for message in consumer:
    print(message.value.decode('utf-8'))

["{\"source_name\":\"ETF Daily News\",\"author\":\"MarketBeat News\",\"quantidade\":13}", "{\"source_name\":\"Investing.com\",\"author\":\"Investing.com\",\"quantidade\":9}", "{\"source_name\":\"InvestorsObserver\",\"author\":\"InvestorsObserver\",\"quantidade\":6}", "{\"source_name\":\"GlobeNewswire\",\"author\":\"Research and Markets\",\"quantidade\":5}", "{\"source_name\":\"Forbes\",\"author\":\"William A. Haseltine, Contributor, \\n William A. Haseltine, Contributor\\n https://www.forbes.com/sites/williamhaseltine/\",\"quantidade\":5}", "{\"source_name\":\"Phys.Org\",\"author\":\"Science X\",\"quantidade\":5}", "{\"source_name\":\"Singularity Hub\",\"author\":\"Shelly Fan\",\"quantidade\":5}", "{\"source_name\":\"Biztoc.com\",\"author\":\"benzinga.com\",\"quantidade\":5}", "{\"source_name\":\"Www.abc.es\",\"author\":\"(abc)\",\"quantidade\":4}", "{\"source_name\":\"Bengreenfieldlife.com\",\"author\":\"support@bengreenfieldfitness.com (Ben Greenfield)\",\"quantidade\":4}", "{\"sourc