In [0]:
%pip install faker

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
from faker import Faker
import pandas as pd

fake = Faker()

# Ten synthetic contact records, one dict per row.
data = []
for _ in range(10):
    data.append({
        "Name": fake.name(),
        "Address": fake.address(),
        "Email": fake.email(),
    })

# pandas frame -> Spark frame, then print a preview of the rows.
df = pd.DataFrame(data)
df_spark = spark.createDataFrame(df)
df_spark.show()

+----------------+--------------------+--------------------+
|            Name|             Address|               Email|
+----------------+--------------------+--------------------+
| Scott Henderson|2737 Jesus Union ...|walshtiffany@exam...|
|Jeffrey Gonzales|PSC 2459, Box 972...|ymontgomery@examp...|
|   Alison Morris|0705 Wheeler Ligh...|matthew44@example...|
|    Misty Barron|71537 Powell Driv...|petersonjason@exa...|
|      Robert Ray|29501 Valdez Mead...|nelsonsteve@examp...|
| Lawrence Flores|1735 Timothy Exte...| dmurphy@example.com|
|    Jordan Baker|37027 Jackson Ter...|christophergarcia...|
|   Dustin Miller|60443 Luke Roads ...|nicholas46@exampl...|
|  Jenna Stephens|372 Sarah Light A...|paulchristensen@e...|
|Suzanne Gonzalez|442 Rachel Lane\n...|georgelester@exam...|
+----------------+--------------------+--------------------+



In [0]:
# NOTE(review): delta_path is not referenced by any other cell visible in this
# notebook; the table is written by name via saveAsTable instead. Confirm
# whether a path-based write was intended or whether this can be removed.
delta_path = "/tmp/fake_data_delta"

In [0]:
# Persist the Spark frame as a managed Delta table, replacing existing contents.
(
    df_spark.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fake_data_delta")
)

In [0]:
from delta.tables import DeltaTable

# Look the table up by name and print its rows without truncating column values.
delta_table = DeltaTable.forName(spark, "fake_data_delta")
(
    delta_table
    .toDF()
    .show(truncate=False)
)

+----------------+----------------------------------------------------------+-----------------------------+------------------+
|Name            |Address                                                   |Email                        |Inserted_Timestamp|
+----------------+----------------------------------------------------------+-----------------------------+------------------+
|Scott Henderson |2737 Jesus Union Apt. 016\nEast Lancefort, NE 77931       |walshtiffany@example.net     |NULL              |
|Jeffrey Gonzales|PSC 2459, Box 9725\nAPO AP 53228                          |ymontgomery@example.net      |NULL              |
|Alison Morris   |0705 Wheeler Lights Suite 506\nLake Theresashire, RI 61825|matthew44@example.com        |NULL              |
|Misty Barron    |71537 Powell Drive Apt. 565\nEast Jamesborough, WV 29768  |petersonjason@example.com    |NULL              |
|Robert Ray      |29501 Valdez Meadows\nNew Renee, WI 27169                 |nelsonsteve@example.com      |NULL

In [0]:
# Append the rows currently held in df_spark to the existing Delta table.
(
    df_spark.write
    .format("delta")
    .mode("append")
    .saveAsTable("fake_data_delta")
)

In [0]:
from delta.tables import DeltaTable

# Obtain a handle on the Delta table by name.
delta_table = DeltaTable.forName(
    spark,
    "fake_data_delta",
)

# Print the table's current rows with full-width columns.
(
    delta_table
    .toDF()
    .show(truncate=False)
)

+----------------+----------------------------------------------------------+-----------------------------+------------------+
|Name            |Address                                                   |Email                        |Inserted_Timestamp|
+----------------+----------------------------------------------------------+-----------------------------+------------------+
|Scott Henderson |2737 Jesus Union Apt. 016\nEast Lancefort, NE 77931       |walshtiffany@example.net     |NULL              |
|Jeffrey Gonzales|PSC 2459, Box 9725\nAPO AP 53228                          |ymontgomery@example.net      |NULL              |
|Alison Morris   |0705 Wheeler Lights Suite 506\nLake Theresashire, RI 61825|matthew44@example.com        |NULL              |
|Misty Barron    |71537 Powell Drive Apt. 565\nEast Jamesborough, WV 29768  |petersonjason@example.com    |NULL              |
|Robert Ray      |29501 Valdez Meadows\nNew Renee, WI 27169                 |nelsonsteve@example.com      |NULL

In [0]:
from faker import Faker
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import current_timestamp

fake = Faker()

def append_fake_data(n_rows=5):
    """Append ``n_rows`` synthetic contact rows to the ``fake_data_delta`` table.

    Each row carries ``Name``, ``Address`` and ``Email`` values produced by the
    module-level ``fake`` generator, plus an ``Inserted_Timestamp`` column
    filled with ``current_timestamp()``.

    Args:
        n_rows: Number of rows to generate. Zero or negative values are a
            no-op: nothing is generated or written.

    Returns:
        None. A one-line status message is printed.
    """
    if n_rows <= 0:
        # An empty record list yields a pandas frame with no columns, which
        # gives Spark nothing to build a matching frame from, so skip the
        # write instead of failing part-way through.
        print("No rows requested; nothing appended.")
        return

    data = [{
        "Name": fake.name(),
        "Address": fake.address(),
        "Email": fake.email()
    } for _ in range(n_rows)]

    df = pd.DataFrame(data)

    # The records above only ever hold Name/Address/Email, so the timestamp
    # column is always missing at this point and is added unconditionally.
    df_spark = spark.createDataFrame(df).withColumn(
        "Inserted_Timestamp", current_timestamp()
    )

    # Append to Delta Table
    df_spark.write.format("delta").mode("append").saveAsTable("fake_data_delta")

    # NOTE: datetime.now() reads the driver's local clock, which may differ
    # from the zone configured in spark.sql.session.timeZone.
    print(f"{n_rows} fake rows appended at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

In [0]:
# Print the column names and types of the current Delta table.
(
    delta_table
    .toDF()
    .printSchema()
)

root
 |-- Name: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Inserted_Timestamp: timestamp (nullable = true)



In [0]:
from delta.tables import DeltaTable

# Re-resolve the table by name, then list rows ordered by Inserted_Timestamp, descending.
delta_table = DeltaTable.forName(spark, "fake_data_delta")
(
    delta_table
    .toDF()
    .orderBy("Inserted_Timestamp", ascending=False)
    .show(truncate=False)
)

+----------------+----------------------------------------------------------+-----------------------------+------------------+
|Name            |Address                                                   |Email                        |Inserted_Timestamp|
+----------------+----------------------------------------------------------+-----------------------------+------------------+
|Alison Morris   |0705 Wheeler Lights Suite 506\nLake Theresashire, RI 61825|matthew44@example.com        |NULL              |
|Dustin Miller   |60443 Luke Roads Suite 101\nTeresatown, GU 71393          |nicholas46@example.com       |NULL              |
|Scott Henderson |2737 Jesus Union Apt. 016\nEast Lancefort, NE 77931       |walshtiffany@example.net     |NULL              |
|Misty Barron    |71537 Powell Drive Apt. 565\nEast Jamesborough, WV 29768  |petersonjason@example.com    |NULL              |
|Jenna Stephens  |372 Sarah Light Apt. 592\nRickyport, TX 12219             |paulchristensen@example.net  |NULL

In [0]:
# Set the Spark SQL session time zone to Asia/Kolkata.
spark.conf.set(
    "spark.sql.session.timeZone",
    "Asia/Kolkata",
)

In [0]:
# Summarise the table's commit history: version number, commit time, operation.
(
    delta_table
    .history()
    .select("version", "timestamp", "operation")
    .show(truncate=False)
)

+-------+-------------------+---------------------------------+
|version|timestamp          |operation                        |
+-------+-------------------+---------------------------------+
|3      |2025-07-10 18:25:43|WRITE                            |
|2      |2025-07-10 18:25:36|CREATE OR REPLACE TABLE AS SELECT|
|1      |2025-07-10 18:25:04|WRITE                            |
|0      |2025-07-10 18:24:08|CREATE OR REPLACE TABLE AS SELECT|
+-------+-------------------+---------------------------------+



In [0]:
# Time travel: read the table as of version 0 and print its rows.
(
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .table("fake_data_delta")
    .show(truncate=False)
)

+-------------+--------------------------------------------------------+----------------------------+--------------------------+
|Name         |Address                                                 |Email                       |Inserted_Timestamp        |
+-------------+--------------------------------------------------------+----------------------------+--------------------------+
|Robert Martin|PSC 7915, Box 3886\nAPO AE 76782                        |dferguson@example.org       |2025-07-10 18:24:06.386051|
|Megan Potter |16950 Emma Run\nLake Kennethside, WI 80137              |avilamatthew@example.net    |2025-07-10 18:24:06.386051|
|Matthew Park |0317 Archer Junctions\nBeardhaven, NY 32943             |wsmith@example.com          |2025-07-10 18:24:06.386051|
|Cesar Edwards|12042 Moore Forges Suite 813\nNew Jessica, MH 75448     |katherineellison@example.org|2025-07-10 18:24:06.386051|
|Megan Davis  |3808 Michael Brooks Apt. 811\nNorth Donaldland, WA 60019|freemanjames@example.com 

In [0]:
# List rows ordered by Inserted_Timestamp, descending, with full-width columns.
(
    delta_table
    .toDF()
    .orderBy("Inserted_Timestamp", ascending=False)
    .show(truncate=False)
)

+----------------+----------------------------------------------------------+-----------------------------+------------------+
|Name            |Address                                                   |Email                        |Inserted_Timestamp|
+----------------+----------------------------------------------------------+-----------------------------+------------------+
|Alison Morris   |0705 Wheeler Lights Suite 506\nLake Theresashire, RI 61825|matthew44@example.com        |NULL              |
|Dustin Miller   |60443 Luke Roads Suite 101\nTeresatown, GU 71393          |nicholas46@example.com       |NULL              |
|Scott Henderson |2737 Jesus Union Apt. 016\nEast Lancefort, NE 77931       |walshtiffany@example.net     |NULL              |
|Misty Barron    |71537 Powell Drive Apt. 565\nEast Jamesborough, WV 29768  |petersonjason@example.com    |NULL              |
|Jenna Stephens  |372 Sarah Light Apt. 592\nRickyport, TX 12219             |paulchristensen@example.net  |NULL

In [0]:
# Step 1: Email config
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import os

# Credentials must never be committed in a notebook. The Gmail app password is
# read from the environment instead; on Databricks a secret scope
# (dbutils.secrets.get) is another option.
# SECURITY: the app password that used to be hardcoded here has been exposed
# in the notebook's history and should be revoked in the Google account.
sender_email = os.environ.get("NOTIFY_SENDER_EMAIL", "yamanrag.prakash@gmail.com")
receiver_email = os.environ.get("NOTIFY_RECEIVER_EMAIL", "yamanrag.prakash@gmail.com")
app_password = os.environ.get("GMAIL_APP_PASSWORD", "")  # Gmail App Password

if not app_password:
    # Warn early rather than at send time, when the failure is harder to trace.
    print("GMAIL_APP_PASSWORD is not set; sending email will fail to authenticate.")

# Step 2: Email send function
def send_email(subject, html_table, timeout=30):
    """Send an HTML notification email through Gmail's SMTP server.

    The message is addressed using the module-level ``sender_email`` and
    ``receiver_email`` and authenticated with ``app_password``. Sending is
    best-effort: any failure is printed rather than raised.

    Args:
        subject: Subject line of the message.
        html_table: HTML fragment inserted verbatim into the message body.
            The caller is responsible for escaping any untrusted content.
        timeout: Seconds to wait on the SMTP connection before giving up.

    Returns:
        None. A success or failure line is printed.
    """
    import ssl  # local import: only this function needs it

    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = receiver_email
    msg['Subject'] = subject

    body = f"""
    <html>
      <body>
        <h3>New Data Appended to Delta Table</h3>
        {html_table}
      </body>
    </html>
    """
    msg.attach(MIMEText(body, 'html'))

    try:
        # A timeout keeps a stalled connection from hanging the notebook.
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=timeout) as server:
            # Pass an explicit default SSL context so the server certificate
            # and hostname are verified before the credentials are sent.
            server.starttls(context=ssl.create_default_context())
            server.login(sender_email, app_password)
            server.send_message(msg)
        print("✅ Email sent!")
    except Exception as e:
        # Deliberately best-effort: report the failure and carry on.
        print("❌ Email failed:", e)

# Step 3: Get last 5 rows added & send email
new_rows = delta_table.toDF().orderBy("Inserted_Timestamp", ascending=False).limit(5)
# Keep to_html's default escaping: the cell values are table data, not markup,
# so characters such as < > & must not be interpreted as HTML in the email.
html_table = new_rows.toPandas().to_html(index=False)
send_email("Delta Append Notification", html_table)

✅ Email sent!


In [0]:
from delta.tables import DeltaTable
# Re-resolve the Delta table handle by name.
delta_table = DeltaTable.forName(
    spark,
    "fake_data_delta",
)

In [0]:
# Print the table's commit history with all columns.
(
    delta_table
    .history()
    .show()
)

+-------+-------------------+--------------+--------------------+--------------------+--------------------+----+--------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|        userId|            userName|           operation| operationParameters| job|notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+--------------+--------------------+--------------------+--------------------+----+--------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      3|2025-07-10 18:25:43|75174250834895|yamanrag.prakash@...|               WRITE|{mode -> Append, ...|NULL|    NULL|0710-104320-icmdx...|          2|WriteSerializable|         true|{numFiles -> 1, n...|        NULL|Databricks-Runtim...|
|      2|2025-07-10 18:25:36|751

In [0]:
# Set the Spark SQL session time zone to Asia/Kolkata.
spark.conf.set(
    "spark.sql.session.timeZone",
    "Asia/Kolkata",
)

In [0]:
%sql
-- Remove the existing table (if any); the following cell recreates it with an
-- Inserted_Timestamp column populated on every row.
DROP TABLE IF EXISTS fake_data_delta

In [0]:
from faker import Faker
import pandas as pd
from pyspark.sql.functions import current_timestamp

fake = Faker()

# Five synthetic contact records, one dict per row.
data = []
for _ in range(5):
    data.append({
        "Name": fake.name(),
        "Address": fake.address(),
        "Email": fake.email(),
    })

df = pd.DataFrame(data)

# Convert to Spark and stamp every row with the current timestamp.
df_spark = spark.createDataFrame(df).withColumn(
    "Inserted_Timestamp",
    current_timestamp(),
)

# Write the table in overwrite mode so it carries the timestamp column.
(
    df_spark.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fake_data_delta")
)

In [0]:
from datetime import datetime

def append_fake_data(n_rows=5):
    """Append ``n_rows`` synthetic contact rows to the ``fake_data_delta`` table.

    Each row carries ``Name``, ``Address`` and ``Email`` values produced by the
    module-level ``fake`` generator, plus an ``Inserted_Timestamp`` column
    filled with ``current_timestamp()``.

    Args:
        n_rows: Number of rows to generate. Zero or negative values are a
            no-op: nothing is generated or written.

    Returns:
        None. A one-line status message is printed.
    """
    if n_rows <= 0:
        # An empty record list yields a pandas frame with no columns, which
        # gives Spark nothing to build a matching frame from, so skip the
        # write instead of failing part-way through.
        print("No rows requested; nothing appended.")
        return

    data = [{
        "Name": fake.name(),
        "Address": fake.address(),
        "Email": fake.email()
    } for _ in range(n_rows)]

    df = pd.DataFrame(data)

    # The records above only ever hold Name/Address/Email, so the timestamp
    # column is always missing at this point and is added unconditionally.
    df_spark = spark.createDataFrame(df).withColumn(
        "Inserted_Timestamp", current_timestamp()
    )

    df_spark.write.format("delta").mode("append").saveAsTable("fake_data_delta")

    # NOTE: datetime.now() reads the driver's local clock, which may differ
    # from the zone configured in spark.sql.session.timeZone.
    print(f"{n_rows} fake rows appended at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

In [0]:
# Append one batch of five generated rows.
append_fake_data(n_rows=5)

5 fake rows appended at 2025-07-10 12:56:57
