In [1]:
# Setup: run once in a fresh environment, then restart the kernel.
# Prefer `%pip` over `!pip` so packages install into the interpreter backing this kernel.
# pyspark is pinned because delta-spark releases are built against a specific Spark version.
# %pip install pyspark==3.5.0
# %pip install delta-spark
# %pip install google-cloud-pubsub

In [2]:
# Explicit imports instead of `from delta import *`: these are the only two Delta names
# this notebook uses (DeltaTable is needed by the table-loading cell further down).
from delta import DeltaTable, configure_spark_with_delta_pip
import pyspark

# Register Delta Lake's SQL extension and catalog on the session builder so that
# format("delta") reads/writes and DeltaTable work against the default catalog.
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip adds the Delta Lake JARs matching the pip-installed
# delta-spark package to the builder before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()



In [3]:
import os
import shutil

# Small in-memory sample (id, name, phone) used to seed the Delta table.
data = [(1, 'Alice', '555-555-5555'), (2, 'Bob', '123-456-7890'), (3, 'Charlie', '098-876-5432'), (4, 'Benny', '000-000-0000')]
columns = ['id', 'name', 'phone']
df = spark.createDataFrame(data, columns)

# Directory of the Delta table itself (its transaction log lives in the _delta_log/
# subfolder). Used below for both the cleanup and the write so the two cannot diverge.
delta_log_path = "delta"

# Start from a clean slate so re-running the notebook does not accumulate table history.
# Only delete when the directory exists; genuine failures (e.g. permissions) now raise
# instead of being silently ignored and leaving a half-deleted table behind.
if os.path.isdir(delta_log_path):
    shutil.rmtree(delta_log_path)

df.write.format("delta").mode("overwrite").save(delta_log_path)

In [4]:
# Load the Delta table written in the previous cell, reusing its path variable rather
# than repeating the "delta" literal.
delta_table = DeltaTable.forPath(spark, delta_log_path)

# Current version of the table as a Spark DataFrame; .show() prints it as text.
delta_data = delta_table.toDF()
delta_data.show()


+---+-------+------------+
| id|   name|       phone|
+---+-------+------------+
|  3|Charlie|098-876-5432|
|  4|  Benny|000-000-0000|
|  1|  Alice|555-555-5555|
|  2|    Bob|123-456-7890|
+---+-------+------------+



In [5]:
from google.cloud import pubsub_v1
import json
import os

# Point the Google client libraries at a service-account key file, but only if the
# environment does not already provide one, so externally configured credentials win.
# NOTE(review): avoid committing key files next to the notebook; prefer exporting
# GOOGLE_APPLICATION_CREDENTIALS in the shell that launches Jupyter.
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "deep-theorem-402423-c70baa940bdf.json")

# Google Cloud project ID and the subscription to pull messages from.
project_id = "deep-theorem-402423"
subscription_name = "practice-sub"

# Subscriber client plus the fully-qualified subscription path it expects.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)

def callback(message):
    """Handle one Pub/Sub message delivered by the streaming pull.

    Prints the payload as UTF-8 text rather than a raw ``b'...'`` bytes repr, then
    acknowledges the message. Undecodable bytes are replaced instead of raising, so a
    malformed payload cannot prevent the ack (an un-acked message is redelivered).

    Args:
        message: the received Pub/Sub message; ``message.data`` holds the payload bytes
            and ``message.ack()`` acknowledges it.

    NOTE(review): per the google-cloud-pubsub docs, callbacks run on the subscriber's
    worker threads, so this output may appear under whichever cell is running.
    """
    payload = message.data.decode("utf-8", errors="replace")
    print(f"Received message: {payload}")
    message.ack()  # Acknowledge the message to remove it from the subscription

# Open the subscription to start receiving messages in the background. Keep the returned
# StreamingPullFuture: it is the only handle for stopping the pull later
# (streaming_pull_future.cancel()) or inspecting its state. Displaying it as the last
# expression shows that state in the cell output.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future

<StreamingPullFuture at 0x7fbcfe90e7d0 state=pending>

In [6]:
from google.cloud import pubsub_v1

# Publishing side: the "practice" topic in the same GCP project used above.
# Replace these values with your own.
topic_name = "practice"
project_id = "deep-theorem-402423"

# Build the publisher client, then derive the fully-qualified topic path from it.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

# Path of the Delta table directory written earlier (replace with your own table path).
delta_log_path = "delta"

# Read the current version of the Delta table.
delta_df = spark.read.format("delta").load(delta_log_path)

# Serialize every row to a JSON document (one str per row) and bring them to the driver.
# NOTE: collect() materializes the whole table in driver memory. That is fine for this
# tiny demo table; for large tables iterate with delta_df.toJSON().toLocalIterator().
data_to_publish = delta_df.toJSON().collect()

# publish() is asynchronous and returns a future. Keep each future with its record so
# delivery can be confirmed, instead of silently dropping failures.
pending = []
for record in data_to_publish:
    # `record` is already JSON text: encode it directly. Passing it through json.dumps()
    # again would send a quoted, escaped JSON *string* rather than a JSON object.
    message_data = record.encode("utf-8")
    pending.append((record, publisher.publish(topic_path, data=message_data)))

# Block until Pub/Sub accepts each message; .result() raises if publishing failed.
for record, future in pending:
    future.result()
    print(f"Published message: {record}")

Published message: {"id":3,"name":"Charlie","phone":"098-876-5432"}
Published message: {"id":4,"name":"Benny","phone":"000-000-0000"}
Published message: {"id":1,"name":"Alice","phone":"555-555-5555"}
Published message: {"id":2,"name":"Bob","phone":"123-456-7890"}
