In [2]:
import ast
import datetime
import functools
import json

import numpy as np
import pandas as pd



In [3]:
# Load the tweet log exported as CSV.
tweets_df = pd.read_csv('logs/20210214122111.csv')

# Truncate the ISO timestamp to minute precision and parse it in one vectorized
# call; missing timestamps become NaT instead of raising inside a per-row lambda.
tweets_df['created_at'] = pd.to_datetime(
    tweets_df['created_at'].str[:16], format='%Y-%m-%dT%H:%M'
)

# A CSV round-trip stores the `tags` dicts as their Python repr (a str), so turn
# them back into dicts. ast.literal_eval only accepts literals and cannot execute
# code, unlike eval.
tweets_df['tags'] = tweets_df['tags'].map(
    lambda raw_tags: ast.literal_eval(raw_tags) if isinstance(raw_tags, str) else raw_tags
)

# S3 "folder" prefix per tweet: year/month/day/hour of creation.
tweets_df['folder'] = tweets_df['created_at'].dt.strftime('%Y/%m/%d/%H')
tweets_df.head()

Unnamed: 0,created_at,id,text,tags,folder
0,2021-02-14 17:25:00,1361003520762109952,"Carnaval de Barranquilla, tesoro atravesado po...","{'LOC': ['Carnaval de Barranquilla'], 'PROPN':...",2021/02/14/17
1,2021-02-14 17:23:00,1361003111507197955,Distanciamiento social: los animales tambi√©n s...,"{'NOUN': ['Distanciamiento', 'animal', 'enferm...",2021/02/14/17
2,2021-02-14 17:20:00,1361002417542864897,La Alcald√≠a de de Bogot√° instal√≥ este s√°bado u...,"{'ORG': ['Alcald√≠a de de Bogot√°'], 'PROPN': ['...",2021/02/14/17
3,2021-02-14 17:15:00,1361001123239362574,La ‚Äòllama‚Äô del amor se empieza a apagar cuando...,"{'NOUN': ['‚Äò', 'amor', 'rutina', 'üòè'], 'VERB':...",2021/02/14/17
4,2021-02-14 17:15:00,1361001004288909317,Elecciones clave en Catalu√±a\n\nLa regi√≥n aut√≥...,"{'LOC': ['Madrid'], 'PER': ['Salvador Illa'], ...",2021/02/14/17


In [10]:
from pyspark.sql import SparkSession

# Explicit DDL schema: every field, including the nested `tags` object, is read
# as a plain string, so Spark does no schema inference on this file.
schema = "`created_at` STRING, `id` STRING, `tags` STRING, `text` STRING"

spark = (SparkSession
         .builder
         .appName("TweetJob")
         .getOrCreate())

# No `inferSchema` option: it belongs to the CSV reader and is ignored whenever
# an explicit schema is supplied, so it only suggested inference was happening.
df = (spark.read.format("json")
      .schema(schema)
      .load("logs/20210214185653.log"))
df.show()

+--------------------+-------------------+--------------------+--------------------+
|          created_at|                 id|                tags|                text|
+--------------------+-------------------+--------------------+--------------------+
|2021-02-15T00:00:...|1361103087415545860|{"LOC":["Catar","...|El enviado especi...|
|2021-02-15T00:00:...|1361103040149917698|{"ORG":["OMS","Wu...|Conozca algunas d...|
|2021-02-15T00:00:...|1361102929441288205|{"LOC":["Defensor...|#NoticiasUno| Med...|
|2021-02-14T23:55:...|1361101796454842370|{"PER":["‚ÄòJackass...|Actor de ‚ÄòJackass...|
|2021-02-14T23:55:...|1361101667404455942|{"PER":["Lucy Law...|Lucy Lawless, en ...|
|2021-02-14T23:45:...|1361099150893359105|{"LOC":["SENA"],"...|El SENA abri√≥ m√°s...|
|2021-02-14T23:43:...|1361098828225662976|{"PER":["Everton"...|Everton volvi√≥ a ...|
|2021-02-14T23:40:...|1361098084919484418|{"PER":["Boris Jo...|El primer ministr...|
|2021-02-14T23:35:...|1361096634512400386|{"LOC":["Ituango

In [8]:
# Inspect the schema Spark attached to `df`.
# NOTE(review): the output below comes from In [8], which appears to predate the
# In [10] read: it shows an inferred schema (nested `tags` struct plus a
# `_corrupt_record` column), not the explicit all-STRING schema — re-run to refresh.
df.schema

StructType(List(StructField(_corrupt_record,StringType,true),StructField(created_at,StringType,true),StructField(id,StringType,true),StructField(tags,StructType(List(StructField(HASHTAGS,ArrayType(StringType,true),true),StructField(LOC,ArrayType(StringType,true),true),StructField(NOUN,ArrayType(StringType,true),true),StructField(ORG,ArrayType(StringType,true),true),StructField(PER,ArrayType(StringType,true),true),StructField(PROPN,ArrayType(StringType,true),true),StructField(URLS,ArrayType(StringType,true),true),StructField(VERB,ArrayType(StringType,true),true))),true),StructField(text,StringType,true)))

In [66]:
from io import StringIO 
import boto3

@functools.lru_cache(maxsize=None)
def _default_s3_resource():
    """Create the boto3 S3 resource once and reuse it for every store_tags call."""
    return boto3.resource('s3')


def _parse_tags(raw_tags):
    """Return one tweet's tags as a mapping of tag type -> list of values.

    In memory `tags` is a dict, but after a CSV round-trip it is the dict's
    Python repr (a str); missing values (None/NaN) are treated as no tags.
    """
    if isinstance(raw_tags, str):
        # literal_eval only evaluates Python literals, so it cannot run code.
        return ast.literal_eval(raw_tags)
    # NaN is the only float that is not equal to itself.
    if raw_tags is None or (isinstance(raw_tags, float) and raw_tags != raw_tags):
        return {}
    return raw_tags


def store_tags(row, s3_resource=None, s3_bucket='tweet.watcher'):
    """Upload per-tweet tag counts to S3 as CSV.

    Args:
        row: a record with `folder`, `id` and `tags` fields (e.g. a DataFrame row).
            `tags` may be a dict or its string repr.
        s3_resource: object exposing `.Object(bucket, key).put(Body=...)`;
            defaults to a shared boto3 S3 resource created on first use.
        s3_bucket: destination bucket name.

    Writes `<folder>/<id>_tags.csv` with columns tag,value,count, where count is
    how many times the (tag, value) pair occurs in the tweet.
    """
    tags = _parse_tags(row['tags'])
    tag_rows = [(tag, value, 1) for tag, values in tags.items() for value in values]
    tag_counts = (pd.DataFrame(tag_rows, columns=['tag', 'value', 'count'])
                  .groupby(['tag', 'value'])
                  .count())

    csv_buffer = StringIO()
    tag_counts.to_csv(csv_buffer)

    s3 = s3_resource if s3_resource is not None else _default_s3_resource()
    s3.Object(s3_bucket, f'{row["folder"]}/{row["id"]}_tags.csv').put(Body=csv_buffer.getvalue())
    
# Upload one tag-count CSV per tweet (one S3 PUT each, so this is slow on the
# full frame; the last run was interrupted). store_tags returns None, so the
# trailing `;` hides the useless all-None Series that .apply would display.
tweets_df[['folder', 'id', 'tags']].apply(store_tags, axis=1);

2021/02/14/11/1360993582694465538_tags.csv
2021/02/14/11/1360993454591991810_tags.csv
2021/02/14/11/1360993454549921798_tags.csv
2021/02/14/11/1360993260676804618_tags.csv
2021/02/14/11/1360992331873341451_tags.csv
2021/02/14/11/1360990938110156803_tags.csv
2021/02/14/11/1360989843472408583_tags.csv
2021/02/14/11/1360989714384375809_tags.csv
2021/02/14/11/1360988421359951877_tags.csv
2021/02/14/11/1360987334091681793_tags.csv
2021/02/14/11/1360987293767639042_tags.csv
2021/02/14/11/1360987163085611009_tags.csv
2021/02/14/11/1360986354105126915_tags.csv
2021/02/14/11/1360985905255878658_tags.csv
2021/02/14/11/1360985401452732419_tags.csv
2021/02/14/11/1360984660571025411_tags.csv
2021/02/14/11/1360984545529630725_tags.csv
2021/02/14/11/1360983388685967360_tags.csv
2021/02/14/11/1360982341674536970_tags.csv
2021/02/14/11/1360982336486207488_tags.csv
2021/02/14/11/1360982190536982533_tags.csv
2021/02/14/11/1360982137999134733_tags.csv
2021/02/14/11/1360981928954986508_tags.csv
2021/02/14/

KeyboardInterrupt: 

In [5]:
# Flatten `tag_collection` into one long frame: one row per (created_at, value,
# count) record, labelled with its tag type, ordered by created_at within each tag.
# TODO: `tag_collection` is not defined anywhere in this notebook (it came from a
# since-deleted cell), so this cell fails on Restart & Run All. Presumably a
# mapping tag -> list of (created_at, value, count) records — confirm.
tag_frames = []
for tag in tag_collection:
    tag_frame = pd.DataFrame(tag_collection[tag], columns=['created_at', 'value', 'count'])
    tag_frame['tag'] = tag
    # Equivalent to concatenating the groupby('created_at') groups: a stable sort
    # by created_at that drops rows whose created_at is missing. Uses its own
    # name so the global Spark `df` is not clobbered.
    tag_frame = (tag_frame.dropna(subset=['created_at'])
                 .sort_values('created_at', kind='mergesort'))
    if not tag_frame.empty:
        tag_frames.append(tag_frame)

# pd.concat([]) raises, so fall back to an empty frame with the same columns.
tags_df = (pd.concat(tag_frames) if tag_frames
           else pd.DataFrame(columns=['created_at', 'value', 'count', 'tag']))
tags_df.head()

Unnamed: 0,created_at,value,count,tag
82,2021-02-14 11:45:00,EEUU,1,LOC
83,2021-02-14 11:45:00,COVID19 \n\n,1,LOC
84,2021-02-14 11:45:00,Buenaventura,1,LOC
81,2021-02-14 11:51:00,Per√∫,1,LOC
79,2021-02-14 11:55:00,ContenidoPremium,1,LOC


In [7]:
# Total occurrences per (created_at, value, tag). Sum the `count` column rather
# than .count()-ing it: .count() reports how many records fall in each group,
# which only equals the total when every record's count is 1.
counts = tags_df.groupby(['created_at', 'value', 'tag'])[['count']].sum()
counts

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,count
created_at,value,tag,Unnamed: 3_level_1
2021-02-14 11:45:00,#,PROPN,1
2021-02-14 11:45:00,#Biden,HASHTAGS,1
2021-02-14 11:45:00,#COVID19,HASHTAGS,1
2021-02-14 11:45:00,#DWNoticias,HASHTAGS,1
2021-02-14 11:45:00,#EEUU,HASHTAGS,1
...,...,...,...
2021-02-14 16:45:00,saber,VERB,1
2021-02-14 16:45:00,vez,NOUN,1
2021-02-14 16:45:00,‚û°,PROPN,1
2021-02-14 16:45:00,Ô∏è,PROPN,1


In [10]:
# Tag values seen more than once in the same minute, in time order.
# `counts` is indexed by (created_at, value, tag), so sort_index orders by that
# full key deterministically; sort_values('created_at') uses the unstable default
# quicksort and may shuffle rows sharing a timestamp.
counts.loc[counts['count'] > 1].sort_index()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,count
created_at,value,tag,Unnamed: 3_level_1
2021-02-14 12:00:00,Trump,PROPN,2
2021-02-14 12:00:00,pa√≠s,NOUN,2
2021-02-14 12:30:00,#,PROPN,2
2021-02-14 12:40:00,contar,VERB,2
2021-02-14 12:40:00,‚û°,PROPN,2
2021-02-14 12:40:00,Ô∏è,PROPN,3
2021-02-14 13:00:00,#,PROPN,2
2021-02-14 13:00:00,partir,NOUN,2
2021-02-14 14:00:00,a√±o,NOUN,2
2021-02-14 14:00:00,vacuno,NOUN,2


In [None]:
# Load a JSON query definition from disk. File objects have no `.load`;
# json.load parses the opened file. JSON is UTF-8 by specification, so the
# encoding is explicit rather than platform-dependent.
# TODO: `file_name` is not defined in this notebook — set it (e.g. in a config
# cell) before running this cell.
with open(file_name, encoding='utf-8') as query_file:
    query = json.load(query_file)