In [1]:
import sys
!{sys.executable} -m pip install spacy

Collecting spacy
  Downloading spacy-3.4.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.6 MB)
     |████████████████████████████████| 6.6 MB 4.0 MB/s            
[?25hCollecting preshed<3.1.0,>=3.0.2
  Downloading preshed-3.0.8-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (130 kB)
     |████████████████████████████████| 130 kB 70.9 MB/s            
[?25hCollecting langcodes<4.0.0,>=3.2.0
  Downloading langcodes-3.3.0-py3-none-any.whl (181 kB)
     |████████████████████████████████| 181 kB 61.5 MB/s            
[?25hCollecting pathy>=0.3.5
  Downloading pathy-0.10.0-py3-none-any.whl (48 kB)
     |████████████████████████████████| 48 kB 7.7 MB/s             
Collecting spacy-loggers<2.0.0,>=1.0.0
  Downloading spacy_loggers-1.0.3-py3-none-any.whl (9.3 kB)
Collecting thinc<8.2.0,>=8.1.0
  Downloading thinc-8.1.5-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (819 kB)
     |████████████████████████████████| 819 

In [2]:
!{sys.executable} -m spacy download en_core_web_sm

2022-12-02 14:46:02.009860: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/lib:/usr/local/nvidia/lib:/usr/local/nvidia/lib64:/usr/local/cuda/lib64:/usr/local/cuda/extras/CUPTI/lib64
2022-12-02 14:46:02.009921: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-12-02 14:46:02.009957: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (samycoulombe-ray-head-48n7c): /proc/driver/nvidia/version does not exist
Collecting en-core-web-sm==3.4.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.4.1/en_core_web_sm-3.4.1-py3-none-any.whl (12.8 MB)
     |████████████████████████████████| 12.8 MB 3.7 MB/s            
Installing collected packages: en-core-web-sm
Successful

In [3]:
# standard library dependencies
import functools
from datetime import datetime
from collections import namedtuple
from typing import Iterable, List, Any, Mapping, Union, Tuple, Callable

# external dependencies
import dill
import numpy as np
import pandas as pd
import shopify_merlin.trino as trino

from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

import spacy

def _as_cache_key(value):
  """Freeze an iterable argument into a tuple so it can be part of an lru_cache key.

  None and strings pass through unchanged (a string is a single label, not a
  sequence of characters).
  """
  if value is None or isinstance(value, str):
    return value
  return tuple(value)


def _as_label_list(value):
  """Turn a frozen (tuple) column argument back into a list; leave anything else as-is.

  pandas treats a tuple as ONE (multi-level) label, so a column list frozen by
  `_as_cache_key` must become a list again before indexing/dropping with it.
  """
  return list(value) if isinstance(value, tuple) else value


@functools.lru_cache
def _load_raw_dataset_cached(trino_query: str,
                             columns_to_keep: Tuple[str, ...],
                             columns_to_drop: Tuple[str, ...],
                             deduplicate_using: Tuple[str, ...],
                             replace_missing_values_with: Any,
                             concatenate_options_columns: bool,
                             add_topmost_product_class: bool) -> pd.DataFrame:
  """Cached worker behind `load_raw_dataset`; every argument must be hashable."""
  if trino_query is None:
    trino_query = """SELECT preds.confidence, taxonomy.category_string, sample_dataset.*
      FROM hive.product_classification.predictions AS preds
      JOIN hive.insights.google_product_taxonomy AS taxonomy
        ON preds.predicted_category_id = taxonomy.category_id
      JOIN hive.scratch.fair_price_pred_US as sample_dataset
        ON preds.product_id = sample_dataset.product_id
      ORDER BY sample_dataset.product_id
      """
  raw_dataset: pd.DataFrame = trino.trino_query(trino_query)
  if columns_to_keep is not None:
    # .copy() so the column assignments below act on an independent frame
    # instead of a slice of the query result (avoids SettingWithCopyWarning).
    raw_dataset = raw_dataset[_as_label_list(columns_to_keep)].copy()
  if concatenate_options_columns:
    raw_dataset["options"] = (
      raw_dataset[["option1", "option2", "option3"]]
      .fillna(replace_missing_values_with)
      .astype(str)
      .agg(" ".join, axis=1)
    )
  if add_topmost_product_class:
    # Top-level taxonomy node, i.e. the text before the first " > ".
    raw_dataset["root_product_category"] = raw_dataset["category_string"].str.split(" > ").str[0]
  if columns_to_drop is None:
    default_columns_to_drop = (
      "compare_at_price",
      "inventory_management",
      "option1",
      "option2",
      "option3",
      "body_html",
      "product_type",
      "custom_product_type",
      "handle",
      "shop_id",
    )
    # Only drop defaults that exist, so a narrower `columns_to_keep` does not
    # make the default drop raise a KeyError.
    columns_to_drop = [column for column in default_columns_to_drop if column in raw_dataset.columns]
  raw_dataset = raw_dataset.drop(_as_label_list(columns_to_drop), axis="columns")
  if deduplicate_using is None:
    deduplicate_using = ["product_id", "product_variant_id"]
  raw_dataset = raw_dataset.drop_duplicates(subset=_as_label_list(deduplicate_using), keep="first")
  raw_dataset = raw_dataset.fillna(replace_missing_values_with)
  # Stable sort keeps rows sharing a product_id in their original order, and the
  # index is reset AFTER sorting so that labels equal row positions; callers align
  # position-built side tables to this frame with `.loc[...]`.
  return raw_dataset.sort_values("product_id", kind="stable").reset_index(drop=True)


def load_raw_dataset( trino_query: str = None,
                      columns_to_keep: List[str] = None,
                      columns_to_drop: List[str] = None,
                      deduplicate_using: Iterable[str] = None,
                      replace_missing_values_with: Any = " ",
                      concatenate_options_columns: bool = True,
                      add_topmost_product_class: bool = True) -> pd.DataFrame:
  """Load the product dataset from Trino and apply light cleaning.

  Results are memoized per argument combination. List arguments are accepted
  (they are frozen into the cache key), and every call returns a fresh copy, so
  callers may mutate the result without corrupting later calls.

  Args:
    trino_query: SQL to run; defaults to a query joining product-classification
      predictions, the Google product taxonomy and `fair_price_pred_US`.
    columns_to_keep: if given, only these query columns are retained.
    columns_to_drop: columns removed after the derived columns are built.
      Defaults to a built-in list of unused columns (option1-3, body_html,
      handle, shop_id, ...); only those present are dropped.
    deduplicate_using: columns identifying duplicate rows (first one kept);
      defaults to product_id + product_variant_id.
    replace_missing_values_with: fill value for missing entries.
    concatenate_options_columns: add an "options" column joining
      option1..option3 with single spaces.
    add_topmost_product_class: add "root_product_category", the part of
      category_string before the first " > ".

  Returns:
    The cleaned frame, stably sorted by product_id, with a 0..n-1 index.
  """
  cached_dataset = _load_raw_dataset_cached(
    trino_query,
    _as_cache_key(columns_to_keep),
    _as_cache_key(columns_to_drop),
    _as_cache_key(deduplicate_using),
    replace_missing_values_with,
    concatenate_options_columns,
    add_topmost_product_class,
  )
  # Hand out a copy so that in-place edits by callers cannot leak into the cache.
  return cached_dataset.copy()


# Keep the cache-management API the lru_cache-decorated original exposed.
load_raw_dataset.cache_clear = _load_raw_dataset_cached.cache_clear
load_raw_dataset.cache_info = _load_raw_dataset_cached.cache_info


def preprocess_text_data(data: Union[Mapping[str, Any], pd.DataFrame],
                         text_features: List[str] = None,
                         nlp_model = None,
                         stopwords: Iterable[str] = None,
                         preprocessing_steps_to_skip: List[str] = None,
                         verbose: bool = True) -> Mapping[str,List[str]]:
  """Tokenize each text column with a spaCy pipeline and keep filtered lemmas.

  Args:
    data: column name -> sequence of strings (a DataFrame or any mapping).
    text_features: columns to process (processed in sorted order); defaults to
      the product text columns.
    nlp_model: spaCy pipeline; defaults to `spacy.load("en_core_web_sm")`.
    stopwords: lemmas to discard; defaults to the model's default stop words.
    preprocessing_steps_to_skip: component names passed as
      `nlp_model.pipe(..., disable=...)`.
    verbose: show a progress bar and a completion message per column.

  Returns:
    dict mapping each processed column to one token list per document: the
    `lemma_` of every token that is not punctuation, not whitespace and not a
    stop word.

  Raises:
    AssertionError: if a requested column is missing from `data`.
  """
  if text_features is None:
    text_features = [ 'category_string', 'product_title', 'variant_title', 'tag_string', 'options' ]
  missing_features = [feature for feature in text_features if feature not in data]
  assert not missing_features, f"text features missing from data: {missing_features}"
  text_features = sorted(text_features)
  if nlp_model is None:
    nlp_model = spacy.load("en_core_web_sm")
  if stopwords is None:
    stopwords = nlp_model.Defaults.stop_words
  if preprocessing_steps_to_skip is None:
    # Each entry must be exactly one component name with no padding. The previous
    # list had "attribute_ruler senter " (two names in one string) and trailing
    # spaces, so those components were never actually disabled.
    # NOTE(review): en_core_web_sm's rule-based lemmatizer appears to rely on POS
    # tags from tagger/attribute_ruler; with them disabled, lemmas are likely just
    # lowercased text — confirm this is intended.
    preprocessing_steps_to_skip = [
      "tagger",
      "parser",
      "ner",
      "entity_linker",
      "textcat",
      "textcat_multilabel",
      "trainable_lemmatizer",
      "morphologizer",
      "attribute_ruler",
      "senter",
      "sentencizer",
      "tok2vec",
      "transformer",
    ]

  processed_text_data = dict()
  for text_feature_to_process in text_features:
    column_texts = data[text_feature_to_process]
    docs = nlp_model.pipe(column_texts, disable=preprocessing_steps_to_skip)
    if verbose:
      docs = tqdm(
        docs,
        # Number of documents in THIS column; len(data) only equals that for a
        # DataFrame (for a plain mapping it counts columns).
        total = len(column_texts) if hasattr(column_texts, "__len__") else None,
        leave = False,
        desc = f"processing {text_feature_to_process}... "
      )
    processed_text_data[text_feature_to_process] = [
      [ token.lemma_ for token in doc
        if not token.is_punct and not token.is_space and token.lemma_ not in stopwords ]
      for doc in docs
    ]
    if verbose:
      print(f"processed {text_feature_to_process}.")

  return processed_text_data

def _lowercase_tokens(list_of_tokens: Iterable[str]) -> List[str]:
  """Tokenizer for already-tokenized documents: lowercases each token.

  A module-level function (not a lambda) so fitted vectorizers stay picklable
  with the standard `pickle` module.
  """
  return [ token.lower() for token in list_of_tokens ]


def vectorize_text_data(processed_text_column_data: Mapping[str,List[List[str]]],
                        vectorizer_class: Callable = CountVectorizer,
                        verbose: bool = True,
                        **kwargs) -> Tuple[Mapping[str,Union[CountVectorizer, TfidfVectorizer]], Mapping[str,np.ndarray]]:
  """Fit one vectorizer per text column on documents that are already token lists.

  Args:
    processed_text_column_data: column name -> sequence of documents, each a
      list of tokens (e.g. the output of `preprocess_text_data`, or a DataFrame
      holding such lists).
    vectorizer_class: vectorizer factory, called once per column.
    verbose: show a progress bar over the columns.
    **kwargs: extra vectorizer arguments (e.g. `min_df`). They may also
      override the token-list defaults (`lowercase`, `preprocessor`,
      `tokenizer`) instead of raising a duplicate-keyword TypeError.

  Returns:
    (vectorizers, matrices): the fitted vectorizer for each column and the
    document-term matrix its `fit_transform` returned (sparse for
    scikit-learn's vectorizers).
  """
  # Documents are token lists, not strings: skip the string lowercasing and
  # preprocessing steps and let the tokenizer lowercase each token instead.
  vectorizer_params = {
    "lowercase": False,
    "preprocessor": None,
    "tokenizer": _lowercase_tokens,
    **kwargs,
  }
  text_column_vectorizers = dict()
  vectorized_text_column_arrays = dict()
  columns = processed_text_column_data.items()
  if verbose:
    columns = tqdm(columns)
  for text_column_name, text_column_token_lists in columns:
    vectorizer = vectorizer_class(**vectorizer_params)
    text_column_vectorizers[text_column_name] = vectorizer
    vectorized_text_column_arrays[text_column_name] = vectorizer.fit_transform(text_column_token_lists)
  return text_column_vectorizers, vectorized_text_column_arrays

def categorize_data(data: Union[Mapping[str, Any],pd.DataFrame],
                    encoder: Callable = LabelEncoder,
                    categorical_features: List[str] = None,
                    verbose: bool = True) -> Tuple[Mapping[str,Callable],Mapping[str,np.ndarray]]:
  """Fit a fresh `encoder()` per categorical column and encode that column.

  Args:
    data: column name -> values (a DataFrame or any mapping).
    encoder: zero-argument factory producing an object with `fit_transform`.
    categorical_features: columns to encode; defaults to
      ['fulfillment_service', 'vendor'].
    verbose: show a progress bar over the columns (new, defaults to the
      previous always-on behaviour).

  Returns:
    (encoders, labels): the fitted encoder for each column and the output of
    its `fit_transform` on that column.

  Raises:
    AssertionError: if a requested column is missing from `data`.
  """
  if categorical_features is None:
    categorical_features = ['fulfillment_service', 'vendor']
  missing_features = [feature for feature in categorical_features if feature not in data]
  assert not missing_features, f"categorical features missing from data: {missing_features}"
  categorical_column_encoders = dict()
  categorical_column_labels = dict()
  columns = tqdm(categorical_features) if verbose else categorical_features
  for categorical_column_name in columns:
    column_encoder = encoder()
    categorical_column_labels[categorical_column_name] = column_encoder.fit_transform(data[categorical_column_name])
    categorical_column_encoders[categorical_column_name] = column_encoder
  return categorical_column_encoders, categorical_column_labels

# Bundle of artifacts built for one root product category.
ProductCategoryAssets = namedtuple(
  "ProductCategoryAssets",
  [
    "processed_dataset",     # per-row numeric, label-encoded and vectorized-text features
    "text_preprocessor",     # callable used to turn raw text columns into token lists
    "text_vectorizers",      # text column name -> fitted vectorizer
    "categorical_encoders",  # categorical column name -> fitted encoder
  ],
)

def _vectorizer_feature_names(vectorizer) -> List[str]:
  """Vocabulary of a fitted vectorizer, across scikit-learn versions."""
  # get_feature_names() was deprecated in scikit-learn 1.0 and removed in 1.2.
  if hasattr(vectorizer, "get_feature_names_out"):
    return list(vectorizer.get_feature_names_out())
  return list(vectorizer.get_feature_names())


def _vectorized_text_frame(text_column_vectorizers, vectorized_text_column_arrays) -> pd.DataFrame:
  """Dense frame of all vectorized text columns, named "<column>[<token>]"."""
  feature_columns = sorted(vectorized_text_column_arrays.keys())
  return pd.DataFrame(
    np.hstack([ vectorized_text_column_arrays[feature_column].toarray()
                for feature_column in feature_columns ]),
    columns = [ f"{feature_column}[{token}]"
                for feature_column in feature_columns
                for token in _vectorizer_feature_names(text_column_vectorizers[feature_column]) ]
  )


def _save_product_category_assets(all_product_category_assets, save_dir: str) -> str:
  """Dill-serialize the assets into `save_dir`, plus a byte-identical backup file.

  Returns the path of the primary file. Both files share one timestamp.
  NOTE: these are pickles — only load them from trusted locations.
  """
  import os
  import shutil
  os.makedirs(save_dir, exist_ok = True)
  timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
  primary_path = os.path.join(save_dir, f"{timestamp}_all_product_category_assets.pkl")
  backup_path = os.path.join(save_dir, f"{timestamp}_all_product_category_assets_backup.pkl")
  with open(primary_path, "wb") as handle:
    dill.dump(
        all_product_category_assets,
        handle,
        protocol = dill.HIGHEST_PROTOCOL
    )
  # Copy the bytes instead of serializing the whole object a second time.
  shutil.copyfile(primary_path, backup_path)
  return primary_path


def feature_engineering_pipeline(
  # load_data
  trino_query: str = None,
  columns_to_keep: List[str] = None,
  columns_to_drop: List[str] = None,
  deduplicate_using: Iterable[str] = None,
  replace_missing_values_with: Any = " ",
  concatenate_options_columns: bool = True,
  # preprocess text data
  text_features: List[str] = None,
  nlp_model = None,
  stopwords: Iterable[str] = None,
  preprocessing_steps_to_skip: List[str] = None,
  verbose: bool = True,
  # vectorize text data
  vectorizer_class: Callable = CountVectorizer,
  vectorizer_kwargs: Mapping = None,
  # encode categorical data
  categorical_features: List[str] = None,
  categorical_encoder: Callable = LabelEncoder,
  save: bool = True,
  save_dir: str = "./app") -> Mapping[str, ProductCategoryAssets]:
  """Build per-root-category feature sets and fitted transformers.

  Steps: load the raw dataset; label-encode the categorical columns; preprocess
  the text columns; then, for every "root_product_category", fit text
  vectorizers on that category's rows only and assemble a frame of
  confidence/price/ids + encoded categoricals + vectorized text.

  Args:
    trino_query .. concatenate_options_columns: forwarded to `load_raw_dataset`.
    text_features .. preprocessing_steps_to_skip: forwarded to `preprocess_text_data`.
    verbose: show progress bars.
    vectorizer_class, vectorizer_kwargs: forwarded to `vectorize_text_data`;
      kwargs default to {"min_df": 0.01}.
    categorical_features, categorical_encoder: forwarded to `categorize_data`.
    save: when True, dill-pickle the result (plus a backup copy) into `save_dir`.
    save_dir: output directory, created if missing (default "./app").

  Returns:
    dict mapping each root product category to its ProductCategoryAssets.
  """
  raw_dataset: pd.DataFrame = load_raw_dataset(
    trino_query = trino_query,
    columns_to_keep = columns_to_keep,
    columns_to_drop = columns_to_drop,
    deduplicate_using = deduplicate_using,
    replace_missing_values_with = replace_missing_values_with,
    concatenate_options_columns = concatenate_options_columns,
  )
  # The encoded/preprocessed side tables below are built positionally (0..n-1)
  # but sliced with this frame's index labels, so labels must equal positions.
  raw_dataset = raw_dataset.reset_index(drop = True)

  categorical_column_encoders, categorical_column_labels = categorize_data(
    raw_dataset,
    categorical_features = categorical_features,
    encoder = categorical_encoder,
  )
  categorical_labels_df = pd.DataFrame(categorical_column_labels)

  preprocessed_text_df = pd.DataFrame(preprocess_text_data(
    raw_dataset,
    text_features = text_features,
    nlp_model = nlp_model,
    stopwords = stopwords,
    preprocessing_steps_to_skip = preprocessing_steps_to_skip,
    verbose = verbose,
  ))

  if vectorizer_kwargs is None:
    vectorizer_kwargs = {"min_df": 0.01}

  all_product_category_assets: Mapping[str, ProductCategoryAssets] = dict()
  category_groups = raw_dataset.groupby("root_product_category")
  if verbose:
    category_groups = tqdm(category_groups)
  for root_product_category, category_rows in category_groups:
    row_indices = category_rows.index
    # Vectorizers are fitted on this category's rows only.
    text_column_vectorizers, vectorized_text_column_arrays = vectorize_text_data(
      preprocessed_text_df.loc[row_indices],
      vectorizer_class = vectorizer_class,
      verbose = verbose,
      **vectorizer_kwargs
    )
    processed_subdf = (
      category_rows[['confidence', 'price', 'product_id', 'product_variant_id']]
      .join(categorical_labels_df.loc[row_indices])
      .reset_index(drop = True)
    )
    processed_subdf = pd.concat(
      [processed_subdf, _vectorized_text_frame(text_column_vectorizers, vectorized_text_column_arrays)],
      axis = 1
    )
    all_product_category_assets[root_product_category] = ProductCategoryAssets(
      processed_dataset = processed_subdf,
      text_preprocessor = preprocess_text_data,
      text_vectorizers = text_column_vectorizers,
      categorical_encoders = categorical_column_encoders
    )
  if save:
    _save_product_category_assets(all_product_category_assets, save_dir)

  return all_product_category_assets

2022-12-02 14:46:09.863095: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/lib:/usr/local/nvidia/lib:/usr/local/nvidia/lib64:/usr/local/cuda/lib64:/usr/local/cuda/extras/CUPTI/lib64
2022-12-02 14:46:09.863144: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-12-02 14:46:09.863179: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (samycoulombe-ray-head-48n7c): /proc/driver/nvidia/version does not exist


In [4]:
%%time
feature_engineering_pipeline()

100%|██████████| 2/2 [00:01<00:00,  1.75it/s]
                                                                                         

processed category_string.


                                                                                 

processed options.


                                                                                       

processed product_title.


                                                                                     

processed tag_string.


                                                                                       

processed variant_title.


  0%|          | 0/20 [00:00<?, ?it/s]
5it [00:00, 109.97it/s]
  5%|▌         | 1/20 [00:01<00:20,  1.08s/it]
0it [00:00, ?it/s][A
1it [00:01,  1.49s/it][A
2it [00:02,  1.45s/it][A
3it [00:05,  1.77s/it][A
4it [00:10,  3.05s/it][A
5it [00:11,  2.28s/it][A
 10%|█         | 2/20 [00:17<02:57,  9.87s/it]
0it [00:00, ?it/s][A
2it [00:00, 19.73it/s][A
5it [00:00, 15.34it/s][A
 15%|█▌        | 3/20 [00:17<01:34,  5.56s/it]
5it [00:00, 177.78it/s]

0it [00:00, ?it/s][A
5it [00:00, 24.59it/s][A
 25%|██▌       | 5/20 [00:17<00:37,  2.50s/it]
5it [00:00, 366.19it/s]

0it [00:00, ?it/s][A
3it [00:00, 18.49it/s][A
5it [00:00, 13.60it/s][A
 35%|███▌      | 7/20 [00:18<00:19,  1.49s/it]
0it [00:00, ?it/s][A
5it [00:00, 17.94it/s][A
 40%|████      | 8/20 [00:18<00:14,  1.22s/it]
0it [00:00, ?it/s][A
5it [00:00, 15.21it/s][A
 45%|████▌     | 9/20 [00:19<00:11,  1.01s/it]
0it [00:00, ?it/s][A
2it [00:00, 17.94it/s][A
5it [00:00, 12.50it/s][A
 50%|█████     | 10/20 [00:19<00:08,  1.

CPU times: user 23min 5s, sys: 26.7 s, total: 23min 31s
Wall time: 37min 35s


{'animals & pet supplies': ProductCategoryAssets(processed_dataset=     confidence   price     product_id  product_variant_id  \
 0         0.730   59.50      117409002           267951692   
 1         0.822   19.95      118091272      31781102157922   
 2         0.251   87.77      118093254          1112368412   
 3         0.513    6.95      147750950           337260866   
 4         0.562   17.00      154756385           354686825   
 ..          ...     ...            ...                 ...   
 853       0.818   56.95  5249845330069      34485627420821   
 854       0.520   13.49  5863164444744      36487256473672   
 855       0.522   19.00  6072519491772      37503292833980   
 856       0.557  440.00  6283110416565      38326354673845   
 857       0.859   14.99  6537072869500      39257151209596   
 
      fulfillment_service  vendor  category_string[>]  \
 0                    173   47766                   1   
 1                    173   41255                   0   
 2   