# Read the JSON file as a DataFrame

In [33]:
# Load the Bing news payload into a Spark DataFrame. The "multiline" reader
# option is enabled so the JSON parser accepts a document spanning several lines.
multiline_reader = spark.read.option("multiline", "true")
df = multiline_reader.json("Files/bing-latest-news.json")
display(df)

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 35, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5d46bd54-8846-454a-b9c3-cfa50f37d12a)

# Selecting just the value column from DataFrame

In [34]:
# Keep only the "value" column; every other top-level column is dropped.
df = df.select(df["value"])

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 36, Finished, Available)

In [35]:
# Render the DataFrame, which now holds only the "value" column.
display(df)

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 37, Finished, Available)

SynapseWidget(Synapse.DataFrame, 99121e94-2679-4a2f-a078-632c0636736f)

# Explode the JSON column

In [36]:
from pyspark.sql.functions import explode

# Emit one output row per element of the "value" column and expose each
# element under the column name "json_object".
json_object_column = explode("value").alias("json_object")
df_exploded = df.select(json_object_column)

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 38, Finished, Available)

In [37]:
# Render the exploded DataFrame (single column: "json_object").
display(df_exploded)

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 39, Finished, Available)

SynapseWidget(Synapse.DataFrame, 700fccbb-1ae6-47a3-ae47-b926e1fae7fd)

# Converting the exploded DataFrame to a list of JSON strings

In [38]:
# Serialize every row to a JSON string, then pull all of them back to the
# driver as a plain Python list.
json_rows = df_exploded.toJSON()
json_list = json_rows.collect()

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 40, Finished, Available)

# Inspecting sample entries of the JSON string list

In [39]:
# Print the second collected JSON string (index 1) as a sample.
print(json_list[1])

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 41, Finished, Available)

{"json_object":{"datePublished":"2024-06-08T14:52:00.0000000Z","description":"Read Live India News and latest India updates on Hindustan Times. Stay up-to-date with current top news in India along with breaking India news headlines, photos, videos and more.","name":"India News","provider":[{"_type":"Organization","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=ODF.Caqyr0opfKlydldMITdwcw&pid=news"}},"name":"Hindustan Times"}],"url":"https://www.hindustantimes.com/india-news"}}


In [40]:
import json

# Pick the entry at index 20 as a sample and parse its JSON string into a
# Python dictionary.
sample_json_str = json_list[20]
news_json = json.loads(sample_json_str)

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 42, Finished, Available)

In [41]:
# Print the parsed sample dictionary.
print(news_json)

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 43, Finished, Available)

{'json_object': {'about': [{'name': 'Sabrina Carpenter', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/b5556b46-1b1b-49b0-860d-7747d71dcbef'}, {'name': 'Pitbull', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/4e9d7237-1e59-4dd4-a707-addf0ae53344'}, {'name': 'Ne-Yo', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/7ae022a0-e9a3-d8b0-4a88-008e9b80eda6'}, {'name': 'Afrojack', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/9069a5c6-7461-e342-9454-abc0c42fa38a'}, {'name': 'Major League Baseball', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/2468e4af-2f20-1a22-40fc-e932fe5418aa'}, {'name': 'Yankee Stadium', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/2b5e270a-0954-033a-5474-fe8b45c0eecc'}, {'name': 'Los Angeles Dodgers', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/f53713ae-71b9-a1a2-5aa3-bb4143549f01'}, {'name': 'The Bronx', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/7df5a4fa-

# Extracting the JSON properties into lists

In [42]:
# Parallel output lists: index i across all seven lists describes one article.
# They are zipped into rows later, so they must always stay the same length.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

# Process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string into a dictionary
        article = json.loads(json_str)
        news_item = article["json_object"]

        # Only keep articles that have both a category and a thumbnail URL.
        thumbnail_url = news_item.get("image", {}).get("thumbnail", {}).get("contentUrl")
        if not (news_item.get("category") and thumbnail_url):
            continue

        # Resolve every field BEFORE touching the output lists. If any lookup
        # fails (missing key, empty provider list, ...), nothing has been
        # appended yet, so the lists cannot drift out of alignment.
        row = (
            news_item["name"],
            news_item["description"],
            news_item["category"],
            news_item["url"],
            thumbnail_url,
            news_item["provider"][0]["name"],
            news_item["datePublished"],
        )

    except Exception as e:
        # Best effort: report the malformed article and move on to the next one.
        print(f"Error processing JSON object: {e}")
        continue

    # The row is complete: append one value to each list, in matching order.
    for column_values, value in zip(
        (title, description, category, url, image, provider, datePublished), row
    ):
        column_values.append(value)

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 44, Finished, Available)

# Converting the List to a DataFrame

In [43]:
from pyspark.sql.types import StructType, StructField, StringType

# Column order of the cleaned DataFrame; it must match the order in which the
# lists are zipped below.
column_names = (
    "title",
    "description",
    "category",
    "url",
    "image",
    "provider",
    "datePublished",
)

# Define schema: every column is a nullable string.
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in column_names]
)

# Combine the lists into one tuple per article.
data = list(zip(title, description, category, url, image, provider, datePublished))

# Create DataFrame
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 45, Finished, Available)

In [44]:
# Preview at most five rows of the cleaned DataFrame.
display(df_cleaned.limit(5))

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 46, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1a0d32a5-30d7-453a-b78d-cf3dceadde12)

# Processing the Date column

In [45]:
from pyspark.sql.functions import to_date, date_format

# Convert the "datePublished" string to a date, then render it back as a
# string using the "dd-MMM-yyyy" pattern. The column is replaced in place.
published_date = to_date("datePublished")
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(published_date, "dd-MMM-yyyy"),
)

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 47, Finished, Available)

In [46]:
# Preview at most five rows after the date column has been reformatted.
display(df_cleaned_final.limit(5))

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 48, Finished, Available)

SynapseWidget(Synapse.DataFrame, 90baf17f-f282-49ce-8266-2f7d717d9298)

# Writing the Final DataFrame to the Lakehouse DB in a Delta format

In [47]:
%%sql

-- Row count of the target table before the write/merge cell below runs.
-- NOTE(review): this query presumably fails if the table does not exist yet (first run) - confirm.
select count(*) from bing_lake_db.tbl_latest_news;

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 49, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>

In [48]:
from pyspark.sql.utils import AnalysisException

table_name = 'bing_lake_db.tbl_latest_news'

try:
    # First attempt: create the Delta table from the DataFrame. No save mode
    # is given, so this is expected to raise when the table already exists.
    df_cleaned_final.write.format('delta').saveAsTable(table_name)
except AnalysisException:
    # Any AnalysisException from the write is treated as "table already
    # exists"; in that case upsert the new rows into the existing table.
    print("Table Already Exists")
    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")

    # Rows are matched on url. A matched row is updated only when at least one
    # other column differs. The comparison uses the null-safe operator <=>:
    # a plain <> evaluates to NULL when either side is NULL, which would
    # silently skip updates for columns that changed to or from NULL.
    # NOTE(review): a MERGE can fail when several source rows share the same
    # url - confirm whether the feed can contain duplicate URLs.
    spark.sql(f"""MERGE INTO {table_name} AS target_table
                  USING vw_df_cleaned_final AS source_view
                  ON source_view.url = target_table.url
                  WHEN MATCHED AND NOT (
                      source_view.title <=> target_table.title AND
                      source_view.description <=> target_table.description AND
                      source_view.category <=> target_table.category AND
                      source_view.image <=> target_table.image AND
                      source_view.provider <=> target_table.provider AND
                      source_view.datePublished <=> target_table.datePublished
                  )
                  THEN UPDATE SET *
                  WHEN NOT MATCHED THEN INSERT *
                """)

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 50, Finished, Available)

Table Already Exists


In [49]:
%%sql

-- Row count of the target table after the write/merge cell above has run.
select count(*) from bing_lake_db.tbl_latest_news;

StatementMeta(, 7adc91e4-4379-4563-a2b5-2d22e5eaae9f, 51, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>