In [1]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
df = spark.read.option("multiline", "true").json("Files/bing-latest-news.json")
# df now is a Spark DataFrame containing JSON data from "Files/bing-latest-news.json".
display(df)

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 07d7acda-ab2b-4c22-bbf8-fa109e607d81)

In [2]:
from pyspark.sql.functions import explode
df_exploded = df.select(explode(df["organic_results"]).alias("json_object"))

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 4, Finished, Available, Finished)

In [3]:
display(df_exploded)

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 871cee74-1a75-469e-bea1-e5719705730a)

In [4]:
df = df_exploded.select("json_object.*")

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 6, Finished, Available, Finished)

In [5]:
display(df)

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 099bf750-32fa-40a8-98ca-232d8b188113)

In [6]:
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import re

# UDF to convert relative date into proper timestamp
def convert_relative_date(val):
    if val is None or str(val).strip() == "":
        return None
    val_str = str(val).strip().lower()
    now = datetime.now()

    m = re.match(r"^(\d+)([dh])$", val_str)
    if m:
        num, unit = int(m.group(1)), m.group(2)
        if unit == "d":
            return now - timedelta(days=num)
        elif unit == "h":
            return now - timedelta(hours=num)

    # Try parsing as absolute date
    try:
        return datetime.strptime(val_str, "%Y-%m-%d")
    except:
        return None

# Register UDF
convert_relative_date_udf = F.udf(convert_relative_date, "timestamp")

# Apply transformation
df = df.withColumn("date_clean", convert_relative_date_udf(F.col("date")))

# Show results
df.select("date", "date_clean").show(10, truncate=False)


StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 8, Finished, Available, Finished)

+----+--------------------------+
|date|date_clean                |
+----+--------------------------+
|18m |NULL                      |
|19h |2025-09-21 21:43:46.810927|
|1d  |2025-09-21 16:43:46.810961|
|1h  |2025-09-22 15:43:46.810976|
|49m |NULL                      |
|1d  |2025-09-21 16:43:46.811008|
|20h |2025-09-21 20:43:46.811029|
|2d  |2025-09-20 16:43:46.811047|
|5h  |2025-09-22 11:43:46.811065|
|2d  |2025-09-20 16:43:46.811076|
+----+--------------------------+
only showing top 10 rows



In [7]:
display(df)

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 755b87db-67f8-4b98-adf9-cd8e81704d64)

In [8]:
df = df.drop('favicon', 'date')

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 10, Finished, Available, Finished)

In [9]:
from pyspark.sql.functions import col, when, lit

df = df.withColumn(
    "thumbnail",
    when(col("thumbnail").isNull(), lit("https://www.bing.com/th?id=OVFT.placeholder&pid=News"))
    .otherwise(col("thumbnail"))
)

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 11, Finished, Available, Finished)

In [10]:
from pyspark.sql.functions import col, year, month, dayofmonth, date_format, hour, to_date

df = df.withColumn("date_only", to_date(col("date_clean")))
df = df.withColumn("year", year(col("date_clean")))
df = df.withColumn("month", month(col("date_clean")))
df = df.withColumn("day", dayofmonth(col("date_clean")))
df = df.withColumn("weekday", date_format(col("date_clean"), "EEEE"))
df = df.withColumn("hour", hour(col("date_clean")))

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 12, Finished, Available, Finished)

In [11]:
display(df)

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e905d0c8-63d4-42fe-a0f9-3ebe9ac99281)

In [12]:
df.columns

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 14, Finished, Available, Finished)

['link',
 'position',
 'snippet',
 'source',
 'thumbnail',
 'title',
 'date_clean',
 'date_only',
 'year',
 'month',
 'day',
 'weekday',
 'hour']

In [13]:
# Assume df is your transformed PySpark DataFrame
df.write.format("delta").mode("overwrite").saveAsTable("lakehouse1.tbl_latest_news")


StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 15, Finished, Available, Finished)

In [14]:
# Save as CSV inside the Files folder of Lakehouse
df.write.mode("overwrite").csv("Files/my_transformed_data_csv", header=True)


StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 16, Finished, Available, Finished)

In [15]:
from delta.tables import DeltaTable

table_name = "tbl_latest_news"
delta_path = "abfss://4e43def1-faad-4fd1-a990-2c79f595c988@onelake.dfs.fabric.microsoft.com/ee7fedf0-cea4-4fb2-8147-b4e9e5c012ea/Tables/tbl_latest_news" 

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 17, Finished, Available, Finished)

In [16]:
from delta.tables import DeltaTable

# Deduped df is already ready from above step
# delta_path and table_name defined earlier

if DeltaTable.isDeltaTable(spark, delta_path):
    # Table exists → do MERGE
    deltaTable = DeltaTable.forPath(spark, delta_path)

    # Match schema (avoid mismatches)
    target_cols = set(deltaTable.toDF().columns)
    source_cols = set(df.columns)
    common_cols = list(target_cols.intersection(source_cols))
    update_dict = {c: f"src.{c}" for c in common_cols if c != "link"}  # exclude PK

    (deltaTable.alias("tgt")
        .merge(df.alias("src"), "tgt.link = src.link")
        .whenMatchedUpdate(set=update_dict)
        .whenNotMatchedInsert(values=update_dict)
        .execute()
    )
    print("✅ Incremental load merged into tbl_latest_news")

else:
    # First load → write dataframe as Delta table
    (df.write
        .format("delta")
        .mode("overwrite")
        .option("path", delta_path)   # physical path
        .saveAsTable(table_name)      # register in metastore
    )
    print("✅ First load created tbl_latest_news")


StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 18, Finished, Available, Finished)

✅ Incremental load merged into tbl_latest_news


In [17]:
# Clear cache & refresh metadata
spark.catalog.clearCache()
spark.sql("REFRESH TABLE tbl_latest_news")

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 19, Finished, Available, Finished)

DataFrame[]

In [18]:
spark.sql("DROP TABLE IF EXISTS tbl_latest_news")

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 20, Finished, Available, Finished)

DataFrame[]

In [19]:
(df.write
    .format("delta")
    .mode("overwrite")
    .option("path", delta_path)   # link table to this physical path
    .saveAsTable(table_name))     # register in metastore

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 21, Finished, Available, Finished)

In [20]:
%%sql
SELECT * FROM tbl_latest_news

StatementMeta(, f077c265-d978-493e-8165-f7377ce6bd25, 22, Finished, Available, Finished)

<Spark SQL result set with 34 rows and 13 fields>