In [None]:
from pyspark.sql.functions import col, max as max_,min as min_,datediff,expr,count, when
from pyspark.sql import SparkSession,Row,Window

In [None]:
# Build (or reuse) the Spark session for this task. The application runs on
# YARN under the name "Task2" with 2 executors, 2 cores each, 1024M per executor.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Task2")
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "1024M")
    .getOrCreate()
)
# Keep a handle on the underlying SparkContext; it is used to shut down at the end.
sc = spark.sparkContext


In [None]:
# Read the raw JSON dataset from HDFS, then keep only rows whose `verified`
# field equals "TRUE" and whose `created_at` is present.
df = spark.read.json("hdfs:/datasets/covid")
dfV = df.where((col("verified") == "TRUE") & col("created_at").isNotNull())

In [None]:
# For every verified user, look up the follower count attached to their latest
# tweet (fcend) and to their earliest tweet (fcstart). A window partitioned by
# user_id supplies the per-user max/min of created_at, and each row is kept only
# if its created_at equals that extreme.
# NOTE(review): min/max on created_at is chronological only if the column is a
# timestamp or a lexicographically sortable string - confirm against the schema.
w = Window.partitionBy('user_id')
df1 = (
    dfV.withColumn('maxD', max_('created_at').over(w))
    .where(col('created_at') == col('maxD'))
    .select("user_id", "screen_name", col("followers_count").alias("fcend"))
    # Several rows can share the same extreme created_at (duplicated records,
    # tweets posted in the same second). Keep exactly one row per user so the
    # later join on user_id cannot multiply rows and repeat a user in the ranking.
    .dropDuplicates(["user_id"])
)
df2 = (
    dfV.withColumn('minD', min_('created_at').over(w))
    .where(col('created_at') == col('minD'))
    .select("user_id", "screen_name", col("followers_count").alias("fcstart"))
    # Same tie handling as above: one row per user.
    .dropDuplicates(["user_id"])
)

In [None]:
# Follower growth per user: follower count at the last tweet minus the count at
# the first tweet. Only the 1000 users with the largest growth are kept.
increase = (
    df1.alias("df1")
    .join(df2.alias("df2"), on="user_id")
    # fcend exists only in df1 and fcstart only in df2, so plain names are unambiguous.
    .withColumn("followers_increase", col("fcend") - col("fcstart"))
    .orderBy(col("followers_increase").desc())
    .limit(1000)
    .select("df1.user_id", "df1.screen_name", "followers_increase")
)

In [None]:
# Tweet volume per user over the whole dataset (not only verified users),
# reduced to the 1000 users with the most tweets.
actives = (
    df.groupBy("user_id")
    .agg(count("user_id").alias("count"))
    .orderBy("count", ascending=False)
    .limit(1000)
)

In [None]:
# Flag which of the top-growth verified users are also among the most active
# users: top_active is 1 when the user has a match in `actives`, 0 otherwise.
# A left outer join keeps every row of `increase`; unmatched rows get a NULL
# `count`, which is what the flag is derived from.
actives_increase = (
    increase.alias("a")
    # `increase` and `actives` both descend from the same source DataFrame, so
    # their user_id columns share lineage. Referring to them through the "a"/"b"
    # aliases keeps the join condition unambiguous instead of relying on Spark's
    # self-join auto-resolution.
    .join(actives.alias("b"), col("a.user_id") == col("b.user_id"), how="left_outer")
    .withColumn("top_active", when(col("b.count").isNull(), 0).otherwise(1))
    .select("a.screen_name", "a.followers_increase", "top_active")
    # The result is small and is written out twice below; caching avoids running
    # the whole pipeline a second time and keeps both outputs consistent.
    .cache()
)

In [None]:
# Write the top-1000 verified users (with their top_active flag) as headerless,
# comma-delimited CSV.
# The save mode is set to "overwrite" so the notebook can be re-run from a fresh
# kernel; with the default mode the write fails once the directory exists.
# Note that this replaces any previous output at the same path.
(
    actives_increase.write
    .mode("overwrite")
    .option("header", False)
    .option("delimiter", ",")
    .csv("/user/julioferreira/task2/allactives")
)

In [None]:
# Narrow the result down to the users flagged as top active (top_active == 1).
actives_increase = actives_increase.where(col("top_active") == 1)

In [None]:
# Write the top-1000 verified users that are also top-1000 active users as
# headerless, comma-delimited CSV.
# The save mode is set to "overwrite" so the notebook can be re-run from a fresh
# kernel; with the default mode the write fails once the directory exists.
# Note that this replaces any previous output at the same path.
(
    actives_increase.write
    .mode("overwrite")
    .option("header", False)
    .option("delimiter", ",")
    .csv("/user/julioferreira/task2/onlytopactives")
)

In [None]:
# Shut the application down through the session rather than the bare context:
# this stops the underlying SparkContext and also clears the session singletons,
# so a later getOrCreate() in the same kernel starts from a clean state.
spark.stop()