## Step 1: Read the JSON data into a Spark DataFrame.

In [1]:
df = spark.read.option("multiline", "true").json("Files/bing-latest-news.json")
# This reads the JSON file "bing-latest-news.json" into a Spark DataFrame. The option "multiline" ensures that multiple JSON records in the file are correctly handled as a single DataFrame.
# The `df` now holds all the raw JSON data from the file.
display(df)

StatementMeta(, 22ac2a7e-e9e4-4f2c-a82c-0859dd25ed8a, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 657a1dac-c06b-4d71-8cf6-a94b9c125d6a)

### Step 2: Select the "value" column from the DataFrame.

In [2]:
df = df.select("value")
display(df)
# Here we are extracting only the "value" column from the DataFrame, which likely contains the actual news data.
# This is common when the JSON file has nested structures, and we are interested in the "value" field.

StatementMeta(, 22ac2a7e-e9e4-4f2c-a82c-0859dd25ed8a, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5c5acd33-57aa-4003-8a26-a03b410e0b68)

### Step 3: Explode the "value" column to flatten nested structures.

In [3]:
from pyspark.sql.functions import explode
df_exploded = df.select(explode(df["value"]).alias("json_object"))
display(df_exploded)
# The explode function is used to flatten the nested structure in the "value" column.
# Each element within the nested array is turned into a separate row, simplifying the data for further processing.

StatementMeta(, 22ac2a7e-e9e4-4f2c-a82c-0859dd25ed8a, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c62418f8-cd54-4d4e-8220-1dbf184f033a)

In [4]:
# Step 4: Convert the exploded data into a list of JSON strings.
json_list = df_exploded.toJSON().collect()
print(json_list)
# We convert the DataFrame to JSON format and collect the data as a list of JSON strings.
# `collect()` pulls the entire data into memory, so be cautious with very large datasets.

StatementMeta(, 22ac2a7e-e9e4-4f2c-a82c-0859dd25ed8a, 6, Finished, Available, Finished)

['{"json_object":{"about":[{"name":"Asian Paints","readLink":"https://api.bing.microsoft.com/api/v7/entities/5e04a015-da42-b6af-e042-b6c7d8e0fabd?setLang=en"},{"name":"Competition Commission of India","readLink":"https://api.bing.microsoft.com/api/v7/entities/94fb3d11-9d95-d68d-3116-794374d5c9eb?setLang=en"},{"name":"Grasim Industries","readLink":"https://api.bing.microsoft.com/api/v7/entities/038b377b-d31f-24ce-f26e-13099f260bec?setLang=en"}],"datePublished":"2025-07-02T05:00:00.0000000Z","description":"Asian Paints Share Price: Shares of Asian Paints Ltd fell on Wednesday, July 2, after the Competition Commission of India (CCI) directed its Director General to initiate an investigation into allegations of anti-competitive practices by the company.","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.PPg6izh3yRy3iy8JnS8jTS&pid=News","height":393,"width":700}},"mentions":[{"name":"Asian Paints"},{"name":"Alleged"},{"name":"Share"}],"name":"Asian Paints Shares Fall As CC

### Step 5: Initialize lists to store the cleaned and extracted data.

In [5]:
import json
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []


StatementMeta(, 22ac2a7e-e9e4-4f2c-a82c-0859dd25ed8a, 7, Finished, Available, Finished)

### Step 6: Process each JSON string in the list and extract necessary fields.

In [6]:
# Process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string into a dictionary
        article = json.loads(json_str)

        if article["json_object"].get("category") and article["json_object"].get("image", {}).get("thumbnail", {}):
            # Extract information from the dictionary
            title.append(article["json_object"]["name"])
            description.append(article["json_object"]["description"])
            category.append(article["json_object"]["category"])
            url.append(article["json_object"]["url"])
            image.append(article["json_object"]["image"]["thumbnail"]["contentUrl"])
            provider.append(article["json_object"]["provider"][0]['name'])
            datePublished.append(article["json_object"]["datePublished"])
    
    except Exception as e:
        print(f"Error processing JSON object: {e}")


StatementMeta(, 22ac2a7e-e9e4-4f2c-a82c-0859dd25ed8a, 8, Finished, Available, Finished)

### Step 7: Create a Spark DataFrame from the cleaned data lists.

In [7]:
from pyspark.sql.types import StructType, StructField, StringType

# Combine the lists
data = list(zip(title, description, category, url, image, provider, datePublished))

# Define schema
schema = StructType([
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("category", StringType(), True),
    StructField("url", StringType(), True),
    StructField("image", StringType(), True),
    StructField("provider", StringType(), True),
    StructField("datePublished", StringType(), True)
])

# Create DataFrame
df_cleaned = spark.createDataFrame(data, schema=schema)

display(df_cleaned)


StatementMeta(, 22ac2a7e-e9e4-4f2c-a82c-0859dd25ed8a, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1d2365b9-b3d6-4500-bf4d-c495a315d0bf)

### Step 8: Format the "datePublished" column to a consistent date format.

In [8]:
from pyspark.sql.functions import to_date, date_format

df_cleaned_final = df_cleaned.withColumn("datePublished", date_format(to_date("datePublished"), "dd-MMM-yyyy"))
display(df_cleaned_final.head(2))


StatementMeta(, 22ac2a7e-e9e4-4f2c-a82c-0859dd25ed8a, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 566214c0-562f-4a59-b2f2-45bfc77ba9de)

### Step 9: Save the cleaned DataFrame as a Delta table (or update if exists).

In [9]:
from pyspark.sql.utils import AnalysisException

try:
    # Try saving the DataFrame as a Delta table.
    table_name = 'bing_lake_db.tbl_latest_news'
    df_cleaned_final.write.format("delta").saveAsTable(table_name)

except AnalysisException:
    # If the table already exists, perform a MERGE operation to update it.
    print("Table Already Exists")
    
    # Temporary Vieww is created 
    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")

    # Perform a MERGE operation to update or insert data.
    spark.sql(f"""   MERGE INTO {table_name} target_table
                     USING vw_df_cleaned_final source_view
    
                     ON source_view.url = target_table.url

                     WHEN MATCHED AND 
                     source_view.title <> target_table.title OR
                     source_view.description <> target_table.description OR
                     source_view.category <> target_table.category OR
                     source_view.image <> target_table.image OR
                     source_view.provider <> target_table.provider OR
                     source_view.datePublished <> target_table.datePublished

                     THEN UPDATE SET *

                     WHEN NOT MATCHED THEN INSERT *
                """)


StatementMeta(, 22ac2a7e-e9e4-4f2c-a82c-0859dd25ed8a, 11, Finished, Available, Finished)

Table Already Exists
