In [2]:
import os
import pandas as pd
from azure.ai.textanalytics import TextAnalyticsClient
from azure.core.credentials import AzureKeyCredential

In [3]:
# Configure Kafka parameters
kafka_topic = "test-ofas-demo-0"

# Comma-separated local addresses in the form {ip}:{port},
# e.g. "10.0.0.1:9093,10.0.0.2:9093,10.0.0.3:9093".
# Set KAFKA_BOOTSTRAP_SERVERS to override the placeholder without editing the notebook.
kafka_bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "{local ip addresses with port}")
kafka_username = os.environ["KAFKA_USERNAME"]
kafka_password = os.environ["KAFKA_PASSWORD"]
kafka_security_protocol = "SASL_PLAINTEXT"
kafka_sasl_mechanism = "SCRAM-SHA-256"


def _jaas_quote(value):
    """Escape backslashes and double quotes so `value` can sit inside a quoted JAAS option."""
    return value.replace("\\", "\\\\").replace('"', '\\"')


# Escaping keeps credentials that contain `"` or `\` from breaking the JAAS option syntax;
# values without those characters produce exactly the same string as before.
kafka_sasl_jaas_config = (
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
    f'username="{_jaas_quote(kafka_username)}" password="{_jaas_quote(kafka_password)}";'
)



## Continuously read events from Kafka

In [None]:

# Subscribe to kafka_topic over SASL and read only records that arrive from now on.
tweets_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": kafka_bootstrap_servers,
        "subscribe": kafka_topic,
        "kafka.security.protocol": kafka_security_protocol,
        "kafka.sasl.mechanism": kafka_sasl_mechanism,
        "kafka.sasl.jaas.config": kafka_sasl_jaas_config,
        # 300000 ms = 5 minutes to establish a broker connection
        "kafka.socket.connection.setup.timeout.ms": "300000",
        "startingOffsets": "latest",
    })
    .load()
)


In [None]:
# Decode the Kafka key/value payloads as strings and name them for the sentiment step.
tweets = tweets_df.selectExpr(
    "CAST(key AS STRING) AS keyword",
    "CAST(value AS STRING) AS tweet",
)


In [None]:
# A streaming DataFrame cannot be materialized with .show(): Spark raises AnalysisException
# ("Queries with streaming sources must be executed with writeStream.start()").
# Inspect the schema instead; individual rows are printed by the foreachBatch handler
# once the streaming query is started.
tweets.printSchema()

## Optional: sentiment analysis with Azure Cognitive Services (skip the following cells if you don't need text analytics)

In [None]:
# Azure Language (Text Analytics) connection settings.
# Either set ACLS_LANGUAGE_ENDPOINT to the full endpoint URL, or replace the placeholder
# below with the resource name (e.g. ofas-kafka-reference-app) to build the default URL.
end_point = "{azure text analytics endpoint}"
azure_endpoint = os.environ.get("ACLS_LANGUAGE_ENDPOINT") or f"https://{end_point}.cognitiveservices.azure.com/"

# Required by authenticate_client(); read with os.environ[...] so a missing key fails here
# with a clear KeyError instead of a NameError later.
azure_api_key = os.environ["ACLS_LANGUAGE_KEY"]


In [None]:
def authenticate_client(endpoint=None, api_key=None):
    """Create a TextAnalyticsClient for the Azure Language service.

    Args:
        endpoint: Service endpoint URL. Defaults to the notebook-level ``azure_endpoint``.
        api_key: Resource key. Defaults to the notebook-level ``azure_api_key``.

    Returns:
        A TextAnalyticsClient authenticated with an AzureKeyCredential.
    """
    # Globals are resolved only when no explicit value is given, so a no-argument call
    # behaves exactly as before.
    ta_credential = AzureKeyCredential(api_key if api_key is not None else azure_api_key)
    return TextAnalyticsClient(
        endpoint=endpoint if endpoint is not None else azure_endpoint,
        credential=ta_credential,
    )

In [None]:
# Module-level Text Analytics client; the foreachBatch sentiment handler reads this global `client`.
client = authenticate_client()

In [None]:


# Running per-keyword totals across all micro-batches. process_batch() replaces this
# global after every batch and adds a derived "average_sentiment" column for display.
sentiment_summary = pd.DataFrame(
    {
        "keyword": pd.Series(dtype="object"),
        "total_sentiment": pd.Series(dtype="float64"),
        "count": pd.Series(dtype="int64"),
    }
)

# Columns holding the running totals (average_sentiment is derived from them).
_SUMMARY_COLUMNS = ["keyword", "total_sentiment", "count"]

# Maximum number of documents sent in a single analyze_sentiment request
# (the Language service limit for synchronous sentiment analysis).
SENTIMENT_DOCS_PER_REQUEST = 10


def _positive_scores(texts):
    """Score texts with the global Text Analytics `client`.

    Returns one entry per input text, in input order: the positive confidence score,
    or None when the service reported an error for that document.
    """
    scores = []
    for start in range(0, len(texts), SENTIMENT_DOCS_PER_REQUEST):
        chunk = texts[start:start + SENTIMENT_DOCS_PER_REQUEST]
        # Results are returned in the same order as the submitted documents.
        for result in client.analyze_sentiment(documents=chunk):
            # Failed documents come back as DocumentError objects without confidence_scores.
            scores.append(None if result.is_error else result.confidence_scores.positive)
    return scores


def _merge_into_summary(summary, new_rows):
    """Return `summary` with per-tweet `new_rows` folded into the per-keyword totals.

    Keywords keep their first-seen order; a null keyword is aggregated as its own group.
    """
    if not new_rows:
        return summary
    additions = pd.DataFrame(new_rows, columns=_SUMMARY_COLUMNS)
    combined = (
        additions
        if summary.empty
        else pd.concat([summary[_SUMMARY_COLUMNS], additions], ignore_index=True)
    )
    return combined.groupby("keyword", sort=False, dropna=False, as_index=False).agg(
        total_sentiment=("total_sentiment", "sum"),
        count=("count", "sum"),
    )


def process_batch(batch_df, batch_id):
    """foreachBatch handler: score each tweet in the micro-batch and update running averages.

    Args:
        batch_df: Spark DataFrame for the micro-batch with string columns "keyword" and "tweet".
        batch_id: Micro-batch id supplied by Spark (unused).

    Side effects:
        Replaces the global ``sentiment_summary`` and prints per-tweet scores followed by
        the per-keyword average sentiment.
    """
    global sentiment_summary

    rows = batch_df.toPandas()

    # Kafka records can carry a null or blank value, which the service cannot score.
    pending = [
        (keyword, tweet)
        for keyword, tweet in zip(rows["keyword"], rows["tweet"])
        if isinstance(tweet, str) and tweet.strip()
    ]
    scores = _positive_scores([tweet for _, tweet in pending])

    new_rows = []
    for (keyword, tweet), score in zip(pending, scores):
        if score is None:
            print(f"Keyword: {keyword}\nTweet: {tweet}\nSentiment analysis failed; skipped\n---")
            continue
        new_rows.append({"keyword": keyword, "total_sentiment": score, "count": 1})
        print(f"Keyword: {keyword}\nTweet: {tweet}\nSentiment Score: {score}\n---")

    sentiment_summary = _merge_into_summary(sentiment_summary, new_rows)

    # Calculate average sentiment score per keyword and print it
    sentiment_summary["average_sentiment"] = sentiment_summary["total_sentiment"] / sentiment_summary["count"]
    print("Average Sentiment Scores per Keyword:")
    print(sentiment_summary[["keyword", "average_sentiment"]])

In [None]:

# Every 5 seconds, hand the new rows to process_batch; then block this cell for the
# lifetime of the streaming query.
query = (
    tweets.writeStream
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .foreachBatch(process_batch)
    .start()
)
query.awaitTermination()