<a href="https://colab.research.google.com/github/konductor000/Style-transfer-BERT/blob/main/style_transfer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q transformers
!pip install datasets
!pip install evaluate
!wget -q https://github.com/shentianxiao/language-style-transfer/raw/master/data/yelp/sentiment.train.0 -O train_negative
!wget -q https://github.com/shentianxiao/language-style-transfer/raw/master/data/yelp/sentiment.train.1 -O train_positive
!wget -q https://github.com/shentianxiao/language-style-transfer/raw/master/data/yelp/sentiment.dev.0 -O dev_negative
!wget -q https://github.com/shentianxiao/language-style-transfer/raw/master/data/yelp/sentiment.dev.1 -O dev_positive

In [2]:
!head -n 5 ./dev_positive
!echo
!head -n 5 ./dev_negative

staff behind the deli counter were super nice and efficient !
love this place !
the staff are always very nice and helpful .
the new yorker was amazing .
very ny style italian deli .

ok never going back to this place again .
easter day nothing open , heard about this place figured it would ok .
the host that walked us to the table and left without a word .
it just gets worse .
the food tasted awful .


In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
dir = 'drive/MyDrive/saves/style_transfer/'

In [5]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
device = 'cuda' if torch.cuda.is_available() else 'cpu'

if device == 'cpu':
    print("Fine-tuning BERT without an accelerator is not party-approved.")

In [6]:
from transformers import LineByLineTextDataset, DataCollatorForLanguageModeling
from transformers import Trainer, TrainingArguments

def train_model(model, tokenizer, file_path, output_dir, epochs=1):
    print("Preparing the training data...")
    dataset = LineByLineTextDataset(
        file_path=file_path, tokenizer=tokenizer, block_size=128)

    print("Dataset ready!")

    trainer = Trainer(
        model=model, train_dataset=dataset, 
        data_collator=DataCollatorForLanguageModeling(
            tokenizer=tokenizer, mlm=True, mlm_probability=0.15),
        args=TrainingArguments(
            output_dir=output_dir, overwrite_output_dir=True,
            num_train_epochs=epochs, per_device_train_batch_size=32,
            save_steps=10_000, save_total_limit=2),
    )

    trainer.train()

In [None]:
from transformers import BertTokenizer, BertForMaskedLM

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

use_funetuned = True

if use_funetuned:
    bert_mlm_positive = BertForMaskedLM.from_pretrained(dir + 'positive')
    bert_mlm_negative = BertForMaskedLM.from_pretrained(dir + 'negative')
else:
    bert_mlm_positive = BertForMaskedLM.from_pretrained('bert-base-uncased', return_dict=True).to(device).train(True)
    dataset_path_pos = "./train_positive"
    output_dir_pos = "./bert_mlm_positive"

    bert_mlm_negative = BertForMaskedLM.from_pretrained('bert-base-uncased', return_dict=True).to(device).train(True)
    dataset_path_neg = "./train_negative"
    output_dir_neg = "./bert_mlm_negative"

    train_model(bert_mlm_positive, tokenizer, dataset_path_pos, output_dir_pos, epochs=4)
    train_model(bert_mlm_negative, tokenizer, dataset_path_neg, output_dir_neg, epochs=5)

bert_mlm_positive.to(device)
bert_mlm_negative.to(device)

In [8]:
if not use_funetuned:
    bert_mlm_positive.save_pretrained(dir + 'positive')
    bert_mlm_negative.save_pretrained(dir + 'negative')

In [9]:
id_token = {value: key for key, value in tokenizer.vocab.items()}

mask = tokenizer.mask_token
tok_mask = 103
mask_id = 6
sequence = 'The sound in the cinema was good !'

sequence = sequence.split()
sequence[mask_id] = mask
sequence = ' '.join(sequence)
print(sequence)
batch = tokenizer([sequence], padding=True, truncation=True, return_tensors='pt')
batch = {key: value.to(device) for key, value in batch.items()}
mask_id = batch['input_ids'][0].tolist().index(tok_mask)

logits = bert_mlm_positive(**batch).logits.cpu().data.numpy()[0, mask_id]
logits = bert_mlm_negative(**batch).logits.cpu().data.numpy()[0, mask_id]

top_k = np.argsort(-logits)[:5]
for i in top_k:
    print(id_token[i])

The sound in the cinema was [MASK] !
horrible
terrible
awful
loud
ridiculous


In [10]:
def get_replacements(sentence: str, num_tokens, k_best, epsilon=1e-3):
    """
    - split the sentence into tokens using the INGSOC-approved BERT tokenizer
    - find :num_tokens: tokens with the highest ratio (see above)
    - replace them with :k_best: words according to bert_mlm_positive
    :return: a list of all possible strings (up to k_best * num_tokens)
    """
    
    id_token = {value: key for key, value in tokenizer.vocab.items()}

    out_list = []
    sent_split = sentence.split()
    all_scores = []
    mask = tokenizer.mask_token
    tok_mask = 103

    for i in range(len(sent_split)):
        mask_id = i

        sequence = sent_split.copy()
        sequence[mask_id] = mask
        sequence = ' '.join(sequence)

        batch = tokenizer([sequence], padding=True, truncation=True, return_tensors='pt')
        batch = {key: value.to(device) for key, value in batch.items()}
        mask_id = batch['input_ids'][0].tolist().index(tok_mask)

        logits_pos = bert_mlm_positive(**batch).logits.cpu().data.numpy()[0, mask_id]
        logits_neg = bert_mlm_negative(**batch).logits.cpu().data.numpy()[0, mask_id]

        if sent_split[i] not in tokenizer.vocab:
            continue
        token = tokenizer.vocab[sent_split[i]]

        score = (logits_pos[token] + epsilon) / (logits_neg[token] + epsilon)

        all_scores.append(score)

    idx_list = np.argsort(all_scores)[:num_tokens]

    for i in idx_list:
        mask_id = i

        sequence = sent_split.copy()
        sequence[mask_id] = mask
        sequence = ' '.join(sequence)

        batch = tokenizer([sequence], padding=True, truncation=True, return_tensors='pt')
        batch = {key: value.to(device) for key, value in batch.items()}
        mask_id = batch['input_ids'][0].tolist().index(tok_mask)

        logits_pos = bert_mlm_positive(**batch).logits.cpu().data.numpy()[0, mask_id]

        top_k = np.argsort(-logits_pos)[:k_best]
        
        for token_id in top_k:
            out_seq = sent_split.copy()
            out_seq[i] = id_token[token_id]
            out_list.append(" ".join(out_seq))


    return out_list

In [11]:
dev_data = list(open('./dev_negative'))

In [12]:
dev_data[500:505]

['wrong !\n',
 "i 'm sure she wo n't issue a refund because - surprise !\n",
 "nothing special and the drinks are n't cheap .\n",
 'great wings and decent drinks but the wait staff is horrible !\n',
 'the tables are all saved by people crowded around the two tvs .\n']

In [13]:
get_replacements("great wings and decent drinks but the wait staff is horrible !",
                 num_tokens=2, k_best=2)
# >>> ["great wings and decent drinks but the wait staff is great !", "great wings and decent drinks but the wait staff is awesome !"])

['great wings and decent drinks but the wait staff is awesome !',
 'great wings and decent drinks but the wait staff is great !',
 'great wings and decent drinks and the wait staff is horrible !',
 'great wings and decent drinks , the wait staff is horrible !']

In [14]:
raw_dataset = []

for sentence in list(open('./dev_positive')):
    tokenized = tokenizer(sentence, padding=True, truncation=True)
    raw_dataset.append({**tokenized, 'labels': 1})

for sentence in list(open('./dev_negative')):
    tokenized = tokenizer(sentence, padding=True, truncation=True)
    raw_dataset.append({**tokenized, 'labels': 0})

In [15]:
train_dataset = np.array(raw_dataset)

In [16]:
from sklearn.model_selection import train_test_split

train_dataset, eval_dataset = train_test_split(train_dataset, test_size=0.15)

In [17]:
import evaluate

accuracy = evaluate.load("accuracy")

Downloading builder script:   0%|          | 0.00/4.20k [00:00<?, ?B/s]

In [18]:
import numpy as np


def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)
    return accuracy.compute(predictions=predictions, references=labels)

In [19]:
id2label = {0: "NEGATIVE", 1: "POSITIVE"}
label2id = {"NEGATIVE": 0, "POSITIVE": 1}

In [20]:
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
from transformers import AutoTokenizer

model = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased", num_labels=2, id2label=id2label, label2id=label2id
)

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

Downloading (…)lve/main/config.json:   0%|          | 0.00/483 [00:00<?, ?B/s]

Downloading (…)"pytorch_model.bin";:   0%|          | 0.00/268M [00:00<?, ?B/s]

Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertForSequenceClassification: ['vocab_layer_norm.bias', 'vocab_projector.bias', 'vocab_transform.bias', 'vocab_projector.weight', 'vocab_layer_norm.weight', 'vocab_transform.weight']
- This IS expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'pre_classifier.bias', 'classifier.w

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

In [21]:
from transformers import DataCollatorWithPadding

data_collator = DataCollatorWithPadding(tokenizer=tokenizer, return_tensors="pt")

In [22]:
use_finetuned = True

training_args = TrainingArguments(
    output_dir="my_awesome_model",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=10,
    weight_decay=0.01,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,

)

if not use_finetuned:
    trainer.train()
else:
    model = AutoModelForSequenceClassification.from_pretrained(
        dir + 'classicator', num_labels=2, id2label=id2label, label2id=label2id
    ) 

loading configuration file drive/MyDrive/saves/style_transfer/classicator/config.json
Model config DistilBertConfig {
  "_name_or_path": "drive/MyDrive/saves/style_transfer/classicator",
  "activation": "gelu",
  "architectures": [
    "DistilBertForSequenceClassification"
  ],
  "attention_dropout": 0.1,
  "dim": 768,
  "dropout": 0.1,
  "hidden_dim": 3072,
  "id2label": {
    "0": "NEGATIVE",
    "1": "POSITIVE"
  },
  "initializer_range": 0.02,
  "label2id": {
    "NEGATIVE": 0,
    "POSITIVE": 1
  },
  "max_position_embeddings": 512,
  "model_type": "distilbert",
  "n_heads": 12,
  "n_layers": 6,
  "pad_token_id": 0,
  "problem_type": "single_label_classification",
  "qa_dropout": 0.1,
  "seq_classif_dropout": 0.2,
  "sinusoidal_pos_embds": false,
  "tie_weights_": true,
  "torch_dtype": "float32",
  "transformers_version": "4.26.0",
  "vocab_size": 30522
}

loading weights file drive/MyDrive/saves/style_transfer/classicator/pytorch_model.bin
All model checkpoint weights were used 

In [23]:
if not use_finetuned:
    model.save_pretrained(dir + 'classicator')

In [24]:
from transformers import pipeline

sentences = ['Very bad product!', 'Pretty good servings, and prices are high']

classifier = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)
classifier(sentences)

Disabling tokenizer parallelism, we're using DataLoader multithreading already


[{'label': 'NEGATIVE', 'score': 0.9993836879730225},
 {'label': 'POSITIVE', 'score': 0.9998654127120972}]

In [25]:
def take_res(result):
    output = []
    for i in range(len(result)):
        if result[i]['label'] == 'POSITIVE':
            output.append(result[i]['score'])
        else:
            output.append(1 - result[i]['score'])

    return output


def change_coment(text, num_tokens, k_best, num_iters = 5):
    
    beam_size = num_tokens * k_best

    beam_search = []
    beam_results = []

    beam_search += get_replacements(text, num_tokens, k_best)
    beam_results += take_res(classifier(beam_search))

    for i in range(num_iters):
        prob_text = []
        prob_score = []

        for j in range(beam_size):
            prob_text += get_replacements(beam_search[-j-1], num_tokens, k_best)
        
        prob_score += take_res(classifier(prob_text))
        new_branch = sorted([[prob_score[i], prob_text[i]] for i in range(len(prob_text))], reverse=True)

        for j in range(beam_size):
            beam_search.append(new_branch[j][1])
            beam_results.append(new_branch[j][0])

    beam = sorted(list(set([(beam_results[i], beam_search[i]) for i in range(len(beam_results))])) \
                  + [(take_res(classifier(text))[0], text)], reverse=True)

    return [pair[1] for pair in beam[:min(beam_size, len(beam))]]

In [31]:
num_tokens = 2
k_best = 2

texts = [
    "This place is weird and I do not like it at all !",
    "The prices are high and the staff were terrible .",
    "Never buy this product this is really bullshit !",
    "Very delicious coffee and pancakes !",
]

for text in texts:
    text = text.lower()

    beam_search = change_coment(text, num_tokens, k_best)

    print(beam_search)

['this place is great and i do really like it at all !', 'this place is great and i do really like it after all !', 'this place is great and i do not like it after all !', 'this place is great and i do not like it at all !']
['their prices are great and the staff is great !', 'the prices are great and the staff is great !', 'their prices are great and the staff is great .', 'the prices are great and the staff is great .']
['i like this and she is really great !', 'i like this product he is so great !', 'i like this and it is really great !', 'i like this and this is really great !']
['very delicious coffee and pancakes !', 'very delicious coffee and pancakes !', 'very delicious coffee and food !', 'very delicious coffee and pancakes .']


In [None]:
"""
This place is weird and I do not like it at all!       --> This place is great and i do really like it after all!
The prices are high and the staff were terrible.       --> Their prices are great and the staff is great!
Never buy this product this is really bullshit!        --> I like this and it is really great!
Very delicious coffee and pancakes! (already positive) --> Very delicious coffee and pancakes! (Didn't change at all)
"""