In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Cluster execution: build (or reuse) a session on YARN with two executors,
# each given two cores and 2048M of memory.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Task2")
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "2048M")
    .getOrCreate()
)

# The string literal below is never executed; it only keeps the alternative
# local-mode session setup around for reference.
"""
# local execution
spark = SparkSession \
  .builder \
  .enableHiveSupport() \
  .config(conf=SparkConf().set("spark.driver.maxResultSize", "2g")) \
  .appName("test") \
  .getOrCreate()
"""

# SparkContext behind the session, exposed under the conventional short name.
sc = spark.sparkContext

22/05/13 20:37:39 WARN Utils: Your hostname, acer resolves to a loopback address: 127.0.1.1; using 192.168.0.40 instead (on interface wlp4s0)
22/05/13 20:37:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/13 20:37:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/13 20:37:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [15]:
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, DateType
from pyspark.sql.functions import *


# Columns of the tweet dump, in schema order. Every column is read as a
# nullable string; numeric columns are cast explicitly where they are used.
tweet_columns = [
    "account_created_at",
    "account_lang",
    "country_code",
    "created_at",
    "favourites_count",
    "followers_count",
    "friends_count",
    "is_quote",
    "is_retweet",
    "lang",
    "place_full_name",
    "place_type",
    "reply_to_screen_name",
    "reply_to_status_id",
    "reply_to_user_id",
    "retweet_count",
    "screen_name",
    "source",
    "status_id",
    "text",
    "user_id",
    "verified",
]
tweet_schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in tweet_columns]
)

df = spark.read.json("hdfs:/datasets/covid/", schema=tweet_schema)
# df = spark.read.json("./covid.json", schema=tweet_schema)

# Tweets of verified accounts only, reduced to the columns needed downstream,
# with the follower count converted from string to integer.
tf = (
    df.select("user_id", "created_at", "followers_count")
    .filter(df.verified == "TRUE")
    .withColumn("followers_count", col("followers_count").cast(IntegerType()))
)

In [16]:
from pyspark.sql import Window
from pyspark.sql.functions import *

# Number of most-active users (ranked by tweet count) kept in the report.
TOP_ACTIVE_USERS = 1000

# One screen name per verified user. dropDuplicates keeps a single row per
# user_id; which screen_name survives is not specified when a user has several.
user_names = df.select("user_id", "screen_name").filter(df.verified == "TRUE").dropDuplicates(["user_id"])

# Get the difference of followers between the current tweet and the previous
# tweet of the same user.
# NOTE(review): ordering relies on "created_at" sorting chronologically as a
# string -- confirm the timestamp format of the dataset.
window = Window.partitionBy("user_id").orderBy("created_at")

follower_diff_df = tf.withColumn(
    "follower_diff",
    col("followers_count") - lag(col("followers_count"), 1).over(window),
)
# The first tweet of each user has no predecessor, so lag() yields null there;
# count it as "no change". The fill value is an integer to match the column type.
follower_diff_df = follower_diff_df.fillna({"follower_diff": 0})

# Sum all the differences of followers between the tweets of each user.
follower_diff_acc_ds = follower_diff_df.groupBy("user_id").agg(sum("follower_diff").alias("followers_increase"))

# Get the total number of tweets that a user has made. The per-tweet frame is
# aggregated directly: joining the per-user sum onto it first would not change
# the counts, so that extra join is not performed.
grouped_folowers_diff_df = follower_diff_df.groupBy("user_id").agg(count(col("user_id")).alias("tweets_count"))

# Keep only one row per user with the tweet count and the follower increase.
follower_diff_df = grouped_folowers_diff_df.join(follower_diff_acc_ds, "user_id", "left_outer")
follower_diff_df = follower_diff_df.join(user_names, "user_id", "left_outer")

# Rank users by activity. row_number() yields consecutive positions 1, 2, 3, ...
# over the ordered window, whereas monotonically_increasing_id() only
# guarantees increasing (not consecutive) ids and therefore cannot be compared
# against a fixed cut-off. user_id breaks ties so the ranking is deterministic.
# The window has no partitionBy, so the ranking runs in a single partition;
# this frame holds one row per user.
activity_window = Window.orderBy(desc("tweets_count"), col("user_id"))
follower_diff_df = follower_diff_df.withColumn("index", row_number().over(activity_window))
follower_diff_df = follower_diff_df.withColumn(
    "is_active",
    when(col("index") <= TOP_ACTIVE_USERS, 1).otherwise(0),
)

# Report: the most active users, sorted by follower increase (largest first).
csv_output = follower_diff_df.select("screen_name", "followers_increase", "is_active").filter(col("is_active") > 0)
csv_output = csv_output.orderBy("followers_increase", ascending=False)

# csv_output.coalesce(1).write.mode("overwrite").option("header","false").csv("out2/")
csv_output.coalesce(1).write.mode("overwrite").option("header","false").csv("hdfs:/user/viniciusramos/task2.csv")


                                                                                