# Product matching 

In [1]:
import os
import pandas as pd
from datetime import datetime
from pathlib import Path

from modules.normalize_text import normalize_text
from modules.csv_reader import CSVFileReader
from modules.sku_matcher import get_confidence

In [2]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StringType, FloatType
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("TestSpark") \
    .getOrCreate()

23/09/14 08:12:49 WARN Utils: Your hostname, MacBook-Pro-de-Joel.local resolves to a loopback address: 127.0.0.1; using 10.0.0.7 instead (on interface en1)
23/09/14 08:12:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/14 08:12:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
%load_ext autoreload
%autoreload 2

In [4]:
# Define the input and output directories
MKPT_PRICE_PATH        = './data/mktp_prices/'
WEBSCRAPING_INPUT_PATH = './data/scrapped/'
MATCHING_OUPUT_PATH    = './data/matched_parquets/'

## Load the input files
### ABI Marketplace SKU-Price catalog

In [5]:
mktp_prices_path = Path(f'{MKPT_PRICE_PATH}/PreciosMarketplaceHistorico.xlsb')

df_mkpt_prices = pd.read_excel(
    mktp_prices_path, 
    sheet_name = 'PRECIOS', engine = 'pyxlsb', skiprows = 3)

print(df_mkpt_prices.columns)

# Sanity check
sku_colname   = 'SKU NUEVO'
sku_desc_name = 'DESCRIPCIÓN'
price_colname = f'PTR_W{15}'
brand_colname = 'Marca'

# Format the SKU material to string
df_mkpt_prices[sku_colname] = df_mkpt_prices[sku_colname].astype(str)
# Verify that there are no duplicated SKUs
assert not df_mkpt_prices[sku_colname].duplicated().any()

# Keep only useful columns and rename them
df_mkpt_prices = df_mkpt_prices[[sku_colname, sku_desc_name, 
                                 price_colname, brand_colname]]
# Include the `brand` info into the `sku_name`
df_mkpt_prices['mkp_sku_id'] = df_mkpt_prices[sku_colname]\
                                .str.cat(df_mkpt_prices[brand_colname], sep=" ")
df_mkpt_prices = df_mkpt_prices[[sku_colname, sku_desc_name, price_colname]]
df_mkpt_prices.columns = ['mkp_sku_id', 'mkp_sku_name', 'mkp_price']

# CONVERT TO SPARK
df_mkpt_prices = spark.createDataFrame(df_mkpt_prices)

# Round price values
df_mkpt_prices = df_mkpt_prices.withColumn('mkp_price',
                        F.round(df_mkpt_prices['mkp_price'], 2))

@F.udf(StringType())
def normalize_text_udf(text):
    # Replace 'normalize_text'
    return normalize_text(text, encode='ascii')
# normalize_text_spark_udf = udf(normalize_text_udf, StringType())

# Cleaning phase
# Lowercase string fields
df_mkpt_prices = df_mkpt_prices\
                    .withColumn('mkp_sku_name_clean', 
                                normalize_text_udf(df_mkpt_prices['mkp_sku_name']))

df_mkpt_prices.show()

Index(['Categoría', 'SKU NUEVO', 'DESCRIPCIÓN', 'EAN/UPC', 'IMPERDIBLES',
       'IVA', 'IEPS', 'Marca', 'DESCRIPCION_2', 'CUPO', 'PZA', 'PTR',
       'PTR_UNITARIO', 'PTR_W1 ', 'PTR_W2', 'PTR_W3 ', 'PTR_W4', 'PTR_W5',
       'PTR_W6', 'PTR_W7', 'PTR_W8', 'PTR_W9', 'PTR_W10', 'PTR_W11', 'PTR_W12',
       'PTR_W13', 'PTR_W14', 'PTR_W15'],
      dtype='object')


  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
[Stage 0:>                                                          (0 + 1) / 1]

+----------+--------------------+---------+--------------------+
|mkp_sku_id|        mkp_sku_name|mkp_price|  mkp_sku_name_clean|
+----------+--------------------+---------+--------------------+
|   3000200|NESTLE PUREZA VIT...|    65.84|npvnpvnpvnpv4lt 4pz|
|   3006942|SANTA MARIA 355ML...|     76.0|santamariasantama...|
|   3000085|NESTLE PUREZA VIT...|     82.0|npvnpvnpvnpv1lt ...|
|   3001443|SANTA MARIA 1L -1...|    100.0|santamariasantama...|
|   3000080|SANTA MARIA 1.5L ...|    100.0|santamariasantama...|
|   3000083|NESTLE PUREZA VIT...|    100.0|npvnpvnpvnpv1.5l...|
|   3004895|SANTA MARIA GOURM...|     94.0|santamariasantama...|
|   3002270|STA. MARIA 4L - 6...|    119.0|santamariasantama...|
|   3006945|SANTA MARIA 500ML...|    117.0|santamariasantama...|
|   3006953|NESTLE PUERZA VIT...|    123.0|nestle puerza vit...|
|   3000119|NESTLE PUREZA VIT...|   225.01|npvnpvnpvnpvmine...|
|   3009129|SAN PELLEGRINO 25...|   414.31|san pellegrino 20...|
|   3009148|PERRIER VIDRI

                                                                                

### Load the web scraping files

In [6]:
# Path to the web scraping directory
scraped_path = Path(WEBSCRAPING_INPUT_PATH)
# Date to match
date_sf = '02-05-23'

# TODO: Create a function to list all available dates
# select the max date and check whether that date exist in the sku_matched table
# iterate until evaluate all available dates 

# TODO: No está claro el formato del archivo que va a enviar
# el equipo de Elkin y de Michael

In [7]:
# Date to match (corresponds to a single directory per date)
scraping_date = datetime.strptime(date_sf, "%d-%m-%y").strftime("%d%m%Y")
date_path = scraped_path /scraping_date 

# List of .csv files inside the date directory
scraping_files = [ 
        date_path / f for f in
            (filter(lambda f: f.endswith('.csv'), 
                os.listdir(date_path)))
            ]

scraping_files

[PosixPath('data/scrapped/02052023/02-05-23-BodegaAurrera_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-Soriana_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-Walmart_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-Scorpion_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-CavaDelDuero_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-laCastellana_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-Surtitienda_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-MayoreoTotal_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-BodegaAlianza_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-Autoservicio-Laplaya_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-Alcca_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-LaEuropea_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-IbarraMayoreo_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-Frubana_MX.csv'),
 PosixPath('data/scrapped/02052023/02-05-23-Corpovino_MX.csv'),
 PosixPath('data/

In [8]:
# Use the `CSVFileReader` to load and clean the csv files
df_scrap = pd.concat([CSVFileReader(f).validate_file() for f in scraping_files])\
                .reset_index(drop = True)
# Rename the categories assuming the following patter
# `sku_name`, `competitor_name`, `url`, `price`
df_scrap.columns = ['competitor_sku_name', 'competitor_name', 
                    'competitor_url', 'competitor_price']

# Drop duplicates
df_scrap = df_scrap.drop_duplicates(subset = 
                ['competitor_sku_name', 'competitor_name'])
                
# Convert competitors_column to categorical
df_scrap['competitor_name'] = df_scrap['competitor_name'].astype('category')

# Reset index
# Add an index for future analaysis
df_scrap = df_scrap.reset_index(names = 'scrap_id')

# CONVER TO SPARK
df_scrap = spark.createDataFrame(df_scrap)

df_scrap.groupby('competitor_name').count().show()

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
[Stage 1:>                                                          (0 + 8) / 8]

+--------------------+-----+
|     competitor_name|count|
+--------------------+-----+
|             Walmart| 1773|
|             Soriana|  266|
|      Bodega Aurrera| 2097|
|             Scorpio| 4916|
|       La Castellana| 1028|
|      Cava del Duero|  900|
|       Mayoreo Total| 6239|
|         Surtitienda| 2684|
|             Alianza|  608|
|               Alcca|  596|
|Autoservicio la P...|  821|
|      Ibarra Mayoreo| 2285|
|          La Europea| 1928|
|          Abarrotero|  279|
|           Consuvino| 1369|
|             Frubana|  110|
|           Corpovino|  179|
+--------------------+-----+



                                                                                

### Perform the text-cleaning phase of web scraping data

In [9]:
# Cleaning phase
df_scrap = df_scrap.withColumn('comp_sku_name_clean', 
            normalize_text_udf(df_scrap['competitor_sku_name']))

# df_scrap.orderBy(F.rand()).limit(10).show()
df_scrap.show()

+--------+--------------------+---------------+--------------------+----------------+--------------------+
|scrap_id| competitor_sku_name|competitor_name|      competitor_url|competitor_price| comp_sku_name_clean|
+--------+--------------------+---------------+--------------------+----------------+--------------------+
|       0|Leche evaporada A...| Bodega Aurrera|https://despensa....|            21.5|leche evaporada a...|
|       1|Leche en polvo Ne...| Bodega Aurrera|https://despensa....|            15.5|leche polvo nestl...|
|       2|Leche en polvo Al...| Bodega Aurrera|https://despensa....|            78.0|leche polvo alpur...|
|       3|Producto lacteo N...| Bodega Aurrera|https://despensa....|            16.0|lch numalac polvo...|
|       4|Producto lacteo N...| Bodega Aurrera|https://despensa....|            60.5|lch nestle nutri ...|
|       5|Leche en polvo Al...| Bodega Aurrera|https://despensa....|            81.5|leche polvo alpur...|
|       6|Leche evaporada A...| Bodeg

## SKU matching phase

### Number of evaluations to perform
Calculate the number of evaluations to perform:

$$N_{evals} = m * n$$
where $m$= *number of Marketplace skus* and $n$ = *number of scraped products*

In [10]:
# Total de evaluaciones
_n_rows_mkp = df_mkpt_prices.count()
_n_rows_wsp = df_scrap.count()
_n_evals =_n_rows_mkp * _n_rows_wsp
print(f'[{_n_rows_mkp:,} mkp skus] * [{_n_rows_wsp:,} web scraping skus] ')
print(f'= {_n_evals:,} evaluations')

                                                                                

[247 mkp skus] * [28,078 web scraping skus] 
= 6,935,266 evaluations


                                                                                

Define a function to perform the matching phase and keep the best match between a given pair of competitor and marketplace skus.

In [11]:
# Define the UDF funton of 
@F.udf(FloatType())
def get_confidence_udf(mktp_sku_clean_name, comp_sku_clean_name):
    # Replace 'get_confidence'
    conf = get_confidence(mktp_sku_clean_name, comp_sku_clean_name)
    return conf

### Perform the matching phase

In [12]:
# Perform a cross join to obtain all pair combinations of mkp and comp products
# Broadcast the smallest table to enhance performance, Keep only relevant columns
# to avoid memory overload
cj_df = F.broadcast(df_mkpt_prices.select(['mkp_sku_id', 'mkp_sku_name_clean']))\
        .crossJoin(df_scrap.select(["scrap_id", "comp_sku_name_clean"]))

# Perform the pair-wise evaluation using the 'get_confidence'
cj_df = cj_df.withColumn("confidence", 
                 get_confidence_udf(F.col('mkp_sku_name_clean'), 
                                    F.col('comp_sku_name_clean')))

# Now get the best mktp match for each competitor sku
w = Window.partitionBy('scrap_id')
match_df = cj_df\
        .withColumn('best_conf', F.max('confidence').over(w))\
        .where(F.col('confidence') == F.col('best_conf'))\
        .drop('best_conf')\
        .dropDuplicates(['scrap_id']) # Keep only one occurrence 

DataFrame[mkp_sku_id: string, mkp_sku_name_clean: string, scrap_id: bigint, comp_sku_name_clean: string, confidence: float]

### Format the output and save to `.parquet`

1. Keep only relevant matches
2. Include `scraping_date`
3. Keep only relevant columns
4. Compute `price_diff` and `price_index`

In [18]:
col_to_keep = ['scraping_date', 'mkp_sku_id', 'scrap_id', 'mkp_sku_name', 'competitor_name', 
               'competitor_sku_name', 'confidence', 'mkp_price', 'competitor_price', 
               'price_diff', 'price_index', 'mkp_sku_name_clean', 'comp_sku_name_clean', 
               'competitor_url']

# Add the date column
formatted_date = pd.to_datetime(scraping_date, format = '%d%m%Y').date()

# Confidence interval
conf_thr = 0.35

match_df = match_df\
                .select(['scrap_id', 'mkp_sku_id', 'confidence'])\
                .join(df_scrap, ['scrap_id'], "left")\
                .join(df_mkpt_prices, ['mkp_sku_id'], "left")\
                .filter(F.col('confidence') >= conf_thr)\
                .withColumn('scraping_date', F.lit(formatted_date))\
                .withColumn('price_diff',  F.round(F.col('mkp_price') - F.col('competitor_price'), 2))\
                .withColumn('price_index', F.round(F.col('price_diff') / F.col('mkp_price'), 2))\
                .select(col_to_keep)\
                .fillna('', subset = ['competitor_url'])\
                .orderBy(F.col('mkp_sku_id'), F.col('confidence').desc())

# Save to cache
match_df.cache()
output_path = f'{MATCHING_OUPUT_PATH}/{scraping_date}_prod_matched_TEST.parquet' 
match_df.write.parquet(output_path)
match_df.count()
        

23/09/14 10:21:35 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

1878

In [16]:
match_df.show()

+-------------+----------+--------+--------------------+---------------+--------------------+----------+---------+----------------+----------+-----------+--------------------+--------------------+--------------------+
|scraping_date|mkp_sku_id|scrap_id|        mkp_sku_name|competitor_name| competitor_sku_name|confidence|mkp_price|competitor_price|price_diff|price_index|  mkp_sku_name_clean| comp_sku_name_clean|      competitor_url|
+-------------+----------+--------+--------------------+---------------+--------------------+----------+---------+----------------+----------+-----------+--------------------+--------------------+--------------------+
|   2023-05-02|   3000083|   24675|NESTLE PUREZA VIT...| Ibarra Mayoreo|Agua Natural Nest...|       1.0|    100.0|           120.7|     -20.7|      -0.21|npvnpvnpvnpv1.5l...|npvnpvnpvnpv1.5l...|https://ibarramay...|
|   2023-05-02|   3000083|    8703|NESTLE PUREZA VIT...|        Scorpio|Agua Natural Nest...|       1.0|    100.0|           123

In [17]:
match_df.count()

                                                                                

1878