#### **Read the JSON file as a Dataframe**

In [54]:
df = spark.read.option("multiline", "true").json("Files/bing-news.json")
# df now is a Spark DataFrame containing JSON data from "Files/bing-news.json".
display(df)

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 56, Finished, Available)

SynapseWidget(Synapse.DataFrame, 8b735a90-5531-4e71-b7ad-4986aadb67d7)

#### **Selecting just the value column from the dataframe - include all JSON data we need**

In [55]:
df = df.select("value")
display(df)

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 57, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4f4ec7f2-987d-493e-b57c-0fad3fb77eb2)

#### **Explode the "value" column from the single row structure into multiple row structure**

In [56]:
from pyspark.sql.functions import explode

df_exploded = df.select(explode(df["value"]).alias("json_object"))
display(df_exploded)

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 58, Finished, Available)

SynapseWidget(Synapse.DataFrame, 3f48ad39-6658-4fde-9d9e-ed86a66ac5fe)

#### **Converting the exploded JSON dataframe to a single JSON string list**

In [57]:
json_list = df_exploded.toJSON().collect()

# to see the json structure of all the news articles from the list
# print(json_list)

# to see the json structure of one news article from the list
print(json_list[0])

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 59, Finished, Available)

{"json_object":{"about":[{"name":"Nigel Farage","readLink":"https://api.bing.microsoft.com/api/v7/entities/4e19a7ad-e1b4-0f74-86c3-ba3be21089da"},{"name":"Lee Anderson","readLink":"https://api.bing.microsoft.com/api/v7/entities/c94f2fa6-9853-e53f-af90-7aec244e4382"},{"name":"Politics","readLink":"https://api.bing.microsoft.com/api/v7/entities/b8f26db0-d3b7-4fb5-fbb0-9ea469754434"}],"datePublished":"2024-02-28T08:13:00.0000000Z","description":"Nigel Farage claimed the Tories are “tearing themselves into pieces” over the Lee Anderson row amid speculation the former deputy chairman of the Conservative Party could join Reform UK. Mr Farage, the honorary president of Reform, said he had “no idea” if Mr Anderson could defect as he highlighted Tory divisions.","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.S60P6X2y5QYwMzYUzewODi&pid=News","height":366,"width":700}},"name":"Politics latest news: Tories ‘tearing themselves to pieces’ over Lee Anderson row, says Nigel Farage

#### **To work with information its really easy to convert JSON string list to a JSON dictionary. This is how to do it using json.loads()**

In [58]:
import json

article = json.loads(json_list[0])

print(article)

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 60, Finished, Available)

{'json_object': {'about': [{'name': 'Nigel Farage', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/4e19a7ad-e1b4-0f74-86c3-ba3be21089da'}, {'name': 'Lee Anderson', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/c94f2fa6-9853-e53f-af90-7aec244e4382'}, {'name': 'Politics', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/b8f26db0-d3b7-4fb5-fbb0-9ea469754434'}], 'datePublished': '2024-02-28T08:13:00.0000000Z', 'description': 'Nigel Farage claimed the Tories are “tearing themselves into pieces” over the Lee Anderson row amid speculation the former deputy chairman of the Conservative Party could join Reform UK. Mr Farage, the honorary president of Reform, said he had “no idea” if Mr Anderson could defect as he highlighted Tory divisions.', 'image': {'thumbnail': {'contentUrl': 'https://www.bing.com/th?id=OVFT.S60P6X2y5QYwMzYUzewODi&pid=News', 'height': 366, 'width': 700}}, 'name': 'Politics latest news: Tories ‘tearing themselves to pieces’ over Lee Ande

#### **To get information from certain elements ex: "description"**

In [59]:
print(article['json_object']['description'])

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 61, Finished, Available)

Nigel Farage claimed the Tories are “tearing themselves into pieces” over the Lee Anderson row amid speculation the former deputy chairman of the Conservative Party could join Reform UK. Mr Farage, the honorary president of Reform, said he had “no idea” if Mr Anderson could defect as he highlighted Tory divisions.


#### **Lets get more information from the JSON Dictionary**

In [60]:
# name
# description
# url
# image
# provider
# datePublished

# Can use :"Online JSON Parser" to see the structure of the JSON string - copy & paste the full string to the online tool.

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 62, Finished, Available)

In [61]:
print(article['json_object']['name'])
print(article['json_object']['description'])
print(article['json_object']['url'])
print(article['json_object']['image']["thumbnail"]["contentUrl"])
print(article['json_object']['provider'][0]['name'])
print(article['json_object']['datePublished'])

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 63, Finished, Available)

Politics latest news: Tories ‘tearing themselves to pieces’ over Lee Anderson row, says Nigel Farage
Nigel Farage claimed the Tories are “tearing themselves into pieces” over the Lee Anderson row amid speculation the former deputy chairman of the Conservative Party could join Reform UK. Mr Farage, the honorary president of Reform, said he had “no idea” if Mr Anderson could defect as he highlighted Tory divisions.
https://www.telegraph.co.uk/politics/2024/02/28/rishi-sunak-latest-news-tories-lee-anderson-farage-reform/
https://www.bing.com/th?id=OVFT.S60P6X2y5QYwMzYUzewODi&pid=News
The Daily Telegraph
2024-02-28T08:13:00.0000000Z


#### **Now, will loop through all the JSON dictionary lists and get the data**

In [62]:
# Initialise empty lists
name = []
description = []
url = []
image = []
provider = []
datePublished = []

#process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string into a dictionary
        article = json.loads(json_str)

        # This is optional - There might be data inconsistencies from the data comes from Bing API, in that case if some articles missing some data, we can write this if condition to check
        if article["json_object"].get("name") and article["json_object"].get("image", {}).get("thumbnail", {}).get("contentUrl"):

            # Extract information from the dictionary
            name.append(article["json_object"]["name"])
            description.append(article['json_object']['description'])
            url.append(article['json_object']['url'])
            image.append(article['json_object']['image']["thumbnail"]["contentUrl"])
            provider.append(article['json_object']['provider'][0]['name'])
            datePublished.append(article['json_object']['datePublished'])
    
    except Exception as e:
        print(f"Error processing JSON object: {e}")

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 64, Finished, Available)

In [63]:
url

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 65, Finished, Available)

['https://www.telegraph.co.uk/politics/2024/02/28/rishi-sunak-latest-news-tories-lee-anderson-farage-reform/',
 'https://www.msn.com/en-us/news/world/politics-latest-news-ex-minister-apologises-after-claims-of-religious-no-go-areas/ar-BB1iYvVc',
 'https://www.msn.com/en-us/news/world/the-latest-families-of-hostages-held-in-gaza-launch-a-4-day-march-to-demand-their-freedom/ar-BB1j1H2i',
 'https://www.msn.com/en-in/lifestyle/weddings/taapsee-pannus-latest-post-adds-fuel-to-rumours-of-her-wedding-with-beau-mathias-boe-pics-inside/ar-BB1j1GwY',
 'https://www.foxnews.com/politics/fox-news-politics-bidens-muslim-problem',
 'https://www.msn.com/en-in/money/news/do-you-want-to-increase-your-epf-contribution-here-are-latest-epfo-rules-on-submission-of-digital-joint-requests/ar-BB1j1FyD',
 'https://www.msn.com/en-us/sports/other/christian-horner-latest-red-bull-f1-boss-s-future-in-balance-as-investigation-verdict-to-be-announced-today/ar-BB1iTyen',
 'https://www.bbc.com/news/live/world-africa-68

#### **Combine all the lists together and create dataframe with a defined schema**

In [64]:
from pyspark.sql.types import StructType, StructField, StringType

# Combine the lists
data = list(zip(name,description,url,image,provider,datePublished))

# Define Schema
schema = StructType([
        StructField("name", StringType(), True),
        StructField("description", StringType(), True),
        StructField("url", StringType(), True),
        StructField("image", StringType(), True),
        StructField("provider", StringType(), True),
        StructField("datePublished", StringType(), True)
])

# Create Dataframe
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 66, Finished, Available)

In [65]:
display(df_cleaned)

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 67, Finished, Available)

SynapseWidget(Synapse.DataFrame, 213f8382-cdc6-4eea-9651-0b6808494ea0)

#### **Transform "datepublished" column data from timestamp to date data type**

In [66]:
from pyspark.sql.functions import to_date, date_format

df_cleaned_final = df_cleaned.withColumn("datePublished", date_format(to_date("datePublished"), "dd-MM-yyyy"))

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 68, Finished, Available)

In [67]:
display(df_cleaned_final)
# display(df_cleaned_final.limit(5))

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 69, Finished, Available)

SynapseWidget(Synapse.DataFrame, 6c83b602-3ef1-43f1-a458-83dd8076cdc6)

#### **Writing the final dataframe to the lakehouse db in Delta format**

In [68]:
# df_cleaned_final.write.format("delta").saveAsTable("bing_lake_db.tbl_latest_news")



# Traditional ways of populating data table using modes - overwrite, append

# df_cleaned_final.write.format("delta").mode('overwrite').saveAsTable("bing_lake_db.tbl_latest_news")
# This method is not ideal when you have terabytes of data - this will be waste of time and resources (will lead to performance issues), also you are loosing old data by overwriting 

# df_cleaned_final.write.format("delta").mode('append').saveAsTable("bing_lake_db.tbl_latest_news")
# This method will leads to data duplication and unwantedly increasing the size of data

# So, most of the time, the best way is "INCREMENTAL LOAD"
# aka "SQL MERGE" in data warehousing
# In data warehousing we also called this Slowly changing diemention - Type 1 & Type 2
# Type 1 ignores of keeping history - always overwrites the current value with the new value, No history preserverd
# Type 2 inserts the new value in a new row and keep the old record as well - by using "Flag" column we can keep track if the record is new or old 

# in this project, will go with type 1 as we do not need to preserve history of data

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 70, Finished, Available)

#### **Handling the error "AnalysisException" when the table is already exist by when this code runs**
#### df_cleaned_final.write.format("delta").saveAsTable("bing_lake_db.tbl_latest_news")  

#### **Incremental Loading**

In [69]:
from pyspark.sql.utils import AnalysisException

try:
    table_name = 'bing_lake_db.tbl_latest_news'
    df_cleaned_final.write.format("delta").saveAsTable(table_name)

except AnalysisException:
    print("Table Already Exists")

    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")

    spark.sql(f""" MERGE INTO {table_name} target_table
                   USING vw_df_cleaned_final source_view

                   ON source_view.url = target_table.url

                   WHEN MATCHED AND
                   source_view.name <> target_table.name OR
                   source_view.description <> target_table.description OR
                   source_view.url <> target_table.url OR
                   source_view.image <> target_table.image OR
                   source_view.provider <> target_table.provider OR
                   source_view.datePublished <> target_table.datePublished

                   THEN UPDATE SET *
                   
                   WHEN NOT MATCHED THEN INSERT *
                   
                   """)

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 71, Finished, Available)

Table Already Exists


In [70]:
%%sql

SELECT count(*) 
FROM bing_lake_db.tbl_latest_news

StatementMeta(, fbf1c3a7-0e07-45c8-8c73-cdebc6bf2c71, 72, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>