In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse) the notebook's SparkSession: local master on all cores,
# Hive support enabled, executor memory/cores set explicitly.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("some_stream_processing")
    .config("spark.executor.memory", "3g")
    .config("spark.executor.cores", "4")
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# Batch-read every JSON file under activity-data/ into a static DataFrame.
static_dataframe = spark.read.format("json").load("activity-data/")

In [None]:
# Row count of the batch read (triggers a full scan of the input files).
static_dataframe.count()

In [None]:
# Keep the schema inferred by the batch read so the streaming read can reuse it.
schema = static_dataframe.schema

In [None]:
# Streaming read of the same directory with the batch-inferred schema,
# consuming one file per trigger.
streaming_dataframe = (
    spark.readStream
    .schema(schema)
    .option("maxFilesPerTrigger", "1")
    .json("activity-data")
)

In [None]:
# Running count of streamed rows per value of the `gt` column.
activity_counts = (
    streaming_dataframe
    .groupBy("gt")
    .count()
)

In [None]:
# Use 5 shuffle partitions for the aggregations that follow.
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [None]:
# Start the streaming aggregation; the memory sink exposes the full result
# as the in-memory table `activity_counts_all`.
activity_counts_query = (
    activity_counts.writeStream
    .queryName("activity_counts_all")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
from time import sleep

In [None]:
# Poll the in-memory result table five times, one second apart, to watch
# the counts change as files are processed.
for poll_number in range(5):
    current_counts = spark.sql("SELECT * FROM  activity_counts_all")
    current_counts.show()
    sleep(1)

In [None]:
# List the streaming queries currently active on this session.
spark.streams.active

In [None]:
from pyspark.sql.functions import expr

In [None]:
# Keep only rows whose `gt` contains "stairs" (and is not null), project a few
# columns, and append them to the in-memory table `jus_some_query`.
simple_transform = (
    streaming_dataframe
    .withColumn("stairs", expr("gt like '%stairs%'"))
    .where("stairs")
    .where("gt is not null")
    .select("gt", "model", "arrival_time", "creation_time")
    .writeStream
    .queryName("jus_some_query")
    .format("memory")
    .outputMode("append")
    .start()
)

In [None]:
#spark.sql('SELECT * FROM jus_some_query').show()

In [None]:
# Average every numeric column over the cube of (gt, model), discard the
# averages of the time/index columns, and publish the complete result to the
# in-memory table `ahona`.
device_model_stats = (
    streaming_dataframe
    .cube("gt", "model")
    .avg()
    .drop("avg(Arrival_time)", "avg(Creation_Time)", "avg(Index)")
    .writeStream
    .queryName("ahona")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# The memory sink registers its table under the query name ("ahona"), not
# under the Python variable name `device_model_stats`.
spark.sql("SELECT * FROM ahona").show()

In [None]:
# Static per-(gt, model) averages computed from the batch DataFrame.
historical_aggregate = (
    static_dataframe
    .groupBy("gt", "model")
    .avg()
)

# Streaming cube averages (time/index columns dropped first) joined against
# the static averages; the complete result goes to the table `absolute_test`.
device_model_stats1 = (
    streaming_dataframe
    .drop('Arrival_Time', "Creation_time", "Index")
    .cube("gt", "model")
    .avg()
    .join(historical_aggregate, ["gt", "model"])
    .writeStream
    .queryName("absolute_test")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [1]:
#reading data from kafka

In [None]:
# Streaming source: subscribe to topic1 and topic2 on the given Kafka brokers.
# NOTE(review): host/port values look like placeholders — replace before running.
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1,topic2")
    .load()
)

In [None]:
#writing to kafka

In [None]:
# Streaming sink: cast key/value to strings and write the records to Kafka,
# checkpointing progress to the configured directory.
(
    df1
    .selectExpr('topic', "CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1, host2:port2")
    .option("checkpointLocation", "/to/HDFS-compatible/dir")
    .option("topic", "topic1")
    .start()
)

In [None]:
#reading data from a socket

In [None]:
# Streaming source: read lines from a TCP socket on localhost:9999.
socker_df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

In [None]:
#trigger

In [None]:
# Run the activity-count aggregation on a fixed 5-second processing-time
# trigger. The keyword is `processingTime`, the aggregation defined in this
# notebook is `activity_counts`, and the memory sink requires a query name
# (which also becomes the name of the in-memory result table).
(
    activity_counts.writeStream
    .trigger(processingTime="5 seconds")
    .queryName("activity_counts_every_5s")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Run the activity-count aggregation with a one-shot trigger and print the
# complete result to the console. Uses `activity_counts`, the aggregation
# defined in this notebook (`activitycounts2` was never defined).
(
    activity_counts.writeStream
    .trigger(once=True)
    .format("console")
    .outputMode("complete")
    .start()
)

In [None]:
# Event-time streaming

In [None]:
# Event-time section setup: 5 shuffle partitions, then re-create the static
# and streaming DataFrames over activity-data/.
spark.conf.set("spark.sql.shuffle.partitions", 5)

# Batch read with the multiline JSON option; its schema drives the stream.
static_dataframe = (
    spark.read
    .option("multiline", "true")
    .json("activity-data/")
)
schema = static_dataframe.schema

# Streaming read of the same directory, up to 10 files per trigger.
streaming_dataframe = (
    spark.readStream
    .schema(schema)
    .option("maxFilesPerTrigger", 10)
    .json("activity-data/")
)

In [None]:
# Print the schema the streaming DataFrame was created with.
streaming_dataframe.printSchema()

In [None]:
#windows on event time

In [None]:
# Add an `event_time` timestamp column derived from Creation_Time: the value is
# divided by 1e9 before the cast to timestamp.
# NOTE(review): this assumes Creation_Time is in nanoseconds since the epoch — confirm.
withEventTime = streaming_dataframe.selectExpr(
    "*",
    "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time",
)

In [None]:
from pyspark.sql.functions import window, col

In [None]:
# Count events per 10-minute tumbling window of event_time and publish the
# complete result to the in-memory table `pyevents`. `.count()` must be called
# on the grouped data, so the groupBy(...) call is closed before it.
(
    withEventTime
    .groupBy(window(col("event_time"), "10 minutes"))
    .count()
    .writeStream
    .queryName("pyevents")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Inspect the schema of the in-memory table written by the `pyevents` query.
spark.sql("SELECT * FROM pyevents").printSchema()

SELECT * FROM events_per_windows

In [None]:
# Count events per (10-minute event-time window, User). 'User' is a second
# grouping key, not an argument of window(), and the writer is obtained via
# `.writeStream`. Results go to the in-memory table `payments_per_window`.
(
    withEventTime
    .groupBy(window(col("event_time"), "10 minutes"), "User")
    .count()
    .writeStream
    .queryName("payments_per_window")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Count events per 10-minute window sliding every 5 minutes. The query gets its
# own name because Spark refuses to start a second query under a name that is
# still active, and the tumbling-window query above already uses "pyevents".
(
    withEventTime
    .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
    .count()
    .writeStream
    .queryName("pyevents_sliding")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Sliding-window counts (10-minute windows every 5 minutes) with a 30-minute
# watermark on event_time; results go to the table `events_per_window`.
(
    withEventTime
    .withWatermark("event_time", "30 minutes")
    .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
    .count()
    .writeStream
    .queryName("events_per_window")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Bare SQL is not valid in a Python cell; run it through the session instead.
spark.sql("SELECT * FROM events_per_window").show()

In [None]:
# Drop duplicate (User, event_time) rows under a 5-second watermark, then count
# the remaining rows per User into the in-memory table `pydeduplicated`.
(
    withEventTime
    .withWatermark("event_time", "5 seconds")
    .dropDuplicates(["User", "event_time"])
    .groupBy("User")
    .count()
    .writeStream
    .queryName("pydeduplicated")
    .format("memory")
    .outputMode("complete")
    .start()
)
