In [None]:
import os
import pickle

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import sklearn
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
from sklearn.svm import SVC

import torch
from torch.nn.functional import cross_entropy

import datasets
from datasets import DatasetDict,Dataset
from datasets import load_dataset, DatasetDict
from datasets import Dataset, concatenate_datasets

import transformers
from transformers import AutoTokenizer, AutoModelForTokenClassification
from transformers import DataCollatorForTokenClassification
from transformers import Trainer, TrainingArguments


In [None]:
# Load the tab-separated training file. Later cells read column "A" as the
# token text and column "O" as its tag.
df = pd.read_csv("train.txt", sep = "\t")

In [None]:
# Normalise tokens to strings, drop incomplete rows, then remove every row whose
# token contains a comma, a full stop or a question mark.
# NOTE(review): astype(str) runs before dropna, so a missing token becomes the
# literal string "nan" and survives the dropna — confirm this is intended.
df["A"] = df["A"].astype(str)
df = df.dropna(axis=0)
# .copy() makes the filtered frame independent of the original, so the column
# assignment in the following cell does not trigger SettingWithCopyWarning.
df = df[~df["A"].str.contains(r',|\.|\?', regex=True ,na=False)].copy()

In [None]:
# Expand the single-letter tags into full BIO labels for the CHEMICAL type.
# Series.map yields NaN for any tag that is not a key of the mapping.
mappp ={"I":"I-CHEMICAL","B":"B-CHEMICAL","O":"O"}
df["O"] =  df["O"].map(mappp)

In [None]:
# Row/column count after cleaning and tag mapping.
df.shape

In [None]:
# Preview the first token rows.
df.head()

In [None]:
import pandas as pd
import random

def create_combined_dataset(df, min_rows=5, max_rows=10, seed=None):
    """Group consecutive token rows into pseudo-sentences of random length.

    The frame is walked from top to bottom. Each step takes between
    ``min_rows`` and ``max_rows`` consecutive rows (fewer for the final chunk)
    and joins their ``'A'`` values and their ``'O'`` values with single spaces.

    Args:
        df: DataFrame with string columns ``'A'`` (tokens) and ``'O'`` (tags).
        min_rows: Smallest chunk size; must be at least 1.
        max_rows: Largest chunk size; must be >= ``min_rows``.
        seed: Optional seed for a private random generator, making the
            chunking reproducible. With ``None`` the global ``random`` module
            state is used, as before.

    Returns:
        DataFrame with columns ``'text'`` and ``'labels'``, one row per chunk.

    Raises:
        ValueError: If ``min_rows < 1`` or ``max_rows < min_rows``.
    """
    # A chunk size of 0 would never advance the cursor and loop forever.
    if min_rows < 1 or max_rows < min_rows:
        raise ValueError(
            f"need 1 <= min_rows <= max_rows, got min_rows={min_rows}, max_rows={max_rows}"
        )

    rng = random if seed is None else random.Random(seed)

    # Pull the columns out once instead of slicing the frame on every iteration.
    tokens = df['A'].tolist()
    tags = df['O'].tolist()
    total_rows = len(tokens)

    records = []
    start = 0
    while start < total_rows:
        stop = min(start + rng.randint(min_rows, max_rows), total_rows)
        records.append({
            'text': " ".join(tokens[start:stop]),
            'labels': " ".join(tags[start:stop]),
        })
        start = stop

    # Explicit columns keep the schema stable even when no chunk was produced.
    return pd.DataFrame(records, columns=['text', 'labels'])

# Build the chunked dataset (5-10 token rows per chunk) and expose the
# positional index as an explicit "id" column.
df_new = create_combined_dataset(df, min_rows=5, max_rows=10)
df_new["id"] = df_new.index

In [None]:
# Preview the chunked text/label strings.
df_new.head()

In [None]:
# Device index handed to the transformers pipeline further down:
# 0 when CUDA is available, otherwise -1.
device = 0 if torch.cuda.is_available() else -1

In [None]:
# Rename the columns in place: "material" now holds the space-joined tokens and
# "product" the space-joined BIO labels.
df_new.rename(columns={'text': 'material', 'labels': 'product'}, inplace=True)

In [None]:
# Confirm the renamed columns.
df_new.head()

In [None]:
# BIO tag inventory; a tag's position in this list is its integer id.
tag_values = ["B-CHEMICAL", "I-CHEMICAL", "O"]

# Forward (tag -> id) and reverse (id -> tag) lookups derived from that order.
label2id = {tag: idx for idx, tag in enumerate(tag_values)}
id2label = dict(enumerate(tag_values))

In [None]:
def create_tokens_and_tags(material, product):
    """Split one chunk into its word list and the matching list of tag ids.

    Args:
        material: Space-separated tokens of the chunk.
        product: Space-separated BIO labels of the chunk.

    Returns:
        ``(words, tag_ids)``: the tokens with leading/trailing commas stripped,
        and the labels (also comma-stripped) converted through ``label2id``.

    Raises:
        KeyError: If a label is not a key of ``label2id``.
    """
    words = []
    for piece in material.split(" "):
        words.append(piece.strip(","))

    label_names = []
    for piece in product.split(" "):
        label_names.append(piece.strip(","))

    tag_ids = []
    for name in label_names:
        tag_ids.append(label2id[name])

    return words, tag_ids

# Convert every chunk into a record with a string id, its token list and its
# list of integer tag ids.
formatted_data_chem = []
for chunk_id, chunk_text, chunk_labels in zip(df_new.index, df_new['material'], df_new['product']):
    chunk_tokens, chunk_tag_ids = create_tokens_and_tags(chunk_text, chunk_labels)
    formatted_data_chem.append(
        {"id": str(chunk_id), "tokens": chunk_tokens, "ner_tags": chunk_tag_ids}
    )

formatted_df_chem = pd.DataFrame(formatted_data_chem)

In [None]:
# Number of chunks before the all-"O" chunks are filtered out.
formatted_df_chem.shape

In [None]:
# Drop every chunk whose tags are all "O", i.e. chunks with no chemical mention
# (an empty tag list also counts as all-"O" and is dropped). The id is looked
# up in label2id instead of being hard-coded as 2, so the filter keeps working
# if the label order ever changes.
formatted_df_chem = formatted_df_chem[
    ~formatted_df_chem['ner_tags'].apply(lambda x: all(val == label2id["O"] for val in x))
]

In [None]:
# Shuffle the chunks and keep a random 80% of them. random_state pins the
# selection so a fresh-kernel run reproduces the same subset.
# NOTE(review): the remaining 20% is discarded for good, and re-running this
# cell shrinks the frame again — confirm the subsampling is intended.
formatted_df_chem = formatted_df_chem.sample(frac=0.8, random_state=42).reset_index(drop=True)

# Wrap the three columns in a Hugging Face Dataset.
ds = Dataset.from_dict({'id': formatted_df_chem['id'].tolist(),
    'tokens': formatted_df_chem['tokens'].tolist(),
    'ner_tags': formatted_df_chem['ner_tags'].tolist()})

In [None]:
# 80% train / 20% held out; the held-out part is split again so that 60% of it
# (12% overall) becomes "test" and 40% of it (8% overall) becomes "valid".
# A fixed seed makes both splits reproducible across kernel restarts.
train_testvalid = ds.train_test_split(test_size=0.2, seed=42)
test_valid = train_testvalid['test'].train_test_split(test_size=0.6, seed=42)
ds = DatasetDict({
    'train': train_testvalid['train'],
    'test': test_valid['test'],
    'valid': test_valid['train']})

In [None]:
# Show the split sizes and features.
ds

In [None]:
# Tokenizer of the base checkpoint; the same checkpoint name is used again
# below when the model is created.
tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-v3-base")

def tokenize_and_align_labels(samples):
    """Tokenise a batch of pre-split sentences and align word tags to sub-tokens.

    Args:
        samples: Batch mapping with ``"tokens"`` (list of word lists) and
            ``"ner_tags"`` (list of per-word tag-id lists).

    Returns:
        The tokenizer output with an added ``"labels"`` entry. Within each
        sequence, the first sub-token of a word carries that word's tag id;
        every other position (no word id, or a repeat of the previous word id)
        is set to -100.
    """
    encoded = tokenizer(samples["tokens"], truncation=True, is_split_into_words=True)

    aligned = []
    for row, word_tags in enumerate(samples["ner_tags"]):
        row_labels = []
        last_word = None
        for word in encoded.word_ids(batch_index=row):
            # Only the first sub-token of each word keeps the real tag.
            starts_new_word = word is not None and word != last_word
            row_labels.append(word_tags[word] if starts_new_word else -100)
            last_word = word
        aligned.append(row_labels)

    encoded["labels"] = aligned
    return encoded

In [None]:
# Tokenise all splits batch-wise; the raw word/tag columns are removed once the
# aligned "labels" column exists.
encoded_ds = ds.map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=['ner_tags', 'tokens'],
)

# Batches are assembled by the token-classification collator built on the same tokenizer.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

tag_values = list(label2id.keys())

In [None]:
# Base checkpoint with a token-classification head sized to the tag inventory;
# both label maps are stored in the model config.
model_name = "microsoft/deberta-v3-base"

model = AutoModelForTokenClassification.from_pretrained(
    model_name,
    num_labels=len(tag_values),
    id2label=id2label,
    label2id=label2id,
)

In [None]:
tag_values = list(label2id.keys())

def compute_metrics(eval_preds):
    """Compute macro-averaged token-level precision, recall and F1.

    Args:
        eval_preds: ``(predictions, labels)`` pair. ``predictions`` is reduced
            with ``argmax`` over axis 2; ``labels`` holds tag ids, with -100
            marking positions that must be ignored.

    Returns:
        dict with ``"precision"``, ``"recall"`` and ``"f1"`` taken from the
        "macro avg" row of the classification report.
    """
    predictions, labels = eval_preds
    predictions = np.argmax(predictions, axis=2)

    # Keep only positions with a real label and map ids to tag names
    # (tag_values[i] is the tag whose id is i).
    true_predictions = []
    true_labels = []
    for prediction, label in zip(predictions, labels):
        for p, l in zip(prediction, label):
            if l != -100:
                true_predictions.append(tag_values[p])
                true_labels.append(tag_values[l])

    # Call scikit-learn's report through the module path: the bare name
    # `classification_report` is rebound to seqeval's version by a later import,
    # and that version does not accept flat label lists or output_dict.
    report = sklearn.metrics.classification_report(
        true_labels, true_predictions, output_dict=True, zero_division=0
    )

    return {
        "precision": report["macro avg"]["precision"],
        "recall": report["macro avg"]["recall"],
        "f1": report["macro avg"]["f1-score"]
    }


# Switch on TorchDynamo's suppress_errors flag.
# NOTE(review): presumably so compile failures fall back to eager execution
# instead of aborting training — confirm it is still needed.
torch._dynamo.config.suppress_errors = True

In [None]:
from transformers import TrainingArguments, Trainer

# Run configuration. The Hub token is read from the environment instead of
# being written into the notebook; it defaults to an empty string as before.
config = {
    "MODEL_NAME": "debertabase_term_paper",
    "HUGGINGFACE_API_KEY": os.environ.get("HUGGINGFACE_API_KEY", ""),
    # The comma after the token entry was missing, which made this dict a SyntaxError.
    "REPORTS_TO": "tensorboard",
}

training_args = TrainingArguments(
    output_dir=config["MODEL_NAME"],
    num_train_epochs=4,
    auto_find_batch_size=True,
    gradient_accumulation_steps=4,
    eval_accumulation_steps=2,
    eval_strategy="epoch",
    logging_strategy="steps",
    logging_steps=100,
    warmup_steps=500,
    logging_first_step=True,
    # NOTE(review): 5e-4 is unusually high for fine-tuning a pretrained
    # transformer encoder (1e-5 to 5e-5 is typical) — confirm.
    learning_rate=5e-4,
    report_to=config["REPORTS_TO"],
    weight_decay=0.001,
    disable_tqdm=False,
    fp16=True,
    group_by_length=True,
    push_to_hub=False,
    hub_private_repo=True,
    hub_token=config["HUGGINGFACE_API_KEY"],
    # No checkpoints are written and nothing is pushed, so the fine-tuned
    # weights only exist in this kernel's memory.
    save_strategy="no",
)

# Train on the "train" split and evaluate on "valid" after every epoch.
trainer = Trainer(
    model=model,
    args=training_args,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    train_dataset=encoded_ds["train"],
    eval_dataset=encoded_ds["valid"],
    data_collator=data_collator,
)

In [None]:
# Run fine-tuning; the returned training summary is displayed as the cell output.
trainer.train()

In [None]:
from transformers import AutoTokenizer, AutoModelForTokenClassification, BertConfig, pipeline
from seqeval.metrics import classification_report, precision_score, recall_score, f1_score

# Load a published checkpoint from the Hub. This rebinds `tokenizer` and
# `model`: the training run above neither saves nor pushes its weights, so the
# evaluation below uses this published model, not the in-memory one.
tokenizer = AutoTokenizer.from_pretrained("muratti18462/debertabase_term_paper")

model = AutoModelForTokenClassification.from_pretrained("muratti18462/debertabase_term_paper")

# NER pipeline with "simple" aggregation, placed on the device chosen earlier.
ner_pipeline = pipeline("ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple", device=device)

In [None]:
# Switch the dataset to pandas output and pull the whole test split.
# Note: this rebinds `df`, which previously held the raw token table.
ds.set_format(type='pandas')
df = ds['test'][:]

In [None]:
# From here on use the label maps stored in the loaded model's config; this
# rebinds the module-level label2id / id2label defined earlier.
label2id = model.config.label2id
id2label = model.config.id2label

def get_predicted_labels(tokens, ner=None):
    """Predict one BIO label per input token with the NER pipeline.

    The tokens are joined with single spaces and tagged. Each aggregated entity
    is mapped back onto the tokens through its character offsets: the first
    token an entity overlaps gets ``B-<group>``, every further overlapped token
    gets ``I-<group>``. Tokens not covered by any entity stay ``"O"``.

    The previous version matched on the entity text (first token containing it),
    so multi-word entities and repeated words were mislabelled, and it emitted
    the bare group name, which is not a BIO tag and does not compare with the
    ``B-``/``I-`` gold labels.

    Args:
        tokens: Sequence of word strings for one sentence.
        ner: Optional callable ``text -> list of entity dicts`` with keys
            ``"entity_group"``, ``"start"`` and ``"end"``. Defaults to the
            module-level ``ner_pipeline``.

    Returns:
        List of label strings, the same length as ``tokens``.

    Raises:
        ValueError: If an entity has no character offsets.
    """
    tagger = ner_pipeline if ner is None else ner
    tokens = list(tokens)
    predicted_labels = ["O"] * len(tokens)
    if not tokens:
        return predicted_labels

    # Character span [start, end) of every token inside the joined text.
    spans = []
    cursor = 0
    for token in tokens:
        spans.append((cursor, cursor + len(token)))
        cursor += len(token) + 1  # +1 for the joining space

    text = " ".join(tokens)
    for entity in tagger(text):
        group = entity["entity_group"]
        ent_start, ent_end = entity["start"], entity["end"]
        if ent_start is None or ent_end is None:
            raise ValueError(
                "NER pipeline returned an entity without character offsets; "
                "offset-based alignment is not possible"
            )

        first = True
        for i, (tok_start, tok_end) in enumerate(spans):
            if tok_start >= ent_end:
                break
            # Strict overlap: an entity span that begins on the space before a
            # token does not leak onto the previous token.
            if tok_end <= ent_start:
                continue
            # Never overwrite a label assigned by an earlier entity.
            if predicted_labels[i] == "O":
                predicted_labels[i] = ("B-" if first else "I-") + group
            first = False

    return predicted_labels

# Collect gold and predicted label sequences for every test sentence.
true_labels = []
predicted_labels = []

for tokens, gold_ids in zip(df["tokens"], df["ner_tags"]):
    # Gold tag ids -> label strings via the model's id2label map.
    true_labels.append([id2label[tag_id] for tag_id in gold_ids])
    predicted_labels.append(get_predicted_labels(tokens))



In [None]:
# Sequence-level report followed by the three headline scores.
print("Classification Report:")
print(classification_report(true_labels, predicted_labels))

for caption, score_fn in (
    ("Precision:", precision_score),
    ("Recall:", recall_score),
    ("F1-score:", f1_score),
):
    print(caption, score_fn(true_labels, predicted_labels))

# SVM NER

In [None]:
# Rebuild a fresh Dataset from the formatted frame for the SVM baseline
# (this rebinds `ds`, replacing the DatasetDict used for DeBERTa).
ds = Dataset.from_dict(
    {column: formatted_df_chem[column].tolist() for column in ('id', 'tokens', 'ner_tags')}
)

In [None]:
# Hold out 1% of the chunks as the SVM test set. The fixed seed keeps the
# split identical across kernel restarts.
train_testEval = ds.train_test_split(test_size=0.01, seed=42)
ds = DatasetDict({
    "train" : train_testEval["train"],
    "test": train_testEval["test"]
})

In [None]:
# Train the SVM on the train split only. Taking the data from
# formatted_df_chem, as before, also included the held-out test sentences, so
# the reported scores were measured on data the model had already seen.
sentences = list(ds["train"]["tokens"])
labels = list(ds["train"]["ner_tags"])

In [None]:
# Flatten the sentences: the SVM classifies every token on its own.
flat_tokens = []
for sentence in sentences:
    flat_tokens.extend(sentence)

flat_labels = []
for sentence_labels in labels:
    flat_labels.extend(sentence_labels)

# Each token is treated as a one-word document for the TF-IDF features.
vectorizer = TfidfVectorizer(analyzer="word", lowercase=True)
X = vectorizer.fit_transform(flat_tokens)

# Linear-kernel SVM on the token features.
svm = SVC(kernel="linear", C=1.0)
svm.fit(X, flat_labels)

In [None]:
# Persist the fitted classifier and vectorizer next to the notebook.
for artefact, target_path in (
    (svm, 'my_svm_classifier.pkl'),
    (vectorizer, 'my_svm_vectorizer.pkl'),
):
    with open(target_path, 'wb') as f:
        pickle.dump(artefact, f)

In [None]:
# Reload the two artefacts written by the previous cell.
# Unpickling executes code from the file — only load pickles you created yourself.
svm = pd.read_pickle("my_svm_classifier.pkl")
vectorizer = pd.read_pickle("my_svm_vectorizer.pkl")

In [None]:
# Switch the SVM DatasetDict to pandas output and materialise the train split.
# NOTE(review): df_train is not used by any later cell shown here.
ds.set_format(type='pandas')

df_train = ds['train'][:]

In [None]:
# Materialise the test split as a DataFrame (the format call repeats the one
# in the previous cell). This rebinds `df`.
ds.set_format(type='pandas')

df = ds['test'][:]

In [None]:
# Create an object-typed column so each cell can later hold a list of tag ids.
df["predicted_tags"] = None 

In [None]:
# Vectorise each test sentence token by token and store the predicted tag ids
# as a plain list per row.
df["predicted_tags"] = [
    svm.predict(vectorizer.transform(sentence_tokens)).tolist()
    for sentence_tokens in df["tokens"]
]

In [None]:
# Raw id sequences; both names are overwritten with label strings in the next cell.
true_labels = df["ner_tags"].to_list()
predicted_labels = df["predicted_tags"].to_list()

In [None]:
# Convert gold and predicted tag ids to label strings for seqeval.
true_labels = []
for tag_ids in df["ner_tags"]:
    true_labels.append([id2label[int(tag_id)] for tag_id in tag_ids])

predicted_labels = []
for tag_ids in df["predicted_tags"]:
    predicted_labels.append([id2label[int(tag_id)] for tag_id in tag_ids])

In [None]:
from seqeval.metrics import classification_report, precision_score, recall_score, f1_score

In [None]:
# Sequence-level report for the SVM baseline, then the three headline scores.
print("Classification Report:")
print(classification_report(true_labels, predicted_labels))

for caption, score_fn in (
    ("Precision:", precision_score),
    ("Recall:", recall_score),
    ("F1-score:", f1_score),
):
    print(caption, score_fn(true_labels, predicted_labels))