In [None]:
# package for fine tune
import numpy as np
import pandas as pd
import transformers
from datasets import Dataset, load_metric, DatasetDict
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score,mean_squared_error,r2_score,accuracy_score,balanced_accuracy_score,roc_curve,auc,f1_score
from sklearn.metrics import precision_recall_fscore_support, log_loss
from sklearn.metrics import classification_report
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
transformers.logging.set_verbosity_error()

In [None]:
# package for performance specifically
from sklearn.metrics import classification_report
import tensorflow as tf
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve
from sklearn.preprocessing import label_binarize
from tensorflow.python.ops.numpy_ops import np_config
plt.rcParams['figure.dpi'] = 200
sns.set(rc={'figure.figsize':(4,4)},style='ticks',font="Arial", font_scale=1)
from itertools import cycle

# 1 Build Model

In [None]:
# Prepare Datasets
# Read the complete FLIP table once (first row is the header), then split it
# into one frame per collection year with a fresh 0..n-1 index.
dfFLIP = pd.read_csv("C:/Users/replace_with_your_data.csv", header=0)
df2017 = dfFLIP[dfFLIP["FLIP_year"].eq(2017)].reset_index(drop=True)
df2013 = dfFLIP[dfFLIP["FLIP_year"].eq(2013)].reset_index(drop=True)  # more years

In [None]:
# Clean the 2017 records: keep rows that have an ingredient list and a usable
# NOVA class.
data2017 = df2017
data2017 = data2017.loc[data2017['Ingredients'].notna()]
data2017 = data2017.loc[data2017['NOVA'].notna()]
# .copy() gives an independent frame, so the column assignment below does not
# write into a slice of df2017 (avoids SettingWithCopyWarning).
data2017 = data2017.loc[data2017['NOVA'] != "not_avaliable"].copy()
data2017['NOVA'] = pd.to_numeric(data2017['NOVA'], errors='coerce')
# errors='coerce' turns any remaining non-numeric entry into NaN; drop those
# rows here, otherwise the integer cast of the labels below raises.
data2017 = data2017.loc[data2017['NOVA'].notna()]
display(data2017.shape)

# Two-column frame in the layout used downstream: "text" and "label".
df2017b = data2017[['Ingredients', 'NOVA']].copy()  # "ID"
df2017b.columns = ['text', 'label']
# Labels must start at 0 and be integers.
df2017b['label'] = (df2017b['label'] - 1).astype(int)
display(df2017b.head(5))
print(df2017b['label'].unique())

# 80 % train+valid / 20 % test, then 12.5 % of the remainder as validation
# (0.125 x 0.8 = 0.1 of the full data).
#train, test = train_test_split(df2017b, test_size=0.3, random_state=3456)
train, test = train_test_split(df2017b, test_size=0.2, random_state=3456)
train, valid = train_test_split(train, test_size=0.125, random_state=3456)
print('train', train.shape)
print('valid', valid.shape)
print('test', test.shape)

display(train.shape, test.shape)

# Dataset.from_pandas is the constructor intended for DataFrames;
# preserve_index=False keeps the pandas index out of the dataset columns.
train_dataset = Dataset.from_pandas(train, preserve_index=False)
valid_dataset = Dataset.from_pandas(valid, preserve_index=False)
test_dataset = Dataset.from_pandas(test, preserve_index=False)
# The validation split is now exposed as well. The training cell in this
# notebook still evaluates on "test"; the existing keys are unchanged.
my_dataset_dict = DatasetDict({
    "train": train_dataset,
    "validation": valid_dataset,
    "test": test_dataset,
})
my_dataset_dict

In [None]:
def tokenize_function(examples):
    """Tokenize the ``"text"`` field of ``examples`` with the global ``tokenizer``.

    The tokenizer is called with ``padding="max_length"`` and
    ``truncation=True``. ``tokenizer`` is looked up at call time, so it must
    be assigned before this function is used (e.g. through ``Dataset.map``).

    Returns whatever the tokenizer call returns.
    """
    texts = examples["text"]
    encoded = tokenizer(texts, padding="max_length", truncation=True)
    return encoded
# Accuracy metric object used by compute_metrics below.
metric = load_metric("accuracy")


def compute_metrics(eval_pred):
    """Score an evaluation result with the global accuracy ``metric``.

    ``eval_pred`` is unpacked into ``(logits, labels)``; the predicted class
    of each row is the index of its largest logit along the last axis.

    Returns the result of ``metric.compute`` for those predictions against
    ``labels``.
    """
    logits, labels = eval_pred
    predicted_classes = np.argmax(logits, axis=-1)
    result = metric.compute(predictions=predicted_classes, references=labels)
    return result

In [None]:
def compute_metrics2(pred):
    """Compute the evaluation metrics reported for a prediction object.

    ``pred`` must expose ``label_ids`` (true classes) and ``predictions``
    (per-class scores); the predicted class is the argmax over the last axis.

    Returns a dict with accuracy, balanced accuracy, and support-weighted
    F1, precision and recall.
    """
    y_true = pred.label_ids
    y_hat = pred.predictions.argmax(-1)

    # Weighted averaging: per-class scores are averaged by class support.
    weighted_precision, weighted_recall, weighted_f1, _support = (
        precision_recall_fscore_support(y_true, y_hat, average='weighted')
    )

    scores = {}
    scores['accuracy'] = accuracy_score(y_true, y_hat)
    scores['balanced_accuracy_score'] = balanced_accuracy_score(y_true, y_hat)
    scores['f1'] = weighted_f1
    scores['precision'] = weighted_precision
    scores['recall'] = weighted_recall
    return scores

In [None]:
### Train the model
# Tokenizer of the pretrained checkpoint; tokenize_function reads this global.
tokenizer = AutoTokenizer.from_pretrained(
    "sentence-transformers/multi-qa-MiniLM-L6-cos-v1"
)
# Tokenize every split of the dataset dict in batches.
tokenized_datasets1 = my_dataset_dict.map(tokenize_function, batched=True)

# The complete splits are used. Re-enable the commented suffix to fine-tune on
# a 1000-row subset and shorten the run.
small_train_dataset1 = tokenized_datasets1["train"]  # .shuffle(seed=1234).select(range(1000))
small_eval_dataset1 = tokenized_datasets1["test"]  # .shuffle(seed=1234).select(range(1000))

# Same checkpoint with a 4-way sequence-classification head.
model1 = AutoModelForSequenceClassification.from_pretrained(
    'sentence-transformers/multi-qa-MiniLM-L6-cos-v1',
    num_labels=4,
)

# Training hyperparameters: 10 epochs, evaluation strategy "epoch",
# progress bars left enabled.
training_args1 = TrainingArguments(
    output_dir="C:/Users/your_location",
    evaluation_strategy="epoch",
    num_train_epochs=10,
    disable_tqdm=False,
)

# Trainer wiring: model, arguments, train/eval data and the metric callback.
trainer1 = Trainer(
    model=model1,
    args=training_args1,
    train_dataset=small_train_dataset1,
    eval_dataset=small_eval_dataset1,
    compute_metrics=compute_metrics2,
)
trainer1.train()

# Persist the fine-tuned model so it can be reloaded without retraining.
model1.save_pretrained("C:/Users/location_of_your_saved_model/")

In [None]:
# Use the saved model (run this cell each time the notebook is re-opened
# instead of training again).
tokenizer = AutoTokenizer.from_pretrained(
    "sentence-transformers/multi-qa-MiniLM-L6-cos-v1"
)
tokenized_datasets1 = my_dataset_dict.map(tokenize_function, batched=True)

# Full train/test splits; the commented suffix would subsample 1000 rows.
small_train_dataset1 = tokenized_datasets1["train"]  # .shuffle(seed=1234).select(range(1000))
small_eval_dataset1 = tokenized_datasets1["test"]  # .shuffle(seed=1234).select(range(1000))

# Load the fine-tuned weights from the directory written by save_pretrained.
model2 = AutoModelForSequenceClassification.from_pretrained(
    'C:/Users/location_of_your_saved_model/',
    num_labels=4,
)  ### change

# Same arguments as used for training.
training_args1 = TrainingArguments(
    output_dir="C:/Users/your_location",
    evaluation_strategy="epoch",
    num_train_epochs=10,
    disable_tqdm=False,
)

# Rebuild the Trainer around the reloaded model; no training is started here.
trainer1 = Trainer(
    model=model2,
    args=training_args1,
    train_dataset=small_train_dataset1,
    eval_dataset=small_eval_dataset1,
    compute_metrics=compute_metrics2,
)

In [None]:
### classification_report
# Predict the held-out test split and compare against its true labels.
y_pred = trainer1.predict(tokenized_datasets1["test"])
y_true = tokenized_datasets1["test"]['label']
predictions = tf.nn.softmax(y_pred.predictions)  # logits -> class probabilities
pred = np.argmax(predictions, 1)
# Pin the label set to the four classes of the model so the matrix is always
# 4x4 and lines up with the four display labels below, even if a class is
# absent from this split.
cm = confusion_matrix(y_true, pred, labels=[0, 1, 2, 3])
print(cm)

print(classification_report(y_true, pred))

### normalized Confusion Matrix
# Row-normalize: each row (true class) is divided by its total, so every cell
# is the fraction of that true class assigned to the predicted class.
c = cm
row_totals = c.sum(axis=1, keepdims=True)
# Rows with no true samples stay 0 instead of producing NaN from 0/0.
normed_c = np.divide(
    c.astype(float),
    row_totals,
    out=np.zeros(c.shape, dtype=float),
    where=row_totals != 0,
)
normed_c = np.round(normed_c, 2)  # change digits to 2

title = "Normalized Confusion Matrix \n "
# Labels are shown 1-based (NOVA groups 1-4) although the model uses 0-3.
disp2 = ConfusionMatrixDisplay(normed_c,
                               display_labels=['1', '2', '3', '4'])

disp2.plot(cmap=plt.cm.Blues)
disp2.ax_.set_title(title, fontsize=13, fontweight="bold")
disp2.ax_.set_xlabel('Predicted NOVA Group', fontsize=13)
disp2.ax_.set_ylabel('True NOVA Group', fontsize=13);

# 2 Predict new data

In [None]:
# Import new data
# Keep the ID and ingredient text of the 2013 records and drop rows that have
# no ingredient list.
test_xFLIP2013 = (
    df2013
    .loc[:, ["ID", "Ingredients"]]
    .dropna(axis=0, subset=["Ingredients"])
)

print(test_xFLIP2013.shape)
# Single-column frame named "text", the field tokenize_function reads.
test = test_xFLIP2013[['Ingredients']].rename(columns={'Ingredients': 'text'})

FLIP2013ft = test
test_df = Dataset.from_dict(FLIP2013ft).map(tokenize_function, batched=True)
y_pred = trainer1.predict(test_df)
predictions = tf.nn.softmax(y_pred.predictions)  # logits -> class probabilities
pred = np.argmax(predictions, axis=1)

# Save predicted data
# +1 maps the 0-based class index back to the 1-based NOVA group.
test_xFLIP2013["Pred_finetune"] = pred + 1
test_xFLIP2013.to_csv('C:/Users/your_location.csv', index=True)
print(test_xFLIP2013["Pred_finetune"].value_counts())