Read the JSON file as a DataFrame

In [50]:
# Load the raw Bing news export. multiLine lets Spark parse JSON records that
# span several lines instead of expecting one JSON document per line.
bing_news_path = "Files/bing-latest-news.json"
df = spark.read.json(bing_news_path, multiLine=True)
display(df)

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 53, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c1903d5c-736d-4ba3-ac78-ec73078e1382)

Select only the `value` column from the DataFrame

In [51]:
df = df.select(df["value"])  # keep only the `value` column; all other columns are dropped

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 54, Finished, Available, Finished)

In [52]:
display(df)  # preview the DataFrame after narrowing it to the `value` column

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 55, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d17066b4-7104-4b55-a276-5fe871d43235)

Explode the `value` column so that each news article becomes its own row (column `json_object`)

In [53]:
from pyspark.sql.functions import explode

# Emit one output row per element of `value`, exposed as a column named json_object.
df_exploded = df.select(
    explode("value").alias("json_object")
)


StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 56, Finished, Available, Finished)

In [54]:
display(df_exploded)  # preview the exploded rows (one json_object per row)

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 57, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1add782a-c344-40c0-82c2-603df91b42ca)

Convert the exploded DataFrame into a Python list of JSON strings (one string per article)

In [55]:
# Serialise every row to a JSON string and pull them all to the driver as a Python list.
# NOTE(review): collect() materialises the whole dataset in driver memory; revisit if the feed grows large.
json_list = df_exploded.toJSON().collect()

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 58, Finished, Available, Finished)

In [56]:
print(json_list)  # dumps every collected JSON string to the cell output

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 59, Finished, Available, Finished)



Inspect a sample entry from the JSON string list

In [57]:
# Spot-check one record. NOTE(review): the hard-coded index 23 raises IndexError if fewer than 24 records were collected.
print(json_list[23])

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 60, Finished, Available, Finished)

{"json_object":{"datePublished":"2024-08-22T06:24:00.0000000Z","description":"Five bodies have been found in the search for those missing after a yacht sank off the Sicily coast, bringing the total number of dead to six. Search efforts for the final missing person are resuming for a fourth morning.","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.rYTpkI5Y57586oDqgmOaxC&pid=News","height":393,"width":700}},"name":"Superyacht sinks latest: Search for final missing person - as identities of five bodies to be confirmed","provider":[{"_type":"Organization","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=ODF.Eg62H9DIbo-JYPI6GpAgGA&pid=news"}},"name":"Sky"}],"url":"https://news.sky.com/story/superyacht-sinks-latest-search-for-final-missing-person-as-identities-of-five-bodies-to-be-confirmed-13199663"}}


In [58]:
import json

# Parse one sample record from its JSON string into a Python dict for inspection.
sample_index = 3
news_json = json.loads(json_list[sample_index])

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 61, Finished, Available, Finished)

In [59]:
print(news_json)  # show the parsed sample record as a Python dict

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 62, Finished, Available, Finished)

{'json_object': {'about': [{'name': 'Erik ten Hag', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/a61e1753-9864-40c2-8860-7a57dde78c3a'}, {'name': 'Jadon Sancho', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/f6c428ba-e5b1-b002-06da-e4dc3cc9497f'}, {'name': 'Manchester United F.C.', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/064ce28e-ed07-a026-2f79-1b321d7548ea'}, {'name': 'Borussia Dortmund', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/b12f84ff-3350-8fc4-0dc6-146456fdb333'}, {'name': 'Premier League', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/02bde011-1e9d-3aff-8309-7d07d4031798'}, {'name': 'Chelsea F.C.', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/8346cd56-1a96-adb0-26e1-f3679d88e0ba'}, {'name': 'The Athletic', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/1f67ac13-d1f9-e346-c50d-574e8700b97f'}], 'category': 'Sports', 'datePublished': '2024-08-22T07:42:07.0000000Z', 'descript

In [60]:
# Print, one per line, the fields that the extraction step below pulls out of every article.
sample_article = news_json['json_object']
print(sample_article['name'])
print(sample_article['description'])
print(sample_article['category'])
print(sample_article['url'])
print(sample_article['image']['thumbnail']['contentUrl'])
print(sample_article['provider'][0]['name'])
print(sample_article['datePublished'])

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 63, Finished, Available, Finished)

Swap deal, PSG option, £40m 'approach' - Jadon Sancho Man United transfer latest
It comes amid a proposed fire sale at Stamford Bridge as key figures upstairs attempt to generate some more cash for their own recruitment drive. Sancho is tipped to cost around £40million.
Sports
https://www.msn.com/en-gb/sport/other/swap-deal-psg-option-40m-approach-jadon-sancho-man-united-transfer-latest/ar-AA1pehqc
https://www.bing.com/th?id=OVFT.fJyErVidbcTdzHGomwbDKi&pid=News
Manchester Evening News on MSN.com
2024-08-22T07:42:07.0000000Z


Extract the article properties into parallel Python lists

In [61]:
import json

# Parallel column lists: position i in every list describes the same article.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []


def _extract_article_fields(json_str):
    """Parse one exploded record and return its fields as a 7-tuple.

    Order: (name, description, category, url, thumbnail URL, first provider
    name, datePublished). Returns None when the record has no category or no
    thumbnail URL, so the caller can skip it. Malformed records raise (e.g.
    JSONDecodeError, KeyError, IndexError); the caller reports and skips them.
    """
    article = json.loads(json_str)["json_object"]

    thumbnail_url = article.get("image", {}).get("thumbnail", {}).get("contentUrl")
    if not (article.get("category") and thumbnail_url):
        return None

    return (
        article["name"],
        article["description"],
        article["category"],
        article["url"],
        thumbnail_url,
        article["provider"][0]["name"],
        article["datePublished"],
    )


# Process each JSON object in the list
for json_str in json_list:
    try:
        fields = _extract_article_fields(json_str)
    except Exception as e:
        # Best-effort ingestion: report the bad record and move on.
        print(f"Error processing JSON object:{e}")
        continue
    if fields is None:
        continue
    # Append only after every field was read successfully, so a record that
    # fails part-way can no longer leave the lists with different lengths
    # (which would misalign rows when they are zipped together later).
    for target_list, field_value in zip(
        (title, description, category, url, image, provider, datePublished), fields
    ):
        target_list.append(field_value)
print(title)


StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 64, Finished, Available, Finished)



Convert the extracted lists into a DataFrame

In [62]:
from pyspark.sql.types import StructType, StructField, StringType

# The extracted column lists, in the same order as the schema below.
extracted_columns = (title, description, category, url, image, provider, datePublished)

# zip() silently stops at the shortest input, so lists that drifted out of step
# would quietly drop rows or pair fields from different articles. Fail loudly.
column_lengths = sorted({len(values) for values in extracted_columns})
if len(column_lengths) > 1:
    raise ValueError(f"Extracted column lists have mismatched lengths: {column_lengths}")

# Combine the lists: one tuple per article
data = list(zip(*extracted_columns))

# Define schema: every field is a nullable string
schema = StructType([
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("category", StringType(), True),
    StructField("url", StringType(), True),
    StructField("image", StringType(), True),
    StructField("provider", StringType(), True),
    StructField("datePublished", StringType(), True),
])

# Create DataFrame
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 65, Finished, Available, Finished)

In [63]:
display(df_cleaned)  # preview the DataFrame built from the extracted lists

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 66, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, dddfcb8d-5c1f-44da-b121-158049ba357d)

Convert the datePublished column into a dd-MMM-yyyy date string

In [64]:

# Convert into Date Format
from pyspark.sql.functions import to_date, date_format

# Reduce the ISO-8601 timestamp string (e.g. "2024-08-22T07:42:07.0000000Z") to
# its calendar date, then render it back as a "dd-MMM-yyyy" string.
# The column remains a string, so it sorts lexically rather than chronologically.
published_as_date = to_date("datePublished")
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(published_as_date, "dd-MMM-yyyy"),
)


StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 67, Finished, Available, Finished)

In [65]:
display(df_cleaned_final.limit(5))  # preview the first five rows after the date conversion

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 68, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9565379e-13a2-48a2-9541-51584454c617)

Write the final DataFrame to the Lakehouse database in Delta format

In [66]:
# df_cleaned_final.write.format("delta").saveAsTable("bing_lake_db.tbl_latest_news")

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 69, Finished, Available, Finished)

In [67]:
# Overwrite the Delta table
# df_cleaned_final.write.format("delta").mode('overwrite').saveAsTable("bing_lake_db.tbl_latest_news")

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 70, Finished, Available, Finished)

In [68]:
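# Append the latest data without overwriting the existing table data
# (mode("append") is required: the default mode errors if the table already exists)
# df_cleaned_final.write.format("delta").mode("append").saveAsTable("bing_lake_db.tbl_latest_news")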

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 71, Finished, Available, Finished)

In [69]:

# %%sql
# SELECT count(*) FROM tbl_latest_news;
# DELETE FROM tbl_latest_news;


StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 72, Finished, Available, Finished)

In [73]:
from pyspark.sql.utils import AnalysisException

# Target Delta table; url is used as the key for upserts.
table_name = 'bing_lake_db.tbl_latest_news1'

# Delta MERGE fails when several source rows match the same target row, so keep
# one row per url in this batch (which duplicate survives is arbitrary). The
# initial load uses the same rows so the table also starts out unique on url.
df_upsert_source = df_cleaned_final.dropDuplicates(["url"])

try:
    # First run: create the table from the current batch.
    df_upsert_source.write.format("delta").saveAsTable(table_name)
except AnalysisException:
    # saveAsTable's default mode errors if the table already exists; only that
    # case should fall through to the MERGE. Any other analysis error (e.g. a
    # missing database) is re-raised instead of being reported as "exists".
    if not spark.catalog.tableExists(table_name):
        raise
    print("Table Already Exists")

    df_upsert_source.createOrReplaceTempView("vw_df_cleaned_final")

    # Upsert keyed on url. `<=>` is null-safe equality: a plain `<>` yields NULL
    # (treated as false) when either side is NULL, silently skipping real changes.
    spark.sql(f"""
        MERGE INTO {table_name} AS target_table
        USING vw_df_cleaned_final AS source_view
        ON source_view.url = target_table.url
        WHEN MATCHED AND (
               NOT (source_view.title         <=> target_table.title)
            OR NOT (source_view.description   <=> target_table.description)
            OR NOT (source_view.category      <=> target_table.category)
            OR NOT (source_view.image         <=> target_table.image)
            OR NOT (source_view.provider      <=> target_table.provider)
            OR NOT (source_view.datePublished <=> target_table.datePublished)
        )
        THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 76, Finished, Available, Finished)

Table Already Exits


In [74]:
%%sql
-- Sanity check: total number of rows currently in tbl_latest_news1.
-- NOTE(review): unqualified name; assumes bing_lake_db is the default database/lakehouse for this session.
select count(*) from tbl_latest_news1;

StatementMeta(, 26f212cd-c9af-44d0-a13c-eec767da529e, 77, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>