<a href="https://colab.research.google.com/github/krumeto/categorical_stuff/blob/main/custom_transformers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd

In [None]:
class CollectionToStringTransformer:
  """Collapse the values of one or more columns into a single string per group.

  For every distinct value of an index column, the unique values of each
  requested column are collected, sorted and joined with single spaces.

  The constructor stores a copy of ``df_orig``; all methods read from that
  copy, so the dataframe passed in by the caller is never modified.

  df_orig: pandas.DataFrame, the original dataframe
  """

  def __init__(self, df_orig):
    self.df = df_orig.copy()

  @staticmethod
  def _as_column_list(col_to_transform):
    """Normalise ``col_to_transform`` to a non-empty list of column names.

    Raises TypeError for anything that is not a string or a list, and
    ValueError for an empty list, instead of failing later with an
    unrelated error about an unbound local variable.
    """
    if isinstance(col_to_transform, str):
      return [col_to_transform]
    if isinstance(col_to_transform, list):
      if not col_to_transform:
        raise ValueError("col_to_transform must contain at least one column")
      return col_to_transform
    raise TypeError(
      "col_to_transform must be a string or a list of strings, "
      f"got {type(col_to_transform).__name__}")

  def get_collection_per_index(self, index_col, col_to_transform):
    """Return the sorted unique values of each column per ``index_col`` value.

    index_col: string, the column to group the dataframe by
    col_to_transform: a string or a list, the column(s) to collect

    Returns a dataframe with ``index_col`` followed by one ``<col>_set``
    column per requested column; each cell holds a sorted list of the
    distinct values seen for that group.
    """
    columns = self._as_column_list(col_to_transform)
    grouped = self.df.groupby([index_col])

    # One Series per requested column: deduplicate with set, then sort so the
    # resulting order (and therefore the joined string) is deterministic.
    collected = [
      grouped[col].apply(set).apply(sorted).rename(f'{col}_set')
      for col in columns
    ]
    return pd.concat(collected, axis=1).reset_index()

  def transform(self, index_col, col_to_transform, drop_set = True):
    """Return one space-joined string of unique values per group and column.

    index_col: string, the column to group the dataframe by
    col_to_transform: a string or a list, the column(s) to combine
    drop_set: bool, when True (default) the intermediate ``<col>_set``
        columns are removed from the result

    Returns a dataframe with ``index_col``, the ``<col>_set`` columns (only
    when ``drop_set`` is False) and one ``<col>_string`` column per
    requested column.
    """
    columns = self._as_column_list(col_to_transform)
    transformed = self.get_collection_per_index(index_col, columns)

    for col in columns:
      # Values are passed through str() so non-string entries can be joined.
      transformed[f"{col}_string"] = transformed[f'{col}_set'].apply(
        lambda values: ' '.join(str(value) for value in values))

    if drop_set:
      transformed = transformed.drop([f'{col}_set' for col in columns], axis=1)

    return transformed


In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_is_fitted
from sklearn.decomposition import NMF
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import HashingVectorizer, CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.pipeline import make_pipeline


class HashTfidfSvdTransformer(TransformerMixin, BaseEstimator):
  """A class to encode high cardinality categorical variables into a
  TfIdf Matrix.

  The encoder is a pipeline of a CountVectorizer followed by a
  TfidfTransformer. Despite the class name, no hashing and no SVD step is
  applied: the dense TF-IDF columns are appended to the dataframe as-is.

  n_components: the number of max_features for the CountVectorizer
  hashing_kwargs: dict of extra parameters to use for the CountVectorizer,
      or None (default) for no extra parameters
  """

  def __init__(self, n_components, hashing_kwargs=None):
    self.n_components = n_components
    self.hashing_kwargs = hashing_kwargs

  def fit(self, df_orig, col_orig):
    """Fit the count + TF-IDF pipeline on one column of ``df_orig``.

    df_orig: dataframe holding the training data; a copy is kept on
        ``self.df``
    col_orig: name of the column whose values are vectorized

    Returns the fitted transformer itself.
    """
    self.df = df_orig.copy()

    # None means "no extra keyword arguments" for the CountVectorizer.
    extra_kwargs = {} if self.hashing_kwargs is None else self.hashing_kwargs
    self.hasher = CountVectorizer(
      max_features=self.n_components,
      **extra_kwargs)

    self.vectorizer = make_pipeline(self.hasher, TfidfTransformer())
    self.vectorizer.fit(self.df[col_orig])

    return self

  def transform(self, X, col_to_encode):
    """Append the TF-IDF encoding of ``X[col_to_encode]`` to ``X``.

    X: dataframe to encode. NOTE: the new columns are written into ``X``
        itself, and that same object is returned.
    col_to_encode: name of the column to vectorize with the fitted pipeline

    One column named ``<col_to_encode>_diag_component_<i>`` is added per
    column of the TF-IDF matrix. The selected column and the dense matrix
    are also kept on ``self.dataset`` and ``self.sparse_matrix``.

    Raises NotFittedError when ``fit`` has not been called.
    """
    # `vectorizer` is the attribute this method needs and it only exists
    # after fit(); `n_components` is set in __init__ so it proves nothing.
    check_is_fitted(self, ['vectorizer'])
    self.dataset = X[col_to_encode]

    # Attribute name kept for compatibility; the value is a dense array.
    self.sparse_matrix = self.vectorizer.transform(self.dataset).toarray()

    for i in range(self.sparse_matrix.shape[1]):
      X[f"{col_to_encode}_diag_component_{i}"] = self.sparse_matrix[:, i]

    return X

In [None]:
class SubstringOnehotEncoder(TransformerMixin, BaseEstimator):
  """A class to encode just a subset of top codes as one hots. The class is able to
  search for a match within a string.

  For example if a list_of_strings is ['Boston', 'Chicago'] both 'Boston Celtics' and
  'Chicago Bulls' are going to be encoded.

  drop_original: bool, drop the source column from the returned dataframe
  method: 'onehot' adds one 0/1 column per search string; 'nmf' factorizes
      those 0/1 columns with NMF and adds the resulting components instead
  n_components: number of NMF components, required when method is 'nmf'

  NOTE(review): matching is case-insensitive and each entry is passed to
  ``Series.str.contains`` with its default pattern handling, so regex
  metacharacters in an entry are not matched literally - confirm that this
  is intended.
  """

  def __init__(self, drop_original = False, method='onehot', n_components = None):
    self.drop_original = drop_original
    self.method = method
    self.n_components = n_components

    if self.n_components is None and self.method == 'nmf':
      raise ValueError("Please define n_components for method nmf")

  def fit(self, df):
    """Store a copy of ``df`` on ``self.df`` and return the encoder itself."""
    self.df = df.copy()
    return self

  def _substring_flags(self, column, substring):
    """Return a nullable-integer 0/1 Series: does ``column`` contain ``substring``?

    Missing values count as "no match". ``na=False`` yields the same 0 as
    the previous ``na=0`` while passing the boolean fill value that
    ``str.contains`` expects.
    """
    matches = self.df[column].str.contains(str(substring), case=False, na=False)
    return matches.astype('Int64')

  def transform(self, column , list_of_strings):
    """Encode ``column`` of the fitted dataframe against ``list_of_strings``.

    column: name of the string column to search in
    list_of_strings: the substrings to look for

    Returns the fitted dataframe with the new columns added:
    ``<column>_<substring>`` per entry for 'onehot', or
    ``<column>_component_<i>`` per NMF component for 'nmf'. The source
    column is removed from the result when ``drop_original`` is set.

    NOTE(review): the new columns are written into ``self.df`` itself, so
    they accumulate over repeated calls - confirm callers rely on this.

    Raises ValueError for an unknown method or a missing ``n_components``
    with method 'nmf', and NotFittedError when ``fit`` has not been called.
    """
    allowed_methods = ['onehot', 'nmf']
    if self.method not in allowed_methods:
      raise ValueError(f"Please choose one of the following methods: {allowed_methods}")

    if self.method == 'onehot':
      check_is_fitted(self, ['df'])
      transformed = self.df

      for substring in list_of_strings:
        transformed[f"{column}_{substring}"] = self._substring_flags(column, substring)

    else:
      print('NMF method does not retain state. Please beware in production.')
      check_is_fitted(self, ['df', 'n_components'])
      # The constructor check can be bypassed (e.g. via set_params), so guard
      # again here rather than failing later on range(None).
      if self.n_components is None:
        raise ValueError("Please define n_components for method nmf")

      transformed = self.df
      codes_df = pd.DataFrame()

      for substring in list_of_strings:
        codes_df[f"{substring}"] = self._substring_flags(column, substring)

      print('Starting NMF')
      # The factorization is refitted on every call; nothing is stored.
      nmf = NMF(n_components=self.n_components)
      W = nmf.fit_transform(codes_df)

      for i in range(self.n_components):
        transformed[f"{column}_component_{i}"] = W[:, i]

    if self.drop_original:
      transformed = transformed.drop(column, axis=1)

    return transformed
  
  

  