## Consulta e armazenamento dos Trending Topics Twitter - Brasil
Data: 02/04/2022  
Trabalho de Projeto de Bloco Infnet  
Raphael da Rocha Fonseca

### Objetivo
Coletar os Trending Topics do Twitter ao longo do dia e observar como os eventos do dia se refletem nos assuntos comentados  


### Método de coleta
Consulta com API do twitter ao longo do dia 02/04, a cada 5 minutos, de 11:40 am à 02:00 am

### Infraestrutura utilizada
Notebook Pyspark - Databricks Community  
Armazenamento de arquivos parquet no DBFS

#### Instalação das bibliotecas

In [0]:
#!pip install tweepy
#!pip install geocoder

#### Importação das funções

In [0]:
from pyspark.sql.functions import col, lit, desc, from_utc_timestamp, lit, rank, date_format, expr, date_format
from pyspark.sql import Window
from datetime import datetime, timedelta
import tweepy, geocoder
import time

#### Credenciais de autenticação com a API do Twitter

Insira suas credenciais de API do Twitter

In [0]:
consumer_key = ["insira aqui"]
consumer_secret = ["insira aqui"]
access_key = ["insira aqui"]
access_secret = ["insira aqui"]

#### Realizando autenticação do tweepy com a API do Twitter

In [0]:
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth)

#### Função que obtém os 20 assuntos do Brasil mais comentados no momento da consulta
- A função geocoder retorna uma lat e long do Brasil, em seguida esses valores são passados na função closest_trends, onde será retornado o valor do identificador woeid do territorio Brasileiro
- Em seguida é chamada a função get_place_trends, passando como argumento a variável closest_loc, onde de fato será retornada a lista dos Trending Topics do Brasil
- A lista já vem ordenada com a posição de cada assunto no Ranking, mas não há numeração, sendo assim foi utilizada a função enumerate para capturar o índice do item na lista e assumir como posição no raking
- Em seguida a lista é convertida em um pyspark dataframe
- Em alguns momentos foi identificado duplicidade de assuntos na lista, por isso foi incluído um drop_duplicates na coluna assunto
- Como houve dados dropados, foi refeito a coluna de Ranking e adicionada coluna com data da consulta
- Adicionada coluna data_ref que está em um TIMEZONE -7 (para que a virada do dia ocorra as 4am no horário local)
- Adicionada colunas de dia_ref e time_ref que serão usadas para particionar os dados durante a ingestão
- Por fim limitado o resultado a 20 itens

OBS: Um erro na API do Twitter faz com que algums assuntos sejam retornados com qtd_tweets nulos

In [0]:
def get_trends():
  g = geocoder.osm('Brazil')
  closest_loc = api.closest_trends(g.lat, g.lng)
  
  trends = api.get_place_trends(closest_loc[0]["woeid"])[0]['trends']
  list_trends = [{'rank' : i, 'assunto' : x['name'], 'qtd_tweets' : x['tweet_volume']} for i,x in enumerate(trends)]
  
  df_trends = spark.createDataFrame(list_trends)
  df_trends = df_trends.drop_duplicates(subset=['assunto'])\
                       .withColumn('rank',rank().over(Window.orderBy("rank")))\
                       .withColumn('data_consulta', lit(expr('current_timestamp - INTERVAL 3 HOURS')))\
                       .withColumn('data_ref', expr('data_consulta - INTERVAL 4 HOURS'))\
                       .withColumn('dia_ref', date_format(col('data_ref'), "yyyyMMdd"))\
                       .withColumn('time_ref', date_format(col('data_ref'), "HHmm"))\
                       .drop('data_ref')\
                       .limit(20)
  
  return df_trends

#### Função que salva o dataframe em um arquivo parquet no DBFS
- A função coalesce força a criação de apenas 01 arquivo na pasta
- Os arquivos são salvos particionados na pasta pelo dia_ref e time_ref

In [0]:
def save_trends(dataframe):
  dataframe.coalesce(1).write.mode('append').partitionBy('dia_ref','time_ref').parquet('/twitter/trending_topics/')

In [0]:
%fs
ls /twitter/trending_topics/dia_ref=20220402/time_ref=0740/

path,name,size
dbfs:/twitter/trending_topics/dia_ref=20220402/time_ref=0740/_SUCCESS,_SUCCESS,0
dbfs:/twitter/trending_topics/dia_ref=20220402/time_ref=0740/_committed_3031541097516881167,_committed_3031541097516881167,122
dbfs:/twitter/trending_topics/dia_ref=20220402/time_ref=0740/_started_3031541097516881167,_started_3031541097516881167,0
dbfs:/twitter/trending_topics/dia_ref=20220402/time_ref=0740/part-00000-tid-3031541097516881167-2fbe5369-794c-483a-8e58-2509286098c5-9-1.c000.snappy.parquet,part-00000-tid-3031541097516881167-2fbe5369-794c-483a-8e58-2509286098c5-9-1.c000.snappy.parquet,1871


#### Célula que chama as funções de coleta e ingestão dos arquivos
- Como não há forma de realizar agendamento de execução pelo Databricks community, foi utilizado o While True para rodar até que uma interrupção manual aconteça
- São coletados os trending topics a cada 5 minutos

In [0]:
while True:
  df = get_trends()
  save_trends(df)  
    
  time.sleep(60*5)