In [1]:
import warnings


# Notebook-wide settings shared by the cells below.
warnings.filterwarnings("ignore")  # hide every warning in the cell outputs

base_model_id, cache_dir = "gpt2", "model"  # checkpoint name and local download cache
random_state = 42  # seed handed to the training arguments

# Data Processing

In [2]:
from datasets import load_dataset


# Read the two CSV files into one DatasetDict with a "train" and a "test" split.
dataset = load_dataset(
    "csv",
    data_files=dict(train="train.csv", test="test.csv"),
)
dataset

DatasetDict({
    train: Dataset({
        features: ['ID', 'Material', 'Type', 'Shape', 'Coat/Functional Group', 'Synthesis_Method', 'Surface_Charge', 'Cell_Type', 'No_of_Cells (cells/well)', 'Human_Animal', 'Cell_Source', 'Cell_Tissue', 'Cell_Morphology', 'Cell_Age', 'Cell Line_Primary Cell', 'Time (hr)', 'Concentration (ug/ml)', 'Test', 'Test_Indicator', 'Size', 'Zeta', 'Target'],
        num_rows: 1775
    })
    test: Dataset({
        features: ['ID', 'Material', 'Type', 'Shape', 'Coat/Functional Group', 'Synthesis_Method', 'Surface_Charge', 'Cell_Type', 'No_of_Cells (cells/well)', 'Human_Animal', 'Cell_Source', 'Cell_Tissue', 'Cell_Morphology', 'Cell_Age', 'Cell Line_Primary Cell', 'Time (hr)', 'Concentration (ug/ml)', 'Test', 'Test_Indicator', 'Size', 'Zeta', 'Target'],
        num_rows: 762
    })
})

## Prompt Engineering

In [3]:
import random


# Sentence-per-feature template for one row; the pieces are joined with single
# spaces. The final sentence deliberately has no trailing period because the
# outer prompt template appends one after the description.
description = " ".join((
    "The material of the nanoparticle is {material}.",
    "The nanoparticle is {type}.",
    "The morphology of the nanoparticle is {shape}.",
    "The fabrication method is {syn_method}.",
    "{coat}.",
    "The cell type is {cell_type}.",
    "The number of cells (cells/well) is {n_cell}.",
    "The origin species of the cell is {species}.",
    "The source of the cell line is {cell_source}.",
    "The type of cell tissue is {tissue}.",
    "The morphology of the cell is {cell_shape}.",
    "The cell is in {age} stage.",
    "The cell is {line_primary}.",
    "The exposure time is {time} hours.",
    "The exposure concentration is {conc} ug/ml.",
    "The type of cytotoxicity test is {test}.",
    "The test mechanism is {indicator}.",
    "The size of the nanoparticle is {size} nm.",
    "The zeta potential indicating surface charge stability of the nanoparticle is {zeta} mV.",
    "The surface charge is {surface_charge}",
))

# Expansion of the single-letter codes used by three categorical columns,
# keyed first by column name and then by code.
mapping = {
    "Human_Animal": dict(H="human", A="animal"),
    "Cell Line_Primary Cell": dict(P="primary", L="cell line"),
    "Type": dict(I="inorganic", O="organic", C="carbon"),
}


def get_coating(coat):
    """Return the prompt sentence describing the nanoparticle's surface coating.

    Args:
        coat: Value of the ``Coat/Functional Group`` column. A non-blank
            string names the coating; any other value (``None``, NaN, an
            empty or whitespace-only string) is treated as "no coating".

    Returns:
        A sentence without a trailing period (the description template
        appends the period itself).
    """
    if isinstance(coat, str) and coat.strip():
        return "The surface coating is " + coat
    # The coating is a property of the nanoparticle, not of the cell.
    return "The nanoparticle has no surface coating"


def get_description(row) -> str:
    """Render one dataset row as a natural-language description.

    Args:
        row: Mapping from the original CSV column names to that row's values.

    Returns:
        The ``description`` template with every placeholder filled in.

    Raises:
        KeyError: If a column is missing, or if ``Type``, ``Human_Animal`` or
            ``Cell Line_Primary Cell`` holds a code that is not in ``mapping``.
    """
    # Placeholder name -> value; coded columns are expanded through `mapping`.
    fields = {
        "material": row["Material"],
        "type": mapping["Type"][row["Type"]],
        "shape": row["Shape"],
        "syn_method": row["Synthesis_Method"],
        "coat": get_coating(row["Coat/Functional Group"]),
        "cell_type": row["Cell_Type"],
        "n_cell": row["No_of_Cells (cells/well)"],
        "species": mapping["Human_Animal"][row["Human_Animal"]],
        "cell_source": row["Cell_Source"],
        "tissue": row["Cell_Tissue"],
        "cell_shape": row["Cell_Morphology"],
        "age": row["Cell_Age"],
        "line_primary": mapping["Cell Line_Primary Cell"][row["Cell Line_Primary Cell"]],
        "time": row["Time (hr)"],
        "conc": row["Concentration (ug/ml)"],
        "test": row["Test"],
        "indicator": row["Test_Indicator"],
        "size": row["Size"],
        "zeta": row["Zeta"],
        "surface_charge": row["Surface_Charge"],
    }
    return description.format(**fields)


# Three-line template for one worked example (description plus known answer).
reference = "\n".join([
    "Example {idx}:",
    "- Description: {description}",
    "- Answer: {target}",
])


def get_reference(idx, row) -> str:
    """Render one row as a numbered worked example including its answer.

    Args:
        idx: Label placed after the word "Example" (a number or an empty string).
        row: Mapping from the original CSV column names to that row's values;
            must also contain ``Target``.

    Returns:
        The ``reference`` template filled with the label, the row's
        description and the row's target value.
    """
    values = {
        "idx": idx,
        "description": get_description(row),
        "target": row["Target"],
    }
    return reference.format(**values)


# Outer prompt: optional worked examples, then the query description, then the
# cue after which the viability value belongs.
prompt = (
    "Predict the viability of a cell. "
    "{references}{description}. "
    "Viability (%):"
)


def generate_prompt(batch: dict) -> dict:
    """Collapse a batch of rows into a single prompt with its label.

    One row of the batch is picked as the query. Every other row is rendered
    as a worked example (description plus answer) and placed, in random
    order, before the query description. A batch of one row therefore yields
    a plain prompt without examples.

    Args:
        batch: Column-oriented batch (column name -> list of values) holding
            the original CSV columns, including ``ID`` and ``Target``.

    Returns:
        A dict with the keys ``id``, ``text`` and ``labels``. Each value is a
        one-element list describing the query row, or an empty list when the
        batch itself is empty.
    """
    n = len(batch["ID"])
    if n == 0:
        # Nothing to build a prompt from; keep the output schema intact.
        return {"id": [], "text": [], "labels": []}

    # Transpose the column-oriented batch into a list of row dicts.
    examples = [{k: batch[k][i] for k in batch.keys()} for i in range(n)]

    # Pick the query exactly once. Re-sampling it after the reference pool
    # has been built could put the query (and its answer) among the examples.
    example_idx = random.randrange(n) if n > 1 else 0
    query = examples[example_idx]

    reference_idxs = [i for i in range(n) if i != example_idx]
    random.shuffle(reference_idxs)

    if len(reference_idxs) == 1:
        # A lone worked example is left unnumbered.
        blocks = [get_reference("", examples[reference_idxs[0]])]
    else:
        blocks = [
            get_reference(position, examples[idx])
            for position, idx in enumerate(reference_idxs, start=1)
        ]

    # End every example with a newline so the last answer does not run
    # straight into the query description.
    references = "".join(block + "\n" for block in blocks)

    text = prompt.format(
        references=references,
        description=get_description(query),
    )
    return {
        "id": [query["ID"]],
        "text": [text],
        "labels": [query["Target"]],
    }


# Seed the RNG used by generate_prompt so the choice of query/reference rows
# is repeatable whenever batch_size is raised above 1.
random.seed(random_state)

# Replace the raw CSV columns with id/text/labels in every split at once.
# With batch_size=1 each row becomes its own prompt without worked examples.
old_columns = dataset["train"].column_names
dataset = dataset.map(
    generate_prompt,
    remove_columns=old_columns,
    batched=True,
    batch_size=1,
)

print("Train example " + "-" * 20)
print(f"{dataset['train'][0]['text']} {dataset['train'][0]['labels']}")
print("Test example " + "-" * 20)
print(f"{dataset['test'][0]['text']} {dataset['test'][0]['labels']}")

dataset

Train example --------------------
Predict the viability of a cell. The material of the nanoparticle is Pt. The nanoparticle is inorganic. The morphology of the nanoparticle is Sphere. The fabrication method is Chemical Reduction. The surface coating is PVP. The cell type is IMR90. The number of cells (cells/well) is 5000.0. The origin species of the cell is human. The source of the cell line is Human. The type of cell tissue is Lung. The morphology of the cell is Fibroblast. The cell is in Adult stage. The cell is cell line. The exposure time is 24 hours. The exposure concentration is 25.0 ug/ml. The type of cytotoxicity test is CellTiterGlo. The test mechanism is LuciferaseEnzyme. The size of the nanoparticle is 4.0 nm. The zeta potential indicating surface charge stability of the nanoparticle is -8.0 mV. The surface charge is Negative. Viability (%): 98.293
Test example --------------------
Predict the viability of a cell. The material of the nanoparticle is Ag. The nanoparticle is 

DatasetDict({
    train: Dataset({
        features: ['id', 'text', 'labels'],
        num_rows: 1775
    })
    test: Dataset({
        features: ['id', 'text', 'labels'],
        num_rows: 762
    })
})

## Tokenization

In [4]:
from transformers import AutoTokenizer


def tokenize(examples):
    """Tokenize the ``text`` column of a batch with the notebook's tokenizer.

    Args:
        examples: Batch mapping that contains a ``text`` entry.

    Returns:
        Whatever the tokenizer returns for that text when called with padding
        enabled and PyTorch tensors requested.
    """
    texts = examples["text"]
    encoded = tokenizer(texts, padding=True, return_tensors='pt')
    return encoded


# Load the tokenizer that belongs to the base checkpoint, using the local cache directory.
tokenizer = AutoTokenizer.from_pretrained(base_model_id, cache_dir=cache_dir)
# Reuse the end-of-sequence token as padding token so tokenize() can pad
# (presumably the checkpoint defines no pad token of its own -- TODO confirm).
tokenizer.pad_token = tokenizer.eos_token
# Tokenize every split batch-wise; this adds the input_ids and attention_mask columns.
dataset = dataset.map(tokenize, batched=True)
dataset

DatasetDict({
    train: Dataset({
        features: ['id', 'text', 'labels', 'input_ids', 'attention_mask'],
        num_rows: 1775
    })
    test: Dataset({
        features: ['id', 'text', 'labels', 'input_ids', 'attention_mask'],
        num_rows: 762
    })
})

# Model Training

In [5]:
from transformers import AutoModelForSequenceClassification


# The head has a single output; its one label name is only a placeholder.
id2label = {0: "HIGH"}
label2id = {name: index for index, name in id2label.items()}

model = AutoModelForSequenceClassification.from_pretrained(
    base_model_id,
    cache_dir=cache_dir,
    torch_dtype="auto",
    num_labels=len(id2label),
    id2label=id2label,
    label2id=label2id,
)
# Give the model config a padding id by reusing the end-of-sequence id.
model.config.pad_token_id = model.config.eos_token_id

Some weights of GPT2ForSequenceClassification were not initialized from the model checkpoint at gpt2 and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [6]:
from transformers import TrainingArguments, Trainer
from time import strftime


from transformers import DataCollatorWithPadding

# Timestamped run name so repeated runs write to separate output directories.
output_model_id = f"Cytotoxicity-Regression_GPT2_{strftime('%Y%m%d-%H%M%S')}"

training_args = TrainingArguments(
    output_dir=f"output/{output_model_id}",
    do_train=True,
    num_train_epochs=40,
    warmup_ratio=0.1,
    per_device_train_batch_size=72,
    learning_rate=1e-4,
    weight_decay=1,
    logging_strategy="epoch",
    save_strategy="epoch",
    seed=random_state,
    save_total_limit=5,
    bf16=True,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    # Pad each mini-batch to a common length at collation time. The tokenized
    # rows were only padded within each map() chunk, so rows from different
    # chunks may differ in length and the default collator cannot stack them.
    data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
)

trainer.train()

Step,Training Loss
25,6633.7087
50,5037.2422
75,3681.5662
100,3134.9106
125,2778.3531
150,2477.7175
175,2227.828
200,1996.4623
225,1798.6805
250,1622.3647


TrainOutput(global_step=1000, training_loss=1248.9363686523438, metrics={'train_runtime': 608.6539, 'train_samples_per_second': 116.651, 'train_steps_per_second': 1.643, 'total_flos': 7790349381120000.0, 'train_loss': 1248.9363686523438, 'epoch': 40.0})

# Inference

In [7]:
import pandas as pd
from time import time


from pathlib import Path

# Time a full prediction pass over the test split.
start_time = time()
outputs = trainer.predict(dataset["test"])
end_time = time()
print(f"Inference time: {end_time - start_time:.4f} seconds")

ids = list(dataset["test"]["id"])
# Flatten to one value per row. reshape(-1) keeps a 1-D array even for a
# single-row test set, where squeeze() would collapse it to a 0-d scalar.
preds = outputs.predictions.reshape(-1)
submission = pd.DataFrame({"ID": ids, "Target": preds})

# Create the results directory on first use instead of failing in to_csv.
output_path = Path("../results/main/r_gpt2.csv")
output_path.parent.mkdir(parents=True, exist_ok=True)
submission.to_csv(output_path, index=False)
submission

Inference time: 2.4049 seconds


Unnamed: 0,ID,Target
0,32,92.00
1,376,96.00
2,71,84.50
3,2232,92.00
4,2018,95.00
...,...,...
757,2356,93.00
758,64,94.00
759,649,95.50
760,1484,91.00
