### Categorização dos produtos 3p em categorias 1p usando Regressão Logística

Treina-se o modelo com dados 1p, usando a descrição do produto como entrada e a categoria Martins como saída (label). O modelo treinado é então aplicado à descrição de cada produto 3p para predizer a categoria 1p correspondente.

In [0]:
import pandas as pd
from unicodedata import normalize
from pyspark.sql import functions as F, Window, DataFrame
from pyspark.sql.types import *
from typing import List, Union
import nltk
nltk.download("popular")
import re
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [0]:
# Seller (3p) product catalogue: ';'-separated CSV read as ISO-8859-1.
seller_catalog_path = '/mnt/analyticsquadmkt/data-analytics-projects/3P/INPUT/cadastro_produtos_sellers.csv'
seller_reader = spark.read.option("mergeSchema", True).option("encoding", "ISO-8859-1")
sdf_seller_product = seller_reader.csv(seller_catalog_path, header=True, sep=";")

# Martins (1p) active product mix.
martins_mix_path = "/mnt/advisor-hml/data/01_raw/MIX_ATIVOS_MARTINS/MIX_ATIVOS_MARTINS.parquet"
sdf_feature_product_comp = spark.read.option("mergeSchema", True).parquet(martins_mix_path)

# Best-selling products by number of distinct customers, after removing similar
# items (Jan 2022 to Jan 2023), with manually assigned labels.
labels_path = '/mnt/advisor-hml/data/07_model_output/3p_categorization/3p_categorization_labels/labeled_data.csv'
labels_reader = spark.read.option("mergeSchema", True).option("encoding", "UTF-8")
sdf_labels = (
    labels_reader
    .csv(labels_path, header=True, sep=";")
    .select('SKU', 'Rank', 'MartinsCategory')
)

In [0]:
# Keep only the 1p columns used by the model, renamed to the names the rest of
# the notebook expects.
sdf_feature_product_selected = sdf_feature_product_comp.select(
    F.col('CODPRD').alias('SKU'),
    F.col('DESCMCMER').alias('ProductDescription'),
    F.col('DESCTGPRD').alias('ProductCategory'),
)

# Categories removed from the training data. Each value appears once; the
# original condition chain tested 'MATERIAL PROMOCIONAL GENERICO' twice.
excluded_categories = [
    'NÃO INFORMADO',
    'MARTINS DIRETO',
    'KITS SMART',
    'MATERIAL PROMOCIONAL GENERICO',
    'MATERIAL PROMOCIONAL ALIMENTOS/BEBIDAS',
    'MATERIAL PROMOCIONAL BAZAR/PAPELARIA/EQ',
    'MATERIAL PROMOCIONAL BELEZA',
    'MATERIAL PROMOCIONAL ELETRO',
    'MATERIAL PROMOCIONAL HIGIENE',
    'MATERIAL PROMOCIONAL INFORMATICA/TELECOM',
    'MATERIAL PROMOCIONAL MARTCON/AGROVET',
]

# Data cleaning: drop rows without a description, keep one row per SKU and
# remove the excluded categories. As with the chained `!=` comparisons, rows
# whose ProductCategory is null are dropped too (the predicate evaluates to null).
sdf_product_filtered = (
    sdf_feature_product_selected
    .filter(F.col("ProductDescription").isNotNull())
    .dropDuplicates(subset=['SKU'])
    .where(~F.col('ProductCategory').isin(excluded_categories))
)

In [0]:
def remove_accents(sdf: DataFrame, columns: List[str]) -> DataFrame:
    """Strip accents from the given string columns of a Spark DataFrame.

    Each value is NFKD-normalised and then encoded to ASCII with errors
    ignored, so characters without an ASCII form are dropped.

    Args:
        sdf: DataFrame holding the columns to clean.
        columns: Names of the string columns to overwrite.

    Returns:
        The DataFrame with every listed column replaced by its accent-free
        version. Null values stay null.
    """

    def strip_accents(text):
        # Null cells reach the UDF as None; normalize() would raise TypeError
        # on them, and the null filter below only runs after this column.
        if text is None:
            return None
        return normalize('NFKD', text).encode('ASCII', 'ignore').decode('ASCII')

    # F.udf is the explicitly imported function, rather than a `udf` global
    # that only exists when the notebook environment injects it.
    func_accent = F.udf(strip_accents, StringType())
    for column in columns:
        sdf = sdf.withColumn(column, func_accent(column))

    return sdf


sdf_seller_product = remove_accents(sdf_seller_product, ['DESPRD'])

# Select the columns of interest
sdf_seller_product_selected = sdf_seller_product.select(
    F.col('CODMERSRR').alias('SKU'),
    F.col('DESPRD').alias('ProductDescription'),
    F.col('DESCTGPRD').alias('OriginalCategory'),
)

# Data cleaning: drop rows without a description and keep one row per SKU
sdf_seller_refined = sdf_seller_product_selected.filter(F.col("ProductDescription").isNotNull())
sdf_seller_refined = sdf_seller_refined.dropDuplicates(subset=['SKU'])

In [0]:
# Stemmer used by the text-preparation step.
# NOTE(review): Porter is an English-oriented stemmer while the descriptions
# look Portuguese; confirm this choice is intended.
ps = nltk.PorterStemmer()

# Project-specific noise words (promotions, pack sizes, generic qualifiers) and colour names.
add_stopwords = ["leve","pague", "(cp)", "[()]", "pq", "desconto", "kg", "pg","lv", "mais","menos","por", "efacil","unidades","unidade","tamanho", "un",  "pequeno", "grande", "extra",  "gde","caixa", "refil", "embalagem", "pedacos", "premium", "new", "mini", "uso", "ate", "multi",  "ultra", "liquido" ,"original", "sabores", "cores", "min", "max", "claro","novo","dark", "brasil", "colorida", "colorido",]
colors =  ["vermelho","vermelha", "laranja", "amarelo","amarela", "verde", "azul", "anil", "indigo", "violeta", "prata",  "preto","branco","branca","preta","cinza","roxo","roxa","rosa","roso","bege","marrom", "red", "black", "blue", "yellow", "white", "green", "purple", "pink", "gold","dourado","dourada"]

# NLTK stop-word lists for both languages found in the descriptions.
stopwords_pt = nltk.corpus.stopwords.words('portuguese')
stopwords_en = nltk.corpus.stopwords.words('english')

# Single list, in the order: Portuguese, English, custom words, colours.
stopwords = [*stopwords_pt, *stopwords_en, *add_stopwords, *colors]

# Collect both catalogues as pandas DataFrames for the text preparation.
df_product_filtered = sdf_product_filtered.toPandas()
df_seller_refined = sdf_seller_refined.toPandas()

def prepare_text(text):
    """Clean a product description and return its stemmed tokens.

    Steps, in order: replace punctuation with spaces, drop every word that
    contains a digit, drop one-character words, split on non-word characters,
    remove stop words (module-level ``stopwords``) and stem the remainder
    with the module-level stemmer ``ps``.

    Args:
        text: Description string. Callers lower-case it beforehand, since the
            stop-word comparison is case-sensitive.

    Returns:
        List of stemmed tokens; empty when nothing survives the cleaning.
    """
    # Regex patterns are raw strings: "\S", "\d" and "\W" are invalid escape
    # sequences in a normal literal and trigger warnings on recent Pythons.
    text = re.sub(r"[,.;@#?!&$/]+\ *", " ", text)  # punctuation -> space
    text = re.sub(r"\S*\d\S*", "", text).strip()  # drop words containing digits
    text = " ".join(word for word in text.split() if len(word) > 1)  # drop 1-char words
    tokens = re.split(r'\W+', text)  # tokenize
    # Splitting on \W+ can leave empty strings at the edges; skip them along
    # with stop words and colours, then stem what is left.
    return [ps.stem(word) for word in tokens if word and word not in stopwords]
  
# Tokenise the lower-cased description of every product in both catalogues.
for catalog_df in (df_product_filtered, df_seller_refined):
    lowercase_tokens = catalog_df['ProductDescription'].apply(lambda description: prepare_text(description.lower()))
    catalog_df['ProductDescriptionToken'] = lowercase_tokens

# Back to Spark for the ML pipeline.
sdf_seller_refined_tk = spark.createDataFrame(df_seller_refined)
sdf_product_filtered_tk = spark.createDataFrame(df_product_filtered)

In [0]:
from typing import Tuple


def vectorization(sdf_product_filtered_tk: DataFrame, sdf_seller_refined_tk: DataFrame, numFeatures: int, minDocFreq: int) -> Tuple[DataFrame, DataFrame]:
    """Build TF-IDF features for the 1p (train) and 3p (test) products.

    A pipeline of HashingTF -> IDF -> StringIndexer is fitted on the 1p frame
    only and then applied to both frames, so both share the same hashing
    space, IDF weights and label indexing.

    Args:
        sdf_product_filtered_tk: 1p products with a "ProductDescriptionToken"
            column and a "ProductCategory" column (indexed into "label").
        sdf_seller_refined_tk: 3p products with a "ProductDescriptionToken" column.
        numFeatures: Number of hash buckets used by HashingTF.
        minDocFreq: Minimum number of documents a term must appear in to be
            kept by IDF.

    Returns:
        ``(train_df, test_df)``: the transformed 1p and 3p DataFrames, each
        with the added "tf" and "features" columns.
    """
    hashtf = HashingTF(numFeatures=numFeatures, inputCol="ProductDescriptionToken", outputCol='tf')
    idf = IDF(inputCol='tf', outputCol="features", minDocFreq=minDocFreq)  # minDocFreq: remove sparse terms
    label_stringIdx = StringIndexer(inputCol="ProductCategory", outputCol="label")
    pipeline = Pipeline(stages=[hashtf, idf, label_stringIdx])

    pipelineFit = pipeline.fit(sdf_product_filtered_tk)
    train_df = pipelineFit.transform(sdf_product_filtered_tk)  # 1p products
    # NOTE(review): the 3p frame built in this notebook has no ProductCategory
    # column, so this relies on the fitted StringIndexer skipping a missing
    # input column; confirm on the Spark version in use.
    test_df = pipelineFit.transform(sdf_seller_refined_tk)  # 3p products

    return train_df, test_df


train_df, test_df = vectorization(sdf_product_filtered_tk, sdf_seller_refined_tk, 2**16, 2)

In [0]:
# Logistic regression with L2 regularization only (elasticNetParam=0).
lr = LogisticRegression(
    maxIter=50,
    regParam=0.01,
    elasticNetParam=0,
)

# Fit on the 1p products, then score the 3p products.
lrModel = lr.fit(train_df)
predictions = lrModel.transform(test_df)

In [0]:
@F.udf(returnType=DoubleType())
def extract_prob(vector):
    """Return the largest entry of a probability vector.

    Args:
        vector: Iterable of class probabilities for one row, or None.

    Returns:
        The maximum entry as a float, 0.0 for an empty vector, or None when
        the vector itself is null.
    """
    # Iterating a null vector raises TypeError, which the previous
    # `except ValueError` never caught; handle the null case explicitly.
    if vector is None:
        return None
    return float(max(vector, default=0.0))


# Confidence of each prediction = probability of the most likely class.
sdf_predictions = predictions.withColumn("ProbabilityLR", extract_prob(F.col("probability")))

In [0]:
# Lookup table from the model's numeric label to its category name, taken
# from the training frame.
sdf_category_label = (
    train_df
    .select(F.col('ProductCategory').alias('ProductCategoryLR'), 'label')
    .dropDuplicates()
)

# Keep only the columns needed downstream.
sdf_predictions = sdf_predictions.select('SKU', 'ProductDescription', 'OriginalCategory', 'prediction', 'ProbabilityLR')

# Attach the predicted category name, then drop the numeric helper columns.
prediction_matches_label = sdf_predictions.prediction == sdf_category_label.label
sdf_predictions_lr = (
    sdf_predictions
    .join(sdf_category_label, on=prediction_matches_label, how='left')
    .drop('prediction', 'label')
)
sdf_predictions_lr.display()

SKU,ProductDescription,OriginalCategory,ProbabilityLR,ProductCategoryLR
alhoforte_171,Molho de Alho 145 ml,condimentos-e-temperos,0.1379734547427636,MOLHO PARA SALADA/MOLHO CREMOSO
alimentosphinus_2001004,"Doce de Cocada 600g Zero Adicao de Acucares, 58% frutas frescas - PHINUS ( Caixa Display com 24 uni",doces,0.1379238997308222,DOCE
alimentosphinus_2001007,"Doce de Pe de Moleque 600g Zero Adicao de Acucares, 55% Amendoim - PHINUS ( Caixa Display com 24 un",doces,0.1359704214425992,DOCE
alimentosphinus_2002039,Bombom 72% Cacau 270g Zero Adicao de Acucares - PHINUS ( Caixa Display com 18 unidades de 15g),doces,0.0501869547996992,DOCE
flamboyant_3823,Goiabada Tablete Flamboyant 24x400g,doces,0.5373077396003275,DOCE
flormel_7896653703169,"Doce de Leite Cremoso Flormel, Zero Acucar - Pote com 210g",doces,0.1598386773529365,DOCE
flormel_7896653703282,"Doce de Abacaxi com Coco Flormel, Zero acucar - Caixa com 24 Unidades de 20g cada",doces,0.1424712254785861,DOCE
gama_24088,Geleia Linea Uva 230g,geleias,0.1107626002260038,DOCE
gamaba_23886,Geleia Linea Frutas Vermelhas 230g,geleias,0.0994308390350609,DOCE
gamarj_23886,Geleia Linea Frutas Vermelhas 230g,geleias,0.0994308390350609,DOCE


In [0]:
#sdf_predictions_lr.where(F.col('ProbabilityLR')<0.17).orderBy('ProbabilityLR').display()

In [0]:
def create_column_reliable_prediction(sdf: DataFrame, threshold) -> DataFrame: 
    """Replace low-confidence predictions with a 'not found' marker.

    Rows where ``ProbabilityLR >= threshold`` keep their ``ProductCategoryLR``;
    every other row gets 'NAO ENCONTRADO LR' in that column.

    Args:
        sdf: DataFrame with 'ProbabilityLR' and 'ProductCategoryLR' columns.
        threshold: Minimum probability for a prediction to be kept.

    Returns:
        The DataFrame with 'ProductCategoryLR' overwritten as described.
    """
    is_reliable = F.col('ProbabilityLR') >= threshold
    category_or_marker = F.when(is_reliable, F.col('ProductCategoryLR')).otherwise('NAO ENCONTRADO LR')
    return sdf.withColumn('ProductCategoryLR', category_or_marker)


sdf_predictions_lr = create_column_reliable_prediction(sdf_predictions_lr, threshold=0.17)

In [0]:
# Number of 3p products flagged 'NAO ENCONTRADO LR'. Uses the LR predictions
# frame; the previous `sdf_predictions_rf` is not defined in this notebook.
sdf_predictions_lr.where(F.col('ProductCategoryLR') == 'NAO ENCONTRADO LR').count()

In [0]:
# Join the predictions with the manually labelled data
sdf_model_evaluation_lr = (
    sdf_labels
    .join(sdf_predictions_lr, ['SKU'], how='inner')
    .orderBy(F.col('Rank'))
    .select('SKU', 'ProductDescription', 'MartinsCategory', 'ProductCategoryLR', 'ProbabilityLR')
)

# Performance metrics.
# Collect truth and prediction in a single action so the two columns are
# guaranteed to be row-aligned. Two separate toPandas() calls re-run the sort
# independently and may order rows with equal Rank differently.
df_evaluation = sdf_model_evaluation_lr.select('MartinsCategory', 'ProductCategoryLR').toPandas()
y = df_evaluation[['MartinsCategory']]
ycalc = df_evaluation[['ProductCategoryLR']]

print('Accuracy:', accuracy_score(y, ycalc))
print('F1_score:', f1_score(y, ycalc, average='weighted'))
print('Precision:', precision_score(y, ycalc, average='weighted'))
print('Recall:', recall_score(y, ycalc, average='weighted'))

In [0]:
# Output location for the logistic-regression predictions.
path = '/mnt/advisor-hml/data/07_model_output/3p_categorization/LR'

# Write everything as a single parquet part file, replacing any previous run.
# NOTE(review): 'header' looks like a CSV-only option; kept as in the original.
(
    sdf_predictions_lr
    .coalesce(1)
    .write
    .mode('overwrite')
    .format('parquet')
    .option('header', 'true')
    .save(path)
)