In [None]:
# Install pinned versions of spaCy and scispaCy; the import cell below displays
# the installed versions as a sanity check.
!pip install spacy==2.3.5
!pip install scispacy==0.3.0

In [None]:
import spacy
import scispacy

from pprint import pprint
from collections import OrderedDict
import pandas as pd

from spacy import displacy
from scispacy.umls_linking import UmlsEntityLinker

from sklearn.base import BaseEstimator
from typing import Dict, Tuple
import numpy as np

from sklearn.ensemble import GradientBoostingClassifier

# Standard Imports
import seaborn as sns
import matplotlib.pyplot as plt
import pickle

# Transformers
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, StandardScaler, MinMaxScaler

# Modeling Evaluation
from sklearn.model_selection import train_test_split, cross_val_score, KFold, GridSearchCV
from sklearn.metrics import accuracy_score, precision_score, recall_score,f1_score, confusion_matrix, classification_report
from IPython.display import display, Markdown


# Sanity check: display the installed versions.
# Expected: spaCy == 2.3.5 and scispaCy >= 0.3.0.
spacy.__version__, scispacy.__version__

In [None]:
# Mount Google Drive at /content/gdrive (Colab only).
# NOTE(review): later cells read "gdrive/MyDrive/..." as a relative path, so they
# presumably rely on the kernel's working directory being /content — confirm.
from google.colab import drive 
drive.mount('/content/gdrive')

Read in structured data with relevant features

In [None]:
# Load the train / dev / test feature tables; each split lives in its own CSV
# under the same Drive folder.
data_dir = "gdrive/MyDrive/6.871/"
train_df, dev_df, test_df = (
    pd.read_csv(data_dir + split + "_features.csv")
    for split in ("train", "dev", "test")
)

# Show each table once it has been loaded.
for frame in (train_df, dev_df, test_df):
    print(frame)

In [None]:
# Missing feature values are filled with 0 (bag-of-words style: absent == 0).
train_df = train_df.fillna(0)
dev_df = dev_df.fillna(0)
test_df = test_df.fillna(0)

# Columns that must never be used as model inputs: bookkeeping columns and the
# label itself. The label was previously left inside the feature frames, which
# leaks the target into any model fitted on the full train_X / dev_X / test_X.
# "Unnamed: 0" is presumably the index column written by an earlier to_csv — confirm.
LABEL_COLUMN = "contradiction?"
NON_FEATURE_COLUMNS = ["Unnamed: 0", "pair_id", "type_data", LABEL_COLUMN]

train_X = train_df.drop(NON_FEATURE_COLUMNS, axis=1)
train_y = train_df[LABEL_COLUMN]

dev_X = dev_df.drop(NON_FEATURE_COLUMNS, axis=1)
dev_y = dev_df[LABEL_COLUMN]

test_X = test_df.drop(NON_FEATURE_COLUMNS, axis=1)
test_y = test_df[LABEL_COLUMN]

Establish rules to explore as relevant features

In [None]:
# Hand-written rules, keyed by feature column. Every entry is a list of
# (comparison operator, threshold, predicted label) tuples, where the predicted
# label is 1.0 for "contradiction" and 0.0 for "no contradiction".
#
#   check_umls     == 1   -> 0.0  the pair shares no concept, so no contradiction
#   neg_check_umls == 1   -> 1.0  the number of negation tokens differs (0 = equal counts)
#   check_med7     == 1   -> 1.0  same DRUG but different other info (0 = not the same DRUG)
#   dep_sim        <  0.5 -> 1.0  low dependency similarity
#
# The estimators below apply the rules in this order and later matches
# overwrite earlier ones.
rules = dict(
    check_umls=[("=", 1, 0.0)],
    neg_check_umls=[("=", 1, 1.0)],
    check_med7=[("=", 1, 1.0)],
    dep_sim=[("<", 0.5, 1.0)],
)

Hybrid rule-based learning with auto-learned rules

In [None]:
class RuleAugmentedEstimator(BaseEstimator):
  """
  Augments sklearn estimators with deterministic rule-based logic.

  Rows matched by a rule receive that rule's fixed outcome; every other row is
  predicted by ``base_model``, which is trained only on rows no rule matches.
  Rules are applied in dictionary order, so when several rules match the same
  row the last matching rule wins.
  """

  # Rule operator -> name of the pandas Series comparison method implementing it.
  _COMPARATORS = {"=": "eq", "<": "lt", ">": "gt", "<=": "le", ">=": "ge"}

  def __init__(self, base_model: BaseEstimator, rules: Dict, **base_params):
      """
      Initializes the rule-augmented estimator by supplying underlying sklearn estimator
      and hard-coded rules.

      Args:
        base_model: underlying sklearn estimator.
          Must implement fit and predict method.
        rules: hard coded rules in format of dictionary,
          with keys being the pandas dataframe column name,
          and values being a list of tuples in the following form:
          (comparison operator, value, return value)

          Acceptable comparison operators are:
          "=", "<", ">", "<=", ">="

          Example:

                {"House Type": [
                    ("=", "Penthouse", 1.0),
                    ("=", "Shack", 0.0)
                  ],
                  "House Price": [
                      ("<", 1000.0, 0.0),
                      (">=", 500000.0, 1.0)
                ]}
        **base_params: Optional keyword arguments which will be passed on
            to the base_model.

      """
      self.rules = rules
      self.base_model = base_model
      self.base_model.set_params(**base_params)

  def __repr__(self):
      return "Rule Augmented Estimator:\n\n\t Base Model: {}\n\t Rules: {}".format(self.base_model, self.rules)

  def __str__(self):
      # Previously returned the bound method itself, which made str() raise TypeError.
      return self.__repr__()

  def _rule_matches(self, X: pd.DataFrame):
      """Yields ``(mask, outcome)`` for every usable rule, in rule order.

      ``mask`` is a boolean numpy array, positionally aligned with the rows of
      ``X``, marking the rows the rule matches. Rules whose column is absent
      from ``X`` are skipped; rules with an unknown operator are reported and
      skipped.
      """
      for category, category_rules in self.rules.items():

          if category not in X.columns:
              continue

          for rule in category_rules:
              comparator = self._COMPARATORS.get(rule[0])

              if comparator is None:
                  print("Invalid rule detected: {}".format(rule))
                  continue

              matched = getattr(X[category], comparator)(rule[1])
              yield np.asarray(matched, dtype=bool), rule[2]

  def _get_base_model_data(self, X: pd.DataFrame, y: pd.Series) -> Tuple[pd.DataFrame, pd.Series]:
      """
      Filters the training data for data points not affected by the rules.

      Rows are selected by position in both ``X`` and ``y``, so the result is
      correct even when ``X`` does not carry a default 0..n-1 index.

      Returns:
          The remaining rows of ``X`` and ``y``, both re-indexed from 0.
      """
      covered = np.zeros(len(X), dtype=bool)
      for matched, _ in self._rule_matches(X):
          covered |= matched

      uncovered = ~covered
      train_x = X.iloc[uncovered].reset_index(drop=True)
      train_y = y.iloc[uncovered].reset_index(drop=True)

      return train_x, train_y

  def fit(self, X: pd.DataFrame, y: pd.Series, **kwargs):
      """Fits the estimator to the data.

      Fits the estimator to the data, only training the underlying estimator
      on data which isn't affected by the hard-coded rules.

      Args:
          X: The training feature data.
          y: The training label data.
          **kwargs: Optional keyword arguments passed to the underlying
          estimator's fit function.

      Returns:
          self, following the scikit-learn estimator convention.
      """
      train_x, train_y = self._get_base_model_data(X, y)
      self.base_model.fit(train_x, train_y, **kwargs)
      return self

  def predict(self, X: pd.DataFrame) -> np.ndarray:
      """Gets predictions for the provided feature data.

      The predictions are evaluated using the provided rules wherever possible
      otherwise the underlying estimator is used.

      Args:
          X: The feature data to evaluate predictions for.

      Returns:
          np.ndarray: Evaluated predictions, one per row of ``X``.
      """
      # NaN marks "no rule has decided this row yet".
      prediction = pd.Series(np.nan, index=X.index, dtype=float)

      for matched, outcome in self._rule_matches(X):
          prediction.loc[matched] = outcome

      # Anything still undecided is delegated to the base model.
      undecided = prediction.isna().to_numpy()
      if undecided.any():
          prediction.loc[undecided] = self.base_model.predict(X.loc[undecided])

      return prediction.to_numpy()

  def get_params(self, deep: bool = True) -> Dict:
      """Return the model's and base model's parameters.
      Args:
          deep: Whether to recursively return the base model's parameters.
      Returns
          Dict: The model's parameters.
      """
      # 'outcome_range' used to be listed here but is never set on the
      # instance, so every call raised AttributeError.
      params = {'base_model': self.base_model,
                'rules': self.rules
                }

      params.update(self.base_model.get_params(deep=deep))
      return params

  def set_params(self, **params):
      """Sets parameters for the model and base model.

      ``base_model`` and ``rules`` are set on this estimator; every other
      keyword is forwarded to the base model.

      Args:
          **params: Optional keyword arguments.
      Returns:
          self, following the scikit-learn estimator convention.
      """
      parameters = dict(params)

      if 'base_model' in parameters:
          self.base_model = parameters.pop('base_model')

      if 'rules' in parameters:
          self.rules = parameters.pop('rules')

      self.base_model.set_params(**parameters)
      return self

In [None]:
# Keep only the four columns the rules are written against.
rule_feature_names = ['check_umls', 'neg_check_umls', "check_med7", "dep_sim"]
auto_train_X = train_X.loc[:, rule_feature_names].copy()
auto_test_X = test_X.loc[:, rule_feature_names].copy()

for subset in (auto_train_X, auto_test_X):
    print(subset)


In [None]:
# Hybrid model: a 50-tree gradient boosting classifier wrapped by the
# hand-written rules, trained on the rule features and scored on the test split.
gbc = GradientBoostingClassifier(verbose=1, n_estimators=50)
hybrid_model = RuleAugmentedEstimator(base_model=gbc, rules=rules)
hybrid_model.fit(X=auto_train_X, y=train_y)
predictions = hybrid_model.predict(X=auto_test_X)

In [None]:
def evaluation(y, y_hat, title = 'Confusion Matrix'):
    """Print classification scores and plot the confusion matrix.

    Prints recall, accuracy, precision and F1 (in that order) for the
    predictions ``y_hat`` against the true labels ``y``, then draws the
    confusion matrix as an annotated heatmap and shows the figure.

    Args:
        y: True labels.
        y_hat: Predicted labels.
        title: Title placed above the heatmap.
    """
    matrix = confusion_matrix(y, y_hat)
    scores = {
        'Precision: ': precision_score(y, y_hat),
        'Recall: ': recall_score(y, y_hat),
        'Accuracy: ': accuracy_score(y, y_hat),
        'F1: ': f1_score(y, y_hat),
    }
    for label in ('Recall: ', 'Accuracy: ', 'Precision: ', 'F1: '):
        print(label, scores[label])

    sns.heatmap(matrix, cmap='PuBu', annot=True, fmt='g', annot_kws={'size': 20})
    plt.xlabel('predicted', fontsize=18)
    plt.ylabel('actual', fontsize=18)
    plt.title(title, fontsize=18)

    plt.show()

In [None]:
# Score against the test labels (`y_true` is never defined in this notebook).
evaluation(test_y, predictions)

Rule-based model (rules are applied first; a base model trained on all the data covers rows that no rule matches)

In [None]:
class RuleBasedEstimator(BaseEstimator):
  """
  Uses deterministic rule-based logic.

  Rows matched by a rule receive that rule's fixed outcome; rows no rule
  matches are predicted by ``base_model``, which is trained on all of the
  training data. Rules are applied in dictionary order, so when several rules
  match the same row the last matching rule wins.
  """

  # Rule operator -> name of the pandas Series comparison method implementing it.
  _COMPARATORS = {"=": "eq", "<": "lt", ">": "gt", "<=": "le", ">=": "ge"}

  def __init__(self, base_model: BaseEstimator, rules: Dict, **base_params):
      """
      Initializes the rule-based estimator by supplying hard-coded rules.

      Args:
        base_model: underlying sklearn estimator.
          Must implement fit and predict method.
        rules: hard coded rules in format of dictionary,
          with keys being the pandas dataframe column name,
          and values being a list of tuples in the following form:
          (comparison operator, value, return value)

          Acceptable comparison operators are:
          "=", "<", ">", "<=", ">="

          Example:

                {"House Type": [
                    ("=", "Penthouse", 1.0),
                    ("=", "Shack", 0.0)
                  ],
                  "House Price": [
                      ("<", 1000.0, 0.0),
                      (">=", 500000.0, 1.0)
                ]}
        **base_params: Optional keyword arguments which will be passed on
            to the base_model.

      """
      self.rules = rules
      self.base_model = base_model
      self.base_model.set_params(**base_params)

  def __repr__(self):
      # The label previously read "Rule Augmented Estimator" (copied from the
      # sibling class), which made the two estimators indistinguishable.
      return "Rule Based Estimator:\n\n\t Base Model: {}\n\t Rules: {}".format(self.base_model, self.rules)

  def __str__(self):
      # Previously returned the bound method itself, which made str() raise TypeError.
      return self.__repr__()

  def _rule_matches(self, X: pd.DataFrame):
      """Yields ``(mask, outcome)`` for every usable rule, in rule order.

      ``mask`` is a boolean numpy array, positionally aligned with the rows of
      ``X``, marking the rows the rule matches. Rules whose column is absent
      from ``X`` are skipped; rules with an unknown operator are reported and
      skipped.
      """
      for category, category_rules in self.rules.items():

          if category not in X.columns:
              continue

          for rule in category_rules:
              comparator = self._COMPARATORS.get(rule[0])

              if comparator is None:
                  print("Invalid rule detected: {}".format(rule))
                  continue

              matched = getattr(X[category], comparator)(rule[1])
              yield np.asarray(matched, dtype=bool), rule[2]

  def fit(self, X: pd.DataFrame, y: pd.Series, **kwargs):
      """Fits the estimator to the data.

      Trains the underlying estimator on all of the supplied data; the rules
      play no part in fitting.

      Args:
          X: The training feature data.
          y: The training label data.
          **kwargs: Optional keyword arguments passed to the underlying
          estimator's fit function.

      Returns:
          self, following the scikit-learn estimator convention.
      """
      self.base_model.fit(X, y, **kwargs)
      return self

  def predict(self, X: pd.DataFrame) -> np.ndarray:
      """Gets predictions for the provided feature data.

      The predictions are evaluated using the provided rules wherever possible
      otherwise the underlying estimator is used.

      Args:
          X: The feature data to evaluate predictions for.

      Returns:
          np.ndarray: Evaluated predictions, one per row of ``X``.
      """
      # NaN marks "no rule has decided this row yet".
      prediction = pd.Series(np.nan, index=X.index, dtype=float)

      for matched, outcome in self._rule_matches(X):
          prediction.loc[matched] = outcome

      # check if any predictions missing (relegate to base_model)
      undecided = prediction.isna().to_numpy()
      if undecided.any():
          prediction.loc[undecided] = self.base_model.predict(X.loc[undecided])

      return prediction.to_numpy()

  def get_params(self, deep: bool = True) -> Dict:
      """Return the model's and base model's parameters.
      Args:
          deep: Whether to recursively return the base model's parameters.
      Returns
          Dict: The model's parameters.
      """
      # 'outcome_range' used to be listed here but is never set on the
      # instance, so every call raised AttributeError.
      params = {'base_model': self.base_model,
                'rules': self.rules
                }

      params.update(self.base_model.get_params(deep=deep))
      return params

  def set_params(self, **params):
      """Sets parameters for the model and base model.

      ``base_model`` and ``rules`` are set on this estimator; every other
      keyword is forwarded to the base model.

      Args:
          **params: Optional keyword arguments.
      Returns:
          self, following the scikit-learn estimator convention.
      """
      parameters = dict(params)

      if 'base_model' in parameters:
          self.base_model = parameters.pop('base_model')

      if 'rules' in parameters:
          self.rules = parameters.pop('rules')

      self.base_model.set_params(**parameters)
      return self

In [None]:
# Rule-based model: the same 50-tree gradient boosting setup, this time wrapped
# by RuleBasedEstimator, trained on the rule features and scored on the test split.
gbc = GradientBoostingClassifier(verbose=1, n_estimators=50)
rule_model = RuleBasedEstimator(base_model=gbc, rules=rules)
rule_model.fit(X=auto_train_X, y=train_y)
predictions = rule_model.predict(X=auto_test_X)

In [None]:
# Score against the test labels (`y_true` is never defined in this notebook).
evaluation(test_y, predictions)

Decision tree algorithm (gradient boosting classifier): auto-learned rules only

In [None]:
# Baseline: the gradient boosting classifier on its own, with no hand-written rules.
gbc = GradientBoostingClassifier(verbose=1, n_estimators=50)
gbc.fit(X=auto_train_X, y=train_y)
predictions = gbc.predict(X=auto_test_X)

In [None]:
# Score against the test labels (`y_true` is never defined in this notebook).
evaluation(test_y, predictions)