In [2]:
import findspark  # type: ignore
import pandas as pd  # type: ignore
import re
from pyspark.sql import SparkSession  # type: ignore
from pyspark.sql import functions as F  # type: ignore
from pyspark.sql.functions import col, udf  # type: ignore
from pyspark.sql.types import IntegerType, StringType  # type: ignore
from unidecode import unidecode  # type: ignore
from pyspark.sql.functions import lit

# Ignorar warnings
import warnings
warnings.filterwarnings("ignore")

---

# <span style="color: green">Iniciar Spark</span>

In [3]:
findspark.init()

df_senasa = pd.read_excel(r"1 Data/Diccionario-Senasa_2024-11-07_CC.xlsx")
df_veritrade = pd.read_excel("1 Data/Veritrade_CMEDRANO@BIOMONT.COM.PE_PE_E_20241030151038_EXPORTACIONES_30102024.xlsx", header=5)
# df_veritrade = pd.read_excel(r"Data_ExportacionesCompetidoresGeriax.xlsx", header=5)
# df_veritrade = pd.read_excel(r"Data_ExportacionesCompetidoresGeriax.xlsx")

df_senasa.fillna("", inplace=True)
df_veritrade.fillna("", inplace=True)

```
import os

print("SPARK_HOME:", os.environ.get('SPARK_HOME'))
print("HADOOP_HOME:", os.environ.get('HADOOP_HOME'))
```

In [3]:
# print(df_veritrade.dtypes)

PYSPARK ACEPTA SOLO FECHAS ENTRE 1970 y 2038

```
spark = SparkSession.builder \
    .appName("Pandas to PySpark DataFrame") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# spark.conf.set("spark.network.timeout", "800s")
# spark.conf.set("spark.sql.shuffle.partitions", "200")

df_senasa_spark = spark.createDataFrame(df_senasa)
df_veritrade_spark = spark.createDataFrame(df_veritrade)`
```

In [4]:
spark = (
    SparkSession.builder.appName("Basic Example")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")
    .config("spark.driver.maxResultSize", "2g")
    .getOrCreate()
)

df_senasa_spark = spark.createDataFrame(df_senasa)
df_veritrade_spark = spark.createDataFrame(df_veritrade)

In [5]:
# !pip install --upgrade setuptools wheel

In [None]:
print("Driver Memory: ", spark.sparkContext.getConf().get("spark.driver.memory"))
print("Executor Memory: ", spark.sparkContext.getConf().get("spark.executor.memory"))
print("Executor Memory: ", spark.sparkContext.getConf().get("spark.executor.cores"))
print("Executor Memory: ", spark.sparkContext.getConf().get("spark.memory.offHeap.enabled"))
print("Executor Memory: ", spark.sparkContext.getConf().get("spark.memory.offHeap.size"))

In [7]:
df_senasa_spark = df_senasa_spark.repartition(100)
df_veritrade_spark = df_veritrade_spark.repartition(100)

# df_senasa_spar k= df_senasa_spark.coalesce(50)
# df_veritrade_spark = df_veritrade_spark.coalesce(50)

In [8]:
# num_particiones_veritrade = df_veritrade_spark.rdd.getNumPartitions()
# print(f"Número de particiones en df_veritrade_spark: {num_particiones_veritrade}")

In [9]:
df_senasa_spark = df_senasa_spark.orderBy("ID")

In [10]:
# df_senasa_spark.show(1)
# df_veritrade_spark.show(1)

In [11]:
# df_senasa_spark.head(1)
# df_veritrade_spark.head(1)

In [None]:
df_senasa_spark.printSchema()

In [None]:
df_veritrade_spark.printSchema()

---

# <span style="color: green">Variables y funciones</span>

In [5]:
# Declarar variables
guardar_excel = False

# Funciones
"""
Guarda DataFrames de Pandas en CSV
"""


def guardar_df_pandas_excel(df, nombre_archivo):
    # Guardar DataFrame de Pandas
    df.to_csv(f"2 DataCreada/{nombre_archivo}", index=False)


"""
Guarda DataFrames de Spark en CSV
"""


def guardar_df_spark_excel(df, nombre_archivo):
    # Convertir DataFrame de Spark a DataFrame de Pandas
    df_pandas = df.toPandas()

    # Guardar DataFrame de Pandas
    df_pandas.to_excel(f"2 DataCreada/{nombre_archivo}", index=False)


""" 
Detener script
"""


def detener_script():
    import sys

    sys.exit("Detener script")

---

# <span style="color: green">Guardar Excel</span>

In [15]:
if guardar_excel == True:
    guardar_df_pandas_excel(df_senasa, "0 senasa.xlsx")
    guardar_df_pandas_excel(df_veritrade, "0 veritrade.xlsx")
    # detener_script()

In [16]:
if guardar_excel == True:
    guardar_df_spark_excel(df_senasa_spark, "2 senasa.xls")
    guardar_df_spark_excel(df_veritrade_spark, "2 veritrade.xlsx")
    # detener_script()

In [17]:
# if guardar_excel == True:
#     # Guardar DataFrame de Spark
#     df_senasa_spark.write.csv("2 DataCreada/senasa.csv", mode='overwrite', header=True)
#     df_veritrade_spark.write.csv("2 DataCreada/veritrade.csv", mode='overwrite', header=True)

---

# <span style="color: green">Etapa 1</span>

## <span style="color: green">Etapa 1 -- Encontrar caracteres especiales</span> 

In [None]:
"""
caracteres_especiales = set()

def encontrar_caracteres_especiales(nombre):
    return {caracter for caracter in nombre if not re.match(r'[A-Za-z0-9 ]', caracter)}

# Obteniendo los caracteres especiales existentes en los nombres de los productos
caracteres_especiales = df_senasa_spark.select(col("NOMBRE_COMERCIAL")) \
    .rdd.flatMap(lambda row: encontrar_caracteres_especiales(row[0])) \
    .distinct() \
    .collect()

print(type(caracteres_especiales))
print("Caracteres especiales encontrados:", sorted(caracteres_especiales))
"""

In [None]:
from pyspark.sql import functions as F

# Definir una expresión regular para encontrar caracteres especiales
regex = r"[A-Za-z0-9 ]"

# Obtener caracteres especiales existentes en los nombres de los productos
caracteres_especiales = (
    df_senasa_spark.select(
        F.explode(F.split(F.regexp_replace(F.col("NOMBRE_COMERCIAL"), regex, ""), ""))
    )
    .alias("caracter")
    .distinct()
    .collect()
)

# Filtrar caracteres especiales
caracteres_especiales = set(
    [caracter["col"] for caracter in caracteres_especiales if caracter["col"]]
)

print(type(caracteres_especiales))
print("Caracteres especiales encontrados:", sorted(list(caracteres_especiales)))

In [20]:
# Detener script
# import sys
# sys.exit("Detener script")

## <span style="color: green">Etapa 1 -- Contar palabras</span>

In [21]:
def contar_palabras(texto):
    return len(texto.split()) if texto else 0


contar_palabras_udf = udf(contar_palabras, IntegerType())

df_senasa_spark = df_senasa_spark.withColumn(
    "NUMERO_DE_PALABRAS", contar_palabras_udf(col("NOMBRE_COMERCIAL"))
)

In [22]:
# df_senasa_spark.select("ID", "NOMBRE_COMERCIAL", "NUMERO_DE_PALABRAS").show(3)

In [23]:
# Detener script
# import sys
# sys.exit("Detener script")

## <span style="color: green">Etapa 1 -- Estandarizar texto</span>

In [24]:
"""  
Permitir caracteres alfanumericos y espacios
Permitir caracteres especiales
Buscar y reemplazar lo que sea que no este permitido y reemplazarlo por vacio
ELIMINAMOS NUMERO Y UNIDAD DE MEDIDA PUEDE CAUSAR CONFUNSION CON OTRO NOMBRE DE PRODUCTO MM 100
KEYWORD MATERIA PRIMA QUE PUEDEN CONFUNDIR CONSIDERAR QUE ESTA KEYWORD SOLO SE ELIMINE EN VERITRADE
Reemplazar "," por espacio " "
"""

def estandarizar_texto(texto):
    texto = str(texto)
    resultado = texto.replace("MDCMTO.", "")
    resultado = resultado.replace("MDCMTO", "")
    resultado = resultado.replace("MDCTO.", "")
    resultado = resultado.replace("MDCTO", "")
    resultado = resultado.strip()
    resultado = resultado.upper()
    resultado = unidecode(resultado)
    caracteres_permitidos = "".join(caracteres_especiales)
    resultado = re.sub(r"[^A-Za-z0-9 " + re.escape(caracteres_permitidos) + "]", "", resultado)
    resultado = re.sub(r"\s*\d+(\.\d+)?\s*MM", "", resultado)
    resultado = resultado.replace("MATERIA PRIMA", "")
    resultado = resultado.replace("0", "O")
    resultado = resultado.replace(",", " ")
    return resultado


estandarizar_texto_udf = udf(estandarizar_texto, StringType())

In [25]:
df_senasa_spark = df_senasa_spark.withColumn(
    "DES_PROD", estandarizar_texto_udf(col("NOMBRE_COMERCIAL"))
)

In [None]:
"""
COLUMNAS SENASA:
    - ID                 (COLUMNA EXISTENTE)
    - NOMBRE_COMERCIAL   (COLUMNA EXISTENTE)
    - NUMERO_DE_PALABRAS (COLUMNA NUEVA)     ----> NOMBRE COMERCIAL
    - DES_PROD           (COLUMNA NUEVA)     ----> NOMBRE_COMERCIAL
"""

# df_senasa_spark.select("ID", "NOMBRE_COMERCIAL", "NUMERO_DE_PALABRAS", "DES_PROD").show(3)

In [27]:
df_veritrade_spark = df_veritrade_spark.withColumn(
    "DES_COM", estandarizar_texto_udf(col("Descripcion Comercial"))
)

In [None]:
"""
COLUMNAS VERITRADE:
    - Descripcion Comercial (COLUMNA EXISTENTE)
    - DES_COM               (COLUMNA NUEVA)     ----> Descripcion Comercial
"""

# df_veritrade_spark.select("Descripcion Comercial", "DES_COM").show(3)

In [29]:
# num_particiones_veritrade = df_veritrade_spark.rdd.getNumPartitions()
# print(f"Número de particiones en df_veritrade_spark: {num_particiones_veritrade}")

In [30]:
# df_veritrade_spark = df_veritrade_spark.coalesce(4)
# df_veritrade_spark.cache()

In [31]:
# df_veritrade_spark.select("Descripcion Comercial", "DES_COM").show(3)

## <span style="color: green">Etapa 1 -- Crear un diccionario de Senasa con claves "DES_PROD" y valores "ID"</span>

In [32]:
df_senasa_filtrado = df_senasa_spark.filter(col("NUMERO_DE_PALABRAS") >= 2)

# Crear un diccionario usando collect() directamente en el Dataframe

dic_senasa = {
    row["DES_PROD"]: row["ID"]
    for row in df_senasa_filtrado.select("DES_PROD", "ID").collect()
}

# dic_senasa

In [None]:
# Ordenar el diccionario por la longitud de las claves de mayor a menor
dic_senasa_ordenado = dict(sorted(dic_senasa.items(), key=lambda item: len(item[0]), reverse=True))

# Reasignar diccionario
dic_senasa = dic_senasa_ordenado

# Imprimir el diccionario ordenado
print(dic_senasa) 

## <span style="color: green">Etapa 1 -- Detectar productos</span>

In [34]:
def detectar_producto(desVeritrade, nomProdSenasa):
    palabrasDesVeritrade = desVeritrade.split()
    palabrasNomProdSenasa = nomProdSenasa.split()
    set_artificio = set({})
    for prod in palabrasNomProdSenasa:
        for des in palabrasDesVeritrade:
            if des == prod:
                set_artificio.add(des)

    if set(palabrasNomProdSenasa) == set_artificio:
        return True
    else:
        return False


def obtener_foreign_key(desVeritrade):
    ids_senasa = []  # Lista para acumular los IDs
    # Recorrer senasa
    for nomProdSenasa in dic_senasa.keys():
        # Obtener ID de senasa
        if detectar_producto(desVeritrade, nomProdSenasa):
            id_senasa = dic_senasa.get(nomProdSenasa)  # Agregar ID a la lista
            ids_senasa.append(id_senasa)  # Agregar ID a la lista
    # Validar lista vacia
    if ids_senasa == []:
        ids_senasa = None
    return ids_senasa  # Retornar lista de IDs


obtener_foreign_key_udf = udf(lambda x: obtener_foreign_key(x), StringType())

# Recorrer veritrade
df_veritrade_spark = df_veritrade_spark.withColumn(
    "FK_SENASA_ID", obtener_foreign_key_udf(col("DES_COM"))
)

# Primero se crea FK_SENASA_ID y luego FK_SENASA_ID_ETA1 por orden de creacion de columnas
# Recorrer veritrade - Depurar ids por etapas
# df_veritrade_spark = df_veritrade_spark.withColumn(
#     "FK_SENASA_ID_ETA1", obtener_foreign_key_udf(col("DES_COM"))
# )

# Crear una columna vacía llamada "NUEVA_COLUMNA"
df_veritrade_spark = df_veritrade_spark.withColumn("FK_SENASA_ETAPA", lit(None))

# Recorrer veritrade - Depurar ids por etapas
df_veritrade_spark = df_veritrade_spark.withColumn(
    "FK_SENASA_ETAPA",
    F.when(
        (col("FK_SENASA_ID").isNotNull() & (col("FK_SENASA_ID") != ""))
        & (col("FK_SENASA_ETAPA").isNull() | (col("FK_SENASA_ETAPA") == "")),
        "ETAPA 1",
    ).otherwise(col("FK_SENASA_ETAPA")),
)

In [None]:
"""
COLUMNAS VERITRADE:
    - Descripcion Comercial (COLUMNA EXISTENTE)
    - DES_COM               (COLUMNA NUEVA)     ----> Descripcion Comercial
    - FK_SENASA_ID          (COLUMNA NUEVA)     ----> ID (COLUMNAS SENASA)
"""

# Verificar df_veritrade_spark
# df_veritrade_spark.select("Descripcion Comercial", "DES_COM", "FK_SENASA_ID").show(3)
# df_veritrade_filtrado_validacion_fk_senasa_id = df_veritrade_spark.filter(col("FK_SENASA_ID").isNotNull())
# df_veritrade_filtrado_validacion_fk_senasa_id.select("Descripcion Comercial", "DES_COM", "FK_SENASA_ID").show(3)
# detener_script()

## <span style="color: green">Etapa 1 -- Verificar esquema del DataFrame</span>

In [None]:
df_senasa_spark.printSchema()

In [None]:
df_veritrade_spark.printSchema()

---

## <span style="color: green">Etapa 1 -- Guardar Excel</span>

In [38]:
if guardar_excel == True:
    guardar_df_spark_excel(df_senasa_spark, "1 senasa.xlsx")
    guardar_df_spark_excel(df_veritrade_spark, "1 veritrade.xlsx")
    # detener_script()

---

## <span style="color: red">Notas</span>
* Hasta aqui todo tiene sentido

---

# <span style="color: green">Etapa 2</span>

df_senasa_spark.createOrReplaceTempView("senasa")
senasa = spark.sql("SELECT * FROM senasa")

df_veritrade_spark.createOrReplaceTempView("veritrade")
veritrade = spark.sql("SELECT * FROM veritrade")

query="""
SELECT ver.*,sen.DES_PROD,sen.TIPO_PRODUCTO,sen.DETALLE_VAR1 AS COMPOSICION,sen.DETALLE_VAR2 AS CLASIFICACION,sen.DETALLE_VAR3 AS FORMA
FROM veritrade ver
LEFT JOIN senasa sen
ON ver.FK_SENASA_ID=sen.ID
"""

df_veritrade_spark_ite1 = spark.sql(query)
df_veritrade_spark_ite1=df_veritrade_spark_ite1.coalesce(60)
df_veritrade_spark_ite1.cache()


In [39]:
df_veritrade_spark_eta2 = df_veritrade_spark

## <span style="color: green">Etapa 2 -- Estandarizar texto</span>

In [40]:
""" 
Permitir caracteres alfanumericos y espacios
No permitir caracteres especiales
Buscar y reemplazar lo que sea que no este permitido y reemplazarlo por vacio
ELIMINAMOS NUMERO Y UNIDAD DE MEDIDA PUEDE CAUSAR CONFUNSION CON OTRO NOMBRE DE PRODUCTO MM 100
KEYWORD MATERIA PRIMA QUE PUEDEN CONFUNDIR CONSIDERAR QUE ESTA KEYWORD SOLO SE ELIMINE EN VERITRADE
No reemplazar "," por espacio " "
Reemplazar espacios "  " por espacio " "
"""

def estandarizar_texto2(texto):
    texto = str(texto)
    resultado = texto.replace("MDCMTO.", "")
    resultado = resultado.replace("MDCMTO", "")
    resultado = resultado.replace("MDCTO.", "")
    resultado = resultado.replace("MDCTO", "")
    resultado = resultado.strip()
    resultado = resultado.upper()
    resultado = unidecode(resultado)
    caracteres_permitidos = "".join(" ")  
    resultado = re.sub(r"[^A-Za-z0-9 " + re.escape(caracteres_permitidos) + "]", " ", resultado)  
    resultado = re.sub(r"\s*\d+(\.\d+)?\s*MM", "", resultado)  
    resultado = resultado.replace("MATERIA PRIMA", "")
    resultado = resultado.replace("0", "O")
    # resultado = resultado.replace(",", " ") 
    resultado = re.sub(r"\s+", " ", resultado)    
    return resultado


estandarizar_texto2_udf = udf(estandarizar_texto2, StringType())

In [41]:
df_senasa_spark = df_senasa_spark.withColumn(
    "DES_PROD2", estandarizar_texto2_udf(col("NOMBRE_COMERCIAL"))
)

In [None]:
"""
COLUMNAS SENASA:
    - ID                 (COLUMNA EXISTENTE)
    - NOMBRE_COMERCIAL   (COLUMNA EXISTENTE)
    - NUMERO_DE_PALABRAS (COLUMNA NUEVA)     ----> NOMBRE COMERCIAL
    - DES_PROD           (COLUMNA NUEVA)     ----> NOMBRE_COMERCIAL
    - DES_PROD2          (COLUMNA NUEVA)     ----> NOMBRE_COMERCIAL
"""

# df_senasa_spark.select("ID", "NOMBRE_COMERCIAL", "NUMERO_DE_PALABRAS", "DES_PROD", "DES_PROD2").show(3)

In [43]:
df_veritrade_spark_eta2 = df_veritrade_spark_eta2.withColumn(
    "DES_COM2", estandarizar_texto2_udf(col("Descripcion Comercial"))
)

In [None]:
"""
COLUMNAS VERITRADE:
    - Descripcion Comercial (COLUMNA EXISTENTE)
    - DES_COM               (COLUMNA NUEVA)     ----> Descripcion Comercial
    - DES_COM2              (COLUMNA NUEVA)     ----> Descripcion Comercial
"""

# df_veritrade_spark.select("Descripcion Comercial", "DES_COM", "DES_COM2").show(3)

## <span style="color: green">Etapa 2 -- Crear un diccionario de Senasa con claves "DES_PROD2" y valores "ID"</span>

In [45]:
df_senasa_filtrado = df_senasa_spark.filter(col("NUMERO_DE_PALABRAS") >= 2)

# Crear un diccionario usando collect() directamente en el Dataframe

dic_senasa = {
    row["DES_PROD2"]: row["ID"]
    for row in df_senasa_filtrado.select("DES_PROD2", "ID").collect()
}

# dic_senasa

In [None]:
# Ordenar el diccionario por la longitud de las claves de mayor a menor
dic_senasa_ordenado = dict(sorted(dic_senasa.items(), key=lambda item: len(item[0]), reverse=True))

# Reasignar diccionario
dic_senasa = dic_senasa_ordenado

# Imprimir el diccionario ordenado
print(dic_senasa) 

## <span style="color: green">Etapa 2 -- Detectar productos</span>

In [47]:
# Recorrer veritrade - Depurar ids por etapas
# df_veritrade_spark_eta2 = df_veritrade_spark_eta2.withColumn(
#     "FK_SENASA_ID_ETA2",
#     F.when(
#         col("FK_SENASA_ID").isNull() | (col("FK_SENASA_ID") == ""),
#         obtener_foreign_key_udf(col("DES_COM2")),
#     ),
# )

# Recorrer veritrade
df_veritrade_spark_eta2 = df_veritrade_spark_eta2.withColumn(
    "FK_SENASA_ID",
    F.when(
        col("FK_SENASA_ID").isNull() | (col("FK_SENASA_ID") == ""),
        obtener_foreign_key_udf(col("DES_COM2")),
    ).otherwise(col("FK_SENASA_ID")),
)

# Recorrer veritrade - Depurar ids por etapas
df_veritrade_spark_eta2 = df_veritrade_spark_eta2.withColumn(
    "FK_SENASA_ETAPA",
    F.when(
        (col("FK_SENASA_ID").isNotNull() & (col("FK_SENASA_ID") != ""))
        & (col("FK_SENASA_ETAPA").isNull() | (col("FK_SENASA_ETAPA") == "")),
        "ETAPA 2",
    ).otherwise(col("FK_SENASA_ETAPA")),
)

df_senasa_spark.createOrReplaceTempView("senasa")
senasa = spark.sql("SELECT * FROM senasa")

df_veritrade_spark_ite1.createOrReplaceTempView("veritrade")
veritrade = spark.sql("SELECT * FROM veritrade")

query="""
SELECT ver.*,sen.DES_PROD,sen.TIPO_PRODUCTO,sen.DETALLE_VAR1 AS COMPOSICION,sen.DETALLE_VAR2 AS CLASIFICACION,sen.DETALLE_VAR3 AS FORMA
FROM veritrade ver
LEFT JOIN senasa sen
ON ver.FK_SENASA_ID=sen.ID
"""

df_veritrade_spark_ite2 = spark.sql(query)
df_veritrade_spark_ite2=df_veritrade_spark_ite2.coalesce(60)

df_veritrade_spark_ite2.cache()

---

## <span style="color: green">Etapa 2 -- Guardar Excel</span>

In [48]:
# df_veritrade_spark_eta2 = df_veritrade_spark_eta2.drop("DES_COM2")

In [49]:
if guardar_excel == True or True:
    guardar_df_spark_excel(df_senasa_spark, "2 senasa.xlsx")
    guardar_df_spark_excel(df_veritrade_spark_eta2, "2 veritrade.xlsx")
    # detener_script()

---

## <span style="color: red">Notas</span>
* Estandarizar texto con cierto sentido

---

# <span style="color: green">Reiniciar Spark</span>

In [None]:
spark.stop()

findspark.init()

df_senasa = pd.read_excel(r"2 DataCreada/2 senasa.xlsx")
df_veritrade = pd.read_excel("2 DataCreada/2 veritrade.xlsx")

df_senasa.fillna("", inplace=True)
df_veritrade.fillna("", inplace=True)

spark = (
    SparkSession.builder.appName("Basic Example")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "1g")
    .config("spark.driver.maxResultSize", "2g")
    .getOrCreate()
)

df_senasa_spark = spark.createDataFrame(df_senasa)
df_veritrade_spark = spark.createDataFrame(df_veritrade)

df_senasa_spark = df_senasa_spark.repartition(100)
df_veritrade_spark = df_veritrade_spark.repartition(100)

df_senasa_spark = df_senasa_spark.orderBy("ID")

df_senasa_spark.printSchema()
df_veritrade_spark.printSchema()

In [None]:
df_veritrade.head(5)

---

# <span style="color: green">Etapa 3</span>

In [52]:
df_veritrade_spark_eta3 = df_veritrade_spark

## <span style="color: green">Etapa 3 -- Estandarizar texto</span>

In [53]:
""" 
Permitir caracteres alfanumericos y espacios
No permitir caracteres especiales, permitir guiones
Buscar y reemplazar lo que sea que no este permitido y reemplazarlo por vacio
ELIMINAMOS NUMERO Y UNIDAD DE MEDIDA PUEDE CAUSAR CONFUNSION CON OTRO NOMBRE DE PRODUCTO MM 100
KEYWORD MATERIA PRIMA QUE PUEDEN CONFUNDIR CONSIDERAR QUE ESTA KEYWORD SOLO SE ELIMINE EN VERITRADE
No reemplazar "," por espacio " "
Reemplazar espacios "  " por espacio " "
"""

def estandarizar_texto3(texto):
    texto = str(texto)
    resultado = texto.replace("MDCMTO.", "")
    resultado = resultado.replace("MDCMTO", "")
    resultado = resultado.replace("MDCTO.", "")
    resultado = resultado.replace("MDCTO", "")
    resultado = resultado.strip()
    resultado = resultado.upper()
    resultado = unidecode(resultado)
    resultado = re.sub(r"[^A-Za-z0-9 -]", " ", resultado)
    resultado = re.sub(r"\s*\d+(\.\d+)?\s*MM", "", resultado)
    resultado = resultado.replace("MATERIA PRIMA", "")
    resultado = resultado.replace("0", "O")
    # resultado = resultado.replace(",", " ") 
    resultado = re.sub(r"\s+", " ", resultado)    
    return resultado


estandarizar_texto3_udf = udf(estandarizar_texto3, StringType())

In [54]:
df_senasa_spark = df_senasa_spark.withColumn(
    "DES_PROD3", estandarizar_texto3_udf(col("NOMBRE_COMERCIAL"))
)

In [None]:
"""
COLUMNAS SENASA:
    - ID                 (COLUMNA EXISTENTE)
    - NOMBRE_COMERCIAL   (COLUMNA EXISTENTE)
    - NUMERO_DE_PALABRAS (COLUMNA NUEVA)     ----> NOMBRE COMERCIAL
    - DES_PROD           (COLUMNA NUEVA)     ----> NOMBRE_COMERCIAL
    - DES_PROD2          (COLUMNA NUEVA)     ----> NOMBRE_COMERCIAL
    - DES_PROD3          (COLUMNA NUEVA)     ----> NOMBRE_COMERCIAL
"""

# df_senasa_spark.select("ID", "NOMBRE_COMERCIAL", "NUMERO_DE_PALABRAS", "DES_PROD", "DES_PROD2", "DES_PROD3").show(3)

## <span style="color: green">Etapa 3 -- Crear un diccionario de Senasa con claves "DES_PROD3" y valores "ID"</span>

In [56]:
df_senasa_filtrado = df_senasa_spark.filter(col("NUMERO_DE_PALABRAS") == 1)

# Crear un diccionario usando collect() directamente en el Dataframe

dic_senasa = {
    row["DES_PROD3"]: row["ID"]
    for row in df_senasa_filtrado.select("DES_PROD3", "ID").collect()
}

# dic_senasa

In [None]:
# Ordenar el diccionario por la longitud de las claves de mayor a menor
dic_senasa_ordenado = dict(sorted(dic_senasa.items(), key=lambda item: len(item[0]), reverse=True))

# Reasignar diccionario
dic_senasa = dic_senasa_ordenado

# Imprimir el diccionario ordenado
print(dic_senasa) 

## <span style="color: green">Etapa 3 -- Detectar productos</span>

In [58]:
def detectar_producto3(desVeritrade, nomProdSenasa):
    desVeritrade = str(desVeritrade)
    nomProdSenasa = str(nomProdSenasa)

    if desVeritrade.startswith(nomProdSenasa + " "):
        return True
    else:
        return False


def obtener_foreign_key3(desVeritrade):
    ids_senasa = []  # Lista para acumular los IDs
    # Recorrer senasa
    for nomProdSenasa in dic_senasa.keys():
        # Obtener ID de senasa
        if detectar_producto3(desVeritrade, nomProdSenasa):
            id_senasa = dic_senasa.get(nomProdSenasa)
            ids_senasa.append(id_senasa)  # Agregar ID a la lista
    # Validar lista vacia
    if ids_senasa == []:
        ids_senasa = None
    return ids_senasa  # Retornar lista de IDs


obtener_foreign_key3_udf = udf(lambda x: obtener_foreign_key3(x), StringType())

# Recorrer veritrade - Depurar ids por etapas
# df_veritrade_spark_eta3 = df_veritrade_spark_eta3.withColumn(
#     "FK_SENASA_ID_ETA3",
#     F.when(
#         (col("FK_SENASA_ID").isNull()) | (col("FK_SENASA_ID") == ""),
#         obtener_foreign_key3_udf(col("DES_COM")),
#     ),
# )

# Recorrer veritrade
df_veritrade_spark_eta3 = df_veritrade_spark_eta3.withColumn(
    "FK_SENASA_ID",
    F.when(
        (col("FK_SENASA_ID").isNull()) | (col("FK_SENASA_ID") == ""),
        obtener_foreign_key3_udf(col("DES_COM")),
    ).otherwise(col("FK_SENASA_ID")),
)

# Recorrer veritrade - Depurar ids por etapas
df_veritrade_spark_eta3 = df_veritrade_spark_eta3.withColumn(
    "FK_SENASA_ETAPA",
    F.when(
        (col("FK_SENASA_ID").isNotNull() & (col("FK_SENASA_ID") != ""))
        & (col("FK_SENASA_ETAPA").isNull() | (col("FK_SENASA_ETAPA") == "")),
        "ETAPA 3",
    ).otherwise(col("FK_SENASA_ETAPA")),
)

df_senasa_spark.createOrReplaceTempView("senasa")
senasa = spark.sql("SELECT * FROM senasa")

df_veritrade_spark_ite2.createOrReplaceTempView("veritrade")
veritrade = spark.sql("SELECT * FROM veritrade")

query="""
SELECT ver.*,sen.DES_PROD,sen.TIPO_PRODUCTO,sen.DETALLE_VAR1 AS COMPOSICION,sen.DETALLE_VAR2 AS CLASIFICACION,sen.DETALLE_VAR3 AS FORMA
FROM veritrade ver
LEFT JOIN senasa sen
ON ver.FK_SENASA_ID=sen.ID
"""

df_veritrade_spark_ite3 = spark.sql(query)
df_veritrade_spark_ite3=df_veritrade_spark_ite3.coalesce(60)

df_veritrade_spark_ite3.cache()

---

## <span style="color: green">Etapa 3 -- Guardar Excel</span>

In [59]:
if guardar_excel == True:
    guardar_df_spark_excel(df_senasa_spark, "3 senasa.xlsx")
    guardar_df_spark_excel(df_veritrade_spark_eta3, "3 veritrade.xlsx")
    # detener_script()

--- 

## <span style="color: red">Notas</span>
* La funcion detectar producto es diferente y tiene sentido, busca solo coincidencias de una sola palabra
* Los ids tienen sentido

---

# <span style="color: green">Etapa 4</span>

In [60]:
df_veritrade_spark_eta4 = df_veritrade_spark_eta3

## <span style="color: green">Etapa 4 -- Estandarizar texto</span>

In [61]:
""" 
Permitir caracteres alfanumericos y espacios
No permitir caracteres especiales
Buscar y reemplazar lo que sea que no este permitido y reemplazarlo por vacio
ELIMINAMOS NUMERO Y UNIDAD DE MEDIDA PUEDE CAUSAR CONFUNSION CON OTRO NOMBRE DE PRODUCTO MM 100
KEYWORD MATERIA PRIMA QUE PUEDEN CONFUNDIR CONSIDERAR QUE ESTA KEYWORD SOLO SE ELIMINE EN VERITRADE
No reemplazar "," por espacio " "
Reemplazar espacios "  " por espacio " "
"""

def estandarizar_texto4(texto):
    texto = str(texto)
    resultado = texto.replace("MDCMTO.", "")
    resultado = resultado.replace("MDCMTO", "")
    resultado = resultado.replace("MDCTO.", "")
    resultado = resultado.replace("MDCTO", "")
    resultado = resultado.strip()
    resultado = resultado.upper()
    resultado = unidecode(resultado)
    resultado = re.sub(r"[^A-Za-z0-9 ]", " ", resultado)
    resultado = re.sub(r"\s*\d+(\.\d+)?\s*MM", "", resultado)
    resultado = resultado.replace("MATERIA PRIMA", "")
    resultado = resultado.replace("0", "O")
    # resultado = resultado.replace(",", " ") 
    resultado = re.sub(r"\s+", " ", resultado)    
    return resultado


estandarizar_texto4_udf = udf(estandarizar_texto4, StringType())

In [62]:
df_senasa_spark = df_senasa_spark.withColumn(
    "DES_PROD4", estandarizar_texto4_udf(col("NOMBRE_COMERCIAL"))
)

In [None]:
"""
COLUMNAS SENASA:
    - ID                 (COLUMNA EXISTENTE)
    - NOMBRE_COMERCIAL   (COLUMNA EXISTENTE)
    - NUMERO_DE_PALABRAS (COLUMNA NUEVA)     ----> NOMBRE COMERCIAL
    - DES_PROD           (COLUMNA NUEVA)     ----> NOMBRE_COMERCIAL
    - DES_PROD2          (COLUMNA NUEVA)     ----> NOMBRE_COMERCIAL
    - DES_PROD3          (COLUMNA NUEVA)     ----> NOMBRE_COMERCIAL
    - DES_PROD4          (COLUMNA NUEVA)     ----> NOMBRE_COMERCIAL
"""

# df_senasa_spark.select("ID", "NOMBRE_COMERCIAL", "NUMERO_DE_PALABRAS", "DES_PROD", "DES_PROD2", "DES_PROD3", "DES_PROD4").show(3)

## <span style="color: green">Etapa 4 -- Crear un diccionario de Senasa con claves "DES_PROD3" y valores "ID"</span>

In [64]:
df_senasa_filtrado = df_senasa_spark.filter(col("NUMERO_DE_PALABRAS") == 1)

# Crear un diccionario usando collect() directamente en el Dataframe

dic_senasa = {
    row["DES_PROD4"]: row["ID"]
    for row in df_senasa_filtrado.select("DES_PROD4", "ID").collect()
}

# dic_senasa

In [None]:
# Ordenar el diccionario por la longitud de las claves de mayor a menor
dic_senasa_ordenado = dict(sorted(dic_senasa.items(), key=lambda item: len(item[0]), reverse=True))

# Reasignar diccionario
dic_senasa = dic_senasa_ordenado

# Imprimir el diccionario ordenado
print(dic_senasa) 

## <span style="color: green">Etapa 4 -- Detectar productos</span>

In [66]:
def detectar_producto4(desVeritrade, nomProdSenasa):
    desVeritrade = str(desVeritrade)
    nomProdSenasa = str(nomProdSenasa)

    # Cambiado de startswith a in para buscar en cualquier parte de la cadena
    if nomProdSenasa + " " in desVeritrade:
        return True
    else:
        return False


def obtener_foreign_key4(desVeritrade):
    ids_senasa = []  # Lista para acumular los IDs
    # Recorrer senasa
    for nomProdSenasa in dic_senasa.keys():
        # Obtener ID de senasa
        if detectar_producto4(desVeritrade, nomProdSenasa):
            id_senasa = dic_senasa.get(nomProdSenasa)
            ids_senasa.append(id_senasa)  # Agregar ID a la lista
    # Validar lista vacia
    if ids_senasa == []:
        ids_senasa = None
    return ids_senasa  # Retornar lista de IDs


obtener_foreign_key4_udf = udf(lambda x: obtener_foreign_key4(x), StringType())

# Recorrer veritrade - Depurar ids por etapas
# df_veritrade_spark_eta4 = df_veritrade_spark_eta4.withColumn(
#     "FK_SENASA_ID_ETA4",
#     F.when(
#         (col("FK_SENASA_ID").isNull()) | (col("FK_SENASA_ID") == ""),
#         obtener_foreign_key4_udf(col("DES_COM")),
#     ),
# )

# Recorrer veritrade
df_veritrade_spark_eta4 = df_veritrade_spark_eta4.withColumn(
    "FK_SENASA_ID",
    F.when(
        (col("FK_SENASA_ID").isNull()) | (col("FK_SENASA_ID") == ""),
        obtener_foreign_key4_udf(col("DES_COM")),
    ).otherwise(col("FK_SENASA_ID")),
)

# Recorrer veritrade - Depurar ids por etapas
df_veritrade_spark_eta4 = df_veritrade_spark_eta4.withColumn(
    "FK_SENASA_ETAPA",
    F.when(
        (col("FK_SENASA_ID").isNotNull() & (col("FK_SENASA_ID") != ""))
        & (col("FK_SENASA_ETAPA").isNull() | (col("FK_SENASA_ETAPA") == "")),
        "ETAPA 4",
    ).otherwise(col("FK_SENASA_ETAPA")),
)

df_senasa_spark.createOrReplaceTempView("senasa")
senasa = spark.sql("SELECT * FROM senasa")

df_veritrade_spark_ite2.createOrReplaceTempView("veritrade")
veritrade = spark.sql("SELECT * FROM veritrade")

query="""
SELECT ver.*,sen.DES_PROD,sen.TIPO_PRODUCTO,sen.DETALLE_VAR1 AS COMPOSICION,sen.DETALLE_VAR2 AS CLASIFICACION,sen.DETALLE_VAR3 AS FORMA
FROM veritrade ver
LEFT JOIN senasa sen
ON ver.FK_SENASA_ID=sen.ID
"""

df_veritrade_spark_ite3 = spark.sql(query)
df_veritrade_spark_ite3=df_veritrade_spark_ite3.coalesce(60)

df_veritrade_spark_ite3.cache()

---

## <span style="color: green">Etapa 4 -- Guardar Excel</span>

In [67]:
if guardar_excel == True or True:
    guardar_df_spark_excel(df_senasa_spark, "4 senasa.xlsx")
    guardar_df_spark_excel(df_veritrade_spark_eta4, "4 veritrade.xlsx")
    # detener_script()

--- 

## <span style="color: red">Notas</span>

---

# <span style="color: green">Reiniciar Spark</span>

In [151]:
spark.stop()

findspark.init()

df_senasa = pd.read_excel(r"2 DataCreada/4 senasa.xlsx")
df_veritrade = pd.read_excel("2 DataCreada/4 veritrade.xlsx")

df_senasa.fillna("", inplace=True)
df_veritrade.fillna("", inplace=True)

spark = (
    SparkSession.builder.appName("Basic Example")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "1g")
    .config("spark.driver.maxResultSize", "2g")
    .getOrCreate()
)

df_senasa_spark = spark.createDataFrame(df_senasa)
df_veritrade_spark = spark.createDataFrame(df_veritrade)

df_senasa_spark = df_senasa_spark.repartition(100)
df_veritrade_spark = df_veritrade_spark.repartition(100)

df_senasa_spark = df_senasa_spark.orderBy("ID")

df_senasa_spark.printSchema()
df_veritrade_spark.printSchema()

root
 |-- N_PROD_PAGINA: long (nullable = true)
 |-- TIPO_PRODUCTO: string (nullable = true)
 |-- NOMBRE_COMERCIAL: string (nullable = true)
 |-- ID: long (nullable = true)
 |-- EMPRESA_RESPONSABLE_REGISTRO: string (nullable = true)
 |-- N_REGISTRO: string (nullable = true)
 |-- FECHA_VENCIMIENTO: string (nullable = true)
 |-- ETIQUETA: long (nullable = true)
 |-- VAR1: string (nullable = true)
 |-- DETALLE_VAR1: string (nullable = true)
 |-- VAR2: string (nullable = true)
 |-- DETALLE_VAR2: string (nullable = true)
 |-- VAR3: string (nullable = true)
 |-- DETALLE_VAR3: string (nullable = true)
 |-- VAR4: string (nullable = true)
 |-- DETALLE_VAR4: string (nullable = true)
 |-- VAR5: string (nullable = true)
 |-- DETALLE_VAR5: string (nullable = true)
 |-- VAR6: string (nullable = true)
 |-- DETALLE_VAR6: string (nullable = true)
 |-- VAR7: string (nullable = true)
 |-- DETALLE_VAR7: string (nullable = true)
 |-- VAR8: string (nullable = true)
 |-- DETALLE_VAR8: string (nullable = true

In [152]:
df_veritrade.head(5)

Unnamed: 0,Partida Aduanera,Descripcion de la Partida Aduanera,Aduana,DUA / DAM,Fecha,Cod. Tributario,Exportador,Importador,Kg Bruto,Kg Neto,...,Descripcion4,Descripcion5,Naviera,Agente Carga(Origen),Agente Carga(Destino),Canal,DES_COM,FK_SENASA_ID,FK_SENASA_ETAPA,DES_COM2
0,3808911900,"LOS DEMÁS INSECTICIDAS, RATICIDAS, FUNGICIDAS,...",MARITIMA DEL CALLAO,092504 | 23,2024-09-18,20100055237,ALICORP SAA,RAMS TRADING LTD,90.45,81.72,...,INSEC.MATAMOSCA SAPOLIO 360ML 12UND,INSEC.MATAMOSCA SAPOLIO 360ML 12UND,IAN TAYLOR AGENCIAS S.A.C.,_x0000_,_x0000_,VERDE,INSEC.MATAMOSCA SAPOLIO 36OML 12UND INSEC.MATA...,,,INSEC MATAMOSCA SAPOLIO 36OML 12UND INSEC MATA...
1,2309909000,"LAS DEMAS PREPARACIONES, UTILIZADAS P` LA ALIM...",TUMBES,007023 | 1,2024-08-16,20555271566,VITAPRO S.A.,VITAPRO ECUADOR CIA. LTDA,238750.0,237600.0,...,NICOVITA KATAL CAMARON 35% 2.0 MARCA:NICOVITA ...,NICOVITA KATAL CAMARON 35% 2.0 MARCA:NICOVITA ...,,,,VERDE,NICOVITA KATAL CAMARON 35% 2.O MARCA:NICOVITA ...,,,NICOVITA KATAL CAMARON 35 2 O MARCA NICOVITA U...
2,3004202000,MEDICAMENTOS Q` CONTENGAN OTROS ANTIBIOTICOS P...,MARITIMA DEL CALLAO,090798 | 15,2024-09-04,20250406941,AGROVET MARKET S.A,Revetsa Animal Health. S.A.,58.29,55.37,...,CANI-TABS LIVER+ CJA 12 UND X 60 TAB PARA USO ...,CANI-TABS LIVER+ CJA 12 UND X 60 TAB PARA USO ...,MEDITERRANEAN SHIPPING COMPANY DEL PERU SAC,TIAMAT PERU S.A.C.,UNIVERSAL CARGO PTY CORP,VERDE,CANI-TABS LIVER+ CJA 12 UND X 6O TAB PARA USO ...,[10903],ETAPA 1,CANI TABS LIVER CJA 12 UND X 6O TAB PARA USO V...
3,3004502000,DEMAS MEDICAMENTOS P` USO VETERINARIO Q` CONTE...,MARITIMA DEL CALLAO,086880 | 12,2024-09-15,20250406941,AGROVET MARKET S.A,LOGISTICA TOTAL SAS,2227.78,2116.39,...,HEMATOFOS B12 CJA 6 UND X 500 ML - COLOMBIA PA...,HEMATOFOS B12 CJA 6 UND X 500 ML - COLOMBIA PA...,TRABAJOS MARITIMOS S.A.,OVERSEAS CARGO FORWARDING S.A.C. - OC FORWARDI...,JMC LOGISTICS CARGO SAS,VERDE,HEMATOFOS B12 CJA 6 UND X 5OO ML - COLOMBIA PA...,[15749],ETAPA 1,HEMATOFOS B12 CJA 6 UND X 5OO ML COLOMBIA PARA...
4,3004502000,DEMAS MEDICAMENTOS P` USO VETERINARIO Q` CONTE...,DESAGUADERO,005317 | 1,2024-08-10,20110200201,QUIMTIA S.A.,,1163.35,1000.0,...,ANTI-HOT X 1 KG EN BOLSAS DE 1 KG PRESENTACION...,ANTI-HOT X 1 KG EN BOLSAS DE 1 KG PRESENTACION...,,,,VERDE,ANTI-HOT X 1 KG EN BOLSAS DE 1 KG PRESENTACION...,,,ANTI HOT X 1 KG EN BOLSAS DE 1 KG PRESENTACION...


---

# <span style="color: green">Etapa 5</span>

In [153]:
df_veritrade_spark_eta5 = df_veritrade_spark

## <span style="color: green">Etapa 5 -- Estandarizar texto</span>

Lo mismo que en la etapa 1, usamos `DES_PROD` y `DES_COM`

## <span style="color: green">Etapa 5 -- Crear un diccionario de Senasa con claves "DES_PROD" y valores "ID"</span>

In [154]:
df_senasa_filtrado = df_senasa_spark.filter(col("NUMERO_DE_PALABRAS") >= 1)

# Crear un diccionario usando collect() directamente en el Dataframe

dic_senasa = {
    row["DES_PROD"]: row["ID"]
    for row in df_senasa_filtrado.select("DES_PROD", "ID").collect()
}

# dic_senasa

In [155]:
# Ordenar el diccionario por la longitud de las claves de mayor a menor
dic_senasa_ordenado = dict(sorted(dic_senasa.items(), key=lambda item: len(item[0]), reverse=True))

# Reasignar diccionario
dic_senasa = dic_senasa_ordenado

# Imprimir el diccionario ordenado
print(dic_senasa) 

{'OCTOCELL-VAC VACUNA COMBINADA CONTRA EL MOQUILLO  HEPATITIS INFECCIOSA  ADENOVIROSIS  PARVOVIROSIS  PARAINFLUENZA  CORONAVIROSIS  LEPTOSPIROSIS (L. CANICOLA Y L. ICTEROHAEMORRHAGIAE)': 3413, 'VACUNA V11-ELEVENCELL VAC-VACUNA COMBINADA CONTRA LA CINOMOSIS CANINA  HEPATITIS INFECCIOSA  ADENOVIROSIS  PARVOVIROSIS  PARAINFLUENZA  CORINAVIROSIS Y LEPTOSPIROSIS CANINA': 8798, 'PRO PLAN VETERINARY DIETES UR SAVORY SELECTS URINARY ST/OX SALMON RECIPE IN SAUCE / PRO PLAN VETERINARY DIETS UR SAVORY SELECTS URINARY ST/OX SALMON Y OTROS EN SALSA': 5176, 'PRO PLAN ADULT 7+ COMPLETE ESSENTIALS MORSELS IN GRAVY BEEF & RICE ENTREE / PRO PLAN ADULTO 7+ COMPLETE ESSENTIALS BOCADOS EN SALSA CARNE  ARROZ Y OTROS ENTREE': 5116, 'GUABI NATURAL GRAIN FREE SACHE GATO ADULTO CASTRADO FRANGO  SALMAO E VEGETAIS // GUABI NATURAL GRAIN FREE SOBRE GATO ADULTO CASTRADO POLLO  SALMON Y VEGETALES': 15588, 'FANCY FEAST GRAVY LOVERS CHICKEN FEAST IN GRILLED CHICKEN FLAVOR GRAVY /FANCY FEAST GRAVY LOVERS A LA PARRILLA 

## <span style="color: green">Etapa 5 -- Detectar productos</span>

In [156]:
def detectar_producto5(desVeritrade, nomProdSenasa):
    palabrasDesVeritrade = desVeritrade.split()
    palabrasNomProdSenasa = nomProdSenasa.split()
    
    # Filtrar palabras que no son números
    palabrasNomProdSenasa = [prod for prod in palabrasNomProdSenasa if not prod.isdigit() and len(prod) >= 3]
    
    # Contar coincidencias
    coincidencias = sum(1 for prod in palabrasNomProdSenasa if prod in palabrasDesVeritrade)
    
    # Verificar condiciones
    if len(palabrasNomProdSenasa) >= 3 and coincidencias >= 3:
        return True
    return False


def obtener_foreign_key5(desVeritrade):
    ids_senasa = []  # Lista para acumular los IDs
    # Recorrer senasa
    for nomProdSenasa in dic_senasa.keys():
        # Obtener ID de senasa
        if detectar_producto5(desVeritrade, nomProdSenasa):
            id_senasa = dic_senasa.get(nomProdSenasa)  # Agregar ID a la lista
            ids_senasa.append(id_senasa)  # Agregar ID a la lista
    # Validar lista vacia
    if ids_senasa == []:
        ids_senasa = None
    return ids_senasa  # Retornar lista de IDs


obtener_foreign_key5_udf = udf(lambda x: obtener_foreign_key5(x), StringType())

# Recorrer veritrade - Depurar ids por etapas
# df_veritrade_spark_eta5 = df_veritrade_spark_eta5.withColumn(
#     "FK_SENASA_ID_ETA5",
#     F.when(
#         (col("FK_SENASA_ID").isNull()) | (col("FK_SENASA_ID") == ""),
#         obtener_foreign_key5_udf(col("DES_COM")),
#     ),
# )

# Recorrer veritrade
df_veritrade_spark_eta5 = df_veritrade_spark_eta5.withColumn(
    "FK_SENASA_ID",
    F.when(
        (col("FK_SENASA_ID").isNull()) | (col("FK_SENASA_ID") == ""),
        obtener_foreign_key5_udf(col("DES_COM")),
    ).otherwise(col("FK_SENASA_ID")),
)

# Recorrer veritrade - Depurar ids por etapas
df_veritrade_spark_eta5 = df_veritrade_spark_eta5.withColumn(
    "FK_SENASA_ETAPA",
    F.when(
        (col("FK_SENASA_ID").isNotNull() & (col("FK_SENASA_ID") != ""))
        & (col("FK_SENASA_ETAPA").isNull() | (col("FK_SENASA_ETAPA") == "")),
        "ETAPA 5",
    ).otherwise(col("FK_SENASA_ETAPA")),
)

---

## <span style="color: green">Etapa 5 -- Guardar Excel</span>

In [157]:
if guardar_excel == True:
    guardar_df_spark_excel(df_senasa_spark, "5 senasa.xlsx")
    guardar_df_spark_excel(df_veritrade_spark_eta5, "5 veritrade.xlsx")
    # detener_script()

---

# <span style="color: green">Etapa 6</span>

## <span style="color: green">Etapa 6 -- Estandarizacion de descripcion</span>

In [158]:
# Crear una columna vacía llamada "NUEVA_COLUMNA"
df_veritrade_spark_eta5 = df_veritrade_spark_eta5.withColumn("TIPO_PRODUCTO", lit(None))

In [159]:
# df_veritrade_spark_eta4.show(1)

In [160]:
def verificacion_alimento_en_des(texto):
    texto = str(texto)
    if (
        texto.startswith("ALIMENTO")
        or texto.startswith("ALMNTO.")
        or "ALIMENTO" in texto
    ):
        return True
    return False


def indicar_alimento(descripcion_comercial):
    if verificacion_alimento_en_des(descripcion_comercial):
        return "Alimento"


indicar_alimento_udf = udf(indicar_alimento, StringType())

df_veritrade_spark_eta5 = df_veritrade_spark_eta5.withColumn(
    "TIPO_PRODUCTO",
    F.when(
        (col("FK_SENASA_ID").isNull() | (col("FK_SENASA_ID") == "")) & (col("TIPO_PRODUCTO").isNull() | (col("TIPO_PRODUCTO") == "")),
        indicar_alimento_udf(col("DES_COM")),
    ).otherwise(col("TIPO_PRODUCTO")),
)

In [161]:
def verificacion_agricola_en_des(texto):
    texto = str(texto)
    if (
        texto.startswith("AGRICOLA")
        or texto.startswith("AGRÍCOLA")
        or "AGRICULTURA" in texto
    ):
        return True
    return False


def indicar_agricola(descripcion_comercial):
    if verificacion_alimento_en_des(descripcion_comercial):
        return "Agrícola"


indicar_agricola_udf = udf(indicar_agricola, StringType())

df_veritrade_spark_eta5 = df_veritrade_spark_eta5.withColumn(
    "TIPO_PRODUCTO",
    F.when(
        (col("FK_SENASA_ID").isNull() | (col("FK_SENASA_ID") == "")) & (col("TIPO_PRODUCTO").isNull() | (col("TIPO_PRODUCTO") == "")),
        indicar_agricola_udf(col("DES_COM")),
    ).otherwise(col("TIPO_PRODUCTO")),
)

In [162]:
def verificacion_otros_en_des(texto):
    texto = str(texto)
    if (
        "VITAMNAS EN CAJAS" in texto
        or "VITAMINAS VARIOS" in texto
        or "VITAMINAS EN CAJA" in texto
        or "VITAMINAS  EN CAJA" in texto
        or "VITAMINAS VARIADOS" in texto
        or "BLISTER DE VITAMINAS" in texto
        or "VITAMINAS NATURALES" in texto
        or "FRASCOS DE VITAMINA" in texto
    ):
        return True
    return False


def indicar_otros(descripcion_comercial):
    if verificacion_otros_en_des(descripcion_comercial):
        return "Otros"


indicar_otros_udf = udf(indicar_otros, StringType())

df_veritrade_spark_eta5 = df_veritrade_spark_eta5.withColumn(
    "TIPO_PRODUCTO",
    F.when(
        (col("FK_SENASA_ID").isNull() | (col("FK_SENASA_ID") == "")) & (col("TIPO_PRODUCTO").isNull() | (col("TIPO_PRODUCTO") == "")),
        indicar_otros_udf(col("DES_COM")),
    ).otherwise(col("TIPO_PRODUCTO")),
)

In [163]:
df_veritrade_spark_eta6 = df_veritrade_spark_eta5

In [164]:
# df_veritrade_spark_ite6.explain()

In [165]:
# num_particiones_veritrade = df_veritrade_spark_ite6.rdd.getNumPartitions()
# print(f"Número de particiones en df_veritrade_spark: {num_particiones_veritrade}")

---

## <span style="color: green">Etapa 6 -- Guardar Excel</span>

In [166]:
if guardar_excel == True or True:
    guardar_df_spark_excel(df_senasa_spark, "6 senasa.xlsx")
    guardar_df_spark_excel(df_veritrade_spark_eta6, "6 veritrade.xlsx")
    detener_script()

SystemExit: Detener script

---

# <span style="color: green">Etapa 7</span>

In [76]:
# df_veritrade_spark_ite4.write.csv("output.csv")

In [77]:
# df_veritrade_spark_ite4.explain()

In [78]:
# num_particiones_veritrade = df_veritrade_spark_ite4.rdd.getNumPartitions()
# print(f"Número de particiones en df_veritrade_spark: {num_particiones_veritrade}")

In [79]:
from pyspark.sql.functions import broadcast

df_veritrade_spark_eta7 = df_veritrade_spark_eta6.join(
    broadcast(df_senasa_spark),
    df_veritrade_spark_eta6.FK_SENASA_ID == df_senasa_spark.ID,
    "left",
)

In [80]:
# df_veritrade_spark_ite5.explain()

In [81]:
# num_particiones_veritrade = df_veritrade_spark_ite5.rdd.getNumPartitions()
# print(f"Número de particiones en df_veritrade_spark: {num_particiones_veritrade}")

df_senasa_spark.createOrReplaceTempView("senasa")
senasa = spark.sql("SELECT * FROM senasa")

df_veritrade_spark_ite4.createOrReplaceTempView("veritrade")
veritrade = spark.sql("SELECT * FROM veritrade")

query="""
SELECT ver.*,sen.DES_PROD,sen.TIPO_PRODUCTO,sen.DETALLE_VAR1 AS COMPOSICION,sen.DETALLE_VAR2 AS CLASIFICACION,sen.DETALLE_VAR3 AS FORMA
FROM veritrade ver
LEFT JOIN senasa sen
ON ver.FK_SENASA_ID=sen.ID
"""

df_veritrade_spark_ite5 = spark.sql(query)
df_veritrade_spark_ite5=df_veritrade_spark_ite5.coalesce(4)

---

## <span style="color: green">Etapa 7 -- Guardar Excel</span>

In [73]:
if guardar_excel == True:
    guardar_df_spark_excel(df_senasa_spark, "10 senasa.xlsx")
    guardar_df_spark_excel(df_veritrade_spark_eta7, "10 veritrade.xlsx")
    # detener_script()

---

In [45]:
import re
import ast

# df_veritrade=pd.read_excel("VERITRADE_pruebas_01102024.xlsx")
# df_senasa=pd.read_excel("Diccionario-Senasa_2024-09-28_CC.xlsx",sheet_name="2024-09-28")

# Crear un diccionario de tipos de presentación
presentaciones = {
    "AMPOLLA": ["AMPOLLA "],
    "BALDE": ["BALDE "],
    "BANDEJA": ["BANDEJA "],
    "BARRIL": ["BARRIL "],
    "BIDON": ["BIDON ", "BIDONES ", "BIDON(ES) "],
    "BIG BAG": ["BIG BAG "],
    "BLISTER": ["BLISTER "],
    "BLOQUE": ["BLOQUE ", "BLOQUES ", "BLOQUE(S) "],
    "BOLSA": ["BOLSA ", "BOLSAS "],
    "BOTELLA": [
        "BOTELLA ",
        "BOTELLAS ",
        "BOTELLA(S) ",
        "BOTTLE ",
        "BOTTLES ",
        "BOTTLE(S) ",
        " BOT ",
    ],
    "CAJA": ["CAJA ", "CAJAS ", "CAJA(S) "],
    "CILINDRO": ["CILINDRO ", "CILINDROS ", "CILINDRO(S) "],
    "COJIN": ["COJIN "],
    "COLLAR": ["COLLAR "],
    "CONTENEDOR": ["CONTENEDOR "],
    "DOY": ["DOY PACK "],
    "ENVASE": ["ENVASE ", "ENVASES ", "ENVASE(S) "],
    "ESTUCHE": ["ESTUCHE "],
    "FLEXITANK": ["FLEXITANK "],
    "FRASCO": ["FRASCO ", "FRASCOS ", "FRASCO(S) ", "FCO. "],
    "GALON": ["GALON ", "GALONES ", "GALON(ES) ", "GALONERA ", "GALONERA(S) "],
    "GARRAFA": ["GARRAFA "],
    "GRANEL": ["GRANEL "],
    "ISOTANQUE": ["INSOTANQUE ", "ISOTANQUES "],
    "JERINGA": ["JERINGA ", "JERINGAS ", "JERINGA(S) "],
    "LATA": ["LATA ", "LATAS "],
    "PIPETA": ["SPOT ON ", "RIVOLTA ", "PIPETA ", "PIPETAS ", "PIPETA(S) "],
    "POMO": ["POMO ", "POMOS ", "POMO(S) "],
    "POTE": ["POTE ", "POTES ", "POTE(S) "],
    "POUCH": ["POUCH "],
    "SACHET": ["SACHET "],
    "SACO": ["SACO ", "SACOS ", "SACO(S) "],
    "SOBRE": ["SOBRE ", "SOBRES ", "SOBRE(S) "],
    "TAMBOR": ["TAMBOR ", "TAMBORES ", "TAMBOR(ES) ", "DRUM ", "DRUMS ", "DRUM(S) "],
    "TANQUE": ["TANQUE "],
    "TARRO": ["TARRO "],
    "TUBO": ["TUBO ", "TUBOS ", "TUBO(S) "],
    "VIAL": ["VIAL "],
}

unidades_dict = {
    "KG": ["KG", "KGS", "KILOGRAMOS", "KILOS"],
    "GR": ["GR", "GRAMOS", "G", "GRMS", "GRS"],
    "LT": ["LT", "LTR", "LTS", "LTRS", "LTROS", "LITROS", "LITRO", "L"],
    "ML": ["ML"],
    #'UND':['UND'], #YA SABES QUE ES UNA UNIDAD PERO COMO SE SI ES TABLETA O COMPRIMIDO
    #'TAB':['TAB','TABS','TABLETA','TABLETAS','PASTILLAS','COMP']
}

sinonimos_a_clave = {}
for clave, sinonimos in unidades_dict.items():
    for sinonimo in sinonimos:
        sinonimos_a_clave[sinonimo] = clave

# pattern = r'(?<!\S)(\d+(?:[.,]\d+)?)\s*(' + '|'.join(re.escape(unit.strip()) for units in unidades_dict.values() for unit in units) + r')\b'
pattern = (
    r"(?<!\S)(\d+(?:[.,]\d+)?)\s*("
    + "|".join(re.escape(unit) for unit in sinonimos_a_clave.keys())
    + r")\b"
)
pattern_solo_valor = r"(?<!\S)(\d+(?:[.,]\d+)?)\s*\b"

senasa_data = df_senasa_spark.select("ID", "DETALLE_VAR4", "DETALLE_VAR3").collect()
senasa_dict = {row.ID: (row.DETALLE_VAR4, row.DETALLE_VAR3) for row in senasa_data}


def primeras_presentaciones_udf(id_senasa):
    if id_senasa is None or id_senasa not in senasa_dict:
        return

    presentacion, forma = senasa_dict[id_senasa]

    if isinstance(presentacion, str):
        try:
            my_pre_list = ast.literal_eval(presentacion)
        except (ValueError, SyntaxError):
            return

        if len(my_pre_list) == 1:
            presentacion = my_pre_list[0].upper()
            coincidencias = re.findall(pattern, presentacion)
            valor = re.findall(pattern_solo_valor, presentacion)

            if coincidencias:
                resultados = [
                    f"{cantidad.replace(',', '.')} {sinonimos_a_clave[unidad.strip()]}"
                    for cantidad, unidad in coincidencias
                ]
                for k in presentaciones.keys():
                    if k in my_pre_list[0]:
                        return f"{k} X {resultados[0]}"
                    if "Und" in my_pre_list[0] and (
                        forma in ["COMPRIMIDOS", "TABLETAS"]
                    ):
                        return f"{k} X {valor[0]} COMP"
            return
    return


# Registrar la UDF
primeras_presentaciones_udf = udf(primeras_presentaciones_udf, StringType())

# Aplicar la UDF al DataFrame de veritrade
df_veritrade_spark_ite7 = df_veritrade_spark_ite6.withColumn(
    "PRESENTACION", primeras_presentaciones_udf(col("FK_SENASA_ID"))
)

In [46]:
from pyspark.sql.types import ArrayType

unidades_dict2 = {
    "KG": ["KG", "KGS", "KILOGRAMOS", "KILOS"],
    "GR": ["GR", "GRAMOS", "G", "GRMS", "GRS"],
    "LT": ["LT", "LTR", "LTS", "LTRS", "LTROS", "LITROS", "LITRO", "L"],
    "ML": ["ML"],
    "UND": ["UND", "UNIDADES", "UNID."],
}

unidad_map = {unit: std for std, units in unidades_dict2.items() for unit in units}


def encontrar_unidad_medida(texto):
    texto = texto.upper()
    texto = texto.replace("X", " ")
    texto = texto.replace("/", " ")
    pattern = (
        r"(?<!\S)(\d+(?:[.,]\d+)?)\s*("
        + "|".join(
            re.escape(unit.strip())
            for units in unidades_dict2.values()
            for unit in units
        )
        + r")\b"
    )
    coincidencias = re.findall(pattern, texto)
    resultados = [
        f"{cantidad.replace(',', '.')} {unidad_map[unidad.strip()]}"
        for cantidad, unidad in coincidencias
        if unidad.strip() in unidad_map
    ]
    return resultados


encontrar_unidad_udf = udf(encontrar_unidad_medida, ArrayType(StringType()))

# Aplicar la UDF y crear una nueva columna
df_veritrade_spark_ite8 = df_veritrade_spark_ite7.withColumn(
    "UNIDAD", encontrar_unidad_udf(col("Descripcion Comercial"))
)

In [47]:
df_veritrade_spark_ite9 = df_veritrade_spark_ite8.withColumn(
    "PRESENTACION",
    F.when(
        col("PRESENTACION").isNull(), indicar_agricola_udf(col("DES_COM"))
    ).otherwise(col("PRESENTACION")),
)

presentaciones_vacias = df_veritrade[df_veritrade['PRESENTACION'] == '']['FK_SENASA_ID'].index.tolist()

sinonimos_a_clave_presentacion = {}
for clave, sinonimos in presentaciones.items():
    for sinonimo in sinonimos:
        sinonimos_a_clave_presentacion[sinonimo.strip()] = clave.strip()

sinonimos_a_clave_extra = {}
for clave, sinonimos in unidades_dict2.items():
    for sinonimo in sinonimos:
        sinonimos_a_clave_extra[sinonimo] = clave

pattern = r'(?<!\S)(\d+(?:[.,]\d+)?)\s*(' + '|'.join(re.escape(unit) for unit in sinonimos_a_clave_extra.keys()) + r')\b'
pattern_solo_valor = r'(?<!\S)(\d+(?:[.,]\d+)?)(?=\s*[a-zA-Z])'

def obtener_presentacion(cadena):
    cadena = cadena.strip()  # Eliminar espacios innecesarios
    return sinonimos_a_clave_presentacion.get(cadena, cadena)  # Devuelve la clave o la misma cadena si no se encuentra

def encontrar_presentaciones(texto, presentaciones):
    texto = texto.upper()
    encontrados = []

    for clave, sinonimos in presentaciones.items():
        for sinonimo in sinonimos:
            if sinonimo.strip() in texto:
                encontrados.append(clave)
                break

    return list(set(encontrados))


for idx in presentaciones_vacias:
    id_senasa = df_veritrade.loc[idx, "FK_SENASA_ID"]
    forma = df_senasa.loc[df_senasa["ID"] == id_senasa, "DETALLE_VAR4"].values

    valor_unidad_medida=[]

    if forma.size > 0:  # Asegúrate de que hay al menos un elemento
        forma_str = forma[0]  # Obtén el primer elemento
        if isinstance(forma_str, str):
            try:
                forma_list = ast.literal_eval(forma_str)
                my_set=set({})
                my_set_unidad=set({})
                for i in forma_list:
                    valor=i.split(" ")
                    if len(valor)>=3:
                        my_set.add(valor[0])
                        my_set_unidad.add(valor[-1])
                if len(my_set)==1:
                    my_pres=list(my_set)
                    my_pres=my_pres[0]  
                    forma_str=forma_str.upper()
                    coincidencias_valor_unidad_medida = re.findall(pattern, forma_str)
                    resultados = [f"{cantidad.replace(',', '.')} {sinonimos_a_clave_extra[unidad.strip()]}" for cantidad, unidad in coincidencias_valor_unidad_medida]    #TIPO LISTA
                    valor_unidad_medida_veritrade = df_veritrade.loc[idx, "UNIDAD"]
                    encontrado = False
                    if len(valor_unidad_medida_veritrade)==0:
                        pass
                    else:
                        #SOLO FUNCIONA PARA LOS CASOS QUE NO SEAN TAB NI COMP
                        for result in resultados:
                            for result2 in valor_unidad_medida_veritrade:
                                if result==result2:
                                    my_pres=obtener_presentacion(my_pres)
                                    cad_pres=my_pres+" X "+result2
                                    df_veritrade.loc[idx, "PRESENTACION"]=cad_pres
                                    encontrado=True
                                    break
                                else:
                                    pass
                            if encontrado:
                                break
                    
                    if (df_veritrade.loc[idx, "FORMA_FARMACEUTICA"]=="TABLETAS" or df_veritrade.loc[idx, "FORMA_FARMACEUTICA"]=="TABLETAS INTRAUTERINAS") and encontrado==False:
                        valor=re.findall(pattern_solo_valor, df_veritrade.loc[idx,"Descripcion Comercial"])
                        valor = [numero.replace(',', '.') for numero in valor]
                        for i in range(len(valor)):
                            valor[i]=valor[i]+" UND"
                        encontrado_tableta=False
                        for veri in valor:
                            for sena in resultados:
                                if veri==sena:
                                    my_pres=obtener_presentacion(my_pres)
                                    cad_pres=my_pres+" X "+sena
                                    cad_pres=cad_pres.replace("UND","TAB")
                                    df_veritrade.loc[idx, "PRESENTACION"]=cad_pres
                                    encontrado_tableta=True
                                    break
                            if encontrado_tableta:  # Si se encontró una coincidencia, salir del bucle externo
                                break
                        
                    if (df_veritrade.loc[idx, "FORMA_FARMACEUTICA"]=="COMPRIMIDOS" or df_veritrade.loc[idx, "FORMA_FARMACEUTICA"]=="COMPRIMIDOS MASTICABLES") and encontrado==False:
                        valor=re.findall(pattern_solo_valor, df_veritrade.loc[idx,"Descripcion Comercial"])
                        valor = [numero.replace(',', '.') for numero in valor]
                        for i in range(len(valor)):
                            valor[i]=valor[i]+" UND"
                        
                        encontrado_comprimido=False
                        for veri in valor:
                            for sena in resultados:
                                if veri==sena:
                                    my_pres=obtener_presentacion(my_pres)
                                    cad_pres=my_pres+" X "+sena
                                    cad_pres=cad_pres.replace("UND","COMP")
                                    df_veritrade.loc[idx, "PRESENTACION"]=cad_pres
                                    encontrado_comprimido=True
                                    #print(idx)
                                    break
                            
                            if encontrado_comprimido:  # Si se encontró una coincidencia, salir del bucle externo
                                break  
                else:
                    forma_str=forma_str.upper()
                    #print("tiene mas de una presentacion")
                    resultado_veritrade=encontrar_presentaciones(df_veritrade.loc[idx,"Descripcion Comercial"], presentaciones) #SI
                    resultado_senasa=encontrar_presentaciones(forma_str, presentaciones)                                        #SI
                    if len(resultado_veritrade)==0:
                        pass
                    else:
                        salir = False

                        for veri in resultado_veritrade:
                            if salir:  # Verifica si se debe salir
                                break
                            for sena in resultado_senasa:
                                if veri == sena:
                                    valor = re.findall(pattern_solo_valor, df_veritrade.loc[idx, "Descripcion Comercial"])
                                    valor = [numero.replace(',', '.') for numero in valor]
                                    for li in forma_list:
                                        for val in valor:
                                            if (li.startswith(sena)) and (val in li):
                                                partes = li.split(" ")
                                                parte_um = partes[-1].upper()

                                                if parte_um=="UND":
                                                    if (df_veritrade.loc[idx, "FORMA_FARMACEUTICA"]=="TABLETAS" or df_veritrade.loc[idx, "FORMA_FARMACEUTICA"]=="TABLETAS INTRAUTERINAS"):
                                                        cadena = f"{sena} X {val} TAB"
                                                        df_veritrade.loc[idx, "PRESENTACION"] = cadena
                                                        salir = True
                                                        break
                                                    elif (df_veritrade.loc[idx, "FORMA_FARMACEUTICA"]=="COMPRIMIDOS" or df_veritrade.loc[idx, "FORMA_FARMACEUTICA"]=="COMPRIMIDOS MASTICABLES"):
                                                        cadena = f"{sena} X {val} COMP"
                                                        df_veritrade.loc[idx, "PRESENTACION"] = cadena
                                                        salir = True
                                                        break
                                                else:
                                                    cadena = f"{sena} X {val} {parte_um}"
                                                    df_veritrade.loc[idx, "PRESENTACION"] = cadena
                                                    salir = True 
                                                    break
                                        if salir:  # Verifica si se debe salir después del bucle de valor
                                            break
                                if salir:  # Verifica nuevamente después del bucle de sena
                                    break

                                    #STARTS WITH Y CONTAINS

            except (ValueError, SyntaxError):
                print("Error al evaluar la cadena.")

In [None]:
from pyspark import StorageLevel

df_veritrade_spark_ite8.persist(StorageLevel.MEMORY_AND_DISK)

df = df_veritrade_spark_ite8.toPandas()

df.to_excel("output_geriax.xlsx", index=False)

In [None]:
df.head(1)

from pyspark import StorageLevel

df_veritrade_spark_ite6.persist(StorageLevel.MEMORY_AND_DISK)

df_veritrade_spark_ite6.write.csv("output.csv", mode='overwrite')

#PARA LOS QUE NO ENCUENTRA EN SENASA
indices_no_encontrado = df_veritrade[df_veritrade["FK_SENASA_ID"] == ""].index.tolist()

#SE AMARRA UN NO ENCONTRADO CON ALIMENTO
def verificar_empieza_alimento(texto):
    if texto.startswith("ALIMENTO"):
        return "Alimento"
    elif texto.startswith("ALMNTO."):
        return "Alimento"
    else:
        return ""
    
def verificar_contiene_alimento(texto):
    return "ALIMENTO" in texto

for idx in indices_no_encontrado:
    desc_comer_raw = df_veritrade.loc[idx, "Descripcion Comercial"].strip().upper()
    if verificar_empieza_alimento(desc_comer_raw):
        df_veritrade.loc[idx, "TIPO_PRODUCTO"] = "Alimento"
        continue

    if verificar_contiene_alimento(desc_comer_raw):
        df_veritrade.loc[idx, "TIPO_PRODUCTO"] = "Alimento"

#PARA LOS QUE NO ENCUENTRA EN SENASA
indices_no_encontrado = df_veritrade[df_veritrade["FK_SENASA_ID"] == ""].index.tolist()
    
def verificar_contiene_agricola(texto):
    if "AGRICOLA" in texto:
        return True
    elif "AGRÍCOLA" in texto:
        return True
    elif "AGRICULTURA" in texto:
        return True
    else:
        return False

for idx in indices_no_encontrado:
    desc_comer_raw = df_veritrade.loc[idx, "Descripcion Comercial"].strip().upper()  # Normalizar la descripción
    filtro_avanzado=df_veritrade.loc[idx, "TIPO_PRODUCTO"]
    if verificar_contiene_agricola(desc_comer_raw) and filtro_avanzado=="":
        df_veritrade.loc[idx, "TIPO_PRODUCTO"] = "Agrícola"

#df_veritrade.to_excel("VERITRADE_pruebas_01102024.xlsx")

df_veritrade_spark_ite4=df_veritrade_spark_ite3
df_veritrade_spark_ite4=df_veritrade_spark_ite4.coalesce(4)
df_veritrade_spark_ite4.cache()

In [50]:
# num_particiones_veritrade = df_veritrade_spark_ite3.rdd.getNumPartitions()
# print(f"Número de particiones en df_veritrade_spark: {num_particiones_veritrade}")

In [51]:
# df_veritrade_spark_ite3.explain()

In [52]:
# df_output = df_veritrade_spark_ite6.toPandas()

# df_output.to_excel("output_final.xlsx", index=False)

In [53]:
# df_veritrade_spark_ite6=df_veritrade_spark_ite6.repartition(100)

In [54]:
# df_veritrade_spark_ite6.write.csv("hdfs://localhost:9000/alexander/output.csv", header=True)

In [55]:
# print(df_veritrade_spark_ite6.count())
# print(df_veritrade_spark_ite6.printSchema())

In [56]:
# df_veritrade_spark_ite6.write.csv("output.csv", header=True, mode="overwrite")
# df_veritrade_spark_ite6.coalesce(1).write.csv("hdfs://localhost:9000/alexander/output.csv", header=True)

In [57]:
spark.stop()