### Import the required libraries for service bus and ML model

In [0]:
import asyncio
from azure.servicebus.aio import ServiceBusClient
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date, datetime
import re
import time
from transformers import pipeline


### Fetching the pre-trained model for sentiment analysis

In [0]:
model_path_1="finiteautomata/bertweet-base-sentiment-analysis"
sentiment_analysis = pipeline("sentiment-analysis", model=model_path_1)

Downloading (…)lve/main/config.json:   0%|          | 0.00/949 [00:00<?, ?B/s]

Downloading tf_model.h5:   0%|          | 0.00/540M [00:00<?, ?B/s]

All model checkpoint layers were used when initializing TFRobertaForSequenceClassification.

All the layers of TFRobertaForSequenceClassification were initialized from the model checkpoint at finiteautomata/bertweet-base-sentiment-analysis.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaForSequenceClassification for predictions without further training.


Downloading (…)okenizer_config.json:   0%|          | 0.00/338 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/843k [00:00<?, ?B/s]

Downloading (…)solve/main/bpe.codes:   0%|          | 0.00/1.08M [00:00<?, ?B/s]

Downloading (…)in/added_tokens.json:   0%|          | 0.00/22.0 [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/167 [00:00<?, ?B/s]

### Creating a schema

In [0]:
%sql

CREATE SCHEMA IF NOT EXISTS git_demo

### Create a function to save the processed data into the delta lake

In [0]:
def save_record(results):
    resultCols=['labels','message','score']
    results_data=spark.createDataFrame(data=results, schema = resultCols)
    resultant=results_data.withColumn("timestamp",current_timestamp())
    resultant.write.mode("append").saveAsTable("git_demo.sentiment_analysis_result")

### Connection the messages from the Azure Service Bus receiver end

In [0]:


#Parameters to have the connection for the Azure Service Bus 

conn_parameters=spark.read.json('/gitdemo/parameters/lparameters.json')

NAMESPACE_CONNECTION_STR=conn_parameters.collect()[0]['conn_string']
SUBSCRIPTION_NAME=conn_parameters.collect()[0]['subscription']
TOPIC_NAME=conn_parameters.collect()[0]['topic']

### Fetching the messages from the Azure Service Bus receiver end

In [0]:
async def run():
    # create a Service Bus client using the credential
    async with ServiceBusClient.from_connection_string(
        conn_str=NAMESPACE_CONNECTION_STR,
        logging_enable=True) as servicebus_client:

        async with servicebus_client:
            # get the Subscription Receiver object for the subscription
            receiver = servicebus_client.get_subscription_receiver(topic_name=TOPIC_NAME, 
            subscription_name=SUBSCRIPTION_NAME, max_wait_time=5)
            message_list=[]
            async with receiver:
                received_msgs = await receiver.receive_messages(max_wait_time=20, max_message_count=50)
                for msg in received_msgs:
                    print("Received: " + str(msg))
                    #message_list.append(json_object) 
                    message_list.append(str(msg)) 
                    # complete the message so that the message is removed from the subscription
                    await receiver.complete_message(msg)
                # call the sentiment analysis model
                results=sentiment_analysis(message_list)

                print("results =>", results)
                for bs_msg in range(len(results)):
                    results[bs_msg]['message']=message_list[bs_msg]
                # save the record into the delta table
                if len(results)==0:
                    print("No received messages")
                else:
                    save_record(results)  

#### Continuous the fetching of messages in a regular interval

In [0]:

#Initiate the call to the service bus from notebook
while True:
   await run()
   time.sleep(5)
   print("Time:",datetime.now())

Received: Tiring day
results => [{'label': 'NEU', 'score': 0.9386323690414429}]
Time: 2023-05-08 07:02:05.757129
results => []
No received messages
Time: 2023-05-08 07:02:31.801369
results => []
No received messages
Time: 2023-05-08 07:02:57.839927
results => []
No received messages
Time: 2023-05-08 07:03:23.872569
results => []
No received messages
Time: 2023-05-08 07:03:49.918268
results => []
No received messages
Time: 2023-05-08 07:04:15.955813
results => []
No received messages
Time: 2023-05-08 07:04:41.994894
results => []
No received messages
Time: 2023-05-08 07:05:08.029943
results => []
No received messages
Time: 2023-05-08 07:05:34.067814
results => []
No received messages
Time: 2023-05-08 07:06:00.114272
results => []
No received messages
Time: 2023-05-08 07:06:26.159467
results => []
No received messages
Time: 2023-05-08 07:06:52.199096
results => []
No received messages
Time: 2023-05-08 07:07:18.245892
results => []
No received messages
Time: 2023-05-08 07:07:44.282537
res