## Processing Raw Data

In [15]:
import re
import nltk
from nltk.corpus import stopwords
from pyspark.sql.functions import *
from pyspark.sql.types import *
nltk.download('stopwords', quiet=True)

StatementMeta(, ef1098ed-91b1-4951-bfcb-320f2cbecb22, 19, Finished, Available, Finished, False)

True

In [16]:
import spacy

# spaCy pipeline name; its named-entity recognizer is used by extract_entities_spacy.
SPACY_MODEL = "en_core_web_sm"
nlp = spacy.load(SPACY_MODEL)

StatementMeta(, ef1098ed-91b1-4951-bfcb-320f2cbecb22, 20, Finished, Available, Finished, False)

In [17]:
# STOPWORDS
# NLTK's English stopword list extended with domain words that carry no topical
# signal in this corpus.
STOPWORDS_EN = set(stopwords.words('english'))
CUSTOM_STOPWORDS = set("""
    said new first will also says
    may could would one two three
    year years day days time week
    month months get make take go appeared post after
""".split())
ALL_STOPWORDS = STOPWORDS_EN | CUSTOM_STOPWORDS

# KEYWORDS
# Judging by its name, a previous version of the topic taxonomy.
# NOTE(review): TOPIC_KEYWORDS_backup is not referenced by any visible cell (the
# topic table is built from TOPIC_KEYWORDS); confirm it is unused before removing.
TOPIC_KEYWORDS_backup = {
    "Mars": ["mars"],
    "Moon": ["moon", "lunar", "artemis", "apollo", "selene"],
    "Space Launches": ["launch", "liftoff", "launched", "launching", "flight", "launches","space", "missions", "mission"],
    "ISS": ["iss", "international", "station", "lab"],
    "SpaceX": ["spacex", "falcon", "dragon", "starship", "starlink", "elon"],
    "NASA": ["nasa", "astronaut", "astronauts", "nasa's", "center", "agency", "crew"],
    "Satellite": ["satellite", "satellites", "orbit", "orbital", "constellation"],
    "Rocket": ["rocket", "booster", "engine", "propulsion"],
    "Telescope": ["telescope", "webb", "hubble", "jwst", "observatory"],
    "China": ["china", "cnsa", "tiangong", "long march"],
    "Russia": ["russia", "roscosmos", "soyuz", "progress"],
    "Europe": ["esa", "european", "ariane"],
    "Commercial": ["commercial", "private", "tourism", "blue origin", "virgin galactic"]
}

# Active taxonomy: category -> lowercase keywords / multi-word phrases.
# Order matters: the topic-table cell assigns keyword_id sequentially following
# this dict's iteration order (and each list's order), so reordering or inserting
# entries changes the ids already stored in Silver tables.
# Keywords are matched as whole words/phrases (\b...\b) against lowercased text;
# the keyword with the most matches wins, ties go to the longer keyword.
# NOTE(review): very generic terms ("usa", "test", "research", "development",
# "vehicle", "mission", "commercial") may dominate classification — verify intent.
TOPIC_KEYWORDS = {
    "Launches": [
        "launch", "liftoff", "launched", "launching", "rocket launch",
        "maiden flight", "test flight", "orbital launch", "mission", "missions"
    ],
    "Spacecraft": [
        "spacecraft", "capsule", "vehicle", "starship", "dragon",
        "crew dragon", "cargo dragon", "soyuz", "starliner"
    ],
    "Rockets": [
        "rocket", "booster", "falcon", "ariane", "atlas",
        "delta", "electron", "vulcan", "long march", "sls"
    ],
    "ISS": [
        "iss", "international space station", "space station",
        "orbital laboratory", "spacewalk", "eva"
    ],
    "Moon": [
        "moon", "lunar", "artemis", "apollo", "selene",
        "south pole"
    ],
    "Mars": [
        "mars", "red planet", "martian"
    ],
    "Satellites": [
        "satellite", "satellites", "constellation", "starlink",
        "orbit", "orbital", "leo", "geostationary"
    ],
    "Space_Tourism": [
        "blue origin", "virgin galactic", "axiom space",
        "space tourist", "tourism", "suborbital", "private astronaut", "commercial"
    ],
    "NASA": [
        "nasa", "kennedy space center", "jpl", "johnson space center",
        "astronaut", "astronauts", "nasa's", "canaveral", "usa"
    ],
    "International": [
        "esa", "jaxa", "roscosmos", "cnsa", "isro",
        "european space agency", "arianespace", "china", "russia"
    ],
    "Science": [
        "telescope", "hubble", "webb", "jwst", "observatory",
        "exoplanet", "astronomy", "research", "discovery"
    ],
    "Technology": [
        "propulsion", "engine", "technology", "innovation",
        "reusable", "landing", "test", "development"
    ]
}


StatementMeta(, ef1098ed-91b1-4951-bfcb-320f2cbecb22, 21, Finished, Available, Finished, False)

In [18]:
# BUILD TOPIC TABLE
# One row per keyword with a sequential 1-based id that follows the iteration
# order of TOPIC_KEYWORDS, plus a final "general" fallback row.
rows = []
for category, keywords in TOPIC_KEYWORDS.items():
    for kw in keywords:
        # The id is the row's 1-based position in the table.
        rows.append((len(rows) + 1, kw, category))

# Next free id goes to the catch-all "General" category.
keyword_id = len(rows) + 1
rows.append((keyword_id, "general", "General"))

topic_df = spark.createDataFrame(rows, ["keyword_id", "keyword", "category"])
topic_df.write.mode("overwrite").format("delta").saveAsTable("topic")
print(f"Tabla topic creada: {topic_df.count()} keywords en {len(TOPIC_KEYWORDS) + 1} categorías")

StatementMeta(, ef1098ed-91b1-4951-bfcb-320f2cbecb22, 22, Finished, Available, Finished, False)

Tabla topic creada: 96 keywords en 13 categorías


In [19]:
# ENTITY EXTRACTION FUNCTION WITH SPACY
# ============================================

# Space companies that must be reported as organizations even when the spaCy
# model misses them. Keys are the lowercase search terms; values are the
# canonical spellings written to the output. (str.title() would produce
# "Spacex" / "Ula", which are wrong and also fail to dedupe against "SpaceX".)
_SPACE_COMPANIES = {
    "spacex": "SpaceX",
    "virgin galactic": "Virgin Galactic",
    "roscosmos": "Roscosmos",
    "arianespace": "Arianespace",
    "united launch alliance": "United Launch Alliance",
    "ula": "ULA",
    "rocket lab": "Rocket Lab",
    "astra": "Astra",
    "relativity space": "Relativity Space",
    "axiom space": "Axiom Space",
    "northrop grumman": "Northrop Grumman",
    "lockheed martin": "Lockheed Martin",
    "boeing": "Boeing",
    "airbus": "Airbus",
}

# Whole-word patterns compiled once instead of on every call.
_SPACE_COMPANY_PATTERNS = [
    (re.compile(r'\b' + re.escape(term) + r'\b'), canonical)
    for term, canonical in _SPACE_COMPANIES.items()
]

# Only this many leading characters are analysed, to bound spaCy cost per row.
_NER_MAX_CHARS = 2000


def extract_entities_spacy(text):
    """
    Extract organizations, persons and locations from ``text``.

    Runs the global spaCy pipeline ``nlp`` (ORG -> organizations,
    PERSON -> persons, GPE/LOC -> locations), then adds every company from
    ``_SPACE_COMPANIES`` found as a whole word (case-insensitive) that is not
    already present.

    Args:
        text: Input string. ``None``, empty or shorter than 5 characters yields
            empty lists. Only the first ``_NER_MAX_CHARS`` characters are used.

    Returns:
        dict with keys "organizations", "persons", "locations"; each value is a
        list of unique strings in first-seen order (deterministic, so the
        comma-joined strings stored downstream are stable across runs).
    """
    if not text or len(text) < 5:
        return {"organizations": [], "persons": [], "locations": []}

    text = text[:_NER_MAX_CHARS]

    organizations, persons, locations = [], [], []

    try:
        doc = nlp(text)
        for ent in doc.ents:
            if ent.label_ == "ORG":
                organizations.append(ent.text)
            elif ent.label_ == "PERSON":
                persons.append(ent.text)
            elif ent.label_ in ("GPE", "LOC"):
                locations.append(ent.text)
    except Exception as e:
        # Best effort: an NER failure on one text must not abort the whole job;
        # the keyword-based company lookup below still runs.
        print(f"Error en spaCy NER: {e}")

    text_lower = text.lower()
    # Compare case-insensitively so spaCy's "SpaceX" is not duplicated.
    known_orgs = {org.lower() for org in organizations}
    for pattern, canonical in _SPACE_COMPANY_PATTERNS:
        if canonical.lower() not in known_orgs and pattern.search(text_lower):
            organizations.append(canonical)
            known_orgs.add(canonical.lower())

    # Order-preserving de-duplication (list(set(...)) has arbitrary order).
    return {
        "organizations": list(dict.fromkeys(organizations)),
        "persons": list(dict.fromkeys(persons)),
        "locations": list(dict.fromkeys(locations)),
    }

print("Función de extracción de entidades creada")

# Return type of the NER UDF: a struct of three nullable string arrays whose
# field order matches the tuple built by extract_entities_wrapper.
entity_schema = StructType([
    StructField(field_name, ArrayType(StringType()), True)
    for field_name in ("organizations", "persons", "locations")
])

def extract_entities_wrapper(text):
    """Convert extract_entities_spacy's dict into the positional tuple matching entity_schema."""
    entities = extract_entities_spacy(text)
    return tuple(entities[field_name] for field_name in ("organizations", "persons", "locations"))

# Spark UDF exposing the NER helper to DataFrame expressions.
extract_entities_udf = udf(extract_entities_wrapper, entity_schema)


StatementMeta(, ef1098ed-91b1-4951-bfcb-320f2cbecb22, 23, Finished, Available, Finished, False)

Función de extracción de entidades creada


In [20]:
# OPTIMIZED FUNCTION WITH INCREMENTAL LOAD

def _select_new_bronze_records(table_name, silver_name):
    """Return Bronze rows not yet in Silver (all rows on first load), or None if nothing is new."""
    df_bronze = spark.read.table(table_name)
    initial_count = df_bronze.count()
    print(f"Registros totales en Bronze: {initial_count:,}")

    # Decide "first load" by asking the catalog explicitly. Catching every
    # exception instead would treat any transient read/join failure as a first
    # load and append the whole Bronze table again, duplicating Silver rows.
    if not spark.catalog.tableExists(silver_name):
        print("Primera carga detectada — procesando todos los registros")
        return df_bronze

    existing_ids = spark.read.table(silver_name).select("id").distinct()
    # Left anti join: keep only Bronze rows whose id is absent from Silver.
    df_new = df_bronze.join(existing_ids, "id", "left_anti")

    new_count = df_new.count()
    print(f"Registros nuevos a procesar: {new_count:,}")
    if new_count == 0:
        print("✓ No hay registros nuevos — omitiendo procesamiento")
        return None
    return df_new


def _build_keyword_classifier(topic_df):
    """Return a UDF mapping lowercase text to the keyword_id with the most whole-word matches."""
    # One small collect serves both the keyword list and the fallback id.
    topic_rows = topic_df.select("keyword_id", "keyword", "category").collect()

    # Fallback id when no keyword matches: the "General" category row.
    general_id = [row.keyword_id for row in topic_rows if row.category == "General"][0]

    # Longest keywords first: together with the strict ">" below, a tie on the
    # match count is won by the longer, more specific keyword
    # (e.g. "crew dragon" over "dragon").
    keywords = [
        (row.keyword_id, row.keyword.lower())
        for row in topic_rows
        if row.keyword != "general"
    ]
    keywords.sort(key=lambda item: len(item[1]), reverse=True)

    # Compile once on the driver; the compiled patterns ship with the UDF closure.
    keyword_patterns = [
        (kid, re.compile(r'\b' + re.escape(kw) + r'\b'))
        for kid, kw in keywords
    ]

    def get_primary_keyword_optimized(text):
        if not text:
            return general_id
        max_count = 0
        selected_id = general_id
        for kid, pattern in keyword_patterns:
            matches = len(pattern.findall(text))
            if matches > max_count:
                max_count = matches
                selected_id = kid
        return selected_id

    return udf(get_primary_keyword_optimized, IntegerType())


def process_table(table_name, silver_name, topic_df):
    """
    Process one Bronze table into its Silver table with an INCREMENTAL load.

    Only rows whose ``id`` is not yet present in ``silver_name`` are processed;
    when the Silver table does not exist yet, every Bronze row is. Rows with a
    null ``published_at``, ``id`` or ``title`` are dropped. Each row appended
    to Silver (Delta, partitioned by year/month) gains:
    - year, month and date_key (yyyyMMdd as int) derived from published_at
    - keyword_id: topic keyword with the most whole-word matches in
      title (counted twice) + summary, or the "General" id if none match
    - organizations, related_people, places: comma-separated NER entities

    Args:
        table_name: Source Bronze table name.
        silver_name: Target Silver table name (created by the append if missing).
        topic_df: DataFrame with keyword_id, keyword and category columns.

    Returns:
        None; results are written to ``silver_name``.
    """
    print(f"\n{'='*70}")
    print(f"Procesando: {table_name} → {silver_name}")
    print(f"{'='*70}")

    df = _select_new_bronze_records(table_name, silver_name)
    if df is None:
        return

    # Keep only rows with the fields every downstream step relies on.
    df = df.filter(
        col("published_at").isNotNull() &
        col("id").isNotNull() &
        col("title").isNotNull()
    )

    valid_count = df.count()
    print(f"Registros válidos: {valid_count:,}")

    if valid_count == 0:
        print("⚠ No hay registros válidos para procesar")
        return

    # Parse to date type and derive the partition / date-key columns.
    df = df.withColumn("published_at", to_date("published_at")) \
            .withColumn("updated_at", to_date("updated_at")) \
            .withColumn("year", year("published_at")) \
            .withColumn("month", month("published_at")) \
            .withColumn("date_key", date_format(col("published_at"), "yyyyMMdd").cast("int"))

    # Temporary classification text: the title is repeated so it weighs double.
    df_temp = df.withColumn(
        "full_text_temp",
        lower(concat_ws(" ", col("title"), col("title"), col("summary")))
    )
    df_temp = df_temp.filter(col("full_text_temp").isNotNull() & (col("full_text_temp") != ""))

    # Cache the prepared input so the count below and the final write do not
    # both re-read Bronze. It is released only after the write (see finally),
    # which also guarantees release if anything in between raises.
    df_temp.cache()
    try:
        cached_count = df_temp.count()
        print(f"Registros cacheados para procesamiento: {cached_count:,}")

        udf_primary_keyword = _build_keyword_classifier(topic_df)

        df_result = (
            df_temp
            # STEP 1: topic classification.
            .withColumn("keyword_id", udf_primary_keyword(col("full_text_temp")))
            # STEP 2: NER on title + summary (title not duplicated here).
            .withColumn(
                "entities_temp",
                extract_entities_udf(concat_ws(" ", col("title"), col("summary")))
            )
            # Flatten the entity arrays into comma-separated strings.
            .withColumn("organizations", concat_ws(", ", col("entities_temp.organizations")))
            .withColumn("related_people", concat_ws(", ", col("entities_temp.persons")))
            .withColumn("places", concat_ws(", ", col("entities_temp.locations")))
            # Drop every temporary column before saving.
            .drop("full_text_temp", "entities_temp")
        )

        # Only withColumn/drop follow the cache, so the row count is unchanged;
        # reuse it instead of launching another full job.
        print(f"Guardando {cached_count:,} registros en {silver_name}...")

        df_result.write \
            .mode("append") \
            .format("delta") \
            .partitionBy("year", "month") \
            .option("mergeSchema", "true") \
            .saveAsTable(silver_name)
    finally:
        df_temp.unpersist()

    print(f"✓ Tabla {silver_name} actualizada exitosamente")

StatementMeta(, ef1098ed-91b1-4951-bfcb-320f2cbecb22, 24, Finished, Available, Finished, False)

In [21]:
# Run the Bronze -> Silver pipeline for each source, in order.
silver_articles, silver_blogs, silver_reports = (
    process_table(f"bronze_{source}", f"silver_{source}", topic_df)
    for source in ("articles", "blogs", "reports")
)

StatementMeta(, ef1098ed-91b1-4951-bfcb-320f2cbecb22, 25, Finished, Available, Finished, False)


Procesando: bronze_articles → silver_articles
Registros totales en Bronze: 32,192
Primera carga detectada — procesando todos los registros
Registros válidos: 32,192
Registros cacheados para procesamiento: 32,192
Guardando 32,192 registros en silver_articles...
✓ Tabla silver_articles actualizada exitosamente

Procesando: bronze_blogs → silver_blogs
Registros totales en Bronze: 1,884
Primera carga detectada — procesando todos los registros
Registros válidos: 1,884
Registros cacheados para procesamiento: 1,884
Guardando 1,884 registros en silver_blogs...
✓ Tabla silver_blogs actualizada exitosamente

Procesando: bronze_reports → silver_reports
Registros totales en Bronze: 1,415
Primera carga detectada — procesando todos los registros
Registros válidos: 1,415
Registros cacheados para procesamiento: 1,415
Guardando 1,415 registros en silver_reports...
✓ Tabla silver_reports actualizada exitosamente
