In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer

import google.auth
import google.auth.transport.requests
import requests

import time

In [None]:
# Install (or upgrade to the latest release of) the Google Cloud client libraries used by this notebook.
# NOTE(review): versions are not pinned, so a re-run may pick up a different release — consider pinning.
# NOTE(review): google-cloud-vision is not referenced in the visible cells — confirm it is still needed.
# When using Dataproc Serverless, installed packages are automatically available on all nodes
!pip install --upgrade google-cloud-aiplatform google-cloud-vision
# When using a Dataproc cluster, you will need to install these packages during cluster creation: https://cloud.google.com/dataproc/docs/tutorials/python-configuration

#### Get credentials to authenticate with Google APIs


In [None]:
# Look up the default Google credentials for this environment together with
# the project they resolve to; `project_id` is used later to initialise Vertex AI.
credentials, project_id = google.auth.default()
# Refresh the credentials using a `requests`-based transport object.
auth_req = google.auth.transport.requests.Request()
credentials.refresh(auth_req)

In [None]:
# Build the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Sentimental Analysis using Dataproc and Vertex LLM")
    .getOrCreate()
)

In [None]:
# Read the public IMDB reviews table from BigQuery into a Spark DataFrame.
movie_reviews = (
    spark.read
    .format("bigquery")
    .option("table", "bigquery-public-data.imdb.reviews")
    .load()
)

|                                                                                              review|split|   label| movie_id|reviewer_rating|                           movie_url|title|
|----------------------------------------------------------------------------------------------------|-----|--------|---------|---------------|------------------------------------|-----|
|I had to see this on the British Airways plane. It was terribly bad acting and a dumb story. Not ...| test|Negative|tt0158887|              2|http://www.imdb.com/title/tt0158887/| null|
|This is a family movie that was broadcast on my local ITV station at 1.00 am a couple of nights a...| test|Negative|tt0158887|              4|http://www.imdb.com/title/tt0158887/| null|
|I would like to comment on how the girls are chosen. why is that their are always more white wome...| test|Negative|tt0391576|              2|http://www.imdb.com/title/tt0391576/| null|
|Tyra & the rest of the modeling world needs to know that real women like myself and my daughter d...| test|Negative|tt0391576|              3|http://www.imdb.com/title/tt0391576/| null|

In [None]:
# Up to 100 reviews labelled "Positive", restricted to the four columns used below.
positive_movie_reviews = (
    movie_reviews
    .select(col("review"), col("reviewer_rating"), col("movie_id"), col("label"))
    .where(col("label") == "Positive")
    .limit(100)
)

In [None]:
# Up to 100 reviews labelled "Negative", restricted to the four columns used below.
negative_movie_reviews = (
    movie_reviews
    .select(col("review"), col("reviewer_rating"), col("movie_id"), col("label"))
    .where(col("label") == "Negative")
    .limit(100)
)

In [None]:
# Stack the positive and negative samples into one DataFrame. Both sides
# select the same four columns in the same order, so the rows line up.
movie_reviews_mixed = positive_movie_reviews.union(negative_movie_reviews)

|              review|reviewer_rating| movie_id|   label|
|--------------------|---------------|---------|--------|
|This movie is ama...|             10|tt0187123|Positive|
|THE HAND OF DEATH...|             10|tt0187123|Positive|
|The Hand of Death...|              7|tt0187123|Positive|
|Just as a reminde...|             10|tt0163955|Positive|
|Like an earlier c...|              9|tt0163955|Positive|

In [None]:
# Row count of the combined sample; each side was capped with limit(100).
movie_reviews_mixed.count()

In [None]:
import vertexai
from vertexai.generative_models import GenerativeModel, Part , HarmCategory, HarmBlockThreshold

# Initialise the Vertex AI SDK with the project resolved from the default
# credentials and the us-central1 region.
vertexai.init(project=project_id, location="us-central1")

def gemini_predict(prompt, model_name="gemini-1.0-pro"):
    """Send ``prompt`` to a Gemini model on Vertex AI and return the generated text.

    The response is requested as a stream and the text of every chunk is
    concatenated. Reading the stream is best-effort: if it fails part-way
    (for example because reading a chunk's text raises), the failure is
    logged and whatever text was collected so far is returned, which may be
    the empty string. Errors raised by the initial ``generate_content`` call
    itself are not caught and propagate to the caller.

    Args:
        prompt: Prompt text, sent as the only content part.
        model_name: Name of the generative model to call. Defaults to the
            model this notebook originally hard-coded.

    Returns:
        str: Concatenated text of all chunks read before the stream ended
        or failed.
    """
    # Imported locally so the function stays self-contained when it is
    # shipped to Spark executors as part of a UDF.
    import logging

    gemini_pro_model = GenerativeModel(model_name)
    config = {"max_output_tokens": 2048, "temperature": 0.4, "top_p": 1, "top_k": 32}
    safety_config = {
        HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
        HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
    }

    prediction = gemini_pro_model.generate_content(
        [prompt],
        generation_config=config,
        safety_settings=safety_config,
        stream=True,
    )

    text_responses = []
    try:
        for response in prediction:
            text_responses.append(response.text)
    except Exception as exc:
        # Keep the original best-effort contract (return what we have), but
        # no longer hide the failure, and let KeyboardInterrupt / SystemExit
        # propagate instead of being swallowed by a bare ``except``.
        logging.getLogger("gemini_predict").warning(
            "Gemini response stream ended early after %d chunk(s): %r",
            len(text_responses),
            exc,
        )
    return "".join(text_responses)

In [None]:
def find_sentiment_zero_shot(text):
    """Classify ``text`` as "Negative" or "Positive" with a zero-shot Gemini prompt.

    The model's reply is free-form text. Replies that differ from a class
    name only by surrounding whitespace, letter case, or wrapping
    punctuation/markdown (for example ``"positive"``, ``"Positive."`` or
    ``"**Negative**"``) are mapped to the canonical label so they are not
    counted as separate classes downstream. Any other reply, including an
    empty one, is returned unchanged.

    Args:
        text: The review text to classify.

    Returns:
        str: "Negative" or "Positive" when the reply is recognisable as one
        of the two classes, otherwise the raw model reply.
    """
    prompt = f"""For the given text below, provide the sentiment classification from the two classes mentioned below:
    The two classes are: Negative, Positive.
    Always choose between one of them (the most appropriate one).
    Text: {text}
    Sentiment:"""

    sentiment = gemini_predict(prompt)

    # Strip whitespace and the wrapping characters models commonly add, then
    # compare case-insensitively against the two allowed class names.
    cleaned = sentiment.strip().strip("*.\"'`").strip().lower()
    for label in ("Negative", "Positive"):
        if cleaned == label.lower():
            return label
    return sentiment

# Spark UDF wrapper; with no explicit return type the result column is a string.
find_sentiment_zero_shot_udf = udf(find_sentiment_zero_shot)

In [None]:
# Print the column names and types of the combined sample before adding predictions.
movie_reviews_mixed.printSchema()

In [None]:
# Add the model's zero-shot prediction for every review as a new column.
movie_review_sentiment_pred = movie_reviews_mixed.withColumn(
    "predicted_sentiment",
    find_sentiment_zero_shot_udf(movie_reviews_mixed["review"]),
)

In [None]:
# Trim surrounding whitespace from the prediction and the label so the two
# columns compare cleanly.
# Cache at definition time, before the first action runs: the prediction
# column comes from a UDF that calls the LLM, so without a cache every action
# would re-run those calls, and the rows displayed could differ from the rows
# that are evaluated later.
trimmed_movie_review_sentiment_pred = (
    movie_review_sentiment_pred
    .withColumn("predicted_sentiment", trim(col("predicted_sentiment")))
    .withColumn("label", trim(col("label")))
    .cache()
)

In [None]:
# Display predictions next to the ground-truth labels: up to 200 rows,
# with each cell truncated to 100 characters.
(
    trimmed_movie_review_sentiment_pred
    .select(col("predicted_sentiment"), col("label"))
    .show(n=200, truncate=100)
)

In [None]:
# Mark the DataFrame for caching so later actions can reuse already computed
# rows instead of re-running the prediction UDF.
trimmed_movie_review_sentiment_pred.cache()

In [None]:
inputs = ["predicted_sentiment", "label"]
outputs = ["predicted_sentiment_indexed", "label_indexed"]

# StringIndexer builds a separate value -> index mapping for each input column.
# Under the default "frequencyDesc" ordering the most frequent value gets
# index 0, so "Positive" could become 0.0 in one column and 1.0 in the other
# whenever the predicted class balance differs from the label balance. That
# would invert both the equality check and the ROC computed from these columns.
# Alphabetical ordering gives both columns the same mapping
# (Negative -> 0.0, Positive -> 1.0) as long as each column contains exactly
# those two values.
# NOTE(review): any other predicted value (e.g. an empty reply) still shifts
# the mapping for that column — confirm the predictions are limited to the
# two class names before trusting the metrics.
stringIndexer = StringIndexer(inputCols=inputs, outputCols=outputs, stringOrderType="alphabetAsc")
indexer = stringIndexer.fit(trimmed_movie_review_sentiment_pred)

movie_review_sentiment_pred_indexed = indexer.transform(trimmed_movie_review_sentiment_pred)

In [None]:
# Score the indexed predictions against the indexed labels.
evaluator = BinaryClassificationEvaluator()
evaluator.setRawPredictionCol("predicted_sentiment_indexed")
evaluator.setLabelCol("label_indexed")

# `area_under_roc` itself stays a fraction in [0, 1].
area_under_roc = evaluator.evaluate(movie_review_sentiment_pred_indexed, {evaluator.metricName: "areaUnderROC"})

# The message is labelled "(%)", so scale the fraction to a percentage when printing.
print(f"area_under_roc (%): {area_under_roc * 100:.2f}")

In [None]:
# Flag each row: 1 when the indexed prediction equals the indexed label, else 0.
match_predictions_df = movie_review_sentiment_pred_indexed.withColumn(
    "if_match",
    when(col("predicted_sentiment_indexed") == col("label_indexed"), 1).otherwise(0),
)

In [None]:
# Number of reviews whose prediction does not match the label.
match_predictions_df.filter(col("if_match") == 0).count()

In [None]:
# Keep only the misclassified reviews, with the columns needed to inspect them.
mismatch_df = (
    match_predictions_df
    .where(col("if_match") == 0)
    .select(col("predicted_sentiment"), col("label"), col("review"))
)

In [None]:
# Display the misclassified reviews using show()'s default row limit and truncation.
mismatch_df.show()