In [48]:
try:
    import nltk
    import findspark
    from nltk.stem import WordNetLemmatizer
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lower, regexp_replace, col, split, explode, count, sum as spark_sum, concat_ws, array, udf, collect_list, array_distinct, flatten, to_timestamp, substring, when, hour, size
    from pyspark.sql.types import ArrayType, StringType
    from pyspark.ml.feature import StopWordsRemover, Tokenizer, NGram, CountVectorizer
    from pyspark.ml.clustering import LDA
    import pandas as pd
    from textblob import TextBlob
    import matplotlib.pyplot as plt
    import seaborn as sns
    from functools import reduce
    import re
except:
    print("downloading stuff...")
    %pip install pyspark
    %pip install pandas
    %pip install textblob
    %pip install matplotlib
    %pip install seaborn
    %pip install functools
    %pip install nltk
    %pip install findspark
    import nltk
    import findspark
    from nltk.stem import WordNetLemmatizer
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lower, regexp_replace, col, split, explode, count, sum as spark_sum, concat_ws, array, udf, collect_list, array_distinct, flatten, to_timestamp, substring, when, hour, size
    from pyspark.sql.types import ArrayType, StringType
    from pyspark.ml.feature import StopWordsRemover, Tokenizer, NGram, CountVectorizer
    from pyspark.ml.clustering import LDA
    import pandas as pd
    from textblob import TextBlob
    import matplotlib.pyplot as plt
    import seaborn as sns
    from functools import reduce
    import re

nltk.download('wordnet')
pd.set_option('display.max_colwidth', None)

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\danys\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [49]:
def show_pandas_df(spark_df, limit = None, *columns):
    """Convert a Spark DataFrame to pandas, optionally restricting columns and rows.

    Args:
        spark_df: DataFrame-like object exposing ``select``, ``limit`` and ``toPandas``.
        limit: Maximum number of rows to convert; ``None`` converts every row.
        *columns: Optional columns forwarded to ``select``; when omitted, all
            columns are kept.

    Returns:
        Whatever ``toPandas()`` returns for the selected/limited DataFrame.
    """
    # Apply the column selection and the row limit independently, so that
    # passing columns with limit=None no longer calls limit(None).
    result = spark_df.select(*columns) if columns else spark_df
    if limit is not None:
        result = result.limit(limit)
    return result.toPandas()

In [50]:
findspark.init()

# Build (or reuse) the Spark session with the memory, timeout, shuffle and
# Python-worker settings listed below.
spark = (
    SparkSession.builder
    .appName('ElectionCampaignTweetsAnalysis')
    .config('spark.executor.memory', '2g')
    .config('spark.driver.memory', '2g')
    .config('spark.network.timeout', '800s')
    .config('spark.sql.shuffle.partitions', '2')
    .config('spark.python.worker.reuse', 'false')
    .getOrCreate()
)

# My lighter copy of the file. It is read without a header, so the columns
# arrive as _c0, _c1, ...; keep only the three used here under readable names.
df = (
    spark.read
    .option('header', 'false')
    .option('delimiter', '\t')
    .csv('../../datasets/debate_tweets_c.tsv')
    .select(
        col('_c0').alias('id'),
        col('_c1').alias('text'),
        col('_c7').alias('time_info'),
    )
)

# Parse the 8 characters starting at position 12 of time_info as HH:mm:ss,
# then derive the hour of the day from the resulting timestamp.
df = (
    df
    .withColumn("time_info", to_timestamp(substring(col("time_info"), 12, 8), "HH:mm:ss"))
    .withColumn("hora_do_dia", hour(col("time_info")))
)

df.show()

+------------------+--------------------+-------------------+-----------+
|                id|                text|          time_info|hora_do_dia|
+------------------+--------------------+-------------------+-----------+
|522394422710136832|@anacddd verdade,...|1970-01-01 14:31:50|         14|
|522394422806581248|              Que ñ*|1970-01-01 14:31:50|         14|
|522394422731100160| Vou quebrar a Bruna|1970-01-01 14:31:50|         14|
|522394422810783745|agora vou p segun...|1970-01-01 14:31:50|         14|
|522394423137943553|Me sinto tão bem ...|1970-01-01 14:31:50|         14|
|522394423188271104|Eu estou aqui, de...|1970-01-01 14:31:50|         14|
|522394423238606848|Quando vai embora...|1970-01-01 14:31:50|         14|
|522394423528022016|@paynecaralhudo k...|1970-01-01 14:31:50|         14|
|522394423632875521|Conceição da Barr...|1970-01-01 14:31:50|         14|
|522394424010362881| @Maniavato te amo ♥|1970-01-01 14:31:50|         14|
|522394424048091138|Alg me curtindo rs

### UDFs importantes

In [64]:
# Important UDFs

@udf(returnType=StringType())
def categorizar_periodo(hora_do_dia):
    """Map an hour of the day to a period-of-day label.

    Args:
        hora_do_dia: Hour value compared against the 5, 12 and 18 boundaries;
            ``None`` when the source column is null.

    Returns:
        "manha" for 5 <= hour < 12, "tarde" for 12 <= hour < 18, "noite" for
        any other hour, and ``None`` when the hour itself is ``None``.
    """
    # A null hour arrives as None; comparing None with an int raises TypeError
    # and would fail the whole Spark job, so propagate the null instead.
    if hora_do_dia is None:
        return None
    if 5 <= hora_do_dia < 12:
        return "manha"
    if 12 <= hora_do_dia < 18:
        return "tarde"
    return "noite"

@udf(returnType=ArrayType(StringType()))
def extrai_hashtags(text):
    """Extract the hashtags found in a piece of text, without the leading '#'.

    Args:
        text: String to scan; ``None`` when the source column is null.

    Returns:
        List with one entry per ``#word`` match, in order of appearance.
        Empty when there are no matches or when ``text`` is ``None``.
    """
    # re.findall raises TypeError on None, which would fail the Spark job for
    # rows with a null text; treat those rows as having no hashtags.
    if text is None:
        return []
    return re.findall(r'#(\w+)', text)

In [65]:
# Count the occurrences of each hashtag per period of the day, most frequent first.
new_df = (
    df
    .withColumn("hashtags", extrai_hashtags(col("text")))
    .withColumn("periodo", categorizar_periodo(df["hora_do_dia"]))
    # Keep only tweets that contain at least one hashtag.
    .where(size(col("hashtags")) > 0)
    # One output row per hashtag of each tweet.
    .select(explode(col("hashtags")).alias("hashtag"), "periodo")
    .groupBy("hashtag", "periodo")
    .count()
    .orderBy(col("count").desc())
)

new_df.show()

+--------------------+-------+-----+
|             hashtag|periodo|count|
+--------------------+-------+-----+
|    EMABiggestFans1D|  tarde| 2798|
|EMABiggestFansJus...|  tarde| 2712|
|          QueroNoTVZ|  tarde|  156|
|QueroMuitosSeguid...|  tarde|   77|
|       VoteVampsVevo|  tarde|   69|
|AguacateTraeDeNue...|  tarde|   53|
|   SorrisoNoEncontro|  tarde|   45|
|   FrasesProfessores|  tarde|   45|
|        PediuTocouRD|  tarde|   40|
|                 MPN|  tarde|   33|
|EMABiggestFansAri...|  tarde|   32|
|            CreoEnTi|  tarde|   29|
|EMABiggestFansNic...|  tarde|   26|
|    EMABiggetsFans1D|  tarde|   23|
|                 EMA|  tarde|   23|
|            BestLive|  tarde|   23|
|           BrunoMars|  tarde|   21|
|              trndnl|  tarde|   19|
|            Encontro|  tarde|   17|
|                Luan|  tarde|   15|
+--------------------+-------+-----+
only showing top 20 rows

