In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType, ArrayType, StringType, DoubleType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, NGram
from pyspark.ml.linalg import Vectors
from unicodedata import normalize

import pandas as pd
import re
import nltk

In [2]:
spark = (SparkSession.builder \
    .appName("SPARK TRAB - QUESTION 1 D").getOrCreate())

In [3]:
df_orig = spark.read.option("header", "false").option("delimiter", "\t").csv("./data/debate-tweets.tsv")

## Objetivo

In [4]:
df = df_orig.select("_c0", "_c1")

df = df.withColumnRenamed("_c0", "id") \
                     .withColumnRenamed("_c1", "content")

df.show()

+------------------+--------------------+
|                id|             content|
+------------------+--------------------+
|522394422710136832|@anacddd verdade,...|
|522394422806581248|              Que ñ*|
|522394422731100160| Vou quebrar a Bruna|
|522394422810783745|agora vou p segun...|
|522394423137943553|Me sinto tão bem ...|
|522394423188271104|Eu estou aqui, de...|
|522394423238606848|Quando vai embora...|
|522394423528022016|@paynecaralhudo k...|
|522394423632875521|Conceição da Barr...|
|522394424010362881| @Maniavato te amo ♥|
|522394424048091138|Alg me curtindo rs ♡|
|522394424010358784|@MiiluAA No, porq...|
|522394423741906944|#EMABiggestFansJu...|
|522394424568213505|@raizabatista dev...|
|522394424920506368|Me senti ate d fe...|
|522394424811458560|qual o sentido de...|
|522394425029574656|I'm at Lava Rápid...|
|522394425121841153|Fica comentando m...|
|522394425461579777|"odeio que me man...|
|522394425960701952|CAMAMTEBABILONFRA...|
+------------------+--------------

In [5]:
def extract_aecio(text):
    text_sem_acento = normalize('NFKD', text).encode('ASCII', 'ignore').decode('ASCII')
    if re.findall(r'\bA[eé]cio\b', text_sem_acento, re.IGNORECASE):
        return True
    else:
        return False

def remove_punctuation(text):
    return re.sub(r'[^\w\s]', '', text)

In [6]:
extract_aecio_udf = udf(extract_aecio, BooleanType())

df = df.withColumn("hasAecio", extract_aecio_udf(df["content"]))

df_aecio = df.filter(col("hasAecio")).select("id", "content")

display(df_aecio.head(20))

[Row(id='522394798914015233', content='@KennedyAncar @Drimone @jornaldaREALMENTE DILMA CAGUEJOU,ESTAVA NERVOSA,SEM PREPARO,MUITO REPETITIVA VOLTA MUITO NO PASSADO,AÉCIO FOI MELHOR'),
 Row(id='522394968611360768', content='O site do TSE-MG ficou fora do ar, por que será hein Aécio?'),
 Row(id='522394979403325440', content='Olha isso @RealKajuru, mais um motivo pra não votar no Aécio http://t.co/DIe1xj4XJs #folha""'),
 Row(id='522395075972976643', content='@Indianara_m também quero, to fodido pra arranja a absolut se o Aecio ganha, vou cagar uma Absolut'),
 Row(id='522395717797969920', content='@BlogdoNoblat ganho bem. Aécio não tem condições de administrar um país.'),
 Row(id='522395826153619456', content='@UOLNoticias e o Alkmin esperando por São Pedro, se o Aécio ganhar alguém sabe por qual Santo ele vai esperar para resolver problemas? Kkkkk'),
 Row(id='522395968491520001', content='"""@sensacionalista: Aécio não empregará mais irmã, apenas cunhado, pois cunhado não é parente - http:

In [7]:
remove_punctuation_udf = udf(remove_punctuation, StringType())

df_aecio = df_aecio.withColumn("content_clean", remove_punctuation_udf(col("content")))

display(df_aecio.head(20))

[Row(id='522394798914015233', content='@KennedyAncar @Drimone @jornaldaREALMENTE DILMA CAGUEJOU,ESTAVA NERVOSA,SEM PREPARO,MUITO REPETITIVA VOLTA MUITO NO PASSADO,AÉCIO FOI MELHOR', content_clean='KennedyAncar Drimone jornaldaREALMENTE DILMA CAGUEJOUESTAVA NERVOSASEM PREPAROMUITO REPETITIVA VOLTA MUITO NO PASSADOAÉCIO FOI MELHOR'),
 Row(id='522394968611360768', content='O site do TSE-MG ficou fora do ar, por que será hein Aécio?', content_clean='O site do TSEMG ficou fora do ar por que será hein Aécio'),
 Row(id='522394979403325440', content='Olha isso @RealKajuru, mais um motivo pra não votar no Aécio http://t.co/DIe1xj4XJs #folha""', content_clean='Olha isso RealKajuru mais um motivo pra não votar no Aécio httptcoDIe1xj4XJs folha'),
 Row(id='522395075972976643', content='@Indianara_m também quero, to fodido pra arranja a absolut se o Aecio ganha, vou cagar uma Absolut', content_clean='Indianara_m também quero to fodido pra arranja a absolut se o Aecio ganha vou cagar uma Absolut'),
 

In [8]:
tokenizer = Tokenizer(inputCol="content_clean", outputCol="words")
df_aecio = tokenizer.transform(df_aecio)

display(df_aecio.head(20))

[Row(id='522394798914015233', content='@KennedyAncar @Drimone @jornaldaREALMENTE DILMA CAGUEJOU,ESTAVA NERVOSA,SEM PREPARO,MUITO REPETITIVA VOLTA MUITO NO PASSADO,AÉCIO FOI MELHOR', content_clean='KennedyAncar Drimone jornaldaREALMENTE DILMA CAGUEJOUESTAVA NERVOSASEM PREPAROMUITO REPETITIVA VOLTA MUITO NO PASSADOAÉCIO FOI MELHOR', words=['kennedyancar', 'drimone', 'jornaldarealmente', 'dilma', 'caguejouestava', 'nervosasem', 'preparomuito', 'repetitiva', 'volta', 'muito', 'no', 'passadoaécio', 'foi', 'melhor']),
 Row(id='522394968611360768', content='O site do TSE-MG ficou fora do ar, por que será hein Aécio?', content_clean='O site do TSEMG ficou fora do ar por que será hein Aécio', words=['o', 'site', 'do', 'tsemg', 'ficou', 'fora', 'do', 'ar', 'por', 'que', 'será', 'hein', 'aécio']),
 Row(id='522394979403325440', content='Olha isso @RealKajuru, mais um motivo pra não votar no Aécio http://t.co/DIe1xj4XJs #folha""', content_clean='Olha isso RealKajuru mais um motivo pra não votar no 

In [9]:
def remove_empty_tokens(tokens):
    return [token for token in tokens if len(token) >= 3]

In [10]:
stopwordList = nltk.corpus.stopwords.words('portuguese')

remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords = stopwordList)
df_aecio = remover.transform(df_aecio)

remove_empty_tokens_udf = udf(remove_empty_tokens, ArrayType(StringType()))

df_aecio = df_aecio.withColumn("filtered", remove_empty_tokens_udf(col("filtered")))

display(df_aecio.head(20))

[Row(id='522394798914015233', content='@KennedyAncar @Drimone @jornaldaREALMENTE DILMA CAGUEJOU,ESTAVA NERVOSA,SEM PREPARO,MUITO REPETITIVA VOLTA MUITO NO PASSADO,AÉCIO FOI MELHOR', content_clean='KennedyAncar Drimone jornaldaREALMENTE DILMA CAGUEJOUESTAVA NERVOSASEM PREPAROMUITO REPETITIVA VOLTA MUITO NO PASSADOAÉCIO FOI MELHOR', words=['kennedyancar', 'drimone', 'jornaldarealmente', 'dilma', 'caguejouestava', 'nervosasem', 'preparomuito', 'repetitiva', 'volta', 'muito', 'no', 'passadoaécio', 'foi', 'melhor'], filtered=['kennedyancar', 'drimone', 'jornaldarealmente', 'dilma', 'caguejouestava', 'nervosasem', 'preparomuito', 'repetitiva', 'volta', 'passadoaécio', 'melhor']),
 Row(id='522394968611360768', content='O site do TSE-MG ficou fora do ar, por que será hein Aécio?', content_clean='O site do TSEMG ficou fora do ar por que será hein Aécio', words=['o', 'site', 'do', 'tsemg', 'ficou', 'fora', 'do', 'ar', 'por', 'que', 'será', 'hein', 'aécio'], filtered=['site', 'tsemg', 'ficou', 'h

In [11]:
from functools import reduce

def generate_ngrams(wordsData, ngram_range):
    ngrams_data = []
    for n in range(ngram_range[0], ngram_range[1] + 1):
        ngram = NGram(n=n, inputCol="filtered", outputCol="ngram")
        ngram_data = ngram.transform(wordsData)
        ngrams_data.append(ngram_data)
    return reduce(lambda df1, df2: df1.union(df2), ngrams_data)

In [12]:
ngram_range = (2, 5)
df_aecio = generate_ngrams(df_aecio, ngram_range)

df_aecio.show() # OK

+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                id|             content|       content_clean|               words|            filtered|               ngram|
+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|522394798914015233|@KennedyAncar @Dr...|KennedyAncar Drim...|[kennedyancar, dr...|[kennedyancar, dr...|[kennedyancar dri...|
|522394968611360768|O site do TSE-MG ...|O site do TSEMG f...|[o, site, do, tse...|[site, tsemg, fic...|[site tsemg, tsem...|
|522394979403325440|Olha isso @RealKa...|Olha isso RealKaj...|[olha, isso, real...|[olha, realkajuru...|[olha realkajuru,...|
|522395075972976643|@Indianara_m tamb...|Indianara_m també...|[indianara_m, tam...|[indianara_m, que...|[indianara_m quer...|
|522395717797969920|@BlogdoNoblat gan...|BlogdoNoblat ganh...|[blogdonoblat, ga...|[blogdonoblat, ga...|[blogdonoblat 

In [13]:
hashingTF = HashingTF(inputCol="ngram", outputCol="features") 
# aparentemente isso aplica um countvec
df_aecio = hashingTF.transform(df_aecio)

In [14]:
# Aplicar IDF para calcular TF-IDF
idf = IDF(inputCol="features", outputCol="final_features", minDocFreq = 5)
idfModel = idf.fit(df_aecio)
df_tfidf = idfModel.transform(df_aecio).select("id", "content", "ngram", "final_features")

In [15]:
df_tfidf.show()

+------------------+--------------------+--------------------+--------------------+
|                id|             content|               ngram|      final_features|
+------------------+--------------------+--------------------+--------------------+
|522394798914015233|@KennedyAncar @Dr...|[kennedyancar dri...|(262144,[20797,29...|
|522394968611360768|O site do TSE-MG ...|[site tsemg, tsem...|(262144,[33219,23...|
|522394979403325440|Olha isso @RealKa...|[olha realkajuru,...|(262144,[13974,21...|
|522395075972976643|@Indianara_m tamb...|[indianara_m quer...|(262144,[24461,44...|
|522395717797969920|@BlogdoNoblat gan...|[blogdonoblat gan...|(262144,[24081,74...|
|522395826153619456|@UOLNoticias e o ...|[uolnoticias alkm...|(262144,[251,5644...|
|522395968491520001|"""@sensacionalis...|[sensacionalista ...|(262144,[1917,152...|
|522395969577832448|Só eu acho qe o A...|[acho aécio, aéci...|(262144,[33249,93...|
|522396040490917889|e falam q nao vao...|[falam nao, nao v...|(262144,[14282

In [16]:
def calculate_l1_norm(vector):
    sparse_vector = Vectors.sparse(vector.size, vector.indices, vector.values)
    return float(sparse_vector.norm(1))

In [17]:
sum_udf = udf(calculate_l1_norm, DoubleType())

important_sentences_df = df_tfidf.withColumn("tfidf_sum", sum_udf(col("final_features")))

# Ordena as sentenças por tfidf_sum em ordem decrescente
important_sentences_df = (important_sentences_df.orderBy(col("tfidf_sum").desc())).limit(150).select("id", "content", "ngram", "final_features", "tfidf_sum")

In [18]:
df_orig = None
df = None
df_aecio = None

In [19]:
#important_sentences_df.show()

In [20]:
dataframe_pd = important_sentences_df.toPandas()

dataframe_pd.to_csv('./outcome/Q1/result_aecio.csv', index=False)

In [21]:
spark.stop()

In [22]:
# REDUZIR NUMERO DE FEATURES
# CHECAR MOSTRAR O RESUTLADO SEM TER A FINAL_FEATURES E PRONTO