#### **Reading the JSON file into a DataFrame**

In [92]:
# Load the raw news payload; the file holds a single multi-line JSON document,
# so the multiline option is required for Spark to parse it as one record.
df = spark.read.format("json").option("multiline", "true").load("Files/latest-news.json")
display(df)

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 94, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 38e41217-4d94-4acf-aca0-5afa5223b148)

#### **Selecting just the `results` column from the DataFrame**

In [93]:
# Keep only the "results" array; every other top-level field is discarded.
df = df.select(df["results"])

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 95, Finished, Available, Finished)

#### **Exploding the `results` array into one row per article**

In [94]:
from pyspark.sql.functions import explode

# One output row per element of the "results" array, stored in a column named json_object.
df_explode = df.select(explode("results").alias("json_object"))

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 96, Finished, Available, Finished)

#### **Converting the exploded DataFrame to a list of JSON strings**

In [95]:
# Serialise every exploded row to a JSON string and bring them all to the driver
# as a Python list (one string per article).
json_list = (
    df_explode
    .toJSON()
    .collect()
)

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 97, Finished, Available, Finished)

#### **Inspecting a sample entry from the JSON string list**

In [96]:
#print(json_list[25])

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 98, Finished, Available, Finished)

In [97]:
import json

# Position of the article to inspect while exploring the payload shape.
SAMPLE_INDEX = 25

# Parse one sample article for ad-hoc inspection. The guard keeps Run All from
# failing with IndexError when the API returns SAMPLE_INDEX or fewer results;
# in that case news_json is None.
news_json = json.loads(json_list[SAMPLE_INDEX]) if len(json_list) > SAMPLE_INDEX else None

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 99, Finished, Available, Finished)

In [98]:
# print(news_json["json_object"]["title"])
# print(news_json["json_object"]["description"])
# print(news_json["json_object"]["type"])
# print(news_json["json_object"]["age"])
# print(news_json["json_object"]["url"])
# print(news_json["json_object"]["meta_url"]["hostname"])
# print(news_json["json_object"]["thumbnail"]["src"])
# print(news_json["json_object"]["page_age"])
# print(news_json["json_object"]["meta_url"]["path"])

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 100, Finished, Available, Finished)

#### **Extracting the JSON properties into Python lists**

In [99]:
import json

# Parallel column lists: position i in every list describes the same article.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

for json_str in json_list:
    try:
        article = json.loads(json_str)
        obj = article["json_object"]

        # `or {}` also covers keys that are present but null; `.get(key, {})`
        # would return None there and the chained `.get` would raise.
        thumbnail = obj.get("thumbnail") or {}
        meta_url = obj.get("meta_url") or {}

        # Keep only articles that have both a description and a thumbnail image.
        if obj.get("description") and thumbnail.get("src"):
            title.append(obj.get("title"))  # None if the article has no title
            description.append(obj.get("description"))
            category.append(meta_url.get("path"))  # raw breadcrumb path, cleaned later
            url.append(obj.get("url"))
            image.append(thumbnail.get("src"))
            provider.append(meta_url.get("hostname"))
            datePublished.append(obj.get("page_age"))

    except Exception as e:
        # Best-effort: report the malformed record and continue with the rest.
        print(f"Error processing JSON object: {e}")
 

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 101, Finished, Available, Finished)

#### **Converting the lists to a DataFrame**

In [100]:
from pyspark.sql.types import StructType, StructField, StringType

# Turn the parallel lists into row tuples; tuple order must match the schema below.
data = list(zip(title, description, category, url, image, provider, datePublished))

# Every column is a nullable string.
schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "title",
            "description",
            "category",
            "url",
            "image",
            "provider",
            "datePublished",
        )
    ]
)

df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 102, Finished, Available, Finished)

In [101]:
# Quick preview: only the first five rows of the cleaned DataFrame.
display(df_cleaned.limit(num=5))

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 103, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e6ff91d1-af34-48eb-91e2-32cd74d182f8)

#### **Processing the Date column**

In [102]:
from pyspark.sql.functions import to_date, date_format

# Display pattern for datePublished, e.g. "05-Mar-2024". The previous pattern
# "dd-MMM-yyy" was a typo of the conventional four-letter year "yyyy".
DATE_DISPLAY_FORMAT = "dd-MMM-yyyy"

# to_date without a format parses the leading date of page_age
# (NOTE(review): assumed ISO-8601 such as "2024-03-05T10:00:00" — confirm).
# The result is a formatted STRING, not a DateType column.
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(to_date("datePublished"), DATE_DISPLAY_FORMAT),
)

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 104, Finished, Available, Finished)

In [103]:
# Render the DataFrame with the reformatted datePublished column.
display(df_cleaned_final)

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 105, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c20e9369-9471-46a2-82bf-7f49babd0139)

#### **Cleaning the category column**

In [104]:
from pyspark.sql.functions import col, regexp_replace, split, trim, when


def _normalize_category_separators(path_col):
    """Rewrite the breadcrumb-separator variants in a path column to a bare '>'.

    The rules are applied in order with regexp_replace. The last two collapse
    whitespace around separators and strip leading separators, so a value like
    "› news › world" becomes "news>world".
    """
    separator_rules = [
        (r"(?i)&gt;", ">"),   # HTML entity, any case
        (r"\\u003e", ">"),    # the six literal characters \u003e
        (r"\u003e", ">"),     # Java-regex escape for '>' itself, so this rule changes nothing
        (r"[›»]", ">"),       # common arrow glyphs
        (r"\s*>\s*", ">"),    # " > " -> ">"
        (r"^>+", ""),         # drop leading separators
    ]
    for pattern, replacement in separator_rules:
        path_col = regexp_replace(path_col, pattern, replacement)
    return path_col


c = _normalize_category_separators(col("category").cast("string"))
parts = split(trim(c), ">")

# When the path starts with a YYYY>MM>DD date prefix, take the segment after
# it; otherwise take the first segment.
df_final = df_cleaned_final.withColumn(
    "category_clean",
    when(
        parts.getItem(0).rlike(r"^\d{4}$")
        & parts.getItem(1).rlike(r"^\d{1,2}$")
        & parts.getItem(2).rlike(r"^\d{1,2}$"),
        parts.getItem(3),
    ).otherwise(parts.getItem(0)),
)
 

 

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 106, Finished, Available, Finished)

In [105]:
from pyspark.sql.functions import lower, trim, col

# Canonicalise the extracted category: strip surrounding whitespace, then lowercase.
df_final = df_final.withColumn("category_clean", lower(trim(col("category_clean"))))

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 107, Finished, Available, Finished)

In [106]:
from pyspark.sql.functions import col, when, length

# Values longer than this are presumably slugs or headlines rather than section
# names (TODO confirm), so they are replaced with NULL. Tune here rather than
# editing the expression.
MAX_CATEGORY_LENGTH = 25

df_final = df_final.withColumn(
    "category_clean",
    when(length(col("category_clean")) > MAX_CATEGORY_LENGTH, None)
    .otherwise(col("category_clean")),
)

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 108, Finished, Available, Finished)

In [107]:
from pyspark.sql.functions import when

# Fold known aliases into canonical categories; anything else passes through.
# TODO(review): "us-news" and "live-updates" still appear separately in the
# counts below — decide whether they should map to "us" as well.
df_final = df_final.withColumn(
    "category",
    when(col("category_clean").isin("us", "live"), "us")
    .when(col("category_clean").isin("national-security", "politics-news"), "politics")
    .otherwise(col("category_clean")),
)
 

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 109, Finished, Available, Finished)

In [108]:
# Category frequencies, most common first; show() prints the top 20 rows.
(
    df_final
    .groupBy("category")
    .count()
    .sort("count", ascending=False)
    .show()
)
 

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 110, Finished, Available, Finished)

+----------------+-----+
|        category|count|
+----------------+-----+
|            news|   15|
|              us|    9|
|        politics|    5|
|           world|    4|
|         us-news|    3|
|         weather|    2|
|        homenews|    2|
|highschoolsports|    2|
|           legal|    1|
|            NULL|    1|
|    live-updates|    1|
|      california|    1|
|              en|    1|
|        newshour|    1|
|        athletic|    1|
|      chelsea-fc|    1|
|            post|    1|
|             nba|    1|
|           crime|    1|
|            heat|    1|
+----------------+-----+
only showing top 20 rows



In [109]:
# The helper column has been folded into "category"; remove it before writing.
df_final = (
    df_final
    .drop("category_clean")
)

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 111, Finished, Available, Finished)

In [110]:
from pyspark.sql.functions import col, regexp_replace, lower, trim

# Reduce hostnames to a bare provider name, e.g. "www.bbc.com" -> "bbc" and
# "abcnews.go.com" -> "abcnews".
# Trim and lowercase FIRST. The anchored patterns below are case-sensitive and
# end-anchored, so previously "WWW." / ".COM" or a trailing space defeated them.
df_final = df_final.withColumn(
    "provider",
    regexp_replace(
        regexp_replace(lower(trim(col("provider"))), r"^www\.", ""),
        r"(\.go)?\.(com|org|net)$",
        "",
    ),
)

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 112, Finished, Available, Finished)

In [111]:
# Render the final DataFrame (cleaned category and provider) before writing it.
display(df_final)

StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 113, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fcd32a98-decc-4cc5-87b3-614b8ef5d57a)

#### **Writing the final DataFrame to the Lakehouse as a Delta table (create on first run, merge afterwards)**

In [112]:
from pyspark.sql.utils import AnalysisException

# Target Delta table; defined outside the try so both branches can use it.
table_name = 'News_Lake_db.dbo.tbl_latest_news'

# url is the MERGE key. Delta MERGE fails when several source rows match the
# same target row, and NULL keys never match (they would be re-inserted on
# every run), so keep exactly one non-null row per url.
df_to_write = df_final.where("url IS NOT NULL").dropDuplicates(["url"])

try:
    # First run: create the table. With the default save mode ("errorifexists")
    # saveAsTable raises AnalysisException when the table already exists.
    df_to_write.write.format("delta").saveAsTable(table_name)

except AnalysisException:
    # NOTE(review): every AnalysisException lands here, not only "table exists";
    # a different failure would most likely resurface from the MERGE below.
    print("Table Already Exists")

    df_to_write.createOrReplaceTempView("vw_df_final")

    # Upsert by url. `<=>` is null-safe equality: a plain `<>` yields NULL when
    # either side is NULL, which silently skipped updates such as a category
    # changing from NULL to a value (or back).
    spark.sql(f"""
        MERGE INTO {table_name} AS target_table
        USING vw_df_final AS source_view
        ON source_view.url = target_table.url
        WHEN MATCHED AND (
            NOT (source_view.title <=> target_table.title) OR
            NOT (source_view.description <=> target_table.description) OR
            NOT (source_view.category <=> target_table.category) OR
            NOT (source_view.image <=> target_table.image) OR
            NOT (source_view.provider <=> target_table.provider) OR
            NOT (source_view.datePublished <=> target_table.datePublished)
        )
        THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)


StatementMeta(, 1818d2d0-a249-4a16-8e84-31142d740696, 114, Finished, Available, Finished)