In [0]:

spark.sql("USE CATALOG spark_catalog")
spark.sql("USE DATABASE default")


In [0]:
# ---------------------------------------------------------
# ✅ 1️⃣ Mejora final 2
# ---------------------------------------------------------
# ---------------------------------------------------------
# ✅ 1️⃣ Widgets y variables
# ---------------------------------------------------------
dbutils.widgets.text("source_table", "pubmed_affiliaciones", "Tabla RAW")
dbutils.widgets.text("ror_table", "unirdat.pubmed_db.n_affiliaciones", "Tabla ROR Delta")
dbutils.widgets.text("output_table", "unirdat.pubmed_db.m_autor_afiliacion_ror_normalized_final_2", "Tabla final")
dbutils.widgets.text("chunk_size", "50", "Autores por chunk")

source_table = dbutils.widgets.get("source_table")
ror_table = dbutils.widgets.get("ror_table")
output_table = dbutils.widgets.get("output_table")
chunk_size = int(dbutils.widgets.get("chunk_size"))

print(f"Source: {source_table}")
print(f"ROR: {ror_table}")
print(f"Output: {output_table}")
print(f"Chunk size: {chunk_size}")

# ---------------------------------------------------------
# ✅ 2️⃣ Librerías
# ---------------------------------------------------------
from pyspark.sql.functions import col, lower, trim, regexp_replace, monotonically_increasing_id, udf
from pyspark.sql.types import BooleanType
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

from pyspark.ml.feature import Tokenizer, NGram, HashingTF, MinHashLSH

# ---------------------------------------------------------
# ✅ 3️⃣ UDF para filtrar vectores no vacíos
# ---------------------------------------------------------
def vector_nonzero(v):
    if v is None:
        return False
    try:
        return v.numNonzeros() > 0
    except:
        return False

nonzero_udf = udf(vector_nonzero, BooleanType())

# ---------------------------------------------------------
# ✅ 4️⃣ Cargar tablas base y limpiar ROR
# ---------------------------------------------------------
df_all = spark.table(source_table).filter(col("nombre_completo")=="agostino pierro")
df_ror = spark.table(ror_table)

df_ror_clean = df_ror.withColumn(
    "alias_clean",
    trim(regexp_replace(lower(col("aliases")), "[^a-zA-Z0-9 ]", ""))
).filter(
    (col("alias_clean").isNotNull()) & (col("alias_clean") != "")
)

# ---------------------------------------------------------
# ✅ 5️⃣ Obtener autores únicos
# ---------------------------------------------------------
autores = [row["nombre_completo"] for row in df_all.select("nombre_completo").distinct().collect()]
print(f"Total autores únicos: {len(autores)}")

# ---------------------------------------------------------
# ✅ 6️⃣ Procesar por chunk de autores
# ---------------------------------------------------------
for i in range(0, len(autores), chunk_size):
    batch = autores[i:i+chunk_size]
    print(f"\n=== Procesando autores {i} → {i+len(batch)-1}: {batch} ===")

    # 🔹 6.1 Filtrar chunk
    df_affil_ini = df_all.filter(col("nombre_completo").isin(batch)) \
                         .select("nombre_completo", "affiliation").distinct()

    # 🔹 6.2 Limpiar texto
    df_affil_clean = df_affil_ini.withColumn(
        "affiliation_clean",
        trim(regexp_replace(lower(col("affiliation")), "[^a-zA-Z0-9 ]", ""))
    ).filter(
        (col("affiliation_clean").isNotNull()) & (col("affiliation_clean") != "")
    )

    df_affil_clean = df_affil_clean.withColumn("id_aff", monotonically_increasing_id())

    # 🔹 6.3 Exact match
    df_exact = df_affil_clean.join(
        df_ror_clean,
        df_affil_clean.affiliation_clean == df_ror_clean.alias_clean,
        "left"
    ).withColumn(
        "affiliation_normalized",
        col("name")
    ).withColumn(
        "ror_id",
        col("id")
    )

    df_exact_ok = df_exact.filter(col("ror_id").isNotNull()) \
        .select("nombre_completo", "id_aff", "affiliation", "ror_id", "affiliation_normalized")

    df_no_match = df_exact.filter(col("ror_id").isNull()) \
        .select("id_aff", "nombre_completo", "affiliation", "affiliation_clean") \
        .persist()

    # 🔹 6.4 Fuzzy match solo si hay sin match
    if df_no_match.count() > 0:
        tokenizer = Tokenizer(inputCol="affiliation_clean", outputCol="words")
        df_affil_tokens = tokenizer.transform(df_no_match)

        tokenizer_ror = Tokenizer(inputCol="alias_clean", outputCol="words")
        df_ror_tokens = tokenizer_ror.transform(df_ror_clean)

        hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=2000)
        df_affil_features = hashingTF.transform(df_affil_tokens)
        df_ror_features = hashingTF.transform(df_ror_tokens)

        # ✅ Filtrar vectores vacíos en ambos lados
        df_affil_features_filtered = df_affil_features.filter(is_nonzero_udf("features"))
        df_ror_features_filtered = df_ror_features.filter(is_nonzero_udf("features"))

        lsh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10)
        lsh_model = lsh.fit(df_ror_features_filtered)

        df_matches = lsh_model.approxSimilarityJoin(
            df_affil_features_filtered,
            df_ror_features_filtered,
            0.8,
            distCol="JaccardDistance"
        ).select(
            col("datasetA.id_aff"),
            col("datasetA.nombre_completo"),
            col("datasetA.affiliation"),
            col("datasetB.id").alias("ror_id_fuzzy"),
            col("datasetB.name").alias("name_fuzzy"),
            col("JaccardDistance")
        )

        w = Window.partitionBy("id_aff").orderBy(col("JaccardDistance").asc())
        df_best_matches = df_matches.withColumn(
            "rn",
            row_number().over(w)
        ).filter(col("rn") == 1).drop("rn")

        df_fuzzy_ok = df_best_matches.withColumnRenamed("ror_id_fuzzy", "ror_id") \
            .withColumnRenamed("name_fuzzy", "affiliation_normalized") \
            .select("nombre_completo", "id_aff", "affiliation", "ror_id", "affiliation_normalized")



    else:
        # Dataframe vacío compatible si no hay sin match
        df_fuzzy_ok = spark.createDataFrame([], df_exact_ok.schema)

    # 🔹 6.5 Combinar exact + fuzzy
    df_final = df_exact_ok.unionByName(df_fuzzy_ok)

    # 🔹 6.6 Guardar chunk incremental con mergeSchema
    df_final.write.mode("append").format("delta") \
        .option("mergeSchema", "true") \
        .saveAsTable(output_table)

    print(f"✅ Chunk {i} → {i+len(batch)-1} guardado OK")

print("\n🎉 TODOS LOS CHUNKS TERMINADOS 🚀")




Source: pubmed_affiliaciones
ROR: unirdat.pubmed_db.n_affiliaciones
Output: unirdat.pubmed_db.m_autor_afiliacion_ror_normalized_final_3
Chunk size: 50
Total autores únicos: 1

=== Procesando autores 0 → 0: ['agostino pierro'] ===
✅ Chunk 0 → 0 guardado OK

🎉 TODOS LOS CHUNKS TERMINADOS 🚀
