In [1]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Local Spark with 2 worker threads; use "local[*]" to use every core on the machine.
SPARK_MASTER = "local[2]"
APP_NAME = "structured_streaming"

spark_conf = SparkConf().setAppName(APP_NAME).setMaster(SPARK_MASTER)

# Build the session from this single conf and take the SparkContext from the
# session, so the app name and master are defined in exactly one place.
# Creating a SparkContext first and then passing different settings to the
# builder does not work: getOrCreate() reuses the already-running context.
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext

print(spark.version)

import pyspark.sql.functions as F
from pyspark.sql.types import *

3.2.1


# Building a Structured Streaming App

In [22]:
# Append a first batch of sample events to csv_folder. Column names match the
# streaming schema (user_id, app, time_in_sec, age). The DataFrame is kept and
# written in a separate statement, because DataFrameWriter.csv() returns None.
df_1 = spark.createDataFrame([
    ('XN203', 'FB', 300, 30),
    ('XN201', 'Twitter', 10, 19),
    ('XN202', 'Insta', 500, 45),
], ['user_id', 'app', 'time_in_sec', 'age'])
df_1.write.csv('csv_folder', mode='append')

In [2]:
# Schema applied to every CSV file read from csv_folder. The column order must
# match the field order used when the sample batches are written.
schema = StructType()
for column_name, column_type in (
    ('user_id', 'string'),
    ('app', 'string'),
    ('time_in_sec', 'integer'),
    ('age', 'integer'),
):
    schema.add(column_name, column_type)

In [3]:
# Streaming DataFrame over the comma-separated files in csv_folder, parsed with `schema`.
data = (
    spark.readStream
    .format('csv')
    .schema(schema)
    .option('sep', ',')
    .load('csv_folder')
)

In [4]:
# Check that the streaming DataFrame carries the column names and types from `schema`.
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- app: string (nullable = true)
 |-- time_in_sec: integer (nullable = true)
 |-- age: integer (nullable = true)



# Operations on Streaming DataFrames

In [5]:
# Running number of rows per app across every file ingested so far.
app_count = (
    data
    .groupBy(F.col('app'))
    .count()
)

In [6]:
# Materialise the per-app counts as an in-memory table queryable as 'count_query'.
query = (
    app_count.writeStream
    .format('memory')
    .queryName('count_query')
    .outputMode('complete')
    .start()
)

In [7]:
# Current contents of the count_query memory table as a pandas DataFrame.
spark.table('count_query').toPandas()

Unnamed: 0,app,count
0,Insta,6
1,FB,6
2,Twitter,6


In [8]:
# Keep only Facebook rows, then average time_in_sec per user.
fb_data = data.where(F.col('app') == 'FB')
fb_avg_time = (
    fb_data
    .groupBy('user_id')
    .agg(F.avg('time_in_sec'))
)

In [9]:
# Expose the per-user Facebook averages as the in-memory table 'fb_query'.
fb_query = (
    fb_avg_time.writeStream
    .format('memory')
    .queryName('fb_query')
    .outputMode('complete')
    .start()
)

In [24]:
# Current contents of the fb_query memory table as a pandas DataFrame.
spark.table('fb_query').toPandas()

Unnamed: 0,user_id,avg(time_in_sec)
0,XN203,300.0


In [25]:
# Append a second batch of sample events so the running queries pick up new files.
# Column names match the streaming schema; DataFrameWriter.csv() returns None,
# so the DataFrame is kept and written in a separate statement.
df_2 = spark.createDataFrame([
    ('XN203', 'FB', 300, 30),
    ('XN201', 'Twitter', 10, 19),
    ('XN202', 'Insta', 500, 45),
], ['user_id', 'app', 'time_in_sec', 'age'])
df_2.write.csv('csv_folder', mode='append')

In [26]:
# Re-read fb_query after the new batch was appended.
spark.table('fb_query').toPandas()

Unnamed: 0,user_id,avg(time_in_sec)
0,XN203,300.0


In [27]:
# Re-read count_query after the new batch was appended.
spark.table('count_query').toPandas()

Unnamed: 0,app,count
0,Insta,4
1,FB,4
2,Twitter,4


In [28]:
# Same table, printed with Spark's text renderer instead of pandas.
spark.table('count_query').show()

+-------+-----+
|    app|count|
+-------+-----+
|  Insta|    4|
|     FB|    4|
|Twitter|    4|
+-------+-----+



In [10]:
# Total time_in_sec per app, largest first, exposed as memory table 'app_wise_query'.
# Named app_time_df rather than app_df: app_df is later bound to a static lookup
# table used in the join, and reusing the name made that join depend on cell order.
app_time_df = (
    data
    .groupBy('app')
    .agg(F.sum('time_in_sec').alias('total_time'))
    .orderBy('total_time', ascending=False)
)

app_query = (
    app_time_df.writeStream
    .queryName('app_wise_query')
    .outputMode('complete')
    .format('memory')
    .start()
)

In [30]:
# Current per-app totals from the app_wise_query memory table.
spark.table('app_wise_query').toPandas()

Unnamed: 0,app,total_time
0,Insta,2000
1,FB,1200
2,Twitter,40


In [33]:
# Append another batch of sample events. Column names match the streaming schema
# (previously misspelled 'time_in_secs'); DataFrameWriter.csv() returns None,
# so the DataFrame is kept and written in a separate statement.
df_4 = spark.createDataFrame([
    ("XN203", 'FB', 500, 30),
    ("XN201", 'Insta', 30, 19),
    ("XN202", 'Twitter', 100, 45),
], ["user_id", "app", "time_in_sec", "age"])
df_4.write.csv("csv_folder", mode='append')

In [12]:
# Re-read app_wise_query after the new batch was appended.
spark.table('app_wise_query').toPandas()

Unnamed: 0,app,total_time
0,Insta,2030
1,FB,1700
2,Twitter,140


In [11]:
# Mean user age per app, highest first, exposed as memory table 'age_query'.
age_df = (
    data
    .groupBy('app')
    .agg(F.avg('age').alias('mean_age'))
    .orderBy(F.desc('mean_age'))
)

age_query = (
    age_df.writeStream
    .format('memory')
    .queryName('age_query')
    .outputMode('complete')
    .start()
)

In [11]:
# Append another batch of sample events. Column names match the streaming schema
# (previously misspelled 'time_in_secs'); DataFrameWriter.csv() returns None,
# so the DataFrame is kept and written in a separate statement.
df_5 = spark.createDataFrame([
    ("XN203", 'FB', 500, 30),
    ("XN201", 'Insta', 30, 19),
    ("XN202", 'Twitter', 100, 45),
], ["user_id", "app", "time_in_sec", "age"])
df_5.write.csv("csv_folder", mode='append')

In [12]:
# Current per-app mean ages from the age_query memory table.
spark.table('age_query').toPandas()

Unnamed: 0,app,mean_age
0,Insta,36.333333
1,FB,30.0
2,Twitter,27.666667


# Joining Static and Streaming DataFrames

In [13]:
# Static lookup table mapping each app code to its full name; it is joined onto
# the streaming DataFrame in the next cell.
app_df = spark.createDataFrame(
    [
        ('FB', 'FACEBOOK'),
        ('Insta', 'INSTAGRAM'),
        ('Twitter', 'TWITTER'),
    ],
    ["app", "full_name"],
)
app_df.show()

+-------+---------+
|    app|full_name|
+-------+---------+
|     FB| FACEBOOK|
|  Insta|INSTAGRAM|
|Twitter|  TWITTER|
+-------+---------+



In [15]:
# Join each streaming row with the static lookup on `app` and append the result
# to the in-memory table 'join_query'.
app_stream_df = data.join(app_df, on='app')

join_query = (
    app_stream_df.writeStream
    .format('memory')
    .queryName('join_query')
    .outputMode('append')
    .start()
)

In [16]:
# Rows accumulated so far by the stream-static join.
spark.table('join_query').toPandas()

Unnamed: 0,app,user_id,time_in_sec,age,full_name
0,FB,XN203,500,30,FACEBOOK
1,FB,XN203,500,30,FACEBOOK
2,FB,XN203,300,30,FACEBOOK
3,FB,XN203,300,30,FACEBOOK
4,FB,XN203,300,30,FACEBOOK
5,FB,XN203,300,30,FACEBOOK
6,Insta,XN201,30,19,INSTAGRAM
7,Insta,XN201,30,19,INSTAGRAM
8,Insta,XN202,500,45,INSTAGRAM
9,Insta,XN202,500,45,INSTAGRAM
