### Reading the JSON file into a DataFrame

In [1]:
# Read the raw Bing news export. The "multiline" option lets Spark parse
# JSON records that span more than one line (e.g. a pretty-printed file).
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-latest-news.json")
)
display(df)

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 9a81fd0c-6857-4685-9122-90005a8180a4)

#### Selecting only the `value` column from the DataFrame

In [2]:
# Keep only the "value" column, which holds the array of article records.
df = df.select(df["value"])

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 4, Finished, Available)

#### Exploding the `value` array into one row per article

In [3]:
from pyspark.sql.functions import explode

# Emit one row per element of the "value" array, stored in a column named "json_object".
df_exploded = df.select(explode("value").alias("json_object"))

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 5, Finished, Available)

In [4]:
# Preview the exploded DataFrame: one "json_object" row per element of the original "value" array.
display(df_exploded)

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 6, Finished, Available)

SynapseWidget(Synapse.DataFrame, b89ddac7-5a1c-4220-b202-e497bf6b309a)

#### Converting the exploded DataFrame to a list of JSON strings (one per article)

In [5]:
# Serialize every row to a JSON string, then bring all of them to the driver as a Python list.
json_rdd = df_exploded.toJSON()
json_list = json_rdd.collect()

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 7, Finished, Available)

#### Inspecting a sample from the JSON string list

In [6]:
# Spot-check one serialized record (index 1, i.e. the second row).
sample_json_str = json_list[1]
print(sample_json_str)

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 8, Finished, Available)

{"json_object":{"about":[{"name":"Beirut","readLink":"https://api.bing.microsoft.com/api/v7/entities/afaaff6f-4112-4894-7e7e-2ca7359df5cf"},{"name":"Israel","readLink":"https://api.bing.microsoft.com/api/v7/entities/1ffafed3-2b37-b871-c271-aa855d98449a"},{"name":"Amos Hochstein","readLink":"https://api.bing.microsoft.com/api/v7/entities/eaa06716-dd0c-bdf2-0b10-2935744c8697"},{"name":"Joe Biden","readLink":"https://api.bing.microsoft.com/api/v7/entities/cad484f9-be75-7a78-12dd-16233f823cd7"},{"name":"Hezbollah","readLink":"https://api.bing.microsoft.com/api/v7/entities/16e64cd8-e667-d597-b9c8-1586cb18cf00"},{"name":"Gaza Strip","readLink":"https://api.bing.microsoft.com/api/v7/entities/90eb9ff6-60ad-da43-d3fe-1ee3e3cb775a"},{"name":"Lebanon","readLink":"https://api.bing.microsoft.com/api/v7/entities/f2d30d7c-bea9-11b5-a8fe-a92d37480c87"},{"name":"Supreme court","readLink":"https://api.bing.microsoft.com/api/v7/entities/0eb0d3b0-0761-1b99-1af6-056d60cf8eb0"},{"name":"Itamar Ben-Gvir","re

In [7]:
import json

# Parse one serialized record into a Python dict to inspect its structure.
news_json = json.loads(json_list[1])

# The article fields live under the "json_object" key created by the explode step.
# (Replaces broken commented-out code such as `print(news_json)["json_object"]["name"]`,
# which would have subscripted the None returned by print.)
sample_article = news_json["json_object"]
print(sample_article["name"])
print(sample_article.get("category", "Category not available"))

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 9, Finished, Available)

#### Extracting the article properties into Python lists

In [8]:
import json

# Parallel lists, one per output column. They must always have the same length:
# the next cell zips them together positionally into rows.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

# Process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string into a dictionary and step into the article payload.
        article = json.loads(json_str)["json_object"]

        # Extract EVERY field before appending anything. Appending field by field
        # (as before) meant a failure on a later field (e.g. missing "provider")
        # left the earlier lists one element longer than the rest, silently
        # shifting values into the wrong rows when the lists are zipped.
        article_title = article["name"]
        article_description = article["description"]
        article_category = article.get("category", "Category not available")
        article_url = article["url"]
        article_image = (
            article.get("image", {})
            .get("thumbnail", {})
            .get("contentUrl", "Image not available")
        )
        article_provider = article["provider"][0]["name"]
        article_date = article["datePublished"]
    except (KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
        # Malformed record (bad JSON, missing key, empty provider list, ...):
        # report it and drop the whole article so all lists stay aligned.
        print(f"Error processing JSON object: {e}")
        continue

    # All fields extracted successfully: append them together.
    title.append(article_title)
    description.append(article_description)
    category.append(article_category)
    url.append(article_url)
    image.append(article_image)
    provider.append(article_provider)
    datePublished.append(article_date)


StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 10, Finished, Available)

#### Converting the lists to a DataFrame

In [9]:
from pyspark.sql.types import StructType, StructField, StringType

# Output column names, in the same order as the lists are zipped below.
column_names = ["title", "description", "category", "url", "image", "provider", "datePublished"]

# Every column is a nullable string.
schema = StructType([StructField(name, StringType(), True) for name in column_names])

# One tuple per article, formed by walking the parallel lists in lockstep.
data = list(zip(title, description, category, url, image, provider, datePublished))

df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 11, Finished, Available)

In [10]:
# Show just the first five cleaned rows as a sanity check.
preview_rows = df_cleaned.limit(5)
display(preview_rows)

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 12, Finished, Available)

SynapseWidget(Synapse.DataFrame, f8a03fe2-4d61-4585-8fe7-5632a4a7545a)

#### Formatting the `datePublished` column as MM-dd-yyyy

In [11]:
from pyspark.sql.functions import to_date, date_format

# Cast datePublished to a date, then render it back as an MM-dd-yyyy string,
# replacing the original column.
published_date = to_date("datePublished")
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(published_date, "MM-dd-yyyy"),
)

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 13, Finished, Available)

In [12]:
# Show the final DataFrame (datePublished now formatted as MM-dd-yyyy) before it is written out.
display(df_cleaned_final)

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 14, Finished, Available)

SynapseWidget(Synapse.DataFrame, 2354f9ec-4274-4ef8-8492-17b79cf424e7)

#### Writing the final DataFrame to the Lakehouse database in Delta format (create on first run, merge on later runs)

In [13]:
from pyspark.sql.utils import AnalysisException

table_name = 'bing_lake_db.tbl_latest_news'

# MERGE INTO errors out when more than one source row matches the same target
# row, so keep a single (arbitrary) row per url -- the merge key -- before
# writing. The first load uses the same deduplicated data for consistency.
df_to_write = df_cleaned_final.dropDuplicates(["url"])

# Check for the table explicitly instead of treating *any* AnalysisException
# raised by saveAsTable (e.g. a missing database) as "table already exists".
if not spark.catalog.tableExists(table_name):
    # First run: create the Delta table from the current batch.
    df_to_write.write.format("delta").saveAsTable(table_name)
else:
    print("Table Already Exists")

    df_to_write.createOrReplaceTempView("vw_df_cleaned_final")

    # Upsert keyed on url. `<=>` is Spark SQL's null-safe equality: a plain `<>`
    # evaluates to NULL (not TRUE) when either side is NULL, so a value changing
    # to or from NULL would never trigger the update.
    spark.sql(f"""
        MERGE INTO {table_name} target_table
        USING vw_df_cleaned_final source_view
        ON source_view.url = target_table.url
        WHEN MATCHED AND (
            NOT (source_view.title <=> target_table.title) OR
            NOT (source_view.description <=> target_table.description) OR
            NOT (source_view.category <=> target_table.category) OR
            NOT (source_view.image <=> target_table.image) OR
            NOT (source_view.provider <=> target_table.provider) OR
            NOT (source_view.datePublished <=> target_table.datePublished)
        )
        THEN UPDATE SET *

        WHEN NOT MATCHED THEN INSERT *

    """)


StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 15, Finished, Available)

Table Already Exists


In [14]:
%%sql

-- Total number of rows currently stored in the news Delta table.
SELECT COUNT(*) FROM bing_lake_db.tbl_latest_news

StatementMeta(, 54d37db1-6be8-4484-ba37-ec2ab73504bb, 16, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>