In [1]:
!python --version

Python 3.8.11


In [2]:
%%capture
!git clone https://github.com/shashank-srikant/ai4code-tutorial
!cd ai4code-tutorial
%pip install -r requirements.txt

In [6]:
import os
import pickle
from typing import List, Optional, Tuple, Union

import numpy as np
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

In [7]:
def get_vocab_tokens_to_use(tokenizer, filename="vocab_tokens_to_use.pkl"):
    """Split the tokenizer's vocabulary ids into usable / ignored / case buckets.

    A token is "usable" when its text is pure ASCII and contains none of the
    punctuation characters listed in the nested ``isascii`` helper. Usable
    tokens are further bucketed by the case of their first character.

    The result is cached on disk: if ``filename`` exists it is loaded and
    ``tokenizer`` is not consulted at all; otherwise the buckets are computed
    from ``tokenizer.get_vocab()`` and written to ``filename``.

    Args:
        tokenizer: Object exposing ``get_vocab()`` returning ``{token_text: token_id}``.
        filename: Path of the pickle cache file.

    Returns:
        Tuple ``(vocab_tokens_to_use, vocab_tokens_to_ignore,
        vocab_tokens_not_upper_case, vocab_tokens_upper_case)``; each element
        is a sorted list of token ids.
    """
    def isascii(s, ignore=('*', '@', '[', '\\', ']', '/', '<', '=', '>', '^', '_', '`', '{', '}', '|', '~')):
        """Return True when ``s`` is an ASCII-only str containing none of ``ignore``."""
        if not isinstance(s, str):
            return False
        try:
            s.encode('ascii')
        except UnicodeEncodeError:
            return False
        return not any(ch in s for ch in ignore)

    if os.path.exists(filename):
        # SECURITY: pickle.load can execute arbitrary code. Only point `filename`
        # at a cache file that this function wrote itself.
        with open(filename, 'rb') as f:
            (vocab_tokens_to_use, vocab_tokens_to_ignore, vocab_tokens_not_upper_case, vocab_tokens_upper_case) = pickle.load(f)
    else:
        toks_to_use, toks_to_ignore = [], []
        toks_lower_case, toks_upper_case, toks_other_case = [], [], []

        for tok_text, tok_id in tokenizer.get_vocab().items():
            if not isascii(tok_text):
                toks_to_ignore.append(tok_id)
                continue

            toks_to_use.append(tok_id)
            # Slice instead of index so an empty-string token lands in the
            # "other case" bucket rather than raising IndexError.
            first_char = tok_text[:1]
            if first_char.isupper() and first_char != 'Ġ':
                toks_upper_case.append(tok_id)
            elif first_char.islower():
                toks_lower_case.append(tok_id)
            else:
                toks_other_case.append(tok_id)

        vocab_tokens_to_ignore = sorted(toks_to_ignore)
        vocab_tokens_to_use = sorted(toks_to_use)
        vocab_tokens_not_upper_case = sorted(toks_lower_case + toks_other_case)
        vocab_tokens_upper_case = sorted(toks_upper_case)
        with open(filename, 'wb') as f:
            pickle.dump((vocab_tokens_to_use, vocab_tokens_to_ignore, vocab_tokens_not_upper_case, vocab_tokens_upper_case), f)

    return vocab_tokens_to_use, vocab_tokens_to_ignore, vocab_tokens_not_upper_case, vocab_tokens_upper_case

In [4]:

import transformers
from transformers.modeling_outputs import (
    BaseModelOutputWithPastAndCrossAttentions, 
    BaseModelOutputWithPoolingAndCrossAttentions, 
    SequenceClassifierOutput
)
from transformers.models.bert.modeling_bert import (
    BertEmbeddings,
    BertModel
)

class CustomBertEmbeddings(BertEmbeddings):
    """``BertEmbeddings`` variant that can also embed a one-hot token matrix.

    In addition to the inherited lookup table ``word_embeddings``, this module
    owns ``word_embeddings_v2``, a bias-free linear layer mapping a
    ``vocab_size``-wide vector to ``embed_dim``. When ``forward`` receives
    ``one_hot`` the linear layer is used instead of the lookup table, which
    keeps the token choice differentiable.
    """

    def __init__(self, config, vocab_size, embed_dim):
        super().__init__(config)
        self.word_embeddings_v2 = torch.nn.Linear(vocab_size, embed_dim, bias=False)

    def update_weights(self):
        """Load the transposed ``word_embeddings`` table into ``word_embeddings_v2``."""
        transposed_table = torch.t(self.word_embeddings.weight.data)
        assert self.word_embeddings_v2.weight.data.shape == transposed_table.shape
        self.word_embeddings_v2.weight.data = transposed_table

    def forward(
        self,
        input_ids=None,
        token_type_ids=None,
        position_ids=None,
        inputs_embeds=None,
        past_key_values_length=0,
        one_hot=None
    ):
        """Build the summed (word + token-type [+ position]) embeddings.

        Word embeddings come from, in order of precedence: ``inputs_embeds``
        if given, else ``word_embeddings_v2(one_hot)`` if ``one_hot`` is
        given, else ``word_embeddings(input_ids)``. The batch/sequence shape
        is always taken from ``input_ids`` (or ``inputs_embeds`` when
        ``input_ids`` is None), never from ``one_hot``.

        Returns:
            The embeddings after LayerNorm and dropout.
        """
        input_shape = input_ids.size() if input_ids is not None else inputs_embeds.size()[:-1]
        seq_length = input_shape[1]

        if position_ids is None:
            first_position = past_key_values_length
            position_ids = self.position_ids[:, first_position : seq_length + first_position]

        # Fall back to the all-zero ``token_type_ids`` buffer when the module has one
        # (helps tracing the model without passing token_type_ids); otherwise build zeros.
        if token_type_ids is None:
            if hasattr(self, "token_type_ids"):
                token_type_ids = self.token_type_ids[:, :seq_length].expand(input_shape[0], seq_length)
            else:
                token_type_ids = torch.zeros(input_shape, dtype=torch.long, device=self.position_ids.device)

        if inputs_embeds is None:
            inputs_embeds = (
                self.word_embeddings_v2(one_hot) if one_hot is not None else self.word_embeddings(input_ids)
            )

        embeddings = inputs_embeds + self.token_type_embeddings(token_type_ids)
        if self.position_embedding_type == "absolute":
            embeddings += self.position_embeddings(position_ids)
        return self.dropout(self.LayerNorm(embeddings))

class CustomBertModel(BertModel):
    """``BertModel`` whose forward pass runs through ``CustomBertEmbeddings``.

    The inherited ``self.embeddings`` module is kept (so pretrained weights
    can be loaded into it), and a second module ``self.embeddings_v2`` is
    added. ``forward`` uses only ``embeddings_v2``, which accepts an optional
    ``one_hot`` token matrix.
    """

    def __init__(self, config, vocab_sz, hidden_sz):
        super().__init__(config)
        self.embeddings_v2 = CustomBertEmbeddings(config, vocab_sz, hidden_sz)

    def update_weights(self):
        """Copy the weights of ``self.embeddings`` into ``self.embeddings_v2``.

        ``strict=False`` because ``embeddings_v2`` has an extra
        ``word_embeddings_v2`` layer that the source state dict lacks.
        """
        self.embeddings_v2.load_state_dict(self.embeddings.state_dict(), strict=False)

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        encoder_hidden_states=None,
        encoder_attention_mask=None,
        past_key_values=None,
        use_cache=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        one_hot=None
    ):
        r"""
        encoder_hidden_states  (:obj:`torch.FloatTensor` of shape :obj:`(batch_size, sequence_length, hidden_size)`, `optional`):
            Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention if
            the model is configured as a decoder.
        encoder_attention_mask (:obj:`torch.FloatTensor` of shape :obj:`(batch_size, sequence_length)`, `optional`):
            Mask to avoid performing attention on the padding token indices of the encoder input. This mask is used in
            the cross-attention if the model is configured as a decoder. Mask values selected in ``[0, 1]``:

            - 1 for tokens that are **not masked**,
            - 0 for tokens that are **masked**.
        past_key_values (:obj:`tuple(tuple(torch.FloatTensor))` of length :obj:`config.n_layers` with each tuple having 4 tensors of shape :obj:`(batch_size, num_heads, sequence_length - 1, embed_size_per_head)`):
            Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up decoding.

            If :obj:`past_key_values` are used, the user can optionally input only the last :obj:`decoder_input_ids`
            (those that don't have their past key value states given to this model) of shape :obj:`(batch_size, 1)`
            instead of all :obj:`decoder_input_ids` of shape :obj:`(batch_size, sequence_length)`.
        use_cache (:obj:`bool`, `optional`):
            If set to :obj:`True`, :obj:`past_key_values` key value states are returned and can be used to speed up
            decoding (see :obj:`past_key_values`).
        one_hot (`optional`):
            Forwarded unchanged to ``self.embeddings_v2``; when given (and ``inputs_embeds`` is None) the word
            embeddings are computed from it instead of from ``input_ids``. ``input_ids`` or ``inputs_embeds`` must
            still be supplied because the batch/sequence shape is read from them.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if self.config.is_decoder:
            use_cache = use_cache if use_cache is not None else self.config.use_cache
        else:
            use_cache = False

        if input_ids is not None and inputs_embeds is not None:
            raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
        elif input_ids is not None:
            input_shape = input_ids.size()
        elif inputs_embeds is not None:
            input_shape = inputs_embeds.size()[:-1]
        else:
            raise ValueError("You have to specify either input_ids or inputs_embeds")

        batch_size, seq_length = input_shape
        device = input_ids.device if input_ids is not None else inputs_embeds.device

        # past_key_values_length
        past_key_values_length = past_key_values[0][0].shape[2] if past_key_values is not None else 0

        if attention_mask is None:
            attention_mask = torch.ones(((batch_size, seq_length + past_key_values_length)), device=device)

        if token_type_ids is None:
            # Check for the buffer on the same module it is read from (embeddings_v2),
            # since that is the embedding module this forward pass actually uses.
            if hasattr(self.embeddings_v2, "token_type_ids"):
                buffered_token_type_ids = self.embeddings_v2.token_type_ids[:, :seq_length]
                buffered_token_type_ids_expanded = buffered_token_type_ids.expand(batch_size, seq_length)
                token_type_ids = buffered_token_type_ids_expanded
            else:
                token_type_ids = torch.zeros(input_shape, dtype=torch.long, device=device)

        # We can provide a self-attention mask of dimensions [batch_size, from_seq_length, to_seq_length]
        # ourselves in which case we just need to make it broadcastable to all heads.
        extended_attention_mask: torch.Tensor = self.get_extended_attention_mask(attention_mask, input_shape, device)

        # If a 2D or 3D attention mask is provided for the cross-attention
        # we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length]
        if self.config.is_decoder and encoder_hidden_states is not None:
            encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size()
            encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length)
            if encoder_attention_mask is None:
                encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device)
            encoder_extended_attention_mask = self.invert_attention_mask(encoder_attention_mask)
        else:
            encoder_extended_attention_mask = None

        # Prepare head mask if needed
        # 1.0 in head_mask indicate we keep the head
        # attention_probs has shape bsz x n_heads x N x N
        # input head_mask has shape [num_heads] or [num_hidden_layers x num_heads]
        # and head_mask is converted to shape [num_hidden_layers x batch x num_heads x seq_length x seq_length]
        head_mask = self.get_head_mask(head_mask, self.config.num_hidden_layers)

        # Unlike the parent class, embed through embeddings_v2 so `one_hot` is honoured.
        embedding_output = self.embeddings_v2(
            input_ids=input_ids,
            position_ids=position_ids,
            token_type_ids=token_type_ids,
            inputs_embeds=inputs_embeds,
            past_key_values_length=past_key_values_length,
            one_hot=one_hot
        )
        encoder_outputs = self.encoder(
            embedding_output,
            attention_mask=extended_attention_mask,
            head_mask=head_mask,
            encoder_hidden_states=encoder_hidden_states,
            encoder_attention_mask=encoder_extended_attention_mask,
            past_key_values=past_key_values,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = encoder_outputs[0]
        pooled_output = self.pooler(sequence_output) if self.pooler is not None else None

        if not return_dict:
            return (sequence_output, pooled_output) + encoder_outputs[1:]

        return BaseModelOutputWithPoolingAndCrossAttentions(
            last_hidden_state=sequence_output,
            pooler_output=pooled_output,
            past_key_values=encoder_outputs.past_key_values,
            hidden_states=encoder_outputs.hidden_states,
            attentions=encoder_outputs.attentions,
            cross_attentions=encoder_outputs.cross_attentions,
        )

class CustomBertForSequenceClassification(transformers.BertForSequenceClassification):
    """Sequence classifier whose encoder is a ``CustomBertModel``.

    The inherited ``self.bert`` encoder is kept so pretrained weights can be
    loaded into it; ``self.bert_v2`` is the encoder ``forward`` actually runs,
    and it accepts an optional ``one_hot`` token matrix.
    """

    def __init__(self, config, vocab_sz, hidden_sz):
        super().__init__(config)
        self.bert_v2 = CustomBertModel(config, vocab_sz, hidden_sz)

    def update_weights(self):
        """Copy the weights of ``self.bert`` into ``self.bert_v2`` (non-strict)."""
        source_weights = self.bert.state_dict()
        self.bert_v2.load_state_dict(source_weights, strict=False)

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        one_hot=None
    ):
        r"""
        labels (:obj:`torch.LongTensor` of shape :obj:`(batch_size,)`, `optional`):
            Targets for the loss. When ``config.problem_type`` is unset it is inferred here (and stored on the
            config): one label -> regression (Mean-Square loss); several labels with integer targets ->
            single-label classification (Cross-Entropy); otherwise multi-label classification (BCE with logits).
        one_hot (`optional`):
            Passed through to ``self.bert_v2``.
        """
        if return_dict is None:
            return_dict = self.config.use_return_dict

        outputs = self.bert_v2(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            one_hot=one_hot
        )

        # Index 1 of the encoder output is the pooled representation.
        logits = self.classifier(self.dropout(outputs[1]))

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and labels.dtype in (torch.long, torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            problem_type = self.config.problem_type
            if problem_type == "regression":
                criterion = MSELoss()
                if self.num_labels == 1:
                    loss = criterion(logits.squeeze(), labels.squeeze())
                else:
                    loss = criterion(logits, labels)
            elif problem_type == "single_label_classification":
                criterion = CrossEntropyLoss()
                loss = criterion(logits.view(-1, self.num_labels), labels.view(-1))
            elif problem_type == "multi_label_classification":
                criterion = BCEWithLogitsLoss()
                loss = criterion(logits, labels)

        if return_dict:
            return SequenceClassifierOutput(
                loss=loss,
                logits=logits,
                hidden_states=outputs.hidden_states,
                attentions=outputs.attentions,
            )

        output = (logits,) + outputs[2:]
        return ((loss,) + output) if loss is not None else output

In [None]:
def get_most_sensitive_sites(model, code, target, inp_oh, number_of_sites, get_toks_per_word, tokenizer_for_debugging_purposes):
    """Pick the token positions whose one-hot rows the loss is most sensitive to.

    Uses the word importance function I(x_i) from
    https://arxiv.org/pdf/2109.00544.pdf: positions are ranked by the sum of
    absolute gradient values over dim 1 of the gradient returned by
    ``model.get_grad_v2``.

    Only positions that ``get_toks_per_word`` reports as single-token words
    are eligible, and the final position is never selected (presumably the
    trailing special token -- confirm against the tokenizer in use).

    Args:
        model: Wrapper exposing ``get_grad_v2(code, target, None, inp_oh)``
            returning a dict with ``'gradient'`` (2-D tensor) and ``'ids'``.
        code: Input text handed to ``get_grad_v2``.
        target: Desired label handed to ``get_grad_v2``.
        inp_oh: One-hot input tensor handed to ``get_grad_v2``.
        number_of_sites: Maximum number of positions to return. ``0`` yields
            an empty list; a negative value imposes no limit.
        get_toks_per_word: Callable mapping the ``'ids'`` entry to an iterable
            of per-word position lists.
        tokenizer_for_debugging_purposes: Unused; kept for interface stability.

    Returns:
        List of selected positions, most sensitive first.
    """
    grads_and_embeddings = model.get_grad_v2(code, target, None, inp_oh)
    saliency = grads_and_embeddings['gradient'].detach().abs().sum(dim=1)
    # .cpu() so the ranking also works when the gradient lives on an accelerator.
    ranked_positions = torch.sort(saliency, descending=True).indices.cpu().tolist()

    # A set gives O(1) eligibility checks inside the ranking loop.
    single_token_positions = {
        int(positions[0])
        for positions in get_toks_per_word(grads_and_embeddings['ids'])
        if len(positions) == 1
    }
    last_position = len(ranked_positions) - 1

    idxs_to_replace = []
    for position in ranked_positions:
        # Check the budget *before* appending so number_of_sites == 0 selects nothing.
        if len(idxs_to_replace) == number_of_sites:
            break
        if position in single_token_positions and position != last_position:
            idxs_to_replace.append(position)

    return idxs_to_replace


def get_code_preds(sent, model, tokenizer, target_output=None, attack_loss_fn=None, input_oh=None):
    """Run ``model`` on ``sent`` and optionally score it against a target.

    Args:
        sent: Text to tokenize with ``tokenizer(sent, return_tensors='pt')``.
        model: Callable accepting the tokenizer output as keyword arguments
            plus ``one_hot``; its result must expose ``.logits``.
        tokenizer: Callable producing a mapping that contains ``'input_ids'``.
        target_output: Desired label; when given, a loss is computed.
        attack_loss_fn: Loss callable ``(logits, target) -> tensor``; required
            when ``target_output`` is given.
        input_oh: Optional one-hot input forwarded to the model as ``one_hot``.

    Returns:
        Tuple ``(logits, input_ids, encoded_input, loss)`` where ``logits`` is
        detached, moved to CPU and squeezed, and ``loss`` is a plain Python
        value (``None`` when ``target_output`` is None).

    Raises:
        ValueError: If ``target_output`` is given without ``attack_loss_fn``.
    """
    encoded_input = tokenizer(sent, return_tensors='pt')
    output = model(**encoded_input, one_hot=input_oh)
    logits = output.logits

    loss = None
    if target_output is not None:
        if attack_loss_fn is None:
            raise ValueError("attack_loss_fn is required when target_output is given")
        # Build the target on the logits' device so the loss also works off-CPU.
        target = torch.tensor(target_output, device=logits.device).unsqueeze(0)
        loss = attack_loss_fn(logits, target).detach().cpu().tolist()

    return logits.detach().cpu().squeeze(), encoded_input['input_ids'], encoded_input, loss


def per_code_synthesis_senti(code_to_transform, 
                                tokenizer, 
                                config,
                                model_wrapper_senti, 
                                custom_bert, 
                                desired_target_after_attack,
                                args, 
                                convert_to_onehot,
                                get_toks_per_word,
                                attack_iters,
                                learning_rate,
                                vocab_tokens_to_use_bert, 
                                number_to_dump,
                                paths,
                                model1=None, model2=None):
    """Search for a token-substituted variant of one input that lowers the attack loss.

    Outline:
      1. Predict on the original text and convert its token ids to a one-hot
         tensor with ``convert_to_onehot``.
      2. Choose the positions to attack with ``get_most_sensitive_sites``.
      3. For ``attack_iters`` iterations: draw ``args.pgd_multinomial_samples``
         candidates (attacked positions are sampled from the softmax of the
         relaxed one-hot rows, other positions keep their current row), keep
         the lowest-loss candidate, and take a gradient step of size
         ``learning_rate`` on the attacked rows using that candidate's gradient.
      4. Decode the best candidate seen with argmax and re-score the decoded text.

    Args:
        code_to_transform: Input text to attack.
        tokenizer: Tokenizer; must support ``len()``, calling, and ``decode``.
        model_wrapper_senti: Wrapper exposing ``model``, ``attack_loss_fn`` and
            ``get_grad_v2``.
        desired_target_after_attack: Label the attack tries to reach.
        args: Namespace read for ``pgd_number_sites_to_attack``,
            ``pgd_min_sites_to_attack``, ``pgd_max_sites_to_attack``,
            ``pgd_multinomial_samples``, ``pgd_token_selection_strategy``,
            ``device`` and ``verbose``.
        convert_to_onehot: Callable ``(token_ids, vocab_size=..., device=...)``;
            its result is indexed as (1, seq_len, vocab) here.
        get_toks_per_word: Forwarded to ``get_most_sensitive_sites``.
        attack_iters: Number of gradient iterations.
        learning_rate: Step size for the gradient update.
        config, custom_bert, vocab_tokens_to_use_bert, number_to_dump, paths,
        model1, model2: Unused; kept so existing call sites keep working.

    Returns:
        Dict with keys ``original_prediction``, ``original_loss``,
        ``generated_string``, ``generated_prediction``, ``generated_loss``,
        ``best_loss``, ``loss_per_iteration`` and ``attacked_sites``. If no
        candidate was ever accepted (e.g. ``attack_iters == 0``),
        ``generated_string`` is decoded from the unmodified input and
        ``best_loss`` is ``inf``.

    Raises:
        ValueError: If ``args.pgd_token_selection_strategy`` is not ``'argmax'``.
    """
    # Fail before doing any model work; 'argmax' is the only strategy implemented below.
    if args.pgd_token_selection_strategy != 'argmax':
        raise ValueError(
            "Unsupported pgd_token_selection_strategy: {!r} (only 'argmax' is implemented)".format(
                args.pgd_token_selection_strategy))

    if args.pgd_number_sites_to_attack == -1:
        # NOTE: np.random.randint excludes the upper bound.
        number_sites_to_attack = np.random.randint(args.pgd_min_sites_to_attack, args.pgd_max_sites_to_attack)
    else:
        number_sites_to_attack = args.pgd_number_sites_to_attack

    attack_model = model_wrapper_senti.model
    attack_loss_fn = model_wrapper_senti.attack_loss_fn

    (prediction_orig, tok_idxs, _, loss_orig) = get_code_preds(code_to_transform, attack_model, tokenizer, desired_target_after_attack, attack_loss_fn, None)

    input_onehot = convert_to_onehot(tok_idxs, vocab_size=len(tokenizer), device=args.device)
    input_onehot_orig = input_onehot.detach().clone()

    ## Test whether onehot preds work as expected
    input_onehot.grad = None
    input_onehot.requires_grad = True
    input_onehot.retain_grad()
    (prediction_oh, _, _, _) = get_code_preds(code_to_transform, attack_model, tokenizer, None, None, input_onehot)
    assert torch.equal(prediction_oh, prediction_orig), "one-hot forward pass disagrees with the token-id forward pass"

    tok_to_attack = get_most_sensitive_sites(model_wrapper_senti, code_to_transform, desired_target_after_attack, input_onehot, number_sites_to_attack, get_toks_per_word, tokenizer)
    input_onehot.requires_grad = False
    attack_positions = set(tok_to_attack)

    loss_iters = []
    input_onehot_best = None
    # inf (rather than a fixed number) so that any finite loss is accepted as an improvement.
    loss_best = float("inf")
    pred_best = 0

    input_onehot_softmax = input_onehot.data.clone()
    for attack_cnt in range(attack_iters):
        loss_best_sampled = float("inf")
        pred_best_sampled = 0
        best_input_onehot_sampled, best_nabla_sampled = None, None

        # Both arrays are constant for every sample drawn in this iteration.
        softmax_rows = input_onehot_softmax.detach().cpu().numpy()[0, :]
        current_rows = input_onehot.detach().cpu().numpy()[0, :]

        # np.random.multinomial validates the probabilities in float64; renormalise
        # there so float32 rounding cannot push the sum above 1 and raise.
        sampling_probs = {}
        for tok_idx in attack_positions:
            row = softmax_rows[tok_idx].astype(np.float64)
            sampling_probs[tok_idx] = row / row.sum()

        for _ in range(args.pgd_multinomial_samples):
            sampled_rows = []
            for tok_idx in range(softmax_rows.shape[0]):
                if tok_idx in attack_positions:
                    sampled_rows.append(np.random.multinomial(1, sampling_probs[tok_idx]))
                else:
                    sampled_rows.append(current_rows[tok_idx])
            input_onehot_softmax_sampled = torch.tensor(np.stack(sampled_rows), requires_grad=True, dtype=torch.double, device=args.device)
            grads_and_embeddings = model_wrapper_senti.get_grad_v2(code_to_transform, desired_target_after_attack, None, input_onehot_softmax_sampled, False)
            if grads_and_embeddings['loss'] < loss_best_sampled:
                loss_best_sampled = grads_and_embeddings['loss']
                pred_best_sampled = grads_and_embeddings['prediction']
                best_input_onehot_sampled = input_onehot_softmax_sampled.detach().clone()
                best_nabla_sampled = grads_and_embeddings['gradient'].detach()

        loss_iters.append(loss_best_sampled)

        if best_nabla_sampled is None:
            # No sample produced a usable loss this iteration (no samples requested,
            # or every loss was NaN/inf); there is no gradient to step along.
            continue

        if loss_best_sampled < loss_best:
            loss_best = loss_best_sampled
            pred_best = pred_best_sampled
            input_onehot_best = best_input_onehot_sampled.data
            if args.verbose:
                print("Loss: {}; Pred: {}; iter: {}".format(loss_best, pred_best, attack_cnt))

        # Gradient step on the attacked rows only, then re-derive sampling probabilities.
        input_onehot[:, tok_to_attack, :] = input_onehot[:, tok_to_attack, :] - torch.mul(best_nabla_sampled, learning_rate)[tok_to_attack, :]
        input_onehot_softmax = torch.nn.Softmax(dim=2)(input_onehot.data)

    if input_onehot_best is None:
        # Nothing was ever accepted: fall back to the unperturbed input (drop the batch dim).
        input_onehot_best = input_onehot_orig.squeeze(0)

    generated_tokenizer_idxs = input_onehot_best.argmax(1).squeeze().detach().cpu().numpy().tolist()

    generated_string = tokenizer.decode(generated_tokenizer_idxs, skip_special_tokens=True)
    (generated_prediction, _, _, generated_prediction_loss) = get_code_preds(generated_string, attack_model, tokenizer, desired_target_after_attack, attack_loss_fn, None)

    if args.verbose:
        print("Best loss: {} :: prediction: {}".format(loss_best, pred_best))
        print("generated_tokenizer_idxs: {}".format(generated_tokenizer_idxs))
        print('Generated string:\n{}^^\n'.format(generated_string))
        print('Original string:\n{}^^\n'.format(code_to_transform))

    return {
        'original_prediction': prediction_orig,
        'original_loss': loss_orig,
        'generated_string': generated_string,
        'generated_prediction': generated_prediction,
        'generated_loss': generated_prediction_loss,
        'best_loss': loss_best,
        'loss_per_iteration': loss_iters,
        'attacked_sites': tok_to_attack,
    }
    

In [None]:
class HuggingFaceModelWrapper(ModelWrapper):
    """Loads a HuggingFace ``transformers`` model and tokenizer."""

    def __init__(self, 
                    model_encoder, 
                    model_fc, 
                    tokenizer, 
                    attack_loss_fn, 
                    debug_me=False, 
                    max_token_length=10, 
                    all_embeddings_file_name="all_embeddings.pkl",
                    layer_to_use=1, 
                    aggregation_to_use="last-tok",
                    vocab_toks_to_use=[]):
        """Store the model, tokenizer, loss function and bookkeeping options.

        Args:
            model_encoder: Model to attack; stored as ``self.model``.
            model_fc: Stored as ``self.model_fc``; not used by ``get_grad_v2``.
            tokenizer: A ``transformers`` (fast or slow) pretrained tokenizer.
            attack_loss_fn: Callable ``(logits, target) -> loss tensor``.
            debug_me: Enables extra shape printing in ``get_grad_v2``.
            max_token_length, all_embeddings_file_name, layer_to_use,
            aggregation_to_use: Stored on the instance only.
            vocab_toks_to_use: Currently unused (never read or mutated).
        """
        assert isinstance(
            tokenizer,
            (transformers.PreTrainedTokenizer, transformers.PreTrainedTokenizerFast),
        ), f"`tokenizer` must of type `transformers.PreTrainedTokenizer` or `transformers.PreTrainedTokenizerFast`, but got type {type(tokenizer)}."

        self.model = model_encoder
        self.model_fc = model_fc
        self.tokenizer = tokenizer
        self._max_length = max_token_length
        self.attack_loss_fn = attack_loss_fn
        self.list_emb = list() # Stores a 1D tensor of all embeddings of tokens in the vocab
        self.list_toks = list() # Stores the tokens corresponding to the indicies in list_emb
        self.debug_me = debug_me
        self.all_embeddings_file_name = all_embeddings_file_name
        self.layer_to_use = layer_to_use
        self.aggregation_to_use = aggregation_to_use
        # Device of the first model parameter; inputs are moved here before the forward pass.
        self.model_device = next(self.model.parameters()).device


    def get_grad_v2(self, input_code, desired_label_or_target_after_attack, current_embedding=None, input_onehot=None, print_debug=False):
        """Compute the attack loss for ``input_code`` and its gradient w.r.t. the input.

        Args:
            input_code: Text to tokenize and feed to the model.
            desired_label_or_target_after_attack: Target passed to ``attack_loss_fn``.
            current_embedding: Optional tensor; when given, the forward hook on the
                model's input-embedding layer overwrites that layer's output with it
                (or with zeros when ``debug_me`` is set).
            input_onehot: Optional one-hot input passed to the model as ``one_hot``.
                It is switched to ``requires_grad=True`` and its ``.grad`` is reset.
            print_debug: Print the prediction and loss.

        Returns:
            Dict with keys:
              ``ids``        -- the full tokenizer output for ``input_code``;
              ``gradient``   -- ``input_onehot.grad`` (squeezed) when ``input_onehot``
                                is given; otherwise the gradient captured by the
                                embedding-layer backward hook, or ``None`` if that
                                hook never fired;
              ``embedding``  -- output of the input-embedding layer for the token ids;
              ``loss``       -- loss as plain Python value(s);
              ``prediction`` -- squeezed logits as plain Python value(s).
        """
        self.model.eval()
        #self.model_fc.eval()
        if input_onehot is not None:
            input_onehot.grad = None
            input_onehot.requires_grad = True
            input_onehot.retain_grad()

        embedding_layer = self.model.get_input_embeddings()
        original_state = embedding_layer.weight.requires_grad
        embedding_layer.weight.requires_grad = True

        emb_grads = []
        if current_embedding is not None:
            current_embedding.retain_grad()

        def output_hook(module, input, output):
            if current_embedding is not None:
                if not self.debug_me:
                    output.data.copy_(current_embedding)
                else:
                    output.data = torch.zeros(current_embedding.shape, device=current_embedding.device)

            return output

        def grad_hook(module, grad_in, grad_out):
            emb_grads.append(grad_out[0])

        # NOTE(review): these hooks only fire if the model's forward pass actually goes
        # through the layer returned by get_input_embeddings() -- confirm for custom models.
        hook_handles = []
        try:
            hook_handles.append(embedding_layer.register_full_backward_hook(grad_hook))
            hook_handles.append(embedding_layer.register_forward_hook(output_hook))

            self.model.zero_grad()

            input_dict = self.tokenizer(input_code, padding=False, return_tensors='pt', add_special_tokens=True)
            input_ids = input_dict.input_ids.to(self.model_device)

            prediction = self.model(input_ids, output_hidden_states=True, return_dict=True, one_hot=input_onehot).logits.squeeze()

            # Keep the target on the same device as the logits.
            target = torch.tensor(desired_label_or_target_after_attack, device=prediction.device).unsqueeze(0)
            loss = self.attack_loss_fn(prediction.unsqueeze(0), target)

            if print_debug:
                print("Prediction: {}; Loss :: {}".format(prediction, loss.squeeze().detach().cpu().numpy().tolist()))
            loss.backward()

            if input_onehot is not None:
                grad = input_onehot.grad.squeeze()
            elif emb_grads:
                # No one-hot input: use the gradient w.r.t. the embedding-layer output.
                grad = emb_grads[0].squeeze()
            else:
                grad = None

            embeddings = embedding_layer(input_ids)
        finally:
            # Always undo the temporary model modifications, even if the pass above raised,
            # so hooks do not accumulate on the model across calls.
            embedding_layer.weight.requires_grad = original_state
            for handle in hook_handles:
                handle.remove()

        output = {"ids": input_dict, "gradient": grad, "embedding": embeddings, "loss":loss.detach().cpu().numpy().tolist(), "prediction": prediction.detach().cpu().numpy().tolist()}

        if self.debug_me:
            print(None if output['gradient'] is None else output['gradient'].shape)
            print(output['embedding'].shape)

        return output

In [None]:
from transformers import BertTokenizer, BertModel, BertForSequenceClassification
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoConfig
# Tokenizer and config for the checkpoint under attack.
tokenizer_bert = AutoTokenizer.from_pretrained("mrm8488/codebert-base-finetuned-detect-insecure-code")
config_bert = AutoConfig.from_pretrained("mrm8488/codebert-base-finetuned-detect-insecure-code")

# The same checkpoint loaded twice: once through the Auto* class and once explicitly as a
# BertForSequenceClassification. Only model2_bert's weights are copied into custom_bert below.
model1_bert = AutoModelForSequenceClassification.from_pretrained("mrm8488/codebert-base-finetuned-detect-insecure-code")
model2_bert = BertForSequenceClassification.from_pretrained("mrm8488/codebert-base-finetuned-detect-insecure-code")
# Build the one-hot-capable classifier and fill it with model2_bert's weights.
# strict=False: custom_bert has extra modules (bert_v2, embeddings_v2, word_embeddings_v2)
# that do not exist in model2_bert's state dict.
custom_bert = CustomBertForSequenceClassification(config_bert, config_bert.vocab_size, config_bert.hidden_size)
custom_bert.load_state_dict(model2_bert.state_dict(), strict=False)
# Propagate the loaded weights into the "_v2" copies, outermost first. Order matters:
#   1. bert            -> bert_v2
#   2. embeddings      -> embeddings_v2
#   3. word_embeddings -> word_embeddings_v2 (transposed)
custom_bert.update_weights()
custom_bert.bert_v2.update_weights()
custom_bert.bert_v2.embeddings_v2.update_weights()
custom_bert.eval()

# Vocabulary id buckets (cached in vocab_tokens_to_use.pkl after the first run).
vocab_tokens_to_use_bert, vocab_tokens_to_ignore_bert, vocab_tokens_not_upper_case_bert, vocab_tokens_upper_case_bert = get_vocab_tokens_to_use(tokenizer_bert)
attack_loss_fn = nn.CrossEntropyLoss()
# Wrapper that exposes get_grad_v2 (loss + gradient w.r.t. a one-hot input) for custom_bert.
model_wrapper_senti = HuggingFaceModelWrapper(model_encoder=custom_bert, 
                                        model_fc=None, 
                                        tokenizer=tokenizer_bert, 
                                        attack_loss_fn=attack_loss_fn,
                                        max_token_length=tokenizer_bert.model_max_length,
                                        debug_me=False,
                                        all_embeddings_file_name=None,
                                        layer_to_use=None,
                                        aggregation_to_use=None,
                                        vocab_toks_to_use=vocab_tokens_to_use_bert)

# NOTE(review): `args` and `paths` are not defined in any cell shown above; they must be
# created before this cell runs (presumably by the cloned tutorial repo) -- confirm.
if args.pgd_data_root is not None:
    paths.DATAROOT = args.pgd_data_root

# Attack hyper-parameters; the trailing comments are example values.
pgd_attack_iters_senti = args.pgd_iter # 5
learning_rate_pgd_senti  = args.pgd_lr # 1.5
desired_target_after_attack_senti = int(args.pgd_desired_target) # 100.0
# desired_target_after_attack_senti = torch.tensor(desired_target_senti, requires_grad=True).to(args.device)

In [None]:
# Attack every row of `stimset` in order, one input at a time.
for x in range(len(stimset)):
    cnt = x  # running counter; identical to the row position
    print("******* code idx:: {}".format(cnt+1))
    current_stimid = stimset.index[x]
    code_to_transform = stimset.loc[current_stimid]['code']
    per_code_synthesis_senti(
        code_to_transform=code_to_transform,
        tokenizer=tokenizer_bert,
        config=config_bert,
        model_wrapper_senti=model_wrapper_senti,
        custom_bert=custom_bert,
        desired_target_after_attack=desired_target_after_attack_senti,
        args=args,
        convert_to_onehot=convert_to_onehot,
        get_toks_per_word=get_toks_per_word,
        attack_iters=pgd_attack_iters_senti,
        learning_rate=learning_rate_pgd_senti,
        vocab_tokens_to_use_bert=vocab_tokens_to_use_bert,
        number_to_dump=number_to_dump,
        paths=paths,
        model1=model1_bert,
        model2=model2_bert,
    )