In [2]:
import tarfile, gzip, bz2, zipfile
import os, json
import shutil
import dask
from dask import delayed
from dask.distributed import Client
import polars as pl
import time
import re
from datetime import datetime
import sys
from pathlib import Path
import timeit

## 1 Fonte dos arquivos

Iremos extrair os arquivos do Twitter coletados entre 2012 e 2022 e disponibilizados no *[Internet Archive](https://archive.org)*.

A equipe responsável pela coleta dos dados é *[The Twitter Stream Grab](https://archive.org/details/twitterstream)*, e o projeto chama-se ***Twitter Stream***.

> [Python Tweepy – Getting the ID of a status](https://www.geeksforgeeks.org/python-tweepy-getting-the-id-of-a-status/?ref=ml_lbp)
>
> [Python – Status object in Tweepy](https://www.geeksforgeeks.org/python-status-object-in-tweepy/?ref=header_search)
>
> [Python Tweepy – Getting the number of times a tweet has been retweeted](https://www.geeksforgeeks.org/python-tweepy-getting-the-number-of-times-a-tweet-has-been-retweeted/)
>
> [Python Tweepy – Getting the language of a tweet](https://www.geeksforgeeks.org/python-tweepy-getting-the-language-of-a-tweet/?ref=ml_lbp)
>
> [X entities](https://developer.x.com/en/docs/twitter-api/enterprise/data-dictionary/native-enriched-objects/entities#entitiesobject)
>
> [Post Object](https://developer.x.com/en/docs/twitter-api/enterprise/data-dictionary/native-enriched-objects/tweet)
>
> [Understanding the new Tweet payload](https://developer.x.com/en/blog/product-news/2020/understanding-the-new-post-payload)
>
> [Twitter Sentiment Analysis by Python | best NLP model 2022](https://youtu.be/uPKnSq6TaAk)
>
> [huggingface/transformers-tensorflow-cpu](https://hub.docker.com/r/huggingface/transformers-tensorflow-cpu)
>
> [Silly-Machine/TuPy-Bert-Base-Multilabel](https://huggingface.co/Silly-Machine/TuPy-Bert-Base-Multilabel)

## 2 Dask para paralelizar a extração dos arquivos

In [3]:
# Local Dask cluster: N_WORKERS worker processes, each running THREADS_PER_WORKER threads.
N_WORKERS = 4
THREADS_PER_WORKER = 4
client = Client(n_workers=N_WORKERS, threads_per_worker=THREADS_PER_WORKER)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 15.28 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:55115,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 15.28 GiB

0,1
Comm: tcp://127.0.0.1:55135,Total threads: 4
Dashboard: http://127.0.0.1:55138/status,Memory: 3.82 GiB
Nanny: tcp://127.0.0.1:55118,
Local directory: C:\Users\S78886~1\AppData\Local\Temp\dask-scratch-space\worker-_cmh7i35,Local directory: C:\Users\S78886~1\AppData\Local\Temp\dask-scratch-space\worker-_cmh7i35

0,1
Comm: tcp://127.0.0.1:55144,Total threads: 4
Dashboard: http://127.0.0.1:55145/status,Memory: 3.82 GiB
Nanny: tcp://127.0.0.1:55119,
Local directory: C:\Users\S78886~1\AppData\Local\Temp\dask-scratch-space\worker-6zx50ney,Local directory: C:\Users\S78886~1\AppData\Local\Temp\dask-scratch-space\worker-6zx50ney

0,1
Comm: tcp://127.0.0.1:55136,Total threads: 4
Dashboard: http://127.0.0.1:55140/status,Memory: 3.82 GiB
Nanny: tcp://127.0.0.1:55120,
Local directory: C:\Users\S78886~1\AppData\Local\Temp\dask-scratch-space\worker-is5hocvt,Local directory: C:\Users\S78886~1\AppData\Local\Temp\dask-scratch-space\worker-is5hocvt

0,1
Comm: tcp://127.0.0.1:55137,Total threads: 4
Dashboard: http://127.0.0.1:55142/status,Memory: 3.82 GiB
Nanny: tcp://127.0.0.1:55121,
Local directory: C:\Users\S78886~1\AppData\Local\Temp\dask-scratch-space\worker-4qmspseb,Local directory: C:\Users\S78886~1\AppData\Local\Temp\dask-scratch-space\worker-4qmspseb


## 3 Lista de funções que vamos usar

#### Salva os tweets no formato parquet

In [4]:
def save_parquet(tweets_br, parquet_dir, file_name):
    """Write a list of record dicts to ``<parquet_dir>/<last dir name>_<file_name>.parquet``.

    Parameters
    ----------
    tweets_br : list[dict]
        Rows to write; each dict is one record.
    parquet_dir : str
        Existing output directory. Its last path component becomes the file prefix
        (e.g. ``Dados/2018/06/16`` -> ``16_<file_name>.parquet``).
    file_name : str
        Table suffix such as ``"tweets"`` or ``"usuarios"``.

    Returns
    -------
    str
        Path of the parquet file that was written.
    """
    # infer_schema_length=None scans every row: optional fields (e.g. latitude) may be
    # None in the first rows and only populated later.
    df_tweets = pl.from_dicts(tweets_br, infer_schema_length=None)
    # Path(...).name ignores a trailing separator and understands '\\' on Windows,
    # unlike splitting on "/" (which returned "" for "a/b/").
    prefix = Path(parquet_dir).name
    parquet = os.path.join(parquet_dir, f"{prefix}_{file_name}.parquet")
    df_tweets.write_parquet(parquet)
    print("++     Parquet {:s} salvo".format(parquet))
    return parquet

#### Subdiretórios

In [5]:
#t = timeit.Timer(stmt="extract_tweets('Dados/2022/04/10/20220410000000.json.gz')", setup=setup_v1)
#total = t.repeat(5, number=5)
#print(np.mean( total ))
#t = timeit.Timer(stmt="extract_tweets('Dados/2022/04/10/20220410000000.json.gz')", setup=setup_v2)
#total = t.repeat(5, number=5)
#print(np.mean( total ))

In [6]:
#len(members)
#extracts = [delayed(extract_tweets)(None, member) for member in members[:10]]
#tasks = delayed()(extracts)
#outs = tasks.compute()
#for member in members[:1]:
#    tweets, usuarios, apagados, status_tweets, retweets, hashtags, mencoes = extract_tweets(tar, member)
#extracts

In [7]:
#for member in members[:10]:
#    tweets, usuarios, apagados, status_tweets, retweets, hashtags, mencoes = extract_tweets(tar, member)

In [8]:
def _read_member_lines(compressed, member):
    """Decompress one member of a .zip/.tar archive and parse it as JSON Lines.

    The member is opened with bz2 when its name ends in ``.bz2`` and with gzip
    otherwise. Blank lines are skipped. Raises ``ValueError`` for an archive whose
    suffix is neither ``zip`` nor ``tar``.
    """
    f_ext = compressed.split(".")[-1]
    if f_ext == "zip":
        archive = zipfile.ZipFile(compressed, mode="r")
        member_name = member.filename
        # Stream straight from the zip; ZipFile.extract() would leave a copy on disk.
        open_member = archive.open
    elif f_ext == "tar":
        archive = tarfile.open(compressed, mode="r")
        member_name = member.name
        open_member = archive.extractfile
    else:
        raise ValueError(f"unsupported archive type: {compressed!r}")

    member_ext = member_name.split("/")[-1].split(".")[-1]
    decompress = bz2.open if member_ext == "bz2" else gzip.open
    # The `with` chain closes the text stream, the raw member and the archive even
    # when decoding or JSON parsing fails half-way.
    with archive, open_member(member) as raw:
        with decompress(raw, mode="rt", encoding="utf8") as f_input:
            return [json.loads(row) for row in f_input if row.strip()]


def _count(obj, key):
    """Parse a counter that may be textual (e.g. "100+"); missing/empty/None -> 0."""
    digits = re.sub(r"\D", "", str(obj[key])) if key in obj else ""
    return int(digits) if digits else 0


def _lat_lon(tweet):
    """Return ``(latitude, longitude)`` from the tweet's ``coordinates``, or ``(None, None)``.

    ``coordinates.coordinates`` is GeoJSON, i.e. ordered ``[longitude, latitude]``.
    """
    point = tweet.get("coordinates")
    if point is None:
        return None, None
    longitude, latitude = point["coordinates"][0], point["coordinates"][1]
    return latitude, longitude


def _tweet_record(tweet):
    """Build the ``tweets`` row for one tweet object."""
    latitude, longitude = _lat_lon(tweet)
    # NOTE(review): for tweets longer than 140 chars the v1.1 stream presumably
    # truncates `text` and keeps the full text in `extended_tweet.full_text` — confirm
    # before using tx_tweet for NLP.
    return {"id_tweet": tweet["id"],
            "dt_criacao_tweet": tweet["created_at"],
            "tx_tweet": tweet["text"],
            "id_twt_replied": tweet["in_reply_to_status_id"],
            "id_usr_replied": tweet["in_reply_to_user_id"],
            "no_lingua": tweet["lang"],
            "latitude": latitude,
            "longitude": longitude}


def _user_record(user):
    """Build the ``usuarios`` row for one user object."""
    return {"id_usuario": user["id"],
            "dt_criacao_usr": user["created_at"],
            "nm_usuario": user["name"]}


def _status_record(tweet, dt_status):
    """Build the ``status`` row (engagement counters + author profile) for one tweet."""
    user = tweet["user"]
    return {"id_tweet": tweet["id"],
            "dt_status": dt_status,
            "no_citacoes": _count(tweet, "quote_count"),
            "no_retweets": _count(tweet, "retweet_count"),
            "no_curtidas": _count(tweet, "favorite_count"),
            "id_usuario": user["id"],
            "nm_usuario": user["name"],
            "nm_tela_usr": user["screen_name"],
            "no_seguidores_usr": user["followers_count"],
            "no_amigos_usr": user["friends_count"],
            "no_listas_usr": user["listed_count"],
            "no_curtidas_usr": user["favourites_count"],
            "no_total_twt_usr": user["statuses_count"],
            # Missing or null user language is stored as "" for every tweet.
            "no_lingua_usr": str(user.get("lang") or ""),
            "local_usr": user["location"]}


def _hashtag_records(tweet):
    """Build one ``hashtags`` row per hashtag entity of the tweet."""
    return [{"id_tweet": tweet["id"], "nm_hashtag": hashtag["text"]}
            for hashtag in tweet["entities"]["hashtags"]]


def _mention_records(tweet):
    """Build one ``mencoes`` row per user mention entity of the tweet."""
    return [{"id_tweet": tweet["id"],
             "id_usuario": mention["id"],
             "nm_usuario": mention["name"],
             "nm_tela_usr": mention["screen_name"]}
            for mention in tweet["entities"]["user_mentions"]]


def extract_tweets(compressed, member):
    """Extract Portuguese tweets (and the tweets they retweet) from one archive member.

    Parameters
    ----------
    compressed : str
        Path to the ``.zip`` or ``.tar`` archive containing ``member``.
    member : zipfile.ZipInfo | tarfile.TarInfo
        Entry to read; a gzip- or bz2-compressed JSON Lines file.

    Returns
    -------
    tuple of 7 lists of dicts
        ``(tweets, usuarios, apagados, status_tweets, retweets, hashtags, mencoes)``.
        ``apagados`` is always empty. On any error the message is printed and seven
        empty lists are returned, so one bad member does not abort the whole batch.
    """
    try:
        lines = _read_member_lines(compressed, member)

        tweets, usuarios, apagados, status_tweets, retweets, hashtags, mencoes = [],[],[],[],[],[],[]

        for line in lines:
            # Skip deletion notices and any tweet whose language is not Portuguese.
            if "delete" in line or line.get("lang") != "pt":
                continue

            tweets.append(_tweet_record(line))
            usuarios.append(_user_record(line["user"]))
            status_tweets.append(_status_record(line, line["created_at"]))
            hashtags.extend(_hashtag_records(line))
            mencoes.extend(_mention_records(line))

            # A retweet also yields rows for the retweeted (original) tweet.
            if "retweeted_status" in line:
                original = line["retweeted_status"]
                tweets.append(_tweet_record(original))
                usuarios.append(_user_record(original["user"]))
                # The status snapshot is dated with the retweet's creation time.
                status_tweets.append(_status_record(original, line["created_at"]))
                retweets.append({"id_tweet": line["id"], "id_twt_retuitado": original["id"]})
                hashtags.extend(_hashtag_records(original))
                mencoes.extend(_mention_records(original))

        return tweets, usuarios, apagados, status_tweets, retweets, hashtags, mencoes
    except Exception as e:
        print("erro: ", e)
        return [],[],[],[],[],[],[]

### Descompactação, busca de tweets em português (`lang == "pt"`) e salvamento em .parquet

In [9]:
def computeTasks(compressed, members, parquet_dir):
    """Extract tweets from every member of one archive in parallel and save them as parquet.

    Parameters
    ----------
    compressed : str
        Path to the ``.zip`` or ``.tar`` archive.
    members : list
        Archive entries (``ZipInfo``/``TarInfo``) to process; one Dask task per entry.
    parquet_dir : str
        Directory that receives the parquet files. It must already exist; this
        function does not create it.
    """
    # One lazy task per member. dask.compute runs them on the active scheduler (the
    # distributed Client when one is running) and returns one 7-tuple of lists per member.
    extracts = [delayed(extract_tweets)(compressed, member) for member in members]
    outs = dask.compute(*extracts)

    # Position in extract_tweets' 7-tuple -> parquet file suffix. Position 2
    # ("apagados") is never populated, so it is not saved.
    tables = {0: "tweets", 1: "usuarios", 3: "status", 4: "retwts", 5: "hashtags", 6: "mencoes"}
    for position, suffix in tables.items():
        # Building one table at a time keeps only one concatenated list alive at once.
        rows = [row for out in outs for row in out[position]]
        save_parquet(rows, parquet_dir, suffix)
    

In [10]:
year = "2018"
root = f"Dados/{year}/"
# Every .tar/.zip archive below `root`. Match the dotted suffix so a name that merely
# ends in "tar"/"zip" (e.g. "x.gtar") is not selected; the processing loop later deletes
# the archives it handles. Sorted so files are processed in a deterministic order.
compressed = sorted(
    os.path.join(path, name)
    for path, _subdirs, files in os.walk(root)
    for name in files
    if name.endswith((".tar", ".zip"))
)

In [11]:
#import datetime
#import zipfile
#import tarfile

#f_ext = tar_files[-1].split(".")[-1]
#f_ext

#with zipfile.ZipFile(tar_files[0], mode="r") as archive:
#    for info in archive.infolist():
#        if not info.is_dir():
#            print(f"Compress Type: {info.is_dir()}")
#            print(f"Filename: {info.filename}")
#            print(f"Modified: {datetime.datetime(*info.date_time)}")
#            print(f"Normal size: {info.file_size} bytes")
#            print(f"Compressed size: {info.compress_size} bytes")
#            print(info.filename.split(".")[-1:])
#            #archive.read(info.filename)
#            print("-" * 20)
#            break

In [12]:
def get_tar_members(tarf):
    """List the non-directory entries stored in a tar archive.

    Parameters
    ----------
    tarf : str
        Path to the ``.tar`` file.

    Returns
    -------
    list[tarfile.TarInfo]
        Entry headers in archive order; the archive is closed before returning.
    """
    with tarfile.open(tarf, mode="r") as archive:
        headers = archive.getmembers()
    return list(filter(lambda header: not header.isdir(), headers))

In [13]:
def get_zip_members(zipf):
    """List the file entries (directories excluded) stored in a zip archive.

    Parameters
    ----------
    zipf : str
        Path to the ``.zip`` file.

    Returns
    -------
    list[zipfile.ZipInfo]
        Entries in archive order; the archive is closed before returning.
    """
    with zipfile.ZipFile(zipf, mode="r") as archive:
        entries = archive.infolist()
    return [entry for entry in entries if not entry.is_dir()]

In [14]:
#tests = ["Dados/2021/twitter-stream-2021-07-01.zip"]
#compressed

In [15]:
SEPARATOR = "+++++++++++++++++++++++++++++++++++++++++++++"


def _now_str():
    """Current local time in the dd/mm/YYYY HH:MM:SS format used by the progress log."""
    return datetime.now().strftime('%d/%m/%Y %H:%M:%S')


def _fmt_elapsed(seconds):
    """Format a duration in seconds as HH:MM:SS.ss."""
    hours, rem = divmod(seconds, 3600)
    minutes, secs = divmod(rem, 60)
    return "{:0>2}:{:0>2}:{:05.2f}".format(int(hours), int(minutes), secs)


start_for = time.time()
print(SEPARATOR)
print("++ Hora inicial do processo => {:s}".format(_now_str()))
for file in compressed:
    start = time.time()
    print(SEPARATOR)
    print("++     Início do passo => {:s}".format(_now_str()))
    f_ext = file.split(".")[-1]
    # Month/day come from the 8 characters before the extension once '-' and '_' are
    # stripped, e.g. "...2018-06-16.tar" -> "Dados/2018/06/16".
    f_name = file.replace("-", "").replace("_", "")
    path = root + f_name[-8:-6] + "/" + f_name[-6:-4]
    Path(path).mkdir(parents=True, exist_ok=True)
    print("++     Diretório => {:s}".format(path))

    if f_ext == "zip":
        members = get_zip_members(file)
    elif f_ext == "tar":
        members = get_tar_members(file)
    else:
        members = None

    if members is None:
        # Nothing was read from an unrecognised archive, so it must not be deleted.
        print("++     Extensão não suportada, arquivo mantido => {:s}".format(file))
    else:
        print("++     Total de arquivos => {:d}".format(len(members)))
        if members:
            computeTasks(file, members, path)
        # The source archive is removed once it has been processed.
        os.remove(file)
        # Best-effort removal of a "<year>/<month>" directory relative to the CWD
        # (presumably files left there by ZipFile.extract); failures are only reported.
        try:
            shutil.rmtree(f'{year}/' + f_name[-8:-6])
        except Exception as e:
            print("erro: ", e)

    print("++     Fim do passo => {:s}".format(_now_str()))
    print("++     Tempo do passo        => {:s}".format(_fmt_elapsed(time.time() - start)))
    print(SEPARATOR)

print(SEPARATOR)
print("++ Fim do processo => {:s}".format(_now_str()))
print("++ Tempo total do processo => {:s}".format(_fmt_elapsed(time.time() - start_for)))
print(SEPARATOR)

+++++++++++++++++++++++++++++++++++++++++++++
++ Hora inicial do processo => 22/10/2024 19:49:55
+++++++++++++++++++++++++++++++++++++++++++++
++     Início do passo => 22/10/2024 19:49:55
++     Diretório => Dados/2018/06/16
++     Total de arquivos => 1410
++     Parquet Dados/2018/06/16\16_tweets.parquet salvo
++     Parquet Dados/2018/06/16\16_usuarios.parquet salvo
++     Parquet Dados/2018/06/16\16_status.parquet salvo
++     Parquet Dados/2018/06/16\16_retwts.parquet salvo
++     Parquet Dados/2018/06/16\16_hashtags.parquet salvo
++     Parquet Dados/2018/06/16\16_mencoes.parquet salvo
erro:  [WinError 3] O sistema não pode encontrar o caminho especificado: '2018/06'
++     Fim do passo => 22/10/2024 19:59:41
++     Tempo do passo        => 00:09:46.04
+++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++
++ Fim do processo => 22/10/2024 19:59:41
++ Tempo total do processo => 00:09:46.04
+++++++++++++++++++++++++++++++++++++++++++++


In [16]:
#paralelização
#tweets, usuarios, apagados, status_tweets, retweets, hashtags, mencoes = [],[],[],[],[],[],[]
##extracts.append(delayed(extract_tweets)(f_ext, member))
#extracts = [delayed(extract_tweets)(member) for member in members[:1]]
#tasks = delayed()(extracts)
#outs = tasks.compute()

In [17]:
# Shut down the Dask client created at the start of the notebook. It was started from
# keyword arguments, so it runs its own LocalCluster.
client.close()

###### FIM