# Phase 1: Classification (assign Utility, Application, or Entity Tag)

In [None]:
# Run configuration for the notebook. The three values below are combined into
# the embeddings CSV file name, and model_type also decides which pretrained
# model/tokenizer is loaded.
version = "v_imen" # Labelling version. All options: v_imen, v_team
system = "pos" # System under analysis. All options: jforum, cargotracker, petclinic, pos
model_type = "albert" # Embedding model. All options: ft_codebert, word2vec, albert, codebert, roberta, bert

## 1.1 Create Embeddings

In [None]:
from transformers import AutoTokenizer, AutoModel, AlbertTokenizer, AlbertModel, RobertaModel, RobertaTokenizer, BertTokenizer, BertModel
import torch
from sklearn.model_selection import train_test_split
import numpy as np
from utils import load_class_code_from_directory, load_data_from_csv, write_embeddings_to_csv, associate_classes_to_types, write_distance_to_csv
from embeddings import generate_embeddings_for_java_code, generate_word_embeddings_for_java_code
import nltk
from nltk.stem import WordNetLemmatizer
import gensim.downloader as api

In [None]:
# Run on the first CUDA device when PyTorch reports one, otherwise on the CPU.
dev = "cuda:0" if torch.cuda.is_available() else "cpu"
device = torch.device(dev)

In [None]:
# Select the model and tokenizer.
#
# Transformer checkpoints are described by a lookup table:
#   model_type -> (tokenizer class, model class, checkpoint, extra kwargs)
# so that adding a new checkpoint is a one-line change.
_TRANSFORMER_CHECKPOINTS = {
    "codebert": (AutoTokenizer, AutoModel, "microsoft/codebert-base", {"force_download": False}),
    "ft_codebert": (AutoTokenizer, AutoModel, "./codebert_finetuned", {"force_download": False}),
    "bert": (BertTokenizer, BertModel, "bert-base-uncased", {}),
    "roberta": (RobertaTokenizer, RobertaModel, "roberta-base", {}),
    # The ALBERT tokenizer needs sentencepiece: pip3 install sentencepiece
    "albert": (AlbertTokenizer, AlbertModel, "albert-base-v2", {}),
}

if model_type in _TRANSFORMER_CHECKPOINTS:
    _tokenizer_cls, _model_cls, _checkpoint, _load_kwargs = _TRANSFORMER_CHECKPOINTS[model_type]
    tokenizer = _tokenizer_cls.from_pretrained(_checkpoint, **_load_kwargs)
    model = _model_cls.from_pretrained(_checkpoint, **_load_kwargs)

    # Move the model to the GPU if available
    model = model.to(device)
elif model_type == "word2vec":
    # Download required NLTK datasets and initialize the lemmatizer
    nltk.download('wordnet')
    word_lemmatizer = WordNetLemmatizer()

    # Load Word2Vec model
    word2vec_model = api.load('word2vec-google-news-300')

    # Bind `model` as well, so cells that read `model` get the word vectors
    # instead of failing with NameError or silently reusing a transformer left
    # over from an earlier run. The word vectors are not moved with `.to(device)`:
    # that call applies to the torch models loaded in the branch above.
    model = word2vec_model
else:
    raise NameError("model type not supported")

In [None]:
# Load the ground-truth labels for the selected labelling version and system.
# Presumably a mapping from class name to label -- verify against utils.
# Labels are 0: Application, 1: Utility, 2: Entity
class_labels = associate_classes_to_types(version, system)
# Printed so the loaded labels can be inspected by eye.
print(class_labels)

In [None]:
# Generate one embedding per class found in the system's source directory.
class_code = load_class_code_from_directory(system)

if model_type == "word2vec":
    # Use the Word2Vec vectors loaded for this model type explicitly; the
    # generic `model` name may be unbound or refer to a transformer from a
    # previous run.
    class_embeddings = {
        class_name: generate_word_embeddings_for_java_code(code, word2vec_model, word_lemmatizer)
        for class_name, code in class_code.items()
    }
else:
    class_embeddings = {
        class_name: generate_embeddings_for_java_code(code, model, tokenizer, device)
        for class_name, code in class_code.items()
    }

# Write embeddings to csv file
write_embeddings_to_csv(version, system, model_type, class_embeddings, class_labels)

## 1.2 Train classifiers

In [None]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
from sklearn.ensemble import VotingClassifier, AdaBoostClassifier
from imblearn.over_sampling import SMOTE
from collections import Counter

In [None]:
# Load data
filename = f"./csv_files/{version}_{system}_{model_type}_embeddings.csv"
class_names, labels, embeddings = load_data_from_csv(filename)

# Train-test split. Embeddings, labels and names are split together so every
# sample keeps its own label; looking labels up by class name afterwards is
# quadratic and picks the wrong label when two classes share a name.
Xtrain, Xtest, ytrain, ytest, names_train, names_test = train_test_split(
    embeddings, labels, class_names, test_size=0.3, random_state=0
)
Xtrain, Xtest = np.asarray(Xtrain), np.asarray(Xtest)
ytrain, ytest = list(ytrain), list(ytest)
names_train, names_test = list(names_train), list(names_test)

# Ensure at least one instance of each class in the training data.
# A class missing from the training split has all of its samples in the test
# split, so one of them is MOVED (not copied) to training; copying would leave
# the same sample in both sets and leak test data into training.
unique_classes = set(labels)
for cls in unique_classes:
    if cls in ytrain:
        continue
    test_index = ytest.index(cls)
    Xtrain = np.vstack([Xtrain, Xtest[test_index:test_index + 1]])
    Xtest = np.delete(Xtest, test_index, axis=0)
    ytrain.append(ytest.pop(test_index))
    names_train.append(names_test.pop(test_index))

# Calculate class frequencies and mean frequency
class_freq = Counter(ytrain)
mean_count = sum(class_freq.values()) // len(class_freq)

# Identify classes that need resampling (significantly fewer than mean_count)
threshold = 0.7  # 70% of the mean_count
k_neighbors = 1  # SMOTE needs at least k_neighbors + 1 samples of a class

classes_to_resample = {}
for cls, count in class_freq.items():
    if count >= mean_count * threshold:
        continue
    if count < k_neighbors + 1:
        # Only this class is left out; the remaining minority classes are
        # still oversampled.
        print(f"Skipping resampling for class {cls} due to insufficient samples.")
        continue
    classes_to_resample[cls] = mean_count

# Apply SMOTE: oversample each selected class up to mean_count samples
if classes_to_resample:
    sm = SMOTE(sampling_strategy=classes_to_resample, k_neighbors=k_neighbors, random_state=42)
    Xtrain, ytrain = sm.fit_resample(Xtrain, ytrain)
    ytrain = list(ytrain)

# Print information
print(f'Number of classes: {len(class_names)}\nResampled dataset shape: {Counter(ytrain)}')
print(f'len(Xtrain): {len(Xtrain)}\nlen(Xtest): {len(Xtest)}')
print(f'ytrain: {ytrain}\nytest: {ytest}')
print(f'ytrain count: {np.bincount(ytrain)}\nytest count: {np.bincount(ytest)}')


In [None]:
# # OPTIONAL: Please keep this code commented out unless you want to use it
# # This code ensures that each class is represented in the test set, proportional to its representation in the training set
# # Calculate class frequencies for training and test sets
# train_class_freq = Counter(ytrain)
# test_class_freq = Counter(ytest)

# # Calculate the ratio between the size of the training and test sets
# ratio = len(ytrain) / len(ytest)

# # Iterate over each class to make sure it has proportional representation in the test set
# for cls, train_count in train_class_freq.items():
#     expected_test_count = int(train_count / ratio)
#     actual_test_count = test_class_freq.get(cls, 0)

#     if actual_test_count < expected_test_count:
#         # Find instances in the training set to move to the test set
#         for _ in range(expected_test_count - actual_test_count):
#             cls_index = ytrain.index(cls)
#             Xtest = np.vstack([Xtest, [Xtrain[cls_index]]])
#             ytest.append(cls)
#             Xtrain = np.delete(Xtrain, cls_index, axis=0)
#             ytrain.pop(cls_index)

In [None]:
def generate_classification_report(y_true, y_pred):
    """Print a classification report with human-readable class names.

    Only labels that occur in ``y_true`` or ``y_pred`` are listed, so the
    report still works when a class is absent from the evaluated split.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, same length as ``y_true``.

    Returns:
        None. The report is printed to standard output.
    """
    # Sorted union of the labels present in either sequence.
    unique_labels = np.unique(np.concatenate((y_true, y_pred)))

    # Map unique labels to their corresponding names
    label_names_map = {-1: "None", 0: "Application", 1: "Utility", 2: "Entity"}
    # Fall back to the raw label text so an unexpected label does not abort
    # the report with a KeyError.
    dynamic_label_names = [label_names_map.get(label, str(label)) for label in unique_labels]

    # Generate and print the classification report
    print(classification_report(y_true, y_pred, target_names=dynamic_label_names))

## Decision Tree

In [None]:
# Shallow decision tree baseline. random_state is fixed, like the other seeded
# steps in this notebook, so that ties between equally good splits are broken
# the same way on every run and the reported metrics are reproducible.
decision_tree_classifier = DecisionTreeClassifier(max_depth=2, random_state=0).fit(Xtrain, ytrain)
decision_tree_predictions = decision_tree_classifier.predict(Xtest)

# Evaluate on the held-out test split.
decision_tree_accuracy = accuracy_score(ytest, decision_tree_predictions)
decision_tree_confusion_matrix = confusion_matrix(ytest, decision_tree_predictions)
print(decision_tree_accuracy)
print(decision_tree_confusion_matrix)
generate_classification_report(ytest, decision_tree_predictions)

## SVM

In [None]:
# Linear-kernel SVM. probability=True is kept because this estimator is reused
# in the soft-voting ensemble further down.
svm_classifier = SVC(kernel='linear', C=2, probability=True)
svm_classifier.fit(Xtrain, ytrain)

# Predict the held-out samples, then score them.
svm_predictions = svm_classifier.predict(Xtest)
svm_confusion_matrix = confusion_matrix(ytest, svm_predictions)
svm_accuracy = accuracy_score(ytest, svm_predictions)

print(f"SVM Accuracy: {svm_accuracy}")
print(svm_confusion_matrix)
generate_classification_report(ytest, svm_predictions)

## KNN

In [None]:
# k-nearest-neighbours classifier using the 5 closest training embeddings.
knn_classifier = KNeighborsClassifier(n_neighbors=5)
knn_classifier.fit(Xtrain, ytrain)

# Predict the held-out samples, then score them.
knn_predictions = knn_classifier.predict(Xtest)
knn_confusion_matrix = confusion_matrix(ytest, knn_predictions)
knn_accuracy = accuracy_score(ytest, knn_predictions)

print(knn_accuracy)
print(knn_confusion_matrix)
generate_classification_report(ytest, knn_predictions)

## LogisticRegression

In [None]:
# Logistic regression with a fixed seed.
logistic_regression_classifier = LogisticRegression(random_state=0)
logistic_regression_classifier.fit(Xtrain, ytrain)

# Predict the held-out samples, then score them.
logistic_regression_predictions = logistic_regression_classifier.predict(Xtest)
logistic_regression_confusion_matrix = confusion_matrix(ytest, logistic_regression_predictions)
logistic_regression_accuracy = accuracy_score(ytest, logistic_regression_predictions)

print(logistic_regression_accuracy)
print(logistic_regression_confusion_matrix)
generate_classification_report(ytest, logistic_regression_predictions)

## Gaussian NB

In [None]:
# Gaussian naive Bayes with default settings.
naive_bayes_classifier = GaussianNB()
naive_bayes_classifier.fit(Xtrain, ytrain)

# Predict the held-out samples, then score them.
naive_bayes_predictions = naive_bayes_classifier.predict(Xtest)
naive_bayes_confusion_matrix = confusion_matrix(ytest, naive_bayes_predictions)
naive_bayes_accuracy = accuracy_score(ytest, naive_bayes_predictions)

print(naive_bayes_accuracy)
print(naive_bayes_confusion_matrix)
generate_classification_report(ytest, naive_bayes_predictions)

## Ensemble learning

In [None]:
import inspect

# Combine individual classifiers into an ensemble
ensemble_clf = VotingClassifier(
    estimators=[
        ('svm', svm_classifier),
        ('knn', knn_classifier),
        ('dt', decision_tree_classifier),
        ('log_reg', logistic_regression_classifier),
        ('gnb', naive_bayes_classifier),
    ],
    voting='soft',
)

# Use SVM as the base estimator for AdaBoost
svm_base = SVC(kernel='linear', C=2, probability=True) # Can use any other classifier as the base estimator

# scikit-learn renamed AdaBoost's `base_estimator` argument to `estimator` and
# later removed the old name, so pick whichever the installed version accepts.
_ada_parameters = inspect.signature(AdaBoostClassifier).parameters
_base_estimator_kwarg = "estimator" if "estimator" in _ada_parameters else "base_estimator"
# NOTE(review): recent scikit-learn releases also deprecate algorithm='SAMME.R';
# confirm against the installed version before upgrading.
ada_boost = AdaBoostClassifier(
    **{_base_estimator_kwarg: svm_base},
    n_estimators=50,
    algorithm='SAMME.R',
    random_state=1,
)

# Combine the ensemble classifier with AdaBoost
final_ensemble = VotingClassifier(
    estimators=[('ensemble_clf', ensemble_clf), ('ada_boost', ada_boost)],
    voting='soft',
)

# Fit model to your data
final_ensemble.fit(Xtrain, ytrain)

# Evaluate model
ensemble_predictions = final_ensemble.predict(Xtest)
ensemble_accuracy = accuracy_score(ytest, ensemble_predictions)
print('Accuracy:', ensemble_accuracy)
ensemble_confusion_matrix = confusion_matrix(ytest, ensemble_predictions)
print(ensemble_confusion_matrix)
# Use the shared reporting helper so the ensemble report shows the same
# class names as the individual classifier reports.
generate_classification_report(ytest, ensemble_predictions)