**Read the JSON file and load its data into a Spark DataFrame**

In [1]:
# Load the news feed. multiLine makes Spark parse each file as a whole JSON document
# (which may span many lines) instead of JSON Lines (one record per line).
df = spark.read.json("Files/bing_news_latest.json", multiLine=True)



StatementMeta(, a5e7b9c7-6f64-434b-9100-d3833c2093ce, 3, Finished, Available)

**Explode the array of JSON objects stored in a single row into one row per article**

In [2]:
from pyspark.sql.functions import explode

# One output row per element of the `value` column, exposed as a column named json_object.
df_exploded = df.select(explode("value").alias("json_object"))

# Serialize every row to a JSON string of the form {"json_object": {...}} and
# bring the whole list back to the driver for Python-side parsing.
json_rows_rdd = df_exploded.toJSON()
json_list = json_rows_rdd.collect()

StatementMeta(, a5e7b9c7-6f64-434b-9100-d3833c2093ce, 4, Finished, Available)

In [3]:
import json

# Parallel column lists: position i in every list describes the same article.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

for json_str in json_list:
    try:
        # Each string is {"json_object": {...}} because of the alias used when exploding.
        article = json.loads(json_str)['json_object']

        # Keep only articles that have a category and a thumbnail image URL.
        if not (article.get('category')
                and article.get('image', {}).get('thumbnail', {}).get('contentUrl')):
            continue

        # Read every field BEFORE appending anything: if a key is missing, the error is
        # raised here while all seven lists are still untouched, so they stay aligned
        # row-for-row. Appending field by field would leave some lists one element longer.
        row = (
            article['name'],
            article['description'],
            article['category'],
            article['url'],
            article['image']['thumbnail']['contentUrl'],
            # NOTE(review): `provider` is stored as-is; it may be a list/dict rather than a
            # plain string, and it later lands in a StringType column — confirm intended.
            article['provider'],
            article['datePublished'],
        )
    except Exception as e:
        # Best-effort parsing: report and skip a malformed article instead of aborting the run.
        print(f"Skipping article, error occurred: {e!r}")
        continue

    for column, value in zip((title, description, category, url, image, provider, datePublished), row):
        column.append(value)
    



StatementMeta(, a5e7b9c7-6f64-434b-9100-d3833c2093ce, 5, Finished, Available)

In [4]:
from pyspark.sql.types import StructType, StructField, StringType

# The seven lists are parallel columns. zip() silently truncates to the shortest input,
# which would drop rows or pair values from different articles, so verify they line up.
column_lengths = {
    "title": len(title),
    "description": len(description),
    "category": len(category),
    "url": len(url),
    "image": len(image),
    "provider": len(provider),
    "datePublished": len(datePublished),
}
if len(set(column_lengths.values())) > 1:
    raise ValueError(f"Parsed article columns have mismatched lengths: {column_lengths}")

# One tuple per article, in the same column order as the schema below.
data = list(zip(title, description, category, url, image, provider, datePublished))

# Every column is a nullable string; datePublished is still the raw text from the feed here.
# NOTE(review): `provider` values may not be plain strings; StringType stores whatever
# textual form Spark produces for them — confirm that is the desired representation.
schema = StructType([
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("category", StringType(), True),
    StructField("url", StringType(), True),
    StructField("image", StringType(), True),
    StructField("provider", StringType(), True),
    StructField("datePublished", StringType(), True)
])

df_cleaned = spark.createDataFrame(data, schema = schema)


StatementMeta(, a5e7b9c7-6f64-434b-9100-d3833c2093ce, 6, Finished, Available)

In [5]:
from pyspark.sql.functions import to_date, date_format

# Parse datePublished to a calendar date (any time-of-day part is dropped), then render it
# as text in the "dd-MMM-yyyy" pattern, e.g. "14-Mar-2024".
published_day_text = date_format(to_date("datePublished"), "dd-MMM-yyyy")
df_cleaned_final = df_cleaned.withColumn("datePublished", published_day_text)

StatementMeta(, a5e7b9c7-6f64-434b-9100-d3833c2093ce, 7, Finished, Available)

**Convert `datePublished` from a string to a Date type so the Power BI dashboard can filter on it**

In [6]:
# Import to_date here explicitly instead of relying on the previous cell's import.
from pyspark.sql.functions import col, date_format, to_date

# Turn the "dd-MMM-yyyy" text produced by the previous cell into a DateType column.
# The pattern here must stay identical to the one used when formatting, otherwise
# rows fail to parse and become null.
df_cleaned_final = df_cleaned_final.withColumn("datePublished", to_date(col("datePublished"), "dd-MMM-yyyy"))

StatementMeta(, a5e7b9c7-6f64-434b-9100-d3833c2093ce, 8, Finished, Available)

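**Type 1 merge: create the table on the first run; on later runs, upsert by `url`, overwriting changed rows and inserting new ones**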

In [7]:
from pyspark.sql.utils import AnalysisException

table_name = 'bing_news_db_latest.tbl_latest_news'

# Delta MERGE fails when several source rows match the same target row, and duplicate
# urls would also be written twice on the first run, so keep a single row per url
# (which duplicate survives is arbitrary).
df_news_upsert = df_cleaned_final.dropDuplicates(["url"])

try:
    # First run: create the Delta table. The default save mode raises AnalysisException
    # when the table already exists, which routes execution to the MERGE below.
    df_news_upsert.write.format('delta').saveAsTable(table_name)
except AnalysisException:
    # NOTE(review): any AnalysisException (not only "table already exists") lands here;
    # the MERGE would then fail with its own error — consider an explicit existence check.
    df_news_upsert.createOrReplaceTempView("vw_df_cleaned_final")
    # SCD Type 1 upsert keyed on url. NOT (a <=> b) is a null-safe "differs": plain <>
    # yields NULL (treated as false) when either side is NULL, which would skip updates
    # to or from NULL values.
    spark.sql(f""" MERGE INTO {table_name} target_table
                   USING vw_df_cleaned_final source_vw
                   ON source_vw.url = target_table.url
                   WHEN MATCHED AND (
                       NOT (source_vw.title <=> target_table.title) OR
                       NOT (source_vw.description <=> target_table.description) OR
                       NOT (source_vw.category <=> target_table.category) OR
                       NOT (source_vw.image <=> target_table.image) OR
                       NOT (source_vw.provider <=> target_table.provider) OR
                       NOT (source_vw.datePublished <=> target_table.datePublished)
                   )
                   THEN UPDATE SET *
                   WHEN NOT MATCHED THEN INSERT *

                """)
    



StatementMeta(, a5e7b9c7-6f64-434b-9100-d3833c2093ce, 9, Finished, Available)