In [1]:
!pip install transformers --upgrade
!pip install torch --upgrade
!pip install ipywidgets --upgrade
!pip install torchserve --upgrade

# !pip list

Collecting transformers
  Using cached transformers-4.20.1-py3-none-any.whl (4.4 MB)
Collecting huggingface-hub<1.0,>=0.1.0
  Using cached huggingface_hub-0.8.1-py3-none-any.whl (101 kB)
Collecting tokenizers!=0.11.3,<0.13,>=0.11.1
  Using cached tokenizers-0.12.1-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (6.6 MB)
Collecting filelock
  Using cached filelock-3.7.1-py3-none-any.whl (10 kB)
Collecting regex!=2019.12.17
  Using cached regex-2022.6.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (749 kB)
Installing collected packages: tokenizers, regex, filelock, huggingface-hub, transformers
Successfully installed filelock-3.7.1 huggingface-hub-0.8.1 regex-2022.6.2 tokenizers-0.12.1 transformers-4.20.1
Collecting torch
  Downloading torch-1.12.0-cp37-cp37m-manylinux1_x86_64.whl (776.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m776.3/776.3 MB[0m [31m759.9 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: torch


In [2]:
import pandas as pd
import numpy as np
import os
import logging
import argparse

import google.auth
from google.cloud import storage, bigquery
from google.cloud import exceptions as GCPExceptions

from transformers import (RobertaConfig, RobertaTokenizer,
                          RobertaForSequenceClassification,
                          TrainingArguments, Trainer)
#import torch
from torch.utils.data import Dataset

# Set CUDA_LAUNCH_BLOCKING=1 in this process's environment, unconditionally.
# NOTE(review): presumably a debugging aid so CUDA errors surface at the failing
# call — confirm it is still wanted for normal prediction runs.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# Configure logging once for the whole notebook: timestamp, level, source
# file name and line number, then the message; records below INFO are dropped.
logging.basicConfig(
    format='%(asctime)s %(levelname)-4s [%(filename)s:%(lineno)d] - %(message)s',
    datefmt='%Y-%m-%dT%H:%M:%S%z',
    level=logging.INFO
)


#--------------------------------------------------------------------------------------

class trainerConfig:
    """Per-tier model settings plus the loaded RoBERTa classification model.

    Attributes:
        tierLevel: classification tier this configuration belongs to.
        vocabularySize: tokenizer/model vocabulary size, used in the model name.
        hiddenLayers: number of hidden layers of a custom model, or -1 to
            select the roberta-base variant.
        batchSize: batch size used when predicting.
        robertaModel: the loaded model, or None until loadModel() is called.
        modelName: folder name of the model, or None until loadModel() is called.
    """

    def __init__(self):
        self.tierLevel: int = 1
        self.vocabularySize: int = 50265
        self.hiddenLayers: int = -1
        self.batchSize: int = 16

        self.robertaModel = None
        self.modelName = None

    def setUpFromArgs(self):
        """Overwrite vocabulary size, hidden layers and batch size from the command line.

        tierLevel is not read here; it is left for the caller to assign.
        """
        parserNamespace = setUpParser()
        self.vocabularySize = parserNamespace.vocabulary_size
        self.hiddenLayers = parserNamespace.hidden_layers
        # self.tierLevel = parserNamespace.tier_level
        self.batchSize = parserNamespace.batch_size

    def loadModel(self, genGonf):
        """Download this tier's model files from the model bucket and load them.

        Args:
            genGonf: general configuration object providing ``stClient``,
                ``modelBucketName`` and ``tempDownloadFolder``.

        Side effects:
            Sets ``self.modelName`` and ``self.robertaModel`` and writes the
            downloaded files below ``genGonf.tempDownloadFolder``.
        """
        modelBucket = genGonf.stClient.get_bucket(genGonf.modelBucketName)

        if self.hiddenLayers > -1:
            self.modelName = 'Model_'+str(self.hiddenLayers)+'_Layers_'+str(self.tierLevel)+'_Tier_'+str(self.vocabularySize)
        else:
            self.modelName = 'Model_roberta-base_'+str(self.tierLevel)+'_Tier_'+str(self.vocabularySize)

        modelFolder = os.path.join(genGonf.tempDownloadFolder, str(self.modelName))

        for blob in modelBucket.list_blobs(prefix=str(self.modelName)):
            # Names ending in '/' are folder placeholders, not files; trying to
            # download one onto a directory path would fail.
            if blob.name.endswith('/'):
                continue
            localPath = os.path.join(genGonf.tempDownloadFolder, blob.name)
            # Create the file's own parent folder so blobs stored in nested
            # sub-folders can be written as well.
            os.makedirs(os.path.dirname(localPath), exist_ok=True)
            blob.download_to_filename(localPath)

        self.robertaModel = RobertaForSequenceClassification.from_pretrained(modelFolder)
        
        
class generalConfig:
    """Shared run configuration: GCP project, bucket/table names, label maps,
    GCP clients and the RoBERTa tokenizer.

    The storage and BigQuery clients are created in ``__init__`` for the
    default project and re-created by ``setUpFromArgs`` when the project changes.
    ``robertaTokenizer`` stays None until ``loadTokenizer`` is called.
    """

    def __init__(self):
        self.logLevel = logging.INFO
        self.vocabularySize: int = 50265
        self.projectID = 'mwrite-a835'
        self.reRun = 0

        self.predResultsBucketName: str = 'mpr-research-prediction-results'

        self.tokenizerBucketName: str = 'mpr-research-tokenizers'
        self.modelBucketName: str = 'mpr-research-models'

        self.dataTableID: str = 'mwrite-a835.mpr_research_uploaded_dataset.course-upload-data'
        self.timestampTableID: str = 'mwrite-a835.mpr_research_uploaded_dataset.course-upload-timestamp'
        self.predictTableID: str = 'mwrite-a835.mpr_research_predicted_dataset.predicted-data'

        # Local scratch folder for downloaded tokenizer/model files.
        self.tempDownloadFolder: str = './tmp'
        os.makedirs(self.tempDownloadFolder, exist_ok=True)

        # Tier 1 is a single 3-way label; tier 2 is three independent yes/no labels.
        self.labelTierDict = {1:['Verification/Summary', 'Praise', 'Problem/Solution'],
                              2:['WritingOrFormatting Issues', 'Missing Content', 'Incorrect Content']}
        self.binaryDict = {0:'No', 1:'Yes'}

        self.coreColumns = ['AuthorID','ReviewerID','Criterion','Course','Comment']
        self.tierColumsDict = {1:['CommentCode'], 2:['WritingFormatting', 'MissingContent', 'Incorrect']}

        self._buildClients()

        self.robertaTokenizer = None

    def _buildClients(self):
        """Create the storage and BigQuery clients for the current projectID."""
        self.stClient = storage.Client(project=self.projectID)
        self.bqClient = bigquery.Client(project=self.projectID)

    def setUpFromArgs(self):
        """Overwrite the configuration with values parsed from the command line.

        If the project ID changes, the GCP clients are rebuilt so they do not
        keep pointing at the project they were created for in ``__init__``.
        """
        parserNamespace = setUpParser()
        previousProjectID = self.projectID

        # NOTE(review): the level is only stored here; nothing in this class
        # applies it to the logging module.
        self.logLevel = parserNamespace.logging_level
        self.vocabularySize = parserNamespace.vocabulary_size
        self.projectID = parserNamespace.projectid
        self.reRun = parserNamespace.rerun

        self.predResultsBucketName = parserNamespace.prediction_bucket

        self.tokenizerBucketName = parserNamespace.tokenizer_bucket
        self.modelBucketName = parserNamespace.model_bucket

        self.dataTableID = parserNamespace.data_table
        self.timestampTableID = parserNamespace.timestamp_table
        self.predictTableID = parserNamespace.predict_table

        if self.projectID != previousProjectID:
            self._buildClients()

    def loadTokenizer(self):
        """Download the tokenizer files for ``vocabularySize`` and load the tokenizer.

        Files that already exist locally are not downloaded again.
        Sets ``self.robertaTokenizer``.
        """
        tokenizerBucket = self.stClient.get_bucket(self.tokenizerBucketName)
        tokenizerFolder = os.path.join(self.tempDownloadFolder, str(self.vocabularySize))

        for blob in tokenizerBucket.list_blobs(prefix=str(self.vocabularySize)):
            # Names ending in '/' are folder placeholders, not files.
            if blob.name.endswith('/'):
                continue
            localPath = os.path.join(self.tempDownloadFolder, blob.name)
            # Create the file's own parent folder so nested blobs can be written too.
            os.makedirs(os.path.dirname(localPath), exist_ok=True)
            if not os.path.exists(localPath):
                blob.download_to_filename(localPath)

        self.robertaTokenizer = RobertaTokenizer.from_pretrained(tokenizerFolder, do_lower_case=True)
            
    
class peerDataset(Dataset):
    """Dataset over the 'Comment' column of a DataFrame.

    Items are tokenized on access with the tokenizer held by the supplied
    configuration object (``baseConf.robertaTokenizer``).
    """

    def __init__(self, df, baseConf):
        # Keep only the raw comment values; tokenization is deferred to __getitem__.
        self.comments = df['Comment'].values
        self.config = baseConf

    def __len__(self):
        """Number of comments in the dataset."""
        commentCount = len(self.comments)
        return commentCount

    def __getitem__(self, idx):
        """Tokenize and return the comment at position ``idx``.

        The most recent result is also kept on ``self.tokenizedData``.
        """
        comment = self.comments[idx]
        tokenizer = self.config.robertaTokenizer
        self.tokenizedData = sentenceTokenizer(comment, tokenizer)
        return self.tokenizedData
    
    
#--------------------------------------------------------------------------------------


def setUpParser(args=None):
    """Define the command line options of the prediction job and parse them.

    Args:
        args: optional list of argument strings. When None (the default),
            argparse reads ``sys.argv[1:]`` exactly as before; pass ``[]`` to
            get the defaults in an environment whose ``sys.argv`` holds
            unrelated options.

    Returns:
        argparse.Namespace with one attribute per option (dashes in option
        names become underscores, e.g. ``--batch-size`` -> ``batch_size``).
    """
    parser = argparse.ArgumentParser(description='Take in command line arguments.')
    parser.add_argument(
        '--projectid',
        help='Enter Project ID. Defaults to "mwrite-a835".',
        default='mwrite-a835',
        type=str)
    parser.add_argument(
        '--rerun',
        help='Set to 1 if to run on all courses, regardless of prediction status, else set to 0. Defaults to 0.',
        default=0,
        type=int)
    parser.add_argument(
        '--vocabulary-size',
        help='Vocabulary size for Roberta Tokenizer and Models. Defaults to 50265.',
        choices=[50265,30522],
        default=50265,
        type=int)
    parser.add_argument(
        '--hidden-layers',
        help='Number of hidden layers to use if building a model from Roberta Config is used. Set to -1 to use roberta-base instead. Defaults to -1.',
        default=-1,
        type=int)
    parser.add_argument(
        '--batch-size',
        help='Batch size for data fed to the model. Defaults to 16. Warning, higher batch sizes need higher memory requirements, make sure your job is configured appropriately.',
        default=16,
        type=int)
    parser.add_argument(
        '--tokenizer-bucket',
        help='GCP bucket where tokenizer files are stored. Defaults to "mpr-research-tokenizers".',
        default='mpr-research-tokenizers',
        type=str)
    parser.add_argument(
        '--model-bucket',
        # The help text now states the default that is actually applied.
        help='GCP bucket where model files are uploaded after training. Defaults to "mpr-research-models".',
        default='mpr-research-models',
        type=str)
    parser.add_argument(
        '--data-predict-bucket',
        help='GCP bucket where course data are saved as TSV files. Defaults to "mpr-research-data-uploads".',
        default='mpr-research-data-uploads',
        type=str)
    parser.add_argument(
        '--prediction-bucket',
        help='GCP bucket where course data predictions are saved as CSV files. Defaults to "mpr-research-prediction-results".',
        default='mpr-research-prediction-results',
        type=str)
    parser.add_argument(
        '--data-table',
        help='BigQuery table where course data is stored. Defaults to "mwrite-a835.mpr_research_uploaded_dataset.course-upload-data".',
        default='mwrite-a835.mpr_research_uploaded_dataset.course-upload-data',
        type=str)
    parser.add_argument(
        '--timestamp-table',
        help='BigQuery table where course upload timestamp data is stored. Defaults to "mwrite-a835.mpr_research_uploaded_dataset.course-upload-timestamp".',
        default='mwrite-a835.mpr_research_uploaded_dataset.course-upload-timestamp',
        type=str)
    parser.add_argument(
        '--predict-table',
        help='BigQuery table where predicted data is uploaded. Defaults to "mwrite-a835.mpr_research_predicted_dataset.predicted-data".',
        default='mwrite-a835.mpr_research_predicted_dataset.predicted-data',
        type=str)
    parser.add_argument(
        '--logging-level',
        help='Set default Logging Level. Defaults to INFO.',
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'FATAL'],
        default='INFO',
        type=str)

    return parser.parse_args(args)


def sentenceTokenizer(sentence, tokenizer):
    """Tokenize one sentence and return the first row of every returned field.

    The tokenizer is called with special tokens enabled, truncation and
    padding to a length of 512, an attention mask, and 'pt' tensors requested.

    Args:
        sentence: the text to tokenize.
        tokenizer: callable tokenizer; its result must be iterable over field
            names and indexable by field name.

    Returns:
        dict mapping each field name to element 0 of that field's value.
    """
    encoded = tokenizer(
        sentence,
        add_special_tokens=True,
        max_length=512,
        truncation='longest_first',
        padding='max_length',
        return_attention_mask=True,
        return_tensors='pt',
    )

    # Drop the leading dimension: keep only element 0 of every field.
    firstRows = {}
    for field in encoded:
        firstRows[field] = encoded[field][0]
    return firstRows


def trainerBuilder(genConf, config):
    """Build a Trainer around the model of one tier.

    Args:
        genConf: general configuration; supplies ``tempDownloadFolder``, under
            which the trainer's output directory is placed.
        config: tier configuration; supplies ``tierLevel``, ``batchSize`` and
            ``robertaModel``.

    Returns:
        Trainer constructed with the tier's model and the evaluation arguments.
    """
    # Output directory is '<tempDownloadFolder>/trainerTier<level>Logs'.
    outputFolder = os.path.join(genConf.tempDownloadFolder, f'trainerTier{config.tierLevel}Logs')
    evaluationArgs = TrainingArguments(
        output_dir=outputFolder,
        per_device_eval_batch_size=config.batchSize,
    )
    return Trainer(model=config.robertaModel, args=evaluationArgs)


def makePredictions(row, baseConfig, trainerDict):
    """Run tier 1 and tier 2 predictions for every comment of one course.

    Args:
        row: mapping with a 'CourseID' entry identifying the course.
        baseConfig: general configuration; supplies the BigQuery client, the
            data table ID, the core column list and the label dictionaries.
        trainerDict: dict mapping tier level (1, 2) to an object whose
            ``predict(dataset)`` result exposes ``predictions``.

    Returns:
        DataFrame with the core columns, the tier 1 label and one column per
        tier 2 label ('Yes'/'No' for comments predicted 'Problem/Solution' in
        tier 1, missing otherwise). Spaces in column names are replaced by '_'.
        A course without usable comments yields an empty frame with the same
        columns.
    """
    tier1Column = 'Tier 1 Predictions'
    tier1Labels = baseConfig.labelTierDict[1]
    tier2Labels = baseConfig.labelTierDict[2]

    # NOTE(review): CourseID is interpolated unquoted, so it is presumably numeric — confirm.
    dataRetrievalQuery = f"SELECT * FROM `{baseConfig.dataTableID}` WHERE CourseID = {row['CourseID']}"
    dataDF = baseConfig.bqClient.query(dataRetrievalQuery).result().to_dataframe()

    # Rows with any missing core value are dropped before prediction.
    dfTier1Slice = dataDF[baseConfig.coreColumns].dropna().copy()
    if dfTier1Slice.empty:
        # Nothing to predict: do not hand an empty dataset to the trainer.
        dfTier1Slice[tier1Column] = pd.Series(index=dfTier1Slice.index, dtype='object')
    else:
        dataLoaderTier1 = peerDataset(dfTier1Slice, baseConfig)
        predsDictTier1 = trainerDict[1].predict(dataLoaderTier1)
        predListTier1 = np.argmax(predsDictTier1.predictions, axis=-1)
        dfTier1Slice[tier1Column] = [tier1Labels[predValue] for predValue in predListTier1]

    # Tier 2 only applies to comments classified as 'Problem/Solution'.
    dfTier2Slice = dfTier1Slice[dfTier1Slice[tier1Column] == 'Problem/Solution'].copy()
    if dfTier2Slice.empty:
        # No tier 2 candidates: still create the columns so the result keeps its schema.
        for col in tier2Labels:
            dfTier2Slice[col] = pd.Series(index=dfTier2Slice.index, dtype='object')
    else:
        dataLoaderTier2 = peerDataset(dfTier2Slice, baseConfig)
        predsDictTier2 = trainerDict[2].predict(dataLoaderTier2)
        # Each tier 2 label is an independent yes/no decision: positive score -> 1.
        predListTier2 = np.where(predsDictTier2.predictions > 0, 1, 0)
        for index, col in enumerate(tier2Labels):
            dfTier2Slice[col] = [baseConfig.binaryDict[predValue] for predValue in predListTier2[:, index]]

    # Index-aligned join: rows without tier 2 predictions get missing values.
    finalDF = dfTier1Slice.join(dfTier2Slice[tier2Labels])
    finalDF.columns = finalDF.columns.str.replace(' ', '_')

    return finalDF

def runOnBQTable(baseConfig, trainerDict, forcePredict=True):
    """Predict and upload results for the courses listed in the timestamp table.

    Args:
        baseConfig: general configuration; supplies the BigQuery client and
            the timestamp table ID.
        trainerDict: per-tier trainers, passed through to makePredictions.
        forcePredict: when truthy, every course is processed; otherwise only
            courses whose 'isPredicted' value is falsy.
    """
    timestampQuery = f"SELECT * FROM `{baseConfig.timestampTableID}`"
    courseRows = baseConfig.bqClient.query(timestampQuery).result().to_dataframe()

    for _, row in courseRows.iterrows():
        # Skip courses that already have predictions unless a re-run is forced.
        if row['isPredicted'] and not forcePredict:
            logging.info(f"{row['CourseID']} - {row['Course']} already predicted on.")
            continue

        logging.info(f"Processing data for {row['CourseID']} - {row['Course']}.")
        predictedDF = makePredictions(row, baseConfig, trainerDict)
        uploadPredictionToGCPTable(row, predictedDF, baseConfig)
    

def uploadPredictionToGCPTable(row, finalDF, baseConfig):
    """Replace a course's rows in the prediction table and mark it as predicted.

    The three BigQuery jobs (delete old rows, load new rows, update the
    isPredicted flag) are each waited on before the next one starts, so the
    delete cannot run after the load and the flag is only set once the load
    has completed.

    Args:
        row: mapping with 'Course' and 'CourseID' entries.
        finalDF: DataFrame of predictions to load.
        baseConfig: general configuration; supplies the BigQuery client and
            the prediction and timestamp table IDs.

    Returns:
        True when all three jobs completed, False when a Google Cloud error
        (including NotFound) was raised; the error is logged.
    """
    try:
        logging.info(f"Deleting past {row['Course']} course data and updating.")
        # The course name is passed as a query parameter so names containing
        # quotes cannot break (or alter) the statement.
        deleteQuery = f"DELETE FROM `{baseConfig.predictTableID}` WHERE Course = @course"
        deleteJobConfig = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ScalarQueryParameter('course', 'STRING', row['Course'])]
        )
        baseConfig.bqClient.query(deleteQuery, job_config=deleteJobConfig).result()

        logging.info(f"Saving {row['Course']} to GCP: {baseConfig.predictTableID}")
        uploadJobConfig = bigquery.LoadJobConfig()
        uploadJob = baseConfig.bqClient.load_table_from_dataframe(
                                        finalDF,
                                        baseConfig.predictTableID,
                                        job_config=uploadJobConfig
                                       )
        # Wait for the load; a failed load raises here and skips the flag update.
        uploadJob.result()

        # NOTE(review): CourseID is interpolated unquoted, so it is presumably numeric — confirm.
        predictionCompleteQuery = f"UPDATE `{baseConfig.timestampTableID}` SET isPredicted = true WHERE CourseID = {row['CourseID']}"
        baseConfig.bqClient.query(predictionCompleteQuery).result()
        return True

    except GCPExceptions.GoogleCloudError as e:
        # NotFound is a GoogleCloudError, so the previously handled case is still covered.
        logging.error(f'Error Message: {e}')
        logging.error(
            f"Failed to upload Course Data for {row['Course']} to GCP table.")
        return False
    

# For debugging only
def wipeAllBQData(basicConfig):
    """Debugging helper: delete every row of the course data and timestamp tables.

    Args:
        basicConfig: configuration object supplying ``bqClient``,
            ``timestampTableID`` and the data table ID. The data table is read
            from ``targetTableID`` when the object defines it and from
            ``dataTableID`` otherwise, which is the name generalConfig uses.
    """
    dataTableID = getattr(basicConfig, 'targetTableID', None)
    if dataTableID is None:
        dataTableID = basicConfig.dataTableID

    # Wait for each delete so the log line below is only written once the
    # tables have actually been emptied.
    basicConfig.bqClient.query(f'DELETE FROM `{dataTableID}` WHERE true').result()
    basicConfig.bqClient.query(f'DELETE FROM `{basicConfig.timestampTableID}` WHERE true').result()
    logging.info('Wiped data from all tables.')

# For debugging only
def wipeAllPredictData(basicConfig):
    """Debugging helper: submit a query deleting every row of the prediction table.

    Args:
        basicConfig: configuration object supplying ``bqClient`` and
            ``predictTableID``.
    """
    submitQuery = basicConfig.bqClient.query
    wipeStatement = f'DELETE FROM `{basicConfig.predictTableID}` WHERE true'
    submitQuery(wipeStatement)
    logging.info('Wiped data from prediction table.')


In [3]:
# Build the shared configuration (GCP clients, bucket/table names, label maps)
# and download + load the tokenizer. Command line parsing is left disabled here.
basicConfig = generalConfig()
#basicConfig.setUpFromArgs()
basicConfig.loadTokenizer()

# One trainerConfig and one Trainer per classification tier (1 and 2).
configByTierDict = {1:trainerConfig(), 2:trainerConfig()}
trainerByTierDict = {1:None, 2:None}

for tierLevel in configByTierDict:
    #configByTierDict[tierLevel].setUpFromArgs()
    # tierLevel must be assigned before loadModel, which derives the model name from it.
    configByTierDict[tierLevel].tierLevel = tierLevel
    configByTierDict[tierLevel].loadModel(basicConfig)

    trainerByTierDict[tierLevel] = trainerBuilder(basicConfig, configByTierDict[tierLevel])

# forcePredict=False: only courses whose isPredicted value is falsy are processed.
runOnBQTable(basicConfig, trainerByTierDict, False)

loading configuration file ./tmp/Model_roberta-base_2_Tier_50265/config.json
Model config RobertaConfig {
  "_name_or_path": "roberta-base",
  "architectures": [
    "RobertaForSequenceClassification"
  ],
  "attention_probs_dropout_prob": 0.1,
  "bos_token_id": 0,
  "classifier_dropout": null,
  "eos_token_id": 2,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden_size": 768,
  "id2label": {
    "0": "LABEL_0",
    "1": "LABEL_1",
    "2": "LABEL_2"
  },
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "label2id": {
    "LABEL_0": 0,
    "LABEL_1": 1,
    "LABEL_2": 2
  },
  "layer_norm_eps": 1e-05,
  "max_position_embeddings": 514,
  "model_type": "roberta",
  "num_attention_heads": 12,
  "num_hidden_layers": 12,
  "pad_token_id": 1,
  "position_embedding_type": "absolute",
  "problem_type": "multi_label_classification",
  "torch_dtype": "float32",
  "transformers_version": "4.20.1",
  "type_vocab_size": 1,
  "use_cache": true,
  "vocab_size": 50265
}

loading

In [4]:
from ts.torch_handler.base_handler import BaseHandler
import torch

class TransformersClassifierHandler(BaseHandler):
    """
    Handler that classifies comments in two tiers.

    Tier 1 assigns one of the labels in ``labelTierDict[1]``. Comments
    classified as 'Problem/Solution' additionally receive a 'Yes'/'No' value
    for each label in ``labelTierDict[2]``; all other comments get None there.
    The tokenizer and both tier models are loaded in ``__init__``.
    """
    def __init__(self):
        super(TransformersClassifierHandler, self).__init__()

        self.basicConfig = generalConfig()
        #basicConfig.setUpFromArgs()
        self.basicConfig.loadTokenizer()
        self.configByTierDict = {1:trainerConfig(), 2:trainerConfig()}
        for tierLevel in self.configByTierDict:
            #configByTierDict[tierLevel].setUpFromArgs()
            self.configByTierDict[tierLevel].tierLevel = tierLevel
            self.configByTierDict[tierLevel].loadModel(self.basicConfig)

        self.initialized = False

    def initialize(self, ctx):
        """Select the device from the serving context and move both models to it.

        Args:
            ctx: serving context exposing ``manifest`` and ``system_properties``.
        """
        self.manifest = ctx.manifest
        properties = ctx.system_properties
        gpuID = properties.get("gpu_id")
        # Only build a CUDA device when a GPU id was actually provided;
        # "cuda:None" is not a valid device string.
        if torch.cuda.is_available() and gpuID is not None:
            self.device = torch.device("cuda:" + str(gpuID))
        else:
            self.device = torch.device("cpu")
        for tierConfig in self.configByTierDict.values():
            tierConfig.robertaModel.to(self.device)
        self.initialized = True

    @staticmethod
    def _requestToText(item):
        """Extract the text of one request item.

        Dict items are read from their "data" entry, falling back to "body";
        bytes are decoded as UTF-8; anything else is returned unchanged.
        """
        if isinstance(item, dict):
            payload = item.get("data")
            if payload is None:
                payload = item.get("body")
            item = payload
        if isinstance(item, (bytes, bytearray)):
            item = item.decode('utf-8')
        return item

    def preprocess(self, data):
        """Tokenize the incoming text.

        Args:
            data: a single sentence (str or UTF-8 bytes), or a sequence whose
                items are sentences or request dicts carrying the text under
                "data" or "body".

        Returns:
            The tokenizer output, padded/truncated to 512 tokens, as 'pt' tensors.
        """
        if isinstance(data, (str, bytes, bytearray)):
            sentences = self._requestToText(data)
        else:
            sentences = [self._requestToText(item) for item in data]

        # Tokenize the texts
        tokenizedSentences = self.basicConfig.robertaTokenizer(sentences, add_special_tokens = True,
                                  max_length= 512, truncation='longest_first', padding='max_length',
                                  return_attention_mask=True, return_tensors='pt')
        return tokenizedSentences

    def inference(self, inputs):
        """Predict tier 1 and, where applicable, tier 2 labels.

        Args:
            inputs: tokenizer output as returned by ``preprocess``.

        Returns:
            list with one dict per input row, holding 'Tier 1 Predictions' and
            one entry per tier 2 label ('Yes'/'No', or None when tier 1 is not
            'Problem/Solution').
        """
        tier1Labels = self.basicConfig.labelTierDict[1]
        tier2Labels = self.basicConfig.labelTierDict[2]

        # The device is only known after initialize(); without it the inputs
        # stay where the tokenizer created them.
        device = getattr(self, 'device', None)
        if device is not None:
            inputs = {key: value.to(device) for key, value in inputs.items()}

        with torch.no_grad():
            tier1Logits = self.configByTierDict[1].robertaModel(**inputs).logits
        # argmax per row; a global argmax would mix up rows once more than one
        # input is passed.
        tier1ClassIds = tier1Logits.argmax(dim=-1).tolist()

        predictions = []
        for classId in tier1ClassIds:
            predictionsDict = {'Tier 1 Predictions': tier1Labels[classId]}
            for col in tier2Labels:
                predictionsDict[col] = None
            predictions.append(predictionsDict)

        problemRows = [rowIndex for rowIndex, predictionsDict in enumerate(predictions)
                       if predictionsDict['Tier 1 Predictions'] == 'Problem/Solution']
        if problemRows:
            with torch.no_grad():
                tier2Logits = self.configByTierDict[2].robertaModel(**inputs).logits
            # Each tier 2 label is an independent decision: positive logit -> 1.
            tier2Flags = (tier2Logits > 0).long().tolist()
            for rowIndex in problemRows:
                for index, col in enumerate(tier2Labels):
                    predictions[rowIndex][col] = self.basicConfig.binaryDict[tier2Flags[rowIndex][index]]

        return predictions

    def postprocess(self, inference_output):
        """Return the inference output unchanged."""
        return inference_output
    
    
# baseHandler = TransformersClassifierHandler()
# tokenizedData = baseHandler.preprocess('This is a test sentence and I think I love it.')
# baseHandler.inference(tokenizedData)