In [4]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType

In [5]:
# Build the notebook's Spark session.
#
# SparkSession.builder.getOrCreate() returns the already-running session when
# one exists, and context-level settings (the master URL, spark.eventLog.*)
# are only read when the SparkContext first starts.  This is the first builder
# in the notebook, so those settings must be supplied here; the same settings
# requested by the later "InteractionCount" builder cell arrive too late to
# take effect.
#
# Spark refuses to start with event logging enabled if the log directory is
# missing, so create it up front.
# NOTE(review): assumes the relative path resolves against the local working
# directory (local mode with the default file system) - confirm.
if not os.path.isdir("applicationHistory"):
    os.makedirs("applicationHistory")

spark = SparkSession\
        .builder\
        .appName("StructuredCSVFile")\
        .config("spark.eventLog.enabled","true")\
        .config("spark.eventLog.dir","applicationHistory")\
        .master("local[*]")\
        .getOrCreate()

In [6]:
# Sliding-window settings: window length and slide interval, as strings.
windowSize, slideSize = "60", "10"

# Duration strings in the "<n> seconds" form.
windowDuration, slideDuration = (
    '{} seconds'.format(seconds) for seconds in (windowSize, slideSize)
)

# Directory whose files feed the streaming read.
monitoring_dir = 'monitoring_data'

In [7]:
# Request the "InteractionCount" session: local master using all cores, with
# event logging written to the applicationHistory directory.
# NOTE(review): getOrCreate() returns an already-running session when one
# exists; in that case the master and spark.eventLog.* values below are not
# applied, because they are only read when the SparkContext starts - confirm
# whether an earlier cell has already created the session.
spark = (
    SparkSession.builder
    .appName("InteractionCount")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "applicationHistory")
    .master("local[*]")
    .getOrCreate()
)

In [8]:
# Schema applied to the streamed CSV files: three string columns plus one
# timestamp column, in file column order.
userSchema = (
    StructType()
    .add("userA", "string")
    .add("userB", "string")
    .add("timestamp", "timestamp")
    .add("interaction", "string")
)

In [9]:
# Static side: twitterIDs.csv read once as a single string column, userA.
twitterIDSchema = StructType().add("userA", "string")
twitterIDs = (
    spark.read
    .schema(twitterIDSchema)
    .csv('twitterIDs.csv')
)

# Streaming side: CSV files in monitoring_dir, parsed with userSchema.
csvDF = (
    spark.readStream
    .schema(userSchema)
    .csv(monitoring_dir)
)

# Stream-static join on userA (default join type, i.e. inner), so only
# streamed rows whose userA appears in twitterIDs are kept.
joinedDF = csvDF.join(twitterIDs, "userA")

In [10]:
# Keep only the columns used by the windowed count: user, interaction type
# and event time.
interactions = joinedDF.select(
    joinedDF['userA'],
    joinedDF['interaction'],
    joinedDF['timestamp'],
)

In [11]:
# Count rows per (sliding time window, userA).  The window is taken over the
# timestamp column, with length windowDuration and slide slideDuration.
windowedCounts = (
    interactions
    .groupBy(
        window(interactions.timestamp, windowDuration, slideDuration),
        interactions.userA,
    )
    .count()
)

In [None]:
# Start the streaming query: every 15 seconds, write the complete result table
# to the console sink, untruncated and with up to 10000 rows shown.
query = (
    windowedCounts.writeStream
    .outputMode('complete')
    .format('console')
    .option('truncate', 'false')
    .option('numRows', '10000')
    .trigger(processingTime='15 seconds')
    .start()
)

# Block this cell until the query stops or fails.
query.awaitTermination()