### Extracting and Aggregating Sentiments on Targeted Product Features with Large Language Models 

### Introduction
In this notebook, we aim to extract valuable insights from chat history conversations by analyzing sentiments related to various product features. We will query pre-processed chat history, leverage Fabric's built-in Azure OpenAI capabilities, identify product mentions, and determine the sentiments expressed around each product feature. This analysis will help us understand customer perceptions and preferences, enabling us to make informed decisions and enhance our product development strategies.

Additionally, we will aggregate the counts for each feature and sentiment to provide a comprehensive overview of the data. This aggregation will allow us to quantify the number of mentions, likes, and dislikes for each product feature, as well as the overall sentiment distribution.

#### Prerequisites
You need the following services to run this notebook.
- Microsoft Fabric with F64 Capacity

Make sure the sample data has been uploaded in their respective containers

#### 1) Load mirrored container into a spark dataframe

In [1]:
%pip install azure-core
%pip install azure-cosmos

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 8, Finished, Available, Finished)


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Collecting azure-cosmos
  Downloading azure_cosmos-4.9.0-py3-none-any.whl.metadata (80 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m80.8/80.8 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
Downloading azure_cosmos-4.9.0-py3-none-any.whl (303 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m303.2/303.2 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: azure-cosmos
Successfully installed azure-cosmos-4.9.0

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To 

This following imports various modules and functions that are essential for working with Spark DataFrames, performing data transformations, and utilizing Azure OpenAI services.

In [2]:
import base64
import json
import time
from datetime import datetime, timezone
from azure.core.credentials import AccessToken
from azure.storage.filedatalake import DataLakeServiceClient
import azure.cosmos
from azure.cosmos import CosmosClient, PartitionKey

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.functions import (col, explode, from_json, sum as _sum, lower, trim,
                                    udf, max, expr, lit, when, array_contains)
from pyspark.sql.types import (ArrayType, MapType, StringType, StructType, StructField)

from synapse.ml.services.openai import OpenAIChatCompletion

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 10, Finished, Available, Finished)

#### Dataset
The dataset used in this demo is stored in CosmosDB. To facilitate analysis, the data is mirrored into OneLake and brought into the Lakehouse via a shortcut. This approach ensures that we are not duplicating any data, maintaining a single copy in CosmosDB while providing seamless access for processing and analysis in the Lakehouse environment.
The synchronization between CosmosDB and the Lakehouse happens within a few minutes, which allows near real-time analysis for many applications.


let's load the chat history data from the mirrored database into a spark dataframe.

In [3]:
chat_history_container_name = 'chat_history'
#df = spark.sql(f"SELECT * FROM cdb_demo_lh.{chat_history_container_name} WHERE _ts > {max_last_analyzed_ts}")
df = spark.sql(f"SELECT * FROM cdb_demo_lh.{chat_history_container_name}")
display(df.limit(2))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1acaff00-872e-4d4a-83a0-c041727d619e)

#### 2) Extract sentiments around product features through Fabric's built-in Azure OpenAI using SynapseML library


In this section, we demonstrate how to use the Synapse ML library to call Azure OpenAI at scale. The process involves creating a user-defined function (UDF) to transform user requests into the desired format, defining the schema for the transformed data, and applying the transformation to the DataFrame. This approach ensures scalability and efficient processing of large datasets

NOTE: Before the advent of large language models (LLMs), extracting structured information from chat data was a complex and time-consuming task. It typically involved several steps and required expertise in natural language processing (NLP) techniques such as named entity recognition (NER), sentiment analysis, and feature extraction.

1. **Named Entity Recognition (NER)**: Identifying entities like product names, brands, and types required training NER models on annotated datasets. This process involved manually labeling a large amount of data, training the model, and continuously fine-tuning it to improve accuracy.

2. **Sentiment Analysis**: Determining the sentiment around each product mentioned in the conversation required separate sentiment analysis models. These models also needed to be trained on labeled data and fine-tuned to handle the nuances of human language.

3. **Feature Extraction**: Extracting features that are liked or disliked about each product involved additional NLP techniques. This often required custom rule-based systems or machine learning models to identify and categorize features mentioned in the text.

4. **Integration and Transformation**: Combining the outputs of these different models into a structured format like JSON required additional code to integrate the results and transform them into the desired format.

With LLMs, the workflow is significantly simplified:

- **Unified Model**: LLMs can perform multiple NLP tasks within a single model. This means that the same model can handle NER, sentiment analysis, and feature extraction without the need for separate models for each task.
- **Prompt Engineering**: Instead of training and fine-tuning multiple models, you can use prompt engineering to guide the LLM to generate the desired output. This involves crafting a prompt that instructs the model on what information to extract and how to format it.
- **Reduced Manual Effort**: LLMs can understand and generate human-like text, reducing the need for extensive manual labeling and fine-tuning. This makes it easier to adapt the model to new tasks and domains.
- **Scalability**: LLMs can process large volumes of data quickly and accurately, making it easier to scale the extraction process to handle more conversations and extract more detailed information.

In summary, LLMs simplify the workflow by providing a unified model that can handle multiple NLP tasks, reducing the need for manual effort and making it easier to scale the extraction process. This allows you to focus on crafting effective prompts and integrating the model's output into your analysis.


In [4]:
# Function to create messages (same as in your example)
def make_message(role, content):
    return {"role": role, "content": content, "name": role}

prompt = "You are an expert bussiness analyst, give a conversation between client and a chatbot, you identify the list of products discussed, sentiment around each product, features that are liked or disliked and provide results as a json file.\
            when identifying product, separate type, brand, and name \
            choices for sentiment: enthusiastic, neutral, obstructionist \
            choices for product features (like or disliked): design, usability, color, size, price, brand \
            Please return a a list of jsons only. Please use the example below as a template\
            [{\"product\": {\"type\": \"Climbing\", \"brand\": \"Gravitator\", \"name\": \"Gravity Beam Climbing Rope\"}, \"sentiment\": \"enthusiastic\", \"features\": {\"like\": [\"design\", \"usability\", \"color\", \"size\", \"price\", \"brand\"], \"dislike\": []}}, {\"product\": {\"type\": \"Footwear\", \"brand\": \"Raptor Elite\", \"name\": \"Trek Xtreme Hiking Shoes\"}, \"sentiment\": \"enthusiastic\", \"features\": {\"like\": [\"design\", \"usability\", \"color\", \"size\", \"price\", \"brand\"], \"dislike\": []}}, {\"product\": {\"type\": \"Navigation\", \"brand\": \"AirStrider\", \"name\": \"VenturePro GPS Watch\"}, \"sentiment\": \"enthusiastic\", \"features\": {\"like\": [\"design\", \"usability\", \"price\", \"brand\"], \"dislike\": []}}]\
            "

# UDF to transform user request into the desired list of dictionaries
def transform_to_messages(messages):
    system_message = make_message("system", prompt)
    user_message = make_message("user", messages)
    return [system_message, user_message]

# Define the schema for the list of dictionaries
message_schema = ArrayType(
    StructType([
        StructField("role", StringType(), True),
        StructField("content", StringType(), True),
        StructField("name", StringType(), True)
    ])
)

# Register the UDF
transform_udf = udf(transform_to_messages, message_schema)

# Apply the UDF to the DataFrame
df_transformed = df.withColumn("messages", transform_udf(col("Messages")))

# Show the transformed DataFrame
display(df_transformed.limit(2))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a604a6e3-3e63-4604-9a27-ba0fdaee19b8)

The following code snippet highlights how to perform chat completion for all the chat conversation!

In [5]:
chat_completion = (
    OpenAIChatCompletion()
    .setDeploymentName("gpt-4.1") # deploymentName could be one of {gpt-35-turbo-0125 or gpt-4-32k}
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

df_analyzed = chat_completion.transform(df_transformed).select(
        "messages", "chat_completions.choices.message.content")

display(df_analyzed.limit(2))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0a1ff0c9-8019-4a8d-b6c4-facfd2dd7d3e)

### 3) Defining and Applying a Schema for Targeted Sentiment Extraction from LLM Responses

The following line of code adds a new column called "json_string" to the df_analyzed DataFrame. This new column contains the concatenated values from the "content" column, combined into a single string without any separators. This can be useful for further processing or analysis where a single string representation of the content is needed.

In [6]:
df_parsed = df_analyzed.withColumn("json_string", F.concat_ws("", col("content")))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 14, Finished, Available, Finished)

In [7]:
display(df_parsed.limit(2))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a2f1a6ad-b856-4de8-8315-8130d8e906f7)

In the following section, we define the schema for parsing JSON data and apply it to the DataFrame. This schema outlines the structure of the JSON data, including product details, sentiment, and features. By using this schema, we can transform the JSON strings into structured data, making it easier to analyze and process.



In [8]:
# Define schema for JSON parsing
schema = ArrayType(
    StructType([
        StructField("product", StructType([
            StructField("type", StringType(), True),
            StructField("brand", StringType(), True),
            StructField("name", StringType(), True)
        ]), True),
        StructField("sentiment", StringType(), True),
        StructField("features", StructType([
            StructField("like", ArrayType(StringType()), True),
            StructField("dislike", ArrayType(StringType()), True)
        ]), True)
    ])
)

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 16, Finished, Available, Finished)

In [9]:
# Parse JSON string into array of structs
df_parsed_schema = df_parsed.withColumn("parsed_json", from_json(col("json_string"), schema))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 17, Finished, Available, Finished)

Next, we explode the parsed JSON column to create a new row for each element in the array, allowing for more granular analysis of the data.

In [10]:
# Explode the parsed JSON column
df_exploded = df_parsed_schema.withColumn("exploded", explode(col("parsed_json")))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 18, Finished, Available, Finished)

In [11]:
display(df_exploded.limit(2))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6c4d47c6-f89f-4c12-834d-fea6a9a1ebbb)

Since we have the structured data, we continue to flatten the nested information. By exploding nested information, we bring desired information onto the same row as a new column. This process ensures that each product, along with its associated sentiment and features, is represented in a single, flat row, facilitating more efficient analysis and processing.

Hence, we continue to extract the like and dislike features from the exploded JSON column.

In [12]:
# Extract `like` and `dislike` features
df_exploded = df_exploded.withColumn("features", col("exploded.features"))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 20, Finished, Available, Finished)

In [13]:
# Extract `like` and `dislike` features
df_exploded = df_exploded.withColumn("like_features", col("features.like"))
df_exploded = df_exploded.withColumn("dislike_features", col("features.dislike"))


StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 21, Finished, Available, Finished)

We iterate through a list of features and create new columns for each feature, indicating whether it is liked or disliked. Each new column contains binary values: 1 if the feature is liked or disliked, and 0 otherwise. This step further refines our dataset, making it easier to analyze specific aspects of product features

In [14]:
features_list = ["price", "size", "usability", "color", "brand", "design"]

for feature in features_list:
    df_exploded = df_exploded.withColumn(
        f"liked_{feature}",
        when(array_contains(col("like_features"), lit(feature)), 1).otherwise(0)
    ).withColumn(
        f"disliked_{feature}",
        when(array_contains(col("dislike_features"), lit(feature)), 1).otherwise(0)
    )


StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 22, Finished, Available, Finished)

Continue flattening the nested information ..

In [15]:
df_exploded = df_exploded.withColumn("sentiment", col("exploded.sentiment"))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 23, Finished, Available, Finished)

In [16]:
sentiment_list = ["enthusiastic", "neutral", "obstructionist"]

for sent in sentiment_list:
    df_exploded = df_exploded.withColumn(
        sent,
        F.when(F.col("sentiment") == sent, 1).otherwise(0)
    )

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 24, Finished, Available, Finished)

In [17]:
display(df_exploded.limit(1))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 25, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1ab7e7e2-e11d-4000-a71d-9b77c4f7d484)

In [18]:
# Extract fields from product
df_exploded = df_exploded \
    .withColumn("product_type", col("exploded.product.type")) \
    .withColumn("product_brand", col("exploded.product.brand")) \
    .withColumn("product_name", col("exploded.product.name"))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 26, Finished, Available, Finished)

#### 4) Integrating Product Catalog to Retrieve Product IDs"

In the next section, we join the data from the product catalog. In real-world applications, we may not always have access to the product IDs being used. Therefore, we retrieve the product catalog data to enrich our dataset with additional product information.

In [19]:
df_product_catalog = spark.sql("SELECT * FROM cdb_demo_lh.product_catalog")
display(df_product_catalog.limit(1))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 27, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 654a7201-4e0a-44e2-a13e-9c6e08b443ae)

Now let's join with product catalog to identify the product ids associated with each product

In [20]:
# Example of pattern matching using SQL expressions
df_joined = df_exploded.join(
    df_product_catalog,
    (expr("lower(trim(product_name)) = lower(trim(name))")),
    "left"
)


StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 28, Finished, Available, Finished)

In [21]:
# Drop unwanted columns (e.g., _ts) from the product catalog DataFrame
df_joined = df_joined.drop(df_product_catalog["_ts"])

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 29, Finished, Available, Finished)

In [22]:
display(df_joined.limit(2))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 30, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8b92cc8c-a07c-4f13-9b8c-3f1a39f27f62)

#### 5) Aggregating and Summarizing Product Feature Sentiments: Calculating Counts and Sums by Product ID

Next, we add aggregation expressions for each sentiment and the count of rows for each product ID. By aggregating the counts for each feature and sentiment, we can quantify the number of mentions, likes, and dislikes for each product feature, as well as the overall sentiment distribution. This aggregation provides a comprehensive overview of the data

In [23]:
from pyspark.sql.functions import expr, count, sum as spark_sum

# Define the features and their column names
features = {
    "price": ["liked_price", "disliked_price"],
    "size": ["liked_size", "disliked_size"],
    "usability": ["liked_usability", "disliked_usability"],
    "color": ["liked_color", "disliked_color"],
    "brand": ["liked_brand", "disliked_brand"],
    "design": ["liked_design", "disliked_design"]
}

# Initialize an empty list to collect DataFrames
aggregate_exprs = []

for feature, cols in features.items():
    liked_col, disliked_col = cols
    aggregate_exprs.append(
        spark_sum(when(col(liked_col) == 1, 1).otherwise(0)).alias(f"sum_liked_{feature}")
    )
    aggregate_exprs.append(
        spark_sum(when(col(disliked_col) == 1, 1).otherwise(0)).alias(f"sum_disliked_{feature}")
    )

# Add aggregation expressions for each sentiment
for sentiment in sentiment_list:
    aggregate_exprs.append(
        spark_sum(when(col(sentiment) == 1, 1).otherwise(0)).alias(f"sum_{sentiment}")
    )
    
# Add the count of rows for each product_id
aggregate_exprs.append(
    count("*").alias("sum_mentions")
)

# Aggregate counts for each feature
aggregated_counts = df_joined.groupBy("id").agg(*aggregate_exprs)

# Show the new DataFrame
display(aggregated_counts.limit(2))


StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 31, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f94c077d-7b51-4da9-942b-361f90d36284)

In [24]:
# Rename the column 'id' to 'product_id'
aggregated_counts = aggregated_counts.withColumnRenamed("id", "product_id")
display(aggregated_counts.limit(2))


StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 32, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 49404924-4af1-4fb5-a5ab-5f8ed3a0e81e)

For optional writing back our analysis to cosmosdb, we define a UUID. This involves creating a user-defined function (UDF) to generate unique identifiers (UUIDs). 

In [25]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import uuid

# Define a UDF to generate UUIDs
def generate_uuid():
    return str(uuid.uuid4())

uuid_udf = udf(generate_uuid, StringType())

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 33, Finished, Available, Finished)

By initializing all product counts with zero, we ensure that even products not mentioned in recent conversations are included in the dataset. This step allows us to later combine these initialized counts with the actual mentions from the chat data, ensuring a comprehensive analysis.

Additionally, we add a UUID column to uniquely identify each record.

In [26]:
selected_columns = df_product_catalog.select("Id", "Type", "Brand", "Name").withColumnRenamed("Id", "product_id")
# # Convert the id column to string
# selected_columns = selected_columns.withColumn("id", col("id").cast("string"))
# Add new columns with default values
df_analytics_counts = selected_columns.withColumn("sum_liked_price", lit(0)) \
    .withColumn("sum_disliked_price", lit(0)) \
    .withColumn("sum_liked_size", lit(0)) \
    .withColumn("sum_disliked_size", lit(0)) \
    .withColumn("sum_liked_usability", lit(0)) \
    .withColumn("sum_disliked_usability", lit(0)) \
    .withColumn("sum_liked_color", lit(0)) \
    .withColumn("sum_disliked_color", lit(0)) \
    .withColumn("sum_liked_brand", lit(0)) \
    .withColumn("sum_disliked_brand", lit(0)) \
    .withColumn("sum_liked_design", lit(0)) \
    .withColumn("sum_disliked_design", lit(0))\
    .withColumn("sum_enthusiastic", lit(0))\
    .withColumn("sum_neutral", lit(0))\
    .withColumn("sum_obstructionist", lit(0))\
    .withColumn("sum_mentions", lit(0))\



# Add a UUID column to your DataFrame
df_analytics_counts = df_analytics_counts.withColumn("id", uuid_udf())
# Show the resulting DataFrame
display(df_analytics_counts.limit(2))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 34, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9d8d41b5-5304-4076-9ba9-d0db5d3f4ef3)

Next, we perform an outer join between the product catalog initialization data and the aggregated counts. This ensures that even products not mentioned in recent conversations are included in the analysis results. We update the columns with the aggregated counts, combining the initialized counts with the actual mentions from the chat data. This step provides a comprehensive overview of the data, facilitating a complete view of the product sentiments.

In [27]:
from pyspark.sql.functions import col, coalesce, lit

# Define the features and their column names (same as before)
features = {
    "price": ["liked_price", "disliked_price"],
    "size": ["liked_size", "disliked_size"],
    "usability": ["liked_usability", "disliked_usability"],
    "color": ["liked_color", "disliked_color"],
    "brand": ["liked_brand", "disliked_brand"],
    "design": ["liked_design", "disliked_design"]
}

# Define sentiment columns and their aggregations
sentiment_list = ["enthusiastic", "neutral", "obstructionist"]

# Define the columns to update
update_columns = {
    **{f"sum_liked_{feature}": f"sum_liked_{feature}" for feature in features.keys()},
    **{f"sum_disliked_{feature}": f"sum_disliked_{feature}" for feature in features.keys()},
    **{f"sum_{sentiment}": f"sum_{sentiment}" for sentiment in sentiment_list}
}

# Perform the outer join
combined_df = df_analytics_counts.alias("analytics") \
    .join(aggregated_counts.alias("agg"), "product_id", "outer") \
    .select(
        col("analytics.id"),
        col("analytics.Type"),
        col("analytics.Brand"),
        col("analytics.Name"),
        *[
            (coalesce(col(f"analytics.{col_name}"), lit(0)) + coalesce(col(f"agg.{col_name}"), lit(0))).alias(col_name)
            for col_name in update_columns.keys()
        ]
    )

# Show the resulting DataFrame
display(combined_df.limit(2))


StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 35, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4645ec4a-49bb-4be3-b2b4-8bb63100a915)

#### 6) Storing Aggregated Analysis Results in a Lakehouse Table

In this final step, we save the combined DataFrame to a specified path. We use the Delta format to ensure efficient storage and querying. By appending the data to the existing table, we maintain a comprehensive record of the analysis results. Storing this data in a lakehouse allows us to easily access and call upon it for future reporting purposes.

In [28]:
path = 'chat_analysis_results'
#combined_df.write.format("parquet").mode("overwrite").save(path)
combined_df.write.mode("append").format("delta").saveAsTable(path)

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 36, Finished, Available, Finished)

#### 7) Optional: Write back chat_analytics results to a new container in the database 

In addition to storing the data in a lakehouse for future reporting purposes, we also can optionally use a callback to store the data in the original CosmosDB in Fabric database. By maintaining a copy of the data in CosmosDB, we can leverage its capabilities for fast and scalable data access, while also benefiting from the analytical capabilities of the lakehouse.

In [29]:
display(combined_df.limit(2))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 37, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 221f1d3c-38db-43c3-95d4-e23014365907)

In [30]:
# Ensure we only write rows with non-null id
combined_df_filtered = combined_df.filter(col('id').isNotNull())
display(combined_df_filtered.limit(5))

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 38, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3b581622-78d8-48c5-9a72-5dd99ecccef3)

We will use the CosmosClient class to interact with CosmosDB. To do this, we need to specify the  endpoint string in hidden cell below:


In [33]:
from azure.cosmos import CosmosClient, PartitionKey
from azure.core.credentials import AccessToken
import base64, json, time

CosmosDBFabricEndpoint = "https://d3322083-a846-483d-ac60-8091a02bf768.zd3.msit-sql.cosmos.fabric.microsoft.com:443/" 

def _decode_jwt_payload(jwt: str) -> dict:
    parts = jwt.split(".")
    if len(parts) < 2:
        raise ValueError("Not a valid JWT")
    pad = "=" * (-len(parts[1]) % 4)
    return json.loads(base64.urlsafe_b64decode(parts[1] + pad).decode("utf-8"))

class TokenCredential:
    def __init__(self, access_token: str, expires_on: int | None = None):
        self._token = access_token
        if expires_on is None:
            payload = _decode_jwt_payload(access_token)
            expires_on = int(payload["exp"])
        self._expires_on = int(expires_on)

    def get_token(self, *scopes, **kwargs) -> AccessToken:
        now = int(time.time())
        if now >= self._expires_on - 60:
            raise RuntimeError("The provided access token is expired or about to expire.")
        return AccessToken(self._token, self._expires_on)

# Get Cosmos DB token and create Cosmos Client
audience = "https://cosmos.azure.com/" 
token = notebookutils.credentials.getToken(audience)
cred = TokenCredential(token)
cosmosClient = CosmosClient(
    url=CosmosDBFabricEndpoint,
    credential=cred,
    consistency_level="Session"
)

# Get database
database_list = list(cosmosClient.list_databases())
DATABASE_NAME = database_list[0]['id']
databaseClient = cosmosClient.get_database_client(DATABASE_NAME)


# Create new container of aggregated view
CONTAINER_NAME = "chat_analytics"
try:
    databaseClient.create_container(
        id=CONTAINER_NAME, 
        partition_key=PartitionKey(path="/id")
        # offer_throughput=400  # Optional: set throughput
        )
except Exception as e:
    print(f"Container maybe exists: {e}")

containerClientChatAnalytics = databaseClient.get_container_client(CONTAINER_NAME)

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 41, Finished, Available, Finished)

Container maybe exists: (Conflict) Message: {"Errors":["Resource with specified id, name, or unique index already exists."]}
ActivityId: 457ae88c-55b0-41bb-bb0c-871842383479, Request URI: /apps/a620de26-8c06-41bb-a0d6-8f63dd8f7a6f/services/f37e5ca0-01e8-4ac2-b095-35ca963c1545/partitions/ef6d7cfd-d69e-4691-a8d8-34cfc2bf75b0/replicas/134007103963643449p, RequestStats: , SDK: Microsoft.Azure.Documents.Common/2.14.0
Code: Conflict
Message: Message: {"Errors":["Resource with specified id, name, or unique index already exists."]}
ActivityId: 457ae88c-55b0-41bb-bb0c-871842383479, Request URI: /apps/a620de26-8c06-41bb-a0d6-8f63dd8f7a6f/services/f37e5ca0-01e8-4ac2-b095-35ca963c1545/partitions/ef6d7cfd-d69e-4691-a8d8-34cfc2bf75b0/replicas/134007103963643449p, RequestStats: , SDK: Microsoft.Azure.Documents.Common/2.14.0


In [34]:
# Loop create_item/upsert_item
for row in combined_df_filtered.toLocalIterator():
    doc = row.asDict(recursive=True)
    doc["id"] = str(doc["id"])
    try:
        containerClientChatAnalytics.upsert_item(doc)
    except exceptions.CosmosHttpResponseError as e:
        print("Failed:", doc["id"], e.status_code, e.message)

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 42, Submitted, Running, Running)

In [35]:
# Verify data has landed back into Cosmos DB in Fabric database
# Query top 5 items in the container
query = "SELECT TOP 5 * from c"
itemsChatAnalytics = containerClientChatAnalytics.query_items(query=query, enable_cross_partition_query=True)

# Print each item
print(f"Items in container '{CONTAINER_NAME}':")
for item in itemsChatAnalytics:
    print(item)

StatementMeta(, 9d21c6f1-e29a-4478-9370-27b520a80d01, 43, Finished, Available, Finished)

Items in container 'chat_analytics':
{'id': 'b3fed853-d797-45be-ba21-4a4a0c67ac8b', 'Type': 'Climbing', 'Brand': 'Gravitator', 'Name': 'Gravity Beam Climbing Rope', 'sum_liked_price': 2, 'sum_liked_size': 1, 'sum_liked_usability': 2, 'sum_liked_color': 2, 'sum_liked_brand': 2, 'sum_liked_design': 2, 'sum_disliked_price': 0, 'sum_disliked_size': 0, 'sum_disliked_usability': 0, 'sum_disliked_color': 0, 'sum_disliked_brand': 0, 'sum_disliked_design': 0, 'sum_enthusiastic': 2, 'sum_neutral': 0, 'sum_obstructionist': 0, '_rid': 'dD5RAKSio7IBAAAAAAAAAA==', '_self': 'dbs/dD5RAA==/colls/dD5RAKSio7I=/docs/dD5RAKSio7IBAAAAAAAAAA==/', '_etag': '"00000502-0000-0600-0000-68d566060000"', '_attachments': 'attachments/', '_ts': 1758815750}
{'id': '4ce766f3-e246-481a-947e-2f312aacc976', 'Type': 'Navigation', 'Brand': 'B&R', 'Name': 'Pulse Recon Tactical GPS Watch', 'sum_liked_price': 2, 'sum_liked_size': 0, 'sum_liked_usability': 2, 'sum_liked_color': 2, 'sum_liked_brand': 2, 'sum_liked_design': 2, 'su