In [36]:
import os
import math
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lag, count, when, monotonically_increasing_id
from pyspark.sql.functions import sum as _sum
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
import pyspark

In [37]:
'''
spark = SparkSession.builder \
        .master("yarn") \
        .appName("HelloLines") \
        .getOrCreate()
sc = spark.sparkContext
rdd = sc.textFile("hdfs:/user/cunha/hello.txt")
'''
table_schema = StructType([
    StructField("account_created_at", StringType(), True),
    StructField("account_lang",   StringType(), True),
    StructField("country_code", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("favourites_count", StringType(), True),
    StructField("followers_count", StringType(), True),
    StructField("friends_count", StringType(), True),
    StructField("is_quote", StringType(), True),
    StructField("is_retweet", StringType(), True),
    StructField("lang", StringType(), True),
    StructField("place_full_name", StringType(), True),
    StructField("place_type", StringType(), True),
    StructField("reply_to_screen_name", StringType(), True),
    StructField("reply_to_status_id", StringType(), True),
    StructField("reply_to_user_id", StringType(), True),
    StructField("retweet_count", StringType(), True),
    StructField("screen_name", StringType(), True),
    StructField("source", StringType(), True),
    StructField("status_id", StringType(), True),
    StructField("text", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("verified", StringType(), True),
])

spark = SparkSession \
    .builder \
    .enableHiveSupport() \
    .config(conf=SparkConf().set("spark.driver.maxResultSize", "2g")) \
    .appName("wordcloudmapp") \
    .getOrCreate()

df = spark.read.json("../../database/2.json", schema=table_schema) # MUDAR O NOME AQUUUUIIII
df = df.select(col("created_at"), col("screen_name"), col("followers_count"), col("verified"))

In [38]:
tf = df.filter(df.verified == "TRUE") \
    .withColumn("followers_count", df["followers_count"] \
    .cast(IntegerType())).withColumn("created_at", col("created_at") \
    .cast(DateType())).orderBy("screen_name")

tf.show()

+----------+---------------+---------------+--------+
|created_at|    screen_name|followers_count|verified|
+----------+---------------+---------------+--------+
|2020-03-31|        012jcyl|           7271|    TRUE|
|2020-03-31|    1025TheBone|          30810|    TRUE|
|2020-03-31|     1027KIISFM|         355502|    TRUE|
|2020-03-31|     1027KIISFM|         355502|    TRUE|
|2020-03-31|     1027KIISFM|         355503|    TRUE|
|2020-03-31| 1045FreshRadio|           2448|    TRUE|
|2020-03-31|       1055WDUV|           2895|    TRUE|
|2020-03-31|10DowningStreet|        5667668|    TRUE|
|2020-03-31|10DowningStreet|        5667726|    TRUE|
|2020-03-31|    10NewsFirst|         198791|    TRUE|
|2020-03-31|    10NewsFirst|         198791|    TRUE|
|2020-03-31|    10NewsFirst|         198791|    TRUE|
|2020-03-31|    10NewsFirst|         198791|    TRUE|
|2020-03-31|    10NewsFirst|         198791|    TRUE|
|2020-03-31|    10NewsFirst|         198791|    TRUE|
|2020-03-31|    10NewsFirst|

In [39]:
window = Window.partitionBy("screen_name").orderBy("created_at")
t0 = tf.withColumn("follower_diff", col("followers_count") - lag(col("followers_count"), 1).over(window))
t0 = t0.fillna({'follower_diff': 0})
t0.show()

+----------+---------------+---------------+--------+-------------+
|created_at|    screen_name|followers_count|verified|follower_diff|
+----------+---------------+---------------+--------+-------------+
|2020-03-31|        012jcyl|           7271|    TRUE|            0|
|2020-03-31|    1025TheBone|          30810|    TRUE|            0|
|2020-03-31|     1027KIISFM|         355502|    TRUE|            0|
|2020-03-31|     1027KIISFM|         355502|    TRUE|            0|
|2020-03-31|     1027KIISFM|         355503|    TRUE|            1|
|2020-03-31| 1045FreshRadio|           2448|    TRUE|            0|
|2020-03-31|       1055WDUV|           2895|    TRUE|            0|
|2020-03-31|10DowningStreet|        5667726|    TRUE|            0|
|2020-03-31|10DowningStreet|        5667668|    TRUE|          -58|
|2020-03-31|    10NewsFirst|         198791|    TRUE|            0|
|2020-03-31|    10NewsFirst|         198791|    TRUE|            0|
|2020-03-31|    10NewsFirst|         198791|    

In [40]:
#Sum all the diferences of followers between the tweets

t1 = t0.groupBy('screen_name').agg(_sum("follower_diff").alias("followers_increase"))
t0 = t0.join(t1, 'screen_name', 'left_outer')
t0.show()


                                                                                

+---------------+----------+---------------+--------+-------------+------------------+
|    screen_name|created_at|followers_count|verified|follower_diff|followers_increase|
+---------------+----------+---------------+--------+-------------+------------------+
|        012jcyl|2020-03-31|           7271|    TRUE|            0|                 0|
|    1025TheBone|2020-03-31|          30810|    TRUE|            0|                 0|
|     1027KIISFM|2020-03-31|         355502|    TRUE|            0|                 1|
|     1027KIISFM|2020-03-31|         355502|    TRUE|            0|                 1|
|     1027KIISFM|2020-03-31|         355503|    TRUE|            1|                 1|
| 1045FreshRadio|2020-03-31|           2448|    TRUE|            0|                 0|
|       1055WDUV|2020-03-31|           2895|    TRUE|            0|                 0|
|10DowningStreet|2020-03-31|        5667726|    TRUE|            0|               -58|
|10DowningStreet|2020-03-31|        5667668

                                                                                

In [41]:
#Get the total number of tweets that a user has made
t2 = t0.groupBy('screen_name').agg(count(col('screen_name')).alias("tweets_count"))
t0 = t2.join(t1, 'screen_name', 'left_outer').orderBy(col("tweets_count"), ascending=False)
t0.show()

                                                                                

+---------------+------------+------------------+
|    screen_name|tweets_count|followers_increase|
+---------------+------------+------------------+
|  openletterbot|        1662|                -3|
|      ANCALERTS|         168|               708|
|     ABSCBNNews|          97|              1261|
|       idntimes|          72|                86|
|    indiatvnews|          69|               870|
|     IndiaToday|          66|              -132|
|      prajavani|          66|               -87|
|        bsindia|          63|               316|
|        ThaiPBS|          63|               327|
|  DZMMTeleRadyo|          62|               -52|
|     ElPitazoTV|          60|                26|
|   Thansettakij|          59|                -6|
|         aajtak|          59|               366|
|       TheQuint|          56|               268|
|       htTweets|          51|              2996|
|    ThaiPBSNews|          50|                67|
|        Notimex|          50|                -7|


In [43]:
# Get if the user is within the most 1000 active users
t0 = t0.orderBy("tweets_count", ascending=False).withColumn("index", monotonically_increasing_id())
t0 = t0.withColumn("is_active", when(t0.index <= 1000, 1).otherwise(0))
t0.show()

+---------------+------------+------------------+-----+---------+
|    screen_name|tweets_count|followers_increase|index|is_active|
+---------------+------------+------------------+-----+---------+
|  openletterbot|        1662|                -3|    0|        1|
|      ANCALERTS|         168|               708|    1|        1|
|     ABSCBNNews|          97|              1261|    2|        1|
|       idntimes|          72|                86|    3|        1|
|    indiatvnews|          69|               870|    4|        1|
|      prajavani|          66|               -87|    5|        1|
|     IndiaToday|          66|              -132|    6|        1|
|        ThaiPBS|          63|               327|    7|        1|
|        bsindia|          63|               316|    8|        1|
|  DZMMTeleRadyo|          62|               -52|    9|        1|
|     ElPitazoTV|          60|                26|   10|        1|
|   Thansettakij|          59|                -6|   11|        1|
|         

In [44]:
# Order the result and preset 100 of them
t0 = t0.filter(t0.is_active > 0)
t0 = t0.orderBy("followers_increase", ascending=False)
t0.show(100)


                                                                                

+---------------+------------+------------------+-----+---------+
|    screen_name|tweets_count|followers_increase|index|is_active|
+---------------+------------+------------------+-----+---------+
|            ANI|          34|              9544|   36|        1|
|           ndtv|          47|              3539|   18|        1|
| drharshvardhan|           4|              3065|  739|        1|
|       htTweets|          51|              2996|   14|        1|
|  DainikBhaskar|          14|              2819|  174|        1|
|  IndianExpress|          12|              2490|  209|        1|
|    MoHFW_INDIA|           7|              2380|  388|        1|
|     DDNational|           6|              2320|  458|        1|
|       DavidLat|          11|              2258|  229|        1|
|     DDNewslive|           6|              2241|  449|        1|
|     ABSCBNNews|          97|              1261|    2|        1|
|     DolarToday|           6|              1090|  435|        1|
|     West

In [45]:
t0.coalesce(1).write.mode('overwrite').option('header','false').csv('../../out/part_2.csv')

                                                                                