In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, udf, pandas_udf
from pyspark.sql.types import StringType, ArrayType
import trafilatura
import spacy
from transformers import pipeline
import pandas as pd

# Inicia SparkSession
spark = SparkSession.builder.appName("EnriquecimentoSilver").getOrCreate()

# Caminhos
bronze_path = "s3://portfolio-projeto-cinco/datalake/bronze/source=bbc/"
silver_path = "s3://portfolio-projeto-cinco/datalake/silver/"

# Carrega dados da Bronze
df = spark.read.parquet(bronze_path)

# Baixa modelo spaCy
import spacy.cli
spacy.cli.download("en_core_web_sm")
nlp = spacy.load("en_core_web_sm")

# Carrega pipelines HuggingFace
summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
sentiment = pipeline("sentiment-analysis")

# Função para extrair texto da notícia
@udf(StringType())
def extrair_texto_udf(link):
    try:
        downloaded = trafilatura.fetch_url(link)
        if downloaded:
            return trafilatura.extract(downloaded, include_comments=False, include_tables=False)
        return None
    except:
        return None

# Extrai texto
df = df.withColumn("article_text", extrair_texto_udf(col("link")))

# Funções auxiliares em Pandas UDF
@pandas_udf(ArrayType(StringType()))
def extrair_entidades_pandas(textos: pd.Series, tipo: str = "GPE") -> pd.Series:
    return textos.apply(lambda x: list({ent.text for ent in nlp(x).ents if ent.label_ == tipo}) if x else [])

@pandas_udf(StringType())
def resumir_texto_pandas(textos: pd.Series) -> pd.Series:
    def resumir(t):
        if not t or len(t) < 100:
            return ""
        partes = [t[i:i+1000] for i in range(0, len(t), 1000)]
        resumos = []
        for parte in partes:
            try:
                resumo = summarizer(parte, max_length=40, min_length=10, do_sample=False)
                resumos.append(resumo[0]['summary_text'])
            except:
                continue
        return " ".join(resumos)
    return textos.apply(resumir)

@pandas_udf(StringType())
def sentimento_pandas(textos: pd.Series) -> pd.Series:
    return textos.apply(lambda x: sentiment(x[:512])[0]["label"] if x else "NEUTRAL")

# Aplica enriquecimentos
df = df.withColumn("locations", extrair_entidades_pandas(col("article_text")))
df = df.withColumn("people", extrair_entidades_pandas(col("article_text"), "PERSON"))
df = df.withColumn("text_summary", resumir_texto_pandas(col("article_text")))
df = df.withColumn("overall_feeling", sentimento_pandas(col("article_text")))

# Cria partição de data
df = df.withColumn("date", to_date(col("timestamp")))

# Escreve na Silver
(
    df.write
    .mode("append")
    .partitionBy("source", "date")
    .parquet(silver_path)
)

print("✅ Dados enriquecidos salvos na camada Silver.")


In [None]:
import boto3
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import json
from datetime import datetime
import io

# Configurações
bucket = "portfolio-projeto-cinco"
log_prefix = "datalake/logs/bronze_ingest_log/"
bronze_prefix = "datalake/bronze/"
checkpoint_path = "datalake-checkpoints/bbc_silver.json"

s3 = boto3.client("s3")

# Carrega o checkpoint da Silver
def get_last_silver_checkpoint():
    try:
        obj = s3.get_object(Bucket=bucket, Key=checkpoint_path)
        data = json.loads(obj['Body'].read().decode("utf-8"))
        return datetime.fromisoformat(data["last_processed_at"])
    except:
        return datetime(2000, 1, 1)

# Salva o novo checkpoint
def update_silver_checkpoint():
    novo_checkpoint = datetime.utcnow().isoformat()
    s3.put_object(
        Bucket=bucket,
        Key=checkpoint_path,
        Body=json.dumps({"last_processed_at": novo_checkpoint})
    )

# Lista os arquivos válidos da Bronze usando o log
def listar_arquivos_bronze():
    last_checkpoint = get_last_silver_checkpoint()
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket, Prefix=log_prefix)

    arquivos_validos = []
    for page in pages:
        for obj in page.get("Contents", []):
            if not obj["Key"].endswith(".parquet"):
                continue
            buffer = io.BytesIO()
            s3.download_fileobj(bucket, obj["Key"], buffer)
            buffer.seek(0)
            table = pq.read_table(buffer)
            df = table.to_pandas()
            
            print(f"{len(df)} atualizações encontradas")

            df_filtrado = df[
                (df["source"] == "bbc") &
                (df["status"] == "success") &
                (pd.to_datetime(df["processed_at"]) > last_checkpoint)
            ]
            arquivos_validos.extend(df_filtrado["path"].tolist())

    return arquivos_validos

# Exemplo de leitura dos arquivos da bronze filtrados
def carregar_dados_bronze():
    arquivos = listar_arquivos_bronze()
    if not arquivos:
        print("Nenhum novo arquivo para processar.")
        return pd.DataFrame()

    dfs = []
    for path in arquivos:
        buffer = io.BytesIO()
        s3.download_fileobj(bucket, path.replace(f"s3://{bucket}/", ""), buffer)
        buffer.seek(0)
        table = pq.read_table(buffer)
        dfs.append(table.to_pandas())

    df_bronze = pd.concat(dfs, ignore_index=True)
    return df_bronze

# Execução principal
df = carregar_dados_bronze()
if not df.empty:
    print(f"🔍 {len(df)} registros carregados da Bronze.")
    update_silver_checkpoint()
    print("Checkpoint atualizado.")
else:
    print("Nenhum dado carregado.")

In [None]:
# !pip install trafilatura spacy transformers keybert; python -m spacy download en_core_web_sm
import trafilatura
from keybert import KeyBERT
from transformers import pipeline
import spacy
import warnings

# Supressão de warnings de parsing XML
warnings.filterwarnings("ignore", category=UserWarning)

# Carregamento de modelos
nlp = spacy.load("en_core_web_sm")
summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
sentiment = pipeline("sentiment-analysis")

def extrair_texto(link):
    try:
        baixado = trafilatura.fetch_url(link)
        if baixado:
            return trafilatura.extract(baixado, include_comments=False, include_tables=False)
        return ""
    except Exception as e:
        print(f"Erro ao extrair texto de {link}: {e}")
        return ""

def extrair_entidades(texto, tipo):
    try:
        doc = nlp(texto)
        return list(set(ent.text for ent in doc.ents if ent.label_ == tipo))
    except Exception:
        return []

def resumir_texto(texto, max_chunk_chars=1000):
    if not texto or len(texto) < 100:
        return ""
    partes = [texto[i:i+max_chunk_chars] for i in range(0, len(texto), max_chunk_chars)]
    resumos = []
    for parte in partes:
        try:
            out = summarizer(parte, max_length=40, min_length=10, do_sample=False)
            resumos.append(out[0]["summary_text"])
        except Exception:
            continue
    return " ".join(resumos)

def sentimento_principal(texto):
    try:
        return sentiment(texto[:512])[0]["label"]
    except Exception:
        return "NEUTRAL"

def enriquecer_dataframe(df):
    df["article_text"] = df["link"].apply(extrair_texto)
    df["people"] = df["article_text"].apply(lambda x: extrair_entidades(x, "PERSON"))
    df["locations"] = df["article_text"].apply(lambda x: extrair_entidades(x, "GPE"))
    df["text_summary"] = df["article_text"].apply(resumir_texto)
    df["overall_feeling"] = df["article_text"].apply(sentimento_principal)
    return df


df_enriquecido = enriquecer_dataframe(df)


In [None]:
import boto3
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import json
from datetime import datetime
import io
import openai
import json
import pandas as pd
import trafilatura
import os

# from dotenv import load_dotenv
# load_dotenv()


# Configurações
bucket = "portfolio-projeto-cinco"
log_prefix = "datalake/logs/bronze_ingest_log/"
bronze_prefix = "datalake/bronze/"
checkpoint_path = "datalake-checkpoints/bbc_silver.json"

s3 = boto3.client("s3")

# Carrega o checkpoint da Silver
def get_last_silver_checkpoint():
    try:
        obj = s3.get_object(Bucket=bucket, Key=checkpoint_path)
        data = json.loads(obj['Body'].read().decode("utf-8"))
        return datetime.fromisoformat(data["last_processed_at"])
    except:
        return datetime.min

# Salva o novo checkpoint
def update_silver_checkpoint():
    novo_checkpoint = datetime.utcnow().isoformat()
    s3.put_object(
        Bucket=bucket,
        Key=checkpoint_path,
        Body=json.dumps({"last_processed_at": novo_checkpoint})
    )

# Lista os arquivos válidos da Bronze usando o log
def listar_arquivos_bronze():
    last_checkpoint = get_last_silver_checkpoint()
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket, Prefix=log_prefix)

    arquivos_validos = []
    for page in pages:
        for obj in page.get("Contents", []):
            if not obj["Key"].endswith(".parquet"):
                continue
            buffer = io.BytesIO()
            s3.download_fileobj(bucket, obj["Key"], buffer)
            buffer.seek(0)
            table = pq.read_table(buffer)
            df = table.to_pandas()

            df_filtrado = df[
                (df["source"] == "bbc") &
                (df["status"] == "success") &
                (pd.to_datetime(df["processed_at_ts"]) > last_checkpoint)
            ]
            arquivos_validos.extend(df_filtrado["path"].tolist())

    return arquivos_validos

# Exemplo de leitura dos arquivos da bronze filtrados
def carregar_dados_bronze():
    arquivos = listar_arquivos_bronze()
    if not arquivos:
        print("Nenhum novo arquivo para processar.")
        return pd.DataFrame()

    dfs = []
    for path in arquivos:
        buffer = io.BytesIO()
        s3.download_fileobj(bucket, path.replace(f"s3://{bucket}/", ""), buffer)
        buffer.seek(0)
        table = pq.read_table(buffer)
        dfs.append(table.to_pandas())

    df_bronze = pd.concat(dfs, ignore_index=True)
    return df_bronze

# Execução principal
df = carregar_dados_bronze()
if not df.empty:
    print(f"🔍 {len(df)} registros carregados da Bronze.")
    update_silver_checkpoint()
    print("✅ Checkpoint atualizado.")
else:
    print("⚠️ Nenhum dado carregado.")
    
prompt_template = """
A partir do texto abaixo, extraia as seguintes informações:
- Um resumo em uma frase.
- O sentimento principal entre: POSITIVO, NEGATIVO ou NEUTRO.
- Lista de locais mencionados, incluindo países, estados e cidades.
- Lista de pessoas mencionadas.

Texto:
\"\"\"
{texto}
\"\"\"

Retorne no seguinte formato JSON:
{{
  "resumo": "...",
  "sentimento": "...",
  "locais": [...],
  "pessoas": [...]
}}
"""

def extrair_texto(link):
    try:
        baixado = trafilatura.fetch_url(link)
        if baixado:
            return trafilatura.extract(baixado, include_comments=False, include_tables=False)
        return ""
    except Exception as e:
        print(f"Erro ao extrair texto de {link}: {e}")
        return ""
    
def enriquecer_artigo(texto):
    prompt = prompt_template.format(texto=texto)  # corta para economizar tokens
    client = openai.OpenAI(api_key=os.getenv('TOKEN_OPENAI'))


    try:
        response = client.chat.completions.create(
            model="gpt-4.1-mini-2025-04-14",
            messages=[
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=300
        )
        result = (
            response
            .choices[0]
            .message
            .content
            .replace("```", '')
            .replace('json', '')
        )
        
        return json.loads(result)
    except Exception as e:
        print(f"❌ Erro no enriquecimento: {e}")
        return {
            "resumo": None,
            "sentimento": None,
            "locais": [],
            "pessoas": []
        }

df["article_text"] = df["link"].apply(extrair_texto)
df['genai_output'] = df["article_text"].apply(enriquecer_artigo)
df_exploded = pd.json_normalize(df["genai_output"])
df_final = pd.concat([df.drop(columns=["genai_output"]), df_exploded], axis=1)

# Definições de timestamp e source do dataframe
df_final["processed_at"] = datetime.utcnow()
df_final["date"] = df_final["processed_at"].dt.strftime("%Y-%m-%d")
df_final["source"] = "bbc"  # garantir compatibilidade

for date, group in df_final.groupby("date"):
    table = pa.Table.from_pandas(group)
    buffer = BytesIO()
    pq.write_table(table, buffer)
    buffer.seek(0)

    filename = f"bbc_silver_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.parquet"
    target_key = f"datalake/silver/source=bbc/date={date}/{filename}"
    
    s3.upload_fileobj(buffer, Bucket=bucket, Key=target_key)
    print(f"✅ Arquivo escrito em: s3://{bucket}/{target_key}")



In [65]:
def extrair_texto(link):
    try:
        baixado = trafilatura.fetch_url(link)
        if baixado:
            return trafilatura.extract(baixado, include_comments=False, include_tables=False)
        return ""
    except Exception as e:
        print(f"Erro ao extrair texto de {link}: {e}")
        return ""


df_teste = pd.read_parquet('/home/omarcocaja/Desktop/portfolio/projeto_5_webscraping/dev-s3/portfolio-projeto-cinco/datalake/bronze/source=bbc/date=2025-06-12/bbc_2025_06_12_10_42_08_downtown_la_under_curfew_for_second_night_after_days_of_protests.parquet')

df_teste["article_text"] = df_teste["link"].apply(extrair_texto)

In [70]:
df_teste['genai_output'] = df_teste["article_text"].apply(enriquecer_artigo)

In [73]:
df_exploded = pd.json_normalize(df_teste["genai_output"])


In [76]:
df_final = pd.concat([df_teste.drop(columns=["genai_output"]), df_exploded], axis=1)

In [77]:
df_final

Unnamed: 0,title,description,link,pubDate,guid,source,timestamp,date,article_text,resumo,sentimento,locais,pessoas
0,Downtown LA under curfew for second night afte...,LA Mayor Karen Bass has accused federal author...,https://www.bbc.com/news/articles/cn7z45pyrvvo,"Thu, 12 Jun 2025 07:09:16 GMT",https://www.bbc.com/news/articles/cn7z45pyrvvo#0,bbc,2025-06-12 07:09:16+00:00,2025-06-12,Downtown LA under curfew for second night afte...,Los Angeles enfrenta toque de recolher e forte...,NEGATIVO,"[Los Angeles, Downtown LA, Estados Unidos, Cal...","[Karen Bass, Donald Trump, Jim McDonnell, Pam ..."
