In [11]:
import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql import Window
from pyspark.sql.types import *
import re
import time

In [19]:
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .config("spark.sql.debug.maxToStringFields", 2000) \
    .getOrCreate()

df = spark.read.\
    format("com.databricks.spark.csv") \
    .option("delimiter", "\t") \
    .load("./debate-tweets-001.tsv")

In [4]:
columns =  ("id"
            ,"content"
            ,"coordinate_0"
            ,"coordiante_1"
            ,"retweeted"
            ,"tweet_id"
            ,"tweet_id_str"
            ,"created_at"
            ,"created_at_str"
            ,"in_reply_to_user_id"
            ,"lang"
            ,"place_name"
            ,"place_id"
            ,"place_bb_00"
            ,"place_bb_01"
            ,"place_bb_10"
            ,"place_bb_11"
            ,"place_bb_20"
            ,"place_bb_21"
            ,"place_bb_30"
            ,"place_bb_31"
            ,"place_type"
            ,"place_country_code"
            ,"place_country"
            ,"place_full_name"
            ,"user_name"
            ,"user_id"
            ,"user_id_str"
            ,"user_location"
            ,"user_friend_count"
            ,"user_created_at"
            ,"user_screen_name")

df = df.toDF(*columns)

### 1. Quais foram as hashtags mais usadas pela manhã, tarde e noite? 

In [12]:
def extract(s):
    all_matches = re.findall(r'#\w+', s)
    return all_matches

extract_hashtags = func.udf(extract, ArrayType(StringType()))

df_hashtags = df \
    .filter(func.col('content').rlike('#\w+')) \
    .withColumn('extracted', extract_hashtags('content')) \
    .select('created_at', func.explode_outer(func.col('extracted')) ) \
    .withColumnRenamed('col', 'hashtag')


In [38]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, count, lit, desc

#### Período da Manhã

In [13]:


# filter: filtra linhas que contenham substring no campo 'created_at' com horarios entre 6 e 12 horas
# groupby: agrupa hashatags repetidas
# agg: função de agregação usada pra ordenar do maior numero de ocorrencias para o menor

df_hashtags \
    .filter(func.col('created_at').substr(12,2).between(6,11)) \
    .groupBy('hashtag') \
    .agg(
        func.count(func.lit(1)).alias('count')
    ).orderBy(func.desc('count')) \
    .show(n=10)




+--------------------+-----+
|             hashtag|count|
+--------------------+-----+
|   #EMABiggestFans1D|14818|
|#EMABiggestFansJu...|14367|
|             #trndnl|  604|
|#VoteVampsTeenAwards|  365|
|             #bomdia|  317|
|         #QueroNoTVZ|  315|
|             #BomDia|  306|
|              #TwOff|  282|
|#EMABiggestFansJu...|  231|
|      #QuandoEuCasar|  146|
+--------------------+-----+
only showing top 10 rows



                                                                                

#### Período da Tarde

In [14]:
# filter: filtra linhas que contenham substring no campo 'created_at' com horarios entre 6 e 12 horas
# groupby: agrupa hashatags repetidas
# agg: função de agregação usada pra ordenar do maior numero de ocorrencias para o menor

df_hashtags \
    .filter(func.col('created_at').substr(12,2).between(12,18)) \
    .groupBy('hashtag') \
    .agg(
        func.count(func.lit(1)).alias('count')
    ).orderBy(func.desc('count')) \
    .show(n=10)



+--------------------+-----+
|             hashtag|count|
+--------------------+-----+
|   #EMABiggestFans1D|70859|
|#EMABiggestFansJu...|60128|
|        #StealMyGirl| 5704|
|         #QueroNoTVZ| 5026|
|   #bigpaynodanceoff| 1514|
|  #AustinMahoneChile| 1230|
|       #AustinMahone| 1111|
|#demiyourstorydoe...| 1090|
|             #trndnl| 1049|
|    #HottieOfTheWeek|  925|
+--------------------+-----+
only showing top 10 rows



                                                                                

#### Período da Noite

In [52]:
# filter: filtra linhas que contenham substring no campo 'created_at' com horarios entre 19 e 23 horas ou 0 e 5 horas
# groupby: agrupa hashatags repetidas
# agg: função de agregação usada pra ordenar do maior numero de ocorrencias para o menor

df_hashtags \
    .filter(func.col('created_at').substr(12,2).between(19,23) | func.col('created_at').substr(12,2).between(0,5)) \
    .groupBy('hashtag') \
    .agg(
        func.count(func.lit(1)).alias('count')
    ).orderBy(func.desc('count')) \
    .dropna() \
    .show(n=10)



+--------------------+------+
|             hashtag| count|
+--------------------+------+
|#EMABiggestFansJu...|133599|
|   #EMABiggestFans1D|129768|
|       #camilasayshi| 10507|
|        #DebateNoSBT|  3416|
|    #CartersNewVideo|  3207|
|   #bigpaynodanceoff|  2878|
|     #TheVoiceBrasil|  2819|
|          #Vote5HEMA|  2666|
|        #AssistamODR|  2545|
|     #DebateNaRecord|  2494|
+--------------------+------+
only showing top 10 rows



                                                                                

### 2. Quais as hashtags mais usadas em cada dia?

In [18]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY") # Habilita formato de data com EEE
w = Window.partitionBy('timestamp') # Window criada para usar filtro de maximo

df_hashtags \
    .select('hashtag', func.to_timestamp(col('created_at'), 'EEE MMM dd HH:mm:ss ZZZZ yyyy').substr(0,10).alias('timestamp')) \
    .groupBy( 'timestamp','hashtag') \
    .agg(
        func.count(func.lit(1)).alias('count'),
    ).orderBy(func.desc('count')) \
    .withColumn('max', func.max('count').over(w)) \
    .where(func.col('count') == func.col('max')) \
    .drop('max') \
    .dropna() \
    .show(truncate=0)


                                                                                

+----------+---------------------------+-----+
|timestamp |hashtag                    |count|
+----------+---------------------------+-----+
|null      |#EMABiggestFansJustinBieber|7    |
|2014-10-15|#EMABiggestFans1D          |34547|
|2014-10-16|#EMABiggestFans1D          |68633|
|2014-10-17|#EMABiggestFansJustinBieber|49488|
|2014-10-18|#EMABiggestFansJustinBieber|27454|
|2014-10-19|#EMABiggestFansJustinBieber|33582|
|2014-10-20|#EMABiggestFansJustinBieber|10452|
+----------+---------------------------+-----+



#### 3. Qual o número de tweets por hora a cada dia?

In [23]:
# contar quantidade de tweets por dia
# agrupar por dia
# dividir quantidade por 24

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY") # Habilita formato de data com EEE

df_hashtags \
    .select(func.to_timestamp(col('created_at'), 'EEE MMM dd HH:mm:ss ZZZZ yyyy').substr(0,10).alias('timestamp')) \
    .groupBy( 'timestamp') \
    .agg(
        func.count(func.lit(1)).alias('count'),
    ) \
    .withColumn('number of tweets', func.col('count') / 24) \
    .drop('count') \
    .dropna() \
    .show(truncate=0)




+----------+------------------+
|timestamp |number of tweets  |
+----------+------------------+
|2014-10-15|6041.916666666667 |
|2014-10-16|12244.208333333334|
|2014-10-17|10139.125         |
|2014-10-18|7554.708333333333 |
|2014-10-19|9477.791666666666 |
|2014-10-20|3134.625          |
+----------+------------------+



                                                                                