### **Data Loading**


In [2]:
# Read the Bing news JSON file with the "multiline" option enabled.
news_reader = spark.read.option("multiline", "true")
df = news_reader.json("Files/bing-latest-news.json")
# Preview the Spark DataFrame loaded from "Files/bing-latest-news.json".
display(df)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, 61cd5a32-5db9-4888-a43b-4b4119721f76)

In [3]:
df=df.select("value")   # keep only the "value" column, which holds the data we need; drop all others

StatementMeta(, , , Waiting, , Waiting)

In [5]:
# Preview the rows, then confirm the object type and the nested schema.
display(df)
print(type(df))
# printSchema() writes the schema to stdout itself and returns None, so wrapping
# it in print() would only append a stray "None" line to the output.
df.printSchema()

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, 2ff005c6-168f-4628-9d44-7ce720c36a03)

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- about: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- readLink: string (nullable = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- datePublished: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- image: struct (nullable = true)
 |    |    |    |-- thumbnail: struct (nullable = true)
 |    |    |    |    |-- contentUrl: string (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- provider: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _type: string (nullable = true)
 |   

In [25]:
# Flatten the nested "value" array: after the explode, each row contains a single JSON object.
from pyspark.sql.functions import explode

exploded_column = explode(df["value"]).alias("json_object")
df_exploaded = df.select(exploded_column)

StatementMeta(, , , Waiting, , Waiting)

In [26]:
# Preview the exploded DataFrame.
display(df_exploaded)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, 958fbda9-58a6-4c35-9625-de030b03336e)

In [27]:
# Turn each DataFrame row into a JSON string, then gather the strings into a Python list.
# collect() brings the data from the Spark cluster into the local environment.
rows_as_json = df_exploaded.toJSON()
json_list = rows_as_json.collect()

StatementMeta(, , , Waiting, , Waiting)

In [28]:
# Show the collected JSON strings.
json_list

StatementMeta(, , , Waiting, , Waiting)

['{"json_object":{"about":[{"name":"Verenigde Staten","readLink":"https://api.bing.microsoft.com/api/v7/entities/5232ed96-85b1-2edb-12c6-63e6c597a1de"},{"name":"Nicolás Maduro","readLink":"https://api.bing.microsoft.com/api/v7/entities/62dd568b-6165-618b-d9a8-240b52e23c2f"},{"name":"Florida","readLink":"https://api.bing.microsoft.com/api/v7/entities/5fece3f4-e8e8-4159-843e-f725a930ad50"},{"name":"CNN","readLink":"https://api.bing.microsoft.com/api/v7/entities/c4c637e5-f9cc-a3ed-3380-7152515e062e"},{"name":"Venezuela","readLink":"https://api.bing.microsoft.com/api/v7/entities/6dd1d7bd-393f-a467-12fa-e71f98cc00b9"}],"datePublished":"2024-09-02T17:03:00.0000000Z","description":"De Verenigde Staten hebben maandag het vliegtuig van de Venezolaanse president Nicolás Maduro in beslag genomen. De actie werd uitgevoerd omdat de koop van het vliegtuig in strijd zou zijn met Amerika","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.ty6YVUSXp2W8YSD0m6AHpC&pid=News","height":393,

In [41]:
# Parse each JSON string into a dictionary and collect the fields of interest
# into parallel lists (one list per output column).
import json

title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

for counter_str in json_list:
    try:
        article = json.loads(counter_str)['json_object']  # load the JSON string into a dictionary
        # no processing if the json from the api does not contain a category or an image
        if article.get("category") is None or article.get('image') is None:
            continue
        # Extract every field BEFORE touching the output lists: if any lookup
        # fails, nothing has been appended yet, so the lists stay the same length
        # and the later zip() cannot pair values from different articles.
        record = (
            article['name'],
            article['description'],
            article['category'],
            article['url'],
            article['image']['thumbnail']['contentUrl'],
            article['provider'][0]['name'],
            article['datePublished'],
        )
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
        # Malformed JSON or a missing/oddly shaped field: report it and skip the article.
        print(f"errors processing json: {e}")
        continue

    # All fields were read successfully; append them together.
    for column_values, value in zip(
        (title, description, category, url, image, provider, datePublished), record
    ):
        column_values.append(value)

StatementMeta(, , , Waiting, , Waiting)

In [42]:
# Show the publication timestamps gathered by the parsing loop.
datePublished

StatementMeta(, , , Waiting, , Waiting)

['2024-09-02T07:09:00.0000000Z', '2024-09-03T09:05:00.0000000Z']

In [56]:

from pyspark.sql.types import StructType,StructField, StringType,TimestampType

# Every output column is declared as a nullable string, in this order.
column_names = ("title", "description", "category", "url", "image", "provider", "datePublished")
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in column_names]
)

# Combine the parallel lists into one tuple per article, matching the column order above.
data = list(zip(title, description, category, url, image, provider, datePublished))

df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, , , Waiting, , Waiting)

In [57]:
from pyspark.sql.functions import to_date, date_format

# Convert the datePublished string to a date, then store it back as text in dd-MM-yyyy form.
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(to_date("datePublished"), "dd-MM-yyyy"),
)

StatementMeta(, , , Waiting, , Waiting)

In [58]:
from pyspark.sql.utils import AnalysisException

table_name = "bing_lake_db.tbl_latest_news"

try:
    # First run: create the Delta table from the cleaned data.
    df_cleaned_final.write.format("delta").saveAsTable(table_name)

except AnalysisException:
    # NOTE(review): every AnalysisException lands here, not only "table already
    # exists" -- confirm that is the only failure expected from saveAsTable.
    print ("Table already exists!")

    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")
    # Match source and target rows on url. When a match is found, update the whole
    # row only if at least one column value differs; otherwise insert the new row.
    # The comparisons use the null-safe operator (<=>): a plain <> evaluates to NULL
    # when either side is NULL, so a row whose value changed from or to NULL would
    # never be updated.
    spark.sql(f"""MERGE INTO {table_name} target_table
                     USING vw_df_cleaned_final source_view
                     ON source_view.url = target_table.url

                     WHEN MATCHED AND
                     (NOT (source_view.title <=> target_table.title) OR
                     NOT (source_view.description <=> target_table.description) OR
                     NOT (source_view.category <=> target_table.category) OR
                     NOT (source_view.image <=> target_table.image) OR
                     NOT (source_view.provider <=> target_table.provider) OR
                     NOT (source_view.datePublished <=> target_table.datePublished))

                     THEN UPDATE SET *
                     WHEN NOT MATCHED THEN INSERT *

                """)


StatementMeta(, , , Waiting, , Waiting)

In [60]:
# Load every row of the news table back into a DataFrame.
latest_news_query = "SELECT * FROM bing_lake_db.tbl_latest_news"
df = spark.sql(latest_news_query)

StatementMeta(, , , Waiting, , Waiting)

In [61]:
# Evaluate df to show its column names and types.
df

StatementMeta(, , , Waiting, , Waiting)

DataFrame[title: string, description: string, category: string, url: string, image: string, provider: string, datePublished: string]

In [35]:
# from pyspark.sql.functions import col,to_date 
# df=df.withColumn("datePublished",to_date(col("datePublished"),"dd-MM-yyyy"))

StatementMeta(, , , Waiting, , Waiting)

In [45]:
# Render the current contents of df.
display(df)


StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, 6d3081f0-d2da-44ff-b80a-d2c9ef776673)

In [62]:
# Print the column names and types of df.
df.printSchema()

StatementMeta(, , , Waiting, , Waiting)

root
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- category: string (nullable = true)
 |-- url: string (nullable = true)
 |-- image: string (nullable = true)
 |-- provider: string (nullable = true)
 |-- datePublished: string (nullable = true)



In [38]:
# df.write.format('delta').mode("overwrite").option("overwriteSchema","True").saveAsTable(table_name)

StatementMeta(, , , Waiting, , Waiting)

In [63]:
# from pyspark.sql.functions import date_format

# # Format the 'datePublished' column as 'dd-MM-yyyy'
# df_cleaned_final = df_cleaned.withColumn("datePublished", date_format("datePublished", "dd-MM-yyyy"))

# # Show the result


StatementMeta(, , , Waiting, , Waiting)