Projeto desenvolvido por Andreza Lima no âmbito do módulo de Extração de Dados I, na trilha de Engenharia de dados do programa Santander Coders 2023, em parceria com a Ada Tech.

Desenvolvido com Databricks Community edition.

# Projeto Sistema de Monitoramento de Avanços no Campo da Genômica

Contexto:

O grupo trabalha no time de engenharia de dados na HealthGen, uma empresa especializada em genômica e pesquisa de medicina personalizada. A genômica é o estudo do conjunto completo de genes de um organismo, desempenha um papel fundamental na medicina personalizada e na pesquisa biomédica. Permite a análise do DNA para identificar variantes genéticas e mutações associadas a doenças e facilita a personalização de tratamentos com base nas características genéticas individuais dos pacientes.

A empresa precisa se manter atualizada sobre os avanços mais recentes na genômica, identificar oportunidades para pesquisa e desenvolvimento de tratamentos personalizados e acompanhar as tendências em genômica que podem influenciar estratégias de pesquisa e desenvolvimento. Pensando nisso, o time de dados apresentou uma proposta de desenvolvimento de um sistema que coleta, analisa e apresenta as últimas notícias relacionadas à genômica e à medicina personalizada, e também estuda o avanço do campo nos últimos anos. 

O time de engenharia de dados tem como objetivo desenvolver e garantir um pipeline de dados confiável e estável. As principais atividades são:

1. Consumo de dados com a News API: 
Implementar um mecanismo para consumir dados de notícias de fontes confiáveis e especializadas em genômica e medicina personalizada, a partir da News API: 
https://newsapi.org/

2. Definir Critérios de Relevância:
Desenvolver critérios precisos de relevância para filtrar as notícias. Por exemplo, o time pode se concentrar em notícias que mencionem avanços em sequenciamento de DNA, terapias genéticas personalizadas ou descobertas relacionadas a doenças genéticas específicas.

3. Cargas em Batches:
Armazenar as notícias relevantes em um formato estruturado e facilmente acessível para consultas e análises posteriores. Essa carga deve acontecer 1 vez por hora. Se as notícias extraídas já tiverem sidos armazenadas na carga anterior, o processo deve ignorar e não armazenar as notícias novamente, os dados carregados não podem ficar duplicados.

<br>

<div style="text-align: center;">
<img src="https://drive.google.com/uc?export=view&id=1QLZBxgK4c4_yysUnvtamuwXzRJm4nNit"  width="70%" height="40%">
<br>
<br>

</div>

4. Dados transformados para consulta do público final:
A partir dos dados carregados, aplicar as seguintes transformações e armazenar o resultado final para a consulta do público final:

    4.1 - Quantidade de notícias por ano, mês e dia de publicação;

    4.2 - Quantidade de notícias por fonte e autor;
    
    4.3 - Quantidade de aparições de 3 palavras chaves por ano, mês e dia de publicação (as 3 palavras chaves serão as mesmas usadas para fazer os filtros de relevância do item 2 (2. Definir Critérios de Relevância)).

Atualizar os dados transformados 1 vez por dia.

<br>

<div style="text-align: center;">
<img src="https://drive.google.com/uc?export=view&id=1QOFkzKrWqb-9CY3kC3_1XkTWNVNE05dd"  width="70%" height="40%">
<br>
<br>

</div>

Além das atividades principais, existe a necessidade de busca de dados por eventos em tempo real quando é necessário, para isso foi desenhado duas opções:

Opção 1 - Apache Kafka e Spark Streaming:

Preparar um pipeline com Apache Kafka e Spark Streaming para receber os dados do Produtor Kafka representado por um evento manual e consumir os dados com o Spark Streaming armazenando os resultados temporariamente. Em um processo paralelo, verificar os resultados armazenados temporiamente e armazenar no mesmo destino do item 3 (3. Cargas em Batches) aqueles resultados que ainda não foram armazenados no destino (os dados carregados não podem ficar duplicados). E por fim, eliminar os dados temporários após a verificação e a eventual carga.

<br>

<div style="text-align: center;">
<img src="https://drive.google.com/uc?export=view&id=1PvAxBXU0fvwEtJg36ZJ1VfBVSGETBpUZ"  width="70%" height="40%">
<br>
<br>

</div>


Opção 2 - Webhooks com notificações por eventos:

Configurar um webhook para adquirir as últimas notícias a partir de um evento representado por uma requisição POST e fazer a chamada da API e por fim armazenar os resultados temporariamente. Em um processo paralelo, verificar os resultados armazenados temporiamente e armazenar no mesmo destino do item 3 (3. Cargas em Batches) aqueles resultados que ainda não foram armazenados no destino (os dados carregados não podem ficar duplicados). E por fim, eliminar os dados temporários após a verificação e a eventual carga.

<br>

<div style="text-align: center;">
<img src="https://drive.google.com/uc?export=view&id=1Px6Jp3aNuF-wpn_9earonylEMebzOcBW"  width="70%" height="40%">
<br>
<br>

</div>

## Atividades que precisam ser realizadas pelo grupo:

O grupo precisa construir o pipeline de dados seguindo os requisitos das atividades principais e escolher entre a Opção 1 e Opção 2 para desenvolvimento.

# Proposta de Resolução

A execução deste projeto foi dividida em 5 arquivos.

1-ada-projeto-load-data: contém os passos necessários para carregamento dos dados de hora em hora

2-ada-projeto-transform-data: contém os passos necessários para transformação do arquivo consolidado, a cada 24 horas

3-ada-projeto-webhook: cria uma aplicação flask para o processo de webhook

4-ada-projeto-load-temp-data: contém os passos necessários para carregamento dos dados em uma pasta temporária, e depois carregamento no arquivo principal

5-ada-projeto-webhook-post (este arquivo): envia post commands para a aplicação criada no arquivo 3-ada-projeto-webhook

## Importando Pacotes

In [0]:
## Executar apenas se for necessário instalar pacotes
# !pip install pyspark
# !pip install flask

In [0]:
from datetime import datetime
# import flask
import json
import pyspark.pandas as ps
import requests
import time

## 1. Consumo de dados com a News API
Ir para o arquivo 1-ada-projeto-load-data

## 2. Definir Critérios de Relevância

De acordo com Antunes (2022)¹, após cerca de 20 anos depois da publicação do primeiro rascunho da sequência de todo o genoma humano, os investigadores da área mergulharam na era “pós-genómica”, remodelando a investigação biológica e abrindo portas para a chamada medicina personalizada ou medicina de precisão, que pesquisa como medicamentos podem ser adaptados de um modo preciso, proporcionando o melhor tratamento possível através da sequenciação do genoma de cada indivíduo.

Tendo como base essa discussão sobre os avanços da genômica, as palavras-chave escolhidas para este projeto foram os termos em inglês *genomics*, *precision medicine* e *personalized medicine*.

¹ Antunes, A., (2022) Avanços da genómica, Rev. Ciência Elem., V10(4):056. Disponível em: https://rce.casadasciencias.org/rceapp/art/2022/056/. Acesso em: 25 set. 2023.

## 3. Cargas em Batches:

Ir para o arquivo 1-ada-projeto-load-data

## Opção 2 - Webhooks com notificações por eventos:

Webhook é uma forma de automação de comunicação entre sistemas. É um método que permite que um sistema envie automaticamente dados ou informações para outro sistema assim que um evento específico ocorrer. 

Para fins deste projeto, será criada uma aplicação com flask para simular um evento que ocorre sempre que um post command é executado.


### Criação de uma aplicação flask para simular um evento

Configurar um webhook para adquirir as últimas notícias a partir de um evento representado por uma requisição POST e fazer a chamada da API e por fim armazenar os resultados temporariamente. Em um processo paralelo, verificar os resultados armazenados temporiamente e armazenar no mesmo destino do item 3 (3. Cargas em Batches) aqueles resultados que ainda não foram armazenados no destino (os dados carregados não podem ficar duplicados). E por fim, eliminar os dados temporários após a verificação e a eventual carga.

Ir para o arquivo 3-ada-projeto-webhook

### Carregamento de dados temporários

Ir para o arquivo 4-ada-projeto-load-temp-data

### Enviando uma requisição

In [0]:
# URL da API webhook
url = "http://localhost:5000/webhook"

# Dados a serem enviados na requisição
data = {"message": "Executar"}

# Enviando a requisição POST para a API
response = requests.post(url, json=data)

# Verificando o status da resposta
if response.status_code == 200:
    # Se o status é 200, a requisição foi bem-sucedida
    print("Webhook enviado com sucesso.")
else:
    # Caso contrário, houve um erro na requisição
    print("Erro ao enviar webhook.")

Webhook enviado com sucesso.
