In [1]:
#Imports
import tensorflow as tf
import torch
from transformers import TFAutoModelForSequenceClassification, BertTokenizer
import pandas as pd 
import keras
from keras.optimizers import Adam
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import nltk
import string
nltk.download('stopwords')
from nltk.corpus import stopwords #Word Stop
from nltk.tokenize import word_tokenize #Tokenization & Word Stop
stop_words = set(stopwords.words('english'))
# Tokens treated as punctuation: every single ASCII punctuation character
# (this already covers ',', '/' and '`') plus the two-character quote tokens.
# The earlier set(<concatenated string>) form split its argument into single
# characters, so a multi-character token such as '``' could never be a member.
# NOTE(review): "''" is assumed to be what the empty '' literal was meant to
# add (the closing-quote counterpart of '``') - confirm against the tokenizer.
punctuation = set(string.punctuation) | {'``', "''"}
import joblib
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

2024-07-11 18:54:32.546580: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:479] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-11 18:54:32.568986: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:10575] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-11 18:54:32.569029: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1442] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-07-11 18:54:32.583555: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
  _torch_pytree._register_pytree_node(
[nltk_data] D

In [None]:
#LDA Pipelines
class LDATopicModelPipeline:
    """Topic inference with a persisted LDA model and its vectorizer.

    Both artifacts are read with ``joblib`` from the paths given at
    construction time, either explicitly via :meth:`load_model` or lazily on
    the first call to :meth:`topic_distributions`.
    """

    # Column / key that holds the document text in table-like inputs.
    TEXT_COLUMN = "Combined_Text"

    def __init__(self, lda_model_path, vectorizer_path):
        """Remember where the artifacts live; nothing is loaded yet."""
        self.lda_model_path = lda_model_path
        self.vectorizer_path = vectorizer_path
        self.lda = None
        self.vectorizer = None

    def load_model(self):
        """Load the LDA model and the vectorizer from their configured paths.

        NOTE: ``joblib.load`` unpickles the files, which can execute arbitrary
        code - only point this class at artifacts from a trusted source.
        """
        self.lda = joblib.load(self.lda_model_path)
        self.vectorizer = joblib.load(self.vectorizer_path)

    def _extract_texts(self, new_documents):
        """Return the documents as a plain list, whatever container they came in."""
        if isinstance(new_documents, (pd.DataFrame, dict)):
            # Table-like input: the text lives under TEXT_COLUMN (a missing
            # column/key raises KeyError).
            new_documents = new_documents[self.TEXT_COLUMN]

        if isinstance(new_documents, pd.Series):
            return new_documents.tolist()
        if isinstance(new_documents, (list, tuple)):
            return list(new_documents)
        raise TypeError("Input data should be a list, Pandas Series, or DataFrame of strings.")

    def topic_distributions(self, new_documents):
        """Compute the per-document topic distribution.

        Args:
            new_documents: a list/tuple of strings, a pandas Series of
                strings, or a DataFrame/dict whose ``"Combined_Text"`` entry is
                one of those.

        Returns:
            Whatever ``self.lda.transform`` returns for the vectorized texts
            (one row per document).

        Raises:
            TypeError: if ``new_documents`` is none of the supported containers.
            KeyError: if a DataFrame/dict has no ``"Combined_Text"`` entry.
        """
        if self.lda is None or self.vectorizer is None:
            self.load_model()

        texts = self._extract_texts(new_documents)
        document_term_matrix = self.vectorizer.transform(texts)
        return self.lda.transform(document_term_matrix)

    def get_vectorizer(self):
        """Return the loaded vectorizer (``None`` until the model is loaded)."""
        return self.vectorizer

    def append_topics(self, data, topic_distributions):
        """Write each document's dominant topic into ``data['Topic']``.

        ``data`` is modified in place and also returned, so callers may use
        either the return value or the object they passed in.
        """
        # The dominant topic is the index of the largest entry in each row.
        data['Topic'] = [np.argmax(topic_dist) for topic_dist in topic_distributions]
        return data
            
# Usage: build the pipeline from the saved LDA model and vectorizer paths, then call
#   topic_distributions = topic_model.topic_distributions(new_documents)

In [None]:
#Per-topic text classification: loads previously trained classifiers from disk and predicts a priority for each document
class BertTextClassification:
    """Predict a priority for every document using one classifier per topic.

    For each distinct value in the ``Topic`` column, a classifier
    (``topic_pred_<topic>.pkl``) and its vectorizer (``vec_<topic>.pkl``) are
    loaded from ``model_dir`` with joblib and applied to the documents of that
    topic.
    """

    # Directory the per-topic artifacts were originally hard-coded to.
    DEFAULT_MODEL_DIR = '/home/rpierson/githubPierson/TrainingBERTClassifiers'

    def __init__(self, model_dir=None):
        """Create an unloaded classifier.

        Args:
            model_dir: directory containing the ``topic_pred_*.pkl`` and
                ``vec_*.pkl`` files; defaults to ``DEFAULT_MODEL_DIR``.
        """
        self.model_dir = self.DEFAULT_MODEL_DIR if model_dir is None else model_dir
        self.documents = None
        self.vectorizer = None
        self.bayes_trained = None
        self.num_topics = None

    def load_model(self, topic_num):
        """Load the classifier and vectorizer trained for ``topic_num``.

        NOTE: ``joblib.load`` unpickles the files, which can execute arbitrary
        code - only load artifacts from a trusted source.
        """
        self.bayes_trained = joblib.load(f'{self.model_dir}/topic_pred_{topic_num}.pkl')
        self.vectorizer = joblib.load(f'{self.model_dir}/vec_{topic_num}.pkl')

    def priority(self, documents):
        """Fill ``documents['Predicted_Priority']`` topic by topic.

        Args:
            documents: DataFrame with a ``Combined_Text`` column (the text to
                classify) and a ``Topic`` column (selects the classifier).

        Returns:
            The same DataFrame object, modified in place, with a
            ``Predicted_Priority`` column added/overwritten.
        """
        self.num_topics = documents['Topic'].drop_duplicates().values
        # Explicit object dtype so predictions of any type can be stored.
        documents["Predicted_Priority"] = pd.Series("", index=documents.index, dtype=object)
        self.documents = documents

        for topic_num in self.num_topics:
            self.load_model(topic_num)
            # Positional boolean mask: no index alignment is involved, so
            # non-default indexes are handled as well.
            in_topic = (self.documents['Topic'] == topic_num).to_numpy()
            texts = self.documents.loc[in_topic, "Combined_Text"].tolist()
            # One batched transform/predict per topic instead of one per row;
            # the matrix is densified exactly as the per-row version did.
            features = self.vectorizer.transform(texts).toarray()
            self.documents.loc[in_topic, "Predicted_Priority"] = self.bayes_trained.predict(features)
        return self.documents
        

In [None]:
#Accuracy Assessment
class AccuracyAssessment:
    """Accumulate a confusion matrix for priority labels and derive metrics.

    Labels are 1-based integers (``1 .. priority_levels``). Row/column ``k`` of
    ``confusion_matrix`` belongs to label ``k + 1``; rows are indexed by the
    actual label and columns by the predicted label. Per-class methods take
    the 0-based ``class_index``.
    """

    def __init__(self, priority_levels):
        """Start with an all-zero ``priority_levels x priority_levels`` matrix."""
        self.actual_priority = None
        self.predicted_priority = None
        self.true_pos = 0
        self.false_pos = 0
        self.false_neg = 0
        self.num_priority_levels = priority_levels
        self.confusion_matrix = np.zeros((self.num_priority_levels, self.num_priority_levels), dtype=int)

    def update_vals(self, actual, predicted):
        """Add one batch of (actual, predicted) label pairs to the matrix.

        Args:
            actual: array-like of true labels (list, ndarray, pandas Series...).
            predicted: array-like of predicted labels, same length as ``actual``.

        Raises:
            ValueError: if the two inputs differ in length.
            IndexError: if any label lies outside ``1 .. priority_levels``.
        """
        # np.asarray drops any pandas index, so pairs are matched by position
        # rather than by index label.
        actual_labels = np.asarray(actual).astype(int).ravel()
        predicted_labels = np.asarray(predicted).astype(int).ravel()
        if actual_labels.shape != predicted_labels.shape:
            raise ValueError("actual and predicted must contain the same number of labels")

        true_idx = actual_labels - 1
        pred_idx = predicted_labels - 1
        # Validate before touching the matrix: a label of 0 would otherwise
        # wrap around to the last row/column through negative indexing.
        for name, idx in (("actual", true_idx), ("predicted", pred_idx)):
            if idx.size and (idx.min() < 0 or idx.max() >= self.num_priority_levels):
                raise IndexError(
                    f"{name} labels must be between 1 and {self.num_priority_levels}"
                )

        self.actual_priority = actual_labels
        self.predicted_priority = predicted_labels
        # Unbuffered add so repeated (row, col) pairs are each counted.
        np.add.at(self.confusion_matrix, (true_idx, pred_idx), 1)

    def calc_metrics(self, class_index):
        """Store TP / FP / FN of ``class_index`` in the instance attributes."""
        self.true_pos = self.confusion_matrix[class_index, class_index]
        # Column total = everything predicted as this class.
        self.false_pos = np.sum(self.confusion_matrix[:, class_index]) - self.true_pos
        # Row total = everything that actually is this class.
        self.false_neg = np.sum(self.confusion_matrix[class_index, :]) - self.true_pos

    def precision(self, class_index):
        """Return TP / (TP + FP) for the class, or 0 if it was never predicted."""
        self.calc_metrics(class_index)
        predicted_total = self.true_pos + self.false_pos
        if predicted_total == 0:
            return 0
        return self.true_pos / predicted_total

    def recall(self, class_index):
        """Return TP / (TP + FN) for the class, or 0 if it never occurred."""
        self.calc_metrics(class_index)
        actual_total = self.true_pos + self.false_neg
        if actual_total == 0:
            return 0
        return self.true_pos / actual_total

    def fmeasure(self, class_index):
        """Return the harmonic mean of precision and recall (0 if both are 0)."""
        precision = self.precision(class_index)
        recall = self.recall(class_index)
        if precision + recall == 0:
            return 0
        return (2 * precision * recall) / (precision + recall)

    def accuracyOverall(self):
        """Return the fraction of correct predictions over everything recorded.

        Both numerator and denominator come from the accumulated confusion
        matrix, so the value stays correct after several ``update_vals`` calls.
        Returns 0 when nothing has been recorded yet.
        """
        total = self.confusion_matrix.sum()
        if total == 0:
            return 0
        return np.trace(self.confusion_matrix) / total

    def _per_class_scores(self):
        """Return (precisions, recalls, f_measures), one entry per class."""
        class_indices = range(self.num_priority_levels)
        precisions = [self.precision(i) for i in class_indices]
        recalls = [self.recall(i) for i in class_indices]
        f_measures = [self.fmeasure(i) for i in class_indices]
        return precisions, recalls, f_measures

    def microAnalysis(self):
        """Print micro-averaged precision, recall and F-measure.

        Micro-averaging pools the counts over all classes first:
        precision = sum(TP) / (sum(TP) + sum(FP)) and
        recall = sum(TP) / (sum(TP) + sum(FN)).
        """
        total = self.confusion_matrix.sum()
        true_pos = np.trace(self.confusion_matrix)
        # Each off-diagonal cell is a false positive for its column's class
        # and a false negative for its row's class.
        false_pos = total - true_pos
        false_neg = total - true_pos

        micro_precision = true_pos / (true_pos + false_pos) if true_pos + false_pos else 0
        micro_recall = true_pos / (true_pos + false_neg) if true_pos + false_neg else 0
        if micro_precision + micro_recall == 0:
            micro_fmeasure = 0
        else:
            micro_fmeasure = (2 * micro_precision * micro_recall) / (micro_precision + micro_recall)
        print(f"Micro-Analysis for Priority Levels: Precision = {micro_precision:.4f}, Recall={micro_recall:.4f}, F-measure={micro_fmeasure:.4f}")

    def macroAnalysis(self):
        """Print macro-averaged (unweighted per-class mean) metrics."""
        precisions, recalls, f_measures = self._per_class_scores()

        def mean(values):
            # No classes at all -> report 0 instead of dividing by zero.
            return sum(values) / len(values) if values else 0

        macro_precision = mean(precisions)
        macro_recall = mean(recalls)
        macro_fmeasure = mean(f_measures)
        print(f"Macro-Analysis for Priority Levels: Precision = {macro_precision:.4f}, Recall={macro_recall:.4f}, F-measure={macro_fmeasure:.4f}")

    def printAssessment(self):
        """Print precision, recall and F-measure for every priority level."""
        for i in range(self.num_priority_levels):
            class_precision = self.precision(i)
            class_recall = self.recall(i)
            class_fmeasure = self.fmeasure(i)

            print(f"Priority P{i+1}: Precision={class_precision:.4f}, Recall={class_recall:.4f}, F-measure={class_fmeasure:.4f}")

In [None]:
#Classes together in pipeline:
def priority_pipeline_without_assessment(
    data,
    lda_model_path='/home/rpierson/PiersonREU/extracted/lda.pkl',
    vectorizer_path='/home/rpierson/PiersonREU/extracted/vec.pkl',
    verbose=True,
):
    """Assign a topic and a predicted priority to every document in ``data``.

    Steps: build a frame with a ``Combined_Text`` column from ``data``, infer
    the dominant LDA topic per document, then run the per-topic classifier.

    Args:
        data: the document texts; assigned as the ``Combined_Text`` column of a
            new DataFrame (missing values are replaced by a single space).
        lda_model_path: path of the persisted LDA model (.pkl).
        vectorizer_path: path of the persisted LDA vectorizer (.pkl).
        verbose: when true (the default), print the frame after topics have
            been attached.

    Returns:
        The DataFrame with ``Combined_Text``, ``Topic`` and
        ``Predicted_Priority`` columns.
    """
    # Preprocessing is currently disabled (the tokenization step was commented
    # out), so ``data`` reaches the topic model as-is.

    # Topic modeling with the saved LDA model / vectorizer (.pkl files).
    topic_model = LDATopicModelPipeline(lda_model_path, vectorizer_path)
    topic_model.load_model()

    df = pd.DataFrame(columns = ['Combined_Text', 'Topic'])
    df['Combined_Text'] = data
    df['Combined_Text'] = df['Combined_Text'].fillna(' ')

    topic_distributions = topic_model.topic_distributions(df)
    df = topic_model.append_topics(df, topic_distributions)
    if verbose:
        print(df)

    # Text classification per topic with the saved per-topic classifiers.
    # The previous code instantiated ``TextClassificationNaiveBayes``, which is
    # not defined in this notebook; ``BertTextClassification`` is the class
    # here that provides ``priority``.
    classifier = BertTextClassification()
    return classifier.priority(df)