In [31]:
from tweepy.streaming import StreamListener
from tweepy import Stream
from tweepy import OAuthHandler

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F

import json
import time

class Tweet():
    """
    Objeto com os atributos refrentes aos dados a serem extraídos do JSON da API do Twitter
    """
    tweet_id: str = None
    created_at: str = None
    text: str = None
    hashtags: str = None
    retweet_count: int = None
    possibly_sensitive: bool = None
    lang: str = None
    user_id: str = None
    user_name: str = None
    user_description: str = None
    user_verification: bool = None
    user_followers_count: int = None
    user_friends_count: int = None
    user_created_at: str = None
    user_location: str = None
    
    def get_list(self):
        """
        Retorna uma lista com os valores atribuídos ao objeto instanciado
        """
        list_data = [
            self.tweet_id,
            self.created_at,
            self.text,
            self.hashtags,
            self.retweet_count,
            self.possibly_sensitive,
            self.lang,
            self.user_id,
            self.user_name,
            self.user_description,
            self.user_verification,
            self.user_followers_count,
            self.user_friends_count,
            self.user_created_at,
            self.user_location,
        ]
        return list_data
    
    def insert(self, data):
        """
        Recebe o retorno da API e insere os dados em seus respectivos atributos
        """
        hashtags = ""
        if "extended_tweet" in data:
            for h in data['extended_tweet']["entities"]["hashtags"]:
                hashtags = h["text"] + ", " + hashtags
        else:
            for h in data["entities"]["hashtags"]:
                hashtags = h["text"] + ", " + hashtags
        self.tweet_id = data["id"]
        self.created_at = data["created_at"]
        self.text = data['extended_tweet']['full_text'] if "extended_tweet" in data else data["text"]
        self.user_id = data["user"]["id"]
        self.hashtags = hashtags
        self.retweet_count = data["retweet_count"]
        self.possibly_sensitive = data['possibly_sensitive'] if "possibly_sensitive" in data else None
        self.lang = data["lang"]
        self.user_id = data["user"]["id"]
        self.user_name = data["user"]["name"]
        self.user_description = data["user"]["description"]
        self.user_verification = data["user"]["verified"]
        self.user_followers_count = data["user"]["followers_count"]
        self.user_friends_count = data["user"]["friends_count"]
        self.user_created_at = data["user"]["created_at"]
        self.user_location = data["user"]["location"]

class TwitterListener(StreamListener):
    """
    Listener da API do Twitter, esta classe realiza o streaming de tweets,
    recebendo os tweets, processando e persistindo em uma temp view.
    """
    def __init__(self, persist_time):
        self.persist_time = persist_time
        self.start = time.time()
        self.listTweets = []
        self.schema = StructType([StructField("tweet_id", StringType(), True),
                                  StructField("created_at", StringType(), True),
                                  StructField("text", StringType(), True),
                                  StructField("hashtags", StringType(), True),
                                  StructField("retweet_count", IntegerType(), True),
                                  StructField("possibly_sensitive", BooleanType(), True),
                                  StructField("lang", StringType(), True),
                                  StructField("user_id", StringType(), True),
                                  StructField("user_name", StringType(), True),
                                  StructField("user_description", StringType(), True),
                                  StructField("user_verification", BooleanType(), True),
                                  StructField("user_followers_count", IntegerType(), True),
                                  StructField("user_friends_count", IntegerType(), True),
                                  StructField("user_created_at", StringType(), True),
                                  StructField("user_location", StringType(), True),])    
    
    def on_data(self, data):
        try:
            tweet = Tweet()
            json_data = json.loads(data)
            if "limit" not in json_data:
                tweet.insert(json_data)
                self.listTweets.append(tweet.get_list())
                
                print(f"Tweets:{len(self.listTweets)}, Time:{(time.time() - self.start)}")
                if (time.time() - self.start) > self.persist_time:
                    try:
                        df = sqlContext.createDataFrame(data=self.listTweets, schema=self.schema)
                        df = df.withColumn("etl_load", F.current_timestamp())
                        df = df.withColumn("etl_load_partition_year", F.date_format("etl_load", "yyyy"))
                        df = df.withColumn("etl_load_partition_month", F.date_format("etl_load", "MM"))
                        df = df.withColumn("etl_load_partition_day", F.date_format("etl_load", "dd"))
                        df = df.withColumn("etl_load_partition_hour", F.date_format("etl_load", "HH"))
                        df.createOrReplaceTempView("tweets")
                    except BaseException as e:
                        print("Erro ao contruir 'df': " + str(e))
                    return False
        except BaseException as e:
            print("Error: " + str(e), "JSON fora do esperado:", json_data)
        return True
    
    def on_error(self, status_code):
        print("Error: " + status_code)
        return True
    
    def on_timeout(self):
        print("Timeout!")
        return True

def authentication():
    PATH = "D:/William/Computação/Data Science/Cases/SerasaExperian/{0}"
    
    API_KEY = open(file=PATH.format("API_key.txt"), mode="r", encoding="UTF-8").read()
    API_SECRET_KEY = open(file=PATH.format("API_secret_key.txt"), mode="r", encoding="UTF-8").read()
    ACCESS_TOKEN = open(file=PATH.format("access_token.txt"), mode="r", encoding="UTF-8").read()
    ACCESS_TOKEN_SECRET = open(file=PATH.format("access_token_secret.txt"), mode="r", encoding="UTF-8").read()
    
    auth = OAuthHandler(API_KEY, API_SECRET_KEY)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    return auth

def getData(keywords, languages=None, timeout=None, persist_time=60):
    print('authenticating...')
    auth = authentication()
    print('start twitter listener...')
    twitter_listener = TwitterListener(persist_time=persist_time)
    print('start twitter streaming...')
    twitter_stream = Stream(auth=auth, listener=twitter_listener, timeout=timeout)
    try:
        print("getting data...")
        twitter_stream.filter(track=keywords, is_async=False, languages=languages)
        twitter_stream.disconnect()
        return sqlContext.sql("select * from tweets")
    except BaseException as e:
        print("Error: " + str(e))

if __name__ == "__main__":
    sc = SparkContext.getOrCreate()
    spark = SparkSession.builder.getOrCreate()
    sqlContext = SQLContext(sc)

    keywords = ["PALMEIRAS"]
    languages=["pt"]
    timeout=60 # não funciona mto bem
    persist_time=10

    while True:
        df = getData(keywords=keywords,
                     languages=languages,
                     timeout=timeout, # não funciona mto bem
                     persist_time=persist_time,
                    )
        
        print("saving parquet...")
        path = "./tweets"
        df.write\
          .partitionBy("etl_load_partition_year",
                       "etl_load_partition_month",
                       "etl_load_partition_day",
                       "etl_load_partition_hour")\
          .format("parquet")\
          .mode("append")\
          .save(path)

authenticating...
start twitter listener...
start twitter streaming...
getting data...
Tweets:1, Time:0.9296777248382568
Tweets:2, Time:0.9304366111755371
Tweets:3, Time:1.1358051300048828
Tweets:4, Time:1.1368024349212646
Tweets:5, Time:1.1377992630004883
Tweets:6, Time:1.1377992630004883
Tweets:7, Time:1.1387972831726074
Tweets:8, Time:1.139803409576416
Tweets:9, Time:1.343890905380249
Tweets:10, Time:1.3448991775512695
Tweets:11, Time:1.345935583114624
Tweets:12, Time:1.346886157989502
Tweets:13, Time:1.3478944301605225
Tweets:14, Time:1.3478944301605225
Tweets:15, Time:1.348893642425537
Tweets:16, Time:1.3498785495758057
Tweets:17, Time:1.3498785495758057
Tweets:18, Time:1.5511283874511719
Tweets:19, Time:1.5521304607391357
Tweets:20, Time:1.5531690120697021
Tweets:21, Time:1.5541276931762695
Tweets:22, Time:1.555128812789917
Tweets:23, Time:1.556123971939087
Tweets:24, Time:1.556123971939087
Tweets:25, Time:1.5571155548095703
Tweets:26, Time:1.558112621307373
Tweets:27, Time:1.558

Tweets:237, Time:6.202124118804932
Tweets:238, Time:6.205171346664429
Tweets:239, Time:6.207017421722412
Tweets:240, Time:6.215805530548096
Tweets:241, Time:6.277601957321167
Tweets:242, Time:6.277601957321167
Tweets:243, Time:6.279292821884155
Tweets:244, Time:6.279292821884155
Tweets:245, Time:6.294064521789551
Tweets:246, Time:6.325687408447266
Tweets:247, Time:6.326690435409546
Tweets:248, Time:6.37439489364624
Tweets:249, Time:6.385871648788452
Tweets:250, Time:6.44320011138916
Tweets:251, Time:6.457852840423584
Tweets:252, Time:6.466680288314819
Tweets:253, Time:6.495241641998291
Tweets:254, Time:6.496270656585693
Tweets:255, Time:6.4972405433654785
Tweets:256, Time:6.513810157775879
Tweets:257, Time:6.527796983718872
Tweets:258, Time:6.569933176040649
Tweets:259, Time:6.57647705078125
Tweets:260, Time:6.693485736846924
Tweets:261, Time:6.713819265365601
Tweets:262, Time:6.758934497833252
Tweets:263, Time:6.7632057666778564
Tweets:264, Time:6.792909860610962
Tweets:265, Time:6.84

Tweets:46, Time:2.0098876953125
Tweets:47, Time:2.0245652198791504
Tweets:48, Time:2.1128616333007812
Tweets:49, Time:2.1647861003875732
Tweets:50, Time:2.172799587249756
Tweets:51, Time:2.175607681274414
Tweets:52, Time:2.199812650680542
Tweets:53, Time:2.2158987522125244
Tweets:54, Time:2.2183890342712402
Tweets:55, Time:2.2506468296051025
Tweets:56, Time:2.2711172103881836
Tweets:57, Time:2.29354190826416
Tweets:58, Time:2.2953109741210938
Tweets:59, Time:2.2963502407073975
Tweets:60, Time:2.3166921138763428
Tweets:61, Time:2.317697286605835
Tweets:62, Time:2.3186917304992676
Tweets:63, Time:2.354279041290283
Tweets:64, Time:2.3682479858398438
Tweets:65, Time:2.385227918624878
Tweets:66, Time:2.3995702266693115
Tweets:67, Time:2.4302480220794678
Tweets:68, Time:2.4499685764312744
Tweets:69, Time:2.4620840549468994
Tweets:70, Time:2.4766128063201904
Tweets:71, Time:2.4776151180267334
Tweets:72, Time:2.4921042919158936
Tweets:73, Time:2.5071635246276855
Tweets:74, Time:2.5262799263000

AttributeError: 'NoneType' object has no attribute 'write'

In [33]:
df = spark.read.parquet("./tweets").toPandas()

In [35]:
df.count()

tweet_id                    4116
created_at                  4116
text                        4116
hashtags                    4116
retweet_count               4116
possibly_sensitive           663
lang                        4116
user_id                     4116
user_name                   4116
user_description            3604
user_verification           4116
user_followers_count        4116
user_friends_count          4116
user_created_at             4116
user_location               2879
etl_load                    4116
etl_load_partition_year     4116
etl_load_partition_month    4116
etl_load_partition_day      4116
etl_load_partition_hour     4116
dtype: int64

In [34]:
df.describe()

Unnamed: 0,retweet_count,user_followers_count,user_friends_count,etl_load_partition_year,etl_load_partition_month,etl_load_partition_day,etl_load_partition_hour
count,4116.0,4116.0,4116.0,4116.0,4116.0,4116.0,4116.0
mean,0.0,1866.241,824.576288,2021.0,2.0,7.0,17.0
std,0.0,26370.54,1389.394044,0.0,0.0,0.0,0.0
min,0.0,0.0,0.0,2021.0,2.0,7.0,17.0
25%,0.0,101.0,191.0,2021.0,2.0,7.0,17.0
50%,0.0,309.0,404.5,2021.0,2.0,7.0,17.0
75%,0.0,838.5,860.0,2021.0,2.0,7.0,17.0
max,0.0,1239637.0,20010.0,2021.0,2.0,7.0,17.0
