In [1]:
# Load the raw Bing news payload. `multiline` is enabled so that a single JSON
# record may span several lines of the file.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-latest-news.json")
)
display(df)

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 19ac064a-ea39-4525-9b87-698e759ac6e6)

In [2]:
from pyspark.sql.functions import explode

# Flatten the `organic_results` array: every array element becomes its own row
# in a single column named `json_object`. Note that explode() emits no rows for
# a null or empty array.
df_exploded = df.select(
    explode(df.organic_results).alias("json_object")
)

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 4, Finished, Available, Finished)

In [3]:
# Preview the exploded rows: one `json_object` per element of `organic_results`.
display(df_exploded)

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7f10f553-fd36-4e7d-9017-1d87df36dc62)

In [4]:
# Promote every field of the `json_object` struct to a top-level column.
# Note: this rebinds `df`, so the raw DataFrame loaded in the first cell is no
# longer reachable under that name.
df = df_exploded.select("json_object.*")

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 6, Finished, Available, Finished)

In [5]:
# Inspect the flattened per-result columns.
display(df)

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 050bf6c0-25a6-414b-89d3-9752b8336728)

In [6]:
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import re

# -----------------------------
# Step 1: UDF to convert relative date
# -----------------------------
def convert_relative_date(val, now=None):
    """Convert a Bing-style date value into a naive ``datetime``.

    Accepted forms (case-insensitive, surrounding whitespace ignored):
      * relative ages ``"<n>d"`` (days) and ``"<n>h"`` (hours), resolved
        by subtracting them from ``now``;
      * absolute dates ``"YYYY-MM-DD"`` (returned at midnight).

    Args:
        val: Raw value of the ``date`` column; any type, stringified first.
        now: Reference time for relative ages. Defaults to
            ``datetime.now()`` evaluated in the process running the function
            (on Spark, the executor running the UDF), so relative values shift
            between runs. Exposed mainly for deterministic testing.

    Returns:
        A ``datetime``, or ``None`` when ``val`` is null/blank, not in one of
        the accepted forms (e.g. ``"30m"`` is not matched), or so large that
        the result falls outside ``datetime``'s range.
    """
    if val is None or str(val).strip() == "":
        return None
    val_str = str(val).strip().lower()

    m = re.match(r"^(\d+)([dh])$", val_str)
    if m:
        num, unit = int(m.group(1)), m.group(2)
        if now is None:
            now = datetime.now()
        try:
            delta = timedelta(days=num) if unit == "d" else timedelta(hours=num)
            return now - delta
        except OverflowError:
            # A garbage age like "99999999999d" must not fail the whole Spark
            # task; treat it as unparseable instead.
            return None

    # Not a relative age: try an absolute ISO date. strptime signals a format
    # mismatch with ValueError; catching only that avoids swallowing
    # KeyboardInterrupt/SystemExit or unrelated bugs.
    try:
        return datetime.strptime(val_str, "%Y-%m-%d")
    except ValueError:
        return None


# Wrap the Python parser as a Spark UDF that produces a `timestamp` column.
convert_relative_date_udf = F.udf(convert_relative_date, returnType="timestamp")

# Normalise the raw `date` values into `date_clean`; the original `date`
# column is still present at this point.
date_clean_expr = convert_relative_date_udf(F.col("date"))
df = df.withColumn("date_clean", date_clean_expr)

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 8, Finished, Available, Finished)

In [7]:
# Compare the raw `date` column with the parsed `date_clean` timestamp.
display(df)

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 982f5bfd-9098-43b3-a2aa-5ef60f586f72)

In [8]:
# `date` is superseded by `date_clean`; `favicon` is not used by later cells.
# Spark's drop() is a no-op for names that are not in the schema.
df = df.drop("date", "favicon")

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 10, Finished, Available, Finished)

In [9]:
# Rows without a thumbnail fall back to a generic Bing placeholder image URL.
# Only nulls are replaced; empty strings are kept as they are.
placeholder_thumbnail_url = "https://www.bing.com/th?id=OVFT.placeholder&pid=News"
thumbnail_col = F.col("thumbnail")
df = df.withColumn(
    "thumbnail",
    F.when(thumbnail_col.isNotNull(), thumbnail_col)
    .otherwise(F.lit(placeholder_thumbnail_url)),
)

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 11, Finished, Available, Finished)

In [10]:
from pyspark.sql.functions import col, year, month, dayofmonth, date_format, hour, to_date

# Derive calendar and time-of-day attributes from the normalised timestamp.
# Note: absolute "YYYY-MM-DD" inputs are parsed at midnight, so their `hour` is 0.
ts = F.col("date_clean")
derived_columns = {
    "date_only": F.to_date(ts),
    "year": F.year(ts),
    "month": F.month(ts),
    "day": F.dayofmonth(ts),
    "weekday": F.date_format(ts, "EEEE"),  # full weekday name, e.g. "Monday"
    "hour": F.hour(ts),
}
# dicts preserve insertion order, so columns are appended in the order above.
for col_name, expr in derived_columns.items():
    df = df.withColumn(col_name, expr)

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 12, Finished, Available, Finished)

In [11]:
# Check the derived date/time columns.
display(df)

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 24c71579-ee04-43a3-9963-95882e819400)

In [12]:
# Final column list, i.e. the schema that is written to the Delta table.
df.columns

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 14, Finished, Available, Finished)

['link',
 'position',
 'snippet',
 'source',
 'thumbnail',
 'title',
 'date_clean',
 'date_only',
 'year',
 'month',
 'day',
 'weekday',
 'hour']

In [13]:
# Bootstrap the Delta table only if it does not exist yet. With "ignore" the
# write is skipped when `tbl_latest_news` already exists. "overwrite" would
# replace the table on every run, discarding previously collected articles and
# leaving the incremental MERGE in the next cell nothing to merge into.
df.write.format("delta").mode("ignore").saveAsTable("tbl_latest_news")

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 15, Finished, Available, Finished)

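**Incremental Load**

Upsert the parsed articles into `tbl_latest_news` using `link` as the merge key: existing articles are updated and new ones are inserted. If the Delta table does not exist yet, it is created from the current DataFrame.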

In [14]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F


delta_path = "abfss://38f83567-163a-49e1-866b-5a801baf7318@onelake.dfs.fabric.microsoft.com/a70173c4-1f34-4d8a-94f1-2e274b2ecc98/Tables/tbl_latest_news"

# Table name registered in the metastore
table_name = "tbl_latest_news"


# Step 1: Filter out invalid rows and enforce one source row per merge key.
# `link` is the MERGE key: a null link can never match a target row. Duplicate
# links in the source make Delta's MERGE fail ("multiple source rows matched
# and attempted to modify the same target row"), or, when the link is new,
# insert the same article twice. Which duplicate survives is arbitrary.
df = (
    df.filter(F.col("link").isNotNull())
    .dropDuplicates(["link"])
)


# Step 2: Upsert into the existing table, or create it on the first run.
if DeltaTable.isDeltaTable(spark, delta_path):
    # Table exists → do MERGE
    deltaTable = DeltaTable.forPath(spark, delta_path)

    # Merge only the columns present on both sides. Source-only columns are
    # silently ignored (no schema evolution). Sorted for a deterministic order.
    target_cols = set(deltaTable.toDF().columns)
    source_cols = set(df.columns)
    common_cols = sorted(target_cols & source_cols)

    # Columns to update on match (exclude the key)
    update_dict = {c: f"src.{c}" for c in common_cols if c != "link"}

    # Columns to insert on no match (include the key)
    insert_dict = {c: f"src.{c}" for c in common_cols}

    (deltaTable.alias("tgt")
        .merge(df.alias("src"), "tgt.link = src.link")
        .whenMatchedUpdate(set=update_dict)
        .whenNotMatchedInsert(values=insert_dict)
        .execute()
    )
    print("✅ Incremental load merged into tbl_latest_news")

else:
    # First load → write dataframe as Delta table
    (df.write
        .format("delta")
        .mode("overwrite")
        .option("path", delta_path)   # physical path
        .saveAsTable(table_name)      # register in metastore
    )
    print("✅ First load created tbl_latest_news")


StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 16, Finished, Available, Finished)

✅ Incremental load merged into tbl_latest_news


In [18]:
%%sql
-- Inspect the contents of tbl_latest_news after the load/merge.
SELECT * FROM tbl_latest_news

StatementMeta(, d2de2d3b-f798-451d-b4f5-105f221f46e1, 20, Finished, Available, Finished)

<Spark SQL result set with 37 rows and 13 fields>