In [None]:
import pandas as pd
import glob
import awswrangler as wr
import boto3
import gzip
import json
import io
import pandas as pd
from tqdm import tqdm
import time
import os
import json
import pandas as pd
import gc
import time
from concurrent.futures import ProcessPoolExecutor

In [None]:
# Exchange the ambient AWS identity for the mimesample_delegate role and build a
# boto3 session that uses the returned temporary credentials.
# NOTE(review): these are temporary STS credentials and nothing here refreshes
# them; confirm long runs finish before the credentials expire.
sts = boto3.Session().client("sts", region_name="us-east-1")
response = sts.assume_role(
    RoleSessionName="mimesamples-access",
    RoleArn="arn:aws:iam::375084544312:role/mimesample_delegate",
)

ACCESS_KEY, SECRET_KEY, SESSION_TOKEN = (
    response["Credentials"][field]
    for field in ("AccessKeyId", "SecretAccessKey", "SessionToken")
)

session = boto3.Session(
    aws_session_token=SESSION_TOKEN,
    aws_secret_access_key=SECRET_KEY,
    aws_access_key_id=ACCESS_KEY,
)

In [None]:

# Per-user sample files live under s3://<BUCKET>/<PREFIX>/<user_id>/.
BUCKET = 'mime-samples-production'
PREFIX = 'users'

# S3 client built from the assumed-role session; used by the download helpers.
s3_client = session.client('s3')

def list_json_files(user_id):
    """Return the keys of every ``.json`` object under ``<PREFIX>/<user_id>/``.

    A single ``list_objects_v2`` call returns at most 1,000 keys, so the
    paginator is used to walk every page; otherwise JSON files past the first
    page would be silently missed.

    Args:
        user_id: the user whose prefix is listed.

    Returns:
        A list of object keys ending in ``.json``, in the order S3 returns them.
        The list is empty when the prefix holds no JSON objects.
    """
    prefix = f"{PREFIX}/{user_id}/"
    paginator = s3_client.get_paginator('list_objects_v2')
    return [
        obj['Key']
        for page in paginator.paginate(Bucket=BUCKET, Prefix=prefix)
        for obj in page.get('Contents', [])
        if obj['Key'].endswith('.json')
    ]

def download_and_flatten(user_id):
    """Download one JSON file for ``user_id`` and flatten it into a DataFrame.

    Only the first key returned by ``list_json_files`` is read. Any further
    JSON files under the user's prefix are ignored.

    Args:
        user_id: the user whose JSON is fetched; also stored in a ``user_id`` column.

    Returns:
        The ``pd.json_normalize`` result with a ``user_id`` column added, or
        ``None`` if the user has no ``.json`` objects.
    """
    files = list_json_files(user_id)
    if not files:
        return None

    obj = s3_client.get_object(Bucket=BUCKET, Key=files[0])
    body = obj['Body']
    try:
        raw = body.read().decode('utf-8')
    finally:
        # Release the underlying HTTP connection even if reading/decoding fails.
        body.close()

    data = json.loads(raw)
    # Drop the raw text before building the DataFrame to lower peak memory.
    del raw
    flattened = pd.json_normalize(data)
    flattened['user_id'] = user_id
    # No per-call gc.collect(): the parsed JSON is freed by reference counting on
    # return, and a full collection for every user is pure overhead.
    return flattened

def _write_partial_batch(df, temp_dir, process_id, batch_idx, label):
    """Write ``df`` to ``<temp_dir>/partial_<process_id>_<batch_idx>.parquet`` and log it."""
    batch_file = os.path.join(temp_dir, f"partial_{process_id}_{batch_idx}.parquet")
    df.to_parquet(batch_file, index=False)
    print(f"Process {process_id}: Wrote {label} {batch_idx} with {len(df)} records.")


def process_user_ids(user_ids, temp_dir, process_id, batch_size=5000):
    """Download and flatten each user's JSON, spilling rows to partial Parquet files.

    Rows accumulate in memory. Once at least ``batch_size`` rows are buffered,
    every full ``batch_size`` slice is written to
    ``partial_<process_id>_<n>.parquet`` in ``temp_dir``. Leftover rows are
    written as one final file after all users are processed.

    Args:
        user_ids: user IDs to process, in order.
        temp_dir: directory for partial files (created if missing).
        process_id: identifier used in file names, progress bar and log lines.
        batch_size: rows per partial file (default 5000).

    Raises:
        ValueError: if ``batch_size`` is less than 1.

    Failures for individual users are printed and skipped, so one bad user
    does not abort the whole chunk.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    os.makedirs(temp_dir, exist_ok=True)
    buffer = []
    current_size = 0
    batch_idx = 0

    for user_id in tqdm(user_ids, desc=f"Process {process_id}"):
        try:
            df = download_and_flatten(user_id)
        except Exception as e:
            # Best-effort: report and move on to the next user.
            print(f"Error processing {user_id}: {e}")
            continue
        if df is None:
            continue
        buffer.append(df)
        current_size += len(df)
        if current_size < batch_size:
            continue

        # Concatenate once, then emit every full batch that fits, so a single
        # large user cannot leave more than batch_size rows buffered.
        full_df = pd.concat(buffer, ignore_index=True)
        start = 0
        while len(full_df) - start >= batch_size:
            _write_partial_batch(
                full_df.iloc[start:start + batch_size],
                temp_dir, process_id, batch_idx, "partial batch",
            )
            batch_idx += 1
            start += batch_size

        # copy() so the leftover rows do not keep the whole concatenated frame alive.
        remaining_df = full_df.iloc[start:].copy()
        buffer = [remaining_df] if not remaining_df.empty else []
        current_size = len(remaining_df)
        del full_df, remaining_df
        gc.collect()

    if buffer:
        full_df = pd.concat(buffer, ignore_index=True)
        _write_partial_batch(full_df, temp_dir, process_id, batch_idx, "final partial batch")
        del full_df, buffer
        gc.collect()

def merge_and_batch(temp_dir, final_output_dir, batch_size=5000):
    """Re-chunk every ``.parquet`` file in ``temp_dir`` into fixed-size batches.

    Partial files are read in sorted filename order. Their rows are written to
    ``final_output_dir/batch_<n>.parquet`` in slices of exactly ``batch_size``
    rows, with any remainder written as one final, smaller batch.

    Args:
        temp_dir: directory containing the partial Parquet files.
        final_output_dir: destination directory (created if missing).
        batch_size: rows per output file (default 5000).

    Raises:
        ValueError: if ``batch_size`` is less than 1 (the slicing loop could
            otherwise never terminate).
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    os.makedirs(final_output_dir, exist_ok=True)
    # Sorted so the output batch order is reproducible; os.listdir order is arbitrary.
    partial_files = sorted(
        os.path.join(temp_dir, name)
        for name in os.listdir(temp_dir)
        if name.endswith('.parquet')
    )
    buffer = []
    current_size = 0
    batch_idx = 0

    for path in partial_files:
        df = pd.read_parquet(path)
        buffer.append(df)
        current_size += len(df)
        if current_size < batch_size:
            continue

        # Concatenate once, then carve off every full batch that fits, instead of
        # re-concatenating the whole buffer for each batch.
        full_buffer_df = pd.concat(buffer, ignore_index=True)
        start = 0
        while len(full_buffer_df) - start >= batch_size:
            batch_df = full_buffer_df.iloc[start:start + batch_size]
            batch_file = os.path.join(final_output_dir, f"batch_{batch_idx}.parquet")
            batch_df.to_parquet(batch_file, index=False)
            print(f"Wrote batch {batch_idx} with {len(batch_df)} records.")
            batch_idx += 1
            start += batch_size

        # copy() so the leftover rows do not keep the whole concatenated frame alive.
        remaining_df = full_buffer_df.iloc[start:].copy()
        buffer = [remaining_df] if not remaining_df.empty else []
        current_size = len(remaining_df)
        del full_buffer_df, remaining_df
        gc.collect()

    if buffer:
        full_buffer_df = pd.concat(buffer, ignore_index=True)
        # The buffer may hold only zero-row frames from empty partial files.
        if not full_buffer_df.empty:
            batch_file = os.path.join(final_output_dir, f"batch_{batch_idx}.parquet")
            full_buffer_df.to_parquet(batch_file, index=False)
            print(f"Wrote final batch {batch_idx} with {len(full_buffer_df)} records.")
        del full_buffer_df
        gc.collect()

def main():
    """Process every user ID in ``pending_list.txt`` in parallel, then merge the output.

    Steps:
      1. Read the non-blank, stripped lines of ``pending_list.txt`` as user IDs.
      2. Split them into up to 32 contiguous, near-equal chunks. Run
         ``process_user_ids`` on each chunk in its own worker process, writing
         partial Parquet files to ``temp_output``.
      3. Re-chunk all partial files into 5000-row batches in ``final_output``.

    Exceptions raised inside a worker are re-raised here by ``Future.result()``.
    """
    with open('pending_list.txt', 'r', encoding='utf-8') as f:
        user_ids = [line.strip() for line in f if line.strip()]

    if not user_ids:
        # n_jobs would be 0 below, and the chunk arithmetic would divide by zero.
        print("No user IDs found in pending_list.txt; nothing to do.")
        return

    n_jobs = min(32, len(user_ids))
    # Spread the remainder one ID per chunk, rather than piling up to n_jobs - 1
    # extra IDs onto the last chunk.
    base, extra = divmod(len(user_ids), n_jobs)
    chunks = []
    start = 0
    for i in range(n_jobs):
        end = start + base + (1 if i < extra else 0)
        chunks.append(user_ids[start:end])
        start = end

    temp_dir = 'temp_output'
    final_dir = 'final_output'
    os.makedirs(temp_dir, exist_ok=True)
    # merge_and_batch reads every *.parquet in temp_dir. Partial files left over
    # from an earlier run would therefore be merged again as duplicate rows, so
    # clear them first.
    for stale in glob.glob(os.path.join(temp_dir, 'partial_*.parquet')):
        os.remove(stale)

    with ProcessPoolExecutor(max_workers=n_jobs) as executor:
        futures = [
            executor.submit(process_user_ids, chunk, temp_dir, idx)
            for idx, chunk in enumerate(chunks)
        ]
        for future in futures:
            future.result()  # surfaces any exception raised in the worker

    # NOTE(review): batch_*.parquet files in final_dir from an earlier run that
    # produced more batches are not removed and will sit beside this run's output.
    merge_and_batch(temp_dir, final_dir, batch_size=5000)

if __name__ == "__main__":
    # Report the wall-clock duration of the whole pipeline.
    t_start = time.time()
    main()
    print("Time taken:", time.time() - t_start)
