En este archivo puedes escribir lo que estimes conveniente. Te recomendamos detallar tu solución y todas las suposiciones que estás considerando. Aquí puedes ejecutar las funciones que definiste en los otros archivos de la carpeta src, medir el tiempo, memoria, etc.

# Q1 Memory

Estratégia: Delegar el procesamiento de los datos a un microservicio de cloud run de (GCP) asi mismo el almacenamiento del archivo se hace mediante un bucket de GCP. Esto permite enviar unicamente el request con los datos del archivo a procesar y el contenedor de lo aloja.

Servicios cloud utilizados (GCP): Cloud Run, Cloud Storage y container registry. 

La lógica del microservicio se encuentra en el directorio ./Cloud Run/Q1_memory_cloud_run con el uso de flask, la implementación algorithm de map-reduce y el archivo dockerfile para la creación de la imagen.

Mejoras: La implementación se puede hacer con un cluster de dataproc para el procesamiento de los datos, esto permitiría el uso de spark para el procesamiento de los datos y el uso de un bucket de GCP para el almacenamiento de los archivos.

In [None]:
import requests
from datetime import datetime
from typing import List, Tuple
 

def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    url = 'https://q1memory-qmij3rko2a-ue.a.run.app/q1_memory'
    params = {
        "bucket_name": "lat_optimization_challenge",
        "blob_name": "data/q1_memory_farmers-protest-tweets-2021-2-4.json"
    }

    response = requests.post(url, json=params)

    if response.status_code == 200:
        results_raw = response.json()
        results = [(datetime.strptime(date_str, '%Y-%m-%d').date(), username) for date_str, username in results_raw]
        return results
    else:
        print(f"Error: {response.status_code}")
        return []

# Q2 Memory

Estratégia: Delegar el procesamiento de los datos a un microservicio de cloud run de (GCP) asi mismo el almacenamiento del archivo se hace mediante un bucket de GCP. Esto permite enviar unicamente el request con los datos del archivo a procesar y el contenedor de lo aloja.

Servicios cloud utilizados (GCP): Cloud Run, Cloud Storage y container registry.

La lógica del microservicio se encuentra en el directorio: ./Cloud Run/Q2_memory_cloud_run con el uso de flask, la implementación algorithm de map-reduce y el archivo dockerfile para la creación de la imagen.

Mejoras: La implementación se puede hacer con un cluster de dataproc para el procesamiento de los datos, esto permitiría el uso de spark para el procesamiento de los datos y el uso de un bucket de GCP para el almacenamiento de los archivos.

In [None]:
import requests
from datetime import datetime
from typing import List, Tuple

def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    url = 'https://q2memory-qmij3rko2a-ue.a.run.app/q2_memory'
    params = {
        "bucket_name": "lat_optimization_challenge",
        "blob_name": "data/q2_memory_farmers-protest-tweets-2021-2-4.json"
    }

    response = requests.post(url, json=params)
    data = response.json()

    result = [(entry["emoji"], entry["count"]) for entry in data]

    return result

# Q3 Memory

Estratégia: Delegar el procesamiento de los datos a un microservicio de cloud run de (GCP) asi mismo el almacenamiento del archivo se hace mediante un bucket de GCP. Esto permite enviar unicamente el request con los datos del archivo a procesar y el contenedor de lo aloja.

Servicios cloud utilizados (GCP): Cloud Run, Cloud Storage y container registry.

La lógica del microservicio se encuentra en el directorio ./Cloud Run/Q3_memory_cloud_run con el uso de flask, la implementación algorithm de map-reduce y el archivo dockerfile para la creación de la imagen.

Mejoras: La implementación se puede hacer con un cluster de dataproc para el procesamiento de los datos, esto permitiría el uso de spark para el procesamiento de los datos y el uso de un bucket de GCP para el almacenamiento de los archivos.

In [None]:
import requests
from datetime import datetime
from typing import List, Tuple

def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    url = 'https://q3memory-qmij3rko2a-ue.a.run.app/q3_time'
    params = {
        "bucket_name": "lat_optimization_challenge",
        "blob_name": "data/q3_memory_farmers-protest-tweets-2021-2-4.json"
    }

    response = requests.post(url, json=params)
    data = response.json()

    # Modificar la estructura de salida para que coincida con el formato requerido
    result = [(entry[0], entry[1]) for entry in data]

    return result

# Q1 Time

Estratégia: Implementar un algoritmo de map-reduce para el procesamiento de los datos, el cual se encarga de leer el archivo y procesar los datos para obtener el top 10 de usuarios que más tweets han realizado

Servicios cloud utilizados (GCP): Ninguno

Mejoras: Manejo de librerias como apache spark para el procesamiento de los datos, esto permitiría el uso de spark para el procesamiento de los datos y el uso de un bucket de GCP para el almacenamiento de los archivos.

In [None]:
import json
from collections import defaultdict, Counter
from datetime import datetime
from typing import List, Tuple, Optional


def map_function(line: str) -> Optional[Tuple[datetime.date, str]]:
    try:
        tweet = json.loads(line)
        fecha = datetime.strptime(tweet['date'][:10], '%Y-%m-%d').date()
        usuario = tweet['user']['username']
        return fecha, usuario
    except json.JSONDecodeError as e:
        print(f"Error decodificando JSON: {e}")
        return None

def reduce_function(mapped_values: List[Tuple[datetime.date, str]]) -> List[Tuple[datetime.date, str]]:
    tweets_por_fecha = defaultdict(list)
    for value in mapped_values:
        if value is not None: 
            fecha, usuario = value
            tweets_por_fecha[fecha].append(usuario)

    usuarios_top_por_fecha = []
    for fecha, usuarios in tweets_por_fecha.items():
        conteo_usuarios = Counter(usuarios)
        usuario_top = conteo_usuarios.most_common(1)[0][0]
        usuarios_top_por_fecha.append((fecha, usuario_top))

    return sorted(usuarios_top_por_fecha, key=lambda x: len(tweets_por_fecha[x[0]]), reverse=True)[:10]

def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    mapped_values = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            result = map_function(line)
            if result is not None:
                mapped_values.append(result)

    top_fechas = reduce_function(mapped_values)
    return top_fechas

# Q2 Time

Estratégia: Implementar un algoritmo de map-reduce para el procesamiento de los datos, el cual se encarga de leer el archivo y procesar los datos para obtener el top 10 de emojis más utilizados en los tweets

Servicios cloud utilizados (GCP): Ninguno

Mejoras: Manejo de librerias como apache spark para el procesamiento de los datos, esto permitiría el uso de spark para el procesamiento de los datos y el uso de un bucket de GCP para el almacenamiento de los archivos.

In [None]:
import emoji
import json
from collections import Counter
from typing import List, Tuple

# Función Map: Procesa cada línea (tweet) para encontrar emojis
def map_function(line):
    try:
        tweet = json.loads(line)
        emojis_found = emoji.emoji_list(tweet['content'])
        return [item['emoji'] for item in emojis_found]
    except json.JSONDecodeError:
        return []

# Función Shuffle: Organiza los emojis encontrados para su procesamiento en el Reduce
def shuffle_and_sort(mapped_values):
    all_emojis = []
    for emoji_list in mapped_values:
        all_emojis.extend(emoji_list)
    return Counter(all_emojis)

# Función Reduce: Agrega los conteos de cada emoji
def reduce_function(shuffled_data):
    return shuffled_data.most_common(10)

def q2_time(file_path: str) -> List[Tuple[str, int]]:
    mapped_values = []

    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            mapped_values.extend(map_function(line))

    shuffled_data = shuffle_and_sort(mapped_values)
    reduced_data = reduce_function(shuffled_data)

    return reduced_data

# Q3 Time

Estratégia: Implementar un algoritmo de map-reduce para el procesamiento de los datos, el cual se encarga de leer el archivo y procesar los datos para obtener el top 10 de usuarios más mencionados en los tweets

Servicios cloud utilizados (GCP): Ninguno

Mejoras: Manejo de librerias como apache spark para el procesamiento de los datos, esto permitiría el uso de spark para el procesamiento de los datos y el uso de un bucket de GCP para el almacenamiento de los archivos.

In [None]:
import json
import re
from collections import Counter
from typing import List, Tuple

def map_mentions(file_path: str) -> List[Tuple[str, int]]:
    user_mentions_counter = Counter()

    with open(file_path, 'r', encoding='utf-8') as file:
        for linea in file:
            try:
                tweet = json.loads(linea)
                user_mentions = re.findall(r'@(\w+)', tweet['content'])
                user_mentions_counter.update(user_mentions)
            except json.JSONDecodeError as e:
                print(f"Error decodificando JSON: {e}")

    return user_mentions_counter.items()

def reduce_mentions(mapped_mentions: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    # Reducir los recuentos sumando los valores para cada usuario
    user_mentions_counter = Counter()
    for user, count in mapped_mentions:
        user_mentions_counter[user] += count

    # Obtener los top 10 usuarios más mencionados
    top_user_mentions = user_mentions_counter.most_common(10)

    return top_user_mentions

def q3_time(file_path: str) -> List[Tuple[str, int]]:
    mapped_mentions = map_mentions(file_path)
    top_user_mentions = reduce_mentions(mapped_mentions)

    # Adaptar la estructura de salida
    adapted_output = [(user, count) for user, count in top_user_mentions]

    return adapted_output

