In [None]:
from tweepy.streaming import StreamListener
from tweepy import Stream
from tweepy import OAuthHandler

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F

import json
import time

class Tweet():
    """
    Objeto com os atributos refrentes aos dados a serem extraídos do JSON da API do Twitter
    """
    tweet_id: str = None
    created_at: str = None
    text: str = None
    hashtags: str = None
    retweet_count: int = None
    possibly_sensitive: bool = None
    lang: str = None
    user_id: str = None
    user_name: str = None
    user_description: str = None
    user_verification: bool = None
    user_followers_count: int = None
    user_friends_count: int = None
    user_created_at: str = None
    user_location: str = None
    
    def get_list(self):
        """
        Retorna uma lista com os valores atribuídos ao objeto instanciado
        """
        list_data = [
            self.tweet_id,
            self.created_at,
            self.text,
            self.hashtags,
            self.retweet_count,
            self.possibly_sensitive,
            self.lang,
            self.user_id,
            self.user_name,
            self.user_description,
            self.user_verification,
            self.user_followers_count,
            self.user_friends_count,
            self.user_created_at,
            self.user_location,
        ]
        return list_data
    
    def insert(self, data):
        """
        Recebe o retorno da API e insere os dados em seus respectivos atributos
        """
        hashtags = ""
        if "extended_tweet" in data:
            for h in data['extended_tweet']["entities"]["hashtags"]:
                hashtags = h["text"] + ", " + hashtags
        else:
            for h in data["entities"]["hashtags"]:
                hashtags = h["text"] + ", " + hashtags
        self.tweet_id = data["id"]
        self.created_at = data["created_at"]
        self.text = data['extended_tweet']['full_text'] if "extended_tweet" in data else data["text"]
        self.user_id = data["user"]["id"]
        self.hashtags = hashtags
        self.retweet_count = data["retweet_count"]
        self.possibly_sensitive = data['possibly_sensitive'] if "possibly_sensitive" in data else None
        self.lang = data["lang"]
        self.user_id = data["user"]["id"]
        self.user_name = data["user"]["name"]
        self.user_description = data["user"]["description"]
        self.user_verification = data["user"]["verified"]
        self.user_followers_count = data["user"]["followers_count"]
        self.user_friends_count = data["user"]["friends_count"]
        self.user_created_at = data["user"]["created_at"]
        self.user_location = data["user"]["location"]

class TwitterListener(StreamListener):
    """
    Listener da API do Twitter, esta classe realiza o streaming de tweets,
    recebendo os tweets, processando e persistindo em uma temp view.
    """
    def __init__(self, persist_time):
        self.persist_time = persist_time
        self.start = time.time()
        self.listTweets = []
        self.schema = StructType([StructField("tweet_id", StringType(), True),
                                  StructField("created_at", StringType(), True),
                                  StructField("text", StringType(), True),
                                  StructField("hashtags", StringType(), True),
                                  StructField("retweet_count", IntegerType(), True),
                                  StructField("possibly_sensitive", BooleanType(), True),
                                  StructField("lang", StringType(), True),
                                  StructField("user_id", StringType(), True),
                                  StructField("user_name", StringType(), True),
                                  StructField("user_description", StringType(), True),
                                  StructField("user_verification", BooleanType(), True),
                                  StructField("user_followers_count", IntegerType(), True),
                                  StructField("user_friends_count", IntegerType(), True),
                                  StructField("user_created_at", StringType(), True),
                                  StructField("user_location", StringType(), True),])    
    
    def on_data(self, data):
        """"
        Método que irá receber o dados da API e, após decorrido o tempo
        passado no parâmetro 'persist_time', irá salvá-los em uma temp view.
        """
        try:
            tweet = Tweet()
            json_data = json.loads(data)
            if "limit" not in json_data:
                tweet.insert(json_data)
                self.listTweets.append(tweet.get_list())
                
                print(f"Tweets:{len(self.listTweets)}, Time:{(time.time() - self.start)}")
                if (time.time() - self.start) > self.persist_time:
                    try:
                        df = sqlContext.createDataFrame(data=self.listTweets, schema=self.schema)
                        df = df.withColumn("etl_load", F.current_timestamp())
                        df = df.withColumn("etl_load_partition_year", F.date_format("etl_load", "yyyy"))
                        df = df.withColumn("etl_load_partition_month", F.date_format("etl_load", "MM"))
                        df = df.withColumn("etl_load_partition_day", F.date_format("etl_load", "dd"))
                        df = df.withColumn("etl_load_partition_hour", F.date_format("etl_load", "HH"))
                        df.createOrReplaceTempView("tweets")
                    except BaseException as e:
                        print("Erro ao contruir 'df': " + str(e))
                    return False
        except BaseException as e:
            print("Error: " + str(e), "JSON fora do esperado:", json_data)
        return True
    
    def on_error(self, status_code):
        print("Error: " + status_code)
        return True
    
    def on_timeout(self):
        print("Timeout!")
        return True

def authentication():
    """
    Irá buscar as credenciais em uma pasta local e retornar o objeto auth:OAuthHandler
    """
    PATH = "D:/William/Computação/Data Science/keys/TwitterAPI/{0}"
    
    API_KEY = open(file=PATH.format("API_key.txt"), mode="r", encoding="UTF-8").read()
    API_SECRET_KEY = open(file=PATH.format("API_secret_key.txt"), mode="r", encoding="UTF-8").read()
    ACCESS_TOKEN = open(file=PATH.format("access_token.txt"), mode="r", encoding="UTF-8").read()
    ACCESS_TOKEN_SECRET = open(file=PATH.format("access_token_secret.txt"), mode="r", encoding="UTF-8").read()
    
    auth = OAuthHandler(API_KEY, API_SECRET_KEY)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    return auth

def getData(keywords, languages=None, timeout=None, persist_time=60):
    """
    Método que irá realizar os passo-a-passo de autenticação, conifguração e start do streamig
    e retornar os dados salvos na temp view assim que decorrido o tempo do parâmetro 'persist_time'.
    """
    print('authenticating...')
    auth = authentication()
    print('start twitter listener...')
    twitter_listener = TwitterListener(persist_time=persist_time)
    print('start twitter stream...')
    twitter_stream = Stream(auth=auth, listener=twitter_listener, timeout=timeout)
    try:
        print("getting data...")
        twitter_stream.filter(track=keywords, is_async=False, languages=languages)
        twitter_stream.disconnect()
        return sqlContext.sql("select * from tweets")
    except BaseException as e:
        print("Error: " + str(e))

if __name__ == "__main__":
    sc = SparkContext.getOrCreate()
    spark = SparkSession.builder.getOrCreate()
    sqlContext = SQLContext(sc)
    
    keywords = ["COVID"]
    languages=["pt"]
    timeout = 60 # não funciona mto bem
    persist_time=10
    path = "../layer1/tweets"

    while True:
        df = getData(keywords=keywords,
                     languages=languages,
                     timeout=timeout, # não funciona mto bem
                     persist_time=persist_time,
                    )
        
        print("saving parquet...")
        df.write\
          .partitionBy("etl_load_partition_year",
                       "etl_load_partition_month",
                       "etl_load_partition_day",
                       "etl_load_partition_hour")\
          .format("parquet")\
          .mode("append")\
          .save(path)

authenticating...
start twitter listener...
start twitter stream...
getting data...
Tweets:1, Time:0.6565992832183838
Tweets:2, Time:4.417258024215698
Tweets:3, Time:4.517461776733398
Tweets:4, Time:4.908051013946533
Tweets:5, Time:7.789911985397339
Tweets:6, Time:8.189866065979004
Tweets:7, Time:8.621999740600586
Tweets:8, Time:9.52461838722229
Tweets:9, Time:9.671762466430664
Tweets:10, Time:10.610363006591797
saving parquet...
authenticating...
start twitter listener...
start twitter stream...
getting data...
Tweets:1, Time:0.7210795879364014
Tweets:2, Time:0.7900354862213135
Tweets:3, Time:3.327094316482544
Tweets:4, Time:3.577239990234375
Tweets:5, Time:4.091476678848267
Tweets:6, Time:4.360363960266113
Tweets:7, Time:7.00261116027832
Tweets:8, Time:7.415963172912598
Tweets:9, Time:8.176481485366821
Tweets:10, Time:9.726723432540894
Tweets:11, Time:9.907232284545898
Tweets:12, Time:10.011929750442505
saving parquet...
authenticating...
start twitter listener...
start twitter strea

In [3]:
df = spark.read.parquet("../layer1/tweets").toPandas()

In [4]:
df

Unnamed: 0,tweet_id,created_at,text,hashtags,retweet_count,possibly_sensitive,lang,user_id,user_name,user_description,user_verification,user_followers_count,user_friends_count,user_created_at,user_location,etl_load,etl_load_partition_year,etl_load_partition_month,etl_load_partition_day,etl_load_partition_hour
0,1359253319861026818,Tue Feb 09 21:30:19 +0000 2021,RT @TNTSportsBR: DESFALQUES! Dois jogadores do...,,0,,pt,1198710796286676993,Corinthians Timão,S . C . Corinthians . P - VAI CORINTHIANS ! ⚪⚫...,False,422,1063,Sun Nov 24 21:12:13 +0000 2019,,2021-02-09 18:30:35.713,2021,2,9,18
1,1359253318258819072,Tue Feb 09 21:30:19 +0000 2021,RT @GFiuza_Oficial: Onde estão os democratas p...,,0,,pt,1266403333415239680,Conde von Roscoff,"Penso, logo existo, ou algo bem parecido.",False,79,610,Fri May 29 16:17:57 +0000 2020,,2021-02-09 18:30:35.713,2021,2,9,18
2,1359253330556575744,Tue Feb 09 21:30:22 +0000 2021,RT @CNNBrasil: Amazonas recebe mais de 62 mil ...,,0,False,pt,122272543,Japaxé Brazueiros e Brazueiras,Brazueiros e Brazueiras\n\nJaponês Baiano não ...,False,1670,4993,Fri Mar 12 04:53:50 +0000 2010,Japones Baiano,2021-02-09 18:30:35.713,2021,2,9,18
3,1359253330665627665,Tue Feb 09 21:30:22 +0000 2021,@flavioclaudir Confira! 😎😎VÍDEO NOVO no Canal....,"consciência, empatia, covid, quarentena, pande...",0,False,pt,215139198,Kássia Rocha,"Tenho 36 anos. YouTuber, Escritora, Professora...",False,1254,133,Sat Nov 13 02:54:30 +0000 2010,Brasil,2021-02-09 18:30:35.713,2021,2,9,18
4,1359253331823239181,Tue Feb 09 21:30:22 +0000 2021,RT @noticiaaominuto: Covid-19. Madeira com mai...,"paisaominuto,",0,False,pt,102647685,Micaela Campanário,,False,88,798,Thu Jan 07 11:13:19 +0000 2010,Funchal,2021-02-09 18:30:35.713,2021,2,9,18
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
170,1359252876669943817,Tue Feb 09 21:28:34 +0000 2021,RT @aramalhocorreia: Não há um jornalista q te...,,0,,pt,14582157,Mac Free,O tempo em directo..... em Leça da Palmeira\nW...,False,34,118,Tue Apr 29 08:13:53 +0000 2008,"Leça da Palmeira, Portugal",2021-02-09 18:29:28.339,2021,2,9,18
171,1359252877643034630,Tue Feb 09 21:28:34 +0000 2021,Fica bem,,0,,pt,1246890076019326978,@mery2013silva,,False,408,1003,Sun Apr 05 19:59:15 +0000 2020,,2021-02-09 18:29:28.339,2021,2,9,18
172,1359252885423464450,Tue Feb 09 21:28:36 +0000 2021,RT @fabioirodrigues: Sabemos que a Covid-19 nã...,,0,,pt,926801470158180352,luanalua,,False,144,353,Sat Nov 04 13:21:04 +0000 2017,,2021-02-09 18:29:28.339,2021,2,9,18
173,1359252885645758465,Tue Feb 09 21:28:36 +0000 2021,⚡️ “Senador José Maranhão morre de covid-19” d...,,0,False,pt,72694672,Ricardo 🇧🇷,🇧🇷 Cristão 🇧🇷 Conservador 🇧🇷\n\nPARLER @Ricardo2,False,51621,51660,Tue Sep 08 22:56:17 +0000 2009,Brasil,2021-02-09 18:29:28.339,2021,2,9,18
