In [None]:
# Specify the path to your JSON files
from pytz import timezone 
from datetime import datetime
# Take one timestamp and derive both path components from it. Two separate
# now() calls can straddle an hour (or midnight) boundary and yield a
# day/hour combination that points at the wrong folder.
ind_now = datetime.now(timezone("Asia/Kolkata"))
ind_day = ind_now.strftime('%Y-%m-%d')
ind_time_hour = ind_now.strftime('%H')
path = f"Files/{ind_day}/{ind_time_hour}"

# "multiline" lets Spark parse JSON documents that span several lines.
df = spark.read.option("multiline", "true").json(path)
# df is now a Spark DataFrame holding the JSON data found under `path`.

In [None]:
# Narrow the DataFrame to its "value" column only.
df = df.select("value")

In [None]:
from pyspark.sql.functions import explode
# Emit one row per element of "value", exposing each element as "json_object".
df_exploded = df.select(explode("value").alias("json_object"))

In [None]:
# Serialise every row to a JSON string, then bring all of them to the driver
# as a plain Python list.
json_rows = df_exploded.toJSON()
json_list = json_rows.collect()

In [None]:
import json

# Parallel column buffers: entries at the same index describe the same article.
date_time = []
title = []
description = []
category = []
provider = []

for json_str in json_list:
    article = json.loads(json_str)
    item = article["json_object"]

    # Articles without a (non-empty) category are skipped entirely.
    if not item.get("category"):
        continue

    # toJSON() leaves null fields out of the serialised string by default, so
    # any of these keys may be absent. .get() records None (the target schema
    # is nullable) instead of aborting the whole run with a KeyError.
    date_time.append(item.get("datePublished"))
    title.append(item.get("name"))
    description.append(item.get("description"))
    category.append(item["category"])

    # Only the first provider's name is kept; guard against a missing or
    # empty provider list, which would otherwise raise KeyError/IndexError.
    providers = item.get("provider") or []
    provider.append(providers[0].get("name") if providers else None)

In [None]:
from pyspark.sql.types import StructType,StructField,StringType

# Stitch the per-column lists into one tuple per article, in schema order.
data = list(zip(date_time, title, description, category, provider))

# All five columns start out as nullable strings.
column_names = ["date_time", "title", "description", "category", "provider"]
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in column_names]
)

df_cleaned = spark.createDataFrame(data, schema=schema)

In [None]:
from pyspark.sql.functions import to_date,date_format,col

# Reduce the publication timestamp to its calendar day ("yyyy-MM-dd") and
# store it as a DATE column under the same name.
published_day = date_format(col("date_time"), "yyyy-MM-dd")
df_cleaned = df_cleaned.withColumn("date_time", published_day.cast("date"))

In [None]:
# Dropping Duplicates
# Spark DataFrames are immutable: dropDuplicates() returns a new DataFrame, so
# the result must be assigned back or the duplicate rows are still present.
df_cleaned = df_cleaned.dropDuplicates()
display(df_cleaned)

Write the final DataFrame to the lakehouse database in Delta format.

In [None]:
from pyspark.sql.utils import AnalysisException

# Defined before the try so the except branch never depends on how far the
# try body got.
table_name = 'bing_lake_db.bing_news_table'

try:
    # First run: create the Delta table. With the default save mode this
    # raises if the table already exists.
    df_cleaned.write.format("delta").saveAsTable(table_name)
except AnalysisException:
    # NOTE(review): AnalysisException also covers other analysis failures
    # (e.g. a schema problem); this branch assumes the cause is an existing
    # table. Confirm whether a stricter existence check is wanted here.
    print("Table Already Exists")
    df_cleaned.createOrReplaceTempView("vw_df_cleaned")
    # Upsert keyed on title. The change check uses the null-safe operator
    # <=>: a plain <> evaluates to NULL when either side is NULL, so a row
    # whose only change involves a NULL value would never be updated.
    # NOTE(review): MERGE may fail if several source rows share one title —
    # confirm titles are unique per batch.
    spark.sql(f"""merge into {table_name} target_table
                using vw_df_cleaned source_view
                on source_view.title=target_table.title
                when matched and (
                not (target_table.date_time<=>source_view.date_time) or
                not (target_table.description<=>source_view.description) or
                not (target_table.category<=>source_view.category) or
                not (target_table.provider<=>source_view.provider)
                )
                then update set *
                when not matched then insert *
                """)