In [3]:
import os
from time import sleep

from IPython.display import display, clear_output
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.utils import AnalysisException
import pandas as pd

pd.options.display.max_columns = None
pd.options.display.max_rows = 30
pd.options.display.max_colwidth = 150

# SETTINGS
# Folder with the raw tweet JSON files. Set the TWITTER_DATA_PATH environment variable
# to point elsewhere instead of editing this machine-specific absolute default.
IN_PATH = os.environ.get(
    "TWITTER_DATA_PATH",
    "/home/nghiaht7/data-engineer/big-data-platforms-f20.mooc.fi/Mastering-Big-Data-Analytics-with-PySpark/Section 8 - Machine Learning in Real-Time/twitter_data/twitter/",
)
OUT_PATH = ""  # NOTE(review): not used by any of the cells shown here
# Pattern used to parse the tweets' `created_at` strings into timestamps
TIMESTAMP_FORMAT = "EEE MMM dd HH:mm:ss zzzz yyyy"
timestampformat = TIMESTAMP_FORMAT  # lowercase alias kept for cells that still use it

spark = (
    SparkSession.builder.appName("StructuredStreamingExample")
    # Spark 3's datetime parser does not allow day-of-week ('E') letters in *parsing*
    # patterns, so to_timestamp(..., TIMESTAMP_FORMAT) would fail as soon as the
    # timestamp column is materialized. The legacy (SimpleDateFormat-based) parser
    # accepts this pattern.
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Infer the schema once with a batch read and reuse it for both readers: file-based
# streaming sources need an explicit schema. The inference happens when the DataFrame is
# created, so a trailing .limit() would not make it cheaper.
schema = spark.read.json(IN_PATH).schema

# regular (batch) spark reader
static_spark_reader = spark.read.schema(schema)

# streaming spark reader
stream_spark_reader = spark.readStream.schema(schema)

21/10/13 10:46:30 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [6]:
# Switch between Spark Structured Streaming and plain batch processing with this flag:
#   True  -> spark.readStream (streaming DataFrame)
#   False -> spark.read       (static DataFrame)
USE_STREAMING = True
spark_reader = stream_spark_reader if USE_STREAMING else static_spark_reader

In [23]:
# Parse the raw tweets into (id, timestamp, user, text). Whether this is a batch or a
# streaming DataFrame depends on which reader `spark_reader` points to.
df = (
    spark_reader.json(IN_PATH)
    .select(
        "id",
        # parse created_at into a proper timestamp using the pattern from the settings cell
        f.to_timestamp(f.col("created_at"), timestampformat).alias("timestamp"),
        # keep only the author's screen name from the nested user struct
        f.col("user.screen_name").alias("user"),
        "text",
    )
    .coalesce(1)
)

# Approximate number of distinct users, stamped with the time the result was computed
distinct_user_count = df.select(f.approx_count_distinct("user"), f.current_timestamp())

# The memory sink requires a query name; it also becomes the name of the in-memory table
# that later cells read with spark.sql.
QUERY_NAME = "distinct_user_count"

if not df.isStreaming:
    print("Plain old, basic DataFrame - meh!")
    # Some actions only work on non-streaming DataFrames, like show and toPandas
    distinct_user_count.show()
    display(df.limit(25).toPandas())
else:
    print("We are streaming!")
    # Starting a query whose name matches a still-active query raises
    # IllegalArgumentException, so stop any leftover query from a previous run of this
    # cell first.
    for active_query in spark.streams.active:
        if active_query.name == QUERY_NAME:
            active_query.stop()

    # Calling .writeStream on a DataFrame returns a DataStreamWriter
    stream_writer = (
        distinct_user_count.writeStream
        .queryName(QUERY_NAME)
        # once=True processes the available input in a single batch, then stops (handy
        # for debugging). Use processingTime="5 seconds" for a continuously running query.
        .trigger(once=True)
        .outputMode("complete")
        .format("memory")
    )
    # Calling .start on a DataStreamWriter returns a StreamingQuery
    query = stream_writer.start()

We are streaming!


21/10/13 14:04:56 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7c257f66-38e4-4df1-94b4-060ad52eb892. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [17]:
# DataFrame.isStreaming reports whether the DataFrame is backed by a streaming source
df_is_streaming = df.isStreaming
df_is_streaming

True

In [24]:
# StreamingQuery.status: what the query is doing right now (message, data availability,
# whether a trigger is currently executing)
current_status = query.status
current_status



{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

In [19]:
# StreamingQuery.isActive is True while the query is still running, False once it has stopped
query_running = query.isActive
query_running

False

In [20]:
# DataStreamWriter.start() creates a new StreamingQuery and begins executing it.
# Starting a writer whose queryName matches a query that is still active raises an
# IllegalArgumentException ('Cannot start query with name ... as a query with that name
# is already active'). So only (re)start when no active query carries this name.
active_query_names = {active.name for active in spark.streams.active}
if query.name not in active_query_names:
    query = stream_writer.start()

21/10/13 13:28:16 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d00893ca-e4b7-423a-947a-f433d994aa3a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [11]:
# .stop() stops the streaming query. The cells below still read its memory-sink table by
# the query's name afterwards.
query.stop()

In [12]:
# StreamingQuery.lastProgress: metrics of the most recently processed micro-batch
# (row counts, durations, state operators, sources, sink)
latest_progress = query.lastProgress
latest_progress

{'id': '9b20d0f7-274d-4160-b4ec-6a1825e92d4f',
 'runId': '5b66efe3-7b0c-4794-bee7-884a971e9022',
 'name': 'distinct_user_count',
 'timestamp': '2021-10-13T04:26:57.759Z',
 'batchId': 0,
 'numInputRows': 50,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 28.312570781426952,
 'durationMs': {'addBatch': 740,
  'getBatch': 717,
  'latestOffset': 165,
  'queryPlanning': 55,
  'triggerExecution': 1766,
  'walCommit': 19},
 'stateOperators': [{'numRowsTotal': 1,
   'numRowsUpdated': 1,
   'memoryUsedBytes': 856,
   'numRowsDroppedByWatermark': 0,
   'customMetrics': {'loadedMapCacheHitCount': 0,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 712}}],
 'sources': [{'description': 'FileStreamSource[file:/home/nghiaht7/data-engineer/big-data-platforms-f20.mooc.fi/Mastering-Big-Data-Analytics-with-PySpark/Section 8 - Machine Learning in Real-Time/twitter_data/twitter]',
   'startOffset': None,
   'endOffset': {'logOffset': 0},
   'numInputRows': 50,
   'inputRowsPe

In [13]:
# The memory sink exposes the query's results as a table named after the query,
# so plain spark.sql can be used to see how the query is performing.
sink_table_sql = f"SELECT * from {query.name}"
sink_snapshot = spark.sql(sink_table_sql).toPandas()
display(sink_snapshot)

Unnamed: 0,approx_count_distinct(user),current_timestamp()
0,49,2021-10-13 11:26:57.924


In [14]:
# Live view: re-read the memory-sink table about once per second for up to 2 minutes.
# The loop stops early once the query is no longer active, since the table then no longer
# changes. Interrupting the kernel also ends the view without a traceback.
LIVE_VIEW_SECONDS = 120
REFRESH_INTERVAL_S = 1

try:
    for _ in range(LIVE_VIEW_SECONDS // REFRESH_INTERVAL_S):
        snapshot = spark.sql(f"SELECT * from {query.name}").toPandas()
        # wait=True defers clearing until the new table is ready (no flicker), and
        # clearing *before* displaying keeps the final snapshot visible after the loop.
        clear_output(wait=True)
        display(snapshot)
        if not query.isActive:
            break
        sleep(REFRESH_INTERVAL_S)
except KeyboardInterrupt:
    pass  # deliberate: a manual interrupt is the intended way to end the view early
print("Live view ended...")

Live view ended...


In [15]:
# Batch actions such as .show() raise AnalysisException on a streaming DataFrame:
# results of a streaming query can only be produced via writeStream.start().
# The exception is caught and printed so the notebook still runs top to bottom.
try:
    distinct_user_count.show()
except AnalysisException as err:
    print(f"AnalysisException: {err}")

AnalysisException: Queries with streaming sources must be executed with writeStream.start();
FileSource[/home/nghiaht7/data-engineer/big-data-platforms-f20.mooc.fi/Mastering-Big-Data-Analytics-with-PySpark/Section 8 - Machine Learning in Real-Time/twitter_data/twitter/]

In [None]:
# Shut down cleanly: stop any streaming queries that are still running, then the session.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()