In [1]:
import boto3
import pandas as pd
import io

# Module-level S3 client; calculate_vviq_score below reads this global
# rather than taking a client argument.
s3 = boto3.client('s3')

def calculate_vviq_score(bucket_name, file_key):
    """Add a ``VVIQ_Score`` column to a Parquet object and replace it in S3.

    The object is downloaded, the sixteen item columns ``VVIQ_Q1`` ..
    ``VVIQ_Q16`` are summed row-wise into ``VVIQ_Score``, the result is
    uploaded under the same key with ``_scored`` inserted before the
    ``.parquet`` suffix, and only then is the original object deleted.

    Args:
        bucket_name: Name of the S3 bucket holding the object.
        file_key: Key of the object to score; must end with ``.parquet``.

    Returns:
        A human-readable status string naming the processed key.

    Raises:
        ValueError: If ``file_key`` does not end with ``.parquet``. Without
            this guard the destination key would equal the source key and
            the final delete would remove the freshly written data.
        KeyError: If any of the ``VVIQ_Q*`` columns is missing from the file.
    """
    suffix = '.parquet'
    # Validate before touching S3 so a bad key can never lead to a delete.
    if not file_key.endswith(suffix):
        raise ValueError(
            f"Expected a key ending in '{suffix}', got {file_key!r}"
        )
    # Only rewrite the trailing suffix; str.replace would also rewrite any
    # earlier '.parquet' occurrence inside the key (e.g. in a prefix).
    new_file_key = file_key[:-len(suffix)] + '_scored' + suffix

    # Download the file from S3 and parse the Parquet payload
    response = s3.get_object(Bucket=bucket_name, Key=file_key)
    df = pd.read_parquet(io.BytesIO(response['Body'].read()))

    # Calculate the VVIQ_Score
    score_columns = [f'VVIQ_Q{i}' for i in range(1, 17)]
    missing_columns = [col for col in score_columns if col not in df.columns]
    if missing_columns:
        raise KeyError(
            f"{file_key} is missing VVIQ item columns: {missing_columns}"
        )
    df['VVIQ_Score'] = df[score_columns].sum(axis=1)

    # Serialize the modified DataFrame back to Parquet in memory
    scored_file_content = io.BytesIO()
    df.to_parquet(scored_file_content, index=False)

    # Upload the new file to S3
    s3.put_object(
        Bucket=bucket_name,
        Key=new_file_key,
        Body=scored_file_content.getvalue(),
    )

    # Delete the original only after the upload call returned without raising
    s3.delete_object(Bucket=bucket_name, Key=file_key)

    return f'Successfully processed and deleted {file_key}'


In [2]:
import boto3
import json

# Rebinds the global `s3` name to a new client; an identical client was
# already created in cell [1].
s3 = boto3.client('s3')

def filter_and_distribute(bucket_name, max_workers):
    """List the not-yet-scored keys of a bucket, grouped into batches.

    Every key in the bucket that does not end with ``_scored.parquet`` is
    collected, and the resulting list is cut into consecutive slices.

    Args:
        bucket_name: Name of the S3 bucket to list.
        max_workers: Maximum number of keys per batch (despite the name it
            is used as the slice length). Must be at least 1.

    Returns:
        A list of lists of keys; each inner list holds at most
        ``max_workers`` keys. Empty when there is nothing to process.

    Raises:
        ValueError: If ``max_workers`` is smaller than 1.
    """
    if max_workers < 1:
        raise ValueError(f'max_workers must be >= 1, got {max_workers!r}')

    # A single list_objects_v2 call returns at most 1000 keys; the paginator
    # follows continuation tokens so larger buckets are listed completely.
    paginator = s3.get_paginator('list_objects_v2')
    files_to_process = [
        obj['Key']
        for page in paginator.paginate(Bucket=bucket_name)
        for obj in page.get('Contents', [])
        if not obj['Key'].endswith('_scored.parquet')
    ]

    # Divide the files into batches
    return [
        files_to_process[start:start + max_workers]
        for start in range(0, len(files_to_process), max_workers)
    ]


In [3]:
# Bucket to scan and the batch size handed to filter_and_distribute.
bucket_name = 'exp-data-parquet'
max_workers = 10

# Gather the keys that still need scoring, grouped into batches.
batches = filter_and_distribute(bucket_name, max_workers)

# Walk the batches in order and score each key sequentially, echoing the
# status string returned for every file.
pending_keys = (key for batch in batches for key in batch)
for key in pending_keys:
    print(calculate_vviq_score(bucket_name, key))


Successfully processed and deleted 07e9dl4p4m.parquet
Successfully processed and deleted 0ew2g7jfla.parquet
Successfully processed and deleted 0jiwb7eyeq.parquet
Successfully processed and deleted 0qa175qzmn.parquet
Successfully processed and deleted 0tff6mmdws.parquet
Successfully processed and deleted 0yofabs7se.parquet
Successfully processed and deleted 0z710d02cc.parquet
Successfully processed and deleted 18um8nliha.parquet
Successfully processed and deleted 19wspycaqb.parquet
Successfully processed and deleted 1czlbmtpxs.parquet
Successfully processed and deleted 1hscz1e9gu.parquet
Successfully processed and deleted 1i4dgd97m3.parquet
Successfully processed and deleted 2amk5u3cgd.parquet
Successfully processed and deleted 2c1z3zfvrj.parquet
Successfully processed and deleted 2gcy5cm42i.parquet
Successfully processed and deleted 2jth6vvh0d.parquet
Successfully processed and deleted 2syoniq5tq.parquet
Successfully processed and deleted 2tmd1xetkv.parquet
Successfully processed and d

In [5]:
import boto3
import pandas as pd
import os

# Initialize the boto3 S3 client used by the helper functions in the
# following cells (rebinds the `s3` name created in earlier cells).
s3 = boto3.client('s3')

# S3 locations: scored files are read from the source bucket, and the
# combined output is written to the destination bucket under this key.
source_bucket = 'exp-data-parquet'
destination_bucket = 'aggregated-exp-data-parquet'
aggregated_file = 'aggregated_exp_data.parquet'

# Columns kept in the aggregated file, in output order.
columns_to_include = [
    'EmailAddress',
    'FutureContactConsent',
    'Age',
    'Sex',
    'Ethnicity',
    'Race',
    'IdealParticipation',
    'AttentionCheckTask',
    'VVIQ_Score',
]


In [6]:
def ensure_bucket_exists(bucket_name):
    """Create the bucket if a HEAD request reports that it does not exist.

    Args:
        bucket_name: Name of the S3 bucket to check and, if absent, create.

    Raises:
        s3.exceptions.ClientError: For any HEAD failure other than
            "not found" (for example 403 when the bucket exists but belongs
            to another account), and for failures of the create call.
            These used to be swallowed by a bare ``except`` and answered
            with a create attempt.
    """
    try:
        s3.head_bucket(Bucket=bucket_name)
    except s3.exceptions.ClientError as err:
        error_code = err.response.get('Error', {}).get('Code')
        # Only a genuine "not found" justifies creating the bucket.
        if error_code not in ('404', 'NoSuchBucket', 'NotFound'):
            raise
    else:
        print(f"Bucket '{bucket_name}' already exists.")
        return

    create_kwargs = {'Bucket': bucket_name}
    # Outside us-east-1, S3 requires the region to be stated explicitly;
    # for us-east-1 the LocationConstraint must be omitted.
    region = s3.meta.region_name
    if region and region != 'us-east-1':
        create_kwargs['CreateBucketConfiguration'] = {
            'LocationConstraint': region
        }
    s3.create_bucket(**create_kwargs)
    print(f"Bucket '{bucket_name}' created.")


In [7]:
def check_file_exists(bucket_name, file_key):
    """Report whether an object exists in a bucket, printing the outcome.

    Args:
        bucket_name: Name of the S3 bucket to look in.
        file_key: Key of the object to probe with a HEAD request.

    Returns:
        True if the HEAD request succeeds, False if S3 answers "not found".

    Raises:
        s3.exceptions.ClientError: For any other failure (permissions,
            throttling, ...). Reporting those as "does not exist" would let
            callers overwrite an object that is actually present.
    """
    try:
        s3.head_object(Bucket=bucket_name, Key=file_key)
    except s3.exceptions.ClientError as err:
        error_code = err.response.get('Error', {}).get('Code')
        if error_code not in ('404', 'NoSuchKey', 'NotFound'):
            raise
        print(f"File '{file_key}' does not exist in bucket '{bucket_name}'.")
        return False

    print(f"File '{file_key}' already exists in bucket '{bucket_name}'.")
    return True

def read_parquet_from_s3(bucket_name, file_key):
    """Fetch an S3 object and parse its bytes as a Parquet DataFrame.

    Args:
        bucket_name: Name of the S3 bucket holding the object.
        file_key: Key of the Parquet object to read.

    Returns:
        The DataFrame produced by ``pd.read_parquet`` for the object's bytes.
    """
    s3_response = s3.get_object(Bucket=bucket_name, Key=file_key)
    raw_bytes = s3_response['Body'].read()
    parquet_buffer = io.BytesIO(raw_bytes)
    return pd.read_parquet(parquet_buffer)


In [8]:
def aggregate_and_upload_files():
    """Merge all scored files into the aggregated Parquet file.

    Steps, in order:
      1. Make sure the destination bucket exists.
      2. List every key in the source bucket that ends with
         ``_scored.parquet`` and does not start with ``merged_``.
      3. Read each of them, keeping only ``columns_to_include``.
      4. If the aggregated file already exists, prepend its rows.
      5. Upload the combined data to the destination bucket.
      6. Rename each processed source file by prefixing ``merged_``.

    The rename happens only after the upload call returned, so a failed
    upload leaves the source files eligible for the next run instead of
    marking them as merged without their rows ever being stored.
    NOTE: if a rename in step 6 fails after the upload succeeded, the
    affected files will be aggregated again on the next run.
    """
    ensure_bucket_exists(destination_bucket)

    # A single list_objects_v2 call is capped at 1000 keys; paginate so
    # larger buckets are listed completely.
    paginator = s3.get_paginator('list_objects_v2')
    files_to_process = [
        obj['Key']
        for page in paginator.paginate(Bucket=source_bucket)
        for obj in page.get('Contents', [])
        if obj['Key'].endswith('_scored.parquet')
        and not obj['Key'].startswith('merged_')
    ]

    # Read every new file first; nothing in S3 is modified yet.
    new_frames = [
        read_parquet_from_s3(source_bucket, file_key)[columns_to_include]
        for file_key in files_to_process
    ]

    # Existing aggregated rows (if any) come first, followed by the new ones.
    frames = []
    if check_file_exists(destination_bucket, aggregated_file):
        frames.append(read_parquet_from_s3(destination_bucket, aggregated_file))
    frames.extend(new_frames)

    # Concatenate once instead of growing a DataFrame inside the loop.
    if frames:
        aggregated_data = pd.concat(frames, ignore_index=True)
    else:
        aggregated_data = pd.DataFrame(columns=columns_to_include)

    # Serialize the aggregated data to Parquet in memory
    aggregated_file_content = io.BytesIO()
    aggregated_data.to_parquet(aggregated_file_content, index=False)

    # Upload the aggregated Parquet file to the destination bucket
    s3.put_object(
        Bucket=destination_bucket,
        Key=aggregated_file,
        Body=aggregated_file_content.getvalue(),
    )
    print(f"Aggregated data uploaded to '{destination_bucket}/{aggregated_file}'.")

    # Mark the processed files as merged: copy to the new key, then delete.
    for file_key in files_to_process:
        new_file_key = 'merged_' + file_key
        s3.copy_object(
            Bucket=source_bucket,
            CopySource={'Bucket': source_bucket, 'Key': file_key},
            Key=new_file_key,
        )
        s3.delete_object(Bucket=source_bucket, Key=file_key)

# Run the aggregation. This is not a read-only step: it writes the aggregated
# file and renames (copy + delete) the processed objects in the source bucket.
aggregate_and_upload_files()

Bucket 'aggregated-exp-data-parquet' created.
File 'aggregated_exp_data.parquet' does not exist in bucket 'aggregated-exp-data-parquet'.
Aggregated data uploaded to 'aggregated-exp-data-parquet/aggregated_exp_data.parquet'.


In [9]:
# Confirm that the aggregated object is present in the destination bucket.
aggregated_file_present = check_file_exists(destination_bucket, aggregated_file)
if not aggregated_file_present:
    print(f"Failed to create or update the file '{aggregated_file}' in bucket '{destination_bucket}'.")
else:
    print(f"The file '{aggregated_file}' was successfully created or updated in bucket '{destination_bucket}'.")


File 'aggregated_exp_data.parquet' already exists in bucket 'aggregated-exp-data-parquet'.
The file 'aggregated_exp_data.parquet' was successfully created or updated in bucket 'aggregated-exp-data-parquet'.
