Commit b12c3fe
new data coming in was being scaled in the wrong way
Lorenzo Zanisi committed Jul 12, 2022
1 parent 5fa1dbd commit b12c3fe
Showing 7 changed files with 312 additions and 129 deletions.
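The substance of the fix is in src/pipeline/CLPipeline.py below: the data retained from earlier tasks (the replay buffer) is stored in the scale of the previous task's StandardScaler, so when a new task arrives it has to be unscaled with that old scaler first, merged with the still-raw data of the new task, and only then rescaled with a freshly fitted scaler. A minimal sketch of that pattern, not part of the commit, using plain pandas DataFrames and a hypothetical merge_tasks helper (the pipeline itself goes through pt.scale_data and the datasets' scale method):

import pandas as pd
from sklearn.preprocessing import StandardScaler

def merge_tasks(replay_scaled, new_raw, old_scaler):
    # 1) bring the replay buffer back to physical units with the *old* scaler
    replay_raw = pd.DataFrame(old_scaler.inverse_transform(replay_scaled),
                              columns=replay_scaled.columns)
    # 2) concatenate with the (unscaled) data of the new task
    combined = pd.concat([replay_raw, new_raw], ignore_index=True)
    # 3) fit a new scaler on the combined set and scale everything with it
    new_scaler = StandardScaler().fit(combined)
    combined_scaled = pd.DataFrame(new_scaler.transform(combined),
                                   columns=combined.columns)
    return combined_scaled, new_scaler

Mixing frames that were scaled with different scalers, or scaling incoming data with a scaler fitted on a different task, is the kind of error the commit message describes.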
283 changes: 225 additions & 58 deletions src/notebooks/pipeline_notebooks/outputs.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/notebooks/pipeline_notebooks/plot_CL_results.py
@@ -34,7 +34,7 @@ def losses_and_forgetting():

lambdas = [1] # [0.4,0.6,0.8,1]
replays = [1] #[0.4,0.6,0.8,1]
acquis = ['None'] #['random','individual']
acquis = ['individual'] #['random','individual']
names = [f'bootstrapped_CL_shrink_perturb_lam_{lam}_{acq}_replaysize_{mem}_useAL_{useAL}' for acq in acquis for lam in lambdas for mem in replays]
cmap_1 = plt.get_cmap('Greens')

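The plotting script now looks for runs that used the 'individual' acquisition instead of 'None'. The names suggest that 'random' draws points blindly from the unlabelled pool while 'individual' ranks each candidate by its own per-point uncertainty; a minimal sketch of that distinction, not taken from the repository, assuming an ensemble (or MC-dropout) prediction array and a hypothetical pick_candidates helper:

import numpy as np

def pick_candidates(ensemble_preds, batch_size, mode="individual", rng=None):
    # ensemble_preds: array of shape (n_models, n_pool) over the unlabelled pool
    rng = np.random.default_rng() if rng is None else rng
    if mode == "random":
        return rng.choice(ensemble_preds.shape[1], size=batch_size, replace=False)
    # 'individual': rank each pool point by the spread of the ensemble predictions
    uncertainty = ensemble_preds.std(axis=0)
    return np.argsort(uncertainty)[-batch_size:]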
49 changes: 27 additions & 22 deletions src/pipeline/CLPipeline.py
Expand Up @@ -4,6 +4,7 @@
import pickle as pkl
from multiprocessing import Pool
# from comet_ml import Experiment
import matplotlib.pylab as plt
from sklearn.preprocessing import StandardScaler

import pipeline.pipeline_tools as pt
@@ -27,13 +28,13 @@


def downsample(cfg,data,mem_replay):
train_sample = data[0].sample( int(len(data[0])*mem_replay))
train_sample_2 = data[0].sample( int(len(data[0])*mem_replay))
train_classifier = data[1].sample(int(len(data[1])*mem_replay))
valid_dataset = data[2].sample(int(len(data[2])*mem_replay))
valid_classifier = data[3].sample(int(len(data[3])*mem_replay))
holdout_set = data[4].sample(int(len(data[4])*mem_replay))
holdout_classifier = data[5].sample(int(len(data[5])*mem_replay))
return train_sample, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier
return train_sample_2, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier

def apply_shrink_perturb(models,lambda_task):
for flux in models.keys():
@@ -94,7 +95,7 @@ def CLPipeline(arg):
if CL_mode!= 'shrink_perturb':
cfg['hyperparams']['lambda'] = 1
if j==0:
train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, saved_tests, scaler = pt.get_data(cfg,j=j,apply_scaler=True) # --- saved_tests is never scaled
train_sample_2, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, saved_tests, scaler = pt.get_data(cfg,j=j,apply_scaler=True) # --- saved_tests is never scaled
saved_test_data.update(saved_tests)

for FLUX in FLUXES:
@@ -113,15 +114,15 @@

# --- downsample data from previous tasks, otherwise too much imbalance.
# --- scaler is same as before for the following lines (to unscale), then gets updated with new data
data = [train_sample, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier]
train_sample, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier = downsample(cfg, data, mem_replay)
data = [train_sample_2, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier]
train_sample_2, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier = downsample(cfg, data, mem_replay)
# --- unscale so they can be added to new (unscaled) data
train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier,scaler = pt.scale_data(train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier,unscale=True,scaler=scaler)
train_sample_2, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier,scaler = pt.scale_data(train_sample_2, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier,unscale=True,scaler=scaler)

train_sample_new, train_classifier_new, valid_dataset_new, valid_classifier_new, unlabelled_pool_new, holdout_set_new, holdout_classifier_new, saved_tests,scaler = pt.get_data(cfg, apply_scaler=False,j=j) # --- new scaler declared here
train_sample_2_new, train_classifier_new, valid_dataset_new, valid_classifier_new, unlabelled_pool_new, holdout_set_new, holdout_classifier_new, saved_tests,scaler = pt.get_data(cfg, apply_scaler=False,j=j) # --- new scaler declared here
saved_test_data.update(saved_tests)

train_sample.add(train_sample_new) #
train_sample_2.add(train_sample_2_new) #
train_classifier.add(train_classifier_new)
valid_dataset.add(valid_dataset_new) # --- in a real-world situation where data is streaming this is unavailable. Need to update dynamically within the AL pipeline.
valid_classifier.add(valid_classifier_new)
@@ -130,15 +131,15 @@
# unlabelled_pool = unlabelled_pool_new # --- not added, only data from the new task arrives

# --- rescale by keeping memory of previous tasks - train sample is available, data is scaled accordingly
train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, scaler = pt.scale_data(train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier)
train_sample_2, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, scaler = pt.scale_data(train_sample_2, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier)


# --- train models for jth task
for FLUX in FLUXES:
models[FLUX]['Regressor'].scaler = scaler
models[FLUX]['Regressor'], losses = md.train_model(
models[FLUX]['Regressor'],
train_sample,
train_sample_2,
valid_dataset,
epochs=cfg["train_epochs"],
patience=cfg["train_patience"],
@@ -218,28 +219,31 @@ def CL_and_AL_Pipeline(arg):
if CL_mode!= 'shrink_perturb':
cfg['hyperparams']['lambda'] = 1
if j==0:
train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, saved_tests, scaler = pt.get_data(cfg,j=0)
train_sample_2, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, saved_tests, scaler = pt.get_data(cfg,j=0)
saved_test_data.update(saved_tests)
first_iter = True # -- ok, retain
else:
# --- read data and scale with scaler of previous task
train_sample_new, train_classifier_new, valid_dataset_new, valid_classifier_new, unlabelled_pool_new, holdout_set_new, holdout_classifier_new, saved_tests,scaler = pt.get_data(cfg, scaler=scaler,j=j) # --- scaler gets updated during pipeline and passed here
train_sample_2_plot = copy.deepcopy(train_sample_2)
train_sample_2_plot.scale(scaler, unscale=True)

#train_sample = copy.deepcopy(train_sample_2)
_,_, valid_dataset_new, valid_classifier_new, unlabelled_pool_new, holdout_set_new, holdout_classifier_new, saved_tests,_ = pt.get_data(cfg, scaler=scaler,j=j) # --- scaler gets updated during pipeline and passed here
saved_test_data.update(saved_tests)

# --- downsample data from previous tasks, otherwise too much imbalance.
# data = [train_sample, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier]
# train_sample, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier = downsample(cfg, data, mem_replay)
# data = [train_sample_2, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier]
# train_sample_2, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier = downsample(cfg, data, mem_replay)

# train_sample.add(train_sample_new) # ToDo====>>>> if train sample is available, make sure data is scaled accordingly!
# train_sample_2.add(train_sample_2_new) # ToDo====>>>> if train sample is available, make sure data is scaled accordingly!
# train_classifier.add(train_classifier_new)
valid_dataset.add(valid_dataset_new) # --- in a real-world situation where data is streaming this is unavailable. Need to update dynamically within the AL pipeline.
valid_classifier.add(valid_classifier_new)
holdout_set.add(holdout_set_new)
holdout_classifier.add(holdout_classifier_new)
unlabelled_pool = copy.deepcopy(unlabelled_pool_new) # --- not added, only data from the new task arrives

for flux in models.keys():
models[flux]['Regressor'].scaler = scaler
#for flux in models.keys():
# models[flux]['Regressor'].scaler = scaler

first_iter = False
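The new train_sample_2_plot lines above keep an unscaled copy of the accumulated training set purely for inspection: a deep copy is taken and the current scaler is inverted on the copy, so plots can show physical units while the working copy stays in scaled units. A minimal sketch of the same idea on a plain DataFrame, not from the repository, with a hypothetical unscaled_copy helper:

import pandas as pd

def unscaled_copy(df_scaled, scaler):
    # keep df_scaled for training; return a physical-units copy for plotting
    return pd.DataFrame(scaler.inverse_transform(df_scaled),
                        columns=df_scaled.columns, index=df_scaled.index)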

@@ -251,14 +255,15 @@

inp = [seed,cfg,save_outputs_path,save_plots_path,
{'scaler':scaler,
'train':{'train_class':train_classifier,'train_regr':train_sample},
'train':{'train_class':train_classifier,'train_regr':train_sample_2},
'val':{'val_class': valid_classifier, 'val_regr':valid_dataset},
'test':{'test_class': holdout_classifier, 'test_regr': holdout_set},
'unlabelled':unlabelled_pool}, models, first_iter]
'unlabelled':unlabelled_pool}, models, first_iter, j]

inp = {'run_mode':'CL','cfg':inp}

train_sample_2, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier, output_dict, scaler, models = ALpipeline(inp) # --- this is really ugly as it is a recursive thing and so should really be in a class itself, but no time to polish

train_sample, train_classifier, valid_dataset, valid_classifier, holdout_set, holdout_classifier, output_dict, scaler, models = ALpipeline(inp) # --- this is really ugly as it is a recursive thing and so should really be in a class itself, but no time to polish
outputs.update({f'task{j}':output_dict})
this_forgetting = get_forget_dict(saved_test_data,models,j,scaler,cfg)
forgetting.update(this_forgetting)
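apply_shrink_perturb (its body sits outside this diff) is what gives the shrink_perturb CL_method its name: before training on a new task the regressor weights are not reused as-is but shrunk towards zero by a factor lambda and perturbed with a little fresh noise, which tends to keep warm-started networks trainable without throwing away what they learned. A minimal PyTorch sketch of the idea, not the committed implementation, with lambda_task and a noise scale sigma as assumed knobs:

import torch

@torch.no_grad()
def shrink_perturb(model, lam=0.5, sigma=0.01):
    # theta <- lam * theta + noise; lam = 1 leaves the weights untouched,
    # which is what the pipeline enforces when CL_mode != 'shrink_perturb'
    for p in model.parameters():
        p.mul_(lam)
        p.add_(sigma * torch.randn_like(p))
    return model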
2 changes: 2 additions & 0 deletions src/pipeline/Models.py
@@ -303,6 +303,7 @@ def unscale(self, y):
# get the index of the scaler that corresponds to the target
scaler_features = self.scaler.feature_names_in_
scaler_index = np.where(scaler_features == self.flux)[0][0]
print("SCALER INDEX", scaler_index)

return y * self.scaler.scale_[scaler_index] + self.scaler.mean_[scaler_index]
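unscale() inverts the standardisation for the single column of the fitted StandardScaler that belongs to this regressor's flux; the added print is only a debug trace of the looked-up index. A stand-alone sketch of the same lookup, not from the repository, assuming a scaler that was fitted on a DataFrame containing the target column:

import numpy as np

def unscale_target(y_scaled, scaler, target):
    # which column of the scaler corresponds to this target?
    idx = np.where(scaler.feature_names_in_ == target)[0][0]
    return y_scaled * scaler.scale_[idx] + scaler.mean_[idx]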

@@ -427,6 +428,7 @@ def predict(self, dataloader, unscale=False, mean=None):
x = x.to(self.device)
z = z.to(self.device)
z_hat = self.forward(x.float())
z_hat[z_hat<0] = 0 # --- clip negative values
pred.append(z_hat.squeeze().detach().cpu().numpy())
loss = self.loss_function(z.unsqueeze(-1).float(), z_hat, unscale=unscale)
if unscale:
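predict() now clips negative network outputs to zero before collecting them, presumably because the modelled flux is taken to be non-negative. A short sketch of a prediction loop with that clamp, not the committed code, assuming a DataLoader that yields (inputs, targets) pairs:

import torch

@torch.no_grad()
def predict_nonnegative(model, loader, device="cpu"):
    preds = []
    for x, _ in loader:
        z_hat = model(x.float().to(device))
        z_hat = torch.clamp(z_hat, min=0.0)  # same effect as z_hat[z_hat < 0] = 0
        preds.append(z_hat.squeeze().cpu().numpy())
    return preds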
8 changes: 4 additions & 4 deletions src/pipeline/pipeline_config_CL_shrinkperturb.yaml
@@ -1,8 +1,8 @@
CL_method: shrink_perturb
common:
MC_dropout_runs: 50
Nbootstraps: 10
acquisition: random
Nbootstraps: 1
acquisition: individual
flux:
- efiitg_gb
hyperparams:
@@ -26,8 +26,8 @@ common:
sample_size_debug: 1
train_epochs: 50
train_patience: 10
lambda_task: 0.2
mem_replay: 0.4
lambda_task: 1
mem_replay: 1
task1:
data:
test: /home/ir-zani1/rds/rds-ukaea-ap001/ir-zani1/qualikiz/UKAEAGroupProject/data/carbonwall/Lmode/test_data_clipped.pkl
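Read together, the config changes dial the continual-learning knobs back for this run: a single bootstrap instead of ten, 'individual' acquisition instead of 'random', lambda_task raised from 0.2 to 1 (the old weights are carried over without shrinking) and mem_replay raised from 0.4 to 1 (all data from previous tasks is replayed rather than a 40% subsample). A minimal sketch of how these keys might be consumed, assuming PyYAML and that they sit under the common block as the hunk context suggests:

import yaml

with open("pipeline_config_CL_shrinkperturb.yaml") as f:
    cfg = yaml.safe_load(f)

common = cfg["common"]
lam = common["lambda_task"]      # 1 -> no shrink-perturb shrinkage
mem = common["mem_replay"]       # fraction of earlier-task data kept for replay
n_boot = common["Nbootstraps"]   # size of the bootstrapped ensemble
acq = common["acquisition"]      # 'random' or 'individual'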
29 changes: 15 additions & 14 deletions src/pipeline/pipeline_tools.py
@@ -29,7 +29,8 @@
"post_test_loss": [], # regressor performance after retraining
"post_test_loss_unscaled": [], # regressor performance after retraining, unscaled
"post_test_loss_unscaled_norm": [], # regressor performance after retraining, unscaled, normalised

"mean_scaler": [],
"scale_scaler": [],
"n_train_points": [],

"mse_before": [],
@@ -153,24 +154,24 @@ def prepare_data(
else:
return train_dataset, valid_dataset, test_dataset, None, train_dataset_regressor

def scale_data(train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, scaler=None, unscale=False):
def scale_data(train_regr, train_class, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, scaler=None, unscale=False):
if scaler is None: # --- ensures that future tasks can be scaled according to scaler of previous tasks
scaler = StandardScaler()
scaler.fit(train_sample.data.drop(["stable_label","index"], axis=1))
train_sample.scale(scaler,unscale=unscale)
train_classifier.scale(scaler,unscale=unscale)
scaler.fit(train_regr.data.drop(["stable_label","index"], axis=1))
train_regr.scale(scaler,unscale=unscale)
train_class.scale(scaler,unscale=unscale)
# -- scale eval now so we don't scale its derived datasets later
valid_dataset.scale(scaler,unscale=unscale)
valid_classifier.scale(scaler,unscale=unscale)
holdout_set.scale(scaler,unscale=unscale)
holdout_classifier.scale(scaler,unscale=unscale)
unlabelled_pool.scale(scaler,unscale=unscale)
return train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, scaler
return train_regr, train_class, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, scaler

def get_data(cfg,scaler=None,apply_scaler=True,j=None):
PATHS = cfg["data"]
FLUX = cfg["flux"]
train_classifier, eval_dataset, test_dataset, scaler, train_regressor = prepare_data(
train_class, eval_dataset, test_dataset, _, train_regressor = prepare_data(
PATHS["train"],
PATHS["validation"],
PATHS["test"],
@@ -182,9 +183,9 @@ def get_data(cfg,scaler=None,apply_scaler=True,j=None):
print('size of train, test, val', len(train_regressor),len(test_dataset), len(eval_dataset))
# --- train sets
if len(train_regressor)>cfg['hyperparams']['train_size']:
train_sample = train_regressor.sample(cfg['hyperparams']['train_size'])
train_regr = train_regressor.sample(cfg['hyperparams']['train_size'])
else:
train_sample = copy.deepcopy(train_regressor)
train_regr = copy.deepcopy(train_regressor)

# --- holdout sets are from the test set
if len(test_dataset)>cfg['hyperparams']['test_size']:
@@ -206,8 +207,8 @@ def get_data(cfg,scaler=None,apply_scaler=True,j=None):
eval_regressor.data = eval_regressor.data.drop(eval_regressor.data[eval_regressor.data["stable_label"] == 0].index)
valid_dataset = eval_regressor.sample(cfg['hyperparams']['valid_size']) # --- valid for regressor
valid_classifier = eval_dataset.sample(cfg['hyperparams']['valid_size']) # --- valid for classifier, must come from original eval
if len(train_classifier)>cfg['hyperparams']['train_size']:
train_classifier = train_classifier.sample(cfg['hyperparams']['train_size'])
if len(train_class)>cfg['hyperparams']['train_size']:
train_class = train_class.sample(cfg['hyperparams']['train_size'])
else:
pass

@@ -217,8 +218,8 @@ def get_data(cfg,scaler=None,apply_scaler=True,j=None):
unlabelled_pool = eval_dataset

if apply_scaler:
train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, scaler = scale_data(train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, scaler=scaler)
return train_sample, train_classifier, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, saved_tests, scaler
train_regr, train_class, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, scaler = scale_data(train_regr, train_class, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, scaler=scaler)
return train_regr, train_class, valid_dataset, valid_classifier, unlabelled_pool, holdout_set, holdout_classifier, saved_tests, scaler

# classifier tools
def select_unstable_data(
@@ -321,7 +322,7 @@ def retrain_classifier(
logging.log(15, f"Training on {data_size} points")

train = copy.deepcopy(training_dataset)
train.add(misclassified_dataset)
#train.add(misclassified_dataset)

# create data loaders
train_loader = pandas_to_numpy_data(train, batch_size=batch_size, shuffle=True)
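In pipeline_tools.py the renames (train_sample to train_regr, train_classifier to train_class) make explicit which split each argument is; scale_data still fits the StandardScaler on the regressor training set only and then applies that same scaler, or its inverse when unscale=True, to every other split; the output dictionary gains mean_scaler and scale_scaler entries, presumably to log the fitted scaler's parameters per task; and retrain_classifier no longer folds the misclassified points back into its training set. A minimal sketch of the fit-once, apply-everywhere round trip on plain DataFrames, not the repository's implementation, with a hypothetical scale_splits helper:

import pandas as pd
from sklearn.preprocessing import StandardScaler

def scale_splits(splits, scaler=None, unscale=False):
    # splits: dict of split name -> DataFrame with identical columns
    if scaler is None:  # fit only on the regressor training split
        scaler = StandardScaler().fit(splits["train_regr"])
    out = {}
    for name, df in splits.items():
        arr = scaler.inverse_transform(df) if unscale else scaler.transform(df)
        out[name] = pd.DataFrame(arr, columns=df.columns, index=df.index)
    return out, scaler

# bookkeeping analogous to the new output keys:
# output_dict["mean_scaler"].append(scaler.mean_.copy())
# output_dict["scale_scaler"].append(scaler.scale_.copy())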
