In [0]:
def commentSentimentAnalysis(pysparkDf, colName,
                             sentimentsMountPath="/mnt/Sandbox/pm_ap_conn/sid_test/youtube/commentSentimentsTest202401060630",
                             numPartitions=31):
  """
  Score the sentiment of a text column of a PySpark DataFrame and save the result as a Delta table.

  The model is loaded with `tweetnlp.load_model('sentiment', multilingual=True)`. The original notes
  describe it as cardiffnlp/twitter-xlm-roberta-base-sentiment from Hugging Face.

  Three columns are appended to the input DataFrame:
    - sentimentAnalysis: map<string, string> with the keys "label" and "probability"
    - sentimentCategory: the predicted label
    - sentimentCategoryProbability: the probability of that label. It is a string column, because the
      UDF declares its map values as StringType.
    - Null values in `colName` yield a null sentimentAnalysis, and therefore null derived columns.

  The original notes list the expected input schema as: title:string, description:string, tags:string,
  videoId:string, commentId:string, commentUpdatedDate:date, commentAuthorChannelId:string, comment:string.

  The scored DataFrame is written to `sentimentsMountPath` in Delta format (mode "overwrite",
  overwriteSchema "False"). If the write fails, for example because the schema does not match the
  existing table, the error is printed and the scored DataFrame is still returned.

  Args:
    pysparkDf: input PySpark DataFrame.
    colName: name of the string column to score (e.g. "comment").
    sentimentsMountPath: Delta destination path. Defaults to the previously hard-coded location.
    numPartitions: number of partitions the input is repartitioned into before scoring.

  Returns:
    The scored PySpark DataFrame. It is lazy, so evaluating it again (display, count, ...) re-runs
    model inference.

  usage:

    scored = commentSentimentAnalysis(pysparkDf=youtubeVideoCommentsRepliesFiltered, colName="comment")

  NOTE(review): a saved traceback in this notebook shows another cell defining a different
  `commentSentimentAnalysis(startCommentDate, endCommentDate, slang_dict_path)` that calls this function
  by the same name. Whichever cell ran last shadows the other, so consider renaming one of them.
  """
  from pyspark.sql.functions import col, udf
  from pyspark.sql.types import StringType, MapType

  def getSentimentModel():
    """Load the multilingual tweetnlp sentiment model (tweetnlp is imported only when this is called)."""
    import tweetnlp
    return tweetnlp.load_model('sentiment', multilingual=True)

  def sentimentUdf(model, text):
    """Return {"label": ..., "probability": ...} for `text`, or None when `text` is null."""
    # Without this guard a null cell is passed to the model, which would fail the whole Spark task.
    if text is None:
      return None
    x = model.sentiment(text, return_probability=True, batch_size=32)
    label = x['label']
    # Normalise to a plain Python float (presumably the model may return numpy/torch scalars)
    # before the value is handed back to Spark.
    return {"label": label, "probability": float(x['probability'][label])}

  def main(pysparkDf, colName):
    """Score `colName`, try to save the result as Delta, and return the scored DataFrame."""
    # Load the model once on the driver; the UDF lambda below captures it and ships it with the UDF.
    sentimentModel = getSentimentModel()
    feed = pysparkDf.repartition(numPartitions)
    sentimentSchema = MapType(StringType(), StringType())
    sentimentSparkUdf = udf(lambda comments: sentimentUdf(sentimentModel, comments), sentimentSchema)

    result = (feed
              .withColumn("sentimentAnalysis", sentimentSparkUdf(col(colName)))
              .withColumn("sentimentCategory", col("sentimentAnalysis").getItem("label"))
              .withColumn("sentimentCategoryProbability", col("sentimentAnalysis").getItem("probability")))
    try:
      result.write.format("delta").mode("overwrite").option("overwriteSchema", "False").save(sentimentsMountPath)
    except Exception as exc:
      # Best effort by design: report why the save failed, but still hand back the scored DataFrame.
      print(f"Writing to {sentimentsMountPath} failed (the schema of the passed df may not meet the "
            f"requirements); returning the result without saving. Error: {exc}")
    return result

  # Previously the result of main() was discarded and callers received None.
  return main(pysparkDf, colName)

In [0]:
pip install -U tensorflow==2.10 

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting tensorflow==2.10
  Using cached tensorflow-2.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (578.0 MB)
Collecting keras<2.11,>=2.10.0
  Using cached keras-2.10.0-py2.py3-none-any.whl (1.7 MB)
Collecting keras-preprocessing>=1.1.1
  Using cached Keras_Preprocessing-1.1.2-py2.py3-none-any.whl (42 kB)
Collecting protobuf<3.20,>=3.9.2
  Using cached protobuf-3.19.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB)
Collecting tensorflow-estimator<2.11,>=2.10.0
  Using cached tensorflow_estimator-2.10.0-py2.py3-none-any.whl (438 kB)
Collecting tensorboard<2.11,>=2.10
  Using cached tensorboard-2.10.1-py3-none-any.whl (5.9 MB)
Collecting tensorboard-data-server<0.7.0,>=0.6.0
  Using cached tensorboard_data_server-0.6.1-py3-none-manylinux2010_x86_64.whl (4.9 MB)
Collecting tensorboard-plugin-wit>=1.6.0
  Using cached tensorboard_plugin_w

In [0]:
pip install tweetnlp==0.4.4

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting tweetnlp==0.4.4
  Downloading tweetnlp-0.4.4.tar.gz (54 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 54.6/54.6 kB 2.5 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting ray[tune]
  Downloading ray-2.38.0-cp310-cp310-manylinux2014_x86_64.whl (66.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 66.0/66.0 MB 13.3 MB/s eta 0:00:00
Collecting urlextract
  Downloading urlextract-1.9.0-py3-none-any.whl (21 kB)
Collecting transformers<=4.21.2
  Downloading transformers-4.21.2-py3-none-any.whl (4.7 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 4.7/4.7 MB 143.7 MB/s eta 0:00:00
Collecting huggingface-hub<=0.9.1
  Downloading huggingface_hub-0.9.1-py3-none-any.whl (120 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 120.7/120.7 kB 26.2 MB/s eta 0:00:00
Collecting tokenizers!=0.11.3,<0.

In [0]:
# NOTE(review): the saved traceback shows this call entering a different
# `commentSentimentAnalysis(startCommentDate, endCommentDate, slang_dict_path)`. That function then calls
# `commentSentimentAnalysis(forSentiAnalysis, "expandedText", ...)` under the same name, so it appears to
# call itself rather than the scoring function, which raises the TypeError. Renaming one of the two
# definitions should fix it.
# NOTE(review): the hard-coded /Workspace/Users/... path ties this cell to a single user's workspace.
display(commentSentimentAnalysis('2024-08-01', '2024-08-10', slang_dict_path = '/Workspace/Users/siddardha.kaja@aircanada.ca/EnglishSDK/slangDict.txt'))

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-1330541243429584>, line 1[0m
[0;32m----> 1[0m display([43mcommentSentimentAnalysis[49m[43m([49m[38;5;124;43m'[39;49m[38;5;124;43m2024-08-01[39;49m[38;5;124;43m'[39;49m[43m,[49m[43m [49m[38;5;124;43m'[39;49m[38;5;124;43m2024-08-10[39;49m[38;5;124;43m'[39;49m[43m,[49m[43m [49m[43mslang_dict_path[49m[43m [49m[38;5;241;43m=[39;49m[43m [49m[38;5;124;43m'[39;49m[38;5;124;43m/Workspace/Users/siddardha.kaja@aircanada.ca/EnglishSDK/slangDict.txt[39;49m[38;5;124;43m'[39;49m[43m)[49m)

File [0;32m<command-1330541243429580>, line 135[0m, in [0;36mcommentSentimentAnalysis[0;34m(startCommentDate, endCommentDate, slang_dict_path)[0m
[1;32m    133[0m   finalDf [38;5;241m=[39m commentSentimentAnalysis(forSentiAnalysis, [38;5;124m"[39m[38;5;124mexpandedText