# Import and Download Dataset

In [None]:
%pip install nltk 
%pip install transformers
%pip install datasets 
%pip install numpy 
%pip install ray 
%pip install nvsmi

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [None]:
%%capture

import re
import numpy as np
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation, NMF
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
from sklearn.metrics.pairwise import cosine_similarity
from nltk.tokenize import sent_tokenize
import torch
from sklearn.metrics import classification_report
from collections import Counter


from sklearn.metrics.pairwise import euclidean_distances
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
from transformers import pipeline
from datasets import load_dataset
from transformers import TrainingArguments
from torch.utils.data import Dataset
from transformers import Trainer

# Load the 20 Newsgroups corpus. subset='all' takes both of sklearn's predefined
# subsets; `remove` strips headers, signature footers and quoted replies (per the
# sklearn documentation) so only message bodies remain.
from sklearn.datasets import fetch_20newsgroups

dataset = fetch_20newsgroups(
    subset='all',
    remove=('headers', 'footers', 'quotes'),
)

In [None]:
import nltk

# NLTK resources used by the tokenizer, stop-word list and lemmatizer in preprocess().
for nltk_resource in ("punkt", "stopwords"):
    nltk.download(nltk_resource)
nltk.download("wordnet")

True

In [None]:
import time
from ray.train.torch import TorchTrainer
from ray.train.torch import TorchConfig
from ray.air import ScalingConfig

# from ray.train.callbacks import ProgressBarCallback

In [None]:
# One torch.cuda.device handle per visible CUDA device (empty on CPU-only machines).
gpu_count = torch.cuda.device_count()
available_gpus = [torch.cuda.device(index) for index in range(gpu_count)]
available_gpus



[]

# Preprocess Dataset

In [None]:
from functools import lru_cache


@lru_cache(maxsize=None)
def _preprocess_resources():
    """Build the English stop-word set and a lemmatizer once per process and reuse them.

    Built lazily on first call rather than at definition time, so the NLTK corpora
    only have to be downloaded before preprocess() is first used.
    """
    return frozenset(stopwords.words("english")), WordNetLemmatizer()


def preprocess(text):
    """Normalize a raw document for topic modeling and classification.

    Lower-cases ``text``, tokenizes it with NLTK's ``word_tokenize``, drops English
    stop words and lemmatizes the remaining tokens.

    Args:
        text: Raw document string.

    Returns:
        The kept tokens joined by single spaces, or ``None`` when at most one token
        survives. ``filter_invalid_documents`` uses ``None`` to reject a document.
    """
    # Previously the stop-word list and lemmatizer were rebuilt on every call
    # (once per document); they are now cached per process.
    stopword_set, lemmatizer = _preprocess_resources()
    tokens = word_tokenize(text.lower())
    # Stop words are checked on the surface form, before lemmatization.
    tokens = [lemmatizer.lemmatize(token) for token in tokens if token not in stopword_set]
    if len(tokens) <= 1:
        return None
    return " ".join(tokens)


def keep_longest_n_documents(dataset, n=5000):
    """Select the ``n`` longest documents (by character count) and their labels.

    Args:
        dataset: Object with parallel ``data`` (texts) and ``target`` (labels) sequences.
        n: Number of documents to keep.

    Returns:
        ``(documents, labels)`` lists ordered from longest to shortest. Equal-length
        documents keep their original relative order (stable sort).
    """
    texts = dataset.data
    ranked = sorted(range(len(texts)), key=lambda idx: len(texts[idx]), reverse=True)
    chosen = ranked[:n]
    return [texts[idx] for idx in chosen], [dataset.target[idx] for idx in chosen]

def filter_invalid_documents(documents, labels):
    """Drop documents that are too short or that preprocess() would reject.

    A document is kept when its stripped text is longer than 5 characters AND
    ``preprocess`` returns a non-``None`` result, so later stages never receive a
    document that preprocesses to ``None``. Note that kept documents are
    preprocessed here once and again by the caller.

    Args:
        documents: Raw document strings.
        labels: Labels parallel to ``documents``.

    Returns:
        ``(filtered_documents, filtered_labels)`` as parallel lists in input order.
    """
    kept_pairs = [
        (doc, label)
        for doc, label in zip(documents, labels)
        # Cheap length test first; preprocess() only runs for documents that pass it.
        if len(doc.strip()) > 5 and preprocess(doc) is not None
    ]
    filtered_documents = [doc for doc, _ in kept_pairs]
    filtered_labels = [label for _, label in kept_pairs]

    print(f'Filtered out {len(documents) - len(filtered_documents)} short strings.')

    return filtered_documents, filtered_labels



## Preprocess all documents (serial vs. parallel)

In [None]:
documents, labels = dataset.data, dataset.target
filtered_documents, filtered_labels = filter_invalid_documents(documents, labels)

# Serial baseline: time a single-process pass of preprocess() over every kept document.
start_time = time.time()
X = []
for text in tqdm(filtered_documents, desc="Preprocessing texts", position=0):
    X.append(preprocess(text))
y = filtered_labels
print("Serial: ", time.time()-start_time)

# 80/20 split with a fixed random_state so the split is reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

Filtered out 558 short strings.


Preprocessing texts:   0%|          | 0/18288 [00:00<?, ?it/s]

Serial:  31.999093055725098


In [None]:
import multiprocessing
import time

# The parallel cells start one worker process per logical CPU.
num_cores = multiprocessing.cpu_count()
print(f'Number of CPU cores: {num_cores}')

Number of CPU cores: 8


In [None]:
def parallel_preprocess(document_queue, result_queue):
    """Worker loop: preprocess documents pulled from a shared queue.

    Consumes ``(index, document)`` pairs from ``document_queue`` until a ``None``
    sentinel arrives, and puts ``(index, preprocess(document))`` on ``result_queue``.
    The index travels with each result because workers finish in arbitrary order;
    the consumer uses it to restore input order so results stay aligned with labels.
    """
    while True:
        item = document_queue.get()
        if item is None:  # one sentinel per worker signals shutdown
            break
        index, document = item
        result_queue.put((index, preprocess(document)))


documents = dataset.data
labels = dataset.target
filtered_documents, filtered_labels = filter_invalid_documents(documents, labels)

start_time = time.time()

document_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()

num_cores = multiprocessing.cpu_count()
workers = [multiprocessing.Process(target=parallel_preprocess, args=(document_queue, result_queue)) for _ in range(num_cores)]

for index, document in enumerate(filtered_documents):
    document_queue.put((index, document))

for _ in range(num_cores):
    document_queue.put(None)

for worker in workers:
    worker.start()  # every process runs parallel_preprocess; the queues are shared

# Results arrive in completion order, not submission order. Put each one back into
# its original slot so X[i] corresponds to filtered_labels[i].
X = [None] * len(filtered_documents)
for _ in tqdm(range(len(filtered_documents)), desc="Preprocessing texts", position=0):
    index, preprocessed_document = result_queue.get()
    X[index] = preprocessed_document

# Join only after draining result_queue: a process that still has buffered queue
# data will not terminate, so joining first could deadlock.
for worker in workers:
    worker.join()

print("Parallel: ", time.time() - start_time)

y = filtered_labels
X_train_parallel, X_test_parallel, y_train_parallel, y_test_parallel = train_test_split(X, y, test_size=0.2, random_state=42)


Filtered out 558 short strings.


Preprocessing texts:   0%|          | 0/18288 [00:00<?, ?it/s]

Parallel:  7.820757627487183


In [None]:
# Sanity check: the parallel run should reproduce the serial split exactly, not just
# in size. Equal lengths alone cannot reveal documents paired with the wrong labels.
print(len(X_train))
print(len(X_train_parallel))
print("Train texts identical:", X_train_parallel == X_train)
print("Train labels identical:", list(y_train_parallel) == list(y_train))

14630
14630


# Split into chunks

In [None]:
def split_into_chunks(text, chunk_size, overlap):
    """Split ``text`` into fixed-size, overlapping character windows.

    A window starts every ``chunk_size - overlap`` characters and each chunk is
    ``text[start:start + chunk_size]``, so the final chunks may be shorter than
    ``chunk_size``.

    Args:
        text: String (or other sliceable sequence) to split.
        chunk_size: Maximum length of each chunk.
        overlap: Number of items shared by consecutive chunks.

    Returns:
        List of chunks in order; ``[]`` for empty ``text``.

    Raises:
        ValueError: if ``overlap >= chunk_size``. The window would never advance,
            and the previous implementation looped forever.
    """
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]


def get_important_chunks_lda_euclidean(text, n_chunks=5, chunk_size=256, overlap=128):
    """Return the chunks of ``text`` whose LDA topic mixture is closest to the document's.

    The text is split into overlapping character windows. A TF-IDF vocabulary is
    fitted on the whole text, and an LDA model with ``n_chunks`` topics is fitted on
    the chunks' TF-IDF rows. Chunks are ranked by the Euclidean distance between
    their topic distribution and the whole document's.

    Args:
        text: Document text (the notebook passes preprocessed strings).
        n_chunks: Number of chunks to return; also used as the number of LDA topics.
        chunk_size: Chunk length in characters.
        overlap: Characters shared by consecutive chunks.

    Returns:
        Up to ``n_chunks`` chunk strings, closest first. Returns ``[]`` when the text
        yields no TF-IDF vocabulary or when no chunk contains a vocabulary term.
    """
    tfidf_vectorizer = TfidfVectorizer()
    try:
        X_document = tfidf_vectorizer.fit_transform([text])
    except ValueError:
        # TfidfVectorizer raises ValueError for an empty vocabulary (e.g. text made
        # only of punctuation or one-character tokens). There is nothing to rank.
        return []

    text_chunks = split_into_chunks(text, chunk_size, overlap)
    X_chunks = tfidf_vectorizer.transform(text_chunks)

    # Keep only chunks containing at least one vocabulary term. Windows are cut at
    # character positions, so a chunk may hold only word fragments.
    non_empty_indices = np.where(X_chunks.getnnz(1) > 0)[0]
    if len(non_empty_indices) == 0:
        return []  # LDA cannot be fitted on zero rows
    X_chunks_filtered = X_chunks[non_empty_indices]
    text_chunks_filtered = [text_chunks[i] for i in non_empty_indices]

    # Fit LDA on the chunks, then express the document and every chunk in topic space.
    lda = LatentDirichletAllocation(n_components=n_chunks, random_state=42)
    lda.fit(X_chunks_filtered)
    doc_topics = lda.transform(X_document)
    chunk_topics = lda.transform(X_chunks_filtered)

    # One row (the single document) by one column per kept chunk.
    distances = euclidean_distances(doc_topics, chunk_topics)

    # Indices of the n_chunks smallest distances, nearest first.
    top_indices = np.argsort(distances, axis=1)[0, :n_chunks]
    return [text_chunks_filtered[index] for index in top_indices]



In [None]:
start_time = time.time()
train_chunks = []
for text in tqdm(X_train, desc="Extracting important chunks (train set)"):
    train_chunks.append(get_important_chunks_lda_euclidean(text))

# Flatten to one training example per chunk; every chunk inherits its document's label.
train_texts = []
for chunks in train_chunks:
    if chunks is not None:
        train_texts.extend(chunks)
train_labels = np.hstack([
    np.repeat(label, len(chunks))
    for label, chunks in zip(y_train, train_chunks)
    if chunks is not None
])

assert len(train_texts) == len(train_labels)
print("Serial: ",time.time()-start_time)

Extracting important chunks (train set):   0%|          | 0/14630 [00:00<?, ?it/s]

-- -- -- -- --
p h e r c l e g n n g
-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
-- -- -- -- --
[ ... ]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^
-- -- -- -- --
-- -- -- -- --
Serial:  208.24080395698547


In [None]:
def parallel_chunks(document_queue, text_queue, label_queue):
    """Worker loop: extract the important chunks of each queued training document.

    Consumes ``(index, document, label)`` tuples from ``document_queue`` until a
    ``None`` sentinel arrives. For each document it puts ``(index, chunks)`` on
    ``text_queue`` and ``(index, [label] * len(chunks))`` on ``label_queue``, giving
    one label per chunk like the serial ``np.repeat``. Both messages carry the index
    so the consumer can pair texts with labels even though workers finish, and
    interleave their puts, in arbitrary order.

    A bare document (not a tuple) is still accepted for older callers. It is reported
    with index ``None`` and no labels, since its label is unknown.
    """
    while True:
        item = document_queue.get()
        if item is None:  # one sentinel per worker signals shutdown
            break
        if isinstance(item, tuple):
            index, document, label = item
        else:
            index, document, label = None, item, None
        chunks = get_important_chunks_lda_euclidean(document)
        text_queue.put((index, chunks))
        label_queue.put((index, [] if label is None else [label] * len(chunks)))


start_time = time.time()
document_queue = multiprocessing.Queue()
text_queue = multiprocessing.Queue()
label_queue = multiprocessing.Queue()
workers = [multiprocessing.Process(target=parallel_chunks, args=(document_queue, text_queue, label_queue)) for _ in range(num_cores)]

# Send each training document together with its position and its own label.
for index, (document, label) in enumerate(zip(X_train, y_train)):
    document_queue.put((index, document, label))

for _ in range(num_cores):
    document_queue.put(None)

for worker in workers:
    worker.start()  # every process runs parallel_chunks; the queues are shared

# Slot results back by index: completion order is arbitrary, and the i-th message on
# text_queue need not come from the same worker as the i-th on label_queue.
chunks_per_document = [None] * len(X_train)
labels_per_document = [None] * len(X_train)
for _ in tqdm(range(len(X_train)), desc="Extracting important chunks (train set, parallel)", position=0):
    index, chunks = text_queue.get()
    chunks_per_document[index] = chunks
    index, chunk_labels = label_queue.get()
    labels_per_document[index] = chunk_labels

# Join only after draining the queues; a worker with buffered queue data cannot exit.
for worker in workers:
    worker.join()

# Flatten in document order, mirroring train_texts / train_labels from the serial cell.
train_texts_parallel = [chunk for chunks in chunks_per_document for chunk in chunks]
train_labels_parallel = [label for chunk_labels in labels_per_document for label in chunk_labels]

assert len(train_texts_parallel) == len(train_labels_parallel)

print("Parallel: ", time.time() - start_time)
    

Preprocessing texts:   0%|          | 0/14630 [00:00<?, ?it/s]

Parallel:  84.09461188316345


# Preprocess and chunk each document in one pass (instead of in separate stages)

In [None]:
# Single-pass variant: split the raw documents first, then have each worker both
# preprocess a document and, for training documents, extract its important chunks.
# The split reuses test_size=0.2 / random_state=42. sklearn's non-stratified split
# presumably depends only on the sample count and the seed, so it should pick the same
# train/test documents as the staged pipeline; the print at the end checks this.

def parallel_preprocess_and_chunk(document_queue, text_queue, label_queue):
    """Worker loop: preprocess each queued document and optionally extract its chunks.

    Consumes ``(index, raw_document, label, extract_chunks)`` tuples from
    ``document_queue`` until a ``None`` sentinel arrives. For each it puts
    ``(index, preprocessed_document, chunks)`` on ``text_queue`` and
    ``(index, [label] * len(chunks))`` on ``label_queue``. ``chunks`` is ``[]`` when
    ``extract_chunks`` is false (test documents). Messages carry the index because
    workers finish in arbitrary order.
    """
    while True:
        item = document_queue.get()
        if item is None:  # one sentinel per worker signals shutdown
            break
        index, document, label, extract_chunks = item
        preprocessed_document = preprocess(document)
        # Chunk the preprocessed text, as the staged pipeline does with X_train.
        chunks = get_important_chunks_lda_euclidean(preprocessed_document) if extract_chunks else []
        text_queue.put((index, preprocessed_document, chunks))
        label_queue.put((index, [label] * len(chunks)))


documents = dataset.data
labels = dataset.target
filtered_documents, filtered_labels = filter_invalid_documents(documents, labels)

raw_train, raw_test, y_train_parallel, y_test_parallel = train_test_split(
    filtered_documents, filtered_labels, test_size=0.2, random_state=42
)
# Jobs are (raw_document, label, extract_chunks); only training documents need chunks.
jobs = [(doc, label, True) for doc, label in zip(raw_train, y_train_parallel)]
jobs += [(doc, label, False) for doc, label in zip(raw_test, y_test_parallel)]
n_train = len(raw_train)

start_time = time.time()

document_queue = multiprocessing.Queue()
text_queue = multiprocessing.Queue()
label_queue = multiprocessing.Queue()

num_cores = multiprocessing.cpu_count()
workers = [
    multiprocessing.Process(target=parallel_preprocess_and_chunk, args=(document_queue, text_queue, label_queue))
    for _ in range(num_cores)
]

for index, (document, label, extract_chunks) in enumerate(jobs):
    document_queue.put((index, document, label, extract_chunks))

for _ in range(num_cores):
    document_queue.put(None)

for worker in workers:
    worker.start()

preprocessed_documents = [None] * len(jobs)
chunks_per_document = [None] * len(jobs)
labels_per_document = [None] * len(jobs)
for _ in tqdm(range(len(jobs)), desc="Preprocessing + chunking texts", position=0):
    index, preprocessed_document, chunks = text_queue.get()
    preprocessed_documents[index] = preprocessed_document
    chunks_per_document[index] = chunks
    index, chunk_labels = label_queue.get()
    labels_per_document[index] = chunk_labels

# Join only after draining the queues; a worker with buffered queue data cannot exit.
for worker in workers:
    worker.join()

print("Parallel: ", time.time() - start_time)

# Jobs were queued train-first, so the first n_train slots are the training documents.
X_train_parallel = preprocessed_documents[:n_train]
X_test_parallel = preprocessed_documents[n_train:]
train_texts_parallel = [chunk for chunks in chunks_per_document[:n_train] for chunk in chunks]
train_labels_parallel = [label for chunk_labels in labels_per_document[:n_train] for label in chunk_labels]

assert len(train_texts_parallel) == len(train_labels_parallel)
print("Same train split as staged pipeline:", X_train_parallel == X_train)


# Train the Model

## Train with Ray

In [None]:
def train_func(config):
    """Per-worker training loop for Ray's TorchTrainer: fine-tune DistilBERT on chunks.

    Tokenizes the notebook-level ``train_texts`` / ``train_labels`` (built by the
    chunk-extraction cells), wraps them in a torch Dataset and runs a Hugging Face
    ``Trainer`` for 3 epochs.

    Args:
        config: Presumably Ray's per-worker ``train_loop_config`` (none is passed
            here); unused.

    Returns:
        ``(metric, state_dict)``. ``metric`` is ``trainer.state.best_metric`` when the
        Trainer tracked one, otherwise the run's average training loss.
    """

    # Presumably kept local so the function is self-contained when Ray serializes it
    # to worker processes.
    class TextClassificationDataset(Dataset):
        """Pairs tokenizer output with integer class labels for the HF Trainer."""

        def __init__(self, encodings, labels):
            self.encodings = encodings
            self.labels = labels

        def __len__(self):
            return len(self.labels)

        def __getitem__(self, idx):
            item = {key: val[idx].clone().detach() for key, val in self.encodings.items()}
            item["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
            return item

    tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
    model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=20)

    train_encodings = tokenizer(train_texts, padding="max_length", truncation=True, max_length=256, return_tensors="pt")
    train_dataset = TextClassificationDataset(train_encodings, train_labels)

    training_args = TrainingArguments(
        output_dir="./results",
        num_train_epochs=3,
        per_device_train_batch_size=16,
        warmup_steps=500,
        weight_decay=0.01,
    )

    trainer = Trainer(model=model, args=training_args, train_dataset=train_dataset, tokenizer=tokenizer)
    train_output = trainer.train()

    # best_metric is presumably only set when the Trainer evaluates with a
    # metric_for_best_model. No eval dataset is configured here, so it stays None;
    # fall back to the training loss that TrainOutput reports.
    metric = trainer.state.best_metric
    if metric is None:
        metric = train_output.training_loss
    # NOTE(review): Ray Train may not forward this return value to the driver; if the
    # driver needs it, report it through Ray's session reporting API instead. Confirm
    # for the installed Ray version.
    return metric, model.state_dict()


In [None]:

# Request GPUs only when this machine has them. With use_gpu=True and no GPU
# (available_gpus printed [] earlier), the trial sat in PENDING for 12+ minutes waiting
# for resources that never appeared.
use_gpu = torch.cuda.is_available()
num_workers = min(2, torch.cuda.device_count()) if use_gpu else 2
ray_trainer = TorchTrainer(train_func, scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=num_workers))

# fit() returns a Result object. NOTE(review): the original `fetch_result()` call does not
# appear to be part of the TorchTrainer API (the cell ended in AttributeError). Confirm
# against the installed Ray version.
result = ray_trainer.fit()
print(f"Ray training metrics: {result.metrics}")

# TODO: train_func's return value is presumably not surfaced through `result`, so there
# is no state dict to load here. To use the Ray-trained weights, report a checkpoint
# from train_func and restore the model from `result.checkpoint`.

0,1
Current time:,2023-04-16 17:50:12
Running for:,00:12:36.01
Memory:,4.0/12.7 GiB

Trial name,status,loc
TorchTrainer_60727_00000,PENDING,


AttributeError: ignored

## Train in serial

In [None]:
class TextClassificationDataset(Dataset):
    """Map-style dataset that pairs tokenizer output with integer class labels.

    ``encodings`` maps feature names (e.g. ``input_ids``) to tensors indexed by
    sample; ``labels`` holds one class id per sample. Each item is a dict of that
    sample's feature tensors plus a ``"labels"`` long tensor, as the HF Trainer expects.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        sample = {}
        for name, tensor in self.encodings.items():
            # Copy the row so the item does not share storage with the batch tensor.
            sample[name] = tensor[idx].clone().detach()
        sample["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample


In [None]:
# Train DistilBERT: the pretrained tokenizer plus a sequence-classification model
# with 20 output labels, one per newsgroup.
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
model = DistilBertForSequenceClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=20,
)


def tokenize(text):
    """Tokenize a string or list of strings into PyTorch tensors of exactly 256 tokens.

    Sequences are padded or truncated to ``max_length=256`` so chunk batches stack cleanly.
    """
    encoding_options = dict(padding="max_length", truncation=True, max_length=256, return_tensors="pt")
    return tokenizer(text, **encoding_options)



Extracting important chunks (train set):   0%|          | 0/4000 [00:00<?, ?it/s]

In [None]:
# Tokenize all training chunks in one batch and wrap them for the Hugging Face Trainer.
train_encodings = tokenize(train_texts)
train_dataset = TextClassificationDataset(encodings=train_encodings, labels=train_labels)


In [None]:
# Hugging Face Trainer configuration for the serial (single-process) fine-tuning run.
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    warmup_steps=500,
    weight_decay=0.01,
)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    tokenizer=tokenizer,
)
trainer.train()



Step,Training Loss
500,2.0436
1000,1.1415
1500,0.8693
2000,0.6823
2500,0.614
3000,0.344
3500,0.3258


TrainOutput(global_step=3750, training_loss=0.822551416015625, metrics={'train_runtime': 1282.637, 'train_samples_per_second': 46.755, 'train_steps_per_second': 2.924, 'total_flos': 3973309989580800.0, 'train_loss': 0.822551416015625, 'epoch': 3.0})

In [None]:
from ray.air import ScalingConfig
from ray.train.torch import TorchTrainer, TorchConfig

# Two-worker Ray trainer without GPU resources requested. It is bound to its own name
# so it no longer overwrites the Hugging Face `trainer` from the serial-training cell.
ray_cpu_trainer = TorchTrainer(
    train_func,
    # TorchConfig() keeps Ray's defaults. The previous `TorchConfig(...)` passed
    # Ellipsis as the first positional field (the backend), which is not a valid backend.
    torch_backend=TorchConfig(),
    scaling_config=ScalingConfig(num_workers=2),
)

# Evaluate the Model

In [None]:

# Test and evaluate the classifier: predict one label for each important chunk of
# every test document (the majority vote over chunks happens in the next cell).
text_chunks = [get_important_chunks_lda_euclidean(text) for text in tqdm(X_test, desc="Extracting important chunks (test set)")]

# get_important_chunks_lda_euclidean returns [] when a document has no usable
# vocabulary. Fall back to the whole document as a single (truncated) chunk so every
# test document still gets a prediction and stays aligned with y_test.
text_chunks = [chunks if chunks else [text] for chunks, text in zip(text_chunks, X_test)]

# 1. Tokenize the important chunks for each document in the test set
test_encodings = [tokenize(chunks) for chunks in text_chunks]

# 2. Predict a label for each chunk of each document with the trained model
predicted_labels_nested = []
model.eval()  # disable dropout for inference
for encodings in tqdm(test_encodings, desc="Predicting"):
    with torch.no_grad():
        input_ids = encodings["input_ids"].to(training_args.device)
        attention_mask = encodings["attention_mask"].to(training_args.device)
        logits = model(input_ids, attention_mask=attention_mask).logits
        # softmax is monotonic, so argmax over logits gives the same class as argmax
        # over probabilities. Named chunk_predictions so the global `labels` is untouched.
        chunk_predictions = np.argmax(logits.cpu().numpy(), axis=1)
        predicted_labels_nested.append(chunk_predictions)
        

Extracting important chunks (test set):   0%|          | 0/1000 [00:00<?, ?it/s]

Predicting:   0%|          | 0/1000 [00:00<?, ?it/s]

In [None]:

def majority_voting(labels):
    """Return the most frequent label in ``labels``.

    Ties go to the label encountered first, because ``Counter.most_common`` orders
    equal counts by first occurrence.

    Args:
        labels: Iterable of hashable labels (e.g. per-chunk predicted class ids).

    Raises:
        ValueError: if ``labels`` is empty, since there is nothing to vote on.
    """
    counter = Counter(labels)
    if not counter:
        raise ValueError("majority_voting() needs at least one label")
    return counter.most_common(1)[0][0]

# Collapse per-chunk predictions into one label per test document by majority vote.
document_labels = list(map(majority_voting, predicted_labels_nested))

# Per-class precision / recall / F1 of the document-level predictions.
report = classification_report(y_test, document_labels)
print(report)

              precision    recall  f1-score   support

           0       0.74      0.64      0.69        50
           1       0.74      0.74      0.74        34
           2       0.79      0.79      0.79        43
           3       0.74      0.77      0.75        26
           4       0.82      0.82      0.82        28
           5       0.90      0.92      0.91        66
           6       0.90      0.87      0.88        30
           7       0.94      0.89      0.92        37
           8       0.94      1.00      0.97        31
           9       0.95      0.91      0.93        46
          10       0.94      0.96      0.95        49
          11       0.87      0.85      0.86        65
          12       0.77      0.96      0.85        24
          13       0.94      0.94      0.94        66
          14       0.98      0.89      0.93        64
          15       0.76      0.88      0.82        94
          16       0.81      0.90      0.85        60
          17       0.93    