In [1]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
import synapse.ml.services
from synapse.ml.services.language import AnalyzeText
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import sys
from datetime import datetime


StatementMeta(, 2baf1853-a73d-4ca8-b3fb-a3eee47517a4, 3, Finished, Available, Finished)

In [None]:
#######################################################
# Setup
#######################################################
item_id = "{ITEM_ID}"
item_workspace_id = "{WORKSPACE_ID}"
analysis_type = "SentimentAnalysis"
item_table_name = "customer_feedback"
item_table_source_column_name = "Feedback"
item_table_result_column_name = "sentiment"
#item_id = spark_context.getConf().get("spark.itemId")
#item_workspace_id = spark_context.getConf().get("spark.itemWorkspaceId")
#analysis_type = spark_context.getConf().get("spark.analysisType")
#item_table_name = spark_context.getConf().get("spark.itemTableName")
#item_table_source_column_name = spark_context.getConf().get("spark.itemTableSourceColumnName")
#item_table_result_column_name = spark_context.getConf().get("spark.itemTableResultColumnName")

print(f"Configuration:")
print(f"  - Item id: {item_id}")
print(f"  - Workspace id: {item_workspace_id}")
print(f"  - Analysis type: {analysis_type}")
print(f"  - Item Table: {item_table_name}")
print(f"  - Source column: {item_table_source_column_name}")
print(f"  - Result column: {item_table_result_column_name}")


# Spark session builder
spark_session = (SparkSession
    .builder
    .appName(analysis_type)
    .getOrCreate())

StatementMeta(, 2baf1853-a73d-4ca8-b3fb-a3eee47517a4, 4, Finished, Available, Finished)

Configuration:
  - Item id: 6b216ea0-70fd-46b0-8e99-6a3a5d0aa8c1
  - Workspace id: d93a1ddb-3f94-4a2f-9b43-0afe4cdb9f17
  - Analysis type: SentimentAnalysis
  - Item Table: customer_feedback
  - Source column: Feedback
  - Result column: sentiment


In [3]:
#######################################################
# Load the Lakehouse table into a DataFrame
#######################################################
print(f"Loading data from table: {item_table_name}")
df = spark_session.read.format("delta").load("Tables/" + item_table_name)

print(f"Loaded {df.count()} rows")
print(f"Available columns: {', '.join(df.columns)}")

# Validate required column exists
if item_table_source_column_name not in df.columns:
    print(f"Error: Source column '{item_table_source_column_name}' not found in table.")
    sys.exit(1)

# drop the result column if it is there
if item_table_result_column_name in df.columns:
    df = df.drop(item_table_result_column_name)
    print(f"Dropped existing result column: {item_table_result_column_name}")

StatementMeta(, 2baf1853-a73d-4ca8-b3fb-a3eee47517a4, 5, Finished, Available, Finished)

Loading data from table: customer_feedback
Loaded 100 rows
Available columns: Customer_ID, Feedback, sentiment
Dropped existing result column: sentiment


In [6]:
#######################################################
# Start  analysis
#######################################################
print(f"Starting {analysis_type} analysis on column: {item_table_source_column_name}")

# Initialize the analysis model
model = (AnalyzeText()
        .setTextCol(item_table_source_column_name)
        .setKind(analysis_type)
        .setOutputCol("TmpOutput"))

# Apply the model to get sentiment scores
print("Transforming data with sentiment model...")
result = model.transform(df)

print("Finished transformation and obtained sentiment scores.")
print(f"Available columns in result: {', '.join(result.columns)}")

print(f"Available column in analyze model: {', '.join(result.columns)}")

# Extract the  values from the response
if(analysis_type == "SentimentAnalysis"):
    print("Extracting sentiment values...")
    result = result.withColumn("documents", col("TmpOutput.documents"))\
                    .withColumn(item_table_result_column_name, col("documents.sentiment"))
else:
    print(f"Error: Analysis type '{analysis_type}' not implemented.")
    sys.exit(1)
    
print(f"Available columns in result: {', '.join(result.columns)}")
print(f"Sentiment analysis complete, added column: {item_table_result_column_name}")

StatementMeta(, 2baf1853-a73d-4ca8-b3fb-a3eee47517a4, 8, Finished, Available, Finished)

Starting SentimentAnalysis analysis on column: Feedback
Transforming data with sentiment model...
Finished transformation and obtained sentiment scores.
Available columns in result: Customer_ID, Feedback, AnalyzeText_e1a6e058d5fa_error, TmpOutput
Available column in analyze model: Customer_ID, Feedback, AnalyzeText_e1a6e058d5fa_error, TmpOutput
Extracting sentiment values...
Available columns in result: Customer_ID, Feedback, AnalyzeText_e1a6e058d5fa_error, TmpOutput, documents, sentiment
Sentiment analysis complete, added column: sentiment


In [7]:
#######################################################
# Writing the result back to the Lakehouse table
#######################################################
# Get a list of all original columns from the input DataFrame
original_columns = df.columns
# Add the result column to the output
if(item_table_result_column_name in original_columns):
    output_df = result.select(*original_columns)
else: 
    output_df = result.select(
        *original_columns,   # All original columns using the * operator to unpack the list
        col(item_table_result_column_name)  # Add the new result column
)

# Writing the data back
deltaTablePath = f"abfss://{item_workspace_id}@onelake.dfs.fabric.microsoft.com/{item_id}/Tables/{item_table_name}/"
print(f"Writing results to: {deltaTablePath}")
output_df.write.mode("overwrite").format("delta").save(deltaTablePath)

print(f"Output saved to table: {item_table_name}")
output_df.show(5)

StatementMeta(, 2baf1853-a73d-4ca8-b3fb-a3eee47517a4, 9, Finished, Available, Finished)

Writing results to: abfss://d93a1ddb-3f94-4a2f-9b43-0afe4cdb9f17@onelake.dfs.fabric.microsoft.com/6b216ea0-70fd-46b0-8e99-6a3a5d0aa8c1/Tables/customer_feedback/
Output saved to table: customer_feedback
+-----------+------------------------------------------+---------+
|Customer_ID|Feedback                                  |sentiment|
+-----------+------------------------------------------+---------+
|101        |Horrible experience, will never return!   |negative |
|102        |Customer service was rude and unhelpful.  |negative |
|103        |The service was excellent, very satisfied!|positive |
|104        |Would definitely come back again!         |neutral  |
|105        |Would definitely come back again!         |neutral  |
+-----------+------------------------------------------+---------+
only showing top 5 rows



In [12]:
#Cleaning the data

df = spark_session.read.format("delta").load("Tables/" + item_table_name)

if(item_table_result_column_name in df.columns):
    df = df.drop(item_table_result_column_name)

df.write.mode("overwrite").format("delta").save(deltaTablePath)

df = spark_session.read.format("delta").load("Tables/" + item_table_name)
df.show(5)

StatementMeta(, 2baf1853-a73d-4ca8-b3fb-a3eee47517a4, 14, Finished, Available, Finished)

+-----------+--------------------+---------+
|Customer_ID|            Feedback|sentiment|
+-----------+--------------------+---------+
|        101|Horrible experien...|     NULL|
|        102|Customer service ...|     NULL|
|        103|The service was e...|     NULL|
|        104|Would definitely ...|     NULL|
|        105|Would definitely ...|     NULL|
+-----------+--------------------+---------+
only showing top 5 rows

