In [38]:
# Load the Bing news dump as multi-line JSON into a Spark DataFrame,
# then render it for a first look at the raw structure.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-latest-news.json")
)
display(df)

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 39, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 002c99d3-9ea5-43b9-a295-dee6136a51a7)

In [39]:
# Keep only the "value" column; a later cell explodes it into one row per element.
# Note: this rebinds `df`, so the full raw frame is no longer reachable under that name.
df = df.select("value")

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 40, Finished, Available, Finished)

In [40]:
# Render the current contents of `df` in the notebook.
display(df)

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 41, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 92e6f2eb-2958-4785-af08-f94c6bdd0ebd)

In [41]:
from pyspark.sql.functions import explode

# Turn the "value" array into one row per element, exposed as "json_object".
df_exploded = df.select(
    explode("value").alias("json_object")
)

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 42, Finished, Available, Finished)

In [42]:
# Render the exploded frame (single "json_object" column) in the notebook.
display(df_exploded)

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 43, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a67df6bd-f1ac-4546-90c0-d2817b504b04)

In [43]:
# Serialise every row to a JSON string and gather them into a Python list.
# NOTE(review): collect() brings all rows to the driver — presumably fine for
# this file's size; confirm before pointing it at a much larger dataset.
json_list = df_exploded.toJSON().collect()

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 44, Finished, Available, Finished)

In [44]:
# Peek at the first serialised article to see the JSON layout that is parsed later.
print(json_list[0])

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 45, Finished, Available, Finished)

{"json_object":{"about":[{"name":"Arsenal F.C.","readLink":"https://api.bing.microsoft.com/api/v7/entities/8b3b4135-cb8e-ded8-10ce-8f130d64b863"},{"name":"Riccardo Calafiori","readLink":"https://api.bing.microsoft.com/api/v7/entities/49f29b7c-9e4d-9c42-9c72-bfde27d2fe38"},{"name":"Bologna","readLink":"https://api.bing.microsoft.com/api/v7/entities/f36c4093-c958-bcb0-7345-a64ab939999b"},{"name":"Nico Williams","readLink":"https://api.bing.microsoft.com/api/v7/entities/d5ea2495-e221-af84-25b3-f794c796ed05"},{"name":"Albert Sambi Lokonga","readLink":"https://api.bing.microsoft.com/api/v7/entities/b325dc03-f03e-ba02-b787-ecef75f37a81"},{"name":"Seville","readLink":"https://api.bing.microsoft.com/api/v7/entities/0a3a2612-d8ed-d4dc-69be-4f76eb7064f6"}],"category":"Sports","datePublished":"2024-07-13T18:24:00.0000000Z","description":"ARSENAL are closing in on Riccardo Calafiori – with personal reportedly terms agreed for Bologna’s £27m defender. Meanwhile, Arsenal are also interested in Spani

In [52]:
import json
# Parallel column lists. A later cell zips them into rows, so they must always
# have the same length and stay in the same order.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []
article_columns = (title, description, category, url, image, provider, datePublished)

# Process each JSON object in the list.
for json_str in json_list:
    try:
        # Parse the JSON string into a dictionary and unwrap the article payload.
        article = json.loads(json_str)["json_object"]

        # Keep only articles that carry a category ...
        if not article.get("category"):
            continue

        # ... and a thumbnail URL. `or {}` also covers keys that are present
        # but null, where a plain .get(key, {}) would return None.
        thumbnail = (article.get("image") or {}).get("thumbnail") or {}
        image_url = thumbnail.get("contentUrl")
        if not image_url:
            continue

        # Resolve every field BEFORE appending anything: if a key is missing or
        # the provider array is empty, the article is skipped as a whole
        # instead of leaving the column lists with different lengths (which
        # would misalign every following row once the lists are zipped).
        row = (
            article["name"],
            article["description"],
            article["category"],
            article["url"],
            image_url,
            article["provider"][0]["name"],
            article["datePublished"],
        )
    except Exception as e:
        # Best effort: report the malformed article and move on to the next one.
        print(f"Error processing JSON object : {e}")
        continue

    # The row is complete, so all seven lists grow together.
    for column_values, value in zip(article_columns, row):
        column_values.append(value)

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 53, Finished, Available, Finished)

In [46]:
from pyspark.sql.types import StructType, StructField, StringType

# Stitch the parallel column lists into one tuple per article.
data = list(
    zip(title, description, category, url, image, provider, datePublished)
)

# Every column is a nullable string; the tuple below fixes the column order,
# which must match the order used when zipping the lists above.
schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in (
            "title",
            "description",
            "category",
            "url",
            "image",
            "provider",
            "datePublished",
        )
    ]
)

# Materialise the rows as a Spark DataFrame with the explicit schema.
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 47, Finished, Available, Finished)

In [47]:
# Show only the first five rows of the cleaned frame as a quick sanity check.
display(df_cleaned.limit(5))

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 48, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d8a78a0a-4bec-433f-8360-4c8589e695b3)

In [48]:
from pyspark.sql.functions import to_date, date_format

# Replace the raw timestamp string with a formatted date string: parse it to a
# date first, then render it with the "dd-MMM-yyyy" pattern.
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(to_date(df_cleaned["datePublished"]), "dd-MMM-yyyy"),
)

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 49, Finished, Available, Finished)

In [49]:
# Show the first five rows to verify the reformatted datePublished column.
display(df_cleaned_final.limit(5))

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 50, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 439d58ce-81a7-4d12-96a0-58e74dda8b21)

In [54]:
from pyspark.sql.utils import AnalysisException

# Target Delta table; used both for the initial write and for the MERGE below.
table_name = 'bing_lake_db.tbl_latest_news'

# Keep one row per url. The MERGE matches on url and Delta rejects a merge in
# which several source rows match the same target row, so duplicates in the
# incoming batch would otherwise abort the whole load.
df_to_write = df_cleaned_final.dropDuplicates(["url"])

table_already_exists = False
try:
    # First run: create the table. The default save mode raises when the table
    # already exists, which is the signal to fall back to an incremental merge.
    df_to_write.write.format("delta").saveAsTable(table_name)

except AnalysisException:
    # NOTE(review): AnalysisException can also signal other analysis problems.
    # Those presumably resurface from the MERGE below rather than being
    # swallowed here — confirm.
    print("Table Already Exists")
    table_already_exists = True

if table_already_exists:
    # Run the merge outside the handler so that its own failures are reported
    # directly rather than chained onto the "table exists" exception.
    df_to_write.createOrReplaceTempView("vw_df_cleaned_final")

    # Upsert by url. `<=>` is the null-safe equality operator: a plain `<>`
    # evaluates to NULL when either side is NULL, so a row whose stored value
    # is NULL would never be detected as changed.
    spark.sql(f""" MERGE INTO {table_name} target_table
                    USING vw_df_cleaned_final source_view
                    ON source_view.url = target_table.url
                    WHEN MATCHED AND (
                        NOT (source_view.title <=> target_table.title) OR
                        NOT (source_view.description <=> target_table.description) OR
                        NOT (source_view.category <=> target_table.category) OR
                        NOT (source_view.image <=> target_table.image) OR
                        NOT (source_view.provider <=> target_table.provider) OR
                        NOT (source_view.datePublished <=> target_table.datePublished)
                    )
                    THEN UPDATE SET *
                    WHEN NOT MATCHED THEN INSERT *

                   """)

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 55, Finished, Available, Finished)

Table Already Exists


In [55]:
%%sql

-- Sanity check: total number of rows currently stored in the news table.
select count(*) from bing_lake_db.tbl_latest_news

StatementMeta(, ba146a08-e000-494d-b2e3-4867f946b2d5, 56, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>