# Preparation Phase

In [1]:
import torch
from transformers import PegasusForConditionalGeneration, PegasusTokenizer
from gensim.utils import simple_preprocess
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
import xml.etree.ElementTree as ET
import os
from sklearn.feature_extraction.text import TfidfVectorizer

In [2]:
#device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('cpu')

def get_summarizer():
    model_name = 'google/pegasus-xsum'
    tokenizer = PegasusTokenizer.from_pretrained(model_name)
    summarizer_model = PegasusForConditionalGeneration.from_pretrained(model_name).to(device)
    return tokenizer, summarizer_model

# write function that calculates and returns tf-idf scores of all words in a text using libraries
def get_tfidf_scores(documents):
    tfidf_vectorizer = TfidfVectorizer()
    tfidf_vectorizer.fit(documents)
    return tfidf_vectorizer

# select the top n percent of words for each document with highest tf-idf scores
def get_top_n_words(document, tfidf_vectorizer, n):
    tfidf_scores = tfidf_vectorizer.transform([document])
    sorted_indices = tfidf_scores.argsort().flatten()[::-1]
    top_n_indices = sorted_indices[:int(len(sorted_indices) * n)]
    return [tfidf_vectorizer.get_feature_names()[i] for i in top_n_indices]

def read_xml_files():
    documents = []
    for filename in os.listdir('data'):
        if filename.endswith(".xml"):
            tree = ET.parse('data/' + filename)
            root = tree.getroot()
            for sec in root:
                if len(documents) > 10:
                    break
                text = sec.find(".//AbstractText")
                if text != None:
                    if text.text != None:
                        if len(text.text) > 1500:
                            documents.append(text.text)
            print("finished doc" + filename)
            break
        else:
            continue

    return documents

def read_corpus(documents, tokens_only=False):
    i = 0
    for doc in documents:
        tokens = simple_preprocess(doc)
        if tokens_only:
            yield tokens
        else:
            # For training data, add tags
            yield TaggedDocument(tokens, [i])
        i = i+1

def most_similar(text):
    processed_query = simple_preprocess(text)
    v1 = model.infer_vector(processed_query)
    return model.docvecs.most_similar([v1])

def summarize(text):
    batch = tokenizer([text], truncation=True, padding='longest', return_tensors="pt").to(device)
    translated = summarizer_model.generate(**batch)
    return tokenizer.batch_decode(translated, skip_special_tokens=True)[0]

def train_embedding(train_corpus, vecotor_size=124, window=20, min_count=2, epochs=200):
    model = Doc2Vec(vector_size=vecotor_size, window=window, min_count=min_count, epochs=epochs, workers=10)
    model.build_vocab(train_corpus)
    model.train(train_corpus, total_examples=model.corpus_count, epochs=model.epochs)
    return model

In [3]:
# create class Doc2VecModel that implements BaseEstimator

from sklearn.base import BaseEstimator

class Doc2VecModel(BaseEstimator):
    def __init__(self, vector_size=124, window=20, min_count=2, epochs=200, dm=0, hs=0, dbow_words=0, sample=1e-3):
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.epochs = epochs
        self.dm = dm
        self.hs = hs
        self.dbow_words = dbow_words
        self.sample = sample
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.epochs = epochs
        self.device = torch.device('cpu' if torch.cuda.is_available() else 'cpu')
        self.X_test = query_summarized
        self.X = train_corpus

    def fit(self, X, y=None):
        self.model = train_embedding(train_corpus, self.vector_size, self.window, self.min_count, self.epochs)
        return self

    def most_similar(self, text):
        processed_query = simple_preprocess(text)
        v1 = self.model.infer_vector(processed_query)
        return self.model.docvecs.most_similar([v1])

    def predict(self, X_test):
        if not X_test:
            X_test = self.X_test
        return self.most_similar(X_test)

In [4]:
tokenizer, summarizer_model = get_summarizer()

In [5]:
documents = read_xml_files()
train_corpus = list(read_corpus(documents))
test_corpus = list(read_corpus(documents, tokens_only=True))

finished docpubmed22n1109.xml


In [6]:
print(len(train_corpus))

11


In [9]:
model = train_embedding(train_corpus)

In [10]:
test_query_full = documents[0]

# print the summary
query_summarized = summarize(test_query_full)
print("query_full:\n", test_query_full)
print()
print("query_summarized:\n", query_summarized)



query_full:
 Medical education has increasingly shifted towards replacing large lectures with a combination of online and smaller in-person group sessions. This study compares the efficacy of a virtual Opioid Overdose Prevention and Response Training (OOPRT) for first-year medical students with an identical in-person training. During their first unit of medical school, students in the class of 2023 (cohort 1) received OOPRT in-person and students in the class of 2024 (cohort 2) received training via Zoom. Aside from the delivery format, trainings were identical. Both cohorts completed identical surveys at medical school entry and post-training to evaluate knowledge and experiences using the Opioid Overdose Knowledge Scale, Opioid Overdose Attitudes Scale, Medical Conditions Regard Scale, and Naloxone Related Risk Compensation Beliefs. Of 430 students, 84.2% (362: 124 in cohort 1; 238 in cohort 2) completed baseline and post-training surveys. Students reported significantly improved opi

In [19]:
print("Input text:", query_summarized)
print("Most similar text:\n", most_similar(query_summarized))

Input text: Virtual opioid overdose prevention training does not undermine the learning experience for first-year medical students.
Most similar text:
 [(0, 0.42832982540130615), (5, 0.24744363129138947), (2, 0.20902539789676666), (3, 0.1938617080450058), (10, 0.14480555057525635), (9, 0.010960517451167107), (4, -0.017481787130236626), (1, -0.033810585737228394), (7, -0.1667816936969757), (8, -0.234575554728508)]


  return model.docvecs.most_similar([v1])


# Use Case

In [None]:
query = "breast cancer treatment"

In [None]:
print("Input text:", query)
most_similar_result = most_similar(query)
print("Most similar text:\n", most_similar_result)
print("Summary of most similar text:\n", summarize(documents[most_similar_result[0][0]]))

# Word2Vec enrichment

In [None]:
from gensim.models import Word2Vec

In [None]:
# concatenate all documents to a single string
all_documents = ""
for doc in documents:
    all_documents += doc

# train word2vec model with all documents
word2vec_model = Word2Vec([all_documents.split()], window=5, min_count=1, workers=10, epochs=1000)

In [None]:
# tokenize the input sentence and return for each word the closest word from the word2vec model
def get_enriched_words(query):
    enriched_words = []
    for word in query.split():
        enriched_words.append(word2vec_model.wv.most_similar(word))
    return enriched_words

In [None]:
input_text = "breast cancer treatment"
print("Input text:", input_text)
enriched_words = get_enriched_words(input_text)
for i, word in enumerate(input_text.split()):
    print("Word:", word)
    print("Enriched words:")
    for enriched_word in enriched_words[i]:
        print(enriched_word)
    print("\n")

In [11]:
# hyperparameter tuning with random search for train_embedding function
from sklearn.model_selection import RandomizedSearchCV
import numpy as np

# define the parameter values that should be searched
# vector_size from 50 to 200 in steps of 10
# window from 5 to 30 in steps of 5
# min_count from 1 to 5 in steps of 1
# epochs from 100 to 1000 in steps of 100
param_dist = {"vector_size": list(range(44, 116, 12)),
              "window": list(range(5, 20, 5)),
              "min_count": list(range(1, 5, 1)),
              "epochs": list(range(100, 300, 100)),
              "dm": list(range(0, 3, 1)),
              "hs": list(range(0, 2, 1)),
              "dbow_words": list(range(0, 2, 1)),
              "sample": list(np.arange(0, 0.001, 0.0005))}

# create metric function
def custom_metric(y_true, ys_pred):
    # check if any tuple in ys_pred contains a 0 at index 0 and get the index of the tuple
    index = next((i for i, x in enumerate(ys_pred) if x[0] == 0), None)
    # if no tuple contains a 0 at index 0, return 0
    if index == None:
        return 0
    # else return the value at index 1 of the tuple at index
    else:
        return ys_pred[index][1]

# instantiate the random search with your own metric function
model = Doc2VecModel()

random_search = RandomizedSearchCV(estimator=model, param_distributions=param_dist, scoring=custom_metric, n_iter=10, cv=3, verbose=10, n_jobs=-1, random_state=42)

# fit the random search model
random_search.fit(train_corpus)

# view the best parameters from the random search
print("Best parameters from random search:")
print(random_search.best_params_)

Fitting 3 folds for each of 10 candidates, totalling 30 fits
Best parameters from random search:
{'window': 5, 'vector_size': 68, 'sample': 0.0, 'min_count': 1, 'hs': 0, 'epochs': 200, 'dm': 2, 'dbow_words': 1}
