In [1]:
import polars as pl
import re
import numpy as np
import json
import pickle
import gc
import re

from collections import Counter
from typing import List, Dict, Callable
from pathlib import Path

from sklearn.feature_extraction.text import TfidfVectorizer

import scipy.sparse as sp
from tqdm.notebook import tqdm

In [2]:
# Directory holding the original parquet parts.
MAIN_DIR = Path('../data/avito-orig-data/')

# Four train parts followed by two test parts, numbered from 0001.
FILE_PATHES = [
    MAIN_DIR / f'{split}_part_{part:04d}.snappy.parquet'
    for split, n_parts in (('train', 4), ('test', 2))
    for part in range(1, n_parts + 1)
]

# Directory holding the pickled brand and colour vocabularies.
BnC_DIR = Path('../data/brands-and-colors/')

In [3]:
# Maps the raw `base_*` / `cand_*` column names to `<field>_1` / `<field>_2`.
# Each (raw suffix, new stem) pair expands to a base entry followed by a cand entry.
RENAME_MAPPING = {
    f'{side}_{raw_suffix}': f'{new_stem}_{index}'
    for raw_suffix, new_stem in (
        ('item_id', 'variantid'),
        ('title', 'name'),
        ('description', 'description'),
        ('category_name', 'category_level_1'),
        ('subcategory_name', 'category_level_2'),
        ('param1', 'category_level_3'),
        ('param2', 'category_level_4'),
        ('json_params', 'characteristic_attributes_mapping'),
        ('count_images', 'n_images'),
        ('price', 'price'),
    )
    for index, side in enumerate(('base', 'cand'), start=1)
}

# Column groups reused by the cells that write the pair tables.
IDS = ['variantid_1', 'variantid_2']
FOR_SPLIT = ['group_id', 'action_date']
IMAGE_PATHS = ['base_title_image', 'cand_title_image']
BINARY_FEATURES = ['is_same_location', 'is_same_region']
TARGET = 'is_double'

In [4]:
# Accumulate every parquet part listed in FILE_PATHES into one frame.
full_df = pl.DataFrame()

for file in tqdm(FILE_PATHES):
    chunk = pl.read_parquet(file)
    # Parts without an `is_double` column get -1 placeholders for the label
    # and the two split columns so they can be stacked with the other parts.
    if 'is_double' not in chunk.columns:
        chunk = chunk.with_columns(
            is_double=pl.lit(-1),
            action_date=pl.lit(-1),
            group_id=pl.lit(-1)
        )
        # Align column order with what has been accumulated so far.
        # NOTE(review): this relies on a part that has `is_double` being read
        # first; with an empty `full_df` the selection would be empty.
        chunk = chunk.select(full_df.columns)
    full_df = pl.concat([full_df, chunk], how='diagonal_relaxed')
    print(f'{full_df.shape=}')

# Drop the last chunk reference before renaming to the `_1` / `_2` scheme.
del chunk 
gc.collect()
full_df = full_df.rename(mapping=RENAME_MAPPING)

  0%|          | 0/6 [00:00<?, ?it/s]

full_df.shape=(500000, 28)
full_df.shape=(1000000, 28)
full_df.shape=(1500000, 28)
full_df.shape=(1879555, 28)
full_df.shape=(2129555, 28)
full_df.shape=(2379555, 28)


In [5]:
# NOTE(review): pickle.load executes arbitrary code; only load trusted files.
with open(BnC_DIR / Path('brands.pkl'), 'rb') as f: # TODO: later filter the brands with an LLM
    brands = pickle.load(f)

with open(BnC_DIR / Path('colors.pkl'), 'rb') as f:
    colors = pickle.load(f)

In [None]:
def reduce_memory_usage_pl(df: pl.DataFrame) -> pl.DataFrame:
    """Downcast columns to narrower dtypes to shrink the frame in memory.

    Signed integer columns are cast to the narrowest signed integer type whose
    range contains the column's min and max (bounds inclusive). Float columns
    whose min and max lie strictly inside the float32 range are cast to
    Float32. String columns are cast to Categorical. Columns of any other
    dtype, and numeric columns without a min/max (e.g. all null), are left
    unchanged.

    Args:
        df: Frame to shrink.

    Returns:
        A frame with the same columns and possibly narrower dtypes. The
        estimated size before and after is printed.
    """
    print(f"before {round(df.estimated_size('mb'), 2)=} mb")

    # Narrowest first, so the first candidate that fits is the one chosen.
    int_candidates = [
        (pl.Int8, np.iinfo(np.int8)),
        (pl.Int16, np.iinfo(np.int16)),
        (pl.Int32, np.iinfo(np.int32)),
        (pl.Int64, np.iinfo(np.int64)),
    ]
    numeric_int_types = [dtype for dtype, _ in int_candidates]
    numeric_float_types = [pl.Float32, pl.Float64]
    float32_info = np.finfo(np.float32)

    for col in df.columns:
        col_type = df[col].dtype
        target = None

        if col_type in numeric_int_types or col_type in numeric_float_types:
            c_min = df[col].min()
            c_max = df[col].max()
            # min()/max() yield None when there is nothing to compare
            # (e.g. an all-null column); comparing None with a number raises.
            if c_min is None or c_max is None:
                continue
            if col_type in numeric_int_types:
                target = next(
                    (
                        dtype
                        for dtype, info in int_candidates
                        if info.min <= c_min and c_max <= info.max
                    ),
                    None,
                )
            elif float32_info.min < c_min and c_max < float32_info.max:
                # Strict bounds keep +/-inf (and NaN, which fails both
                # comparisons) in the original float type.
                target = pl.Float32
        elif col_type == pl.Utf8:
            target = pl.Categorical

        # Skip no-op casts such as Int64 -> Int64.
        if target is not None and target != col_type:
            df = df.with_columns(df[col].cast(target))
            gc.collect()

    print(f"after {round(df.estimated_size('mb'), 2)=} mb")
    return df

In [7]:
# Downcast the combined pair table in place of the original binding.
full_df = reduce_memory_usage_pl(full_df)

before round(df.estimated_size('mb'), 2)=8758.52 mb
after round(df.estimated_size('mb'), 2)=4651.31 mb


In [8]:
# Split the pair table into its two sides: keep the `_1` columns and strip
# the suffix so both sides share the same column names.
data_1 = full_df.select(
    [col for col in full_df.columns if col.endswith('_1')]
).rename({col: col[:-2] for col in full_df.columns if col.endswith('_1')})

# Same for the `_2` side.
data_2 = full_df.select(
    [col for col in full_df.columns if col.endswith('_2')]
).rename({col: col[:-2] for col in full_df.columns if col.endswith('_2')})

# Stack both sides and drop exact duplicate rows to get one row per distinct item record.
data = pl.concat([data_1, data_2], how='vertical').unique()

del data_1, data_2
gc.collect()

print(f'{data.shape=}')



data.shape=(3263574, 10)


In [9]:
def remove_html_tags_and_emoji(text: str) -> str:
    """Strip ``<...>`` tags, turn newlines into spaces and drop emoji.

    Args:
        text: Raw text, or ``None``.

    Returns:
        The cleaned text, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None

    # Non-greedy, so each tag is removed on its own rather than everything
    # between the first '<' and the last '>'.
    without_tags = re.sub('<.*?>', '', text)
    single_line = without_tags.replace('\n', ' ')

    emoji_ranges = (
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"  # regional indicators (flags)
    )
    return re.sub("[" + emoji_ranges + "]+", r'', single_line, flags=re.UNICODE)

In [10]:
def filter_en_text(text: str) -> str:
    """Keep only the standalone Latin-letter words of ``text``, lowercased.

    A word is kept when it consists solely of ``a-z`` letters between word
    boundaries, so tokens that mix letters with digits are dropped.

    Args:
        text: Input text, or ``None``.

    Returns:
        The kept words joined by single spaces, or ``None`` when ``text`` is
        ``None`` (the same convention as the other text cleaners here).
    """
    if text is None:
        return None
    pattern = r"\b[a-zA-Z]+\b"
    return ' '.join(re.findall(pattern, text.lower()))

In [11]:
def extract_mixed_words(s: str) -> str:
    """Extract the words that contain both letters and digits.

    A word is a run of Latin/Cyrillic letters and digits between word
    boundaries; the two lookaheads require at least one digit and at least
    one letter in it.

    Args:
        s: Input text, or ``None``.

    Returns:
        The matching words joined by single spaces, or ``None`` when ``s`` is
        ``None`` (the same convention as the other text cleaners here).
    """
    if s is None:
        return None
    pattern = r'\b(?=[A-Za-zА-Яа-яЁё]*\d)(?=\d*[A-Za-zА-Яа-яЁё])[A-Za-zА-Яа-яЁё\d]+\b'
    return ' '.join(re.findall(pattern, s))

In [12]:
def normalize(text: str) -> str:
    """Lowercase ``text`` and join its alphanumeric runs with underscores.

    Every non-alphanumeric character acts as a separator.

    Args:
        text: Input text, or ``None``.

    Returns:
        The underscore-joined tokens, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None
    spaced = ''.join(ch if ch.isalnum() else ' ' for ch in text.lower())
    return '_'.join(spaced.split())

In [None]:
# TODO: rewrite using kenlm
def fix_key_layout(text: str, thold: float = 0.6) -> str:
    """Replace Latin look-alike letters inside mostly-Cyrillic words.

    The text is split on whitespace. In every word whose share of Latin
    letters is below ``thold``, Latin characters that look like Cyrillic ones
    are swapped for the Cyrillic letter. A fixed list of frequent mixed-script
    fragments is then patched on the re-joined text.

    Args:
        text: Input text, or ``None``.
        thold: Upper bound (exclusive) on the Latin-letter share for a word to
            be converted.

    Returns:
        The corrected text with words separated by single spaces, or ``None``
        when ``text`` is ``None``.
    """
    if text is None:
        return None

    # Latin letter -> visually identical Cyrillic letter.
    en_ru_mapping = {
        'e': 'е',
        'y': 'у',
        'o': 'о',
        'p': 'р',
        'a': 'а',
        'k': 'к',
        'x': 'х',
        'c': 'с',
        'E': 'Е',
        'T': 'Т',
        'O': 'О',
        'P': 'Р',
        'A': 'А',
        'H': 'Н',
        'K': 'К',
        'X': 'Х',
        'C': 'С',
        'B': 'В',
        'M': 'М',
    }

    def is_english_letter(char: str) -> bool:
        return 'a' <= char <= 'z' or 'A' <= char <= 'Z'

    def should_apply_mapping(word: str, thold: float = 0.6) -> bool:
        """Tell whether the Latin-letter share of ``word`` is below ``thold``."""
        if not word:
            return False
        total_letters = sum(c.isalpha() for c in word)
        if total_letters == 0:
            return False
        english_letters = sum(is_english_letter(c) for c in word)
        return (english_letters / total_letters) < thold

    words = text.split()
    fixed_words = []
    for word in words:
        # Forward the caller's threshold; it was previously ignored and the
        # inner default was always used.
        if should_apply_mapping(word, thold):
            fixed_word = ''.join(en_ru_mapping.get(char, char) for char in word)
        else:
            fixed_word = word
        fixed_words.append(fixed_word)

    res = ' '.join(fixed_words)
    # Patch common fragments that still mix scripts after the per-word pass
    # (the first argument of each pair contains Latin look-alikes).
    res = res.replace('нa', 'на').replace(' c ', ' с ').replace(
        ' cо ', ' со ').replace(' сo ', ' со ').replace(
        ' co ', ' со ').replace('вo', 'во').replace(
        ' кo ', ' ко ').replace(' o ', ' о ').replace(
        'oб', 'об').replace('oт', 'от').replace(
        'зa', 'за').replace('пo', 'по').replace(
        'дo', 'до').replace(' y ', ' у ').replace(
        'cм', 'см').replace('гp', 'гр').replace(
        'пpo', 'про').replace('дa', 'да').replace(
        'нe', 'не').replace('тo', 'то').replace(
        'жe', 'же').replace('pyб', 'руб').replace(
        'eд', 'ед').replace('oна', 'она').replace(
        'онa', 'она').replace('oнa', 'она').replace(
        'oн', 'он').replace('eго', 'его').replace(
        'егo', 'его').replace('eгo', 'его').replace(
        'ниx', 'них').replace('иx', 'их').replace(
        'вcе', 'все').replace('всe', 'все').replace('вce', 'все')

    return res

In [14]:
def normalize_names(df: pl.DataFrame) -> pl.DataFrame:
    """Clean the ``name`` column and derive ``name_norm`` / ``name_tokens``.

    Steps, in order: strip tags/emoji, fix mixed keyboard layout, derive the
    underscore-joined ``name_norm`` and the stripped lowercase ``name_tokens``,
    then overwrite ``name`` with the whitespace-collapsed ``name_tokens``.
    """
    name = pl.col('name')
    return (
        df
        .with_columns(name=name.map_elements(remove_html_tags_and_emoji, return_dtype=pl.Utf8))
        .with_columns(name=name.map_elements(fix_key_layout, return_dtype=pl.Utf8))
        .with_columns(
            name_norm=name.map_elements(normalize, return_dtype=pl.Utf8),
            name_tokens=name.str.strip_chars().str.to_lowercase(),
        )
        .with_columns(
            name=pl.col('name_tokens').map_elements(lambda x: ' '.join(x.split()), return_dtype=pl.Utf8),
        )
    )

def normalize_en_names(df: pl.DataFrame) -> pl.DataFrame:
    """Derive the Latin-only name columns from ``name``.

    ``name_en`` is built with ``filter_en_text`` and cleaned of tags/emoji;
    ``name_en_norm`` and ``name_en_tokens`` are derived from it, and
    ``name_en`` is finally overwritten with the whitespace-collapsed tokens.
    """
    name_en = pl.col('name_en')
    return (
        df
        .with_columns(name_en=pl.col('name').map_elements(filter_en_text, return_dtype=pl.Utf8))
        .with_columns(name_en=name_en.map_elements(remove_html_tags_and_emoji, return_dtype=pl.Utf8))
        .with_columns(
            name_en_norm=name_en.map_elements(normalize, return_dtype=pl.Utf8),
            name_en_tokens=name_en.str.strip_chars().str.to_lowercase(),
        )
        .with_columns(
            name_en=pl.col('name_en_tokens').map_elements(lambda x: ' '.join(x.split()), return_dtype=pl.Utf8),
        )
    )

def normalize_mixed_names(df: pl.DataFrame) -> pl.DataFrame:
    """Derive the letter+digit ("mixed") name columns from ``name``.

    ``name_mix`` is built with ``extract_mixed_words``; ``name_mix_norm`` and
    ``name_mix_tokens`` are derived from it, and ``name_mix`` is finally
    overwritten with the whitespace-collapsed tokens.
    """
    name_mix = pl.col('name_mix')
    return (
        df
        .with_columns(name_mix=pl.col('name').map_elements(extract_mixed_words, return_dtype=pl.Utf8))
        .with_columns(
            name_mix_norm=name_mix.map_elements(normalize, return_dtype=pl.Utf8),
            name_mix_tokens=name_mix.str.strip_chars().str.to_lowercase(),
        )
        .with_columns(
            name_mix=pl.col('name_mix_tokens').map_elements(lambda x: ' '.join(x.split()), return_dtype=pl.Utf8),
        )
    )

In [15]:
def normalize_desc(df: pl.DataFrame) -> pl.DataFrame:
    """Clean ``description`` and derive ``description_norm`` / ``description_tokens``.

    Steps, in order: strip tags/emoji, fix mixed keyboard layout, derive the
    underscore-joined ``description_norm`` and the stripped lowercase
    ``description_tokens``, then overwrite ``description`` with the
    whitespace-collapsed tokens.
    """
    description = pl.col('description')
    return (
        df
        .with_columns(description=description.map_elements(remove_html_tags_and_emoji, return_dtype=pl.Utf8))
        .with_columns(description=description.map_elements(fix_key_layout, return_dtype=pl.Utf8))
        .with_columns(
            description_norm=description.map_elements(normalize, return_dtype=pl.Utf8),
            description_tokens=description.str.strip_chars().str.to_lowercase(),
        )
        .with_columns(
            description=pl.col('description_tokens').map_elements(lambda x: ' '.join(x.split()), return_dtype=pl.Utf8),
        )
    )

def normalize_en_desc(df: pl.DataFrame) -> pl.DataFrame:
    """Derive the Latin-only description columns from ``description``.

    ``description_en`` is built with ``filter_en_text`` and cleaned of
    tags/emoji; ``description_en_norm`` and ``description_en_tokens`` are
    derived from it, and ``description_en`` is finally overwritten with the
    whitespace-collapsed tokens.
    """
    description_en = pl.col('description_en')
    return (
        df
        .with_columns(description_en=pl.col('description').map_elements(filter_en_text, return_dtype=pl.Utf8))
        .with_columns(description_en=description_en.map_elements(remove_html_tags_and_emoji, return_dtype=pl.Utf8))
        .with_columns(
            description_en_norm=description_en.map_elements(normalize, return_dtype=pl.Utf8),
            description_en_tokens=description_en.str.strip_chars().str.to_lowercase(),
        )
        .with_columns(
            description_en=pl.col('description_en_tokens').map_elements(lambda x: ' '.join(x.split()), return_dtype=pl.Utf8),
        )
    )

def normalize_mixed_desc(df: pl.DataFrame) -> pl.DataFrame:
    """Derive the letter+digit ("mixed") description columns.

    ``description_mix`` is built with ``extract_mixed_words``;
    ``description_mix_norm`` and ``description_mix_tokens`` are derived from
    it, and ``description_mix`` is finally overwritten with the
    whitespace-collapsed tokens.
    """
    description_mix = pl.col('description_mix')
    return (
        df
        .with_columns(description_mix=pl.col('description').map_elements(extract_mixed_words, return_dtype=pl.Utf8))
        .with_columns(
            description_mix_norm=description_mix.map_elements(normalize, return_dtype=pl.Utf8),
            description_mix_tokens=description_mix.str.strip_chars().str.to_lowercase(),
        )
        .with_columns(
            description_mix=pl.col('description_mix_tokens').map_elements(lambda x: ' '.join(x.split()), return_dtype=pl.Utf8),
        )
    )

In [16]:
def normalize_characteristic_attributes(df: pl.DataFrame) -> pl.DataFrame:
    """Flatten the JSON in ``characteristic_attributes_mapping`` to one level.

    Each cell is parsed as JSON, flattened into a ``{key: str}`` dict and
    serialised back to a JSON string in the same column.

    Flattening rules:
      * nested dicts are merged into the result under their own keys;
      * dicts inside a list are flattened and merged the same way;
      * a list with at least one non-dict element is stored under its key as
        the comma-joined ``str()`` of all its elements;
      * any other value is stored as ``str(value)``.
    """
    def flatten_json(d: dict) -> dict:
        items = {}
        for k, v in d.items():
            if isinstance(v, list):
                joined = None
                for i in v:
                    if isinstance(i, dict):
                        items.update(flatten_json(i))
                    else:
                        # Build the joined string once per list rather than
                        # once per scalar element; re-assigning it keeps the
                        # original overwrite order when a flattened dict
                        # reuses the key `k`.
                        if joined is None:
                            joined = ','.join(str(x) for x in v)
                        items[k] = joined
            elif isinstance(v, dict):
                items.update(flatten_json(v))
            else:
                items[k] = str(v)
        return items

    return df.with_columns(
        pl.col('characteristic_attributes_mapping')
        .map_elements(
            lambda x: json.dumps(flatten_json(json.loads(x))),
            return_dtype=pl.String
        )
        .alias('characteristic_attributes_mapping')
    )

In [17]:
def get_kv_from_attrs(df: pl.DataFrame) -> pl.DataFrame:
    """Add ``attr_keys`` and ``attr_vals`` list columns parsed from the attribute JSON.

    For a value that is a non-empty list whose first element is a dict, only
    that first dict contributes its keys and values; other lists contribute
    every element as a value; scalars contribute themselves. Everything is
    converted with ``str()``, and ``None`` values inside the first dict
    become ``'none'``.
    """
    def collect_values(values: list) -> list:
        collected = []
        for item in values:
            if not isinstance(item, list):
                collected.append(str(item))
            elif len(item) > 0 and isinstance(item[0], dict):
                collected += ['none' if inner is None else str(inner) for inner in item[0].values()]
            else:
                collected += [str(inner) for inner in item]
        return collected

    def collect_keys(json_dict: dict) -> list:
        collected = []
        for key, value in json_dict.items():
            collected.append(str(key))
            if isinstance(value, list) and len(value) > 0 and isinstance(value[0], dict):
                collected += [str(inner) for inner in value[0].keys()]
        return collected

    attrs = pl.col('characteristic_attributes_mapping')
    return df.with_columns(
        attr_keys=attrs.map_elements(
            lambda x: collect_keys(json.loads(x)),
            return_dtype=pl.List(pl.Utf8),
        ),
        attr_vals=attrs.map_elements(
            lambda x: collect_values(list(json.loads(x).values())),
            return_dtype=pl.List(pl.Utf8),
        ),
    )

In [18]:
def get_lengths(df: pl.DataFrame) -> pl.DataFrame:
    """Add ``<col>_len`` columns: token counts for text, lengths for attribute lists.

    Tokens are counted as runs of non-whitespace characters. Counting spaces
    and adding one over-counts when a ``*_tokens`` column contains repeated
    spaces (those columns are stripped and lowercased but not
    whitespace-collapsed) and reports 1 for an empty string.
    """
    token_columns = [
        'name_tokens',
        'description_tokens',
        'name_en_tokens',
        'description_en_tokens',
        'name_mix_tokens',
        'description_mix_tokens',
    ]
    return df.with_columns(
        *[
            pl.col(column).str.count_matches(r'\S+').alias(f'{column}_len')
            for column in token_columns
        ],
        pl.col('attr_keys').list.len().alias('attr_keys_len'),
        pl.col('attr_vals').list.len().alias('attr_vals_len'),
    )

In [19]:
def get_digits_elements(df: pl.DataFrame) -> pl.DataFrame:
    """Add ``*_tokens_w_digits`` columns holding tokens with more than two digits.

    Applied to ``name_tokens`` and ``description_tokens``; the kept tokens are
    joined by single spaces.
    """
    def extract_tokens_with_digits(text: str) -> str:
        if not isinstance(text, str):
            return ''
        kept = []
        for token in text.split():
            if len(re.findall(r'\d', token)) > 2:
                kept.append(token)
        return ' '.join(kept)

    return df.with_columns([
        pl.col(source)
        .map_elements(extract_tokens_with_digits, return_dtype=pl.Utf8)
        .alias(f'{source}_w_digits')
        for source in ('name_tokens', 'description_tokens')
    ])

In [20]:
def extract_brands(df: pl.DataFrame) -> pl.DataFrame:
    """Add ``brands_name`` / ``brands_desc``: known brands found in each text.

    A brand is found when a whitespace-separated word of ``name`` /
    ``description`` equals an entry of the module-level ``brands`` collection.
    Matches are de-duplicated and sorted, so the output does not depend on
    string hash randomisation between kernel runs.
    """
    # `brands` is only read here, so no `global` statement is needed.
    brands_set = set(brands)

    def find_brands(s: str) -> list[str]:
        if s is None:
            return []
        return sorted(set(s.split()) & brands_set)

    df = df.with_columns(
        pl.col('name').map_elements(find_brands, return_dtype=pl.List(pl.Utf8)).alias('brands_name')
    )

    return df.with_columns(
        pl.col('description').map_elements(find_brands, return_dtype=pl.List(pl.Utf8)).alias('brands_desc')
    )

In [21]:
def extract_colors(df: pl.DataFrame) -> pl.DataFrame:
    """Add ``colors_name`` / ``colors_desc``: known colours found in each text.

    A colour is found when a whitespace-separated word of ``name`` /
    ``description`` equals an entry of the module-level ``colors`` collection.
    Matches are de-duplicated and sorted, so the output does not depend on
    string hash randomisation between kernel runs.
    """
    # `colors` is only read here, so no `global` statement is needed.
    colors_set = set(colors)

    def find_colors(s: str) -> list[str]:
        if s is None:
            return []
        return sorted(set(s.split()) & colors_set)

    df = df.with_columns(
        pl.col('name').map_elements(find_colors, return_dtype=pl.List(pl.Utf8)).alias('colors_name')
    )

    return df.with_columns(
        pl.col('description').map_elements(find_colors, return_dtype=pl.List(pl.Utf8)).alias('colors_desc')
    )

In [22]:
# Unit spellings recognised after a number by `find_units`.
_UNITS = [
    'мм', 'м', 'см', 'дм', 'км', 'нм', 'мкм', 'дюйм', 'сантиметр', 'миллиметр', 'километр', 'нанометр', 'km',
    'мм²', 'см²', 'дм²', 'м²', 'км²',
    'м³', 'см³', 'мм³',
    'г', 'кг', 'мг', 'т', 'kg', 'g', 'грамм', 'килограмм', 'миллиграмм', 'mg',
    # Plain dots here: re.escape() below does the escaping. A backslash in
    # the literal would itself be escaped and the unit could never match.
    'л', 'мл', 'куб.см', 'куб.м', 'литр', 'миллилитр', 'ml',
    'гб', 'гбит', 'мб', 'мбит', 'кб', 'кбит', 'тб', 'тбит', 'байт', 'бит', 'гигабайт', 'гигабит', 'мегабайт', 'gb', 'kb', 'mb',
    'час', 'ч', 'мин', 'сек', 'с', 'минут', 'секунд', 'min', 'sec', 'h',
    'в', 'квт', 'мвт', 'вт', 'ва', 'ква', 'мва', 'а', 'ма', 'ка', 'мка', 'вольт', 'ампер', 'ампер-час', 'ач', 'мач', 'ватт', 'mah', 'w',
    'ом', 'ohm', 'Ω', 'mΩ',
    'ф', 'мкф', 'нф', 'пф',
    'гц', 'кгц', 'мгц', 'ггц', 'герц', 'килогерц', 'мегагерц', 'гигагерц', 'hz', 'khz', 'mhz', 'ghz',
    'дб', 'децибел', 'db',
    'бар', 'паскаль', 'па', 'гпа', 'кпа', 'атм',
    'градус', '°', 'рад', 'радиан',
    '°c', '°f', 'град', 'цельсий', 'фаренгейт',
    'процент', '%',
    'шт', 'ед', 'штук',
    'дж', 'кдж', 'ккал',
    'моль',
    'н', 'кн',
    'лс', 'об/мин', 'км/ч', 'м/с', 'лошадиных сил', 'm/h', 'km/h', 'kmh', 'км/с', 'мм/с', 'миль/ч',
    'г/см3', 'кг/м3',
    'люмен', 'дптр',
    'пиксель', 'пикс', 'px', 'dpi', 'ppi', 'кадр/с', 'fps',
    'flops', 'gflops', 'tflops', 'mips', 'ipc',
    '$', '€', '£', '¥', 'руб.', 'р.', 'rub', 'usd', 'eur',
    'ядро', 'ядер', 'поток', 'потоков', 'thread', 'core', 'операций/с', 'op/s',
    'дюймов', 'inch',
    'MP', 'Мп', 'mpx', 'мегапиксель', 'мегапикселей',
]

# Longest alternatives first so that e.g. 'кг' is tried before 'к...'-prefixed
# shorter units and 'мм' before 'м'.
_UNITS_PATTERN = '|'.join(re.escape(u) for u in sorted(_UNITS, key=len, reverse=True))
_NUM_PATTERN = r'\d+(?:[\.,]\d+)?'
# `\b` alone fails after units that end in a non-word character ('%', '$',
# 'руб.') when whitespace or the end of the text follows; the extra
# `(?!\w)` alternative accepts those while keeping every former match.
_UNIT_REGEX = re.compile(
    rf'({_NUM_PATTERN})(\s)?({_UNITS_PATTERN})(?:\b|(?!\w))',
    re.IGNORECASE,
)
# 2D / 3D sizes such as 10x20 or 10*20*30 (Latin x, Cyrillic х or *).
_SIZE_REGEX = re.compile(r'(\d+)\s*[xх*]\s*(\d+)(?:\s*[xх*]\s*(\d+))?')


def find_units(s: str) -> list[str]:
    """Find "<number> <unit>" mentions and WxH / WxHxD sizes in ``s``.

    Args:
        s: Input text, or ``None``.

    Returns:
        A sorted list of unique strings: ``'<number> <unit>'`` for every
        number followed (optionally after one whitespace character) by a known
        unit, with the unit spelled as it appears in the text, and
        ``'AxB'`` / ``'AxBxC'`` for sizes. Empty when ``s`` is ``None``.
    """
    if s is None:
        return []

    unit_res = [f'{number} {unit}' for number, _, unit in _UNIT_REGEX.findall(s)]

    dim_res = []
    for first, second, third in _SIZE_REGEX.findall(s):
        if third: # 3d
            dim_res.append(f'{first}x{second}x{third}')
        else: # 2d
            dim_res.append(f'{first}x{second}')

    # Sorted so the result is reproducible across runs.
    return sorted(set(unit_res + dim_res))

def extract_units(df: pl.DataFrame) -> pl.DataFrame:
    """Add ``units_name`` / ``units_desc`` with the ``find_units`` result per text column."""
    def units_of(column: str) -> pl.Expr:
        return pl.col(column).map_elements(find_units, return_dtype=pl.List(pl.Utf8))

    return (
        df
        .with_columns(units_name=units_of('name'))
        .with_columns(units_desc=units_of('description'))
    )

In [23]:
def fit_tfidf_vectorizer(data: pl.DataFrame, columns: List[str]) -> Dict[str, TfidfVectorizer]:
    """Fit one ``TfidfVectorizer`` per column and pickle each to disk.

    Args:
        data: Frame containing the text columns.
        columns: Names of the columns to fit on.

    Returns:
        Mapping of column name to its fitted vectorizer. Each vectorizer is
        also written to ``<column>_tfidf_vectorizer.pkl`` in the current
        working directory.
    """
    tfidf_vectorizers = {}

    for col in columns:
        try:
            combined_texts = data.get_column(col).cast(pl.Utf8).to_list()
        except Exception:
            # Fallback when the column cannot be cast to strings: explode it
            # to its elements. `Exception` rather than a bare `except` so that
            # KeyboardInterrupt / SystemExit still propagate.
            combined_texts = data.get_column(col).explode().to_list()

        combined_texts = [i for i in combined_texts if i is not None] # workaround: drop nulls before fitting

        vectorizer = TfidfVectorizer()
        vectorizer.fit(combined_texts)
        tfidf_vectorizers[col] = vectorizer

        with open(f'{col}_tfidf_vectorizer.pkl', 'wb') as f:
            pickle.dump(vectorizer, f)

    return tfidf_vectorizers

In [24]:
def tfidf_emb_gen(
    data: pl.DataFrame,
    tfidf_vectorizers: Dict[str, TfidfVectorizer],
    columns: List[str],
    batch_size: int = 5000
) -> pl.DataFrame:
    """Add a ``<column>_tfidf`` column with one sparse TF-IDF row per record.

    Each column is transformed in batches of ``batch_size`` rows with its
    fitted vectorizer; the batch matrices are stacked and split back into
    per-row sparse matrices.
    """
    n_rows = len(data)

    for col in columns:
        batches = []
        progress = tqdm(
            range(0, n_rows, batch_size),
            desc=f'   Processing {col}',
            bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}',
            colour='yellow'
        )
        for offset in progress:
            # The last batch may be shorter than `batch_size`.
            length = min(offset + batch_size, n_rows) - offset
            texts = data.slice(offset, length).get_column(col).cast(pl.Utf8).to_list()
            batches.append(tfidf_vectorizers[col].transform(texts))

        stacked = sp.vstack(batches)
        data = data.with_columns(pl.Series(f'{col}_tfidf', list(stacked)))

    return data

In [25]:
def fit_tfidf_step(data: pl.DataFrame) -> pl.DataFrame:
    """Pipeline step: fit vectorizers on ``tfidf_columns`` and store them globally.

    The frame is returned unchanged so the step can sit in the pipeline.
    """
    global tfidf_vectorizers
    fitted = fit_tfidf_vectorizer(data, tfidf_columns)
    tfidf_vectorizers = fitted
    return data

def apply_tfidf_step(data: pl.DataFrame) -> pl.DataFrame:
    """Pipeline step: add TF-IDF columns using the globally stored vectorizers."""
    return tfidf_emb_gen(
        data=data,
        tfidf_vectorizers=tfidf_vectorizers,
        columns=tfidf_columns,
    )

In [26]:
def find_anti_words(row):
    """Return the Latin words that occur in exactly one of the two names.

    Args:
        row: Mapping with ``'name_1'`` and ``'name_2'`` entries (strings or
            ``None``).

    Returns:
        Sorted list of lowercase ``a-z`` words present in one name but not in
        the other. A ``None`` name contributes no words.
    """
    def latin_words(name):
        if name is None:
            return set()
        return set(re.findall(r'([a-z]+)', name.lower()))

    # Sorted so the result does not depend on set iteration order.
    return sorted(latin_words(row['name_1']) ^ latin_words(row['name_2']))

In [None]:
def calculate_idf_by_category_level(data, level):
    """Rank attribute keys per category by a TF-IDF-like score and pickle the result.

    For every distinct value of ``category_level_<level>`` the (flattened)
    JSON in ``characteristic_attributes_mapping`` is scanned. Each key gets

        (rows with the key / rows in the category)
        * log(rows with the key / distinct values of the key)

    Args:
        data: Frame with the category column and the attribute JSON column.
        level: Category level used to build the column name.

    Returns:
        Dict mapping category value to a list of ``(score, key)`` tuples
        sorted in descending order. The same dict is written to
        ``../data/preprocessed/pop_characts_tf_idf_level_<level>.pkl``.
    """
    kv_idf_by_models = {}
    category_col = f'category_level_{level}'
    unique_categories = data.select(category_col).unique().to_series().to_list()

    for group_cat in unique_categories:
        # An equality test against None matches no rows, which left the
        # null-category group empty; select those rows explicitly.
        if group_cat is None:
            group_filter = pl.col(category_col).is_null()
        else:
            group_filter = pl.col(category_col) == group_cat
        sub_group_df = data.filter(group_filter)
        series_charactes = sub_group_df.select('characteristic_attributes_mapping').to_series()

        keys_idf = Counter()
        values_idf = {}

        for row in series_charactes:
            if row is None:
                continue
            row = json.loads(row)
            for k, v in row.items():
                keys_idf[k] += 1
                if k not in values_idf:
                    values_idf[k] = Counter()
                values_idf[k][v] += 1

        # Group size counts every row, including those with null attributes.
        kv_idf_by_models[group_cat] = (keys_idf, values_idf, len(series_charactes))

    pop_characts_tf_idf = {}

    for group_cat in unique_categories:
        characters = kv_idf_by_models[group_cat][0]
        items_in_group = kv_idf_by_models[group_cat][2]
        characters_list = []

        for k in characters:
            counts_k = kv_idf_by_models[group_cat][0][k]
            key_tf = counts_k / items_in_group
            counts_v = len(kv_idf_by_models[group_cat][1][k].keys())
            if counts_v == 0:
                print(f'{k=} in {group_cat=} has 0 unique values')
                continue
            value_idf = np.log(counts_k / counts_v)
            tf_idf = key_tf * value_idf
            characters_list.append((tf_idf, k))

        pop_characts_tf_idf[group_cat] = sorted(characters_list, reverse=True)

    with open(f'../data/preprocessed/pop_characts_tf_idf_level_{level}.pkl', 'wb') as file:
        pickle.dump(pop_characts_tf_idf, file)

    return pop_characts_tf_idf

In [28]:
# data = data.sample(n=10000, shuffle=True)

In [29]:
# Steps applied in order by `preprocessing`. Order matters:
# `get_kv_from_attrs` reads the attribute JSON before
# `normalize_characteristic_attributes` flattens it, and `get_lengths` needs
# the `*_tokens` and `attr_*` columns created by the earlier steps.
processing_pipeline = [
    normalize_names,
    normalize_en_names,
    normalize_mixed_names,
    normalize_desc,
    normalize_en_desc,
    normalize_mixed_desc,
    get_kv_from_attrs,
    normalize_characteristic_attributes,
    get_digits_elements,
    get_lengths,
    extract_units,
    extract_brands,
    extract_colors,
    # fit_tfidf_step,
    # apply_tfidf_step
]

def preprocessing(data: pl.DataFrame, pipeline: List[Callable]) -> pl.DataFrame:
    """Run ``data`` through every step of ``pipeline`` in order.

    Each step receives the previous step's output; the running step's name
    is shown on the progress bar.
    """
    progress = tqdm(pipeline, desc='Data Preprocessing')
    with progress:
        for step in progress:
            progress.set_postfix({'current_operation': step.__name__})
            data = step(data)
    return data

# Module-level state read by `fit_tfidf_step` / `apply_tfidf_step`;
# `tfidf_vectorizers` is filled in by `fit_tfidf_step`.
tfidf_vectorizers = None    
tfidf_columns = ['name']

In [30]:
# Run the full feature pipeline over the de-duplicated item table.
preprocessed = preprocessing(data, processing_pipeline)

Data Preprocessing:   0%|          | 0/13 [00:00<?, ?it/s]

In [None]:
# Persist the item features; any `*_tfidf` columns are dropped before writing.
preprocessed.drop(
    [c for c in preprocessed.columns if c.endswith('_tfidf')]
).write_parquet('../data/preprocessed/all_products_preprocessed.parquet')

# for col in [c for c in preprocessed.columns if c.endswith('_tfidf')]:
#     matrices = preprocessed[col].to_list()
#     sp.save_npz(f'../data/preprocessed/{col}_matrices.npz', sp.vstack(matrices))

# df = pl.read_parquet('../data/preprocessed/all_products_preprocessed.parquet')
# name_tfidf_matrix = sp.load_npz('../data/preprocessed/name_tfidf_matrices.npz')
# name_tfidf_rows = [name_tfidf_matrix.getrow(i) for i in range(name_tfidf_matrix.shape[0])]
# df = df.with_columns(pl.Series('name_tfidf', name_tfidf_rows))

In [None]:
# Rows with the -1 placeholder label go to the test pair table;
# all other rows keep their split columns and target and go to the train table.
full_df.filter(pl.col('is_double') == -1).select(IDS + BINARY_FEATURES).write_parquet('../data/preprocessed/test_pairs.parquet')
full_df.filter(pl.col('is_double') != -1).select(IDS + BINARY_FEATURES + FOR_SPLIT + [TARGET]).write_parquet('../data/preprocessed/train_pairs.parquet')

In [33]:
# Keep only pairs labelled 0 and compute, per pair, the Latin words
# that appear in exactly one of the two names.
filtered = full_df.filter(pl.col('is_double') == 0)
result = (
    filtered
    .with_columns(
        pl.struct(['name_1', 'name_2']).map_elements(find_anti_words, return_dtype=pl.List(pl.Utf8)).alias('xor_words')
    )
)

In [34]:
# Count how often each word shows up in the per-pair `xor_words` lists.
anti_words = Counter()

for words in result['xor_words']:
    anti_words.update(words)

# Same counts restricted to words longer than two characters.
filtered_anti_words = Counter({word: count for word, count in anti_words.items() if len(word) > 2})

In [None]:
# Persist both the full and the length-filtered word counters.
with open('../data/preprocessed/anti_words.pkl', 'wb') as file:
    pickle.dump(anti_words, file)

with open('../data/preprocessed/filtered_anti_words.pkl', 'wb') as file:
    pickle.dump(filtered_anti_words, file)

In [36]:
# Ten most frequent words among those longer than two characters.
filtered_anti_words.most_common(10)

[('pro', 39884),
 ('apa', 38487),
 ('pac', 33894),
 ('nike', 22080),
 ('max', 19261),
 ('acc', 18308),
 ('adidas', 17999),
 ('samsung', 17703),
 ('black', 13183),
 ('iphone', 13125)]

In [37]:
# Attribute-key rankings for each of the four category levels;
# every call also writes its result to a pickle file.
pop_characts_tf_idf_c1 = calculate_idf_by_category_level(preprocessed, 1)
pop_characts_tf_idf_c2 = calculate_idf_by_category_level(preprocessed, 2)
pop_characts_tf_idf_c3 = calculate_idf_by_category_level(preprocessed, 3)
pop_characts_tf_idf_c4 = calculate_idf_by_category_level(preprocessed, 4)

  sub_group_df = data.filter(pl.col(category_col) == group_cat)
