# Conexão com o Hadoop

In [1]:
from pyarrow import fs, hdfs
hdfs.HadoopFileSystem("hdfs://namenode:9000")
conexao = fs.HadoopFileSystem(host="namenode", port=9000)

  hdfs.HadoopFileSystem("hdfs://namenode:9000")


## Preparar Ambiente

In [2]:
#navegar pelas pastas
conexao.get_file_info(fs.FileSelector('/', recursive=True))

[]

### Diretório

In [3]:
#Criar pastas
conexao.create_dir('/somativa2')

### Arquivo

In [4]:
#Criar Arquivos
caminho_arquivo_origem = 'sentiment140.csv'
caminho_arquivo_destino = '/somativa2/sentiment140.csv'

#Escrever Arquivo
with conexao.open_output_stream(caminho_arquivo_destino) as stream:
    stream.write(open(caminho_arquivo_origem, 'rb').read())

# Importando PySpark

In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import to_timestamp, expr
from pyspark.sql.functions import col, lower, explode, split, regexp_replace

## Sessão

In [6]:
# Criar sessao
spark = SparkSession.builder \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

## DataFrame

In [7]:
# Ler arquivo e criar dataframe
df = spark.read.csv('hdfs://namenode:9000/somativa2/sentiment140.csv', header=True, inferSchema=True)

In [8]:
# Mostrar dataframe
df.show()

+-----------------+--------------------+------------+-------------------+--------------------+-------------------------+
|           target|                 ids|        date|               flag|                user|text;;;;;;;;;;;;;;;;;;;;;|
+-----------------+--------------------+------------+-------------------+--------------------+-------------------------+
|"0,""1467810369""|""Mon Apr 06 22:1...|""NO_QUERY""|""_TheSpecialOne_""|""@switchfoot htt...|      that's a bummer....|
|"0,""1467810672""|""Mon Apr 06 22:1...|""NO_QUERY""|  ""scotthamilton""|""is upset that h...|                     NULL|
|"0,""1467810917""|""Mon Apr 06 22:1...|""NO_QUERY""|       ""mattycus""|""@Kenichan I div...|                     NULL|
|"0,""1467811184""|""Mon Apr 06 22:1...|""NO_QUERY""|        ""ElleCTF""|""my whole body f...|                     NULL|
|"0,""1467811193""|""Mon Apr 06 22:1...|""NO_QUERY""|         ""Karoli""|""@nationwideclas...|      it's not behavin...|
|"0,""1467811372""|""Mon Apr 06 

In [9]:
# Mostrar colunas e respectivos tipos
df.cache()

DataFrame[target: string, ids: string, date: string, flag: string, user: string, text;;;;;;;;;;;;;;;;;;;;;: string]

### Organizar DataFrame

Para facilitar a análise dos dados, vamos organizar as informações do DataFrame, removendo inicialmente as colunas que não são relevantes para a análise, como a coluna 'target' e 'flag'.

Contrariando o planejamento inicial, a coluna 'date' se revelou importante, pois é esperado que contenha informações de data e tempo. No entanto, parece que durante a coleta dos dados, os valores que deveriam estar em 'date' foram erroneamente armazenados na coluna 'ids'. Para corrigir esse equívoco, vamos renomear a coluna 'ids' para 'date'.

Uma revisão preliminar das colunas 'user' e 'text;;;;;;;;;;;;;;;;;;;;;' sugere que é possível combinar seus conteúdos em uma única coluna. Portanto, vamos fundir essas duas colunas em uma nova chamada 'text'. Com isso, as colunas originais 'user' e 'text;;;;;;;;;;;;;;;;;;;;;' podem ser descartadas, pois todas as informações necessárias estarão contidas na nova coluna 'text'.

In [10]:
# 1 - Excluir as colunas 'target' e 'date'
df = df.drop('target', 'flag', 'date')

# 2 - Renomear a coluna 'ids' para 'date'
df = df.withColumnRenamed('ids', 'date')

# 3 - Juntar as colunas 'user' e 'text;;;;;;;;;;;;;;;;;;;;;' usando concat_ws e um espaço como separador
df = df.withColumn('text', F.concat_ws(' ', F.col('user'), F.col('text;;;;;;;;;;;;;;;;;;;;;')))

# 4 - Depois de criada a coluna 'text', excluir as colunas 'user' e 'text;;;;;;;;;;;;;;;;;;;;;'
df = df.drop('user', 'text;;;;;;;;;;;;;;;;;;;;;')

In [11]:
# Mostrar o DataFrame resultante para verificar as mudanças
df.show(truncate=False)

+--------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
|date                            |text                                                                                                                                          |
+--------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
|""Mon Apr 06 22:19:45 PDT 2009""|""@switchfoot http://twitpic.com/2y1zl - Awww  that's a bummer.  You shoulda got David Carr of Third Day to do it. ";"D""";;;;;;;;;;;;;;;;;;;;|
|""Mon Apr 06 22:19:49 PDT 2009""|""is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!""";;;;;;;;;;;;;;;;;;;;;     |
|""Mon Apr 06 22:19:53 PDT 2009""|""@Kenichan I dived many times for the ball. Managed to save 50%  The rest g

Observa-se que a visualização dos valores das colunas ainda pode ser melhorada, visto que cada valor está precedido e seguido por dois caracteres de aspas (""). Esses caracteres não adicionam informações relevantes e, portanto, podem ser removidos das strings. Isso tornará os valores mais 'limpos' e claros, facilitando a análise e interpretação dos dados.

In [12]:
# Remover aspas extras de todas as colunas
for col_name in df.columns:
    df = df.withColumn(col_name, F.regexp_replace(F.col(col_name), '^""|""$', ''))

# Para garantir que não há aspas no meio dos textos também (caso seja necessário)
for col_name in df.columns:
    df = df.withColumn(col_name, F.regexp_replace(F.col(col_name), '""', ''))

# Mostrar algumas linhas para confirmar as alterações
df.show(truncate=False)

+----------------------------+------------------------------------------------------------------------------------------------------------------------------------------+
|date                        |text                                                                                                                                      |
+----------------------------+------------------------------------------------------------------------------------------------------------------------------------------+
|Mon Apr 06 22:19:45 PDT 2009|@switchfoot http://twitpic.com/2y1zl - Awww  that's a bummer.  You shoulda got David Carr of Third Day to do it. ";"D";;;;;;;;;;;;;;;;;;;;|
|Mon Apr 06 22:19:49 PDT 2009|is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!";;;;;;;;;;;;;;;;;;;;;     |
|Mon Apr 06 22:19:53 PDT 2009|@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds";;;;;;;;;;;;;;;;;;;;;         

Para concluir, converteremos a coluna 'date' para o formato de timestamp. Isso nos permitirá realizar filtros precisos por data e hora nas análises subsequentes.

In [13]:
# Alterar o tipo da coluna existente de string para timestamp
df = df.withColumn("date", to_timestamp("date", "EEE MMM dd HH:mm:ss zzz yyyy"))

# Ajustar o fuso horário manualmente (subtrair 7 horas para PDT)
df = df.withColumn("date", expr("date - interval 7 hours"))

# Mostrar o resultado
df.show()

+-------------------+--------------------+
|               date|                text|
+-------------------+--------------------+
|2009-04-06 22:19:45|@switchfoot http:...|
|2009-04-06 22:19:49|is upset that he ...|
|2009-04-06 22:19:53|@Kenichan I dived...|
|2009-04-06 22:19:57|my whole body fee...|
|2009-04-06 22:19:57|@nationwideclass ...|
|2009-04-06 22:20:00|@Kwesidei not the...|
|2009-04-06 22:20:03|Need a hug ";;;;;...|
|2009-04-06 22:20:03|@LOLTrish hey  lo...|
|2009-04-06 22:20:05|@Tatiana_K nope t...|
|2009-04-06 22:20:09|@twittera que me ...|
|2009-04-06 22:20:16|spring break in p...|
|2009-04-06 22:20:17|I just re-pierced...|
|2009-04-06 22:20:19|@caregiving I cou...|
|2009-04-06 22:20:19|@octolinz16 It it...|
|2009-04-06 22:20:20|@smarrison i woul...|
|2009-04-06 22:20:20|@iamjazzyfizzle I...|
|2009-04-06 22:20:22|Hollis' death sce...|
|2009-04-06 22:20:25|about to file tax...|
|2009-04-06 22:20:31|@LettyA ahh ive a...|
|2009-04-06 22:20:34|@FakerPattyPattz ...|
+----------

## Análises

- Mostrar as **palavras mais frequentes** nos tweets (similar ao código da tokenização)
- Mostrar os **usuários mais mencionados** (isto é, todas as palavras iniciadas com um "@")
- Mostrar os **hashtags mais frequentes** (isto é, todas as palvras iniciadas com um "#")

### Palavras Mais Frequentes 
Para analisar as palavras mais frequentes nos tweets, iremos considerar a coluna 'text':

In [41]:
# Remover caracteres especiais antes de tokenizar
# Limpar o texto mantendo '@' e '#' enquanto remove outros caracteres especiais como ';'
cleaned_df = df.withColumn("clean_text", regexp_replace(col("text"), "[^\w\s@#]", ""))

In [37]:
# Tokenizar o texto e contar as palavras
df_word = cleaned_df.withColumn("word", explode(split(lower(col("clean_text")), "\\W+"))) \
                   .groupBy("word") \
                   .count()

In [38]:
# Ordenar o DataFrame pela coluna 'count' em ordem decrescente
df_word = df_word.orderBy(F.desc("count"))

# Mostrar palavras com maior frequencia
df_word.show()

+----+-------+
|word|  count|
+----+-------+
|    |1015209|
|   i|  63424|
|  to|  46496|
| the|  36165|
|  my|  26033|
|   a|  26012|
| and|  21408|
|  is|  18954|
|  in|  16622|
|  it|  16289|
| for|  14054|
|  im|  13928|
|  of|  13492|
| you|  12466|
|  me|  12247|
|  on|  12166|
|have|  11719|
|  so|  11620|
| but|  10823|
| not|  10300|
+----+-------+
only showing top 20 rows



### Usuários Mais Mencionados
Para analisar os usuários mais mencionados, também iremos considerar a coluna 'text', filtrando as palavras que iniciam com o caractere '@':

In [42]:
# Encontrar e contar menções de usuários
# filtrar as palavras que começam com o caractere '@'
df_mention = cleaned_df.withColumn("mention", explode(split(col("clean_text"), "\s+"))) \
                .filter(col("mention").startswith("@")) \
                .groupBy("mention") \
                .count() \
                .orderBy("count", ascending=False)

In [43]:
# Mostrar os usuários mais mencionados
df_mention.show()

+----------------+-----+
|         mention|count|
+----------------+-----+
|               @|  857|
|       @tommcfly|  193|
|     @mileycyrus|  188|
|       @ddlovato|  122|
|    @DavidArchie|  110|
|     @stephenfry|   80|
|      @petewentz|   62|
| @DonnieWahlberg|   59|
|@JonathanRKnight|   56|
|    @nick_carter|   54|
|   @joeymcintyre|   53|
|         @aplusk|   50|
|   @jordanknight|   50|
|    @dougiemcfly|   46|
|    @heidimontag|   43|
|    @gfalcone601|   41|
|  @amazingphoebe|   38|
|     @JoelMadden|   37|
|      @dannywood|   37|
|   @shaundiviney|   36|
+----------------+-----+
only showing top 20 rows



### Hashtags Mais Frequentes

Para analisar as hashtags mais frequentes, também iremos considerar a coluna 'text', filtrando as palavras que iniciam com o caractere '#':

In [44]:
# Encontrar e contar menções de usuários
# filtrar as palavras que começam com o caractere '#'
df_hashtags = cleaned_df.withColumn("hashtags", explode(split(col("clean_text"), "\s+"))) \
                .filter(col("hashtags").startswith("#")) \
                .groupBy("hashtags") \
                .count() \
                .orderBy("count", ascending=False)

In [45]:
# Mostrar os hashtags mais frequentes
df_hashtags.show()

+--------------+-----+
|      hashtags|count|
+--------------+-----+
|      #asot400|  110|
|           #fb|  106|
|           #f1|   37|
|             #|   34|
|         #fail|   33|
|          #tag|   24|
|      #ASOT400|   24|
|            #1|   23|
|           #F1|   20|
|#3turnoffwords|   20|
|   #fixreplies|   19|
|            #2|   16|
|   #shortstack|   15|
|      #awaresg|   15|
|     #pawpawty|   14|
|    #hoppusday|   13|
|     #swineflu|   13|
| #followfriday|   12|
|      #Tweetie|   12|
|        #mcfly|   11|
+--------------+-----+
only showing top 20 rows



### 10 Palavras Mais Frequentes por Dia da Semana 

In [21]:
# Adicionar uma coluna 'day_of_week' ao DataFrame que contém o nome do dia da semana
df = df.withColumn("day_of_week", F.date_format(F.col("date"), "EEEE"))

In [22]:
df.show()

+-------------------+--------------------+-----------+
|               date|                text|day_of_week|
+-------------------+--------------------+-----------+
|2009-04-06 22:19:45|@switchfoot http:...|     Monday|
|2009-04-06 22:19:49|is upset that he ...|     Monday|
|2009-04-06 22:19:53|@Kenichan I dived...|     Monday|
|2009-04-06 22:19:57|my whole body fee...|     Monday|
|2009-04-06 22:19:57|@nationwideclass ...|     Monday|
|2009-04-06 22:20:00|@Kwesidei not the...|     Monday|
|2009-04-06 22:20:03|Need a hug ";;;;;...|     Monday|
|2009-04-06 22:20:03|@LOLTrish hey  lo...|     Monday|
|2009-04-06 22:20:05|@Tatiana_K nope t...|     Monday|
|2009-04-06 22:20:09|@twittera que me ...|     Monday|
|2009-04-06 22:20:16|spring break in p...|     Monday|
|2009-04-06 22:20:17|I just re-pierced...|     Monday|
|2009-04-06 22:20:19|@caregiving I cou...|     Monday|
|2009-04-06 22:20:19|@octolinz16 It it...|     Monday|
|2009-04-06 22:20:20|@smarrison i woul...|     Monday|
|2009-04-0

#### Definir Funcao
Vamos definir uma funcao que selecione as palavras mais frequentes considerando um dia da semana especifico:

In [23]:
def get_top_words_by_day(df, day_of_week):
    # Filtrar para apenas as entradas onde 'day_of_week' é especificado pelo parâmetro
    day_df = df.filter(col("day_of_week") == day_of_week)

    # Remover caracteres especiais antes de tokenizar
    # Isso garante que caracteres como ponto e vírgula e aspas sejam removidos
    cleaned_df = day_df.withColumn("clean_text", regexp_replace(col("text"), r"[^\w\s]", ""))

    # Tokenizar o texto limpo, converter para minúsculas e expandir em uma nova linha cada palavra
    words_df = cleaned_df.withColumn("word", explode(split(lower(col("clean_text")), "\\W+")))

    # Agrupar por 'word', contar e ordenar por frequência
    word_counts = (words_df.groupBy("word")
                           .count()
                           .orderBy(col("count").desc()))

    # Selecionar as top 10 palavras mais frequentes
    top_10_words = word_counts.limit(10)

    return top_10_words

#### Monday

In [24]:
# Usar a função para obter as palavras mais frequentes para 'Monday'
top_10_words_monday = get_top_words_by_day(df, "Monday")
top_10_words_monday.show()

+----+-----+
|word|count|
+----+-----+
|    |12462|
|   i| 8958|
|  to| 7291|
| the| 5329|
|   a| 3804|
|  my| 3800|
| and| 3147|
|  is| 2848|
|  in| 2603|
|  it| 2347|
+----+-----+



#### Tuesday

In [25]:
# Usar a função para obter as palavras mais frequentes para 'Tuesday'
top_10_words_tuesday = get_top_words_by_day(df, "Tuesday")

top_10_words_tuesday.show()

+----+-----+
|word|count|
+----+-----+
|    | 7706|
|   i| 5272|
|  to| 4237|
| the| 3492|
|   a| 2435|
|  my| 2262|
| and| 1936|
|  is| 1924|
|  in| 1695|
|  it| 1523|
+----+-----+



#### Wednesday

In [26]:
# Usar a função para obter as palavras mais frequentes para 'Wednesday'
top_10_words_wednesday = get_top_words_by_day(df, "Wednesday")

top_10_words_wednesday.show()

+----+-----+
|word|count|
+----+-----+
|    | 1296|
|   i| 1008|
|  to|  707|
| the|  538|
|  my|  413|
|   a|  371|
| and|  305|
|  is|  303|
|  in|  253|
|  im|  229|
+----+-----+



#### Thursday

In [27]:
# Usar a função para obter as palavras mais frequentes para 'Thursday'
top_10_words_thursday = get_top_words_by_day(df, "Thursday")

top_10_words_thursday.show()

+----+-----+
|word|count|
+----+-----+
|    | 5191|
|   i| 3949|
|  to| 2960|
| the| 2238|
|  my| 1673|
|   a| 1668|
| and| 1246|
|  is| 1186|
|  it| 1086|
|  in| 1040|
+----+-----+



#### Friday

In [28]:
# Usar a função para obter as palavras mais frequentes para 'Friday'
top_10_words_friday = get_top_words_by_day(df, "Friday")

top_10_words_friday.show()

+----+-----+
|word|count|
+----+-----+
|    | 3115|
|   i| 2577|
|  to| 1736|
| the| 1305|
|  my| 1009|
|   a|  993|
| and|  822|
|  is|  697|
|  im|  643|
|  it|  640|
+----+-----+



#### Saturday

In [29]:
# Usar a função para obter as palavras mais frequentes para 'Saturday'
top_10_words_saturday = get_top_words_by_day(df, "Saturday")

top_10_words_saturday.show()

+----+-----+
|word|count|
+----+-----+
|    |20566|
|   i|16588|
|  to|11008|
| the| 8932|
|   a| 6570|
|  my| 6542|
| and| 5255|
|  is| 4605|
|  in| 4091|
|  it| 4042|
+----+-----+



#### Sunday

In [30]:
# Usar a função para obter as palavras mais frequentes para 'Sunday'
top_10_words_sunday = get_top_words_by_day(df, "Sunday")

top_10_words_sunday.show()

+----+-----+
|word|count|
+----+-----+
|    |32293|
|   i|24712|
|  to|18280|
| the|14089|
|  my|10193|
|   a|10012|
| and| 8546|
|  is| 7289|
|  it| 6313|
|  in| 6220|
+----+-----+

