# Data Engineer Challenge LATAM

En este archivo puedes escribir lo que estimes conveniente. Te recomendamos detallar tu solución y todas las suposiciones que estás considerando. Aquí puedes ejecutar las funciones que definiste en los otros archivos de la carpeta src, medir el tiempo, memoria, etc.

## Introducción

En el desafío se pide implementar funciones que reciban los datos de tweets y realicen las consultas entregadas en 3 preguntas.

Se explica que la solución puede utilizar herramientas cloud, por lo que se decide utilizar Cloud Storage y Bigquery para almacenar y realizar consultas sobre los datos.

Además, se decide experimentar con pandas para comparar los resultados en tiempo de ejecución con Bigquery.

Por otro lado, se asume que se puede realizar un preprocesamiento a los datos para facilitar las consultas. Además, de esta manera se disminuye el tiempo de ejecución y de memoria utilizada, por lo que cumple con los requerimientos del desafío.

## Análisis introductorio

Primero se realiza un breve análisis de los datos para entender su estructura y los datos que contienen.

In [48]:
import pandas as pd

file_path = "farmers-protest-tweets-2021-2-4.json"
df = pd.read_json(file_path, lines=True)

In [8]:
df.columns

Index(['url', 'date', 'content', 'renderedContent', 'id', 'user', 'outlinks',
       'tcooutlinks', 'replyCount', 'retweetCount', 'likeCount', 'quoteCount',
       'conversationId', 'lang', 'source', 'sourceUrl', 'sourceLabel', 'media',
       'retweetedTweet', 'quotedTweet', 'mentionedUsers'],
      dtype='object')

Se observa que los datos contienen varias columnas y algunas de ellas contienen datos repetidos de otras columnas (source, sourceUrl).

In [17]:
df.head()

Unnamed: 0,url,date,content,renderedContent,id,user,outlinks,tcooutlinks,replyCount,retweetCount,...,lang,source,sourceUrl,sourceLabel,media,retweetedTweet,quotedTweet,mentionedUsers,username,emojis
0,https://twitter.com/ArjunSinghPanam/status/136...,2021-02-24 09:23:35+00:00,The world progresses while the Indian police a...,The world progresses while the Indian police a...,1364506249291784198,"{'username': 'ArjunSinghPanam', 'displayname':...",[https://twitter.com/ravisinghka/status/136415...,[https://t.co/es3kn0IQAF],0,0,...,en,"<a href=""http://twitter.com/download/iphone"" r...",http://twitter.com/download/iphone,Twitter for iPhone,,,{'url': 'https://twitter.com/RaviSinghKA/statu...,"[{'username': 'narendramodi', 'displayname': '...",ArjunSinghPanam,[]
1,https://twitter.com/PrdeepNain/status/13645062...,2021-02-24 09:23:32+00:00,#FarmersProtest \n#ModiIgnoringFarmersDeaths \...,#FarmersProtest \n#ModiIgnoringFarmersDeaths \...,1364506237451313155,"{'username': 'PrdeepNain', 'displayname': 'Pra...",[],[],0,0,...,en,"<a href=""http://twitter.com/download/android"" ...",http://twitter.com/download/android,Twitter for Android,[{'thumbnailUrl': 'https://pbs.twimg.com/ext_t...,,,"[{'username': 'Kisanektamorcha', 'displayname'...",PrdeepNain,"[💪, 🚜, 🌾]"
2,https://twitter.com/parmarmaninder/status/1364...,2021-02-24 09:23:22+00:00,ਪੈਟਰੋਲ ਦੀਆਂ ਕੀਮਤਾਂ ਨੂੰ ਮੱਦੇਨਜ਼ਰ ਰੱਖਦੇ ਹੋਏ \nਮੇ...,ਪੈਟਰੋਲ ਦੀਆਂ ਕੀਮਤਾਂ ਨੂੰ ਮੱਦੇਨਜ਼ਰ ਰੱਖਦੇ ਹੋਏ \nਮੇ...,1364506195453767680,"{'username': 'parmarmaninder', 'displayname': ...",[],[],0,0,...,pa,"<a href=""http://twitter.com/download/android"" ...",http://twitter.com/download/android,Twitter for Android,,,,,parmarmaninder,"[🤫, 🤔]"
3,https://twitter.com/anmoldhaliwal/status/13645...,2021-02-24 09:23:16+00:00,@ReallySwara @rohini_sgh watch full video here...,@ReallySwara @rohini_sgh watch full video here...,1364506167226032128,"{'username': 'anmoldhaliwal', 'displayname': '...",[https://youtu.be/-bUKumwq-J8],[https://t.co/wBPNdJdB0n],0,0,...,en,"<a href=""https://mobile.twitter.com"" rel=""nofo...",https://mobile.twitter.com,Twitter Web App,[{'thumbnailUrl': 'https://pbs.twimg.com/ext_t...,,,"[{'username': 'ReallySwara', 'displayname': 'S...",anmoldhaliwal,[]
4,https://twitter.com/KotiaPreet/status/13645061...,2021-02-24 09:23:10+00:00,#KisanEktaMorcha #FarmersProtest #NoFarmersNoF...,#KisanEktaMorcha #FarmersProtest #NoFarmersNoF...,1364506144002088963,"{'username': 'KotiaPreet', 'displayname': 'Pre...",[],[],0,0,...,und,"<a href=""http://twitter.com/download/iphone"" r...",http://twitter.com/download/iphone,Twitter for iPhone,[{'previewUrl': 'https://pbs.twimg.com/media/E...,,,,KotiaPreet,[]


Lo primero que observo es que no se van a utilizar todas las columnas, por lo que se decide seleccionar solo las columnas que se van a utilizar.
Pruebo algunas funciones para normalizar los datos y ver si se pueden realizar consultas más eficientes.

In [9]:
user_df = pd.json_normalize(df["user"])

In [10]:
df["username"] = df["user"].apply(lambda x: x["username"])

In [11]:
import emoji

df["emojis"] = df["content"].apply(emoji.distinct_emoji_list)

In [12]:
df["emojis"]

0                []
1         [💪, 🚜, 🌾]
2            [🤫, 🤔]
3                []
4                []
            ...    
117402           []
117403           []
117404           []
117405           []
117406          [💪]
Name: emojis, Length: 117407, dtype: object

La información del usuario contiene principalmente datos relacionados solamente con detalles propios del usuario, sin relacionarse directamente con el tweet.

In [14]:
df.content[0]

'The world progresses while the Indian police and Govt are still trying to take India back to the horrific past through its tyranny. \n\n@narendramodi @DelhiPolice Shame on you. \n\n#ModiDontSellFarmers \n#FarmersProtest \n#FreeNodeepKaur https://t.co/es3kn0IQAF'

In [15]:
df.source[0]

'<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>'

In [16]:
df.sourceUrl[0]

'http://twitter.com/download/iphone'

In [18]:
def get_mentioned_users(mentioned_users):
    if not mentioned_users:
        return []
    return [user["username"] for user in mentioned_users]


df["mentioned_users"] = df["mentionedUsers"].apply(get_mentioned_users)

## Selección de Columnas

Para las consultas específicas del desafío, me concentraré en las siguientes columnas:

* date: Para identificar las fechas con más tweets.
* user: Para extraer el username.
* content: Para contar emojis y menciones.
* mentionedUsers: Para contar menciones.
* id: Para identificar los tweets.

## Preprocesamiento de Datos

Para el preprocesamiento de los datos es necesario considerar las siguientes condiciones:

1. Filtrar las columnas necesarias:

 * Seleccionamos solo las columnas que necesitamos para cada análisis específico.

2. Normalización y transformación:

 * Extraer usernames de las estructuras anidadas.
 * Extraer y contar emojis de los contenidos.
 * Extraer y contar menciones de las estructuras anidadas.
 * Normalizar la fecha a un formato simple YYYY-MM-DD.


In [4]:
import os
from google.cloud import storage

os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

BUCKET_NAME = "farmers-protest-tweets"
FILE_NAME = "farmers-protest-tweets-2021-2-4.json"
destination_blob_name = "raw/farmers-protest-tweets-2021-2-4.json"

In [2]:
def get_or_create_bucket(bucket_name):
    storage_client = storage.Client()

    try:
        bucket = storage_client.get_bucket(bucket_name)
        print(f"Bucket {bucket_name} found.")
    except storage.exceptions.NotFound:
        bucket = storage_client.create_bucket(bucket_name)
        print(f"Bucket {bucket_name} not found. Created new bucket.")
    except Exception as e:
        print(f"Error accessing bucket {bucket_name}: {e}")
        raise
    return bucket


def upload_file_to_blob(bucket, source_file_name, destination_blob_name):
    try:
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(source_file_name)
        print(
            f"File {source_file_name} uploaded to {destination_blob_name} in bucket {bucket.name}."
        )
    except Exception as e:
        print(f"Error uploading file {source_file_name} to bucket {bucket.name}: {e}")
        raise


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    bucket = get_or_create_bucket(bucket_name)
    upload_file_to_blob(bucket, source_file_name, destination_blob_name)

In [5]:
upload_blob(BUCKET_NAME, FILE_NAME, destination_blob_name)

Bucket farmers-protest-tweets found.
File farmers-protest-tweets-2021-2-4.json uploaded to raw/farmers-protest-tweets-2021-2-4.json in bucket farmers-protest-tweets.


In [13]:
import os

import emoji
import memory_profiler
import pandas as pd
from typing import List


def extract_emojis(text: str):
    """
    Extract emojis from the text
    @param text: str
    @return: str
    """
    emojis = emoji.distinct_emoji_list(text)
    return ",".join(emojis) if emojis else None


def extract_mentions(mentions: List[dict]):
    """
    Extract usernames from the mentions
    @param mentions: List[dict]
    @return: str
    """
    if mentions is not None:
        users = [user["username"] for user in mentions]
        return ",".join(users) if users else None


@memory_profiler.profile
def preprocess_twitter_data(file_path):
    data = pd.read_json(file_path, lines=True)

    filtered_data = data[["date", "user", "content", "mentionedUsers", "id"]]

    filtered_data["username"] = filtered_data["user"].apply(lambda x: x["username"])
    filtered_data["emojis"] = filtered_data["content"].apply(extract_emojis)
    filtered_data["mentions"] = filtered_data["mentionedUsers"].apply(extract_mentions)
    filtered_data.drop(columns=["user", "mentionedUsers"], inplace=True)
    filtered_data.rename(columns={"id": "tweet_id", "date": "tweet_date"}, inplace=True)
    filtered_data["tweet_date"] = pd.to_datetime(filtered_data["tweet_date"]).dt.date
    filtered_data["tweet_date"] = filtered_data["tweet_date"].apply(
        lambda x: x.strftime("%Y-%m-%d")
    )

    basename = os.path.basename(file_path)
    basename = basename.split(".")[0]
    filtered_name = f"{basename}_filtered.json"

    filtered_data.to_json(filtered_name, orient="records", lines=True)

In [None]:
# Run the memory profiler
!mprof run preprocess_twitter_data.py

mprof: Sampling memory every 0.1s
running new process
running as a Python program...
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  filtered_data['username'] = filtered_data['user'].apply(lambda x: x['username'])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  filtered_data['emojis'] = filtered_data['content'].apply(extract_emojis)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html

In [None]:
# Generate the memory usage plot
!mprof plot

Using last profile data.
Figure(1260x540)


### Deploy preprocesamiento

* Habilitar la api de Cloud Build en GCP para deployar la cloud function que realizará el preprocesamiento de los datos
* Para preprocesar los datos se utilizará una cloud function que se encargará de realizar el preprocesamiento de los datos y almacenarlos en un bucket de GCP.
* Se utilizará un trigger de Cloud Storage para ejecutar la cloud function cada vez que se suba un archivo al bucket.
* Se utilizará un bucket de GCP para almacenar los datos preprocesados.
* El script de la cloud function se encuentra en el archivo `main.py` en la carpeta `preprocessing`.

In [1]:
!gcloud functions deploy preprocess_twitter_data \
    --runtime python39 \
    --trigger-resource $BUCKET_NAME \
    --trigger-event google.storage.object.finalize \
    --source=preprocessing \
    --entry-point preprocess_twitter_data \
    --project $PROJECT_ID

In a future Cloud SDK release, new functions will be deployed as 2nd gen  functions by default. This is equivalent to currently deploying new  with the --gen2 flag. Existing 1st gen functions will not be impacted and will continue to deploy as 1st gen functions.
You can preview this behavior in beta. Alternatively, you can disable this behavior by explicitly specifying the --no-gen2 flag or by setting the functions/gen2 config property to 'off'.
To learn more about the differences between 1st gen and 2nd gen functions, visit:
https://cloud.google.com/functions/docs/concepts/version-comparison
Deploying function (may take a while - up to 2 minutes)...⠹                    
For Cloud Build Logs, visit: https://console.cloud.google.com/cloud-build/builds;region=us-central1/50602127-cd2f-43fe-b775-7f0de0d7cf38?project=888729167142
Deploying function (may take a while - up to 2 minutes)...done.                
automaticUpdatePolicy: {}
availableMemoryMb: 4096
buildId: 50602127-cd2f-43fe-b775

In [21]:
from google.cloud import bigquery
import os

dataset = "farmers"
table_name = "protest_tweets"
bucket_name = os.environ["BUCKET_NAME"]

file_name = "farmers-protest-tweets-2021-2-4.json"
basename = file_name.split(".")[0]
filtered_file = f"filtered/{basename}_filtered.json"


def create_dataset(dataset_name):
    client = bigquery.Client()
    dataset_id = f"{client.project}.{dataset_name}"
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "US"
    dataset = client.create_dataset(dataset, timeout=30)
    print(f"Created dataset {dataset.dataset_id}")


def create_table_from_csv_bucket(dataset_name, table_name, file_path, bucket_name):
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_name)
    table_ref = dataset_ref.table(table_name)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.autodetect = True
    job_config.allow_quoted_newlines = True
    uri = f"gs://{bucket_name}/{file_path}"
    load_job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
    print(f"Starting job {load_job.job_id}")
    load_job.result()
    print("Job finished")

In [None]:
# create_dataset(dataset)

In [None]:
create_table_from_csv_bucket(dataset, table_name, filtered_file, bucket_name)

## Pregunta 1

Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene por cada uno de esos días.

### Version Pandas sin preprocesamiento

Para este caso se consideró realizar el conteo de los tweets utilizando el archivo de los datos sin preprocesar, para comparar el tiempo de ejecución con el preprocesamiento.

In [6]:
MAIN_FILE = "farmers-protest-tweets-2021-2-4.json"
FILTERED_FILE = "farmers-protest-tweets-2021-2-4_filtered.json"

In [7]:
from typing import List, Tuple
from datetime import datetime
import pandas as pd
import time


def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    start_time = time.time()

    data = pd.read_json(file_path, lines=True)
    data["date"] = pd.to_datetime(data["date"]).dt.date
    top_dates = data.groupby("date").size().nlargest(10).index
    result = []

    for date in top_dates:
        top_user = (
            data[data["date"] == date]["user"]
            .apply(lambda x: x["username"])
            .value_counts()
            .idxmax()
        )
        result.append((date, top_user))

    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [8]:
q1_time(MAIN_FILE)

Execution time: 6.122255802154541 seconds


[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]

#### Version Pandas con preprocesamiento

Para evaluar el rendimiento haciendolo con pandas, se utilizan funcionalidad de la librería para agrupar y contar los tweets por fecha y usuario.

In [85]:
from typing import List, Tuple
from datetime import datetime
import pandas as pd


def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    start_time = time.time()

    data = pd.read_json(file_path, lines=True)
    data["tweet_date"] = pd.to_datetime(data["tweet_date"]).dt.date
    top_dates = data.groupby("tweet_date").size().nlargest(10).index
    result = []

    for date in top_dates:
        top_user = data[data["tweet_date"] == date]["username"].value_counts().idxmax()
        result.append((date, top_user))

    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [86]:
q1_time(FILTERED_FILE)

Execution time: 0.5751049518585205 seconds


[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]

#### Versión Bigquery con preprocesamiento

Para evaluar el rendimiento de Bigquery, se utilizan las funcionalidades de la librería para realizar la consulta y obtener los resultados.

In [27]:
from datetime import datetime
from typing import List, Tuple

import os


PROJECT_ID = os.getenv("PROJECT_ID")
DATASET_ID = os.getenv("DATASET_ID")
TABLE_ID = os.getenv("TABLE_ID")


def q1_time() -> List[Tuple[datetime.date, str]]:
    start_time = time.time()

    bigquery_client = bigquery.Client(project=PROJECT_ID)

    query = """
    with top_dates as (
    SELECT DATE(date) as tweet_date, COUNT(*) as tweet_count
        FROM `{dataset_id}.{table_id}`
        GROUP BY DATE(date)
        ORDER BY tweet_count DESC
        LIMIT 10
    ), user_count as (
    SELECT t.tweet_date, f.username, max(t.tweet_count) tweet_count,
    count(username) tweet_num, ROW_NUMBER() OVER (PARTITION BY DATE(t.tweet_date) ORDER BY COUNT(f.username) DESC) AS row_num
    FROM `{dataset_id}.{table_id}` f
    JOIN top_dates t
    on (date(f.date) = t.tweet_date)
    group by t.tweet_date, f.username
    order by 3 desc)
    SELECT 
        tweet_date, 
        username,
        tweet_count
    FROM user_count
    WHERE row_num = 1
    ORDER BY tweet_count DESC;
    """.format(dataset_id=DATASET_ID, table_id=TABLE_ID)

    query_job = bigquery_client.query(query)
    results = query_job.result()

    result = [(row.tweet_date, row.username) for row in results]

    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [28]:
q1_time()

Execution time: 1.7329068183898926 seconds


[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]

#### Optimización de memoria

Para optimizar la memoria utilizada se realiza una lectura linea por linea del archivo, se almacena los datos en diccionarios y se realiza el preprocesamiento de los datos en el mismo ciclo.

In [34]:
from typing import List, Tuple
from datetime import datetime
import json


def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    start_time = time.time()

    tweet_counts = {}
    user_counts = {}

    with open(file_path, "r") as file:
        for line in file:
            tweet = json.loads(line)
            date = datetime.strptime(tweet["tweet_date"], "%Y-%m-%d").date()
            user = tweet["username"]

            if date not in tweet_counts:
                tweet_counts[date] = 0
                user_counts[date] = {}
            tweet_counts[date] += 1

            if user not in user_counts[date]:
                user_counts[date][user] = 0
            user_counts[date][user] += 1

    top_dates = sorted(tweet_counts, key=tweet_counts.get, reverse=True)[:10]
    result = [
        (date, max(user_counts[date], key=user_counts[date].get), tweet_counts[date])
        for date in top_dates
    ]

    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [87]:
q1_memory(FILTERED_FILE)

Execution time: 0.8252959251403809 seconds


[(datetime.date(2021, 2, 12), 'RanbirS00614606', 12347),
 (datetime.date(2021, 2, 13), 'MaanDee08215437', 11296),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur', 11087),
 (datetime.date(2021, 2, 16), 'jot__b', 10443),
 (datetime.date(2021, 2, 14), 'rebelpacifist', 10249),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu', 9625),
 (datetime.date(2021, 2, 15), 'jot__b', 9197),
 (datetime.date(2021, 2, 20), 'MangalJ23056160', 8502),
 (datetime.date(2021, 2, 23), 'Surrypuria', 8417),
 (datetime.date(2021, 2, 19), 'Preetm91', 8204)]

## Pregunta 2

**Los top 10 emojis más usados con su respectivo conteo.**

Para esta pregunta se asume que:
* Los emojis se cuenta una sola vez por tweet, sin importar cuantas veces se repita.
* Se consideran los emojis que están en el contenido del tweet.

#### Versión Pandas con preprocesamiento

In [9]:
from typing import List, Tuple
import pandas as pd


def q2_time(file_path: str) -> List[Tuple[str, int]]:
    start_time = time.time()

    data = pd.read_json(file_path, lines=True)
    emojis_count = (
        data["emojis"]
        .apply(lambda x: x.split(",") if x else [])
        .explode()
        .value_counts()
        .head(10)
    )

    result = [(emoji, count) for emoji, count in emojis_count.items()]
    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")
    return result

In [10]:
q2_time(FILTERED_FILE)

Execution time: 0.45200157165527344 seconds


[('🙏', 3203),
 ('😂', 1387),
 ('🚜', 1334),
 ('🌾', 1298),
 ('❤️', 1205),
 ('✊', 1110),
 ('🇮🇳', 938),
 ('🤣', 759),
 ('👍', 634),
 ('🙏🏻', 580)]

#### Versión Pandas sin preprocesamiento

In [17]:
from typing import List, Tuple
import pandas as pd


def q2_time(file_path: str) -> List[Tuple[str, int]]:
    start_time = time.time()

    data = pd.read_json(file_path, lines=True)
    emoji_counts = data["content"].apply(extract_emojis).explode().value_counts()
    result = [(emoji, count) for emoji, count in emoji_counts.head(10).items()]

    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [18]:
q2_time(MAIN_FILE)

Execution time: 12.31988263130188 seconds


[('🙏', 1789),
 ('😂', 826),
 ('❤️', 762),
 ('👍', 457),
 ('✊', 455),
 ('🤣', 374),
 ('🙏🏻', 361),
 ('👇', 329),
 ('🙏🏽', 277),
 ('🇮🇳', 269)]

#### Versión Bigquery con preprocesamiento

In [22]:
from datetime import datetime
from typing import List, Tuple

import os


PROJECT_ID = os.getenv("PROJECT_ID")
DATASET_ID = os.getenv("DATASET_ID")
TABLE_ID = os.getenv("TABLE_ID")


def q2_time() -> List[Tuple[datetime.date, str]]:
    start_time = time.time()

    bigquery_client = bigquery.Client(project=PROJECT_ID)

    query = """
    SELECT
    emoji,
    count(emoji) emoji_count
    FROM (
    SELECT
        emojis,
        SPLIT(emojis, ',') AS emoji_list,
        ARRAY_LENGTH(SPLIT(emojis, ',')) length_a
    FROM
        `{dataset_id}.{table_id}`
    WHERE emojis is not null
    ORDER BY 3 DESC
    ), UNNEST(emoji_list) AS emoji
    GROUP BY emoji
    ORDER BY 2 DESC
    LIMIT 10;
    """.format(dataset_id=DATASET_ID, table_id=TABLE_ID)

    query_job = bigquery_client.query(query)
    results = query_job.result()

    result = [(row.emoji, row.emoji_count) for row in results]

    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [23]:
q2_time()

Execution time: 1.923854112625122 seconds


[('🙏', 3203),
 ('😂', 1387),
 ('🚜', 1334),
 ('🌾', 1298),
 ('❤️', 1205),
 ('✊', 1110),
 ('🇮🇳', 938),
 ('🤣', 759),
 ('👍', 634),
 ('🙏🏻', 580)]

#### Optimización de memoria

In [55]:
from collections import Counter
from typing import List, Tuple

import memory_profiler


@memory_profiler.profile
def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    start_time = time.time()

    emoji_counts = Counter()

    with open(file_path, "r") as file:
        for line in file:
            tweet = json.loads(line)
            emojis = tweet.get("emojis")
            if emojis:
                emoji_list = emojis.split(",")
                emoji_counts.update(emoji_list)

    top_10_emojis = emoji_counts.most_common(10)
    result = [(emoji, count) for emoji, count in top_10_emojis]

    end_time = time.time()

    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [58]:
q2_memory(FILTERED_FILE)

ERROR: Could not find file /tmp/ipykernel_16414/1910330258.py
Execution time: 1.6731047630310059 seconds


[('🙏', 3203),
 ('😂', 1387),
 ('🚜', 1334),
 ('🌾', 1298),
 ('❤️', 1205),
 ('✊', 1110),
 ('🇮🇳', 938),
 ('🤣', 759),
 ('👍', 634),
 ('🙏🏻', 580)]

## Pregunta 3

El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos.

#### Versión Pandas con preprocesamiento

In [59]:
from typing import List, Tuple
import pandas as pd


def q3_time(file_path: str) -> List[Tuple[str, int]]:
    start_time = time.time()

    data = pd.read_json(file_path, lines=True)
    data["mentions"] = data["mentions"].apply(lambda x: x.split(",") if x else [])
    mentions_counts = Counter()
    for mentions in data["mentions"]:
        mentions_counts.update(mentions)

    top_10_mentions = mentions_counts.most_common(10)
    result = [(mention, count) for mention, count in top_10_mentions]
    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [60]:
q3_time(FILTERED_FILE)

Execution time: 0.49052858352661133 seconds


[('narendramodi', 2265),
 ('Kisanektamorcha', 1840),
 ('RakeshTikaitBKU', 1644),
 ('PMOIndia', 1427),
 ('RahulGandhi', 1146),
 ('GretaThunberg', 1048),
 ('RaviSinghKA', 1019),
 ('rihanna', 986),
 ('UNHumanRights', 962),
 ('meenaharris', 926)]

#### Versión Pandas sin preprocesamiento

In [24]:
from typing import List, Tuple
import pandas as pd


def q3_time(file_path: str) -> List[Tuple[str, int]]:
    start_time = time.time()

    data = pd.read_json(file_path, lines=True)
    mentions = data["mentionedUsers"].explode().dropna().apply(lambda x: x["username"])
    mention_counts = mentions.value_counts().nlargest(10)
    result = list(mention_counts.items())

    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [25]:
q3_time(MAIN_FILE)

Execution time: 5.680205821990967 seconds


[('narendramodi', 2265),
 ('Kisanektamorcha', 1840),
 ('RakeshTikaitBKU', 1644),
 ('PMOIndia', 1427),
 ('RahulGandhi', 1146),
 ('GretaThunberg', 1048),
 ('RaviSinghKA', 1019),
 ('rihanna', 986),
 ('UNHumanRights', 962),
 ('meenaharris', 926)]

#### Versión Bigquery con preprocesamiento

In [66]:
from datetime import datetime
from typing import List, Tuple

import os


PROJECT_ID = os.getenv("PROJECT_ID")
DATASET_ID = os.getenv("DATASET_ID")
TABLE_ID = os.getenv("TABLE_ID")


def q3_time() -> List[Tuple[datetime.date, str]]:
    start_time = time.time()

    bigquery_client = bigquery.Client(project=PROJECT_ID)

    query = """
        SELECT
        mention,
        count(mention) mentions_count
        FROM (
        SELECT
            SPLIT(mentions, ',') AS mention_list,
            ARRAY_LENGTH(SPLIT(mentions, ',')) length_a
        FROM
            `{dataset_id}.{table_id}`
        WHERE mentions is not null
        ), UNNEST(mention_list) AS mention
        GROUP BY mention
        ORDER BY 2 DESC
        LIMIT 10;
    """.format(dataset_id=DATASET_ID, table_id=TABLE_ID)

    query_job = bigquery_client.query(query)
    results = query_job.result()

    result = [(row.mention, row.mentions_count) for row in results]

    end_time = time.time()
    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [67]:
q3_time()

Execution time: 2.542377471923828 seconds


[('narendramodi', 2265),
 ('Kisanektamorcha', 1840),
 ('RakeshTikaitBKU', 1644),
 ('PMOIndia', 1427),
 ('RahulGandhi', 1146),
 ('GretaThunberg', 1048),
 ('RaviSinghKA', 1019),
 ('rihanna', 986),
 ('UNHumanRights', 962),
 ('meenaharris', 926)]

#### Optimización de memoria

In [63]:
from typing import List, Tuple

import memory_profiler


@memory_profiler.profile
def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    start_time = time.time()

    mentions_counts = Counter()

    with open(file_path, "r") as file:
        for line in file:
            tweet = json.loads(line)
            mentions = tweet.get("mentions")
            if mentions:
                mention_list = mentions.split(",")
                mentions_counts.update(mention_list)

    top_10_mentions = mentions_counts.most_common(10)

    result = [(mention, count) for mention, count in top_10_mentions]
    end_time = time.time()

    print(f"Execution time: {end_time - start_time} seconds")

    return result

In [64]:
q3_memory(FILTERED_FILE)

ERROR: Could not find file /tmp/ipykernel_16414/947806769.py
Execution time: 1.8369793891906738 seconds


[('narendramodi', 2265),
 ('Kisanektamorcha', 1840),
 ('RakeshTikaitBKU', 1644),
 ('PMOIndia', 1427),
 ('RahulGandhi', 1146),
 ('GretaThunberg', 1048),
 ('RaviSinghKA', 1019),
 ('rihanna', 986),
 ('UNHumanRights', 962),
 ('meenaharris', 926)]