<a href="https://colab.research.google.com/github/jggomez/spark-demo/blob/main/Spark_Reading_Streaming_Examples.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Install the libraries**

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# **Create a session**

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

In [3]:
# Reuse the active SparkSession if there is one; otherwise start a new one
# named "spark_streaming".
# NOTE(review): the "pubsublite" sink used later needs the Pub/Sub Lite Spark
# connector JAR on the classpath, and nothing in this builder adds it. Confirm
# the runtime provides it (e.g. a Dataproc image or `spark.jars`).
spark_session = (
    SparkSession.builder
    .appName("spark_streaming")
    .getOrCreate()
)


In [18]:
# Pub/Sub Lite destination; combined later into the topic path
# projects/{project_number}/locations/{location}/topics/{topic_id}.
project_number, location, topic_id = (
    823002731253,          # GCP project number
    "us-central1-a",       # Pub/Sub Lite location of the topic
    "log-messages-topic",  # topic name within that project/location
)

# **Writing To Pub/Sub Lite - Spark Streaming**

In [21]:
# Synthetic streaming source: Spark's built-in "rate" source, configured for
# one row per second with `timestamp` and `value` columns.
dataframe = (
    spark_session.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)
dataframe

DataFrame[timestamp: timestamp, value: bigint]

In [22]:
# Map the rate-source rows onto the Pub/Sub Lite write schema
# (https://github.com/googleapis/java-pubsublite-spark#data-schema):
#   key: binary, data: binary, event_timestamp: timestamp,
#   attributes: map<string, array<binary>>
#
# The transform drops `value` and renames `timestamp` on the same variable, so
# applying it a second time would fail to resolve `value`. The guard makes the
# cell safe to re-run; re-run the reader cell above to rebuild from scratch.
if "value" in dataframe.columns:
    dataframe = (
        dataframe.withColumn("key", lit("example").cast(BinaryType()))
        # data: the numeric `value` rendered as text, then cast to bytes
        .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
        .withColumnRenamed("timestamp", "event_timestamp")
        # attributes: {"key1": [b"even"]} or {"key1": [b"odd"]} by parity of `value`
        .withColumn(
            "attributes",
            create_map(
                lit("key1"),
                array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
            ),
        )
        .drop("value")
    )
dataframe.printSchema()

root
 |-- event_timestamp: timestamp (nullable = true)
 |-- key: binary (nullable = false)
 |-- data: binary (nullable = true)
 |-- attributes: map (nullable = false)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = false)
 |    |    |-- element: binary (containsNull = false)



In [None]:
# Start the streaming write to the Pub/Sub Lite topic, one micro-batch per
# second. The checkpoint directory gets a random suffix on every run, so each
# run starts without any previously checkpointed progress.
topic_path = f"projects/{project_number}/locations/{location}/topics/{topic_id}"
checkpoint_dir = "/tmp/app" + uuid.uuid4().hex
query = (
    dataframe.writeStream
    .format("pubsublite")
    .option("pubsublite.topic", topic_path)
    .option("checkpointLocation", checkpoint_dir)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

In [None]:
# Let the stream run for up to 60 seconds, then shut it down.
# try/finally guarantees stop() runs even if the wait is interrupted
# (Kernel -> Interrupt) or raises because the query failed. Without it, the
# query could keep writing to the topic in the background.
try:
    query.awaitTermination(60)
finally:
    query.stop()

# **Reading From Pub/Sub - Spark Streaming**