Read the JSON file into a DataFrame

In [2]:
# Load the multi-line JSON document "Files/bing-latest-news.json" into a Spark DataFrame.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-latest-news.json")
)
display(df)

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 176d6145-6bdc-4b25-8851-d3515fb76e22)

Select just the "value" column from the Dataframe

In [3]:
# Keep only the "value" column; every other top-level field is dropped from here on.
df = df.select("value")

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 5, Finished, Available)

In [4]:
# Preview the DataFrame, which now holds only the "value" column.
display(df)

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 6, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5f857081-7b54-45d7-aaf1-a8bd6f7196c5)

Explode the JSON column

In [5]:
from pyspark.sql.functions import explode

# Explode the "value" array so that each row represents one JSON object,
# exposed under the column name "json_object".
df_exploded = df.select(explode("value").alias("json_object"))

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 7, Finished, Available)

In [6]:
# Preview the exploded DataFrame (single column: json_object).
display(df_exploded)

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 8, Finished, Available)

SynapseWidget(Synapse.DataFrame, 8855fa3e-1def-4c8b-a171-ef55265ea1f0)

Converting the exploded DataFrame into a list of JSON strings

In [7]:
# toJSON() serialises every row into one JSON string; collect() then brings
# all of those strings back to the driver as a plain Python list.
json_list = (
    df_exploded
    .toJSON()
    .collect()
)

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 9, Finished, Available)

Testing the JSON string list

In [8]:
# Inspect the first element to confirm that each entry is a JSON-encoded string.
print(json_list[0])

# Output: a JSON string whose payload is nested under the "json_object" key.

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 10, Finished, Available)

{"json_object":{"about":[{"name":"Lenovo","readLink":"https://api.bing.microsoft.com/api/v7/entities/7018c591-569b-ae17-572c-da86c73133e2"},{"name":"Intel Graphics Technology","readLink":"https://api.bing.microsoft.com/api/v7/entities/54bdf155-6cea-0fc3-be67-98e1af396cbc"},{"name":"Memory","readLink":"https://api.bing.microsoft.com/api/v7/entities/846d38b0-d018-c99e-8be9-10fb3086abdc"},{"name":"IPS panel","readLink":"https://api.bing.microsoft.com/api/v7/entities/e293ea93-83d2-cb24-aa00-0ea17c8218f9"}],"category":"ScienceAndTechnology","datePublished":"2024-05-12T03:58:05.0000000Z","description":"The Lenovo 16\" Intel IdeaPad Slim 5i Laptop is currently offered at a 21% discount, resulting in $220 in savings. This laptop is powered by an Intel Core 7 150U Processor, featuring speeds up to 5.40 GHz on P-cores and up to 4.","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.kVcNJsFXQ2ICAefSTq7Z_C&pid=News","height":351,"width":391}},"mentions":[{"name":"Saving"},{"name":

In [9]:
import json

# Parse the first JSON string into a Python dictionary.
news_json = json.loads(json_list[0])

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 11, Finished, Available)

In [10]:
# Show the parsed form of the first article.
print(news_json)

# Output: a Python dictionary (the result of json.loads).

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 12, Finished, Available)

{'json_object': {'about': [{'name': 'Lenovo', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/7018c591-569b-ae17-572c-da86c73133e2'}, {'name': 'Intel Graphics Technology', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/54bdf155-6cea-0fc3-be67-98e1af396cbc'}, {'name': 'Memory', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/846d38b0-d018-c99e-8be9-10fb3086abdc'}, {'name': 'IPS panel', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/e293ea93-83d2-cb24-aa00-0ea17c8218f9'}], 'category': 'ScienceAndTechnology', 'datePublished': '2024-05-12T03:58:05.0000000Z', 'description': 'The Lenovo 16" Intel IdeaPad Slim 5i Laptop is currently offered at a 21% discount, resulting in $220 in savings. This laptop is powered by an Intel Core 7 150U Processor, featuring speeds up to 5.40 GHz on P-cores and up to 4.', 'image': {'thumbnail': {'contentUrl': 'https://www.bing.com/th?id=OVFT.kVcNJsFXQ2ICAefSTq7Z_C&pid=News', 'height': 351, 'width': 391}}, 'mentio

In [11]:
# Article fields are reached through the "json_object" wrapper key.
print(news_json['json_object']['description'])

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 13, Finished, Available)

The Lenovo 16" Intel IdeaPad Slim 5i Laptop is currently offered at a 21% discount, resulting in $220 in savings. This laptop is powered by an Intel Core 7 150U Processor, featuring speeds up to 5.40 GHz on P-cores and up to 4.


In [12]:
# Parallel lists: index i across all seven lists describes the same article.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

# Process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string and unwrap the article payload.
        article = json.loads(json_str)["json_object"]

        # Articles without a category or a thumbnail URL are skipped silently.
        if not article.get("category"):
            continue
        image_url = article.get("image", {}).get("thumbnail", {}).get("contentUrl")
        if not image_url:
            continue

        # Resolve every field BEFORE touching the output lists. Appending
        # field-by-field would leave the lists with different lengths when a
        # later key is missing, and every following article would then be
        # paired with the wrong url/provider/date once the lists are zipped.
        record = (
            article["name"],
            article["description"],
            article["category"],
            article["url"],
            image_url,
            article["provider"][0]["name"],
            article["datePublished"],
        )

    except Exception as e:
        # Best effort: report the malformed article and move on to the next one.
        print(f"Error processing JSON object: {e}")
        continue

    # The record is complete, so all seven lists grow together.
    for column, value in zip(
        (title, description, category, url, image, provider, datePublished),
        record,
    ):
        column.append(value)

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 14, Finished, Available)

In [13]:
# Show the extracted article titles.
title

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 15, Finished, Available)

["Don't Miss $220 Savings on Latest IdeaPad Slim 5i Laptop",
 'All the Latest NFL Signings in a Wild Week of Free Agency',
 "Ancelotti praises Real Madrid substitutes in the Spanish champions' latest win",
 'North Korean leader Kim Jong Un oversees latest test of new multiple rocket launcher',
 'Homelessness down in Long Beach, up in Orange County, latest counts find',
 'North Korean leader Kim supervises latest test of new multiple rocket launcher',
 'Kate Middleton Latest: Prince William Shares New Update',
 "This Is What the Latest Artificial Intelligence (AI) Earnings Reports Say About Nvidia Stock's Future",
 '14-year-old stabbed in Bronx amid rash of bloody teen violence: cops',
 "High school junior the latest victim of Baltimore's stolen car crisis",
 'Gerrit Cole increases pitch count in latest bullpen session in encouraging Yankees sign',
 "After Fire Country's Latest Episode Bode Might Be Leaving Three Rock, But I Think Another Main Character Might End Up There Next Season",


Converting the List to a Dataframe

In [14]:
from pyspark.sql.types import StructType, StructField, StringType

# Every output column is a nullable string, so derive the schema from the
# column names instead of spelling out each StructField.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "title",
        "description",
        "category",
        "url",
        "image",
        "provider",
        "datePublished",
    )
])

# Pair the parallel lists positionally: one tuple per article.
data = list(zip(title, description, category, url, image, provider, datePublished))

# Create DataFrame
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 16, Finished, Available)

In [15]:
# Preview the DataFrame built from the extracted lists.
display(df_cleaned)

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 17, Finished, Available)

SynapseWidget(Synapse.DataFrame, 312b02cc-5a02-4a15-b04c-1ac731007e79)

Processing the Date column

In [16]:
from pyspark.sql.functions import to_date, date_format

# Replace datePublished with its calendar date rendered as a "dd-MM-yyyy" string.
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(to_date("datePublished"), "dd-MM-yyyy"),
)

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 18, Finished, Available)

In [17]:
# Preview the DataFrame with the reformatted datePublished column.
display(df_cleaned_final)

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 19, Finished, Available)

SynapseWidget(Synapse.DataFrame, b1f7c2c5-7840-4253-89df-75b843f69745)

Writing the final DataFrame to the Lakehouse DB in Delta format

In [25]:
from pyspark.sql.utils import AnalysisException

# Persist the cleaned articles as a Delta table.
#   * First run: the table does not exist yet, so saveAsTable creates it.
#   * Later runs: saveAsTable raises AnalysisException because the table is
#     already there, and the new batch is upserted with MERGE, keyed on `url`.
table_name = 'bing_lake_db.tbl_latest_news'

try:
    df_cleaned_final.write.format("delta").saveAsTable(table_name)

except AnalysisException:

    # AnalysisException is raised for many reasons besides "table exists".
    # Only fall through to the MERGE when the table is really there;
    # otherwise surface the original error instead of masking it.
    if not spark.catalog.tableExists(table_name):
        raise

    print("Table Already Exists")

    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")

    # `<=>` is Spark SQL's null-safe equality. A plain `<>` evaluates to NULL
    # when either side is NULL, so a change from/to NULL would never be
    # detected and the row would silently keep its stale values.
    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_df_cleaned_final source_view

                    ON source_view.url = target_table.url

                    WHEN MATCHED AND (
                        NOT (source_view.title <=> target_table.title) OR
                        NOT (source_view.description <=> target_table.description) OR
                        NOT (source_view.category <=> target_table.category) OR
                        NOT (source_view.image <=> target_table.image) OR
                        NOT (source_view.provider <=> target_table.provider) OR
                        NOT (source_view.datePublished <=> target_table.datePublished)
                    )

                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *

    """)

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 27, Finished, Available)

Table Already Exists


In [26]:
%%sql

-- Number of rows currently stored in the target Delta table.
SELECT COUNT(*) FROM bing_lake_db.tbl_latest_news

StatementMeta(, 3b885587-0e24-44b8-af01-25789e320096, 28, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>