In [None]:
# Simple example of a stream in a Databricks notebook, based on the stream in py56_fake_stream.ipynb

In [0]:
from pyspark.sql import DataFrame, Column
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql.functions import window

from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType, DoubleType

# Explicit, non-nullable schema of the beer-consumption events.
# Streaming *file* sources need their schema declared up front.
# NOTE(review): `spark.readStream.table(...)` takes the schema from the table
# itself, so `schema` is not used by the cells visible in this notebook.
schema = StructType([
    StructField(col_name, col_type, False)
    for col_name, col_type in (
        ("BeerID", LongType()),
        ("DrinkingTime", TimestampType()),
        ("DrinkingVolume", DoubleType()),
        ("Consumer", StringType()),
    )
])

# Table read below; presumably the one the fake stream (py56_fake_stream.ipynb) writes into.
TABLE_NAME = "beer_consumptions_streaming"

In [0]:
# Let's read the source of the stream, first as an ordinary (batch) DataFrame.
spark.catalog.setCurrentCatalog("spark_training_stese")
stream_handle_static = spark.read.table(TABLE_NAME)

# Peek at some rows, then at summary statistics for each column.
stream_handle_static.show()
stream_handle_static.describe().show()

# Every time this cell runs, the table has grown. Presumably this is because the
# fake stream from py56_fake_stream.ipynb keeps appending rows to it.
# This is a normal DataFrame, still no streaming, so this displays False:
stream_handle_static.isStreaming


+------+--------+--------------------+------------------+
|BeerID|Consumer|        DrinkingTime|    DrinkingVolume|
+------+--------+--------------------+------------------+
|     5| Skinner|2026-02-05 14:52:...| 1.110060118596158|
|     1|  Watson|2026-02-05 14:52:...|1.0717837507726424|
|    18|Zimbardo|2026-02-05 14:52:...|1.0012006501117807|
|    30| Skinner|2026-02-05 14:52:...|1.1417242467028403|
|    23|  Pavlow|2026-02-05 14:52:...|1.2373203675039406|
|     2|  Watson|2026-02-05 14:52:...|1.0262375365184455|
|    21|Zimbardo|2026-02-05 14:52:...|1.1083126972832522|
|    27|Zimbardo|2026-02-05 14:52:...|1.1943042755074351|
|     2|Zimbardo|2026-02-05 14:52:...|1.3572375758194442|
|    30|Zimbardo|2026-02-05 14:52:...|1.2940275295691228|
|    11| Skinner|2026-02-05 14:52:...|1.1910648123841079|
|    30| Skinner|2026-02-05 14:52:...|1.2716063028875124|
|    12| Skinner|2026-02-05 14:52:...|1.3248400982186004|
|     5|Zimbardo|2026-02-05 14:52:...|1.3982832704058377|
|     5| Skinn

False

In [0]:
# Let's define some analysis on it.
# Switching `spark.read` to `spark.readStream` turns the same table into a streaming source.
stream_handle = spark.readStream.table(TABLE_NAME)
# Print explicitly: a bare expression is only displayed when it is the last one
# in a cell, and this cell continues below. Expected: True (the batch read gave False).
print(f"stream_handle.isStreaming = {stream_handle.isStreaming}")

# Let's define a function for the stream:
def calc_consumer_frequs(df: DataFrame, column: str = "Consumer") -> DataFrame:
    """Count the rows per distinct value of ``column`` (default ``"Consumer"``).

    Uses only the DataFrame API, so it works the same on a batch DataFrame and
    on a streaming one. On a streaming input the result is an aggregation, so
    the sink must use output mode "complete" (or "update").

    Args:
        df: Input DataFrame, batch or streaming, that contains ``column``.
        column: Name of the column to group by.

    Returns:
        A DataFrame with one row per distinct value of ``column`` and a
        ``count`` column holding the number of rows with that value.
    """
    return df.groupBy(column).count()

# Sanity check with batch data: the same function runs on the static DataFrame.
calc_consumer_frequs(df=stream_handle_static).show()




+--------+-----+
|Consumer|count|
+--------+-----+
|  Pavlow|   89|
|Zimbardo|  208|
|  Watson|  129|
| Skinner|  174|
+--------+-----+



In [0]:
# The function also works with streaming data.
# Re-running this cell would otherwise start a second query and overwrite
# `stream_query`. The previous query would then keep running with no handle left
# to stop it, so first stop any active query that carries our name.
STREAM_QUERY_NAME = "consumer_frequs_console"
for active_query in spark.streams.active:
    if active_query.name == STREAM_QUERY_NAME:
        active_query.stop()

# An aggregation without a watermark needs output mode "complete" (or "update").
# NOTE(review): the console sink writes to the driver's stdout. On Databricks its
# batches probably appear in the driver logs rather than below this cell.
stream_query: StreamingQuery = (
    calc_consumer_frequs(stream_handle)
    .writeStream
    .queryName(STREAM_QUERY_NAME)
    .format("console")
    .outputMode("complete")
    .start()
)

In [0]:
# For comparison: the same aggregation on the static (batch) DataFrame, rendered
# with display(). The counts are higher than earlier, presumably because the table kept growing.
display(calc_consumer_frequs(df=stream_handle_static))

Consumer,count
Pavlow,209
Zimbardo,489
Watson,275
Skinner,427


In [0]:
# Stop the streaming query; once started, it keeps running until it is stopped.
stream_query.stop()
# Display the final state so the notebook records the shutdown (expected: False).
stream_query.isActive
