## **Autores**

In [None]:
"""
    Camadas de Serviços e Consumo de Dados

    Autor:
        Richard de Andrade
        
    Data:
        2022-12-18
"""


## **Referências**

In [None]:
"""
    Modelo de análise de sentimentos:
        https://www.earthdatascience.org/courses/use-data-open-source-python/intro-to-apis/analyze-tweet-sentiment-in-python/
    Dicionário de dados Tweepy:
        https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/tweet
"""

## **Bibliotecas necessárias**

In [23]:
#!pip3 install tweepy
#!pip3 install nltk==3.8
#!pip3 install textblob==0.17.1
#!pip3 install google-cloud-storage

## **Imports**

In [242]:
%%time
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import tweepy as tw
import collections
import subprocess
import itertools
import networkx
import warnings
import nltk
import os
import re

from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from nltk.corpus import stopwords
from google.cloud import storage
from datetime import datetime
from textblob import TextBlob

warnings.filterwarnings("ignore")

sns.set(font_scale=1.5)
sns.set_style("whitegrid")

spark = SparkSession.builder.appName("Spark SQL basic example").getOrCreate()
spark

CPU times: user 593 µs, sys: 4.05 ms, total: 4.65 ms
Wall time: 6.5 ms


## **Funções de diretório e remove URL**

In [212]:
%%time
def create_folder(bucket_name, destination_folder_name):
    """
        Cria uma pasta no bucket.
    """
    
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_folder_name)
    blob.upload_from_string(' ')
    print('Created {} .'.format(destination_folder_name))
    
def create_file(bucket_name, destination_folder_name, file_name, json_object):
    """
        Cria um arquivo e insere os dados.
    """    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob("{0}{1}".format(destination_folder_name, file_name))
    blob.upload_from_string(
        data=json.dumps(json_object),
        content_type='application/json'
    )    
    print("Created {0}{1}".format(destination_folder_name, file_name))
    

def remove_url(txt):
    """
        Remove a url do texto da mensagem.
    """
    return " ".join(re.sub("([^0-9A-Za-z \t])|(\w+:\/\/\S+)", "", txt).split())


bucket_name = 'bucket-camadas'    
data_arq    = datetime.today().strftime('%Y-%m-%d')

CPU times: user 39 µs, sys: 5 µs, total: 44 µs
Wall time: 49.1 µs


## **Credenciais do Twitter**

In [15]:
%%time
api_key             = "b5sALnUsjZ2mNqAsdcYw8ERA5"
api_key_secret      = "JUb1LBMRSQr9aUqcMMRd9tX8Kns3rzazhS4k69UJMLyTkwh8T8"
access_token        = "1520806134898216960-QFU8yiU2Z0ubEGvvqazu5PWKWFEUSu"
access_token_secret = "JjRfqBB8XQcmAoWOmNfGnBVPNgRN8rh9T06cO8RdvlSBZ"

## **Cria Instância a API do Twitter (TWEEPY)**

In [16]:
%%time
auth = tw.OAuthHandler(api_key, api_key_secret)
auth.set_access_token(access_token, access_token_secret)
api = tw.API(auth, wait_on_rate_limit=True)
api

<tweepy.api.API at 0x7fb3b22c6550>

## **Busca por uma palavra especifica**

In [174]:
%%time
search_words = "#iphone13"
date_since   = data_arq
tweets       = tw.Cursor(api.search_tweets, q = search_words, lang="en", since=date_since, tweet_mode="extended").items(10)
tweets  

CPU times: user 36 µs, sys: 6 µs, total: 42 µs
Wall time: 44.3 µs


<tweepy.cursor.ItemIterator at 0x7fb3a73da310>

## **Exemplo Camada Bronze**

In [175]:
%%time
all_tweets = [tweet for tweet in tweets]
json_tweets = [json.dumps(json_obj._json) for json_obj in all_tweets] 
create_folder(bucket_name, 'dados/bronze/%s/' % data_arq)    
create_file(bucket_name, 'dados/bronze/%s/' % data_arq, '%s.json' % data_arq, json_tweets)

Unexpected parameter: since


Created dados/bronze/2022-12-18/ .
Created dados/bronze/2022-12-18/2022-12-18.json
CPU times: user 60.3 ms, sys: 7.11 ms, total: 67.4 ms
Wall time: 828 ms


## **Exemplo Camada Silver e Aplicação do Modelo de Análise de Sentimentos**

#### TextBlob - Polarity - Subjectivity [Accuracy 56%]

#### [Quanto mais próximo de 1 mais positiva a mensagem; quanto mais próxima de -1 mais negativa a mensagem]
##### 
##### Nota: 0.38 
##### Usuário: Duncan Hartley
##### Mensagem: LindaMa109999 Its amazing how high the quality is for photographs with theiPhone 13 Pro
##### 
##### Nota: -0.4 
##### Usuário: techpriest
##### Mensagem: iPhone 13 Pro battery Sucks	


In [206]:
%%time

# Cria dataframe pandas 
df = pd.DataFrame(columns=["usuario", "mensagem", "origem", "localizacao", "data", "seguidores", "qt_likes", "verificado", "sentimento"])

# Loop nos tweets
for i in all_tweets:
    
    # Armazena as variavéis
    usuario     = i.user.name
    mensagem    = remove_url(i.full_text)
    origem      = i.source
    localizacao = i.user.location
    data        = i.created_at
    seguidores  = i.user.followers_count
    qt_likes    = i.favorite_count
    verificado  = i.user.verified
    
    # Aplica modelo de sentimento nas variáveis
    sentimento  = TextBlob(mensagem) 
    sentimento  = sentimento.polarity 
    
    
    # Armazena os dados no dataframe
    df = df.append({"usuario"     : usuario
                   ,"mensagem"    : mensagem
                   ,"origem"      : origem
                   ,"localizacao" : localizacao
                   ,"data"        : data
                   ,"seguidores"  : seguidores
                   ,"qt_likes"    : qt_likes
                   ,"verificado"  : verificado
                   ,"sentimento"  : sentimento
    }, ignore_index=True)

# Cria um dataframe pyspark com base no Pandas
df = spark.createDataFrame(df)

# Converte as colunas para o formato da tabela final
df = (
    df.withColumn("data", F.col("data").cast('string'))\
      .withColumn("verificado", F.col("verificado").cast('string'))\
      .withColumn("sentimento", F.col("sentimento").cast('float'))
)    

# Escreve o arquivo parquet
df.write.parquet("gs://bucket-camadas/dados/silver/parquet")

# Limpa a pasta parquet
#!gsutil -m rm -R gs://bucket-camadas/dados/silver/parquet

CPU times: user 71.8 ms, sys: 3.31 ms, total: 75.1 ms
Wall time: 106 ms


## **Cria external table no mesmo caminho do arquivo parquet**

In [254]:
%%time
spark.sql('''drop table if exists tb_twitter_silver''')

spark.sql('''
create external table tb_twitter_silver
(
    usuario        string,
    mensagem       string,
    origem         string,
    localizacao    string,
    data           string,
    seguidores     bigint,
    qt_likes       bigint,
    verificado     string,
    sentimento     float
)
stored as parquet
location 'gs://bucket-camadas/dados/silver/parquet'
''')

spark.sql('''describe formatted tb_twitter_silver''').show(n=100, truncate=False)

+----------------------------+--------------------------------------------------------------+-------+
|col_name                    |data_type                                                     |comment|
+----------------------------+--------------------------------------------------------------+-------+
|usuario                     |string                                                        |null   |
|mensagem                    |string                                                        |null   |
|origem                      |string                                                        |null   |
|localizacao                 |string                                                        |null   |
|data                        |string                                                        |null   |
|seguidores                  |bigint                                                        |null   |
|qt_likes                    |bigint                                              

In [260]:
%%time
spark.table("tb_twitter_silver").show()

+----------------+--------------------+-------------------+-----------------+-------------------+----------+--------+----------+-----------+
|         usuario|            mensagem|             origem|      localizacao|               data|seguidores|qt_likes|verificado| sentimento|
+----------------+--------------------+-------------------+-----------------+-------------------+----------+--------+----------+-----------+
|       Mastermax|RT Moto760 Did yo...| Twitter for iPhone|  Munich, Bavaria|2022-12-18 18:48:19|       695|       0|     false|       0.25|
|   Apple Snob 😝|Did you know you ...| Twitter for iPhone|   Springfield MO|2022-12-18 18:47:16|      1891|       5|     false| 0.28333333|
|           Jenny|Just learned how ...| Twitter for iPhone|                 |2022-12-18 18:46:33|        71|       1|     false|        0.0|
|Nour Ben Mahmoud|SQUID GAMEIphone4...|    Twitter Web App|                 |2022-12-18 18:42:43|         4|       1|     false|-0.30462962|
|   Elijah Bai

## **Exemplo Camada Gold**

In [270]:
%%time
gold = (
    spark.table("tb_twitter_silver")
        .groupBy("data")
        .agg(F.sum(F.col("qt_likes")).alias("qt_likes")
            ,F.sum(F.col("seguidores")).alias("qt_seguidores"))
)    

gold.write.parquet("gs://bucket-camadas/dados/gold/parquet")

# Limpa a pasta parquet
#!gsutil -m rm -R gs://bucket-camadas/dados/gold/parquet

                                                                                

CPU times: user 13.2 ms, sys: 537 µs, total: 13.7 ms
Wall time: 4.48 s


In [271]:
%%time
spark.sql('''drop table if exists tb_twitter_gold''')

spark.sql('''
create external table tb_twitter_gold
(
    data           string,
    qt_seguidores     bigint,
    qt_likes       bigint    
)
stored as parquet
location 'gs://bucket-camadas/dados/gold/parquet'
''')

spark.sql('''describe formatted tb_twitter_gold''').show(n=100, truncate=False)

+----------------------------+--------------------------------------------------------------+-------+
|col_name                    |data_type                                                     |comment|
+----------------------------+--------------------------------------------------------------+-------+
|data                        |string                                                        |null   |
|qt_seguidores               |bigint                                                        |null   |
|qt_likes                    |bigint                                                        |null   |
|                            |                                                              |       |
|# Detailed Table Information|                                                              |       |
|Database                    |default                                                       |       |
|Table                       |tb_twitter_gold                                     

In [272]:
%%time
spark.table("tb_twitter_gold").show()

+-------------------+-------------+--------+
|               data|qt_seguidores|qt_likes|
+-------------------+-------------+--------+
|2022-12-18 18:27:24|         1085|       0|
|2022-12-18 18:14:04|            9|       0|
|2022-12-18 18:48:19|          695|       0|
|2022-12-18 18:47:16|         1891|       5|
|2022-12-18 18:42:43|            4|       1|
|2022-12-18 18:46:33|           71|       1|
|2022-12-18 18:27:58|         1085|       0|
|2022-12-18 18:06:50|            3|       0|
|2022-12-18 17:42:06|          241|       2|
|2022-12-18 18:13:05|           31|       0|
+-------------------+-------------+--------+

CPU times: user 2.46 ms, sys: 322 µs, total: 2.78 ms
Wall time: 822 ms
