<a href="https://colab.research.google.com/github/ranton256/classifying_concord/blob/main/supervised_ML_identify_author.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Supervised Machine Learning - Identify Author


In [1]:
!pip install -q spacy

In [None]:
# you will need to download the spacy model for english.
!python -m spacy download en_core_web_sm

In [None]:
!which python

In [None]:
# %pip install -r requirements.txt

In [None]:
import spacy

## The Dataset
Our two datasets are constructed from two related works of 19th century American transcendentalism. These are both public domain.

1. [Essays by Ralph Waldo Emerson by Ralph Waldo Emerson](https://www.google.com/url?q=https%3A%2F%2Fwww.gutenberg.org%2Febooks%2F16643)
2. [Walden, and On The Duty Of Civil Disobedience by Henry David Thoreau](https://www.google.com/url?q=https%3A%2F%2Fwww.gutenberg.org%2Febooks%2F205)

These two authors had different writing styles but shared more than their philosophical interests—they were neighbors in Concord, Massachusetts.

These two works are also similar in length when formatted as plain text.

We will use spaCy to segment each work into sections of roughly 3 to 5 sentences each, then build a dataframe of the text with a label of 'emerson' or 'thoreau'. We then shuffle and split it into train and test sets, train several machine learning models to predict which author wrote each section, and compare the results.

We will also preprocess the text to remove stopwords and perform lemmatization.

In [None]:
emerson_txt_url = "https://www.gutenberg.org/ebooks/16643.txt.utf-8"
thoreau_txt_url = "https://www.gutenberg.org/ebooks/205.txt.utf-8"


In [None]:
import requests
from pathlib import Path

In [None]:
def download_file(url, timeout=60):
  """Download `url` into the current directory and return the local path.

  The local file name is the last '/'-separated component of the URL.

  Args:
    url: Address of the file to fetch.
    timeout: Seconds to wait for the server before giving up. Without a
      timeout `requests.get` can block indefinitely on a stalled connection.

  Returns:
    A `pathlib.Path` naming the file that was written.

  Raises:
    requests.HTTPError: If the server answers with an error status.
    requests.Timeout: If the server does not respond within `timeout`.
  """
  local_filename = Path(url.split('/')[-1])
  result = requests.get(url, timeout=timeout)
  result.raise_for_status()
  # The payload is stored as raw bytes; no decoding happens here.
  local_filename.write_bytes(result.content)
  return local_filename

In [None]:
emerson_file = download_file(emerson_txt_url)
thoreau_file = download_file(thoreau_txt_url)

In [None]:
!head -n 50 {emerson_file}

In [None]:
!head -n 50 {thoreau_file}

In [None]:
# Let's strip the frontmatter lines off the start of each file.
# remove each line preceding one that contains "START OF THE PROJECT GUTENBERG EBOOK "
!grep -n "START OF THE PROJECT GUTENBERG EBOOK" {emerson_file}
!grep -n "START OF THE PROJECT GUTENBERG EBOOK" {thoreau_file}

In [None]:
def trim_frontmatter(filename):
  """Write a copy of `filename` without its Project Gutenberg front matter.

  Every line up to and including the first one that contains
  "START OF THE PROJECT GUTENBERG EBOOK" is removed. If no such line exists
  the content is copied unchanged.

  Args:
    filename: Path (str or `pathlib.Path`) of the text file to trim.

  Returns:
    The path of the new file as a string. It sits next to the input file and
    is named "trimmed_<original name>".
  """
  source_path = Path(filename)
  # Be explicit about the encoding so the platform default is never used.
  with open(source_path, encoding="utf-8") as f:
    lines = f.readlines()

  n_trim_lines = 0
  for i, line in enumerate(lines):
    if "START OF THE PROJECT GUTENBERG EBOOK" in line:
      n_trim_lines = i + 1
      break

  # Prefix only the file name so inputs located in a directory still work.
  new_path = source_path.with_name(f"trimmed_{source_path.name}")
  with open(new_path, "w", encoding="utf-8") as f:
    # readlines() keeps each line terminator, so the lines are written back
    # as they are; joining them with "\n" would double every line break.
    f.writelines(lines[n_trim_lines:])
  return str(new_path)

In [None]:
trimmed_emerson_file = trim_frontmatter(emerson_file)
trimmed_thoreau_file = trim_frontmatter(thoreau_file)

In [None]:
!head {trimmed_emerson_file}
!head {trimmed_thoreau_file}

In [None]:
from collections import deque
from random import randint
import itertools


In [None]:
nlp = spacy.load("en_core_web_sm")
# if you have an error here, make sure you ran the "!python -m spacy download en_core_web_sm" command in the earlier cell.

In [None]:
def segment_doc(filename, min_sentences=3, max_sentences=5):
  """Yield consecutive snippets of a few sentences each from a text file.

  The file is parsed with the module-level spaCy pipeline `nlp`. Its
  sentences are grouped, in document order, into snippets whose length is
  drawn at random from [min_sentences, max_sentences] for every snippet.

  Args:
    filename: Path of the text file to segment.
    min_sentences: Smallest number of sentences in a snippet.
    max_sentences: Largest number of sentences in a snippet.

  Yields:
    str: The sentences of one snippet joined by single spaces.
  """
  with open(filename, encoding="utf-8") as f:
    text = f.read()
  doc = nlp(text)
  assert doc.has_annotation("SENT_START")

  pending = []
  target = randint(min_sentences, max_sentences)

  for sent in doc.sents:
    pending.append(sent.text)
    # Emit as soon as the snippet is full. Every sentence ends up in exactly
    # one snippet; none is dropped between two snippets.
    if len(pending) >= target:
      yield " ".join(pending)
      pending = []
      target = randint(min_sentences, max_sentences)

  # Keep the leftover sentences only when they are enough for a snippet.
  if len(pending) >= min_sentences:
    yield " ".join(pending)

In [None]:
import pandas as pd

In [None]:
def dataframe_from_file(file_path):
  """Build a DataFrame with a single "text" column from a text file.

  Each row holds one snippet produced by `segment_doc(file_path)`.
  """
  return pd.DataFrame(segment_doc(file_path), columns=["text"])


In [None]:
emerson_df = dataframe_from_file(trimmed_emerson_file)
emerson_df.to_csv("emerson.csv")
emerson_df.info()

In [None]:
emerson_df.head()

In [None]:

thoreau_df = dataframe_from_file(trimmed_thoreau_file)
thoreau_df.to_csv("thoreau.csv")
thoreau_df.info()

In [None]:
thoreau_df.head()

In [None]:
# Combine and shuffle the datasets, using a consistent random seed.
from sklearn.utils import shuffle

d1 = emerson_df.copy()
d1["label"] = "emerson"

d2 = thoreau_df.copy()
d2["label"] = "thoreau"

# ignore_index=True gives every row its own index label. Without it both
# authors keep labels starting at 0, and any later label-based operation
# (for example DataFrame.drop) would hit rows from both authors at once.
combined_df = pd.concat([d1, d2], ignore_index=True)
combined_df = shuffle(combined_df, random_state=7919)
combined_df.to_csv("combined.csv")
combined_df.info()
combined_df.head()

## Now we have our dataset in combined.csv


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# you can start here if csv files were already created.
from sklearn.feature_extraction.text import TfidfVectorizer

In [None]:
sns.countplot(x=combined_df["label"], palette="rocket")
plt.show()


In [None]:
%pip install -q wordcloud

In [None]:
from spacy.lang.en import STOP_WORDS
my_stopwords = STOP_WORDS

', '.join(my_stopwords)

In [None]:
# Show wordcloud from each dataset.
from wordcloud import WordCloud


def plot_word_cloud(text_sections, title):
  """Draw a word cloud for a collection of text sections.

  Args:
    text_sections: A single string, or an iterable of strings (for example
      a pandas Series of snippets).
    title: Heading drawn above the cloud.
  """
  # Join the sections explicitly. str() of a pandas Series is its abbreviated
  # printout (index labels, "...", dtype footer), not the full text.
  if isinstance(text_sections, str):
    corpus = text_sections
  else:
    corpus = " ".join(str(section) for section in text_sections)

  cloud = WordCloud(background_color='black', stopwords=my_stopwords).generate(corpus)
  plt.figure(figsize=(12,8), facecolor='white')
  plt.imshow(cloud, interpolation="bilinear")
  plt.axis('off')
  plt.title(title, fontsize=48)
  plt.tight_layout(pad=0)
  plt.show()




In [None]:
plot_word_cloud(emerson_df["text"], "Emerson")

In [None]:
plot_word_cloud(thoreau_df["text"], "Thoreau")

In [None]:
# Lower-case every snippet, drop stopwords and punctuation, and keep the
# lemma of each remaining token; one space-joined string per snippet.

final_text = [
  ' '.join(
    token.lemma_
    for token in nlp(entry.lower())
    if not (token.is_stop or token.is_punct)
  )
  for entry in combined_df['text']
]



In [None]:
combined_df['final_text'] = final_text
combined_df.head()

In [None]:
# Turn the preprocessed text into TF-IDF features and pick the target column.
# NOTE(review): the vectorizer is fit on the whole dataset here, before the
# train/test split that follows, so its vocabulary and IDF weights also
# reflect the test rows. Fitting on the training rows only would avoid that.
vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(combined_df["final_text"])
y = combined_df["label"]


In [None]:
# split our data into train and test sets.
from sklearn.model_selection import train_test_split

x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=4909)
print(f"x_train: {x_train.shape}")
print(f"y_train: {y_train.shape}")
print(f"x_test: {x_test.shape}")
print(f"y_test: {y_test.shape}")

In [None]:
from sklearn.linear_model import LogisticRegression


In [None]:
# -2 for n_jobs is all but one CPU available.
lr_model = LogisticRegression(solver='saga', random_state=8102, n_jobs=-2)

lr_model.fit(x_train, y_train)

In [None]:
y_pred = lr_model.predict(x_test)

In [None]:
from sklearn.metrics import f1_score

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from IPython.display import Markdown, display

def show_metrics(y_test, y_pred, model_name, labels=None):
  """Display a classification report, accuracy and confusion matrix.

  Args:
    y_test: True labels.
    y_pred: Predicted labels, in the same order as `y_test`.
    model_name: Heading shown above the metrics.
    labels: Tick labels for the confusion-matrix heatmap, in the order of
      the matrix rows/columns. Defaults to ["emerson", "thoreau"].
  """
  if labels is None:
    labels = ["emerson", "thoreau"]

  display(Markdown(f"# {model_name}"))

  print(classification_report(y_test,y_pred))
  print("Test accuracy:", accuracy_score(y_test,y_pred))
  cm = confusion_matrix(y_test, y_pred)

  sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
  plt.title('Confusion Matrix')
  plt.ylabel('Actual')
  plt.xlabel('Predicted')
  plt.show()

In [None]:
show_metrics(y_test, y_pred, "Logistic Regression")

In [None]:
# Let's compare that to random forests.


from sklearn.ensemble import RandomForestClassifier

rf = RandomForestClassifier()
rf.fit(x_train,y_train)


In [None]:
y_pred_rf = rf.predict(x_test)

In [None]:
show_metrics(y_test, y_pred_rf, "Random Forest")

In [None]:
from sklearn import svm

In [None]:
# create the SVM classifier
clf = svm.SVC(kernel='rbf')

clf.fit(x_train,y_train)
clf

In [None]:
y_pred_svm = clf.predict(x_test)

In [None]:
show_metrics(y_test, y_pred_svm, "SVM")

In [None]:
%pip install -q transformers

In [None]:
%pip install -q torch

In [None]:
import torch

In [None]:
from transformers import AutoTokenizer, AutoModel

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)


In [None]:
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased", return_token_type_ids = False, padding="max_length", truncation=True)

model = AutoModel.from_pretrained("distilbert-base-uncased").to(device)


In [None]:
x_train_s, x_test_s, y_train_s, y_test_s = train_test_split(combined_df["text"], combined_df["label"], test_size=0.2, random_state=4909)
print(f"x_train_s: {x_train_s.shape}")
print(f"y_train_s: {y_train_s.shape}")
print(f"x_test_s: {x_test_s.shape}")
print(f"y_test_s: {y_test_s.shape}")


In [None]:
x_train_tok = tokenizer(x_train_s.tolist(), padding=True, truncation=True, return_tensors="pt")
y_train_tok = y_train_s.tolist()

x_test_tok = tokenizer(x_test_s.tolist(), padding=True, truncation=True, return_tensors="pt")
y_test_tok = y_test_s.tolist()


In [None]:
x_train_tok[0:2]

In [None]:

print(x_train_tok.keys())


# Move the tokenized inputs onto the device (GPU when available).
# The values are already tensors (return_tensors="pt"), so .to() is enough;
# wrapping them in torch.tensor() again copies them and triggers a warning.
x_train_tok = {k: v.to(device) for k, v in x_train_tok.items()}
x_test_tok = {k: v.to(device) for k, v in x_test_tok.items()}

In [None]:
def _cls_embeddings(encoded, batch_size=32):
  """Return the [CLS] hidden state for every row of a tokenized batch.

  The rows are sent through `model` in slices of `batch_size`, so only one
  slice's activations are held at a time instead of the whole dataset's.

  Args:
    encoded: Mapping of input name to tensor, all with the same first
      dimension (as produced by the tokenizer with return_tensors="pt").
    batch_size: Number of rows per forward pass.

  Returns:
    A tensor with one row per input row, taken from position 0 of
    `last_hidden_state`.
  """
  n_rows = encoded["input_ids"].shape[0]
  chunks = []
  with torch.no_grad():
    for start in range(0, n_rows, batch_size):
      batch = {k: v[start:start + batch_size] for k, v in encoded.items()}
      chunks.append(model(**batch).last_hidden_state[:, 0, :])
  return torch.cat(chunks)

# Get the [CLS] hidden states
cls_train = _cls_embeddings(x_train_tok)
cls_test = _cls_embeddings(x_test_tok)

In [None]:
x_train_db = cls_train.to("cpu")
# y_train_tok

x_test_db = cls_test.to("cpu")
# y_test_tok


In [None]:
lr_model2 = LogisticRegression(C=1, solver='saga', random_state=8102, n_jobs=-2, max_iter=10_000)

lr_model2.fit(x_train_db,y_train_tok)

# With the settings used for the TF-IDF features, this fit does not converge:
# ConvergenceWarning: The max_iter was reached which means the coef_ did not converge
# So we raised max_iter and experimented with C (the inverse of the regularization strength).

y_pred = lr_model2.predict(x_test_db)


In [None]:
show_metrics(y_test_tok, y_pred, "Logistic Regression on DistilBERT hidden states")

In [None]:
rf = RandomForestClassifier()
rf.fit(x_train_db,y_train_tok)

rf.score(x_test_db,y_test_tok)

y_pred_rf = rf.predict(x_test_db)

In [None]:
show_metrics(y_test_tok, y_pred_rf, "Random Forest on DistilBERT hidden states")

In [None]:
from sklearn import svm

In [None]:
# create the SVM classifier
clf = svm.SVC(kernel='rbf')

clf.fit(x_train_db,y_train_tok)

y_pred_svm = clf.predict(x_test_db)

In [None]:
show_metrics(y_test_tok, y_pred_svm, "SVM on DistilBERT hidden states")

In [None]:
from transformers import DistilBertForSequenceClassification

# Load the pretrained 'distilbert-base-uncased' checkpoint with a 2-class
# sequence-classification head (num_labels=2) for the emerson/thoreau task.
# NOTE(review): the classification head is presumably not part of the base
# checkpoint and starts untrained; the rest of the weights are pretrained.
model = DistilBertForSequenceClassification.from_pretrained(
    'distilbert-base-uncased', num_labels=2
)
# we already have the appropriate tokenizer from before.



In [None]:
# create our optimizer
# NOTE(review): this optimizer is not handed to the Trainer constructed
# further down (no `optimizers=` argument there), so it looks unused; the
# learning rate in TrainingArguments is the one passed to the Trainer.
from torch.optim import AdamW

optimizer = AdamW(model.parameters(), lr=5e-5)

In [None]:
!pip install -q datasets

In [None]:
from sklearn import preprocessing

# Create a copy of our dataframe
trans_df = combined_df.copy()

# drop the preprocessed text column which we aren't using.
trans_df.drop("final_text", axis=1, inplace=True)

# transform our labels into numeric values.
le = preprocessing.LabelEncoder()
my_labels = trans_df["label"].tolist()
le.fit(my_labels)

my_cat_labels = le.classes_
trans_df["label"] = le.transform(trans_df["label"])

print(f"{my_cat_labels=}")

trans_df.info()
trans_df.describe()
trans_df.head()


In [None]:
from datasets import Dataset
from transformers import AutoTokenizer

# for simplicity, we are just splitting the dataset again.
train_df, test_df = train_test_split(trans_df, test_size=0.2, random_state=4909)



train_dataset = Dataset.from_pandas(train_df)
test_dataset = Dataset.from_pandas(test_df)

def tokenize_data(examples):
    """Tokenize the "text" field of a batch of examples, truncating long inputs."""
    texts = examples["text"]
    encoded = tokenizer(texts, truncation=True)
    return encoded

tokenized_train = train_dataset.map(tokenize_data, batched=True)
tokenized_test = test_dataset.map(tokenize_data, batched=True)

In [None]:
from transformers import Trainer, TrainingArguments, DataCollatorWithPadding

data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Hyperparameters for fine-tuning; results and checkpoints go to ./results.
training_args = TrainingArguments(
    output_dir="./results",
    learning_rate=2e-4,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.01,
    # Per-epoch evaluation is left switched off.
    # NOTE(review): newer transformers releases appear to have renamed this
    # argument to `eval_strategy` rather than dropping it - confirm against
    # the installed version before re-enabling.
    # evaluation_strategy="epoch",
    logging_strategy="epoch"
)

# Define Trainer object for training the model
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_test,
    tokenizer=tokenizer,
    data_collator=data_collator,
)

trainer.train()

In [None]:
# Save the model.
trainer.save_model('model')

In [None]:
# This is how you can load the model.

# from transformers import AutoModelForSequenceClassification
# model = AutoModelForSequenceClassification.from_pretrained("./model")

In [None]:
def predictor(text):
  """Predict class ids for `text` with the module-level `model`.

  Args:
    text: A single string or a list of strings.

  Returns:
    A 1-D tensor of predicted class ids, one entry per input text.
  """
  inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt")
  # The tokenizer already returns tensors, so .to() is sufficient. Use the
  # model's own device so inputs and weights are always on the same one.
  inputs = {k: v.to(model.device) for k, v in inputs.items()}

  # Inference must run in eval mode; a model left in training mode keeps
  # dropout active and gives non-deterministic predictions.
  model.eval()
  with torch.no_grad():
      logits = model(**inputs).logits
  return torch.argmax(logits, dim=-1)


In [None]:
x_test_trans = test_dataset["text"]
y_test_trans = test_dataset["label"]

# sanity test a few inference inputs.
for txt, lbl in zip(x_test_trans[:5], y_test_trans[:5]):
  pred = predictor( txt)
  print(f"{my_cat_labels[lbl]}: pred={my_cat_labels[pred]}, {txt=}")

In [None]:
y_pred_trans = [predictor(txt) for txt in x_test_trans]

for txt, lbl, pred in zip(x_test_trans[:5], y_test_trans[:5], y_pred_trans[:5]):
  print(f"{my_cat_labels[lbl]}: pred={my_cat_labels[pred]}, {txt=}")


In [None]:
y_pred_trans = [torch.tensor(v).cpu() for v in y_pred_trans]

In [None]:
y_test_trans = [torch.tensor(v).cpu() for v in y_test_trans]

In [None]:
# show_metrics takes the true labels first and the predictions second.
# int() turns each one-element tensor into a plain Python number for sklearn.
show_metrics(
    [int(t) for t in y_test_trans],
    [int(t) for t in y_pred_trans],
    "Fine-tuned DistilBERT",
)

## Let's check out the test samples that are misclassified.

In [None]:
def scalar_from_tensor(t):
  """Return the Python scalar stored in a 0-d tensor, or the first entry of a 1-d tensor.

  Raises:
    ValueError: If the tensor has more than one dimension.
  """
  rank = t.dim()
  if rank > 1:
    raise ValueError(f"Unexpected tensor dimension: {t.dim()}")
  element = t if rank == 0 else t[0]
  return element.item()


In [None]:

y_test_trans = [scalar_from_tensor(t) for t in y_test_trans]
y_pred_trans = [scalar_from_tensor(t) for t in y_pred_trans]

print(f"y_test_trans: {y_test_trans[:5]}")
print(f"y_pred_trans: {y_pred_trans[:5]}")


In [None]:
# Show the label names (index = numeric class id) rather than a literal string.
print(f"{my_cat_labels=}")

# Collect every test sample whose prediction differs from its true label.
rows = []
for txt, lbl, pred in zip(x_test_trans, y_test_trans, y_pred_trans):
  if lbl != pred:
    print(f"{lbl=},{pred=}")
    row =(my_cat_labels[lbl], my_cat_labels[pred], txt)
    print(f"{row=}")
    rows.append(row)

n_miss = len(rows)
print(f"Count of misclassified = {n_miss}")
misclassified_df = pd.DataFrame(rows, columns=["actual", "predicted", "text"])
misclassified_df.head(n_miss)


## Postscript - Few-shot learning with a modern LLM

Another technique that has become popular recently is few-shot learning: a handful of labeled examples are placed in the prompt as context for an existing LLM, with no additional training.


In [None]:
# choose some examples of each author from our training set.
import re

# Work on a copy with a fresh 0..n-1 index. combined_df is built by
# concatenating the two per-author frames, so the same index label can occur
# once per author; dropping by such a label would remove rows of both authors
# and could raise KeyError on the second drop below.
training_df = combined_df.reset_index(drop=True)

few_shot_num_examples = 10

thoreau_samples = training_df[ training_df["label"] == "thoreau"].sample(few_shot_num_examples, random_state=7809)
emerson_samples = training_df[ training_df["label"] == "emerson"].sample(few_shot_num_examples, random_state=997)

# remove the sampled rows so they are not classified later.
training_df.drop(thoreau_samples.index, axis=0, inplace=True)
training_df.drop(emerson_samples.index, axis=0, inplace=True)

# print(f"Thoreau sample: {thoreau_samples}")
# print(f"Emerson sample: {emerson_samples}")

emerson_samples_array = emerson_samples['text'].to_list()
thoreau_samples_array = thoreau_samples['text'].to_list()

emerson_samples_array = [s.strip() for s in emerson_samples_array]
thoreau_samples_array = [s.strip() for s in thoreau_samples_array]

# replace multiple newlines with a single newline
emerson_samples_array = [re.sub(r'\n+', '\n', sample) for sample in emerson_samples_array]
thoreau_samples_array = [re.sub(r'\n+', '\n', sample) for sample in thoreau_samples_array]

print("======Emerson samples:======\n")
for idx, sample in enumerate(emerson_samples_array):
    print(f"{idx+1}: {sample}\n")

print("======Thoreau samples:======\n")
for idx, sample in enumerate(thoreau_samples_array):
    print(f"{idx+1}: {sample}\n")


In [None]:
# We also need our prompt template
prompt_template = """
You are an expert literary analyst. Your task is to classify whether a given text was written by {author1} or {author2}.

Here are some example texts from each author:

{author1} examples:
{author1_examples}

{author2} examples:
{author2_examples}

Based on these examples and your knowledge of their writing styles, analyze the following text and determine whether it was written by {author1} or {author2}:

Text to classify:
{text_to_classify}

Provide your classification as either {author1} or {author2}.
"""


In [None]:
def generate_prompt(text, author1, author2):
    """Fill `prompt_template` for one text to classify.

    The example sections always use `emerson_samples_array` for author1 and
    `thoreau_samples_array` for author2, so callers are expected to pass
    Emerson as `author1` and Thoreau as `author2`.

    Args:
        text: The passage to classify.
        author1: Display name of the first author.
        author2: Display name of the second author.

    Returns:
        The complete prompt as a string.
    """
    return prompt_template.format(
        author1=author1,
        author2=author2,
        # Join the examples into plain text; a list passed to format() would
        # be rendered as its Python repr (brackets, quotes, escaped newlines).
        author1_examples="\n".join(emerson_samples_array),
        author2_examples="\n".join(thoreau_samples_array),
        text_to_classify=text,
    )


In [None]:
from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer
import torch

# Now that we have our samples, we can inject them into an LLM.
# we will use Hugging Face's transformers library to inject the samples into the model.


# this model is small but accepts longer input sequences than BERT/DistilBERT.
#model_name = "microsoft/Phi-3-mini-4k-instruct"
# model_name = "roberta-large"
model_name = "answerdotai/ModernBERT-base"

dev_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(dev_name)
# Load the checkpoint with a 2-class sequence-classification head and name the
# two classes so predictions can be mapped back to author labels.
# NOTE(review): no training happens on this model in the notebook, so the
# classification head is presumably freshly initialized; if so, the examples
# in the prompt cannot steer its output and results should be read as a
# chance-level baseline - confirm.
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=2,
    id2label={0: "emerson", 1: "thoreau"},
    label2id={"emerson": 0, "thoreau": 1}
    ).to(device)

# get the tokenizer for the model.
tokenizer = AutoTokenizer.from_pretrained(model_name)



In [None]:
# Use the model to predict the samples.
import torch

def classify_text(text, author1, author2):
    """Classify `text` with the module-level `model` using the few-shot prompt.

    Args:
        text: The passage to classify.
        author1: Display name of the first author (passed to the prompt).
        author2: Display name of the second author (passed to the prompt).

    Returns:
        A tuple `(label, probability)`: the predicted label taken from
        `model.config.id2label` and the softmax probability of that class.
    """
    prompt_text = generate_prompt(text, author1, author2)
    inputs = tokenizer(prompt_text, return_tensors="pt")
    # The tokenizer already returns tensors; .to() avoids a needless copy.
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        logits = model(**inputs).logits

    probabilities = torch.softmax(logits, dim=1)
    predicted_class_id = torch.argmax(logits, dim=1).item()
    predicted_class_label = model.config.id2label[predicted_class_id]
    predicted_probability = probabilities.max().item()

    return predicted_class_label, predicted_probability

In [None]:
# let's try a few examples.

samples = training_df.sample(3, random_state=4909)

for cnt, (idx, sample) in enumerate(samples.iterrows()):
    print(f"Example {cnt+1}:")
    print(f"Text: {sample['text']}")
    predicted_label, predicted_probability = classify_text(sample['text'], "Emerson", "Thoreau")
    print(f"Prediction: {predicted_label} ({predicted_probability:.2f})")
    print(f"Actual: {sample['label']}")
    print("-" * 80)
    print("\n")

In [None]:
%pip install -q tqdm
from tqdm import tqdm


In [None]:
# Run the few-shot classifier over the remaining data, capped at MAX_SAMPLES.

# since we haven't trained the model, we don't really need to split the dataset.

MAX_SAMPLES = 100  # only the first 100 texts are classified; enough for a rough idea of the performance.

texts = training_df["text"].tolist()
y_actual_phi = training_df["label"].tolist()


# use tqdm to show progress; [0] keeps the predicted label and drops the probability.
y_pred_phi = [classify_text(text, "Emerson", "Thoreau")[0] for text in tqdm(texts[:MAX_SAMPLES])]

# Truncate the true labels to the same texts that were classified.
y_actual_subset = y_actual_phi[:MAX_SAMPLES]
y_pred_subset = y_pred_phi


In [None]:
show_metrics(y_actual_subset, y_pred_subset, "Few shot learning")

In [None]:
# okay with langchain then

%pip install -q langchain

In [None]:
%pip install -q langchain_community

In [None]:
%pip install -q langchain_huggingface

In [None]:
from langchain.prompts import PromptTemplate

In [None]:
# This is to help debugging.
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'


In [None]:
lc_prompt_template = PromptTemplate(
    input_variables=["author1", "author2", "author1_examples", "author2_examples", "text_to_classify"],
    template=prompt_template
)

In [None]:
from langchain_huggingface import HuggingFacePipeline


# llm = HuggingFacePipeline.from_model_id(
#     model_id="microsoft/Phi-3-mini-4k-instruct",
#     task="text-generation",
#     pipeline_kwargs={"max_new_tokens": 10},
# )


from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

#model_id="microsoft/Phi-3-mini-4k-instruct"
#tokenizer = AutoTokenizer.from_pretrained(model_id)
#model = AutoModelForCausalLM.from_pretrained(model_id)
# NOTE(review): with the three lines above commented out, `model` and
# `tokenizer` are still the sequence-classification model and tokenizer loaded
# earlier in the notebook. A "text-generation" pipeline presumably needs a
# causal LM such as the one the commented-out lines would load - confirm
# before relying on the output of this pipeline.
pipe = pipeline(
    "text-generation", model=model, tokenizer=tokenizer, max_new_tokens=10, device=device
)
# Wrap the transformers pipeline so it can be invoked through LangChain.
llm = HuggingFacePipeline(pipeline=pipe)



In [None]:
for cnt, (idx, sample) in enumerate(samples.iterrows()):
    print(f"Example {cnt+1}:")
    #print(f"Text: {sample['text']}")


    #print(f"Text: {sample['text']}")
    print(f"Actual: {sample['label']}")

    prompt = lc_prompt_template.format(
          author1="emerson",
          author2="thoreau",
          author1_examples="\n".join(emerson_samples_array), # Join examples into a single string
          author2_examples="\n".join(thoreau_samples_array),
          text_to_classify=sample['text']
      )

    predicted_label = llm.invoke(prompt)
    print(f"Prediction: {predicted_label}")

    print("-" * 80)
    print("\n")

