# VertexAI Sentiment Analysis Notebook

Date of creation: Jan 23

In this notebook you will find a sentiment analysis pipeline created in VertexAI.

The pipeline:
- Starts from a CSV file stored in GCS
- Creates a Vertex AI dataset from the data
- Trains a sentiment analysis model that classifies text as Negative, Neutral or Positive
- Fetches the model evaluation metrics and confusion matrix
- Saves the metrics, confusion matrix and the GCS source file to BigQuery

Documentation links:
1. Creating a service account key: this is how the JSON key file saved to the VM was created
    - https://cloud.google.com/iam/docs/creating-managing-service-account-keys#iam-service-account-keys-create-console
    
2. Overview of model training in Vertex AI using the API, with code examples and short video intros. Almost all the functions in this notebook can be found there, in the section describing sentiment analysis for text data.
    - https://cloud.google.com/vertex-ai/docs/training-overview

## Imports, Variables & Functions

In [None]:
### List of imports

import pandas as pd
import matplotlib.pyplot as plt
import math
from itertools import islice
import google
from google.cloud import bigquery
from google.cloud import aiplatform
from google.cloud import storage
import os
import time
import calendar
import json
from datetime import date

In [None]:
### List of variables required for functions

# Credentials for the Google API clients: points to the service-account JSON
# key which was manually loaded to the instance
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'ab-mlai-team-dev-5bb89ad0ff4c.json'

# Variables for dataset creation
gcp_project = 'ab-mlai-team-dev'
location = 'europe-west4'
gcs_source = 'gs://mlai-nlp/cleaned_data/cleaned_600_elements.csv'
bq_dataset = 'Corona_NLP'

# Creating unique filename: <source file name without '.csv'><epoch seconds>
current_GMT = time.gmtime()
ts = calendar.timegm(current_GMT)
source_name = gcs_source.split('/')[-1]
# Remove the '.csv' suffix explicitly. str.rstrip('.csv') strips any trailing
# '.', 'c', 's' or 'v' characters, so it also removed the final 's' of
# 'cleaned_600_elements'.
if source_name.endswith('.csv'):
    source_name = source_name[:-len('.csv')]
filename = source_name + str(ts)
# Day-month-year stamp used in the inference file paths
todays_date = date.today().strftime("%d-%m-%Y")

# Import schema used when importing sentiment-labelled text into a TextDataset
import_schema_uri = aiplatform.schema.dataset.ioformat.text.sentiment

# Variables for training pipeline creation
dataset_id = 'projects/526415775648/locations/europe-west4/datasets/4244609663447859200'
# The last path segment of the dataset resource name doubles as the model display name
model_display_name = dataset_id.rsplit('/', 1)[-1]
sentiment_max = 2

# Variables for model evaluation
model_id = '2508971185375543296'
# Full resource name of the model, built from the project, location and model id
model_parent = f'projects/{gcp_project}/locations/{location}/models/{model_id}'

# Variables for batch predictions
inference_dataset_source = 'gs://mlai-nlp/cleaned_data/inference_dataset.csv'
inference_bucket = 'nlp-batch-prediction-test'
source_bucket = 'mlai-nlp'

# Notebook controls: each flag switches the matching step of the notebook on or off
create_dataset = False
create_model = False
load_gcs_source_to_bq = False
load_evals_to_bq = False
load_confusion_matrix_to_bq = False
preform_batch_prediction = False
load_batch_prediction_to_bq = True

In [None]:
# Displays the date stamp that is used in the inference file paths
todays_date

## Functions

In [None]:
### Functions

def create_and_import_text_dataset_from_bigquery(
    display_name: str,
    project: str,
    location: str,
    gcs_source: str
):
    """Create a Vertex AI TextDataset and import its data from a GCS file.

    Despite the function name, the data is read from the Cloud Storage URI
    given in ``gcs_source``, not from BigQuery. The import schema is taken
    from the module-level ``import_schema_uri``.

    Args:
        display_name: Display name of the new dataset.
        project: GCP project in which the dataset is created.
        location: Region in which the dataset is created.
        gcs_source: ``gs://`` URI of the file to import.

    Returns:
        The resource name of the created dataset.
    """
    aiplatform.init(project=project, location=location)

    # Use the `project` argument here: the previous version passed the global
    # gcp_project, silently ignoring the value the caller supplied.
    dataset = aiplatform.TextDataset.create(
        display_name=display_name,
        project=project,
        location=location,
        gcs_source=gcs_source,
        import_schema_uri=import_schema_uri,
    )

    dataset.wait()

    print(f'\tDataset: "{dataset.display_name}"')
    print(f'\tname: "{dataset.resource_name}"')

    return dataset.resource_name

def create_training_pipeline_text_sentiment_analysis_sample(
    project: str,
    location: str,
    display_name: str,
    dataset_id: str,
    model_display_name: str,
    sentiment_max: int = 2,
):
    """Run an AutoML text sentiment training job on an existing TextDataset.

    Args:
        project: GCP project passed to ``aiplatform.init``.
        location: Region passed to ``aiplatform.init``.
        display_name: Display name of the training job.
        dataset_id: Identifier of the TextDataset to train on.
        model_display_name: Display name given to the trained model.
        sentiment_max: Value passed as ``sentiment_max`` to the training job.

    Returns:
        The model returned by the training job.
    """
    aiplatform.init(project=project, location=location)

    training_job = aiplatform.AutoMLTextTrainingJob(
        display_name=display_name,
        prediction_type="sentiment",
        sentiment_max=sentiment_max,
    )
    source_dataset = aiplatform.TextDataset(dataset_id)

    # The split of each item is read from its ml_use label
    trained_model = training_job.run(
        dataset=source_dataset,
        model_display_name=model_display_name,
        training_filter_split="labels.aiplatform.googleapis.com/ml_use=training",
        validation_filter_split="labels.aiplatform.googleapis.com/ml_use=validation",
        test_filter_split="labels.aiplatform.googleapis.com/ml_use=test",
    )
    trained_model.wait()

    print(trained_model.display_name)
    print(trained_model.resource_name)
    print(trained_model.uri)

    return trained_model

def get_model_evaluation_text_sentiment_analysis_sample(
    project: str,
    model_id: str,
    evaluation_id: str,
    location: str,
    api_endpoint: str = None,
):
    """Fetch and print a single model evaluation.

    To obtain evaluation_id run the following commands where LOCATION
    is the region where the model is stored, PROJECT is the project ID,
    and MODEL_ID is the ID of your model.

    model_client = aiplatform.gapic.ModelServiceClient(
        client_options={
            'api_endpoint':'LOCATION-aiplatform.googleapis.com'
            }
        )
    evaluations = model_client.list_model_evaluations(parent='projects/PROJECT/locations/LOCATION/models/MODEL_ID')
    print("evaluations:", evaluations)

    Args:
        project: GCP project that owns the model.
        model_id: ID of the model.
        evaluation_id: ID of the evaluation to fetch.
        location: Region where the model is stored.
        api_endpoint: Regional API endpoint. When omitted it is derived from
            ``location`` as ``{location}-aiplatform.googleapis.com``.

    Returns:
        The model evaluation response (it is also printed).
    """
    # Derive the endpoint from the `location` argument at call time. A default
    # of f"{location}-..." in the signature is evaluated once, when the
    # function is defined, against the global `location` instead.
    if api_endpoint is None:
        api_endpoint = f"{location}-aiplatform.googleapis.com"

    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aiplatform.gapic.ModelServiceClient(client_options=client_options)
    name = client.model_evaluation_path(
        project=project, location=location, model=model_id, evaluation=evaluation_id
    )
    response = client.get_model_evaluation(name=name)
    print("response:", response)
    return response

def get_model_evaluation_slice_sample(
    project: str,
    model_id: str,
    evaluation_id: str,
    slice_id: str,
    location: str,
    api_endpoint: str = None,
):
    """Fetch and print a single slice of a model evaluation.

    To obtain evaluation_id run the following commands where LOCATION
    is the region where the model is stored, PROJECT is the project ID,
    and MODEL_ID is the ID of your model.

    model_client = aiplatform.gapic.ModelServiceClient(
        client_options={
            'api_endpoint':'LOCATION-aiplatform.googleapis.com'
            }
        )
    evaluations = model_client.list_model_evaluations(parent='projects/PROJECT/locations/LOCATION/models/MODEL_ID')
    print("evaluations:", evaluations)

    Args:
        project: GCP project that owns the model.
        model_id: ID of the model.
        evaluation_id: ID of the evaluation the slice belongs to.
        slice_id: ID of the evaluation slice to fetch.
        location: Region where the model is stored.
        api_endpoint: Regional API endpoint. When omitted it is derived from
            ``location`` as ``{location}-aiplatform.googleapis.com``.

    Returns:
        The model evaluation slice response (it is also printed).
    """
    # Derive the endpoint from the `location` argument at call time. A default
    # of f"{location}-..." in the signature is evaluated once, when the
    # function is defined, against the global `location` instead.
    if api_endpoint is None:
        api_endpoint = f"{location}-aiplatform.googleapis.com"

    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aiplatform.gapic.ModelServiceClient(client_options=client_options)
    name = client.model_evaluation_slice_path(
        project=project,
        location=location,
        model=model_id,
        evaluation=evaluation_id,
        slice=slice_id,
    )
    response = client.get_model_evaluation_slice(name=name)
    print("response:", response)
    return response
    
def convert_cm_to_percentage(cm):
    """Normalise every row of a confusion matrix by that row's total.

    Each value is divided by the sum of its row, so the result holds
    fractions between 0 and 1 (not values scaled to 100).

    Args:
        cm: Iterable of rows, each an iterable of numeric counts.

    Returns:
        A list of lists with the same shape as ``cm``. A row whose total is
        zero is returned as all zeros instead of raising ZeroDivisionError.
    """
    confusion_percentage_array = []
    for row in cm:
        row_values = list(row)
        row_total = sum(row_values)
        if row_total:
            confusion_percentage_array.append(
                [value / row_total for value in row_values]
            )
        else:
            # No items in this row: there is nothing to divide by
            confusion_percentage_array.append([0.0 for _ in row_values])

    return confusion_percentage_array
    
def load_dataframe_to_bigquery(df, table_id):
    """Load a DataFrame into a BigQuery table, replacing the table contents.

    Args:
        df: DataFrame to load.
        table_id: Destination table id.
    """
    bq_client = bigquery.Client()

    # WRITE_TRUNCATE: a table is saved per model, which can be unioned
    # together later in SQL if necessary
    truncate_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")

    # Start the load job and block until it has completed
    load_job = bq_client.load_table_from_dataframe(
        df,
        table_id,
        job_config=truncate_config,
    )
    load_job.result()

    # Read the table back to report what was loaded
    table = bq_client.get_table(table_id)
    print(f'Loaded {table.num_rows} rows and {len(table.schema)} columns to {table_id}')
    
    
def create_batch_prediction_job_sample(
    project: str,
    location: str,
    model_resource_name: str,
    job_display_name: str,
    gcs_source: str,
    gcs_destination: str,
    sync: bool = True,
):
    """Run a batch prediction job for a model and wait for it.

    Args:
        project: GCP project passed to ``aiplatform.init``.
        location: Region passed to ``aiplatform.init``.
        model_resource_name: Resource name of the model used for prediction.
        job_display_name: Display name of the batch prediction job.
        gcs_source: GCS URI of the prediction input.
        gcs_destination: GCS prefix under which the results are written.
        sync: Forwarded to ``Model.batch_predict``.

    Returns:
        The batch prediction job.
    """
    aiplatform.init(project=project, location=location)

    prediction_job = aiplatform.Model(model_resource_name).batch_predict(
        job_display_name=job_display_name,
        gcs_source=gcs_source,
        gcs_destination_prefix=gcs_destination,
        sync=sync,
    )
    prediction_job.wait()

    print(prediction_job.display_name)
    print(prediction_job.resource_name)
    print(prediction_job.state)

    return prediction_job

def write_string_to_gcs_txt(string, file_name, bucket_name):
    """Upload `string` as the content of blob `file_name` in bucket `bucket_name`."""
    target_blob = storage.Client().bucket(bucket_name).blob(file_name)
    target_blob.upload_from_string(string)
    
def upload_blob(source_file_name, destination_blob_name, bucket_name):
    """Uploads a local file to the bucket and prints a confirmation."""
    target_bucket = storage.Client().get_bucket(bucket_name)
    target_bucket.blob(destination_blob_name).upload_from_filename(source_file_name)

    confirmation = 'File {} uploaded to {}.'.format(
        source_file_name,
        destination_blob_name,
    )
    print(confirmation)
    
def list_blobs(bucket_name, prefix=None):
    """Prints the name of every blob in the bucket, optionally limited to a prefix."""
    # Note: Client.list_blobs requires at least package version 1.17.0.
    # The call returns a response only when the iterator is consumed.
    for found_blob in storage.Client().list_blobs(bucket_name, prefix=prefix):
        print(found_blob.name)

## Create Vertex AI TextDataset

In [None]:
# Creates a TextDataset in Vertex AI from a predefined CSV file stored in GCS.
# Keyword arguments make it explicit which value feeds which parameter.
if create_dataset:
    create_and_import_text_dataset_from_bigquery(
        display_name=filename,
        project=gcp_project,
        location=location,
        gcs_source=gcs_source,
    )

## Train Vertex AI Sentiment Analysis Model

In [None]:
# Starts an AutoMLTextTrainingJob in Vertex AI from a predefined TextDataset
if create_model:
    model = create_training_pipeline_text_sentiment_analysis_sample(
        project=gcp_project,
        display_name='dataset' + model_display_name,
        dataset_id=dataset_id,
        location=location,
        model_display_name=model_display_name,
        # Use the configured value instead of a second hard-coded 2, so the
        # variables cell stays the single place to change it
        sentiment_max=sentiment_max,
    )

## Model Evaluation

In [None]:
# Fetches model evaluations from a trained AutoML text model given a model_parent
model_client = aiplatform.gapic.ModelServiceClient(
    client_options={'api_endpoint': f'{location}-aiplatform.googleapis.com'}
)
list_eval = model_client.list_model_evaluations(parent=model_parent)

# Keep the name of the last evaluation in the listing. Reset it first so that
# a model without evaluations raises a clear error instead of a NameError or,
# on a re-run, silently reusing the name left over from a previous model.
eval_name = None
for evaluation in list_eval:
    eval_name = evaluation.name

if eval_name is None:
    raise RuntimeError(f'No model evaluations found for {model_parent}')

overall_eval = model_client.get_model_evaluation(name=eval_name)

In [None]:
#Remove hash to show model evaluations json output
#overall_eval

In [None]:
# Reads the individual metrics out of the model evaluation
eval_metrics = overall_eval.metrics

f1Score = eval_metrics['f1Score']
linearKappa = eval_metrics['linearKappa']
meanSquaredError = eval_metrics['meanSquaredError']
meanAbsoluteError = eval_metrics['meanAbsoluteError']
precision = eval_metrics['precision']
quadraticKappa = eval_metrics['quadraticKappa']
recall = eval_metrics['recall']

In [None]:
# Investigating the confusion matrix output

# The proto.marshal....MapComposite object is iterable: print the value stored under each of its keys
confusion_matrix_metric = overall_eval.metrics['confusionMatrix']
for field_name in confusion_matrix_metric:
    print(confusion_matrix_metric[field_name])

In [None]:
# Row-normalises the confusion matrix rows taken from the evaluation metrics
confusion_matrix_rows = overall_eval.metrics['confusionMatrix']['rows']
confusion_percentage_array = convert_cm_to_percentage(confusion_matrix_rows)

In [None]:
# Creates a DataFrame from the confusion matrix rows, one column per predicted sentiment
confusion_matrix_df = pd.DataFrame(
    data=overall_eval.metrics['confusionMatrix']['rows'],
    columns=['Negative_pred', 'Neutral_pred', 'Positive_pred'],
)

In [None]:
# Creates a DataFrame from the row-normalised confusion matrix, one column per predicted sentiment
confusion_matrix_percentages_df = pd.DataFrame(
    data=confusion_percentage_array,
    columns=['Negative_pred', 'Neutral_pred', 'Positive_pred'],
)

In [None]:
# Prepends the model id and the actual-sentiment label of each row to both confusion matrix DataFrames
for cm_frame in (confusion_matrix_percentages_df, confusion_matrix_df):
    cm_frame.insert(loc=0, column='model_id', value=model_id)
    cm_frame.insert(loc=1, column='Sentiment_actuals', value=['Negative', 'Neutral', 'Positive'])

In [None]:
# Example output: confusion matrix with every row divided by its row total
confusion_matrix_percentages_df

In [None]:
# Example output: confusion matrix as returned in the evaluation metrics
confusion_matrix_df

In [None]:
# Investigating the gcs_source CSV: the file has no header row, so the columns are named here
csv_data_df = pd.read_csv(gcs_source, header=None)
csv_data_df.columns = [
    'Test_Train_Validation',
    'Text',
    'Sentiment',
    'max_sentiment',
]

In [None]:
# Calculates how many rows of the CSV carry each sentiment value
# NOTE(review): positions 0/1/2 are read as Negative/Neutral/Positive, which
# assumes all three sentiment values occur in the CSV — confirm
sentiment_group_sizes = csv_data_df.groupby(['Sentiment']).size()
sentiment_representation = list(sentiment_group_sizes)

negative_representation = sentiment_representation[0]
neutral_representation = sentiment_representation[1]
positive_representation = sentiment_representation[2]

In [None]:
# Calculates the test/train/validation split from the CSV.
# The group sizes are computed once instead of re-running the same groupby
# for each of the three counts.
# NOTE(review): positions 0/1/2 are read as test/train/validation, which
# assumes all three split labels occur in the CSV and sort in that order — confirm
split_sizes = list(csv_data_df.groupby(['Test_Train_Validation']).size())
no_test_items = split_sizes[0]
no_train_items = split_sizes[1]
no_val_items = split_sizes[2]

In [None]:
# Creates a one-row DataFrame holding the model id, the split sizes and the evaluation metrics
evaluation_metric_cols = [
    'model_id',
    'no_test_items',
    'no_train_items',
    'no_val_items',
    'f1Score',
    'linearKappa',
    'meanSquaredError',
    'meanAbsoluteError',
    'precision',
    'quadraticKappa',
    'recall',
]
evaluation_metrics = [[
    model_id,
    no_test_items,
    no_train_items,
    no_val_items,
    f1Score,
    linearKappa,
    meanSquaredError,
    meanAbsoluteError,
    precision,
    quadraticKappa,
    recall,
]]
evaluation_metrics_df = pd.DataFrame(evaluation_metrics, columns=evaluation_metric_cols)
evaluation_metrics_df

In [None]:
# Loads the evaluation metrics DataFrame to BigQuery, into a table named after the model id
if load_evals_to_bq:
    load_dataframe_to_bigquery(
        evaluation_metrics_df,
        table_id=f'{gcp_project}.{bq_dataset}.model_evaluation{model_id}',
    )

In [None]:
# Preview of the CSV data held in csv_data_df. For the expected input format see
# https://cloud.google.com/vertex-ai/docs/text-data/sentiment-analysis/prepare-data
csv_data_df.head(5)

In [None]:
# Loads the model input CSV DataFrame to BigQuery, into a table named after the model id
if load_gcs_source_to_bq:
    load_dataframe_to_bigquery(
        csv_data_df,
        table_id=f'{gcp_project}.{bq_dataset}.model_input_csv{model_id}',
    )

In [None]:
# Loads the confusion matrix DataFrame to BigQuery, into a table named after the model id
if load_confusion_matrix_to_bq:
    load_dataframe_to_bigquery(
        confusion_matrix_df,
        table_id=f'{gcp_project}.{bq_dataset}.confusion_matrix{model_id}',
    )

In [None]:
# Loads the row-normalised confusion matrix DataFrame to BigQuery, into a table named after the model id
if load_confusion_matrix_to_bq:
    load_dataframe_to_bigquery(
        confusion_matrix_percentages_df,
        table_id=f'{gcp_project}.{bq_dataset}.confusion_matrix_percentage{model_id}',
    )

In [None]:
%%capture
# Additional code snippets that are not used yet. They are kept inside a
# string literal, so nothing below is executed when this cell runs.
'''
Corona_NLP_test = pd.read_csv('gs://mlai-nlp/Corona_NLP_test.csv', encoding='iso-8859-1')
Corona_NLP_train = pd.read_csv('gs://mlai-nlp/Corona_NLP_train.csv', encoding='iso-8859-1')

#Calculates number of test items
no_test_items = int(confusion_matrix_df.values.sum())
print('no_test_items:', no_test_items)

get_model_evaluation_text_sentiment_analysis_sample(
    project = gcp_project,
    model_id = model_id,
    evaluation_id = '4292050566529417216',
    location = location,
    api_endpoint = f"{location}-aiplatform.googleapis.com",
)


get_model_evaluation_slice_sample(
    project = gcp_project,
    model_id = model_id,
    evaluation_id = '4292050566529417216',
    location = location,
    api_endpoint = f"{location}-aiplatform.googleapis.com",
    slice_id = '',
)

'''


## Batch Prediction


# Steps

* Create txt files for prediction, saved to EUW4 bucket location
* Create input JSONL file containing txt file locations
* Perform batch prediction
* Read JSONL results from GCS


In [None]:
# Creates the inference DataFrame from the inference CSV, which is read without a header row
inference_dataset = pd.read_csv(
    inference_dataset_source,
    header=None,
)

In [None]:
# Writes each element of inference_dataset to the bucket as an individual TXT
# file and records the URI of every file in json_list, which is later used to
# create the JSONL input file.
create_inference_txt = True

json_list = []

if create_inference_txt:
    # Column 0 holds the text; the row position doubles as the file name
    for i, text in enumerate(inference_dataset[0]):
        blob_name = f'inference-files/{todays_date}/{i}.txt'
        write_string_to_gcs_txt(text, blob_name, inference_bucket)
        # Build the URI from inference_bucket so it always points at the bucket
        # the file was written to, instead of a second hard-coded bucket name
        json_list.append({'content': f'gs://{inference_bucket}/{blob_name}', 'mimeType': 'text/plain'})

In [None]:
# Creates the JSONL input file: one JSON object per line, one line per entry of json_list
json_input_filename = 'output.jsonl'

with open(json_input_filename, 'w') as outfile:
    outfile.writelines(json.dumps(entry) + '\n' for entry in json_list)

In [None]:
# Uploads the JSONL input file to the cleaned_data/ folder of the source bucket
upload_blob(
    source_file_name=json_input_filename,
    destination_blob_name=f'cleaned_data/{json_input_filename}',
    bucket_name=source_bucket,
)

In [None]:
# GCS URI of the JSONL input file. It is built from source_bucket, the bucket
# the file is uploaded to, instead of repeating the bucket name as a literal.
batch_input = f'gs://{source_bucket}/cleaned_data/{json_input_filename}'

# Perform batch prediction
if preform_batch_prediction:
    create_batch_prediction_job_sample(
        project=gcp_project,
        location=location,
        model_resource_name=model_parent,
        job_display_name='test_predict',
        gcs_source=batch_input,
        # Results go to the same bucket and prefix that the results listing reads from
        gcs_destination=f'gs://{source_bucket}/cleaned_data/batch_prediction/results',
        sync=False,
    )

In [None]:
# Searches the bucket for prediction results and collects the blob names in blob_list
blobs = storage.Client().list_blobs(
    source_bucket,
    prefix='cleaned_data/batch_prediction/results/',
)

blob_list = [result_blob.name for result_blob in blobs]

In [None]:
# Shows the last blob name in blob_list, which is treated as the latest batch prediction
# NOTE(review): this relies on the listing order putting the newest result last — confirm
blob_list[-1]

In [None]:
# Gets a handle on the last blob of blob_list, which is treated as the latest batch prediction
last_listed_blob_name = blob_list[-1]

storage_client = storage.Client()
bucket = storage_client.bucket(source_bucket)
blob = bucket.blob(last_listed_blob_name)

In [None]:
# Reads the whole batch prediction blob as text and prints it
with blob.open("r") as file:
    batch_prediction_output = file.read()

print(batch_prediction_output)

In [None]:
# Iterates through the batch prediction output (one result per line) to
# collect the text index and the prediction of every line
batch_prediction_output_list = batch_prediction_output.split('\n')
txt_index = []
prediction = []
for line in batch_prediction_output_list:
    # Skip blank lines (such as the one after a trailing newline) rather than
    # always dropping the last element, which lost a real row when the output
    # did not end with a newline
    if not line.strip():
        continue
    # The index is the whole file name in front of '.txt'. Taking only the
    # last character truncated every index >= 10 to its final digit. It is
    # stored as an int so that sorting on it is numeric.
    txt_index.append(int(line.split('.txt')[0].rsplit('/', 1)[-1]))
    # The prediction is the text after the last ':' with the closing braces
    # and whitespace removed, so values longer than one character survive
    prediction.append(line.rsplit(':', 1)[-1].strip(' }'))
    

In [None]:
# Creates the batch prediction DataFrame from the parsed indexes and predictions
batch_predict_df = pd.DataFrame(
    {
        'txt_index': txt_index,
        'prediction': prediction,
    }
)

In [None]:
# Merges the batch prediction DataFrame with the original inference dataset
# based on the indexing. This creates the final batch prediction DataFrame
# consisting of the original text and the predictions.

# Sort on the numeric value of txt_index: sorted as strings, '10' would come
# before '2'. drop=True keeps the old index from being added as a column,
# which also made a second run of this cell fail.
batch_predict_df = (
    batch_predict_df
    .assign(txt_index=batch_predict_df['txt_index'].astype(int))
    .sort_values('txt_index')
    .reset_index(drop=True)
)

# Reduce the inference DataFrame to its text column only once: after the first
# run inference_dataset is already a Series, and renaming columns on it fails.
if isinstance(inference_dataset, pd.DataFrame):
    inference_dataset = inference_dataset.rename(columns={0: "Text"})['Text']

# NOTE(review): the two parts are paired by row position, which assumes there
# is a prediction for every inference text — confirm
batch_predict_final_df = pd.concat([inference_dataset, batch_predict_df['prediction']], axis=1)

In [None]:
# Rebuilds the final DataFrame of text and predictions; this repeats the concat done at the end of the previous cell
batch_predict_final_df = pd.concat([inference_dataset,batch_predict_df['prediction']], axis=1)

In [None]:
# Displays the final DataFrame of inference text and predictions
batch_predict_final_df

In [None]:
# Loads the final batch prediction DataFrame to BigQuery
if load_batch_prediction_to_bq:
    load_dataframe_to_bigquery(
        batch_predict_final_df,
        table_id=f'{gcp_project}.{bq_dataset}.batch_prediction',
    )