In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

In [2]:
# %cd '/content/drive/MyDrive/Colab Notebooks/DS310_NLP/visolex'

In [3]:
import logging
logging.basicConfig(level=logging.CRITICAL)

In [4]:
import json
import logging
import os
import numpy as np
import torch
import copy
from framework_components.student import Student
from utils import post_process, delete_special_tokens, get_tokenizer
from arguments import parse_arguments
from project_variables import DICT_PATH
from datasets.utils.logging import disable_progress_bar

  from .autonotebook import tqdm as notebook_tqdm


Better speed can be achieved with apex installed from https://www.github.com/nvidia/apex.
Better speed can be achieved with apex installed from https://www.github.com/nvidia/apex.
Better speed can be achieved with apex installed from https://www.github.com/nvidia/apex.


In [5]:
# Load the dictionary initially
with open(DICT_PATH, 'r', encoding='utf-8') as f:
    dictionary = json.load(f)

# Global variables to hold the loaded model and tokenizer
loaded_model = None
loaded_tokenizer = None

# Setup CUDA, GPU
device = "cuda" if torch.cuda.is_available() else "cpu"

In [6]:
# Assuming parse_arguments uses argparse or a similar library
import sys
from arguments import parse_arguments
import argparse


def parse_arguments():
    parser = argparse.ArgumentParser()
    # Define your arguments here as you currently do
    parser.add_argument("--datapath", type=str, default="data/test.json")
    parser.add_argument("--student_name", type=str, default="bert-base-uncased")
    parser.add_argument("--teacher_name", type=str, default="ran")
    parser.add_argument("--training_mode", type=str, default="default", help="Training mode identifier")
    parser.add_argument("--inference_model", type=str, default="student")
    parser.add_argument("--experiment_folder", type=str, default='./experiments', help="Folder to save experiment results")
    # logdir is likely set after parsing arguments, but we can include it for completeness if needed elsewhere
    # parser.add_argument("--logdir", type=str)
    parser.add_argument("--metric", type=str, default="f1_score")
    parser.add_argument("--num_iter", type=int, default=10)
    parser.add_argument("--num_rules", type=int, default=2)
    parser.add_argument("--num_epochs", type=int, default=10, help="Number of training epochs")
    parser.add_argument("--num_unsup_epochs", type=int, default=5, help="Number of unsupervised training epochs")
    parser.add_argument("--debug", type=bool, default=False)
    parser.add_argument("--remove_accents", type=bool, default=False, help="Whether to remove accents")
    parser.add_argument("--rm_accent_ratio", type=float, default=0.0, help="Ratio of text with removed accents")
    parser.add_argument("--append_n_mask", type=bool, default=True)
    parser.add_argument("--nsw_detect", type=bool, default=True)
    parser.add_argument("--soft_labels", type=bool, default=True)
    parser.add_argument("--loss_weights", type=bool, default=False)
    parser.add_argument("--convert_abstain_to_random", type=bool, default=False)
    parser.add_argument("--hard_student_rule", type=bool, default=True)
    parser.add_argument("--train_batch_size", type=int, default=16, help="Batch size for training")
    parser.add_argument("--eval_batch_size", type=int, default=128, help="Batch size for evaluation")
    parser.add_argument("--unsup_batch_size", type=int, default=128, help="Batch size for unsupervised data")
    parser.add_argument("--lower_case", type=bool, default=True)
    parser.add_argument("--learning_rate", type=float, default=0.001)
    parser.add_argument("--fine_tuning_strategy", type=str, default="flexible_lr", help="Fine-tuning strategy (e.g., full, lora, flexible_lr)") # Added flexible_lr as per your log
    parser.add_argument("--sample_size", type=int, default=8096)
    parser.add_argument("--topk", type=int, default=1)
    parser.add_argument("--seed", type=int, default=42, help="Random seed for reproductibility")
    parser.add_argument("--percent", type=float, default=0.0, help="0.0 means markable text, <> 0.0 means unmarkable text")
    # n_gpu and device are usually set after parsing arguments based on system configuration
    # parser.add_argument("--n_gpu", type=int)
    # parser.add_argument("--device", type=str)



    # ... add all your other arguments

    # This is the key change: parse_known_args()
    args, unknown = parser.parse_known_args()
    return args

args = parse_arguments()
args.n_gpu = torch.cuda.device_count()
args.device = device
args.lower_case = True
args.hard_student_rule = True
args.soft_labels = True
args.append_n_mask = True
args.nsw_detect = True
args.training_mode = 'weakly_supervised'
args.percent = 1.0 # 0.0 means markable text, <> 0.0 means unmarkable text

# Set up seed, logging, etc. here as needed
np.random.seed(args.seed)

args

Namespace(datapath='data/test.json', student_name='bert-base-uncased', teacher_name='ran', training_mode='weakly_supervised', inference_model='student', experiment_folder='./experiments', metric='f1_score', num_iter=10, num_rules=2, num_epochs=10, num_unsup_epochs=5, debug=False, remove_accents=False, rm_accent_ratio=0.0, append_n_mask=True, nsw_detect=True, soft_labels=True, loss_weights=False, convert_abstain_to_random=False, hard_student_rule=True, train_batch_size=16, eval_batch_size=128, unsup_batch_size=128, lower_case=True, learning_rate=0.001, fine_tuning_strategy='c:\\Users\\Ricardo\\AppData\\Roaming\\jupyter\\runtime\\kernel-v3d1a0b85a73e0b474a8c192238ce85231822c732a.json', sample_size=8096, topk=1, seed=42, percent=1.0, n_gpu=0, device='cpu')

In [7]:
def nsw_detection(source_tokens, is_nsw, tokenizer):
    source_tokens, keep_indices = delete_special_tokens(source_tokens)
    is_nsw = [is_nsw[i] for i in keep_indices]
    nsw_indices = [i for i, nsw in enumerate(is_nsw) if nsw == 1]
    nsw_tokens = [source_tokens[i] for i in nsw_indices]

    nsw_spans = []
    end_index = 0
    for i in range(len(source_tokens)):
        if source_tokens[i].startswith('▁'):
            end_index += 1
        current_text = tokenizer.convert_tokens_to_string([source_tokens[i]])
        full_text = tokenizer.convert_tokens_to_string(source_tokens[:(i+1)])
        if is_nsw[i] == 1:
            if current_text:
                nsw_spans.append({
                    'index': i,
                    'start_index': end_index,
                    'end_index': end_index + len(current_text),
                    'nsw': current_text
                })
        end_index = len(full_text) if current_text else len(full_text) + 1

    return nsw_spans

def lexnorm(output, tokenizer):
    # NSW Detection
    nsw_spans = nsw_detection(output['source_tokens'], output['is_nsw'], tokenizer)
    nsw_indices = [span['index'] for span in nsw_spans]

    # Lexical Normalization
    pred = output['pred']
    proba = output['proba']
    decoded_pred = tokenizer.convert_ids_to_tokens(pred)
    for i, nsw_idx in enumerate(nsw_indices):
        nsw_spans[i]['prediction'] = tokenizer.convert_tokens_to_string([decoded_pred[nsw_idx+1]])
        nsw_spans[i]['confidence_score'] = round(proba[nsw_idx+1], 4)

    pred_tokens, keep_indices = delete_special_tokens(decoded_pred)
    proba = [proba[i] for i in keep_indices]
    pred_str = tokenizer.convert_tokens_to_string(pred_tokens)
    pred_str = post_process(pred_str)
    return nsw_spans, pred_str

In [8]:
def load_model():
    global loaded_model, loaded_tokenizer, args
    model_name = 'visobert' # request.form['model']
    percent = args.percent # request.form['percent']  # Get the percent value from the request and cast to int

    args.student_name = model_name

    # Set the remove_accents argument if percent is not zero
    if percent != 0.0:
        args.remove_accents = True
        args.rm_accent_ratio = percent


    try:
        # Start Experiment: set the log directory based on args
        args.logdir = os.path.join(args.experiment_folder, args.student_name, args.training_mode + '_accent_{}'.format(str(args.rm_accent_ratio)))

        # Print the full log file path for debugging
        log_file_path = os.path.join(args.logdir, 'demo.log')
        print(f"Attempting to write log to: {log_file_path}")

        # Create the log directory if it doesn't exist
        if not os.path.exists(args.logdir):
            os.makedirs(args.logdir, exist_ok=True)

        # Load the tokenizer and student model
        loaded_tokenizer = get_tokenizer(model_name)

        loaded_model = Student(args=args, tokenizer=loaded_tokenizer)
        print("Initializing student model...")
        loaded_model.load("student_best")
        print("Student model loaded successfully.")

        # Model loaded successfully, send response to frontend
        response_message = f"Model {model_name} loaded successfully."


        return 'Success. Model loaded successfully'

    except Exception as e:
        return f"Error. Error loading model: {str(e)}"

In [9]:
load_model_status = load_model()
print(load_model_status)

Attempting to write log to: ./experiments\visobert\weakly_supervised_accent_1.0\demo.log


Some weights of XLMRobertaModel were not initialized from the model checkpoint at uitnlp/visobert and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Initializing student model...
Student model loaded successfully.
Success. Model loaded successfully


In [10]:
from IPython.display import display, HTML
import re


In [14]:
def normalize_text(input_text):
    """
    Normalizes text using the loaded model and tokenizer in a notebook environment.

    Args:
        input_text (str): The text to normalize.

    Returns:
        dict or None: A dictionary containing the normalization results
                      if the model is loaded, otherwise None.
    """
    global loaded_model, loaded_tokenizer

    if loaded_model is None or loaded_tokenizer is None:
        print('Error: Model not loaded. Please load the model first.')
        return None # Return None or raise an error

    try:
        # Perform text normalization using the loaded model and tokenizer
        output = loaded_model.inference(user_input=input_text)
        nsw_spans, pred_str = lexnorm(output, loaded_tokenizer)

        # Highlight the NSW tokens in pred_str
        highlighted_pred_str = input_text
        # for i, span in enumerate(nsw_spans):
        #     nsw_word = span['nsw']
        #     highlighted_pred_str = highlighted_pred_str.replace(nsw_word, f"<mark>{nsw_word}</mark>")
        if nsw_spans:
            nsw_words = [span['nsw'] for span in nsw_spans]
            for word in set(nsw_words):
                highlighted_pred_str = re.sub(rf'\b{re.escape(word)}\b', f'<mark>{word}</mark>', highlighted_pred_str)

        # Prepare the detection information
        detection_info = ""
        for i, span in enumerate(nsw_spans):
              detection_info += f"<tr><td>{span['nsw']}</td><td>{span['prediction']}</td><td>{span['confidence_score']}</td></tr>"

        # Return the highlighted normalized text and detection info as HTML
        return {
            'status': 'success',
            'normalized_text': pred_str,  # Raw normalized text
            'highlighted_text': highlighted_pred_str,  # Highlighted text
            'detection_info': detection_info  # Detection info details
        }

    except Exception as e:
        # Handle any errors during normalization
        print(f"Error during normalization: {str(e)}")
        return {'status': 'error', 'message': f"Error during normalization: {str(e)}"}


# Example of how to use it in the notebook:
# Assuming you have run the cell to load the model using load_model()

# Define the input text
input_text_example = "đẹp wa!"
# "T hk thik m ơi." "thoai" "sao hk có j  khác dị?" "đep wa!" "that ra toi cung ko biet kieu gi moi chinh xac nhất nữa" "sao lỗi j mà khó chệu dzô cùng"

input_text_example = input_text_example.lower()

# Call the modified function
normalization_results = normalize_text(input_text_example)

# Display the results
if normalization_results:
    print("Normalization Results:")
    if 'normalized_text' in normalization_results:
        print(f"Normalized Text: {normalization_results['normalized_text']}")
    else:
        print("No 'normalized_text' in results.")
    if 'highlighted_text' in normalization_results:
        # print(f"Highlighted Text: {normalization_results['highlighted_text']}") # This is HTML, you might need to display it differently
        print(f"Highlighted Text:")
        display(HTML(normalization_results['highlighted_text']))
    if 'detection_info' in normalization_results:
        print("Detection Info:")
        table_html = f"<table border='1'>{normalization_results['detection_info']}</table>"
        display(HTML(table_html))



Normalization Results:
Normalized Text: Đẹp quá!
Highlighted Text:


Detection Info:


0,1,2
wa,quá,0.9996
