In [2]:
from pyspark.sql import SparkSession
# Build (or reuse, via getOrCreate) the SparkSession used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName('structured_streaming')
    .getOrCreate()
)
import pyspark.sql.functions as F
from pyspark.sql.types import*

In [4]:
# First batch of app-usage events: (user_id, app, time_in_secs, age).
# DataFrameWriter.csv() returns None, so the DataFrame is created and written
# in two separate statements; chaining them would leave df_1 bound to None.
df_1 = spark.createDataFrame(
    [("XN203", 'FB', 300, 30), ('XN201', 'Twitter', 10, 19), ('XN202', 'Insta', 500, 45)],
    ["user_id", "app", "time_in_secs", "age"],
)
# mode='append' adds to whatever is already in csv_folder instead of failing or overwriting.
df_1.write.csv("csv_folder", mode='append')

In [5]:
# Schema handed to the streaming CSV reader: two string columns followed by
# two integer columns, one .add() call per column.
schema = (
    StructType()
    .add("user_id", "string")
    .add("app", "string")
    .add("time_in_secs", "integer")
    .add("age", "integer")
)

In [6]:
# Streaming DataFrame over the comma-separated files in csv_folder, parsed with
# the explicit schema defined above.
data = (
    spark.readStream
    .option("sep", ",")
    .schema(schema)
    .csv("csv_folder")
)
# Print the column names and types of the streaming DataFrame.
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- app: string (nullable = true)
 |-- time_in_secs: integer (nullable = true)
 |-- age: integer (nullable = true)



In [7]:
# Number of streamed rows per app; the result column is named 'count'.
app_count = (
    data
    .groupBy('app')
    .count()
)

In [13]:
# Run the per-app count as a streaming query whose results are kept in an
# in-memory table named 'count_query'. 'complete' output mode is used because
# the query is an aggregation.
query = (
    app_count.writeStream
    .queryName('count_query')
    .outputMode('complete')
    .format('memory')
    .start()
)
# start() returns while processing continues in the background. Block until the
# files currently in the source have been processed so the table read below is
# not still empty.
query.processAllAvailable()
spark.sql("Select * from count_query").toPandas().head(5)

Unnamed: 0,app,count
0,Insta,1
1,FB,1
2,Twitter,1


In [17]:
# Keep only Facebook rows, then compute the average time per user.
fb_data = data.filter(data['app'] == 'FB')
# No alias on purpose: the result column keeps its default name avg(time_in_secs).
fb_avg_time = fb_data.groupBy('user_id').agg(F.avg('time_in_secs'))
# Results are kept in an in-memory table named 'fb_query'.
fb_query = (
    fb_avg_time.writeStream
    .queryName('fb_query')
    .outputMode('complete')
    .format('memory')
    .start()
)
# start() returns while processing continues in the background. Wait for the
# available input to be processed before reading the table.
fb_query.processAllAvailable()
spark.sql("select * from fb_query").toPandas().head(5)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,300.0


In [19]:
# Second batch: three Facebook events appended to the streaming source folder.
# DataFrameWriter.csv() returns None, so the write is a separate statement and
# df_2 keeps referring to the DataFrame itself.
df_2 = spark.createDataFrame(
    [("XN203", 'FB', 100, 30), ("XN201", 'FB', 10, 19), ("XN202", 'FB', 2000, 45)],
    ["user_id", "app", "time_in_secs", "age"],
)
df_2.write.csv("csv_folder", mode='append')

In [20]:
# Files appended to csv_folder are picked up in the background. Wait until the
# running fb_query has processed everything available so the table reflects them.
fb_query.processAllAvailable()
spark.sql("select * from fb_query ").toPandas().head(5)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,200.0
1,XN201,10.0
2,XN202,2000.0


In [21]:
# Third batch: one event each for FB, Insta and Twitter.
# DataFrameWriter.csv() returns None, so the write is a separate statement and
# df_3 keeps referring to the DataFrame itself.
df_3 = spark.createDataFrame(
    [("XN203", 'FB', 500, 30), ("XN201", 'Insta', 30, 19), ("XN202", 'Twitter', 100, 45)],
    ["user_id", "app", "time_in_secs", "age"],
)
df_3.write.csv("csv_folder", mode='append')

In [22]:
# Wait for the running fb_query to process any files appended since the last
# read; otherwise the in-memory table may still show the previous result.
fb_query.processAllAvailable()
spark.sql("select * from fb_query ").toPandas().head(5)

Unnamed: 0,user_id,avg(time_in_secs)
0,XN203,300.0
1,XN201,10.0
2,XN202,2000.0


In [24]:
# Total time per app, largest first.
app_df = (
    data.groupBy('app')
    .agg(F.sum('time_in_secs').alias('total_time'))
    .orderBy('total_time', ascending=False)
)
# Results are kept in an in-memory table named 'app_wise_query'.
app_query = (
    app_df.writeStream
    .queryName('app_wise_query')
    .outputMode('complete')
    .format('memory')
    .start()
)
# start() returns while processing continues in the background. Wait for the
# available input to be processed before reading the table.
app_query.processAllAvailable()
spark.sql("select * from app_wise_query ").toPandas().head(5)

Unnamed: 0,app,total_time
0,FB,2910
1,Insta,530
2,Twitter,110


In [25]:
# Fourth batch: one event each for FB, Insta and Twitter.
# DataFrameWriter.csv() returns None, so the write is a separate statement and
# df_4 keeps referring to the DataFrame itself.
df_4 = spark.createDataFrame(
    [("XN203", 'FB', 500, 30), ("XN201", 'Insta', 30, 19), ("XN202", 'Twitter', 100, 45)],
    ["user_id", "app", "time_in_secs", "age"],
)
df_4.write.csv("csv_folder", mode='append')

In [26]:
# Wait for the running app_query to process any files appended since the last
# read; otherwise the in-memory table may still show the previous totals.
app_query.processAllAvailable()
spark.sql("select * from app_wise_query ").toPandas().head(5)

Unnamed: 0,app,total_time
0,FB,3410
1,Insta,560
2,Twitter,210


In [31]:
# Mean user age per app, highest first.
age_df = (
    data.groupBy('app')
    .agg(F.avg('age').alias('mean_age'))
    .orderBy('mean_age', ascending=False)
)
# Results are kept in an in-memory table named 'age_query'.
age_query = (
    age_df.writeStream
    .queryName('age_query')
    .outputMode('complete')
    .format('memory')
    .start()
)
# Fifth batch, appended after the query has been started.
# DataFrameWriter.csv() returns None, so the write is a separate statement and
# df_5 keeps referring to the DataFrame itself.
df_5 = spark.createDataFrame(
    [("XN210", 'FB', 500, 50), ("XN255", 'Insta', 30, 23), ("XN222", 'Twitter', 100, 30)],
    ["user_id", "app", "time_in_secs", "age"],
)
df_5.write.csv("csv_folder", mode='append')
# Wait until the query has processed everything available, including the files
# just written, before reading the table.
age_query.processAllAvailable()
spark.sql("select * from age_query ").toPandas().head(5)

Unnamed: 0,app,mean_age
0,FB,37.111111
1,Twitter,33.166667
2,Insta,25.333333


In [32]:
# Static (non-streaming) lookup table from app code to full name.
# NOTE: this rebinds `app_df`, which an earlier cell used for the streaming
# total-time aggregation; from here on the name refers to this lookup table.
app_df = spark.createDataFrame(
    [
        ('FB', "Facebook"),
        ('Insta', 'INSTAGRAM'),
        ('Twitter', 'TWITTER'),
    ],
    ['app', 'full_name'],
)
app_df.show()

+-------+---------+
|    app|full_name|
+-------+---------+
|     FB| Facebook|
|  Insta|INSTAGRAM|
|Twitter|  TWITTER|
+-------+---------+



In [35]:
# Join the streaming events with the static app_df lookup on the 'app' column.
app_stream_df = data.join(app_df, 'app')
# There is no aggregation here, so the joined rows are written in 'append' mode
# to an in-memory table named 'join_query'.
join_query = (
    app_stream_df.writeStream
    .queryName('join_query')
    .outputMode('append')
    .format('memory')
    .start()
)
# start() returns while processing continues in the background. Wait for the
# available input to be processed before reading the table.
join_query.processAllAvailable()
spark.sql("select * from join_query ").toPandas().head(50)

Unnamed: 0,app,user_id,time_in_secs,age,full_name
0,FB,XN201,10,19,Facebook
1,FB,XN210,500,50,Facebook
2,FB,XN210,500,50,Facebook
3,FB,XN210,500,50,Facebook
4,FB,XN210,500,50,Facebook
5,FB,XN203,500,30,Facebook
6,FB,XN203,500,30,Facebook
7,FB,XN203,100,30,Facebook
8,FB,XN203,300,30,Facebook
9,FB,XN202,2000,45,Facebook
