### Supuestos y Comentarios
- Se asume que los datos están almacenados sin compresión
- Se asume que es posible instalar las libreias:
> - apache-beam == 2.50.0
> - emoji==2.8.0

- No se utilizó cloud computing pues para el volumen particular de registros que este challenge contenía, no compensaba vs los tiempos de set up que una instancia de Dataflow puede tomar.
- Por temas de hardware personal este desafío fue realizado usando Google Colab


In [None]:
pip install apache_beam==2.50.0

In [2]:
pip install emoji==2.8.0

Collecting emoji==2.8.0
  Downloading emoji-2.8.0-py2.py3-none-any.whl (358 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m358.9/358.9 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: emoji
Successfully installed emoji-2.8.0


##1.- Las top 10 fechas donde hay más tweets.

#### 1.1 Importamos librerias a utilizar

In [3]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import argparse #
import sys
import datetime
import json
import collections
from typing import Tuple

#### 1.2 Definición Funciones Auxiliares

In [4]:
# Define una función DoFn que devuelve el valor más repetido de la lista
class MostRepeatedFn(beam.DoFn):
  def process(self, element):
    key, values = element
    counter = collections.Counter(values)
    most_common_value = counter.most_common(1)[0]
    yield (key, most_common_value)

#Obtener fecha y usuario del tweet
def fecha_user(obj):
    if obj:
        fecha = str(obj.get("date"))[:10]
        user = obj.get("user",{}).get("username","")
        return (fecha,user)

# Generar elementos desde una lista
def generate_elements(elements):
    for element in elements:
      yield element

# Define una función que formatea los elementos de la lista según lo solicitado en el punto1
def formatter_1(elements):
      date_obj = datetime.datetime.strptime(elements[0], '%Y-%m-%d').date()
      return (date_obj,elements[2])


def f2(elemento):
      return elemento[0]

class MisOpciones(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--projectid', help = 'Proyecto ID')

#### 1.3.1 Flujos: Enfoque Tiempo

Definimos el pipeline para el enfoque de tiempo de ejecución.

In [5]:
def proceso1_1(pipeline_options,ruta):

  #creamos pipeline proceso
  with beam.Pipeline(options=pipeline_options) as p:

    #leer datos
    p_datos = ( p
              | "Leer Archivo" >> beam.io.ReadFromText(ruta)
              | "to Json"  >> beam.Map(json.loads)
              | "Obtener fecha-usuario" >> beam.Map(fecha_user).with_output_types(Tuple[str, str])
              | 'Groupby' >> beam.GroupByKey()
               )
    User_top1 = ( p_datos
              | "Conteo User" >> beam.ParDo(MostRepeatedFn())
              #| "print Salida1" >> beam.Map(print)
              )
    Top_dias = ( p_datos
              | "Conteo2" >> beam.MapTuple(lambda k, vs: (k, len(vs)))
              | "top 10" >> beam.combiners.Top.Of(10, key = lambda x: x[1])
              | "flat" >> beam.FlatMap(generate_elements)
             #| "print Salida" >> beam.Map(print)
    )
    joined = (({ 'p1': Top_dias, 'p2': User_top1 })
            | 'Join' >> beam.CoGroupByKey()
            | 'Filter' >> beam.Filter(lambda v: v[1]['p1'] != [])
            | "convertir a tupla" >> beam.Map (lambda x : (1,(x[0], x[1]['p1'][0],x[1]['p2'][0]))).with_output_types(Tuple[int,Tuple])
            | 'Groupby2' >> beam.GroupByKey()
            |  "Ordenar" >> beam.MapTuple(lambda k, vs: (k, sorted(vs,key=lambda x: x[1], reverse=True)))
            | "Formatear" >> beam.MapTuple(lambda k, vs: (k, list(map(formatter_1, vs))))
            | "output" >> beam.Values()
            | "print Salida" >> beam.Map(print)
            )


In [6]:
def q1_time(file_path):

    #Recibo Argumentos
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(sys.argv)
    pipeline_options = PipelineOptions(pipeline_args)

    #Ejecutamos pipeline
    return proceso1_1(pipeline_options, file_path)

In [9]:
q1_time("/content/drive/MyDrive/DE Challenge/data.json")





[(datetime.date(2021, 2, 12), ('RanbirS00614606', 176)), (datetime.date(2021, 2, 13), ('MaanDee08215437', 178)), (datetime.date(2021, 2, 17), ('RaaJVinderkaur', 185)), (datetime.date(2021, 2, 16), ('jot__b', 133)), (datetime.date(2021, 2, 14), ('rebelpacifist', 119)), (datetime.date(2021, 2, 18), ('neetuanjle_nitu', 195)), (datetime.date(2021, 2, 15), ('jot__b', 134)), (datetime.date(2021, 2, 20), ('MangalJ23056160', 108)), (datetime.date(2021, 2, 23), ('Surrypuria', 135)), (datetime.date(2021, 2, 19), ('Preetm91', 267))]


#### 1.3.2 Flujos: Enfoque Memoria

Ojo que si corre, solo que demora 😅

In [10]:
def q1_memory(file_path):
  ar = open(file_path, "r", buffering=1024)

  # Crear un diccionario vacío para almacenar la frecuencia de los días
  fd = {}


  for linea in ar:
      tweet = json.loads(linea)
      dia = str(tweet.get("date"))[:10]
      fd[dia] = fd.get(dia, 0) + 1
  ar.close()
  del dia, ar, linea, tweet

  # Obtener el día con más tweets
  dias_maximo =tuple(map( f2 ,sorted(fd.items(), key=lambda x: -x[1])[:10]))
  del fd

  #creamos la lista de salida
  salida = []
  for d in dias_maximo:
      ar = open(file_path, "r", buffering=1024)
      # Crear un diccionario vacío para almacenar la frecuencia de los usuarios
      fu = {}
      for linea in ar:
          tweet = json.loads(linea)
          if str(tweet.get("date"))[:10] == d:
              usuario = tweet.get("user",{}).get("username","")
              fu[usuario] = fu.get(usuario, 0) + 1
      ar.close()
      del ar, linea, tweet, usuario
      salida.append((datetime.datetime.strptime(d, '%Y-%m-%d').date(),max(fu.items(), key=lambda x: x[1])[0]))
      del fu
  return salida

In [11]:
q1_memory("/content/drive/MyDrive/DE Challenge/data.json")

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]

## 2. Los top 10 emojis más usados con su respectivo conteo.

#### 2.1 Importamos librerias a utilizar

In [24]:
import json
import collections
import heapq
import emoji as em

#### 2.2 Definición de funciones auxiliares

In [25]:
# Estraer Emojis
def get_emojis(text):
    matches = em.emoji_list(text)
    for match in matches:
        # Devolvemos solo el emoji usando yield
        yield match["emoji"]

#### 2.3.1 Flujos: Enfoque Tiempo
Este punto, la función utilizando collections y objetos nativos de python resultó mas rápida que Apache Beam para este volumen de datos.

In [26]:
def q2_time(file_path):

  # Creamos un contador vacío para almacenar los emojis y sus frecuencias
  emojis = collections.Counter()

  # Abrimos el archivo json con los datos de Twitter
  with open(file_path, "r") as f:
      for line in f:
          tweet = json.loads(line)
          text = tweet["content"]
          matches = em.emoji_list(text)
          if matches != []:
            emojis_list_ = [match["emoji"] for match in matches]
            emojis.update(emojis_list_)


  # Obtenemos el resultado ordenado por frecuencia
  result = emojis.most_common(10)
  print(result)

In [27]:
q2_time("/content/drive/MyDrive/DE Challenge/data.json")

[('🙏', 5049), ('😂', 3072), ('🚜', 2972), ('🌾', 2182), ('🇮🇳', 2086), ('🤣', 1668), ('✊', 1651), ('❤️', 1382), ('🙏🏻', 1317), ('💚', 1040)]


#### 2.3.2 Flujos: Enfoque Memoria

In [28]:
def q2_memory(file_path):
  #Aqui guardaremos las frecuencias
  emojis = {}

  # Abrimos el archivo json con los datos de Twitter
  with open(file_path, "r", buffering=1024) as f:
      for line in f:
          tweet = json.loads(line)
          text = tweet["content"]
          del tweet
          # Iteramos sobre los emojis del texto
          for emoji_ in get_emojis(text):
              emojis[emoji_] = emojis.get(emoji_, 0) + 1
          del text

  # Obtenemos la lista usando la función heapq.nlargest.
  result = heapq.nlargest(10, emojis.items(), key=lambda x: x[1])
  print(result)

In [29]:
q2_memory("/content/drive/MyDrive/DE Challenge/data.json")

[('🙏', 5049), ('😂', 3072), ('🚜', 2972), ('🌾', 2182), ('🇮🇳', 2086), ('🤣', 1668), ('✊', 1651), ('❤️', 1382), ('🙏🏻', 1317), ('💚', 1040)]


## 3. El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones

#### 3.1 Importamos librerias a utilizar

In [30]:
import json
import collections
import heapq
import emoji as em

#### 3.2 Definición Funciones auxiliares

In [31]:
# Estraer usuarios mencionados
def get_mencionados(matches):
    for match in matches:
        # Devolvemos solo el los usuarios mencionados usando yield
        yield match["username"]

#### 3.3.1 Flujos: Enfoque Tiempo
Este punto, la función utilizando collections y objetos nativos de python resultó mas rápida que Apache Beam para este volumen de datos.

In [32]:
def q3_time(file_path):

  # Creamos un contador vacío para almacenar los emojis y sus frecuencias
  menciones = collections.Counter()

  # Abrimos el archivo json con los datos de Twitter
  with open(file_path, "r") as f:
      for line in f:
          tweet = json.loads(line)
          users_m = tweet["mentionedUsers"] #lista de menciones
          if users_m:
            menciones_list_ = [match["username"] for match in users_m]
            menciones.update(menciones_list_)


  # Obtenemos el resultado ordenado por frecuencia
  result = menciones.most_common()
  print(result)

In [33]:
q3_time("/content/drive/MyDrive/DE Challenge/data.json")

[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926), ('Tractor2twitr', 884), ('hrw', 833), ('DelhiPolice', 814), ('BJP4India', 777), ('INCIndia', 723), ('jazzyb', 697), ('UN', 681), ('priyankagandhi', 612), ('YouTube', 608), ('BBCWorld', 551), ('ndtv', 547), ('AmandaCerny', 543), ('AmitShah', 514), ('amnesty', 513), ('diljitdosanjh', 500), ('amaanbali', 490), ('PunYaab', 490), ('sushant_says', 481), ('ClaudiaWebbe', 480), ('KanganaTeam', 467), ('aajtak', 426), ('ZeeNews', 422), ('JustinTrudeau', 401), ('sachin_rt', 395), ('CNN', 393), ('ArvindKejriwal', 388), ('JoeBiden', 375), ('nytimes', 364), ('POTUS', 348), ('neetuanjle_nitu', 348), ('nstomar', 347), ('Monica_Gill1', 344), ('ANI', 329), ('miakhalifa', 320), ('saahilmenghani', 307), ('OfficialBKU', 300), ('rupikaur_', 299), ('TanDhesi', 296), ('ndtvindia'

#### 3.3.2 Flujos: Enfoque Memoria

In [34]:
def q3_memory(file_path):
  #Aqui guardaremos las frecuencias
  menciones = {}

  # Abrimos el archivo json con los datos de Twitter
  with open(file_path, "r", buffering=1024) as f:
      for line in f:
          tweet = json.loads(line)
          list_usu = tweet["mentionedUsers"]
          del tweet
          if list_usu:
            for usuario in get_mencionados(list_usu):
                menciones[usuario] = menciones.get(usuario, 0) + 1
          del list_usu

  # Obtenemos la lista usando la función heapq.nlargest.
  result = heapq.nlargest(len(menciones), menciones.items(), key=lambda x: x[1])
  print(result)

In [35]:
q3_memory("/content/drive/MyDrive/DE Challenge/data.json")

[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926), ('Tractor2twitr', 884), ('hrw', 833), ('DelhiPolice', 814), ('BJP4India', 777), ('INCIndia', 723), ('jazzyb', 697), ('UN', 681), ('priyankagandhi', 612), ('YouTube', 608), ('BBCWorld', 551), ('ndtv', 547), ('AmandaCerny', 543), ('AmitShah', 514), ('amnesty', 513), ('diljitdosanjh', 500), ('amaanbali', 490), ('PunYaab', 490), ('sushant_says', 481), ('ClaudiaWebbe', 480), ('KanganaTeam', 467), ('aajtak', 426), ('ZeeNews', 422), ('JustinTrudeau', 401), ('sachin_rt', 395), ('CNN', 393), ('ArvindKejriwal', 388), ('JoeBiden', 375), ('nytimes', 364), ('POTUS', 348), ('neetuanjle_nitu', 348), ('nstomar', 347), ('Monica_Gill1', 344), ('ANI', 329), ('miakhalifa', 320), ('saahilmenghani', 307), ('OfficialBKU', 300), ('rupikaur_', 299), ('TanDhesi', 296), ('ndtvindia'