In [0]:

import requests
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from pyspark.sql import functions as F


API_KEY = '5220c4535529484381b6e56b7f95b0aa'

In [0]:
#Fazer request dos dados

s_tema = "diabetes"
s_from = "2024-09-22"
s_to = "2024-10-17"
s_language = "pt"
s_sortBy = "publishedAt" # "publishedAt" or "popularity" or "relevancy"

url = ('https://newsapi.org/v2/everything?'
       f'q={s_tema}&'
       f'from={s_from}&'
       f'to={s_to}&'
       f'language={s_language}&'
       f'sortBy={s_sortBy}&'
       f'apiKey={API_KEY}')

print(url)
response = requests.get(url)

In [0]:
artigos = response.json()
print(artigos)

In [0]:
#gravar em um df spark
lista_dados = []

for artigo in artigos["articles"]:
  artigo["source"] = artigo["source"]["name"]
  lista_dados.append(artigo)

df = spark.createDataFrame(lista_dados)

In [0]:
df.count()

In [0]:
# delta_table = spark.read.format("delta").load("/FileStore/diabetes")
# display(delta_table)

In [0]:
delta_table_path = "/FileStore/diabetes"
if DeltaTable.isDeltaTable(spark, delta_table_path):
    #Merge de arquivos não duplicados

    # Carregar a tabela Delta com notícias previamente armazenadas
    delta_table = DeltaTable.forPath(spark, "/FileStore/diabetes")

    # Criar DataFrame com novas notícias
    novas_noticias_df = df

 # Usar o MERGE para inserir apenas as notícias que ainda não existem
    delta_table.alias("old").merge(
        novas_noticias_df.alias("new"),
        "old.url = new.url AND old.publishedAt = new.publishedAt"
    ).whenNotMatchedInsertAll().execute()

else:
    #Salvar em delta
    df.write.format("delta").save("/FileStore/diabetes")

In [0]:
delta_table = spark.read.format("delta").load("/FileStore/diabetes")

In [0]:
# Removendo o caractere "Z" do final da string e convertendo para timestamp
df = delta_table

df = df.withColumn("publishedAt", to_timestamp(regexp_replace("publishedAt", "Z$", ""), "yyyy-MM-dd'T'HH:mm:ss"))

# Extraindo o ano, mês e dia
df = df.withColumn("year", year("publishedAt")) \
       .withColumn("month", month("publishedAt")) \
       .withColumn("day", dayofmonth("publishedAt"))

In [0]:
display(df)

In [0]:
df.printSchema()

In [0]:
# 2. Contar o número de artigos por ano
artigos_por_ano = df.groupBy("year").count()
artigos_por_ano.show()

In [0]:
artigos_por_month = df.groupBy("month").count()
artigos_por_month.show()

In [0]:
artigos_por_month = df.groupBy("day").count()
artigos_por_month.show()

In [0]:
type(df)

In [0]:
# 4. Agrupar por `source` e contar o número de artigos por fonte
artigos_por_fonte = df.groupBy("source").count()
artigos_por_fonte.show()

In [0]:
import matplotlib.pyplot as plt

# Coletar os dados em uma lista ou transformar em Pandas DataFrame (se for PySpark DataFrame)
artigos_por_fonte_df = artigos_por_fonte.toPandas()  # Se estiver usando PySpark, caso contrário ignore esta linha

# Ordenar os dados pelo maior valor
artigos_por_fonte_df = artigos_por_fonte_df.sort_values(by='count', ascending=True)

# Plotar gráfico de barras horizontal
plt.figure(figsize=(10,6))
plt.barh(artigos_por_fonte_df['source'], artigos_por_fonte_df['count'], color='skyblue')
plt.xlabel('Número de Artigos')
plt.ylabel('Fonte (Source)')
plt.title('Número de Artigos por Fonte')
plt.tight_layout()

# Exibir gráfico
plt.show()


In [0]:
artigos_por_fonte = df.groupBy("author").count()
artigos_por_fonte.show()

In [0]:
display(df)

In [0]:

keywords = ["diabete", "nacion", "not"]

# Contar a quantidade de aparições das palavras-chave
for keyword in keywords:
    df = df.withColumn(f"{keyword}_count", (
        lower(col("content")).contains(keyword).cast("int") +
        lower(col("description")).contains(keyword).cast("int") +
        lower(col("title")).contains(keyword).cast("int")
    ))
resultado = df.groupBy("year", "month", "day") \
              .agg(
                  *[F.sum(f"{keyword}_count").alias(f"{keyword}_appearances") for keyword in keywords]
              )