<a href="https://colab.research.google.com/github/rossl18/rossl18.github.io/blob/main/SyllabusClassifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import necessary libraries
%pip install better_profanity
%pip install pdfminer
%pip install nltk
import re
import nltk
import os
import numpy as np
import pandas as pd
from scipy.sparse import vstack
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from better_profanity import profanity
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.naive_bayes import MultinomialNB
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from sklearn.feature_extraction.text import TfidfVectorizer
from pdfminer.high_level import extract_text

# Download necessary NLTK data
nltk.download('punkt')      # tokenizer models looked up by older NLTK releases
nltk.download('punkt_tab')  # tokenizer tables that recent NLTK releases load for word_tokenize
nltk.download('stopwords')  # stop-word lists read by preprocess_text


# Lazily filled cache so the stop-word corpus is loaded once instead of on every call.
_TEXT_TOOLS = {}


def _text_tools():
    """
    Returns the shared English stop-word set and Porter stemmer, creating them on first use.

    Returns:
    - stop_words (frozenset of str): NLTK's English stop words.
    - stemmer (PorterStemmer): A reusable stemmer instance.
    """
    if not _TEXT_TOOLS:
        # Build both before storing either, so a failed corpus lookup leaves the cache empty
        # and the next call retries.
        stop_words = frozenset(stopwords.words('english'))
        stemmer = PorterStemmer()
        _TEXT_TOOLS['stop_words'] = stop_words
        _TEXT_TOOLS['stemmer'] = stemmer
    return _TEXT_TOOLS['stop_words'], _TEXT_TOOLS['stemmer']


def preprocess_text(text):
    """
    Preprocesses the input text by lowercasing, removing special characters and numbers,
    tokenizing, removing stop words, and applying stemming.

    Every character other than a-z and whitespace is deleted (not replaced by a space),
    so "e-mail" becomes "email" and "it's" becomes "its" before tokenization.

    Parameters:
    - text (str): The raw text to preprocess.

    Returns:
    - preprocessed_text (str): The stemmed, stop-word-free tokens joined by single spaces.
      Empty when nothing survives the filtering.
    """
    # Lowercase first so the character filter only has to keep a-z.
    text = re.sub(r'[^a-z\s]', '', text.lower())

    stop_words, stemmer = _text_tools()

    # Stop words are filtered on the unstemmed token, then the survivors are stemmed.
    tokens = [stemmer.stem(word) for word in word_tokenize(text) if word not in stop_words]

    return ' '.join(tokens)


def process_pdf(pdf_path):
    """
    Reads the text of one PDF and runs it through preprocess_text.

    Parameters:
    - pdf_path (str): The file path to the PDF document.

    Returns:
    - preprocessed_text (str): The preprocessed text, or an empty string
      (after printing a notice) when the extractor returned nothing.
    """
    extracted = extract_text(pdf_path)
    if extracted:
        return preprocess_text(extracted)

    # Nothing came back from the extractor: report it and hand back an empty document.
    print(f"No text extracted from {pdf_path}.")
    return ""


def extract_tfidf_features(preprocessed_texts):
    """
    Fits a TF-IDF vectorizer on the given documents and vectorizes them.

    Parameters:
    - preprocessed_texts (list of str): A list of preprocessed text documents.

    Returns:
    - X (scipy.sparse.csr_matrix): The TF-IDF feature matrix, one row per document.
    - vectorizer (TfidfVectorizer): The fitted TF-IDF vectorizer.
    """
    tfidf = TfidfVectorizer()
    # Learn the vocabulary/IDF weights and produce the matrix in a single pass.
    feature_matrix = tfidf.fit_transform(preprocessed_texts)
    return feature_matrix, tfidf


def process_folder(folder_path):
    """
    Builds a TF-IDF matrix from every readable PDF in a folder.

    Files are visited in sorted filename order; names are matched on a
    case-insensitive '.pdf' suffix. PDFs that yield no text are reported and skipped.

    Parameters:
    - folder_path (str): The path to the folder containing PDF files.

    Returns:
    - X (scipy.sparse.csr_matrix): The TF-IDF feature matrix for all documents,
      or None when the folder is missing or no PDF produced text.
    - vectorizer (TfidfVectorizer): The fitted TF-IDF vectorizer, or None in the same cases.
    - filenames (list of str): List of filenames corresponding to each document in X.
    """
    # Guard: nothing to do without a real directory.
    if not os.path.isdir(folder_path):
        print(f"Folder not found: {folder_path}")
        return None, None, []

    documents = []
    kept_names = []
    pdf_names = (name for name in sorted(os.listdir(folder_path)) if name.lower().endswith('.pdf'))
    for name in pdf_names:
        cleaned = process_pdf(os.path.join(folder_path, name))
        if not cleaned:
            print(f"Skipping {name} due to no extracted text.")
            continue
        # Keep texts and names in lockstep so row i of X belongs to kept_names[i].
        documents.append(cleaned)
        kept_names.append(name)

    if len(documents) == 0:
        print("No texts were processed from PDFs.")
        return None, None, kept_names

    features, fitted_vectorizer = extract_tfidf_features(documents)
    return features, fitted_vectorizer, kept_names


def process_csv_files(csv_file_class0, csv_file_class1, text_column='text'):
    """
    Reads and preprocesses texts from two CSV files for class 0 and class 1.

    Rows whose text cell is missing are skipped; converting them with astype(str)
    would otherwise turn each one into the literal document "nan".

    Parameters:
    - csv_file_class0 (str): Path to the CSV file containing texts for class 0.
    - csv_file_class1 (str): Path to the CSV file containing texts for class 1.
    - text_column (str): Name of the column in the CSV files containing the texts.

    Returns:
    - preprocessed_texts (list of str): Preprocessed texts from both CSV files,
      class 0 rows first, then class 1 rows.
    - labels (list of int): Corresponding labels (0 or 1) for each text.

    If a file lacks text_column, a message is printed and whatever was collected
    before that file is returned (so a bad class 1 file yields class 0 data only).
    """
    preprocessed_texts = []
    labels = []

    # Both classes go through the same steps; only the path and label differ.
    for csv_file, label in ((csv_file_class0, 0), (csv_file_class1, 1)):
        df = pd.read_csv(csv_file)
        if text_column not in df.columns:
            print(f"Column '{text_column}' not found in {csv_file}. Available columns: {df.columns.tolist()}")
            return preprocessed_texts, labels

        texts = df[text_column].dropna().astype(str)
        skipped = len(df) - len(texts)
        if skipped:
            print(f"Skipped {skipped} rows with missing '{text_column}' in {csv_file}.")

        cleaned = [preprocess_text(text) for text in texts]
        preprocessed_texts.extend(cleaned)
        labels.extend([label] * len(cleaned))

    return preprocessed_texts, labels


def _collect_pdf_texts(folder_path):
    """
    Extracts and preprocesses every PDF in a folder exactly once.

    Parameters:
    - folder_path (str): The path to the folder containing PDF files.

    Returns:
    - texts (list of str): Non-empty preprocessed texts in sorted filename order.
      Empty when the folder is missing or no PDF produced text.
    """
    if not os.path.isdir(folder_path):
        print(f"Folder not found: {folder_path}")
        return []

    texts = []
    for filename in sorted(os.listdir(folder_path)):
        if not filename.lower().endswith('.pdf'):
            continue
        text = process_pdf(os.path.join(folder_path, filename))
        if text:
            texts.append(text)
        else:
            print(f"Skipping {filename} due to no extracted text.")

    if not texts:
        print("No texts were processed from PDFs.")
    return texts


def test_models_and_decision(folder_path, labels, add_text=None, csv_texts=None, csv_labels=None, x_percent=1000):
    """
    Processes PDFs and CSV data, trains and evaluates models, and applies decision rules.

    Documents are assembled in the order PDFs (sorted by filename, skipping PDFs with
    no text), then add_text, then csv_texts; labels must follow the same order.
    Three classifiers are trained on a stratified 80% split and evaluated on the
    remaining 20%. The TF-IDF vectorizer is fitted on the training split only, so
    the vocabulary covers every source of training text and the held-out documents
    do not influence the features they are evaluated with.

    A document is accepted (1) only if it contains no profanity, at least two of
    the three models predict class 1, and its word count is within x_percent of the
    average word count; otherwise it is rejected (0).

    Parameters:
    - folder_path (str): Path to the folder containing PDF files.
    - labels (list): Labels (0 or 1) for every document, in the order described above.
    - add_text (list of str, optional): Additional raw texts to include; they are
      preprocessed here.
    - csv_texts (list of str, optional): Already preprocessed texts from CSV files.
    - csv_labels (list of int, optional): Not used; the CSV labels are expected to be
      part of labels already. Kept for backward compatibility.
    - x_percent (float): Maximum allowed deviation from the average word count,
      expressed as a percentage of that average.

    Returns:
    - decisions (list of int): List of binary decisions (1 or 0) for each input.
      Empty when there are no documents or the label count does not match.
    """
    # Initialize profanity filter
    profanity.load_censor_words()

    # Gather every document once. PDF extraction is slow, so it is done a single time here.
    preprocessed_texts_pdf = _collect_pdf_texts(folder_path)
    preprocessed_texts_add = [preprocess_text(text) for text in add_text] if add_text else []
    preprocessed_csv_texts = list(csv_texts) if csv_texts else []

    # Combine all texts and labels
    all_texts = preprocessed_texts_pdf + preprocessed_texts_add + preprocessed_csv_texts
    all_labels = labels  # labels already include csv_labels

    # Print class distribution (tolist() yields plain Python values for a clean printout)
    unique, counts = np.unique(all_labels, return_counts=True)
    class_distribution = dict(zip(unique.tolist(), counts.tolist()))
    print(f"\nClass Distribution: {class_distribution}")

    # Print total number of texts and labels
    print(f"Total number of texts: {len(all_texts)}")
    print(f"Total number of labels: {len(all_labels)}")

    # Validate labels and texts count
    if len(all_labels) != len(all_texts):
        print(f"Mismatch between number of labels ({len(all_labels)}) and number of texts ({len(all_texts)}).")
        return []

    if not all_texts:
        print("No data to process.")
        return []

    y = np.array(all_labels)

    # Calculate average text length
    lengths = [len(text.split()) for text in all_texts]
    avg_length = np.mean(lengths)
    print(f"Average text length: {avg_length:.2f} words")

    # Split row indices rather than a feature matrix so the vectorizer can be
    # fitted on the training documents alone.
    train_idx, test_idx = train_test_split(
        np.arange(len(all_texts)), test_size=0.2, random_state=42, stratify=y
    )

    _, vectorizer = extract_tfidf_features([all_texts[i] for i in train_idx])
    X = vectorizer.transform(all_texts)
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]

    # Initialize models
    models = {
        'Logistic Regression': LogisticRegression(max_iter=1000, class_weight='balanced'),
        'Support Vector Machine': SVC(probability=True, class_weight='balanced'),
        'Naïve Bayes': MultinomialNB()
    }

    performance_data = []

    # Train and evaluate models
    for model_name, model in models.items():
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        report = classification_report(y_test, y_pred, output_dict=True, zero_division=0)
        accuracy = accuracy_score(y_test, y_pred)

        # Pin the label order so the matrix is always 2x2, even when one class
        # is absent from the test fold and the predictions.
        tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=[0, 1]).ravel()

        performance_data.append({
            'Model': model_name,
            'Accuracy': accuracy,
            'Macro Precision': report['macro avg']['precision'],
            'Macro Recall': report['macro avg']['recall'],
            'Macro F1-score': report['macro avg']['f1-score'],
            'Weighted Precision': report['weighted avg']['precision'],
            'Weighted Recall': report['weighted avg']['recall'],
            'Weighted F1-score': report['weighted avg']['f1-score'],
            'True Negatives': tn,
            'False Positives': fp,
            'False Negatives': fn,
            'True Positives': tp
        })

    # Display performance metrics
    cols = [
        'Model', 'Accuracy', 'Macro Precision', 'Macro Recall', 'Macro F1-score',
        'Weighted Precision', 'Weighted Recall', 'Weighted F1-score',
        'True Negatives', 'False Positives', 'False Negatives', 'True Positives'
    ]
    performance_df = pd.DataFrame(performance_data)[cols]
    print("\nPerformance Metrics:")
    print(performance_df)

    # Get model predictions on all data (training rows included)
    model_predictions = np.array(
        [model.predict(X) for model in models.values()]
    ).T  # Shape: (num_inputs, num_models)

    # **Temporary Debugging: Print raw predictions**
    print("\nSample Model Predictions:")
    sample_size = min(10, len(all_texts))  # Print first 10 for brevity
    for idx in range(sample_size):
        print(f"Text ID {idx+1}: Predictions={model_predictions[idx]}, Label={all_labels[idx]}")
    print("...")  # Indicate more data exists

    # Apply decision rules
    max_deviation = (x_percent / 100) * avg_length
    decisions = []
    for idx, text in enumerate(all_texts):
        accepted = (
            not profanity.contains_profanity(text)               # profanity auto-rejects
            and np.sum(model_predictions[idx] == 1) >= 2          # majority of the three models
            and abs(lengths[idx] - avg_length) <= max_deviation   # length close to average
        )
        decisions.append(1 if accepted else 0)

    return decisions


def main():
    """
    Runs the full pipeline on the hard-coded 'syllabi' folder and the two CSV files,
    then prints one decision per input.

    Returns early (with a printed message) when the folder cannot be listed, when the
    number of PDF labels differs from the number of PDFs, or when a CSV file is missing.
    """
    folder_path = 'C:/Users/rossl/syllabi'  # Replace with your actual folder path

    # List the PDFs up front so the hand-written labels can be checked against them.
    try:
        filenames = sorted(filter(lambda f: f.lower().endswith('.pdf'), os.listdir(folder_path)))
    except FileNotFoundError:
        print(f"Folder not found: {folder_path}")
        return
    except Exception as e:
        print(f"Error accessing folder {folder_path}: {e}")
        return

    # One label per PDF, in sorted filename order.
    labels_pdf = [0, 0, 0, 0, 0, 1, 1, 1, 1]  # Adjust according to your PDFs

    pdf_count = len(filenames)
    if len(labels_pdf) != pdf_count:
        print("The number of labels does not match the number of PDF files.")
        print(f"Number of labels: {len(labels_pdf)}")
        print(f"Number of PDF files: {pdf_count}")
        return

    csv_file_class0 = 'C:/Users/rossl/nonsyllabi.csv'  # Replace with your actual path
    csv_file_class1 = 'C:/Users/rossl/syllabi.csv'     # Replace with your actual path

    # Both CSV files must be present before anything is read.
    for class_id, csv_path in ((0, csv_file_class0), (1, csv_file_class1)):
        if not os.path.exists(csv_path):
            print(f"CSV file for class {class_id} does not exist: {csv_path}")
            return

    csv_texts, csv_labels = process_csv_files(
        csv_file_class0=csv_file_class0,
        csv_file_class1=csv_file_class1,
        text_column='Syllabus'  # Replace with your actual column name if different
    )

    print(f"Number of CSV texts: {len(csv_texts)}")
    print(f"Number of CSV labels: {len(csv_labels)}")

    # PDF labels first, CSV labels after, matching the order the texts are combined in.
    labels = [*labels_pdf, *csv_labels]

    print(f"Total number of texts: {pdf_count + len(csv_texts)}")
    print(f"Total number of labels: {len(labels)}")

    decisions = test_models_and_decision(
        folder_path=folder_path,
        labels=labels,
        add_text=None,        # Replace with your additional texts if any
        csv_texts=csv_texts,
        csv_labels=csv_labels,
        x_percent=50
    )

    # Inputs in decision order: PDF paths, then CSV texts (add_text is None here).
    all_inputs = [os.path.join(folder_path, filename) for filename in filenames]
    all_inputs += csv_texts

    print("\nDecisions:")
    for input_item, decision in zip(all_inputs, decisions):
        looks_like_pdf = isinstance(input_item, str) and input_item.lower().endswith('.pdf')
        # PDFs are shown by file name; everything else by its first 100 characters.
        display_text = os.path.basename(input_item) if looks_like_pdf else input_item[:100]
        print(f"Input: {display_text}...")
        print(f"Decision: {decision}")
        print('---')

# Run the pipeline only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()