In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
import requests
import re
import logging
import time
import sys
import os
from urllib.parse import unquote
from datetime import datetime

sys.path.append(os.path.join(os.path.dirname(os.path.abspath('')), 'src'))

from utils.data_validation import valida_fonte, ler_fonte
from utils.table_operations import cria_tabela
from config.pipeline_config import configura_users_yt

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [0]:
config = configura_users_yt()

# Páginas podem ser redirecionadas, então definimos o nome do creator
# usando a API da Wikipedia para garantir que estamos pegando o nome correto
def define_creator_wiki(creator, config=config):
    try:
        url = config["api_endpoint"]
        params = {
            "action": "parse",
            "page": creator,
            "format": "json"
        }

        response = requests.get(url, params=params, timeout=10, headers=config['headers'])
        response.raise_for_status()
        data = response.json()
        text = data['parse']['text']['*']
        if 'Redirect to:' in text:
            m = re.search(r'href=[\'"]/wiki/([^\'"#?]+)', text)
            return unquote(m.group(1)) if m else None
        else:
            return creator
    except Exception as e:
        logger.error(f"Erro ao acessar a API da Wikipedia para definir o nome do creator {creator}: {e}")
        raise


def extrai_user_id_youtube(creator, config=config):
    try:
        url = config["api_endpoint"]
        params = {
            "action": "parse",
            "page": creator,
            "format": "json"
        }
        
        response = requests.get(url, params=params, timeout=10, headers=config["headers"])
        response.raise_for_status()
        data = response.json()
        
        if 'error' in data:
            logger.warning(f"Página não encontrada na Wikipedia: {creator}")
            return None
            
        if 'parse' not in data or 'text' not in data['parse']:
            logger.warning(f"Conteúdo não disponível para: {creator}")
            return None
            
        html_content = data['parse']['externallinks']
        
        youtube_patterns = r"https?://(?:www\.)?youtube\.com/(?:user/|c/|channel/|@)([a-zA-Z0-9_.-]+)"

        for item in html_content:
            matches = re.findall(youtube_patterns, item, re.IGNORECASE)
            if matches:
                valid_matches = [
                    match for match in matches 
                    if len(match) > 2 and 
                    match.lower() not in ['watch', 'embed', 'playlist', 'results', 'feed', 'trending']
                ]
                
                if valid_matches:
                    user_id = valid_matches[0]
                    logger.info(f"User ID encontrado para {creator}: {user_id}")
                    return user_id
                else:
                    logger.warning(f"Nenhum user_id do YouTube encontrado para: {creator}")
                    return None
        
    except requests.RequestException as e:
        logger.error(f"Erro na requisição para {creator}: {str(e)}")
        return None
    except Exception as e:
        logger.error(f"Erro inesperado ao processar {creator}: {str(e)}")
        return None


def processa_creators_batch(creators_list, config=config):
    results = []
    total_creators = len(creators_list)
    
    logger.info(f"Processando {total_creators} creators em lotes de {config['batch_size']}")
    
    for i in range(0, total_creators, config['batch_size']):
        batch = creators_list[i:i + config['batch_size']]
        batch_num = (i // config['batch_size']) + 1
        total_batches = (total_creators + config['batch_size'] - 1) // config['batch_size']
        
        logger.info(f"Processando lote {batch_num}/{total_batches}")
        
        for wiki_page in batch:
            real_user_id = define_creator_wiki(wiki_page)
            user_id = extrai_user_id_youtube(real_user_id)
            
            if user_id:
                results.append({
                    'wiki_page': wiki_page,
                    'user_id': user_id
                })
            
            time.sleep(0.5)
        
        if i + config['batch_size'] < total_creators:
            logger.info("Pausa entre lotes...")
            time.sleep(1)
    
    return results

In [0]:
logging.basicConfig(level=getattr(logging, config["log_level"]))
logger = logging.getLogger(__name__)

logger.info(f"Pipeline iniciado em {config['processing_timestamp']}")
logger.info(f"Fonte: {config['source']}")
logger.info(f"Endpoint da API: {config['api_endpoint']}")

if not valida_fonte(spark, config):
    raise FileNotFoundError(f"Tabela fonte não encontrada: {config['source']}")

creators_df = ler_fonte(spark, config)
logger.info(f"Tabela {config['source']} carregada com sucesso")

creators_list = [row.wiki_page for row in creators_df.select("wiki_page").collect()]
logger.info(f"Total de creators para processar: {len(creators_list)}")

INFO:__main__:Pipeline iniciado em 2025-08-19 23:55:16.348699
INFO:__main__:Fonte: workspace.default.bronze_creators_scrape_wiki
INFO:__main__:Endpoint da API: https://en.wikipedia.org/w/api.php
INFO:utils.data_validation:Fonte validada: workspace.default.bronze_creators_scrape_wiki
INFO:__main__:Tabela workspace.default.bronze_creators_scrape_wiki carregada com sucesso
INFO:__main__:Total de creators para processar: 10


In [0]:
results = processa_creators_batch(creators_list, config)

logger.info(f"Processamento concluído. User IDs encontrados: {len(results)}")
logger.info(f"Taxa de sucesso: {len(results)}/{len(creators_list)} ({len(results)/len(creators_list)*100:.1f}%)")

INFO:__main__:Processando 10 creators em lotes de 3
INFO:__main__:Processando lote 1/4
INFO:__main__:User ID encontrado para KondZilla: KondZilla
INFO:__main__:User ID encontrado para Luccas_Neto: luccasneto
INFO:__main__:User ID encontrado para Pirulla: Pirulla25
INFO:__main__:Pausa entre lotes...
INFO:__main__:Processando lote 2/4
INFO:__main__:User ID encontrado para Cocomelon: UCbCmjCuTUZos6Inko4u57UQ
INFO:__main__:User ID encontrado para Porta_dos_Fundos: portadosfundos
INFO:__main__:User ID encontrado para Felipe_Neto: felipeneto
INFO:__main__:Pausa entre lotes...
INFO:__main__:Processando lote 3/4
INFO:__main__:User ID encontrado para PewDiePie: PewDiePie
INFO:__main__:User ID encontrado para Judson_Laipply: judsonlaipply
INFO:__main__:User ID encontrado para Ray_William_Johnson: RayWilliamJohnson
INFO:__main__:Pausa entre lotes...
INFO:__main__:Processando lote 4/4
INFO:__main__:User ID encontrado para Zoe_Sugg: zoella280390
INFO:__main__:Processamento concluído. User IDs encon

In [0]:
if results:
    schema = StructType([
        StructField("user_id", StringType(), True),
        StructField("wiki_page", StringType(), True)
    ])
    
    users_yt_df = spark.createDataFrame(results, schema)
    
    logger.info("Exemplos de dados extraídos:")
    users_yt_df.show(10, truncate=False)
    
    total_rows = users_yt_df.count()
    logger.info(f"Total de registros criados: {total_rows}")
    
else:
    logger.warning("Nenhum user_id foi extraído com sucesso")
    schema = StructType([
        StructField("user_id", StringType(), True),
        StructField("wiki_page", StringType(), True)
    ])
    users_yt_df = spark.createDataFrame([], schema)

INFO:__main__:Exemplos de dados extraídos:


+------------------------+-------------------+
|user_id                 |wiki_page          |
+------------------------+-------------------+
|KondZilla               |KondZilla          |
|luccasneto              |Luccas_Neto        |
|Pirulla25               |Pirula_(YouTuber)  |
|UCbCmjCuTUZos6Inko4u57UQ|Cocomelon          |
|portadosfundos          |Porta_dos_Fundos   |
|felipeneto              |Felipe_Neto        |
|PewDiePie               |PewDiePie          |
|judsonlaipply           |Judson_Laipply     |
|RayWilliamJohnson       |Ray_William_Johnson|
|zoella280390            |Zoe_Sugg           |
+------------------------+-------------------+



INFO:__main__:Total de registros criados: 10


In [0]:
config.update({
    "layer": "bronze",
    "total_rows": users_yt_df.count(),
    "column_count": len(users_yt_df.columns),
    "has_data": users_yt_df.count() > 0
})

try:
    success = cria_tabela(spark, users_yt_df, config)
    
    if success:
        logger.info(f"Tabela {config['catalog_name']}.{config['schema_name']}.{config['table_name']} criada com sucesso!")
        logger.info(f"Total de registros salvos: {config['total_rows']}")
        
    else:
        raise Exception("Falha na criação da tabela")
        
except Exception as e:
    logger.error(f"Erro ao criar tabela: {str(e)}")
    raise

INFO:utils.table_operations:Iniciada a criação da tabela delta na camada bronze: workspace.default.users_yt
INFO:utils.table_operations:Metadados da tabela - Camada: bronze, Salvo em: workspace.default.users_yt
INFO:utils.table_operations:Tabela criada em 2025-08-19T23:55:16.348699
INFO:utils.table_operations:Linhas: 10
INFO:utils.table_operations:Colunas: 2
INFO:utils.table_operations:Batch ID: 5535d59d
INFO:utils.table_operations:Contém dados: True
INFO:utils.table_operations:Tabela criada com sucesso: workspace.default.users_yt
INFO:utils.table_operations:Comentário com metadados adicionado com sucesso
INFO:__main__:Tabela workspace.default.users_yt criada com sucesso!
INFO:__main__:Total de registros salvos: 10
