## 2 - Stream files & aggregate

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType
from pyspark.sql.functions import current_timestamp
from IPython.display import display, clear_output
import uuid
from time import sleep

def makeFolder(path):
    """Create the directory `path` and report the outcome on stdout.

    Filesystem errors (for example the directory already existing, or a
    missing parent directory) are printed instead of raised, so the function
    can be called again on an existing folder.

    Args:
        path: Directory to create. Only the last path component is created
            (`os.mkdir`, not `os.makedirs`).

    Returns:
        None.
    """
    try:
        os.mkdir(path)
    except OSError as e:
        # Only filesystem errors are reported and swallowed. Catching
        # BaseException here would also hide KeyboardInterrupt / SystemExit.
        print("Error on ",path," : ",e)
    else:
        print("Directory '% s' created" % path)

# Folder locations used by the notebook: the streaming input, the output
# sink and the checkpoint directory.
stream_folder, sink_folder, checkpoint_folder = (
    './tweets_folder',
    './tweet_sink',
    './tweet_checkpoint',
)

# stream_folder should already exist; only the two output folders are created.
for output_folder in (sink_folder, checkpoint_folder):
    makeFolder(output_folder)

In [None]:
# Get the active Spark session, or start one if none exists.
# (Run spark.stop() beforehand to force a fresh session.)
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

# Set the number of partitions used when shuffling data (aggregations, joins)
# to 2.
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [None]:
# Streaming reader over the CSV files arriving in stream_folder.
# The files carry no header row, so the schema is given explicitly, and
# maxFilesPerTrigger=1 makes every trigger consume a single file.
csv_stream_reader = (
    spark.readStream
    .format('csv')
    .option("maxFilesPerTrigger", 1)
    .schema(schema='id long,text string')
)
streamingDF = csv_stream_reader.load(stream_folder)

streamingDF.printSchema()
print(f"Streaming DataFrame : {streamingDF.isStreaming}")

# FYI: completed input files can be removed by adding
# .option("cleanSource", "delete") to the reader.

## id aggregation

In [None]:
# Running count of rows for each distinct `id` value in the stream.
rows_grouped_by_id = streamingDF.groupBy(streamingDF.id)
streamingIDCountsDF = rows_grouped_by_id.count()

print(f"Streaming streamingIDCountsDF : {streamingIDCountsDF.isStreaming}")

In [None]:
# Random hex string used as the name of this query's in-memory result table.
q_uuid = uuid.uuid4().hex

# Write the id counts to an in-memory table so it can be polled with SQL.
query1 = (
  streamingIDCountsDF
    .writeStream
    .format("memory")
    .queryName(q_uuid)
    .outputMode("complete")
    .start()
)

# Refresh the view once per second for as long as the query is running
# (interrupt the cell to stop watching). Looping on isActive instead of
# `while True` ends the loop when the query stops or fails.
while query1.isActive:
    # wait=True defers the clear until new output arrives, avoiding flicker.
    clear_output(wait=True)
    display(query1.status)
    # show() prints the table itself and returns None, so wrapping it in
    # display() would only add a stray "None" to the output.
    spark.sql('SELECT * FROM '+ q_uuid + ' ORDER BY 1').show()
    sleep(1)


## check RT : re-tweets

In [None]:
# Count rows by the leading characters of the tweet text (the [0:2] slice of
# the `text` column); presumably this separates re-tweets, whose text starts
# with "RT", from the rest.
text_prefix = streamingDF["text"][0:2]
streaming_char = streamingDF.groupBy(text_prefix).count()

print(f"Streaming streaming_char : {streaming_char.isStreaming}")

In [None]:
# Random hex string used as the name of this query's in-memory result table.
q_uuid = uuid.uuid4().hex

# Write the text-prefix counts to an in-memory table so it can be polled
# with SQL.
query2 = (
  streaming_char
    .writeStream
    .format("memory")
    .queryName(q_uuid)
    .outputMode("complete")
    .start()
)

# Refresh the view once per second for as long as the query is running
# (interrupt the cell to stop watching). Looping on isActive instead of
# `while True` ends the loop when the query stops or fails.
while query2.isActive:
    # wait=True defers the clear until new output arrives, avoiding flicker.
    clear_output(wait=True)
    display(query2.status)
    # show() prints the table itself and returns None, so wrapping it in
    # display() would only add a stray "None" to the output.
    spark.sql('SELECT * FROM '+ q_uuid + ' ORDER BY 2 DESC').show()
    sleep(1)


## store RT - events

In [None]:
# Tag every row with the current timestamp and with a flag telling whether
# its text matches "@", then count rows per (contains_at, current_timestamp)
# pair. A 1 minute watermark is declared on the timestamp column.
stamped_rows = streamingDF.withColumn("current_timestamp", current_timestamp())
flagged_rows = stamped_rows.withColumn(
    "contains_at", streamingDF["text"].rlike("@")
)
streaming_store = (
    flagged_rows
    .withWatermark("current_timestamp", "1 minutes")
    .groupBy("contains_at", "current_timestamp")
    .count()
)
print(f"Streaming streaming_store : {streaming_store.isStreaming}")

In [None]:

# Set the number of shuffle partitions to 1. This option sets the partition
# count used when shuffling; it does not switch shuffling on or off.
# NOTE(review): presumably 1 is chosen so that a single partition writes to
# the output file at a time - confirm.
spark.conf.set("spark.sql.shuffle.partitions", "1")

# https://docs.databricks.com/structured-streaming/examples.html#foreachbatch-sqldw-example
class SendToDynamoDB_ForeachWriter:
    """Row writer for `writeStream.foreach` that appends rows to a CSV file.

    Despite its name, this class only writes to a local file. Every row is
    written as one `current_timestamp,contains_at,count` line.

    Args:
        path: File to append to. When omitted, the module-level
            `filepath_each` is looked up each time `open()` is called.
    """

    # Row fields written to the file, in column order.
    COLUMNS = ("current_timestamp", "contains_at", "count")

    def __init__(self, path=None):
        self._path = path
        self.file = None

    def open(self, partition_id, epoch_id):
        """Open the output file for appending and return True.

        `partition_id` and `epoch_id` are accepted but not used.
        """
        target = filepath_each if self._path is None else self._path
        self.file = open(target, 'a')
        # Append mode starts at the end of the file, so position 0 means the
        # file is still empty. The header is written only then, instead of
        # once per open() call.
        if self.file.tell() == 0:
            self.file.write(",".join(self.COLUMNS) + "\n")
        return True

    def process(self, row):
        """Append one row to the file as a comma-separated line."""
        self.file.write(",".join(str(row[name]) for name in self.COLUMNS) + "\n")

    def close(self, err):
        """Close the file if it was opened, then re-raise `err` if one is given."""
        if self.file is not None:
            self.file.close()
            self.file = None
        if err:
            raise err


# Destination CSV file, inside the sink folder, for the foreach writer.
filepath_each = os.path.join(sink_folder, "_store_for_each" + ".csv")

# Create the file, or truncate it if it already exists, so it starts empty.
with open(filepath_each, "w") as fe:
    pass

# Stream the aggregated rows to the CSV file through the foreach writer.
# The aggregation emits its columns in the order
# (contains_at, current_timestamp, count). select() picks them by name,
# whereas toDF("current_timestamp", "contains_at", "count") would rename them
# by position and swap the labels of the first two columns.
query_file = (
  streaming_store
    .select("current_timestamp", "contains_at", "count")
    .writeStream
    .foreach(SendToDynamoDB_ForeachWriter())
    .outputMode("complete")
    .start()
)

# Poll the query once per second until it reports that no more data is
# available. The query itself is left running afterwards.
data_available = True

while data_available:
    # wait=True defers the clear until new output arrives, avoiding flicker.
    clear_output(wait=True)
    # Take a single status snapshot per iteration so the message checks and
    # the isDataAvailable flag come from the same state.
    status = query_file.status
    display(status)
    still_starting = any(
        phase in status["message"]
        for phase in ('Initializing sources', 'Getting offsets')
    )
    # While the query is still starting up, isDataAvailable is ignored.
    if not still_starting:
        data_available = status["isDataAvailable"]
    sleep(1)

print('file ready')