### Installing Dependencies

In [1]:
%pip install -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\Users\renato.valentim\AppData\Local\Programs\Python\Python39\python.exe -m pip install --upgrade pip' command.


### Imports and Paths

In [2]:
from utils import *

In [3]:
CATALOG_PATH = f"{MAIN_PATH}/base_catalogo_mobly_enriquecida_12032025.csv"

### Loading Query Data

In [4]:
df_catalog = (
  spark.read
  .option("header", True)
  .option("inferSchema", True)
  .option("multiline", True)
  .option("quote", '"')
  .option("escape", "\\")
  .option("sep", ";")
  .option("ignoreLeadingWhiteSpace", True)
  .option("ignoreTrailingWhiteSpace", True)
  .csv(CATALOG_PATH)

  # Transformações importantes das columns
  .withColumnRenamed('Categoria', 'categoria')
  .withColumnRenamed('Marca', 'marca')
  .withColumnRenamed('Cor', 'cor')
  .withColumnRenamed('Preço', 'preco_medio')
  .withColumn("preco_medio", f.regexp_replace(f.col("preco_medio"), "R\\$ ", ""))
  .withColumn("preco_medio", f.regexp_replace(f.col("preco_medio"), "\\.", ""))
  .withColumn("preco_medio", f.regexp_replace(f.col("preco_medio"), ",", "."))
  .withColumn('preco_medio', f.col('preco_medio').cast('double'))
  .withColumnRenamed('Nome', 'desc_product')
  .withColumnRenamed('SKU', 'sku')
  .withColumn('sku', f.col('sku').cast(StringType()))
  .withColumnRenamed('ATRIBUTO 3', 'atributo_1')
  .withColumnRenamed('ATRIBUTO 4', 'atributo_2')
  .withColumnRenamed('ATRIBUTO 5', 'atributo_3')
  .withColumnRenamed('ATRIBUTO 6', 'atributo_4')
  .withColumnRenamed('ATRIBUTO 7', 'atributo_5')
  .withColumnRenamed('ATRIBUTO 8', 'atributo_6')
)

In [5]:
df = remove_accents_df(
    df=df_catalog, 
    columns=[
        "desc_product", "categoria", "marca", "cor", 
        "atributo_1", "atributo_2", "atributo_3", 
        "atributo_4", "atributo_5", "atributo_6"]
    )
df = convert_string_columns_to_upper(df)

In [6]:
columns = ["categoria", "marca", "cor"]

# Iterar por todas as colunas no DataFrame
for column in columns:
        # Aplicar a função para cada column
        df = clean_text_column(df, column)

### Performing Elastic Search

In [7]:
df_rows = df.collect()

In [8]:
NUM_RESULTS_PER_QUERY = 10
final_results = perform_elastic_search_attributes(df_rows, num_results_per_query=NUM_RESULTS_PER_QUERY)

Searching: 100%|██████████| 142/142 [00:08<00:00, 17.13it/s]


### Saving Results in .CSV

In [9]:
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
output_path = f"outputs/attributes_{NUM_RESULTS_PER_QUERY}_most_similar_descriptions_{timestamp}_.csv"

In [10]:
column_names = [
    'sku', 'catalog_desc', 'id_product', 'similar_desc', 'elastic_score', 'normalized_score', 'preco_medio', 
    'categoria', 'marca', 'cor', 'atributo_1', 'atributo_2', 'atributo_3', 'atributo_4', 'atributo_5', 'atributo_6'
]

save_csv(final_results, output_path, column_names)

### Loading Final Results

In [45]:
df_result = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("multiline", True)
    .option("quote", '"')
    .option("escape", "\\")
    .option("sep", ",")
    .option("ignoreLeadingWhiteSpace", True)
    .option("ignoreTrailingWhiteSpace", True)
    .csv(output_path)
)

In [None]:
df_delivery = (
    df_result.alias('df_result').join(df_catalog.alias('df_catalog'), on="sku", how="inner")
    .select(
        f.col("df_result.sku").alias("catalog_sku"), "df_result.id_product", 
        "df_result.catalog_desc", "df_result.similar_desc",
        "df_result.similar_desc", "df_result.elastic_score",
        f.col("df_catalog.preco_medio").alias("catalog_preco_medio"), "df_result.preco_medio", 
        f.col("df_catalog.categoria").alias("catalog_categoria"), "df_result.categoria",
        f.col("df_catalog.marca").alias("catalog_marca"), "df_result.marca",
        f.col("df_catalog.cor").alias("catalog_cor"), "df_result.cor",
        f.col("df_catalog.atributo_1").alias("catalog_atributo_1"), "df_result.atributo_1",
        f.col("df_catalog.atributo_2").alias("catalog_atributo_2"), "df_result.atributo_2",
        f.col("df_catalog.atributo_3").alias("catalog_atributo_3"), "df_result.atributo_3",
        f.col("df_catalog.atributo_4").alias("catalog_atributo_4"), "df_result.atributo_4",
        f.col("df_catalog.atributo_5").alias("catalog_atributo_5"), "df_result.atributo_5",
        f.col("df_catalog.atributo_6").alias("catalog_atributo_6"), "df_result.atributo_6"
    )
    .dropDuplicates(["catalog_sku", "id_product"])
)

df_delivery.show()

+-------------+----------+--------------------+--------------------+-------------------+-----------+-----------------+-------------+--------------+---------------+-------------+-------------+------------------+----------+------------------+-------------+------------------+-------------+--------------------+-------------+------------------+-------------+------------------+----------+
|  catalog_sku|id_product|        catalog_desc|        similar_desc|catalog_preco_medio|preco_medio|catalog_categoria|    categoria| catalog_marca|          marca|  catalog_cor|          cor|catalog_atributo_1|atributo_1|catalog_atributo_2|   atributo_2|catalog_atributo_3|   atributo_3|  catalog_atributo_4|   atributo_4|catalog_atributo_5|   atributo_5|catalog_atributo_6|atributo_6|
+-------------+----------+--------------------+--------------------+-------------------+-----------+-----------------+-------------+--------------+---------------+-------------+-------------+------------------+----------+-------

In [47]:
print('Número de resultados retornado:', f"{df_delivery.count():,}")

Número de resultados retornado: 1,333


In [None]:

timestamp = datetime.now().strftime("%Y%m%d_%H%M")
refined_output_path = f"{ELASTIC_PATH}/outputs/refined_outputs/{timestamp}_elastic_search_attributes.csv"

In [49]:
# Converter para Pandas
df_pd = df_delivery.toPandas()

# Salvar como CSV
df_pd.to_csv(refined_output_path, index=False)
