<a href="https://colab.research.google.com/github/kedar5/Amazon-Customer-Reviews-Clustering/blob/main/Sentiment_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



# **Sentiment Analysis**

**Author: Kedar Deshpande**


### Sentiment Analysis for Marketing Analytics

Sentiment analysis for marketing analytics inspects text, such as online comments and engagements, and identifies its prevailing emotional opinion as positive, negative, or neutral.
This helps marketers and analysts understand customer sentiment at scale from data sources like YouTube comments or product reviews.


## Before you Begin
### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager).

1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

1. You'll use your Google account credentials to authenticate to GCP, so make sure your Google account has at least the following roles in the project you created (see https://cloud.google.com/iam/docs/understanding-roles):
  - ***roles/editor***
  - ***roles/iam.securityAdmin***

  



### Install Prerequisites
Note: After this step, the runtime is restarted to apply the new packages.

In [None]:

# Install the pip packages used in this notebook into the current kernel
print("Installing libraries") 
!pip install --user --quiet google-cloud-language validators 
!pip install --user --quiet google-cloud-pipeline-components==0.1.7 kfp==1.8.2 
!pip install --quiet google-cloud-aiplatform==1.4.3


In [None]:
import IPython
# Automatically restart kernel after installs

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

### Notebook Imports and Google Account Authentication
Running the next cell imports the libraries used in this notebook and prompts you to give Colab access to your Google account, which is used to authenticate you as a user in your GCP project.

In [None]:
import os
import sys
from google.cloud import aiplatform as vertex
from google_cloud_pipeline_components import \
    aiplatform as vertex_pipeline_components
import kfp.dsl as dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (
    component as comp,
    Input,
    Output,
    Dataset,
    Metrics,
)
from google.colab import auth as google_auth

google_auth.authenticate_user()




### Set Environment Variables
Instructions:

1. **PROJECT_ID**
   - Enter the Project ID of the GCP project you created.

2. **REGION**
   - Enter the region in which to store the analysis results in GCS and BigQuery.
   - For supported regions, refer to: https://cloud.google.com/bigquery/docs/locations#regions

3. **BUCKET_ID**
   - Enter the name of the bucket to be created.
   - For naming conventions, refer to: https://cloud.google.com/storage/docs/naming-buckets

4. **FILENAME**
   - Enter the filename to use for the output in the GCS bucket and BigQuery.

5. **DATATYPE**
   - Select 'Reviews' or 'Youtube', depending on the type of data you want to analyze.

6. **DATASET_ID**
   - Enter the name of the BigQuery dataset in which to store the results.
   - For naming conventions, refer to: https://cloud.google.com/bigquery/docs/datasets#dataset-naming

7. **TABLE_NAME**
   - Enter the name of the BigQuery table in which to store the results.
   - For naming conventions, refer to: https://cloud.google.com/bigquery/docs/tables#table_naming

8. **YOUTUBE_DEVELOPER_KEY**
   - To analyze ***YouTube*** video comment data, enable the YouTube Data API in the GCP project and create a YOUTUBE_DEVELOPER_KEY.
     - To generate a YOUTUBE_DEVELOPER_KEY (an API key), follow the steps for API keys at: https://developers.google.com/youtube/registering_an_application
   - To analyze ***Product Review*** data, leave YOUTUBE_DEVELOPER_KEY blank.
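
As an optional pre-flight check, the sketch below tests BUCKET_ID and DATASET_ID against a simplified subset of the naming rules linked above. The helpers `is_valid_bucket_name` and `is_valid_dataset_id` are hypothetical, and the patterns deliberately ignore some cases (for example, dotted bucket names may be longer than 63 characters), so a passing result is a sanity check rather than a guarantee.

```python
import re

# Simplified GCS bucket-name rule (hypothetical helper; uses the 63-character limit
# for names without dots): 3-63 characters of lowercase letters, digits, dashes,
# underscores, or dots, starting and ending with a letter or digit.
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]")

# Simplified BigQuery dataset-ID rule: letters, digits, and underscores, up to 1024 characters.
_DATASET_RE = re.compile(r"[A-Za-z0-9_]{1,1024}")


def is_valid_bucket_name(name: str) -> bool:
    """Return True if `name` passes the simplified bucket-name check."""
    # Bucket names also may not begin with the reserved "goog" prefix
    return _BUCKET_RE.fullmatch(name) is not None and not name.startswith("goog")


def is_valid_dataset_id(dataset_id: str) -> bool:
    """Return True if `dataset_id` passes the simplified dataset-ID check."""
    return _DATASET_RE.fullmatch(dataset_id) is not None


print(is_valid_bucket_name("sentiment-analysis-data"))  # True
print(is_valid_dataset_id("SentimentAnalysis"))         # True
```

In the notebook, these could be called on `BUCKET_ID` and `DATASET_ID` right after the parameter cell.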



In [15]:
# GCP Project ID of the project created
PROJECT_ID = "sentiment-analysis-dev" # @param {type:"string"}

# Region to store data in GCS Bucket and Big Query. 
# For supported regions refer: https://cloud.google.com/bigquery/docs/locations#regions
REGION = "us-central1"  # @param {type:"string"}

# Name of the bucket to be created (https://cloud.google.com/storage/docs/naming-buckets)
BUCKET_ID = "sentiment-analysis-data" # @param {type:"string"}

# Filename to be used for uploading to GCS and Big Query 
FILENAME = "Product.csv" # @param {type:"string"}

# DataType Reviews or Youtube
DATATYPE = "Reviews" #@param ["Reviews", "Youtube"]

# Dataset ID to be created for Big Query (https://cloud.google.com/bigquery/docs/datasets#dataset-naming)
DATASET_ID = "SentimentAnalysis"  # @param {type:"string"}

# Table name to be created for Big Query (https://cloud.google.com/bigquery/docs/tables#table_naming) 
TABLE_NAME = "Product_table"  # @param {type:"string"}

#Developer Key for the Youtube API
YOUTUBE_DEVELOPER_KEY = ""  # @param {type:"string"}

PIPELINE_JSON_PKG_PATH = "sentiment_analysis.json"
PIPELINE_ROOT = f"gs://{BUCKET_ID}/pipeline_root"

### Enable APIs

In [None]:
!gcloud config set project $PROJECT_ID
!gcloud config set ai/region $REGION
!gcloud services enable aiplatform.googleapis.com
!gcloud services enable compute.googleapis.com
!gcloud services enable language.googleapis.com
!gcloud services enable youtube.googleapis.com

### Create Bucket

In [None]:
from google.cloud import storage

storage_client = storage.Client(project=PROJECT_ID)
if not storage.Bucket(storage_client, BUCKET_ID).exists():
  bucket = storage_client.create_bucket(BUCKET_ID,location = REGION)
  print("Bucket {} created".format(bucket.name))
else:
  print("Bucket {} exists".format(BUCKET_ID))

### Create BigQuery Dataset

In [None]:
from google.cloud.exceptions import NotFound
from google.cloud import bigquery

# Construct a BigQuery client object.
bq_client = bigquery.Client(project=PROJECT_ID)
dataset = bigquery.Dataset(PROJECT_ID + "."+ DATASET_ID)
dataset.location = REGION

try:
    bq_client.get_dataset(PROJECT_ID + "."+ DATASET_ID)  # Make an API request.
    print("Dataset {} already exists".format(DATASET_ID))
except NotFound:
    print("Dataset {} does not exist".format(DATASET_ID))
    # Send the dataset to the API for creation, with an explicit timeout.
    dataset = bq_client.create_dataset(dataset, timeout=90)  # Make an API request.
    print("Created dataset {}.{}".format(bq_client.project, dataset.dataset_id))


### Update service-account IAM policy

In [None]:
# Grant Storage Object Admin to the Compute Engine default service account, which the pipeline steps run as
# Get Project number
project_number = !gcloud projects describe $PROJECT_ID --format="value(projectNumber)"
# Define Compute Service Account
ser_acc = "serviceAccount:"+project_number[0]+"-compute@developer.gserviceaccount.com"
!gcloud projects add-iam-policy-binding  $PROJECT_ID --member=$ser_acc --role='roles/storage.objectAdmin'

### Input YouTube URLs / Product Review Dataset
Instructions:

- If analyzing **YouTube** data, enter comma-separated video URLs in the `youtube_video_urls` parameter and set `comment_limit`, the approximate maximum number of comments to fetch per video.

- If analyzing **Product Review** data, a prompt appears in Colab below for you to upload a file in CSV format. (Note: The only schema requirement is a 'Review' column.)
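
Because the only schema requirement is a 'Review' column, a check like the following could catch a wrong file before the pipeline runs. `has_review_column` is a hypothetical helper that reads only the header row with Python's `csv` module and matches the column name exactly, the same way `data['Review']` is looked up in the sentiment component.

```python
import csv
import io


def has_review_column(csv_file, column="Review"):
    """Return True if the header row of `csv_file` contains `column` exactly.

    `csv_file` is any text file-like object; only the first row is read.
    """
    header = next(csv.reader(csv_file), [])
    return column in header


sample = io.StringIO("Product,Review,Rating\nPhone,Great battery life,5\n")
print(has_review_column(sample))  # True
```

In Colab, it could be applied to the uploaded file with `with open('/content/' + uploaded_file, newline='') as f: has_review_column(f)`.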

In [None]:
# Validate the YouTube URLs, or upload the product review CSV to the GCS bucket
import validators
from urllib.parse import urlparse
from google.colab import files

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a local file to a GCS bucket.

    Args:
        bucket_name: Name of the GCS bucket to upload the blob to.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Name of the destination blob in the bucket.
    """
    try:
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(source_file_name)
        # Report success only if the upload did not raise
        print(f"\n File {source_file_name} uploaded to gs://{bucket_name}/{destination_blob_name}.")
    except Exception as error:
        print("Exception is ", error)

comment_limit = 3000 # @param{type:"integer"}
youtube_video_urls = "https://www.youtube.com/watch?v=VhozpHTzo14,https://www.youtube.com/watch?v=eUx75Pl0THU"  # @param {type:"string"}
# youtube_video_urls = ['https://www.youtube.com/watch?v=VhozpHTzo14','https://www.youtube.com/watch?v=WJJ4ORIGFXA',
    #         'https://www.youtube.com/watch?v=eUx75Pl0THU','https://www.youtube.com/watch?v=cuBAZc7loSY','https://www.youtube.com/watch?v=R1hC5vnM4EA']
if DATATYPE == 'Youtube':
    # The pipeline writes the combined comment CSV under FILENAME
    uploaded_file = FILENAME
    try:
        for url in youtube_video_urls.split(","):
            url = url.strip()
            # validators.url() returns a falsy ValidationFailure for malformed URLs
            if not validators.url(url):
                raise ValueError(f"Invalid URL: {url}")
            if urlparse(url).netloc != "www.youtube.com":
                raise ValueError(f"URL must belong to www.youtube.com: {url}")
            print("Valid YouTube URL:", url)
    except Exception as error:
        print("Error : ", error)
# Otherwise, upload the CSV file containing product review data
else:
    try:
        print("Upload the product review dataset in CSV format")
        uploaded = files.upload()
        uploaded_file = list(uploaded.keys())[0]
        upload_blob(BUCKET_ID, '/content/' + uploaded_file, uploaded_file)
    except Exception as error:
        print("Exception is ", error)

## Pipeline Components
This section defines the two components used by the pipeline.

### (1) create_yt_dataset

This component builds a comment dataset from the provided YouTube video URLs using the YouTube Data API, then uploads it to the GCS bucket as a single CSV file.
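
The component's `get_vid_id()` reads the `v` query parameter and otherwise falls back to the last path segment. The sketch below shows the same idea as a standalone function that also explicitly recognizes `/embed/<id>` and `youtu.be/<id>` links. `extract_video_id` is a hypothetical name, and the URL-validation cell above only accepts `www.youtube.com` URLs, so the short-link case is an extra.

```python
from urllib.parse import urlparse, parse_qs


def extract_video_id(url):
    """Return the YouTube video ID from a watch, embed, or youtu.be URL.

    Hypothetical helper; returns None when no ID can be found.
    """
    parsed = urlparse(url.strip())
    # Standard watch URL: https://www.youtube.com/watch?v=<id>
    ids = parse_qs(parsed.query).get("v")
    if ids:
        return ids[0]
    parts = [p for p in parsed.path.split("/") if p]
    # Short link: https://youtu.be/<id>
    if parsed.netloc.endswith("youtu.be") and parts:
        return parts[0]
    # Embed URL: https://www.youtube.com/embed/<id>
    if len(parts) >= 2 and parts[0] == "embed":
        return parts[1]
    return None


print(extract_video_id("https://www.youtube.com/watch?v=VhozpHTzo14"))  # VhozpHTzo14
```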

In [21]:
@comp(base_image = "python:3.9",packages_to_install=["google-cloud-storage","pandas","google-api-python-client","oauth2client==1.5.2"])
def create_yt_dataset(
project_ID : str,
developer_key : str,
urls : str,
comment_limit : int,
bucket_ID : str,
file_name : str
):
    # Fetch comments for each video URL with the YouTube Data API and upload them to GCS

    import os
    import json
    import glob
    import shutil
    import pandas as pd
    from csv import writer
    from google.cloud import storage
    import googleapiclient.discovery
    from urllib.parse import urlparse, parse_qs
    


    def gather_comments(youtube_client,vid_id):
        """Fetches comments from Youtube videos

        Retrieves comment data for a given youtube video id

        Args:
        youtube_client :
            Instance of the youtube client.
        vid_id :
            Video ID for the given youtube video.

        Returns :
        A dict mapping keys to the corresponding table row data
        fetched. For example:

        {'Comment' : 'I like this product', 
        'Author' : 'John Doe', 
        'Comment ID' : 'NDHH24JAH', 
        'Like Count' : '24', 
        'Video ID' : 'NSJAUJW232'
        }
        """
        # put comments extracted in specific lists for each column
        comments, commentsId, likes, authors, videoId = [], [], [], [], []
        try:
            response = youtube_client.commentThreads().list(
                    part = "snippet,replies",
                    maxResults = 100,
                    videoId = vid_id,
                    textFormat = "plainText").execute()
            page = 0
            index = 0
            while len(comments)<comment_limit:
                page += 1
                # for every comment in the response received
                for item in response['items']:
                    index += 1
                    comment = item['snippet']['topLevelComment']
                    author = comment['snippet']['authorDisplayName']
                    text = comment['snippet']['textDisplay']
                    comment_id = item['snippet']['topLevelComment']['id']
                    like_count = item['snippet']['topLevelComment']['snippet']['likeCount']
                    # append the comment to the lists
                    comments.append(text)
                    authors.append(author)
                    commentsId.append(comment_id)
                    likes.append(like_count)
                    videoId.append(vid_id)
                    if item['snippet']['totalReplyCount'] >0:
                        # Check that the replies are actually present and were not deleted
                        if 'replies' in item:
                            for reply in item['replies']['comments']:
                                index += 1         
                                # Extract reply
                                text = reply['snippet']['textDisplay']
                                author = reply['snippet']['authorDisplayName']
                                comment_id = reply['id']
                                like_count = reply['snippet']['likeCount']
                                # append the comment to the lists
                                comments.append(text)
                                authors.append(author)
                                commentsId.append(comment_id)
                                likes.append(like_count)
                                videoId.append(vid_id)
                # get next page of comments
                if 'nextPageToken' in response:
                    response = youtube_client.commentThreads().list(
                        part = "snippet,replies",
                        maxResults = 100,
                        videoId = vid_id,
                        textFormat = "plainText",
                        pageToken = response['nextPageToken']
                    ).execute()
                else:
                    break

            # return the comment data
            print("Number of comments : ", index)
            return dict({'Comment' : comments, 'Author' : authors, 'Comment ID' : commentsId, 'Like Count' : likes, 'Video ID' : videoId})
        except Exception as error:
            print("Exception in gather_comments() is ",error) 

    def get_video_title(youtube_client,vid_id):
        """Fetches Video title from Youtube videos

        Retrieves video title for a given youtube video id

        Args :
        youtube_client :
            Instance of the youtube client.
        vid_id :
            Video ID for the given youtube video.

        Return :
        A string containing the video title
        """
        try:
            response_title = youtube_client.videos().list(
                part = 'snippet',
                id = vid_id
            ).execute()
            # get the video title
            video_title = response_title['items'][0]['snippet']['title']
            return video_title
        except Exception as error:
            print("Exception in get_video_title() is ",error) 

    def get_youtube():
        """Fetches Youtube client
        Returns :
        Instance of Youtube client
        """
        # Allows oauthlib to run over plain HTTP; carried over from the OAuth samples,
        # although this client authenticates with an API key rather than OAuth
        os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
        api_service_name = "youtube"
        api_version = "v3"
        try: 
            youtube_client = googleapiclient.discovery.build(
                api_service_name, api_version, developerKey = developer_key)
            return youtube_client
        except Exception as error:
            print("Exception in get_youtube() is ", error)


    def get_vid_id(url):
        """Fetches Video Id from Youtube urls

        Retrieves video Id for a given youtube video url
        Args :
          url : url of the youtube video.
        
        Returns : Video id string
        """
        # Returns the vid_id from the url eg: https://www.youtube.com/watch?v=WJJ4ORIGFXA the vid_id = WJJ4ORIGFXA
        try:
            parse_url = urlparse(url)
            # Check if embed video
            if 'embed' in parse_url.path:
                term ='embed'
            else:
                term ='v'
            vid_id_query = parse_qs(parse_url.query).get(term)
            if vid_id_query:
                return vid_id_query[0]
            if parse_url.path.split('/'):
                return parse_url.path.split('/')[-1]
        except ValueError:
            print("Invalid URL!")


    def save_as_csv(output_dict, filename):
        # save dictionary as csv
        output_df = pd.DataFrame(output_dict, columns = output_dict.keys())
        output_df.to_csv(f'/content/{filename}.csv')


    def upload_blob(bucket_name, source_file_name, destination_blob_name):
        """Uploads blob to GCS bucket

        Uploads blob to a GCS bucket, given source and destination

        Args :
        bucket_name :
            Name of GCS bucket to upload blob to.
        source_file_name :
            source file to be uploaded.
        destination_blob_name :
            destination to upload to blob
        """
        try:
            storage_client = storage.Client(project=project_ID)
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(destination_blob_name)
            blob.upload_from_filename(source_file_name)
            print(
                f"\n File {source_file_name} uploaded to gs://{bucket_name}/{destination_blob_name}."
            )
        except Exception as error:
            print("Exception in upload_blob() is ",error) 


    def csv_combiner():
        """Combines CSV files generated for each video

        Combines the csv files in case of multiple youtube urls entered
        """
        #import csv files from folder
        try:
            path = r'/content/'
            allFiles = glob.glob(path + "/*.csv")
            allFiles.sort()  # order files by name

            with open('/content/final'+file_name, 'wb') as outfile:
                for i, fname in enumerate(allFiles):
                    with open(fname, 'rb') as infile:
                        if i != 0:
                            infile.readline()  # Throw away header on all but first file
                        # Block copy rest of file from input to output without parsing
                        shutil.copyfileobj(infile, outfile)
                        print(fname + " has been imported.")
        except Exception as error:
            print("Exception is", error)

    def yt_main(urls):
        """Main function for when data_type is Youtube

        Collates comment data for multiple URLs using the YouTube API and creates a single CSV file
        """
        os.makedirs('/content', exist_ok=True)
        youtube_client = get_youtube()
        for url in urls.split(","):
            vid_id = get_vid_id(url.strip())  # get Video ID
            title = get_video_title(youtube_client, vid_id)

            print("Video : ", title)
            rawList = gather_comments(youtube_client, vid_id)
            if rawList is not None:
                # Slashes in a video title would break the local file path
                safe_title = str(title).replace("/", "_")
                # Save the per-video CSV locally, then upload that same file to GCS
                save_as_csv(rawList, safe_title)
                upload_blob(bucket_ID, f'/content/{safe_title}.csv', f'youtube/{safe_title}.csv')

        # Merge the per-video CSVs and upload the result under file_name
        csv_combiner()
        upload_blob(bucket_ID, '/content/final' + file_name, file_name)

    yt_main(urls)
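
In `gather_comments()`, the `comment_limit` check runs only between pages, so a video can yield up to one extra page of threads (plus their replies) beyond the limit. The following is a minimal sketch of the same `nextPageToken` pagination pattern that trims the result to the limit. `collect_comments` and the `fetch_page` callable are hypothetical stand-ins for the `commentThreads().list(...).execute()` calls, and only top-level comment text is collected.

```python
def collect_comments(fetch_page, comment_limit):
    """Collect top-level comment texts page by page until the limit or the last page.

    `fetch_page(page_token)` is a hypothetical callable returning a dict shaped like
    a commentThreads.list response: {"items": [...], "nextPageToken": "..."}.
    """
    comments = []
    page_token = None  # None requests the first page
    while len(comments) < comment_limit:
        response = fetch_page(page_token)
        for item in response.get("items", []):
            snippet = item["snippet"]["topLevelComment"]["snippet"]
            comments.append(snippet["textDisplay"])
        page_token = response.get("nextPageToken")
        if not page_token:  # last page reached
            break
    # Trim the final page so the result never exceeds the limit
    return comments[:comment_limit]


def _thread(text):
    # Minimal commentThreads item containing only the fields read above
    return {"snippet": {"topLevelComment": {"snippet": {"textDisplay": text}}}}


fake_pages = {
    None: {"items": [_thread("Love it"), _thread("Too pricey")], "nextPageToken": "p2"},
    "p2": {"items": [_thread("Works fine")]},
}
print(collect_comments(fake_pages.get, 10))  # ['Love it', 'Too pricey', 'Works fine']
print(collect_comments(fake_pages.get, 2))   # ['Love it', 'Too pricey']
```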

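### (2) sentiment_analysis
This component scores the sentiment of each review or comment with the Natural Language API, extracts the entities mentioned overall and in its positive and negative sentences, computes word counts for the pooled positive and negative sentences, and loads the results into BigQuery.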
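
The document-level label in `analyze_comment_sentiment()` comes from thresholding the Natural Language API's `score` (from -1.0 to 1.0) and `magnitude` (0 upward). The rule can be isolated as a pure function for testing. `classify_sentiment` is a hypothetical name, and the default `threshold=0.1` mirrors the constants hard-coded in the component.

```python
def classify_sentiment(score, magnitude, threshold=0.1):
    """Map a score/magnitude pair to 'Positive', 'Negative', or 'Neutral'.

    Mirrors the rule in analyze_comment_sentiment(): score >= threshold is Positive
    and score < -threshold is Negative, both only when magnitude > threshold.
    """
    if score >= threshold and magnitude > threshold:
        return "Positive"
    if score < -threshold and magnitude > threshold:
        return "Negative"
    return "Neutral"


print(classify_sentiment(0.8, 1.2))   # Positive
print(classify_sentiment(0.05, 2.0))  # Neutral
```

Note that the rule is slightly asymmetric: a score of exactly 0.1 counts as Positive, while a score of exactly -0.1 is Neutral.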

In [22]:

@comp(base_image="python:3.9", packages_to_install=["nltk","google-cloud-language","pandas","fsspec","gcsfs","google-cloud-bigquery","google-cloud-storage"] )
def sentiment_analysis(
    data_type : str,
    file_name : str,
    uploaded_file : str,
    bucket_ID : str,
    project_ID : str,
    location : str,
    dataset_ID : str,
    table_name : str
):
    import os
    import re
    import nltk
    import csv
    import string
    import pandas as pd
    import numpy as np
    from pathlib import Path  
    from nltk.tokenize import sent_tokenize
    from nltk.stem import WordNetLemmatizer 
    from nltk.corpus import stopwords # Import the stop word list
    from nltk.tokenize import wordpunct_tokenize
    from collections import Counter
    from google.cloud.exceptions import NotFound
    from google.cloud import storage
    from google.cloud import bigquery
    from operator import contains
    from google.cloud import language_v1
    # downloading nltk packages
    nltk.download('punkt')
    nltk.download('wordnet')
    nltk.download('stopwords')
    nltk.download('omw-1.4')


    def analyze_comment_sentiment(postTexts):
        """Analyzes sentiment score for given sentences

        Retrieves Sentiment score and magnitude for given text input using
        Natural Language API's analyze_sentiment function

        Args :
        postTexts :
            Sentences to be analyzed.
        
        Returns :
        Pandas series containing sentiment score and magnitude
        """
        type_=language_v1.types.Document.Type.PLAIN_TEXT
        language = "en"
        document = {"content": postTexts, "type_": type_, "language": language}
        encodingType = language_v1.EncodingType.UTF8 # try with UTF32 as well
        Sentiment = ''
        positive_pool = []
        negative_pool = []
        all_entities = []
        positive_entities = []
        negative_entities = []
        client = language_v1.LanguageServiceClient()

        sentiment_response = client.analyze_sentiment(request = {'document': document, 'encoding_type': encodingType})

        # Set the overall sentiment from the document score and magnitude
        if sentiment_response.document_sentiment.score >= 0.1 and sentiment_response.document_sentiment.magnitude > 0.1: 
            Sentiment = 'Positive'
        elif sentiment_response.document_sentiment.score < -0.1 and sentiment_response.document_sentiment.magnitude > 0.1:
            Sentiment = 'Negative'
        else :
            Sentiment = 'Neutral' 
        # At the sentence level, pool the positive and negative sentences
        for sentence in sentiment_response.sentences:
            if sentence.sentiment.score >= 0.1 and sentence.sentiment.magnitude > 0.1:          
                positive_pool.append(sentence.text.content)
            elif sentence.sentiment.score < -0.1 and sentence.sentiment.magnitude > 0.1:
                negative_pool.append(sentence.text.content)

        all_entities_response = client.analyze_entities(request = {'document': document, 'encoding_type': encodingType})
        for entity in all_entities_response.entities:
            all_entities.append(entity.name)

        # Join pooled sentences with a space so they do not run together,
        # and skip the API call when a pool is empty
        if positive_pool:
            positive_doc = {"content": ' '.join(positive_pool), "type_": type_, "language": language}
            positive_entities_response = client.analyze_entities(request = {'document': positive_doc, 'encoding_type': encodingType})
            positive_entities = [entity.name for entity in positive_entities_response.entities]
        if negative_pool:
            negative_doc = {"content": ' '.join(negative_pool), "type_": type_, "language": language}
            negative_entities_response = client.analyze_entities(request = {'document': negative_doc, 'encoding_type': encodingType})
            negative_entities = [entity.name for entity in negative_entities_response.entities]
        return pd.Series([sentiment_response.document_sentiment.score,sentiment_response.document_sentiment.magnitude,Sentiment,
                        positive_pool,negative_pool,all_entities,positive_entities,negative_entities])
        


    def upload_blob(bucket_name, source_file_name, destination_blob_name):
        """Uploads blob to GCS bucket

        Uploads blob to a GCS bucket, given source and destination

        Args:
        bucket_name :
            Name of GCS bucket to upload blob to.
        source_file_name :
            source file to be uploaded.
        destination_blob_name :
            destination to upload to blob
        """
        try:
            storage_client = storage.Client(project=project_ID)
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(destination_blob_name)
            blob.upload_from_filename(source_file_name)
            # Report success only if the upload did not raise
            print(
                f"\n File {source_file_name} uploaded to gs://{bucket_name}/{destination_blob_name}."
            )
        except Exception as error:
            print("Exception is ",error)


    def analyze_video_level_sentiment(yt_data):
        """Analyzes overall sentiment score for the video as a whole.

        Retrieves Sentiment score and magnitude for video by collating all 
        video comments and analyzing the overall sentiment using
        Natural Language API's analyze_sentiment function. 
        The resulting csv file is uploaded to GCS
        Args :
        yt_data : Youtube dataset generated in previous component.
        """
        try: 
            out_file = 'YTvideolevel.csv'
            n_vid = yt_data['Video ID'].unique()
            comment_list_by_vid = []
            ytdict = dict.fromkeys(['Video ID','Comments'])
            for i in range(0,len(n_vid)):
                vid_data = yt_data.loc[yt_data['Video ID'] == n_vid[i]]
                list_of_comments = [x for x in vid_data['Comment'].values.tolist()]
                comment_list_by_vid.append(" ".join(list_of_comments))
            ytdict['Video ID'] = n_vid
            ytdict['Comments'] = comment_list_by_vid
            df = pd.DataFrame(ytdict)

            df[['vid_polarity','vid_magnitude','Sentiment','positive_pool','negative_pool','all_entities','positive_entities','negative_entities']]= df['Comments'].apply(lambda x: analyze_comment_sentiment(x))
            filepath = Path('/content/'+out_file)  
            df.to_csv(filepath)
            print(df.head(20))
            # upload final dataset to GCS bucket
            upload_blob(bucket_ID,filepath,out_file)
            bq_job(project_ID, dataset_ID, 'YT_video_level_sentiment', bucket_ID, out_file)
        except Exception as error:
            print("Exception is ",error)

    def bq_job(project_ID, dataset_ID, table_name, bucket_ID, file_name):
        """Creates Big Query Tables from GCS files.

        Creates a Big Query table from the csvs in Cloud Storage
        Args:
          project_ID : Project ID
          datasetID : BigQuery Dataset ID
          table_name : name of table to be created
          file_name : name of GCS source used to create table
        """
        try:
          
          table_id = project_ID+"."+dataset_ID+"."+table_name

          # Create Table Schema
          job_config = bigquery.LoadJobConfig(
              #skip_leading_rows=1,
              source_format = bigquery.SourceFormat.CSV,
              allow_quoted_newlines = True,
              autodetect = True,
          )
          # CSV File Location (Cloud Storage Bucket)
          uri = "gs://"+bucket_ID+"/"+file_name

          # Create the Job
          bq_client = bigquery.Client(project = project_ID, location =location)
          try:
            bq_client.get_table(table_id)  # check if table exists.
            print("Table {} already exists.".format(table_id))
            bq_client.delete_table(table_id, not_found_ok=True)  # Delete table if exists
            print("Deleted table '{}'.".format(table_id))
          except NotFound:
            print("Table {} is not found.".format(table_id))
          csv_load_job = bq_client.load_table_from_uri(
              uri, table_id, job_config=job_config
          )
          csv_load_job.result()  # Wait for the load job to complete
          print("Created table '{}'.".format(table_id))
        except Exception as error:
            print("Exception is ",error)

    def lemma(tokenized_words):
        """Generates lemmatized blocks of words from tokenized_words
        Args: tokenized_words: tokenized words given as input
        Returns: List of lemmatized words
        """
        lemmatizer = WordNetLemmatizer()
        return [lemmatizer.lemmatize(w) for w in tokenized_words]

    def freq_words(words,category):
        """Counts word frequencies in a pool of sentences

        Tokenizes and lemmatizes the pooled positive or negative sentences with NLTK,
        removes stop words and punctuation, and counts the remaining words.
        The resulting csv file is uploaded to GCS and loaded into BigQuery.
        Args:
          words: Pool of positive or negative sentences
          category: 'Positive' or 'Negative' category
        """
        try:
            if data_type == 'Youtube':
                # new_words are additional stop words added to be removed from lemmatizer blocks
                new_words = (' ','I',"'s",'``',"n't",'...',"'re","'m","'ve","The","''","..","....",'wa','M','u',"’")
            elif data_type == 'Reviews':
                # new_words are additional stop words added to be removed from lemmatizer blocks
                new_words = (' ','I',"'s",'``',"n't",'...',"'re","'m","'ve","The","''","..","....",'wa','M','u',"’","It",'it',"This","'d")
            else :
                raise ValueError("invalid DataType : ", data_type) 

            tokenized_words = nltk.word_tokenize(words)
            word_list = []
            tokenized_words = lemma(tokenized_words)
            stop = stopwords.words('english')
            # Adding additional stop words to remove from the lemmatizer blocks
            for i in new_words:
                stop.append(i)
            # Creating word_list after removing punctuation and stop words 
            for i in tokenized_words :
              if i not in stop and i not in string.punctuation:
                word_list.append(i)

            freqs = nltk.FreqDist(word_list)
            final = pd.DataFrame(freqs.items(),columns = ('word', 'count'))

            filepath = Path('/content/wordcount'+category+file_name)
            final.to_csv(filepath) 
            # upload final dataset to GCS bucket
            upload_blob(bucket_ID,filepath,'wordcount'+category+file_name)
            bq_job(project_ID, dataset_ID, 'wordcount'+category, bucket_ID, 'wordcount'+category+file_name)
        except Exception as error:
            print("Exception is ", error)

    def sentiment_main():
        # main function that calculates sentiment, and uploads resulting csv to GCS
        os.makedirs('/content', exist_ok=True)
        #Check for data type
        if data_type == 'Reviews':
            data = pd.read_csv('gs://'+bucket_ID+'/'+uploaded_file)
            data[['polarity','magnitude','Sentiment','positive_pool','negative_pool','all_entities','positive_entities','negative_entities']] = data['Review'].apply(lambda x: analyze_comment_sentiment(x)) # Utilize the analyze_comment_sentiment function
            filepath = Path('/content/final'+file_name)
        elif data_type == 'Youtube':
            data = pd.read_csv('gs://'+bucket_ID+'/'+uploaded_file).iloc[: , 1:]
            analyze_video_level_sentiment(data)
            data[['polarity','magnitude','Sentiment','positive_pool','negative_pool','all_entities','positive_entities','negative_entities']] = data['Comment'].apply(lambda x: analyze_comment_sentiment(x)) # Utilize the analyze_comment_sentiment function
            filepath = Path('/content/final'+file_name)
        
        # Get list of all positive and negative sentences
        positive_sentences = data.positive_pool.sum()
        negative_sentences = data.negative_pool.sum()
        # Join with a space so that adjacent sentences tokenize separately
        p_pool = ' '.join(positive_sentences)
        n_pool = ' '.join(negative_sentences)
        # Get positive and negative wordcounts
        freq_words(p_pool,'Positive')
        freq_words(n_pool,'Negative')

        all_entities = data.explode('all_entities')

        all_entities.to_csv(filepath) # save as csv
        # upload final dataset to GCS bucket
        upload_blob(bucket_ID,filepath,file_name)
        # Create big query table from gcs
        bq_job(project_ID, dataset_ID, table_name, bucket_ID, file_name)

    sentiment_main() 
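
`freq_words()` tokenizes and lemmatizes the pooled sentences with NLTK, removes stop words and punctuation, and counts the rest with `nltk.FreqDist`. The sketch below swaps in a plain-Python approximation (whitespace splitting, `collections.Counter`, and a small hand-written stop-word list) so the counting step can be seen without downloading NLTK data. Unlike the component, it lowercases tokens and does not lemmatize; `word_counts` and `STOP_WORDS` are hypothetical.

```python
import string
from collections import Counter

# Small illustrative stop-word list (the component uses NLTK's English list plus extras)
STOP_WORDS = {"the", "a", "an", "is", "it", "this", "and", "i", "to", "of", "was"}


def word_counts(text, extra_stop_words=()):
    """Count non-stop-word tokens in `text`; a simplified stand-in for freq_words()."""
    stop = STOP_WORDS | {w.lower() for w in extra_stop_words}
    counts = Counter()
    for token in text.split():
        # Drop surrounding punctuation and normalize case before filtering
        word = token.strip(string.punctuation).lower()
        if word and word not in stop:
            counts[word] += 1
    return counts


pool = "The battery is great. Great screen, and the battery lasts."
print(dict(word_counts(pool)))  # {'battery': 2, 'great': 2, 'screen': 1, 'lasts': 1}
```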


### Execute The Pipeline

With the components defined, the cells below set the pipeline parameters, define `sentiment_pipeline`, compile it, and submit it to Vertex AI Pipelines.

In [23]:
# Define pipeline parameters
pipeline_params = {
    "data_type" : DATATYPE,
    "file_name" : FILENAME,
    "uploaded_file" : uploaded_file,
    "bucket_id" : BUCKET_ID,
    "developer_key" : YOUTUBE_DEVELOPER_KEY,
    "project_id" : PROJECT_ID,
    "location" : REGION,
    "dataset_id" : DATASET_ID,
    "table_name" : TABLE_NAME,
    "urls" : youtube_video_urls,
    "comment_limit" : int(comment_limit)
}

In [24]:
# Defining the sentiment pipeline

@dsl.pipeline(
    name = "sentimentanalysispipline",
    description = "A pipeline to perform Sentiment Analysis"
)
def sentiment_pipeline(
    data_type : str,
    file_name : str,
    uploaded_file : str,
    bucket_id : str,
    developer_key : str,
    project_id : str,
    location : str,
    dataset_id : str,
    table_name : str,
    urls : str,
    comment_limit : int
):
    # Reviews: analyze the uploaded CSV directly
    with dsl.Condition(data_type == 'Reviews'):
        sentiment_task = sentiment_analysis(data_type, file_name, uploaded_file, bucket_id, project_id, location, dataset_id, table_name)
    # Youtube: build the comment dataset first, then analyze it
    with dsl.Condition(data_type == 'Youtube'):
        yt_task = create_yt_dataset(project_id, developer_key, urls, comment_limit, bucket_id, file_name)
        # create_yt_dataset uploads the combined comment CSV as file_name
        sentiment_task = sentiment_analysis(data_type, file_name, file_name, bucket_id, project_id, location, dataset_id, table_name).after(yt_task)
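
The keys of `pipeline_params` must match the argument names of `sentiment_pipeline`. A small pre-flight check using `inspect.signature` is sketched below; `check_pipeline_params` is a hypothetical helper, and applying it to `sentiment_pipeline` assumes the `@dsl.pipeline` decorator leaves the function's signature inspectable.

```python
import inspect


def check_pipeline_params(pipeline_func, params):
    """Return (missing, unexpected) parameter names for a pipeline call.

    Compares the keys of `params` with the argument names of `pipeline_func`.
    """
    expected = set(inspect.signature(pipeline_func).parameters)
    provided = set(params)
    missing = sorted(expected - provided)
    unexpected = sorted(provided - expected)
    return missing, unexpected


def demo_pipeline(data_type: str, file_name: str, comment_limit: int):
    """Stand-in with the same shape as a pipeline function."""


print(check_pipeline_params(demo_pipeline, {"data_type": "Reviews", "file_name": "Product.csv", "bucket": "x"}))
# (['comment_limit'], ['bucket'])
```

In the notebook, `check_pipeline_params(sentiment_pipeline, pipeline_params)` would be expected to return `([], [])` before the `PipelineJob` is created.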


In [25]:
# Compile and run the pipeline
compiler.Compiler().compile(
    pipeline_func = sentiment_pipeline,
    package_path = PIPELINE_JSON_PKG_PATH,
)

vertex.init(project = PROJECT_ID, location = REGION)

pipeline_job = vertex.PipelineJob(
    template_path = PIPELINE_JSON_PKG_PATH,
    pipeline_root = PIPELINE_ROOT,
    parameter_values = pipeline_params,
    enable_caching = False,
    display_name = "sentimentanalysispipline"
)

# run() blocks until the pipeline finishes and returns None
pipeline_job.run()

print(pipeline_job.state)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/971809154392/locations/us-central1/pipelineJobs/sentimentanalysispipline-20220806001547
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/971809154392/locations/us-central1/pipelineJobs/sentimentanalysispipline-20220806001547')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/sentimentanalysispipline-20220806001547?project=971809154392
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/971809154392/locations/us-central1/pipelineJobs/sentimentanalysispipline-20220806001547 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/971809154392/loca