### Read the JSON file into a DataFrame

In [1]:
# Multi-line parsing is enabled so that a single JSON document may span many lines.
df = spark.read.json("Files/bing-latest-news.json", multiLine=True)

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 3, Finished, Available)

In [2]:
# Preview a few rows only, matching the `.limit(5)` previews used later in the notebook.
display(df.limit(5))

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 8b1736d6-9ffa-4671-89c6-66f1253464ab)

### Select only the `value` column from the DataFrame

In [3]:
# Keep only the `value` column (the field exploded in the next step).
# This overwrites `df`, so the other top-level columns are no longer available afterwards.
df = df.select(df["value"])

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 5, Finished, Available)

### Explode the JSON column

In [4]:
from pyspark.sql.functions import explode

# Produce one output row per element of the `value` array, stored in a column named `json_object`.
# Rows whose `value` is null or an empty array produce no output rows.
df_exploded = df.select(explode("value").alias("json_object"))

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 6, Finished, Available)

In [5]:
# Preview a few exploded articles instead of rendering every row,
# matching the `.limit(5)` previews used later in the notebook.
display(df_exploded.limit(5))

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 7, Finished, Available)

SynapseWidget(Synapse.DataFrame, adcaee01-d0be-4177-a79a-ba7c3b69d9ea)

### Convert the exploded DataFrame to a list of JSON strings

In [6]:
# Serialize every exploded row to a JSON string, then pull all of them back to the driver.
# NOTE(review): collect() materializes the whole dataset in driver memory — revisit if the input grows large.
json_rdd = df_exploded.toJSON()
json_list = json_rdd.collect()

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 8, Finished, Available)

### Inspect a sample entry of the JSON string list

In [7]:
# Spot-check the serialized form of the entry at index 1.
sample_json = json_list[1]
print(sample_json)

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 9, Finished, Available)

{"json_object":{"about":[{"name":"Delhi","readLink":"https://api.bing.microsoft.com/api/v7/entities/275e8ab8-7bd0-4633-9c89-0133be92e587"},{"name":"Gautam Gambhir","readLink":"https://api.bing.microsoft.com/api/v7/entities/02fd1989-cbb0-217a-95f4-16cfa26fea6c"},{"name":"East Delhi","readLink":"https://api.bing.microsoft.com/api/v7/entities/fafc94ad-09e2-ba9b-b05f-f2a6c1526de7"}],"datePublished":"2024-03-03T03:19:00.0000000Z","description":"Gautam Gambhir requests BJP to relieve him of political duties to focus on cricket. Thanks Modi and Amit Shah. Donates to PM's vision. Gambhir focused","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.0hm-EqlrWjkPVoQNZEFQ1S&pid=News","height":380,"width":700}},"mentions":[{"name":"Delhi"},{"name":"Gautam Gambhir"},{"name":"East Delhi"}],"name":"East Delhi MP Gautam Gambhir wants to quit politics for cricket roles","provider":[{"_type":"Organization","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=ODF.VBzHmoY4z9NL3FbR

In [8]:
import json

# Parse the sample entry at index 1 into a dict to inspect its structure.
# The article fields (name, category, ...) live under the "json_object" key.
news_json = json.loads(json_list[1])

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 10, Finished, Available)

### Extract the article properties into lists

In [9]:
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

# The parallel lists above, in the same order as the fields extracted into `row` below.
field_lists = (title, description, category, url, image, provider, datePublished)

# Process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string; the article fields sit under the "json_object" key.
        article = json.loads(json_str)["json_object"]

        # Only keep articles that have both a category and a thumbnail image URL.
        if not (article.get("category") and article.get("image", {}).get("thumbnail", {}).get("contentUrl")):
            continue

        # Read every field BEFORE appending anything. If a lookup fails (missing key,
        # empty provider list, ...), no list has been touched yet, so all seven lists
        # stay the same length and the later zip() cannot pair fields from different articles.
        row = (
            article["name"],
            article["description"],
            article["category"],
            article["url"],
            article["image"]["thumbnail"]["contentUrl"],
            article["provider"][0]["name"],
            article["datePublished"],
        )
    except Exception as e:
        # Best-effort: report and skip the malformed article instead of aborting the batch.
        print(f"Error processing JSON object: {e}")
        continue

    for field_list, value in zip(field_lists, row):
        field_list.append(value)

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 11, Finished, Available)

### Convert the lists to a DataFrame

In [10]:
from pyspark.sql.types import StructType, StructField, StringType

# Column order here must match the order of the lists zipped together below.
column_names = ["title", "description", "category", "url", "image", "provider", "datePublished"]

# Every column is a nullable string.
schema = StructType([StructField(name, StringType(), True) for name in column_names])

# One tuple per article, built by pairing up the parallel lists element-wise.
data = [record for record in zip(title, description, category, url, image, provider, datePublished)]

df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 12, Finished, Available)

In [11]:
# Preview five rows of the cleaned DataFrame.
display(df_cleaned.limit(num=5))

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, 55b21300-b13d-4e35-8f44-46e8d1e68146)

### Processing the Date column

In [20]:
from pyspark.sql.functions import to_date, date_format

# Cut the timestamp (e.g. "2024-03-03T03:19:00.0000000Z") down to its date part,
# then render it as text in the "dd-MMM-yyyy" pattern (e.g. "03-Mar-2024").
# NOTE(review): the result is a string, so it no longer sorts chronologically.
published_date = to_date(df_cleaned["datePublished"])
df_cleaned_final = df_cleaned.withColumn("datePublished", date_format(published_date, "dd-MMM-yyyy"))

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 22, Finished, Available)

In [21]:
# Preview five rows after the date reformatting.
display(df_cleaned_final.limit(num=5))

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 23, Finished, Available)

SynapseWidget(Synapse.DataFrame, 9a5183df-9791-468e-9f9e-06c1ad141b3f)

### Write the final DataFrame to the Lakehouse database as a Delta table

In [24]:
from pyspark.sql.utils import AnalysisException

table_name = 'bing_lake_db.tbl_latest_news'

# MERGE INTO fails when several source rows match the same target row, so keep a
# single row per `url` (the merge key). Which of the duplicates survives is arbitrary.
df_to_write = df_cleaned_final.dropDuplicates(["url"])

# Check for the table explicitly. Treating *any* AnalysisException raised by
# saveAsTable as "table exists" would also swallow unrelated failures
# (e.g. a missing database) and then fail confusingly inside the MERGE.
if not spark.catalog.tableExists(table_name):

    # First load: create the Delta table from the cleaned data.
    df_to_write.write.format("delta").saveAsTable(table_name)

else:

    print("Table Already Exists")

    df_to_write.createOrReplaceTempView("vw_df_cleaned_final")

    # Upsert keyed on url: update a matched row only when some column differs, insert unseen urls.
    # `<=>` is null-safe equality; a plain `<>` evaluates to NULL when either side is NULL,
    # so a value changing to or from NULL would never trigger the update.
    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_df_cleaned_final source_view

                    ON source_view.url = target_table.url

                    WHEN MATCHED AND (
                        NOT (source_view.title <=> target_table.title) OR
                        NOT (source_view.description <=> target_table.description) OR
                        NOT (source_view.category <=> target_table.category) OR
                        NOT (source_view.image <=> target_table.image) OR
                        NOT (source_view.provider <=> target_table.provider) OR
                        NOT (source_view.datePublished <=> target_table.datePublished)
                    )

                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *

                """)

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 26, Finished, Available)

Table Already Exists


In [25]:
%%sql

-- Row count of the Delta table after the write/merge above.
SELECT COUNT(*) FROM bing_lake_db.tbl_latest_news

StatementMeta(, 7e0bda10-78d7-4f58-a81c-b706e3434b07, 27, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>