## Send Chat Messages to RTI
Stream the chat messages in the `groundingzavachats.txt` file to RTI for processing by writing them, one item per message, to the Cosmos DB `chatlog` container.

In [32]:
# Install the azure-cosmos package, which provides the azure.cosmos module
# imported below. %pip (rather than !pip) installs into the environment of
# the running kernel.
%pip install azure-cosmos

In [33]:
# import required libraries
import json
import os
import random
import time
import uuid

import pyspark.sql.functions as F
from azure.cosmos import CosmosClient, PartitionKey
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType


In [44]:
# Step 1 create the dataframe

# Path to the plain-text chat file in the Lakehouse
file_path = "abfss://Zava@onelake.dfs.fabric.microsoft.com/ZavaLakehouse.Lakehouse/Files/chatdata/groundingzavachats.txt"

# Read the text file into a DataFrame: every line of the file becomes one row
# in a single string column named 'value'
df = spark.read.text(file_path)

# Rename the 'value' column to 'message'
df = df.withColumnRenamed("value", "message")

# UDF that returns a fresh UUID string on every call. Spark treats Python UDFs
# as deterministic by default and may then eliminate or repeat invocations
# while optimizing the plan; flagging it non-deterministic prevents that.
uuid_udf = F.udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

# Add the ID column with UUIDs to the DataFrame
df = df.withColumn("id", uuid_udf())

# Add an increasing integer column to each row. monotonically_increasing_id()
# guarantees unique, increasing values, but they are only consecutive within a
# single partition (the partition index is encoded in the upper bits).
df = df.withColumn("chatid", F.monotonically_increasing_id() + 1)

# Add a random customer id: F.rand() is uniform in [0, 1), so the value is an
# integer in 0..999999
df = df.withColumn("customerid", (F.rand() * 1000000).cast("int"))

# Add an initial sentiment score
# Helper that produces one score: a normally distributed draw (mean=5, std=1.75)
# limited to the range 0..10 and rounded to the nearest integer
def generate_random_normal_rounded():
    """Return a random integer sentiment score between 0 and 10 inclusive.

    One value is drawn from a normal distribution with mean 5 and standard
    deviation 1.75, clamped to the interval [0, 10], and then rounded with
    Python's built-in ``round`` before being converted to ``int``.
    """
    raw_score = random.normalvariate(5, 1.75)
    capped = min(10, raw_score)   # upper bound first, as in max(0, min(10, x))
    bounded = max(0, capped)      # then the lower bound
    return int(round(bounded))
# Register the sentiment generator as a Spark UDF. It returns a different
# value on every call, so it is flagged non-deterministic; otherwise Spark
# assumes determinism and may eliminate or repeat invocations when optimizing.
random_normal_udf = F.udf(generate_random_normal_rounded, IntegerType()).asNondeterministic()
# Add the new column with distributed sentiment
df = df.withColumn("initialsentiment", random_normal_udf())

# Add datetime of chat
df = df.withColumn("chatdatetime", F.current_timestamp())

# show() and collect() below are two separate Spark actions. Without caching,
# each one re-runs the whole lineage and regenerates the random columns, so
# the preview would not match the rows that are actually sent. Caching is
# best-effort, but keeps the two consistent in the normal case.
df = df.cache()

# Show the first few rows
df.show()

# Step 2: Convert DataFrame to JSON rows (one JSON string per row, collected
# to the driver)
json_rows = df.toJSON().collect()

In [None]:
# Step 3: Setup cosmos connection info
# The endpoint and key are read from the COSMOS_DB_ENDPOINT / COSMOS_DB_KEY
# environment variables so the account key does not have to be saved in the
# notebook. The empty-string fallbacks can be filled in for a local run, but
# should not be committed with real values.
COSMOS_DB_ENDPOINT = os.environ.get("COSMOS_DB_ENDPOINT", "")
COSMOS_DB_KEY = os.environ.get("COSMOS_DB_KEY", "")
DATABASE_NAME = "zava"
CONTAINER_NAME = "chatlog"

# Fail fast with an actionable message instead of an opaque client error
if not COSMOS_DB_ENDPOINT or not COSMOS_DB_KEY:
    raise ValueError(
        "Cosmos DB connection info is missing: set the COSMOS_DB_ENDPOINT and "
        "COSMOS_DB_KEY environment variables (or fill in the fallback values "
        "in this cell) before running it."
    )

# Initialize the Cosmos client
client = CosmosClient(COSMOS_DB_ENDPOINT, COSMOS_DB_KEY)

# Create or get the database
database = client.create_database_if_not_exists(id=DATABASE_NAME)

# Create or get the container; items are partitioned by their own 'id' value
container = database.create_container_if_not_exists(
    id=CONTAINER_NAME,
    partition_key=PartitionKey(path="/id"),
)

### Start the data generation below to send one round of the file

In [45]:
# Step 4: Write each row to the database once
# A single pass over json_rows only. The rows carry the ids generated in
# Step 1, so sending the same rows a second time would try to create items
# whose id already exists in the container. Use Step 4b for continuous
# sending, which regenerates the ids on every round.
counter = 1
for row in json_rows:
    # Each element is a JSON string; create_item expects a dict body
    container.create_item(body=json.loads(row))
    print(f"Inserted row: {counter}")
    time.sleep(1)  # pace the stream at roughly one message per second
    counter += 1


In [45]:
# Step 4b: Keep writing the file to the database (need to recreate ids for CosmosDB)
# This cell loops until it is interrupted. Every round re-reads the file and
# rebuilds the generated columns so that each item gets a new id.

# Build the UUID UDF once, outside the loop, since it does not change between
# rounds. It is flagged non-deterministic because it returns a new value on
# every call; Spark otherwise assumes UDFs are deterministic and may eliminate
# or repeat invocations while optimizing.
uuid_udf = F.udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

counter = 1
while True:
    # Read the text file into a DataFrame (single string column 'value')
    df = spark.read.text(file_path)
    print("File Read")

    df = (
        df
        # Rename the 'value' column to 'message'
        .withColumnRenamed("value", "message")
        # Add the ID column with fresh UUIDs
        .withColumn("id", uuid_udf())
        # Increasing (not necessarily consecutive) integer per row
        .withColumn("chatid", F.monotonically_increasing_id() + 1)
        # Random customer id in 0..999999
        .withColumn("customerid", (F.rand() * 1000000).cast("int"))
        # Initial sentiment score, uniform over the integers 0..10.
        # NOTE(review): Step 1 draws this score from a clamped normal
        # distribution instead - confirm whether the two should match.
        .withColumn("initialsentiment", (F.rand() * 11).cast("int"))
        # Datetime of chat
        .withColumn("chatdatetime", F.current_timestamp())
    )

    # One JSON string per row, collected to the driver
    json_rows = df.toJSON().collect()

    for row in json_rows:
        container.create_item(body=json.loads(row))
        print(f"Inserted row: {counter}")
        time.sleep(1)  # pace the stream at roughly one message per second
        counter += 1
