In [None]:
import os
import re
import pickle
from pathlib import Path
import torch
import numpy as np
import random
# Show the installed PyTorch version in the cell output.
torch.__version__

In [None]:
!git clone https://github.com/fdalvi/NeuroX

In [None]:
import sys
# Directories added to the import search path so the project-local modules
# imported in this cell can be resolved.
package_paths = ['NeuroX', 'src']
for pth in package_paths:
    # Skip entries that are already present: re-running the cell would
    # otherwise keep appending duplicates to sys.path.
    if pth not in sys.path:
        sys.path.append(pth)
    
from converter import ConvertSample
from extractors import GetEmbeddings
from Experiment import Experiment

In [None]:
# Tunable experiment settings.
seed = 300  # change seed here
model_type = 'good'  # change model type here

# Environment flags exported for the rest of the run:
#   PYTHONHASHSEED          - the configured seed, as a string
#   CUBLAS_WORKSPACE_CONFIG - workspace setting the PyTorch reproducibility notes
#                             ask for when deterministic algorithms run on CUDA
#   CUDA_LAUNCH_BLOCKING    - "1" makes CUDA kernel launches synchronous
os.environ.update({
    'PYTHONHASHSEED': str(seed),
    'CUBLAS_WORKSPACE_CONFIG': ':4096:8',
    'CUDA_LAUNCH_BLOCKING': '1',
})

In [None]:
def set_seed(seed):
    """Seed every random number generator used here and make PyTorch deterministic.

    Args:
        seed: Integer seed handed to Python's ``random``, NumPy and PyTorch
            (CPU and CUDA generators); also exported as ``PYTHONHASHSEED``.
    """
    # Deterministic execution: disable cuDNN autotuning, request deterministic
    # cuDNN kernels, and enable PyTorch's deterministic-algorithms mode.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
    torch.use_deterministic_algorithms(True)

    # Feed the same seed to each generator in turn.
    seeders = (
        random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
    )
    for seeder in seeders:
        seeder(seed)

    os.environ["PYTHONHASHSEED"] = str(seed)
     
# Apply the configured seed right after defining the helper.
set_seed(seed)

In [None]:
# Limit PyTorch to a single thread for intra-op parallelism on CPU.
torch.set_num_threads(1)

In [None]:
# Input directories under the current working directory -- change data paths here.
# The experiment loop pairs them by position with the probing types
# (CLS token, sentence average, per-token), so the order matters.
data_paths = [
    f'{Path(os.getcwd())}/data/{subdir}/'
    for subdir in ('per_sent', 'per_sent', 'per_token')
]

In [None]:
# Probing strategy for each entry of `data_paths`, paired by position:
#   'cls'   - embeddings by CLS token
#   'avg'   - average sentence embeddings
#   'token' - per-token
probing_types = ('cls', 'avg', 'token')
if len(data_paths) > len(probing_types):
    # Fail loudly instead of reusing the previous probing type for extra paths.
    raise ValueError(
        f'{len(data_paths)} data paths given, but only '
        f'{len(probing_types)} probing types are defined'
    )


def _dump_results(path_work, d_name, results):
    """Pickle each ``(stem, obj)`` pair to ``{path_work}/{stem}_{d_name}.pkl``."""
    for stem, obj in results:
        with open(f'{path_work}/{stem}_{d_name}.pkl', 'wb') as f:
            pickle.dump(obj, f)


for data_path, probing_type in zip(data_paths, probing_types):

    # One output directory per (model type, probing type, seed) combination.
    path_work = f'{model_type}_{probing_type}{seed}/'
    os.makedirs(path_work, exist_ok=True)

    # Result containers, keyed by category name and reset for every data path.
    # The `_c` suffix marks the control-task counterpart of a container.
    ordered_neurons = {}
    ordered_neurons_c = {}
    threshold = {}
    threshold_c = {}
    weights = {}
    weights_c = {}
    scores_layers = {}
    top_n = {}
    top_n_per_class = {}
    top_n_c = {}
    top_n_per_class_c = {}
    bottom_n = {}
    scores = {}
    scores_control = {}
    scores_keep_top = {}
    scores_keep_top_c = {}
    scores_keep_thres = {}
    scores_keep_thres_c = {}
    predicted = {}
    scores_keep_bot = {}

    for dirname, _, filenames in os.walk(data_path):
        for filename in filenames:
            file = os.path.join(dirname, filename)
            splitter = ConvertSample(path=file, path_work=path_work, train_size=20, test_size=20)
            # get train and test (plus the control-task training files)
            path_trdata, path_trlabel, path_ctrdata, path_ctrlabel, path_tedata, path_telabel = splitter.writer()
            # get embeddings
            data = GetEmbeddings(path_trdata=path_trdata, path_tedata=path_tedata, path_work=path_work, probing_type=probing_type)
            data.jsons(model=f'models/{model_type}_1kk/')  # model here
            cat = Experiment(path_trdata, path_trlabel, path_tedata, path_telabel, path_work, probing_type=probing_type)

            # Category name = the "<letters>_<letters>" part right before ".txt".
            # The dot is escaped so that it only matches a literal ".txt" suffix.
            name_match = re.search(r'[a-zA-Z]+_[a-zA-Z]+(?=\.txt)', path_trdata)
            if name_match is None:
                raise ValueError(f'cannot derive a category name from {path_trdata!r}')
            cat_name = name_match[0]
            d_name = cat.dataset

            probe, scores_tr, scores_te, predictions = cat.run_classification(return_predictions=True)
            scores[cat_name] = [scores_tr, scores_te]
            predicted[cat_name] = predictions
            weights1, weights2 = cat.return_weights(probe)
            weights[cat_name] = [weights1, weights2]
            ordering, cutoffs = cat.nranking(probe)  # ranking
            ordered_neurons[cat_name] = [ordering, cutoffs]
            # Call top_n once and reuse both parts of its result.
            top_result = cat.top_n(probe)
            top_n[cat_name] = top_result[0]
            top_n_per_class[cat_name] = top_result[1]
            scores_tr, scores_te = cat.keep_only(neurons=top_n[cat_name], goal='top')
            scores_keep_top[cat_name] = [scores_tr, scores_te]
            # The selection passed to keep_bottom comes from top_n with percentage=0.8.
            bottom_n[cat_name] = cat.top_n(probe, percentage=0.8)[0]
            scores_tr, scores_te = cat.keep_bottom(neurons=bottom_n[cat_name])
            scores_keep_bot[cat_name] = [scores_tr, scores_te]
            scores_layers[cat_name] = cat.train_layers()
            threshold[cat_name] = cat.threshold_n(probe)[0]
            scores_tr, scores_te = cat.keep_only(threshold[cat_name], goal='threshold')
            scores_keep_thres[cat_name] = [scores_tr, scores_te]
            # save results (the cumulative dicts are rewritten after every file;
            # the 'label2dx' spelling is kept so existing readers still find the file)
            _dump_results(path_work, d_name, [
                ('scores', scores),
                ('predicted', predicted),
                ('label2dx', cat.label2idx),
                ('neurons', ordered_neurons),
                ('weights', weights),
                ('scores_keep_top', scores_keep_top),
                ('scores_keep_thres', scores_keep_thres),
                ('scores_keep_bot', scores_keep_bot),
                ('top_n', top_n),
                ('top_n_per_class', top_n_per_class),
                ('bottom_n', bottom_n),
                ('threshold', threshold),
                ('scores_layers', scores_layers),
            ])

            # control task here: same test files, control training data/labels.
            # Results stay keyed by the same cat_name (derived from path_trdata)
            # and are written under the main experiment's d_name.
            cat = Experiment(path_ctrdata, path_ctrlabel, path_tedata, path_telabel, path_work, probing_type=probing_type)
            probe, scores_tr, scores_te = cat.run_classification()
            scores_control[cat_name] = [scores_tr, scores_te]
            weights1_c, weights2_c = cat.return_weights(probe)
            weights_c[cat_name] = [weights1_c, weights2_c]
            ordering_c, cutoffs_c = cat.nranking(probe)
            ordered_neurons_c[cat_name] = [ordering_c, cutoffs_c]
            top_result_c = cat.top_n(probe)
            top_n_c[cat_name] = top_result_c[0]
            top_n_per_class_c[cat_name] = top_result_c[1]
            scores_tr, scores_te = cat.keep_only(neurons=top_n_c[cat_name], goal='top')
            scores_keep_top_c[cat_name] = [scores_tr, scores_te]
            threshold_c[cat_name] = cat.threshold_n(probe)[0]
            scores_tr, scores_te = cat.keep_only(threshold_c[cat_name], goal='threshold')
            scores_keep_thres_c[cat_name] = [scores_tr, scores_te]
            # save results
            _dump_results(path_work, d_name, [
                ('top_n_per_class_c', top_n_per_class_c),
                ('top_n_c', top_n_c),
                ('label2dx_c', cat.label2idx),
                ('weights_c', weights_c),
                ('scores_c', scores_control),
                ('scores_keep_top_c', scores_keep_top_c),
                ('scores_keep_thres_c', scores_keep_thres_c),
                ('neurons_c', ordered_neurons_c),
            ])