from tensorflow.keras.utils import Sequence
from tensorflow import keras
import json
import numpy as np
from numpy import linalg as LA

# References:
# https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence
# https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly


class DeepTrace_DataGenerator(Sequence):
    # posObs[id] = [BUG, FILE], all possible positive cases
    # negObs[epoch] = negObs_list[id] = [BUG, FILE], sampled negative cases per epoch
    # split_indices: a subset of possible indices of positive cases
    def __init__(self, sample_ids, observation_key_triplets, bug_embedded_dir,
                 code_vectorized_dir, jsonConfigs, shuffle=True):
        self.shuffle = shuffle
        self.observation_key_triplets = observation_key_triplets
        self.sample_ids = sample_ids
        self.cases = len(self.sample_ids)
        self.batch_size = 1
        self.code_vectorized_dir = code_vectorized_dir
        self.bug_embedded_dir = bug_embedded_dir
        with open(jsonConfigs, 'r') as f:
            body = json.loads(f.read())
        # self.code_time_steps = body["code_time_steps"]
        # self.bug_time_steps = body["bug_time_steps"]
        self.bug_embedding = body["bug_embedding"]
        self.code_dim1_max_word_features = body["code_dim1_max_word_features"]
        self.code_dim2_max_word_features = body["code_dim2_max_word_features"]
        self.code_max_word_features_classes = body["code_max_word_features_class"]
        self.code_max_word_features_package = body["code_max_word_features_package"]
        self.code_max_word_features_extends = body["code_max_word_features_extends"]
        self.code_max_word_features_implements = body["code_max_word_features_implements"]
        self.on_epoch_end()

    def on_epoch_end(self):
        # Reshuffle the observation order between epochs when requested
        self.gen_indices = np.arange(self.cases)
        if self.shuffle:
            np.random.shuffle(self.gen_indices)

    # Number of batches in the epoch: (sequence length) / (batch size)
    def __len__(self):
        return int(np.ceil(self.cases / self.batch_size))

    # A batch at position batch_index
    def __getitem__(self, batch_index):
        # Select the observations for this batch
        start = batch_index * self.batch_size
        end = (batch_index + 1) * self.batch_size
        batch_indices = self.gen_indices[start:end]
        batch_ids = [self.sample_ids[i] for i in batch_indices]
        batch_keys = [self.observation_key_triplets[obs_id] for obs_id in batch_ids]
        # With batch_size == 1 there is exactly one observation per batch
        keys = batch_keys[0]
        method_id = keys[0]
        bug_id = keys[1]
        r = keys[2]
        method_category_mat, method_token_mat = self.load_method(
            method_id, self.code_dim1_max_word_features, self.code_vectorized_dir)
        # Context-metadata inputs are loaded but not yet wired into the returned inputs
        class_mat = self.load_class(method_id, self.code_max_word_features_classes, self.code_vectorized_dir)
        package_mat = self.load_package(method_id, self.code_max_word_features_package, self.code_vectorized_dir)
        extends_mat = self.load_extends(method_id, self.code_max_word_features_extends, self.code_vectorized_dir)
        implements_mat = self.load_implements(method_id, self.code_max_word_features_implements, self.code_vectorized_dir)
        bug_mat = self.load_bug(bug_id, self.bug_embedded_dir)
        label = np.array([r])
        return [bug_mat, method_category_mat, method_token_mat], label.reshape(1, *label.shape)

    # Output dimensionality: batch_size x seq_length x feature_dimensionality
    def load_bug(self, bug_id, bug_dir):
        bug_embedded = np.load(bug_dir + bug_id)
        dim = bug_embedded.shape
        # Cap bug sequences at 500 time steps
        if dim[0] > 500:
            bug_embedded = bug_embedded[0:500, :]
        # L2-normalize each embedding row
        bug_norm = LA.norm(bug_embedded, axis=1)
        bug_normalized = bug_embedded / bug_norm[:, None]
        return bug_normalized.reshape(1, *bug_normalized.shape)

    def load_method(self, method_id, word_features_categories, code_dir):
        code_matrix = np.load(code_dir + method_id)
        # Column 0: category ids (one-hot encoded); column 1: token ids
        code_categories = keras.utils.to_categorical(code_matrix[:, 0], num_classes=word_features_categories)
        code_tokens = code_matrix[:, 1]
        return code_categories.reshape(1, *code_categories.shape), code_tokens.reshape(1, *code_tokens.shape)

    def load_class(self, method_id, word_features, code_dir):
        class_vectorized = np.load(code_dir + "context_metadata/class/" + method_id)
        class_one_hot = keras.utils.to_categorical(class_vectorized, word_features)
        return class_one_hot.reshape(1, *class_one_hot.shape)

    def load_package(self, method_id, word_features, code_dir):
        package_vectorized = np.load(code_dir + "context_metadata/package/" + method_id)
        package_one_hot = keras.utils.to_categorical(package_vectorized, word_features)
        return package_one_hot.reshape(1, *package_one_hot.shape)

    def load_extends(self, method_id, word_features, code_dir):
        extends_vectorized = np.load(code_dir + "context_metadata/extends/" + method_id)
        extends_one_hot = keras.utils.to_categorical(extends_vectorized, word_features)
        return extends_one_hot.reshape(1, *extends_one_hot.shape)

    def load_implements(self, method_id, word_features, code_dir):
        implements_vectorized = np.load(code_dir + "context_metadata/implements/" + method_id)
        implements_one_hot = keras.utils.to_categorical(implements_vectorized, word_features)
        return implements_one_hot.reshape(1, *implements_one_hot.shape)
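
# Usage sketch (illustrative, not part of the pipeline): a keras.Sequence such
# as DeepTrace_DataGenerator can be passed straight to Model.fit, which calls
# __len__/__getitem__ each epoch and on_epoch_end() between epochs. The model
# argument here is a placeholder for the actual DeepTrace architecture (defined
# elsewhere); this helper only shows the wiring, assuming the model takes the
# three inputs returned by __getitem__.
def fit_with_generator(model, train_gen, validation_gen=None, epochs=10):
    # Sequence objects are consumed directly; no steps_per_epoch is needed
    # because __len__ already reports the number of batches.
    return model.fit(train_gen, validation_data=validation_gen, epochs=epochs)
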
def load_observations_key_triplets(jsonfilePos, jsonfileNeg):
    # Each triplet is [method_id, bug_id, label]; 1 = positive, 0 = negative
    key_triplets = []
    countPos = 0
    countNeg = 0
    with open(jsonfilePos, 'r') as pf:
        for pline in pf:
            body = json.loads(pline)
            key_triplets += [[body["method"], body["bug"], 1]]
            countPos += 1
    with open(jsonfileNeg, 'r') as nf:
        for nline in nf:
            body = json.loads(nline)
            key_triplets += [[body["method"], body["bug"], 0]]
            countNeg += 1
    return countPos, countNeg, key_triplets


if __name__ == "__main__":
    ## TEST DATA GENERATOR
    ## ASSUMES THAT THE DIRECTORY AND MAPPING FILES HAVE BEEN CREATED:
    ##   hyperparams.json
    ##   file_bug_positive.json
    ##   file_bug_negative.json
    ##   directory & subdirectories: code, code/02_vectorized/ with subdirs context_metadata/class, package, extends, implements
    ##   directory & subdirectories: bug
    jsonConfig = "hyperparams.json"
    with open(jsonConfig, 'r') as f:
        config = json.loads(f.read())

    # Load observations
    jsonfile_pos = config["map_method_bug_pos_path"]
    jsonfile_neg = config["map_method_bug_neg_path"]
    posObsN, negObsN, observations_key_triplets = load_observations_key_triplets(jsonfile_pos, jsonfile_neg)

    # K-fold testing
    np.random.seed(12)
    sample_pos_indices = np.arange(posObsN)
    np.random.shuffle(sample_pos_indices)
    sample_neg_indices = np.arange(negObsN) + posObsN
    np.random.shuffle(sample_neg_indices)
    K = config["K"]
    group_size = int(np.floor((posObsN + negObsN) / K))

    # One step (k = 0) of the K-fold split into validation and training
    k = 0
    pos_validation_indices = sample_pos_indices[k * group_size:(k + 1) * group_size]
    neg_validation_indices = sample_neg_indices[k * group_size:(k + 1) * group_size]
    pos_train_indices = [x for x in sample_pos_indices if x not in pos_validation_indices]
    neg_train_indices = [x for x in sample_neg_indices if x not in neg_validation_indices]

    # TEST ALL: train on every observation
    pos_train_indices = sample_pos_indices
    neg_train_indices = sample_neg_indices
    # TEST ONE of EACH
    # pos_train_indices = np.array([sample_neg_indices[0]])
    # neg_train_indices = np.array([sample_neg_indices[0]])

    train_indices = np.concatenate((pos_train_indices, neg_train_indices))
    np.random.shuffle(train_indices)
    validation_indices = np.concatenate((pos_validation_indices, neg_validation_indices))
    np.random.shuffle(validation_indices)
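
    # Sketch (illustrative, not executed below): the k = 0 split above is one
    # step of the full K-fold iteration. Under the same slicing scheme, a
    # hypothetical helper yielding (train, validation) index arrays per fold
    # might look like this:
    def kfold_index_splits(pos_indices, neg_indices, folds, group):
        for fold in range(folds):
            pos_val = pos_indices[fold * group:(fold + 1) * group]
            neg_val = neg_indices[fold * group:(fold + 1) * group]
            pos_tr = np.array([x for x in pos_indices if x not in pos_val])
            neg_tr = np.array([x for x in neg_indices if x not in neg_val])
            yield (np.concatenate((pos_tr, neg_tr)),
                   np.concatenate((pos_val, neg_val)))
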
    # Init the generator
    embedded_bug_dir = config["bugs_embedded_dir"]
    vectorized_code_dir = config["code_vectorized_dir"]
    gen = DeepTrace_DataGenerator(train_indices, observations_key_triplets,
                                  embedded_bug_dir, vectorized_code_dir,
                                  jsonConfig, shuffle=True)

    # Sanity-check every batch: input shapes, no +/- inf, unit-norm bug embeddings
    count = 0
    for batch in gen:
        count += 1
        print("Batch ***************************************************************** ", count)
        print("batch len", len(batch))
        for input_i in batch[0]:
            print("input: ", input_i.shape)
        bug_batch_steps_embedding = batch[0][0]
        batchx, stepsx, embeddingx = bug_batch_steps_embedding.shape
        issue = np.sum(np.isinf(bug_batch_steps_embedding))
        if issue:
            input("Issue found in batch " + str(count) + "; press Enter to continue.")
        else:
            print("Bug has no +/- inf")
        for s in range(stepsx):
            embedding_norm = LA.norm(bug_batch_steps_embedding[0][s][:])
            if not np.isclose(embedding_norm, 1, atol=0.000001):
                input("Norm not close to 1; press Enter to continue.")
        print("output:", batch[1])
    # print("observation count:", len(train_indices))
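
# For reference, a hyperparams.json containing the keys this module reads might
# look like the following. The key names are taken from the code above and the
# file/directory names from the test-harness comments; the numeric values are
# illustrative placeholders, not the project's real settings:
#
# {
#   "bug_embedding": 300,
#   "code_dim1_max_word_features": 100,
#   "code_dim2_max_word_features": 50000,
#   "code_max_word_features_class": 10000,
#   "code_max_word_features_package": 5000,
#   "code_max_word_features_extends": 2000,
#   "code_max_word_features_implements": 2000,
#   "map_method_bug_pos_path": "file_bug_positive.json",
#   "map_method_bug_neg_path": "file_bug_negative.json",
#   "K": 10,
#   "bugs_embedded_dir": "bug/",
#   "code_vectorized_dir": "code/02_vectorized/"
# }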