<a href="https://colab.research.google.com/github/sallywang147/SCInvarinfer/blob/main/%5Bcloud%5D_T5_for_smart_contracts.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [32]:
!pip install sentencepiece
!pip install transformers
!pip install rich[jupyter]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [33]:
from google.colab import auth
from google.auth import default
from numpy import random
import gspread
import gc
#autenticating to google
auth.authenticate_user()
creds, _ = default()
gc = gspread.authorize(creds)

In [34]:
import pandas as pd
#defining my worksheet
worksheet = gc.open('invariants3').sheet1
#get_all_values gives a list of rows
rows = worksheet.get_all_values()
#Convert to a DataFrame 
cols = ['Source', 'Target', 'Verify_Success']
df = pd.DataFrame(rows, columns=cols)

In [35]:
df

Unnamed: 0,Source,Target,Verify_Success
0,pragma solidity >=0.4.24 <0.6.0; contract C {\...,pragma solidity >=0.4.24 <0.6.0; contract C {\...,1
1,pragma solidity >=0.4.24 <0.6.0;\n\ncontract A...,pragma solidity >=0.4.24 <0.6.0;\n\ncontract A...,1
2,"pragma solidity >=0.4.24<0.6.0;\nimport ""./Lib...","pragma solidity >=0.4.24<0.6.0;\nimport ""./Lib...",0
3,pragma solidity >=0.4.24 <0.6.0;\n\ncontract A...,pragma solidity >=0.4.24 <0.6.0;\n\ncontract A...,1
4,pragma solidity >=0.4.24 <0.6.0;\n\ncontract L...,pragma solidity >=0.4.24 <0.6.0;\n\ncontract L...,1
...,...,...,...
65,pragma solidity >=0.4.24 <0.6.0;\n\n// This te...,pragma solidity >=0.4.24 <0.6.0;\n\n// This te...,
66,pragma solidity >=0.4.24 <0.6.0;\ncontract B {...,pragma solidity >=0.4.24 <0.6.0;\ncontract B {...,
67,pragma solidity >=0.4.24 <0.6.0;\n\ncontract E...,pragma solidity >=0.4.24 <0.6.0;\n\ncontract E...,
68,// SPDX-License-Identifier: MIT\npragma experi...,// SPDX-License-Identifier: MIT\npragma experi...,


In [36]:
# Importing libraries
import os
import numpy as np
import pandas as pd
import plotly.express as px
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler


# Importing the T5 modules from huggingface/transformers
from transformers import T5Tokenizer, T5ForConditionalGeneration

from rich.table import Column, Table
from rich import box
from rich.console import Console

# define a rich console logger
console=Console(record=True)

def display_df(df):
  """display dataframe in ASCII format"""

  console=Console()
  table = Table(Column("source_text", justify="center" ), Column("target_text", justify="center"), title="Sample Data",pad_edge=False, box=box.ASCII)

  for i, row in enumerate(df.values.tolist()):
    table.add_row(row[0], row[1])

  console.print(table)

def plot_loss(index_list, loss_list):
  results = {
      "epochs": index_list,
      "cross entropy loss": loss_list,
  }
  df = pd.DataFrame(results)
  fig = px.line(df, x ="epochs", y="cross entropy loss",  title="Evaluation")
  fig.show()

training_logger = Table(Column("Epoch", justify="center" ), 
                        Column("Steps", justify="center"),
                        Column("Cross Entropy Loss", justify="center"), 
                        title="Training Status",pad_edge=False, box=box.ASCII)


In [37]:
# Setting up the device for GPU usage
from torch import cuda
device = 'cuda' if cuda.is_available() else 'cpu'

In [38]:
class YourDataSetClass(Dataset):
  """
  Creating a custom dataset for reading the dataset and 
  loading it into the dataloader to pass it to the neural network for finetuning the model

  """

  def __init__(self, dataframe, tokenizer, source_len, target_len, source_text, target_text):
    self.tokenizer = tokenizer
    self.data = dataframe
    self.source_len = source_len
    self.summ_len = target_len
    self.target_text = self.data[target_text]
    self.source_text = self.data[source_text]

  def __len__(self):
    return len(self.target_text)

  def __getitem__(self, index):
    source_text = str(self.source_text[index])
    target_text = str(self.target_text[index])

    #cleaning data so as to ensure data is in string type
    source_text = ' '.join(source_text.split())
    target_text = ' '.join(target_text.split())
    
    source = self.tokenizer.batch_encode_plus([source_text], \
                                              max_length= self.source_len, \
                                              pad_to_max_length=True, \
                                              truncation=True, \
                                            #  padding='longest',\
                                              return_tensors='pt')
    target = self.tokenizer.batch_encode_plus([target_text], \
                                              max_length= self.summ_len, \
                                              pad_to_max_length=True, \
                                              truncation=True, \
                                           #   padding='longest', \
                                              return_tensors='pt')   
    source_ids = source['input_ids'].squeeze()
    source_mask = source['attention_mask'].squeeze()
    target_ids = target['input_ids'].squeeze()
    target_mask = target['attention_mask'].squeeze()

    return {
        'source_ids': source_ids.to(dtype=torch.long), 
        'source_mask': source_mask.to(dtype=torch.long), 
        'target_ids': target_ids.to(dtype=torch.long),
        'target_ids_y': target_ids.to(dtype=torch.long)
    }

In [39]:
def train(epoch, tokenizer, model, device, loader, optimizer):

  """
  Function to be called for training with the parameters passed from main function

  """

  model.train()
  loss_list = []
  for _, data in enumerate(loader, 0):
    y = data['target_ids'].to(device, dtype = torch.long)
    y_ids = y[:, :-1].contiguous()
    lm_labels = y[:, 1:].clone().detach()
    lm_labels[y[:, 1:] == tokenizer.pad_token_id] = -100
    ids = data['source_ids'].to(device, dtype = torch.long)
    mask = data['source_mask'].to(device, dtype = torch.long)
    outputs = model(input_ids = ids, attention_mask = mask, \
                    decoder_input_ids=y_ids, labels=lm_labels)  
    total_loss = float(outputs[0].item())
    loss_list.append(total_loss)
    if _%10==0:
      training_logger.add_row(str(epoch), str(_), str(total_loss))
      console.print(training_logger)

    optimizer.zero_grad()
    outputs[0].backward()
    optimizer.step()
    return loss_list



In [40]:
def validate(epoch, tokenizer, model, device, loader):

  """
  Function to evaluate model for predictions

  """
  model.eval()
  predictions = []
  actuals = []
  with torch.no_grad():
      for _, data in enumerate(loader, 0):
          y = data['target_ids'].to(device, dtype = torch.long)
          ids = data['source_ids'].to(device, dtype = torch.long)
          mask = data['source_mask'].to(device, dtype = torch.long)

          generated_ids = model.generate(
              input_ids = ids,
              attention_mask = mask, 
              max_length=150, 
              num_beams=2,
              repetition_penalty=2.5, 
              length_penalty=1.0, 
              early_stopping=True
              )
          print("generated token length: \n", len(generated_ids))
          preds = [tokenizer.decode(g, skip_special_tokens=True, \
                                    clean_up_tokenization_spaces=True) for g in generated_ids]
          target = [tokenizer.decode(t, skip_special_tokens=True, \
                                     clean_up_tokenization_spaces=True)for t in y]
          if _%10==0:
              console.print(f'Completed {_}')
          predictions.extend(preds)
          actuals.extend(target)
  print('predictions: \n', predictions)
  print('actuals: \n', actuals)
  return predictions, actuals

In [41]:
import gc
def T5Trainer(dataframe, source_text, target_text, model_params, output_dir="./output/"):
  
  """
  T5 trainer

  """

  # Set random seeds and deterministic pytorch for reproducibility
  torch.manual_seed(model_params["SEED"]) # pytorch random seed
  np.random.seed(model_params["SEED"]) # numpy random seed
  torch.backends.cudnn.deterministic = True

  # logging
  console.log(f"""[Model]: Loading {model_params["MODEL"]}...\n""")

  # tokenzier for encoding the text
  tokenizer = T5Tokenizer.from_pretrained(model_params["MODEL"])

  gc.collect()
  torch.cuda.empty_cache()
  os.environ['CUDA_VISIBLE_DEVICES']='0, 1, 2, 3'
  os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
  # Defining the model. We are using t5-base model and added a Language model layer on top for generation of Summary. 
  # Further this model is sent to device (GPU/TPU) for using the hardware.
  model = T5ForConditionalGeneration.from_pretrained(model_params["MODEL"])
  model = model.to(device)
  
  # logging
  console.log(f"[Data]: Reading data...\n")

  # Importing the raw dataset
  dataframe = dataframe[[source_text,target_text]]
  display_df(dataframe.head(2))

  
  # Creation of Dataset and Dataloader
  # Defining the train size. So 80% of the data will be used for training and the rest for validation. 
  train_size = 0.99
  train_dataset=dataframe.sample(frac=train_size,random_state = model_params["SEED"])
  val_dataset=dataframe.drop(train_dataset.index).reset_index(drop=True)
  train_dataset = train_dataset.reset_index(drop=True)

  console.print(f"FULL Dataset: {dataframe.shape}")
  console.print(f"TRAIN Dataset: {train_dataset.shape}")
  console.print(f"TEST Dataset: {val_dataset.shape}\n")


  # Creating the Training and Validation dataset for further creation of Dataloader
  training_set = YourDataSetClass(train_dataset, tokenizer, model_params["MAX_SOURCE_TEXT_LENGTH"], model_params["MAX_TARGET_TEXT_LENGTH"], source_text, target_text)
  val_set = YourDataSetClass(val_dataset, tokenizer, model_params["MAX_SOURCE_TEXT_LENGTH"], model_params["MAX_TARGET_TEXT_LENGTH"], source_text, target_text)


  # Defining the parameters for creation of dataloaders
  train_params = {
      'batch_size': model_params["TRAIN_BATCH_SIZE"],
      'shuffle': True,
      'num_workers': 0
      }


  val_params = {
      'batch_size': model_params["VALID_BATCH_SIZE"],
      'shuffle': False,
      'num_workers': 0
      }


  # Creation of Dataloaders for testing and validation. This will be used down for training and validation stage for the model.
  training_loader = DataLoader(training_set, **train_params)
  val_loader = DataLoader(val_set, **val_params)


  # Defining the optimizer that will be used to tune the weights of the network in the training session. 
  optimizer = torch.optim.Adam(params = model.parameters(), lr=model_params["LEARNING_RATE"])


  # Training loop
  console.log(f'[Initiating Fine Tuning]...\n')

  loss_result = []
  epoch_list = []
  for epoch in range(model_params["TRAIN_EPOCHS"]):
      loss = train(epoch, tokenizer, model, device, training_loader, optimizer)
      loss_result.extend(loss)
      epoch_list.append(epoch)

  plot_loss(epoch_list, loss_result)
  console.log(f"[Saving Model]...\n")
  #Saving the model after training
  path = os.path.join(output_dir, "model_files")
  model.save_pretrained(path)
  tokenizer.save_pretrained(path)


  # evaluating test dataset
  console.log(f"[Initiating Validation]...\n")
  for epoch in range(model_params["VAL_EPOCHS"]):
    predictions, actuals = validate(epoch, tokenizer, model, device, val_loader)
    final_df = pd.DataFrame({'Generated Text':predictions,'Actual Text':actuals})
    final_df.to_csv(os.path.join(output_dir,'predictions.csv'))
  
  console.save_text(os.path.join(output_dir,'logs.txt'))
  
  console.log(f"[Validation Completed.]\n")
  console.print(f"""[Model] Model saved @ {os.path.join(output_dir, "model_files")}\n""")
  console.print(f"""[Validation] Generation on Validation data saved @ {os.path.join(output_dir,'predictions.csv')}\n""")
  console.print(f"""[Logs] Logs saved @ {os.path.join(output_dir,'logs.txt')}\n""")

In [42]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [46]:
model_params={
    "MODEL":"t5-small",             # model_type: t5-large
    "TRAIN_BATCH_SIZE": 16,          # training batch size: 
    "VALID_BATCH_SIZE":16,          # validation batch size
    "TRAIN_EPOCHS":100,              # number of training epochs:20 seems optimal based on experiments
    "VAL_EPOCHS":1,                # number of validation epochs
    "LEARNING_RATE":1e-4,          # learning rate
    "MAX_SOURCE_TEXT_LENGTH":512,  # max length of source text
    "MAX_TARGET_TEXT_LENGTH":512,   # max length of target text
    "SEED": random.randint(1000)    # randomized seeds to shuffle test set

}

In [47]:
#To solve CUDA out of memory error
import gc
gc.collect()
torch.cuda.empty_cache()
os.environ['CUDA_VISIBLE_DEVICES']='0, 1, 2, 3'
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

In [48]:
T5Trainer(dataframe=df, source_text="Source", target_text="Target", model_params=model_params, output_dir="outputs")

generated token length: 
 1


predictions: 
 ['pragma solidity >=0.4.24 0.6.0; contract Branch  function testIf(bool b) public returns (uint a)  if (b)  a = 1;  else  a = 2;  function testTernary(bool b) public returns (uint a)  a = b? 1 : 2;   function testTernary(']
actuals: 
 ['pragma solidity >=0.4.24 0.6.0; contract Branch  function testIf(bool b) public returns (uint a)  if (b)  a = 1;  else  a = 2;  assert (a == 1 || a == 2);  function testTernary(bool b) public returns (uint a)  a = b? 1 : 2; assert (a == 1 || a == 2);']


In [171]:
test_params = {
      'batch_size': 8,
      'shuffle': False,
      'num_workers': 0
      }

class TestDataSetClass(Dataset):
  """
  Creating a custom dataset for reading the dataset and 
  loading it into the dataloader to pass it to the neural network for finetuning the model

  """

  def __init__(self, dataframe, tokenizer, source_len, source_text):
    self.tokenizer = tokenizer
    self.data = dataframe
    self.source_len = source_len
    self.source_text = self.data[source_text]

  def __len__(self):
    return len(self.source_text)

  def __getitem__(self, index):
    source_text = str(self.source_text[index])

    #cleaning data so as to ensure data is in string type
    source_text = ' '.join(source_text.split())
    print('source_text: ', source_text)
    source = self.tokenizer.batch_encode_plus([source_text], \
                                               max_length= self.source_len, \
                                              pad_to_max_length=True, \
                                              truncation=True, \
                                           #  padding='longest', \
                                              return_tensors='pt')  
                                   

    source_ids = source['input_ids'].squeeze()
    print("len of source_ids in test_Dataset loader: \n", len(source_ids))
    input_id_chunks = source_ids.split(500)
    mask_chunks = source['attention_mask'][0].split(500)

    for i in range(len( input_id_chunks)):
        pad_len = 500 - input_id_chunks[i].shape[0]
        # check if tensor length satisfies required chunk size
        if pad_len > 0:
            # if padding length is more than 0, we must add padding

                padded_chunk = list(input_id_chunks[i])
                input_id_chunks[i] = torch.cat([
                input_id_chunks[i], torch.Tensor([0] * pad_len)], dim=-1)
                input_id_chunks[i][0] = torch.Tensor(tuple(padded_chunk))
                mask_chunk = list(mask_chunks[i])
                mask_chunk = torch.cat([
                mask_chunks[i], torch.Tensor([0] * pad_len)],dim=1)
                mask_chunks[i][0] = torch.Tensor(tuple(mask_chunks))
    input_ids = torch.stack(input_id_chunks)
    source_mask = torch.stack(mask_chunks)


    return {
        'source_ids': source_ids.to(dtype=torch.long), 
        'source_mask': source_mask.to(dtype=torch.long), 
    }

In [176]:
import gc
#let's get the trained model and never-seen test contracts 
def initialize():
  #gc = gspread.authorize(creds)
  trained_model = T5ForConditionalGeneration.from_pretrained('/content/outputs/model_files')
  tokenizer = T5Tokenizer.from_pretrained("/content/outputs/model_files")
 # test_contracts = gc.open('test_contracts').sheet1
 # rows = test_contracts.get_all_values()
  #print("test: \n",rows)
  #Convert to a DataFrame 
  col = ['Test']
  f = open('/content/ERC20.sol', "r")
  file = f.read()
  df = pd.DataFrame([file], columns=col)
  return df, trained_model, tokenizer

#test how the model performs on never-seen test contracts
def generate(df, model, tokenizer):
   model.eval()
   #take out: source_len=512, 
   #test_val = YourDataSetClass(df, tokenizer, model_params["MAX_SOURCE_TEXT_LENGTH"], model_params["MAX_TARGET_TEXT_LENGTH"], 'source_text', 'target_text')
   test_val = TestDataSetClass(df, tokenizer, source_len=2000, source_text="Test")
   test_loader = DataLoader(test_val, **test_params)
   predictions = []
  # reality = []
   with torch.no_grad():
      for _, data in enumerate(test_loader, 0):
                 #    y = data['target_ids']#.to(device, dtype = torch.long)
          ids = data['source_ids']#.to(device, dtype = torch.long)
          mask = data['source_mask']#.to(device, dtype = torch.long)
          print(len(ids))
          print(len(mask))
          generated_ids = model.generate(
              input_ids = ids,
              attention_mask = mask, 
              max_length=120, 
              num_beams=2,
              repetition_penalty=2.5, 
              length_penalty=1.0, 
              early_stopping=True
              )
          print("generated_ids: ", generated_ids)
          preds = [tokenizer.decode(g, skip_special_tokens=True, \
                                    clean_up_tokenization_spaces=True) for g in generated_ids]
        #  target = [tokenizer.decode(t, skip_special_tokens=True, \
        #                            clean_up_tokenization_spaces=True)for t in y]

          print("preds: ", preds)
       #   print("target: ", target)
          if _%10==0:
              console.print(f'Completed {_}')
          predictions.extend(preds)
   with open('./prediction_file', 'w') as writefile:
    for line in predictions:
        print(line)
        writefile.write(line)
         # print('predictions: \n', predictions)
  # return predictions

In [177]:
df, trained_model, tokenizer = initialize()
generate(df, trained_model, tokenizer)

source_text:  pragma solidity ^0.5.0; import "./IERC20.sol"; import "./SafeMath.sol"; import "./Libraries/VeriSolContracts.sol"; //change /** * A highly simplified Token to express basic specifications * * - totalSupply() equals the Sum({balanceOf(a) | a is an address }) * */ contract ERC20 is IERC20 { mapping (address => uint256) private _balances; uint256 private _totalSupply; /** * A dummy constructor */ constructor (uint256 totalSupply) public { require(msg.sender != address(0)); _totalSupply = totalSupply; _balances[msg.sender] = totalSupply; } /** * @dev See {IERC20-totalSupply}. */ function totalSupply() public view returns (uint256) { return _totalSupply; } /** * @dev See {IERC20-balanceOf}. */ function balanceOf(address account) public view returns (uint256) { return _balances[account]; } /** * @dev See {IERC20-transfer}. * * Requirements: * * - `recipient` cannot be the zero address. * - the caller must have a balance of at least `amount`. */ function transfer(address recipie


The `pad_to_max_length` argument is deprecated and will be removed in a future version, use `padding=True` or `padding='longest'` to pad to the longest sequence in the batch, or use `padding='max_length'` to pad to a max length. In this case, you can give a specific length with `max_length` (e.g. `max_length=45`) or leave max_length to None to pad to the maximal input size of the model (e.g. 512 for Bert).



RuntimeError: ignored

In [None]:
#for downloading purpose 
#!zip -r /content/model.zip /content/outputs/model_files