In [1]:
import argparse
from copy import deepcopy
import logging
import random
from collections import defaultdict
from os.path import join
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, mean_squared_error, r2_score, f1_score
from scipy.special import softmax
from sklearn.model_selection import train_test_split, KFold
from sklearn.linear_model import RidgeCV
import xgboost as xgb

from scipy.cluster.hierarchy import linkage, dendrogram, fcluster
from scipy.spatial.distance import squareform

from sklearn.preprocessing import StandardScaler
import joblib
import imodels
import inspect
import os.path
import sys
import psutil
import imodelsx.cache_save_utils
import time
import torch

### TODO: fill in import statements and correct path###

sys.path.append('..')

import idistill.model
import idistill.data
from idistill.whitebox_figs import FIGSRegressorCV

def distill_model(student, X_train_teacher, y_train_teacher, r, feature_names = None):
    """Fit the student model on the teacher's outputs (the distillation step).

        Parameters:
            student: student model exposing a `fit(X, y, ...)` method
            X_train_teacher (n_train, n_concepts): teacher model's predicted concept outputs (logits, probabilities, etc) for training data
            y_train_teacher (n_train, n_outputs): teacher model's predicted task outputs (logits, probabilities, etc) for training data
            r: default dictionary to log experiment metrics (returned as-is; nothing is written to it here)
            feature_names: feature names of X_train_teacher; only forwarded when `student.fit` accepts a `feature_names` argument

        Returns:
            r: default dictionary to log experiment metrics
            student: trained/distilled student model

    """
    accepted_params = inspect.signature(student.fit).parameters

    # Forward the feature names only to students whose `fit` knows about them.
    extra_fit_kwargs = {}
    if feature_names is not None and "feature_names" in accepted_params:
        extra_fit_kwargs["feature_names"] = feature_names

    student.fit(X_train_teacher, y_train_teacher, **extra_fit_kwargs)
    return r, student

def evaluate_student(student, X_train, X_test, y_train, y_test, metric, task, r):
    """Score the student on the train and test splits and log both results.

        Parameters:
            student: fitted student model exposing `predict`
            X_train (n_train, n_concepts): concept inputs for the training split
            X_test (n_test, n_concepts): concept inputs for the test split
            y_train (n_train, 1): reference outputs for the training split, in evaluation form (teacher predictions when measuring distillation, true outputs when measuring prediction)
            y_test (n_test, 1): reference outputs for the test split, in evaluation form
            metric: one of "accuracy", "mse", "r2", "f1"
            task: label inserted into the logged key (e.g. `distillation`)
            r: default dictionary to log experiment metrics

        Returns:
            r: the same dictionary, with `student_{task}_{split}_{metric}` set for split in {train, test}

    """
    metric_fn = {
        "accuracy": accuracy_score,
        "mse": mean_squared_error,
        "r2": r2_score,
        "f1": f1_score,
    }[metric]

    splits = {"train": (X_train, y_train), "test": (X_test, y_test)}
    for split_name, (features, reference) in splits.items():
        # Student outputs are converted to evaluation form before scoring.
        predictions = process_student_eval(student.predict(features))
        r[f"student_{task}_{split_name}_{metric}"] = metric_fn(reference, predictions)

    return r

def evaluate_teacher(y_train_teacher, y_test_teacher, y_train, y_test, metric, task, r):
    """Evaluate teacher performance on each split

        Parameters:
            y_train_teacher (n_train, n_outputs): teacher model's predicted task outputs in evaluation form (i.e. if classification task, y_train_teacher must be class predictions, not class logits) for train data
            y_test_teacher (n_test, n_outputs): teacher model's predicted task outputs in evaluation form for test data
            y_train (n_train, 1): true outputs for train data
            y_test (n_test, 1): true outputs for test data
            metric: one of "accuracy", "mse", "r2", "f1"
            task: label inserted into the logged key (e.g. `prediction`)
            r: default dictionary to log experiment metrics

        Returns:
            r: the same dictionary, with `teacher_{task}_{split}_{metric}` set for split in {train, test}

    """
    metrics = {
        "accuracy": accuracy_score,
        "mse": mean_squared_error,
        "r2": r2_score,
        "f1": f1_score,
    }

    metric_fn = metrics[metric]

    for split_name, (y_teacher_, y_) in zip(
        ["train", "test"], [(y_train_teacher, y_train), (y_test_teacher, y_test)]
    ):
        # sklearn metrics take (y_true, y_pred). The ground truth must come
        # first, otherwise asymmetric metrics such as r2 are computed against
        # the wrong reference (accuracy and mse are unaffected by the order).
        r[f"teacher_{task}_{split_name}_{metric}"] = metric_fn(y_, y_teacher_)

    return r

def evaluate_test_student(student, X_test, y_test, metric, task, r):
    """Score the student on the test split only and log the result.

        Parameters:
            student: fitted student model exposing `predict`
            X_test (n_test, n_concepts): concept inputs for the test split
            y_test (n_test, 1): reference outputs for the test split, in evaluation form
            metric: one of "accuracy", "mse", "r2", "f1"
            task: label inserted into the logged key
            r: default dictionary to log experiment metrics

        Returns:
            r: the same dictionary, with `student_{task}_test_{metric}` set

    """
    score = {
        "accuracy": accuracy_score,
        "mse": mean_squared_error,
        "r2": r2_score,
        "f1": f1_score,
    }[metric]

    # Student outputs are converted to evaluation form before scoring.
    predictions = process_student_eval(student.predict(X_test))
    result_key = f"student_{task}_test_{metric}"
    r[result_key] = score(y_test, predictions)
    return r

def evaluate_test_teacher(y_test_teacher, y_test, metric, task, r):
    """Evaluate teacher performance on the test split only

        Parameters:
            y_test_teacher (n_test, n_outputs): teacher model's predicted task outputs in evaluation form for test data
            y_test (n_test, 1): true outputs for test data
            metric: one of "accuracy", "mse", "r2", "f1"
            task: label inserted into the logged key
            r: default dictionary to log experiment metrics

        Returns:
            r: the same dictionary, with `teacher_{task}_test_{metric}` set

    """
    metrics = {
        "accuracy": accuracy_score,
        "mse": mean_squared_error,
        "r2": r2_score,
        "f1": f1_score,
    }

    metric_fn = metrics[metric]

    # sklearn metrics take (y_true, y_pred): ground truth first, teacher
    # predictions second. The order matters for asymmetric metrics such as r2.
    r[f"teacher_{task}_test_{metric}"] = metric_fn(y_test, y_test_teacher)

    return r

def predict_teacher(teacher, X, gpu=0):
    """Make prediction from concepts to outputs with teacher model

        Template stub: the body must be filled in for the specific teacher model.

        Parameters:
            teacher: teacher model
            X (n, n_concepts): concept data
            gpu: gpu cuda device if applicable

        Returns:
            y_pred: teacher model predictions for X

    """
    ### TODO: handle teacher prediction outputs (X is intended to be the concept design matrix) ###
    # NOTE(review): `y_pred` is never assigned in this stub, so calling the
    # function as-is raises NameError (unless a global `y_pred` exists
    # elsewhere) until the TODO above is implemented.

    return y_pred

def load_teacher_model(teacher_path, gpu=0):
    """Load in teacher model

        Template stub: the body must be filled in for the specific teacher model.

        Parameters:
            teacher_path: path where teacher model is stored
            gpu: gpu cuda device if applicable

        Returns:
            model: teacher model

    """
    ### TODO: load in teacher model using teacher_path ###
    # NOTE(review): `model` is never assigned in this stub, so calling the
    # function as-is raises NameError (unless a global `model` exists
    # elsewhere) until the TODO above is implemented.

    return model

def generate_tabular_distillation_data(teacher, train_path, test_path, gpu=0):
    """Generate tabular concept and output data using teacher model for distillation and evaluation

        Template stub: the body must be filled in for the specific dataset and teacher.

        Parameters:
            teacher: teacher model
            train_path: path where training data is stored
            test_path: path where test data is stored
            gpu: gpu cuda device if applicable

        Returns:
            X_train_teacher (n_train, n_concepts): predicted concepts by teacher model for training data
            X_test_teacher (n_test, n_concepts): predicted concepts by teacher model for test data
            X_train (n_train, n_concepts): true concept training data (likely 0, 1)
            X_test (n_test, n_concepts): true concept test data (likely 0, 1)
            y_train_teacher (n_train, n_outputs): teacher model's predicted task outputs for train data
            y_test_teacher (n_test, n_outputs): teacher model's predicted task outputs for test data
            y_train (n_train, 1): true outputs for train data
            y_test (n_test, 1): true outputs for test data

    """
    ### TODO: generate teacher train and test data using teacher, train_path, and test_path ###
    # NOTE(review): none of the returned names are assigned in this stub, so
    # calling it as-is raises NameError until the TODO above is implemented.
    # The notebook cells that consume these values call `.iloc`, `.copy()` and
    # `.values` on the X tables, so they presumably need to be pandas
    # DataFrames; confirm when implementing.

    return X_train_teacher, X_test_teacher, X_train, X_test, y_train_teacher, y_test_teacher, y_train, y_test
    
def process_distillation_data(X_train_teacher, X_test_teacher, X_train, X_test, y_train_teacher, y_test_teacher):
    """Process teacher data for distillation (likely binarizing the data)

        Template stub: until the TODO is implemented, the teacher inputs are
        returned unchanged and X_train / X_test are unused.

        Parameters:
            X_train_teacher (n_train, n_concepts): predicted concepts by teacher model for training data
            X_test_teacher (n_test, n_concepts): predicted concepts by teacher model for test data
            X_train (n_train, n_concepts): true concept training data (likely 0, 1)
            X_test (n_test, n_concepts): true concept test data (likely 0, 1)
            y_train_teacher (n_train, n_outputs): teacher model's predicted task outputs for train data
            y_test_teacher (n_test, n_outputs): teacher model's predicted task outputs for test data

        Returns:
            X_train_teacher (n_train, n_concepts): processed predicted concepts by teacher model for distillation/student train data (likely 0, 1)
            X_test_teacher (n_test, n_concepts): processed predicted concepts by teacher model for student test data (likely 0, 1)
            y_train_teacher (n_train, n_outputs): teacher model's predicted task outputs for train data
            y_test_teacher (n_test, n_outputs): teacher model's predicted task outputs for test data

    """
    ### TODO: process (i.e. binarize) data for distillation ###

    return X_train_teacher, X_test_teacher, y_train_teacher, y_test_teacher

def process_student_eval(y_student):
    """Process student data outputs for evaluation (i.e. if we're using a regressor for distilling a classification model, need to argmax for evaluation)

        Template stub: the body must be filled in to match the chosen metric.

        Parameters:
            y_student (n, n_outputs): student model predictions

        Returns:
            y_pred (n, ...): student model predictions in evaluation form (if regression, then perhaps y_pred = y_student)

    """
    ### TODO: handle student prediction outputs to match metrics ###
    # NOTE(review): `y_pred` is never assigned in this stub, so calling the
    # function as-is raises NameError (unless a global `y_pred` exists
    # elsewhere) until the TODO above is implemented.

    return y_pred

def process_teacher_eval(y_teacher):
    """Process teacher data outputs for evaluation (e.g. convert teacher logits into class predictions)

        Template stub: the body must be filled in to match the chosen metric.

        Parameters:
            y_teacher (n, n_outputs): teacher model predictions

        Returns:
            y_teacher_eval (n, ...): teacher model predictions in evaluation form (if regression, then perhaps y_teacher_eval = y_teacher)

    """
    ### TODO: process teacher model predictions for evaluations (sometimes we distill a teacher model using a regressor, but want to evaluate class prediction accuracy) ###
    # NOTE(review): `y_teacher_eval` is never assigned in this stub, so calling
    # the function as-is raises NameError (unless a global of that name exists
    # elsewhere) until the TODO above is implemented.

    return y_teacher_eval

def extract_interactions(student):
    """Collect every root-to-leaf path of each tree in a fitted tree-sum student.

        Parameters:
            student: fitted model exposing its trees either as `student.trees_`
                or, for wrapper models, as `student.figs.trees_`. Each node is
                expected to provide `left`, `right`, `feature` and `value`.

        Returns:
            interactions: one list per tree; each entry is a tuple
                `(path, score)` where `path` is the list of split labels from
                the root to a leaf ('c<k>' when descending to the left child,
                '!c<k>' when descending to the right child, with k = feature
                index + 1) and `score` is `np.var(np.abs(leaf.value))`.

    """

    def collect_paths(node, path, leaves):
        # A node without children is a leaf: record the path that led to it.
        if node.left is None and node.right is None:
            leaves.append((path, np.var(np.abs(node.value))))
            return
        label = 'c' + str(node.feature + 1)
        # `path + [...]` builds a new list so sibling branches never share state.
        if node.left is not None:
            collect_paths(node.left, path + [label], leaves)
        if node.right is not None:
            collect_paths(node.right, path + ['!' + label], leaves)

    # Wrapper models keep the fitted model under `.figs`. Only a missing
    # attribute should trigger the fallback; any other error must surface.
    try:
        trees = student.trees_
    except AttributeError:
        trees = student.figs.trees_

    interactions = []
    for tree in trees:
        tree_leaves = []
        collect_paths(tree, [], tree_leaves)
        interactions.append(tree_leaves)

    return interactions

def split_list_by_sizes(list1, list2):
    """Chunk each row of `list2` using the sub-list lengths of the matching row of `list1`.

        Parameters:
            list1: rows of sub-lists; only the length of each sub-list is used
            list2: rows of flat sequences (anything sliceable) to be chunked

        Returns:
            result: for every row pair, the flat row cut into consecutive
                chunks (as plain lists) whose sizes follow the template row
                (elements beyond the total template size are dropped)

    """
    chunked_rows = []
    for template_row, flat_row in zip(list1, list2):
        # Running boundaries: [0, len(g0), len(g0) + len(g1), ...]
        bounds = [0]
        for group in template_row:
            bounds.append(bounds[-1] + len(group))
        chunked_rows.append(
            [list(flat_row[lo:hi]) for lo, hi in zip(bounds, bounds[1:])]
        )
    return chunked_rows

def find_closest_keys_vectorized(dictionary, targets):
    """Return, for every target value, the numeric key of `dictionary` nearest to it.

        Parameters:
            dictionary: mapping whose keys are numeric (values are ignored)
            targets: sequence of numeric values to match against the keys

        Returns:
            closest_keys: array with one key per target; on ties the key that
                appears first in the dictionary wins (np.argmin behaviour)

    """
    key_array = np.array(list(dictionary))
    target_array = np.array(targets)

    # Rows index keys, columns index targets.
    distance_table = np.abs(key_array.reshape(-1, 1) - target_array)
    nearest_rows = distance_table.argmin(axis=0)
    return key_array[nearest_rows]

def extract_adaptive_intervention(student, X, interactions, number_of_top_paths=0):
    """Rank, per sample, the tree paths of the student and list the concepts on each path.

        Parameters:
            student: fitted student whose `predict(X, by_tree = True)` returns per-tree predictions
                (assumed shape (n, n_outputs, n_trees), since the variance is taken over axis 1; confirm against the student implementation)
            X (n, n_concepts): concept inputs to rank interventions for
            interactions: output of `extract_interactions` (per tree, a list of `(path, variance)` tuples)
            number_of_top_paths: how many ranked paths to return per sample; 0 means one per tree

        Returns:
            concepts_to_edit: for every sample, a list (ordered by decreasing matched
                leaf variance) of lists of zero-based concept indices found on the path

    """
    # Per tree: leaf variance -> path. Leaves sharing a variance overwrite each other.
    variance_to_path = {
        tree_idx: {leaf_variance: leaf_path for leaf_path, leaf_variance in leaves}
        for tree_idx, leaves in enumerate(interactions)
    }

    per_tree_predictions = student.predict(X, by_tree = True)
    sample_tree_variance = np.var(np.abs(per_tree_predictions), axis = 1)

    # For each tree, snap every sample's variance to the nearest leaf variance
    # of that tree, then transpose to (n_samples, n_trees).
    matched_variance = np.array([
        find_closest_keys_vectorized(variance_to_path[tree_idx], sample_tree_variance[:, tree_idx])
        for tree_idx in range(sample_tree_variance.shape[1])
    ]).T

    # Descending order: tree indices and the matching leaf variances.
    ranked_trees = np.argsort(matched_variance, axis = 1)[:, ::-1]
    ranked_variances = np.sort(matched_variance, axis = 1)[:, ::-1]

    if number_of_top_paths == 0:
        n_ranks = ranked_trees.shape[1]
    else:
        n_ranks = number_of_top_paths

    concepts_to_edit = [[] for _ in range(X.shape[0])]
    for rank in range(n_ranks):
        for row, tree_idx in enumerate(ranked_trees[:, rank]):
            path = variance_to_path[tree_idx][ranked_variances[row, rank]]
            # Labels look like 'c<k>' or '!c<k>'; convert to zero-based indices.
            concepts_to_edit[row].append([
                int(label[2:]) - 1 if label[0] == '!' else int(label[1:]) - 1
                for label in path
            ])
    return concepts_to_edit

  def noop(*args, **kwargs):  # type: ignore
2025-02-10 19:12:12.930469: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
  from urllib3.contrib.pyopenssl import orig_util_SSLContext as SSLContext
  torch.utils._pytree._register_pytree_node(


In [2]:
class ARGS:
    """Attribute-style view of a settings dictionary (stand-in for an argparse namespace).

    Every key of `a_dict` becomes an instance attribute holding the
    corresponding value.
    """

    def __init__(self, a_dict):
        # setattr avoids exec on a formatted string, which breaks on keys that
        # are not plain identifiers and would run arbitrary text found in a key.
        for k, v in a_dict.items():
            setattr(self, k, v)
        
# Experiment configuration, exposed as attributes (args.metric, args.gpu, ...).
args = ARGS({
    #'save_dir': join(path_to_repo, "results"),  # The default value
    # Paths to the teacher checkpoint and the train/test data (fill in).
    'teacher_path': '',
    'train_path': '',
    'test_path': '',
    'task_type': "regression",
    'student_name': "FIGSRegressor",

    # Student capacity limits.
    'max_rules': 250,
    'max_trees': 40,
    'max_depth': 4,

    'metric': "accuracy",
    # 0 means: intervene on one path per tree.
    'num_interactions_intervention': 0,
    'gpu': 0,
})

In [None]:
# Results log passed through the distill/evaluate helpers below; keys that
# were never written default to an empty list.
r = defaultdict(list)

In [None]:
# Load the teacher and build the concept/output tables (teacher predictions and ground truth).
teacher = load_teacher_model(args.teacher_path, args.gpu)

X_train_t, X_test_t, X_train, X_test, y_train_t, y_test_t, y_train, y_test = generate_tabular_distillation_data(teacher, args.train_path, args.test_path, args.gpu)

# *_d: teacher data processed for distillation (student inputs/targets).
X_train_d, X_test_d, y_train_d, y_test_d = process_distillation_data(X_train_t, X_test_t, X_train, X_test, y_train_t, y_test_t)

# Teacher outputs converted to evaluation form (e.g. class predictions).
y_train_t_eval = process_teacher_eval(y_train_t)
y_test_t_eval = process_teacher_eval(y_test_t)

figs_student = idistill.model.get_model(args.task_type, args.student_name, args)

r, figs_student = distill_model(figs_student, X_train_d, y_train_d, r)

r['max_trees'] = figs_student.max_trees
r['max_rules'] = figs_student.max_rules
r['max_depth'] = figs_student.max_depth
# Wrapper students keep the fitted model under `.figs`; plain students expose
# the attributes directly. Only a missing attribute should trigger the
# fallback, so other errors are no longer swallowed.
try:
    r['n_trees'] = len(figs_student.figs.trees_)
    r['n_rules'] = figs_student.figs.complexity_
except AttributeError:
    r['n_trees'] = len(figs_student.trees_)
    r['n_rules'] = figs_student.complexity_

# Student vs. teacher outputs (distillation fidelity) and student vs. ground truth (prediction).
r = evaluate_student(figs_student, X_train_d, X_test_d, y_train_t_eval, y_test_t_eval, args.metric, "distillation", r)
r = evaluate_student(figs_student, X_train_d, X_test_d, y_train, y_test, args.metric, "prediction", r)

# Teacher vs. ground truth.
r = evaluate_teacher(y_train_t_eval, y_test_t_eval, y_train, y_test, args.metric, "prediction", r)

### adaptive FIGS concept editing ###
# Compares three ways of choosing which concepts to correct at test time:
#   adaptive (a): concepts on the student's highest-variance paths,
#   random   (r): random concepts, in groups of the same sizes,
#   linear   (l): concepts ranked by the linear teacher's weights (linear teachers only).
# NOTE(review): the X_* tables are assumed to be pandas DataFrames, since
# `.iloc`, `.copy()` and `.values` are used on them below.

figs_interactions = extract_interactions(figs_student)

# Longest root-to-leaf path over all trees of the student.
r['depth'] = max(max(len(path) for path, _ in tree) for tree in figs_interactions)

# Per-concept 5th/95th percentiles of the teacher's training concept
# predictions; used as the "off"/"on" values written into the teacher inputs.
train_q5 = np.quantile(X_train_t, 0.05, axis = 0)
train_q95 = np.quantile(X_train_t, 0.95, axis = 0)

X_test_d_a_edit = X_test_d.copy()
X_test_d_r_edit = X_test_d.copy()

X_test_t_a_edit = X_test_t.copy()
X_test_t_r_edit = X_test_t.copy()


cti_adap_test = extract_adaptive_intervention(figs_student, X_test_d, figs_interactions, args.num_interactions_intervention)

# Random baseline: a random permutation of the concepts per sample, cut into
# groups with the same sizes as the adaptive groups.
n_test, n_concepts = X_test_d.shape
cti_rand_test = [np.random.choice(np.arange(n_concepts), n_concepts, replace=False) for _ in range(n_test)]
cti_rand_test = split_list_by_sizes(cti_adap_test, cti_rand_test)

# The weight-based baseline only applies to linear teachers, which are
# recognised by their path name.
teacher_is_linear = 'linear' in args.teacher_path or 'Linear' in args.teacher_path

if teacher_is_linear:
    X_test_d_l_edit = X_test_d.copy()
    X_test_t_l_edit = X_test_t.copy()

    # Per-sample, per-output contribution of every concept: x[n, c] * W[y, c].
    test_l_edit = np.einsum('nc, yc -> nyc', X_test_t.values, teacher.sec_model.linear.weight.cpu().detach().numpy())

    # Rank concepts by the variance of their absolute contribution across outputs.
    cti_l_test_arr = np.argsort(np.var(np.abs(test_l_edit), axis = 1), axis = 1)[:, ::-1]
    cti_l_test = list(cti_l_test_arr)
    cti_l_test = split_list_by_sizes(cti_adap_test, cti_l_test)

if args.num_interactions_intervention == 0:
    # One intervention round per tree. `figs_interactions` has one entry per
    # tree and also covers wrapper students that only expose `.figs.trees_`,
    # where `figs_student.trees_` would raise AttributeError.
    num_iters = len(figs_interactions)
else:
    num_iters = args.num_interactions_intervention

for i in range(num_iters):
    # Interventions accumulate: round i edits on top of rounds 0..i-1.
    for n in range(n_test):
        adap_idx = cti_adap_test[n][i]
        rand_idx = cti_rand_test[n][i]
        true_adap = X_test.iloc[n, adap_idx]
        true_rand = X_test.iloc[n, rand_idx]

        # Student inputs: overwrite with the true concept values.
        X_test_d_a_edit.iloc[n, adap_idx] = true_adap
        X_test_d_r_edit.iloc[n, rand_idx] = true_rand

        # Teacher inputs: true 0 -> 5th percentile, true 1 -> 95th percentile.
        X_test_t_a_edit.iloc[n, adap_idx] = train_q5[adap_idx]*(true_adap == 0) + train_q95[adap_idx]*(true_adap)
        X_test_t_r_edit.iloc[n, rand_idx] = train_q5[rand_idx]*(true_rand == 0) + train_q95[rand_idx]*(true_rand)

        if teacher_is_linear:
            lin_idx = cti_l_test[n][i]
            true_lin = X_test.iloc[n, lin_idx]
            X_test_d_l_edit.iloc[n, lin_idx] = true_lin
            X_test_t_l_edit.iloc[n, lin_idx] = train_q5[lin_idx]*(true_lin == 0) + train_q95[lin_idx]*(true_lin)

    y_test_t_eval_a_interv = process_teacher_eval(predict_teacher(teacher, X_test_t_a_edit, args.gpu))
    y_test_t_eval_r_interv = process_teacher_eval(predict_teacher(teacher, X_test_t_r_edit, args.gpu))

    # Student vs. intervened teacher (distillation fidelity under intervention).
    r = evaluate_test_student(figs_student, X_test_d_a_edit, y_test_t_eval_a_interv, args.metric, f"distill_adap_interv_iter{i}", r)
    r = evaluate_test_student(figs_student, X_test_d_r_edit, y_test_t_eval_r_interv, args.metric, f"distill_rand_interv_iter{i}", r)

    # Student vs. ground truth.
    r = evaluate_test_student(figs_student, X_test_d_a_edit, y_test, args.metric, f"pred_adap_interv_iter{i}", r)
    r = evaluate_test_student(figs_student, X_test_d_r_edit, y_test, args.metric, f"pred_rand_interv_iter{i}", r)

    # Intervened teacher vs. ground truth.
    r = evaluate_test_teacher(y_test_t_eval_a_interv, y_test, args.metric, f"pred_adap_interv_iter{i}", r)
    r = evaluate_test_teacher(y_test_t_eval_r_interv, y_test, args.metric, f"pred_rand_interv_iter{i}", r)

    if teacher_is_linear:
        y_test_t_eval_l_interv = process_teacher_eval(predict_teacher(teacher, X_test_t_l_edit, args.gpu))

        r = evaluate_test_student(figs_student, X_test_d_l_edit, y_test_t_eval_l_interv, args.metric, f"distill_lin_interv_iter{i}", r)
        r = evaluate_test_student(figs_student, X_test_d_l_edit, y_test, args.metric, f"pred_lin_interv_iter{i}", r)
        r = evaluate_test_teacher(y_test_t_eval_l_interv, y_test, args.metric, f"pred_lin_interv_iter{i}", r)