In [5]:
import pandas as pd
import ujson
from typing import List, Tuple
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

# Función para analizar un bloque de líneas del archivo JSON
def parse_json_lines(lines: List[str]) -> List[Tuple[str, str]]:
    result = []
    for line in lines:
        try:
            # Intenta cargar el JSON de la línea
            item = ujson.loads(line.strip())
            # Verifica que los campos necesarios estén presentes antes de agregar
            if 'date' in item and 'user' in item and 'id' in item:
                result.append((item['date'], item['user']['username']))
        except ValueError:
            # Maneja errores de decodificación JSON
            continue
    return result

# Función para paralelizar la lectura y procesamiento del archivo
def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    # Lee el archivo en memoria
    with open(file_path, 'r') as json_file:
        lines = json_file.readlines()

    # Divide las líneas en bloques para procesamiento paralelo
    num_threads = 16  # Ajustar basado en la cantidad de núcleos de CPU
    chunk_size = len(lines) // num_threads
    
    # Usa ThreadPoolExecutor para paralelizar el análisis del JSON
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Envía tareas para procesar cada bloque de líneas en paralelo
        futures = [executor.submit(parse_json_lines, lines[i:i + chunk_size]) for i in range(0, len(lines), chunk_size)]
    
    # Recoge los resultados de todos los hilos
    data = []
    for future in futures:
        data.extend(future.result())

    # Convierte la lista de tuplas en un DataFrame
    df = pd.DataFrame(data, columns=['date', 'user'])
    
    # Convierte la columna 'date' a formato datetime.date
    df['date'] = pd.to_datetime(df['date']).dt.date

    # Obtiene las 10 fechas con más tweets
    tweet_counts = df['date'].value_counts().nlargest(10)
    top_10_dates = tweet_counts.index

    # Filtra el DataFrame para incluir solo las fechas top 10
    df_top_10 = df[df['date'].isin(top_10_dates)]

    # Encuentra el usuario con más tweets para cada una de las fechas top 10
    top_users = df_top_10.groupby('date')['user'].agg(lambda x: x.value_counts().idxmax())

    # Convierte el resultado a una lista de tuplas (fecha, usuario)
    result = [(date, user) for date, user in zip(top_10_dates, top_users)]
    
    return result

# Ruta del archivo JSON a procesar
file_path = '/Users/juanignaciomagarinoscastro/Downloads/farmers-protest-tweets-2021-2-4.json'
q1_time(file_path)


[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'rebelpacifist'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'jot__b'),
 (datetime.date(2021, 2, 18), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 15), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 20), 'Preetm91'),
 (datetime.date(2021, 2, 23), 'MangalJ23056160'),
 (datetime.date(2021, 2, 19), 'Surrypuria')]

In [11]:
import pandas as pd
import ujson
import psutil
import os
from typing import List, Tuple
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

# Función para obtener el uso actual de memoria en MB
def memory_usage():
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return mem_info.rss / 1024 / 1024  # Convertir a MB

# Función para analizar un bloque de líneas del archivo JSON
def parse_json_lines(lines: List[str]) -> List[Tuple[str, str]]:
    result = []
    for line in lines:
        try:
            # Intenta cargar el JSON de la línea
            item = ujson.loads(line.strip())
            # Verifica que los campos necesarios estén presentes antes de agregar
            if 'date' in item and 'user' in item and 'id' in item:
                result.append((item['date'], item['user']['username']))
        except ValueError:
            # Maneja errores de decodificación JSON
            continue
    return result

# Función para paralelizar la lectura y procesamiento del archivo
def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    # Imprime el uso de memoria antes de cargar los datos
    print(f"Memory usage before loading data: {memory_usage()} MB")
    
    # Lee el archivo en memoria
    with open(file_path, 'r') as json_file:
        lines = json_file.readlines()

    # Imprime el uso de memoria después de leer el JSON
    print(f"Memory usage after reading JSON: {memory_usage()} MB")
    
    # Divide las líneas en bloques para procesamiento paralelo
    num_threads = 24  # Ajustar basado en la cantidad de núcleos de CPU
    chunk_size = len(lines) // num_threads
    
    # Usa ThreadPoolExecutor para paralelizar el análisis del JSON
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Envía tareas para procesar cada bloque de líneas en paralelo
        futures = [executor.submit(parse_json_lines, lines[i:i + chunk_size]) for i in range(0, len(lines), chunk_size)]
    
    # Recoge los resultados de todos los hilos
    data = []
    for future in futures:
        data.extend(future.result())

    # Imprime el uso de memoria después de analizar el JSON con hilos
    print(f"Memory usage after parsing JSON with threads: {memory_usage()} MB")
    
    # Convierte la lista de tuplas en un DataFrame
    df = pd.DataFrame(data, columns=['date', 'user'])
    
    # Convierte la columna 'date' a formato datetime.date
    df['date'] = pd.to_datetime(df['date']).dt.date

    # Imprime el uso de memoria después de crear el DataFrame
    print(f"Memory usage after creating DataFrame: {memory_usage()} MB")
    
    # Obtiene las 10 fechas con más tweets
    tweet_counts = df['date'].value_counts().nlargest(10)
    top_10_dates = tweet_counts.index

    # Filtra el DataFrame para incluir solo las fechas top 10
    df_top_10 = df[df['date'].isin(top_10_dates)]

    # Imprime el uso de memoria después de filtrar las fechas top 10
    print(f"Memory usage after filtering top 10 dates: {memory_usage()} MB")
    
    # Encuentra el usuario con más tweets para cada una de las fechas top 10
    top_users = df_top_10.groupby('date')['user'].agg(lambda x: x.value_counts().idxmax())

    # Convierte el resultado a una lista de tuplas (fecha, usuario)
    result = [(date, user) for date, user in zip(top_10_dates, top_users)]
    
    # Imprime el uso de memoria después de finalizar los resultados
    print(f"Memory usage after finalizing results: {memory_usage()} MB")

    return result

# Short file
#file_path='/Users/juanignaciomagarinoscastro/Downloads/farmers-protest-tweets-2021-2-4.json'
# Long file
file_path = '/Users/juanignaciomagarinoscastro/Downloads/farmers-protest-tweets-2021-2-4-large.json'
q1_time(file_path)

Memory usage before loading data: 39.40625 MB
Memory usage after reading JSON: 1021.34375 MB
Memory usage after parsing JSON with threads: 2976.9375 MB
Memory usage after creating DataFrame: 4573.171875 MB
Memory usage after filtering top 10 dates: 4625.453125 MB
Memory usage after finalizing results: 4653.25 MB


[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'rebelpacifist'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'jot__b'),
 (datetime.date(2021, 2, 18), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 15), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 20), 'Preetm91'),
 (datetime.date(2021, 2, 23), 'MangalJ23056160'),
 (datetime.date(2021, 2, 19), 'Surrypuria')]