##### **Sentiment Analysis with SynapseML**

In [47]:
# Load the latest Bing news table; this dataframe is the input to the sentiment analysis
news_query = "SELECT * FROM bing_lake_db.tbl_bing_latest_news"
df = spark.sql(news_query)
display(df)

StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 49, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 67fd6cbc-082e-44d7-90bf-1731398229d6)

In [48]:
#Import SynapseML packages
import synapse.ml.core
from synapse.ml.services import AnalyzeText

StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 50, Finished, Available, Finished)

In [49]:
# Build the AnalyzeText transformer and wire up its input/output columns
model = (
    AnalyzeText()
    .setKind("SentimentAnalysis")  # text-analysis task to run
    .setTextCol("description")     # column whose text is analysed
    .setOutputCol("response")      # column name for the model output
    .setErrorCol("error")          # column name for the error output
)

StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 51, Finished, Available, Finished)

In [50]:
# Apply the configured AnalyzeText model to the news dataframe.
# The model was set up with "response" as its output column and "error" as its error column.
result = model.transform(df)

StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 52, Finished, Available, Finished)

In [51]:
# Inspect the dataframe returned by model.transform
display(result)

StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 53, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f286d056-5590-4b81-80f5-da8e6ff56f07)

In [52]:
# Surface the sentiment value nested inside the "response" column as its own column
from pyspark.sql.functions import col
sentiment_path = col("response.documents.sentiment")
sentiment_df = result.withColumn("sentiment", sentiment_path)

StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 54, Finished, Available, Finished)

In [53]:
# Preview the dataframe with the newly added "sentiment" column
display(sentiment_df)


StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 55, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2d49f3ac-b607-49e1-96cb-f3112e7eb0a9)

In [54]:
# Drop the "error" and "response" columns now that "sentiment" has been extracted
columns_to_drop = ("error", "response")
sentiment_df_final = sentiment_df.drop(*columns_to_drop)

StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 56, Finished, Available, Finished)

In [55]:
# Preview the final dataframe after the "error" and "response" columns were dropped
display(sentiment_df_final)

StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 57, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d6cf88ba-8661-4ca5-a969-934765e8f0be)

In [None]:
# Convert "datePublished" into a date column using the dd-MMM-yyyy pattern
# (presumably the column holds strings in that format — TODO confirm against the source table).
# withColumn requires a Column expression as its second argument, so the pattern
# must be applied through to_date(); passing the bare pattern string raises an error.
# NOTE(review): this reassigns `df`, but `result` / `sentiment_df_final` were derived
# from the earlier `df`, so this conversion does not reach the table written later —
# confirm whether it should be applied to sentiment_df_final instead.
from pyspark.sql.functions import col, to_date
df = df.withColumn("datePublished", to_date(col("datePublished"), "dd-MMM-yyyy"))

In [57]:
# Adopting TYPE 1 SCD incremental loading for our data.
#
# In a Type 1 SCD the new data overwrites the existing data without creating
# duplicates. The previous values are therefore lost, as they are not stored
# anywhere else. This is typically used when there is no need to keep a history
# of the data.

from pyspark.sql.utils import AnalysisException

table_name = "bing_lake_db.tbl_bing_sentiment_analysis"

try:
    # First load: create the Delta table directly from the dataframe.
    sentiment_df_final.write.format("delta").saveAsTable(table_name)

except AnalysisException:
    # NOTE(review): this branch assumes the exception means the table already
    # exists; AnalysisException can also signal other analysis errors — confirm.
    print ("Table Already Exist")

    # Expose the dataframe to Spark SQL so it can be used as the MERGE source.
    sentiment_df_final.createOrReplaceTempView("vw_sentiment_df_final")

    # Explicitly specify the column names.
    # Every column compared in the WHEN MATCHED condition is also assigned in
    # UPDATE SET (including provider); otherwise a provider-only difference would
    # re-trigger the update on every run without ever being written.
    # NOTE(review): `<>` evaluates to NULL when either side is NULL, so a change
    # to or from NULL does not trigger an update — confirm this is acceptable.
    spark.sql(f""" MERGE INTO {table_name} AS target_table
                   USING vw_sentiment_df_final AS source_view
                   ON source_view.url = target_table.url
                   WHEN MATCHED AND (
                                source_view.title <> target_table.title OR
                                source_view.description <> target_table.description OR
                                source_view.image <> target_table.image OR
                                source_view.datePublished <> target_table.datePublished OR
                                source_view.provider <> target_table.provider
                            ) THEN 
                                UPDATE SET 
                                target_table.title = source_view.title,
                                target_table.description = source_view.description,
                                target_table.image = source_view.image,
                                target_table.datePublished = source_view.datePublished,
                                target_table.provider = source_view.provider,
                                target_table.sentiment = source_view.sentiment
                    WHEN NOT MATCHED THEN 
                                INSERT (url, title, description, image, datePublished, provider, sentiment)
                                VALUES (source_view.url, source_view.title, source_view.description, source_view.image, source_view.datePublished, source_view.provider, source_view.sentiment)
                """)

#MERGE INTO: Updates the existing records or inserts new records from the source view (vw_sentiment_df_final) into the target table (bing_lake_db.tbl_bing_sentiment_analysis).
#WHEN MATCHED: Updates the record in the target table when there is a difference in the compared columns (title, description, image, datePublished, provider).
#WHEN NOT MATCHED: Inserts a new record into the target table when no match is found.

StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 59, Finished, Available, Finished)

Table Already Exist


In [58]:
%%sql

-- Count the rows currently stored in the sentiment analysis table
SELECT COUNT(*) FROM bing_lake_db.tbl_bing_sentiment_analysis

StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 60, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

In [59]:
%%sql
-- Show every row and column of the sentiment analysis table
SELECT *
FROM bing_lake_db.tbl_bing_sentiment_analysis


StatementMeta(, 58762229-dd4a-4d79-b22f-4bc708c02b3f, 61, Finished, Available, Finished)

<Spark SQL result set with 67 rows and 7 fields>