In [None]:
# Fill an S3 bucket with processed historical data while this notebook runs.

In [47]:
# TODO: exploratory data analysis (EDA) on the getads logs

In [2]:
import gc
import gzip
import uuid
from datetime import datetime
from functools import lru_cache
from io import BytesIO
from io import StringIO

import boto3
import farmhash
import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

In [3]:
def get_label_from_prefix(prefix):
    """Map an S3 key prefix to the LogFormat worksheet name describing its rows.

    Rules are checked in order and the first match wins, so a prefix that
    contains several markers resolves to the earliest rule listed below.
    Prefixes matching no rule map to 'Unknown'.
    """
    rules = (
        (('notify',), 'Notification'),
        (('adreturned',), 'Response'),
        (('non_impressed_requests', 'getads'), 'Request'),
    )
    for markers, label in rules:
        if any(marker in prefix for marker in markers):
            return label
    return 'Unknown'
            
# Coarse OS buckets produced by preprocess_os. 'Other' must stay last: the
# fallback is addressed as os_categories[-1].
os_categories = [
    'Apple',
    'Windows',
    'Google',
    'Linux',
    'TV',
    'Other',
]
# Publishers that get their own one-hot column; every other publisher maps to
# the trailing 'Other' entry (publisher_categories[-1]).
publisher_categories = [
    'appnexus',
    'google',
    'openx',
    'rubicon',
    'Other',
]

def preprocess_os(os):
    """Collapse a raw OS value from the logs into one of `os_categories`.

    Missing values (NaN) and anything not recognised below fall into the
    trailing 'Other' bucket. Any value containing 'Windows' maps to Windows;
    the other buckets require an exact string match.
    """
    fallback = os_categories[-1]

    # NaN is the only value that compares unequal to itself.
    if os != os:
        return fallback

    # NOTE(review): 'X11' usually denotes the X Window System (Linux/Unix
    # desktops) rather than Apple -- confirm this grouping is intended.
    if os in ('iOS', 'iPadOS', 'Apple iOS', 'Apple Mac', 'Macintosh', 'OS X', 'MacOS', 'X11'):
        return os_categories[0]
    if 'Windows' in os:
        return os_categories[1]
    # NOTE(review): only 'Chrome OS' reaches the Google bucket; Android values
    # (if the logs contain them) currently land in 'Other' -- confirm.
    if os == 'Chrome OS':
        return os_categories[2]
    if os == 'Linux':
        return os_categories[3]
    if os in ('LG proprietary', 'webOS', 'Tizen', 'tvOS', 'Roku OS'):
        return os_categories[4]
    return fallback
    
    
def preprocess_publisher(publisher):
    """Keep `publisher` if it is one of `publisher_categories`, else bucket it as 'Other'.

    Matching is exact and case-sensitive; missing values (NaN) are never in the
    list and therefore also become 'Other'.
    """
    return publisher if publisher in publisher_categories else publisher_categories[-1]
    
def _fit_one_hot_encoder(categories):
    """Return a dense OneHotEncoder restricted to `categories` and already fitted.

    Output columns follow the order of `categories`. Fitting on the category
    list itself only satisfies sklearn's fit-before-transform requirement.
    """
    encoder = OneHotEncoder(categories=[categories], sparse_output=False)
    encoder.fit(np.array(categories).reshape(-1, 1))
    return encoder


# One encoder per categorical feature; dense output so `.tolist()` yields plain lists.
os_encoder = _fit_one_hot_encoder(os_categories)
publisher_encoder = _fit_one_hot_encoder(publisher_categories)

def _parse_creative_size(creative_size):
    """Split a 'WIDTHxHEIGHT' string (e.g. '300x250') into two ints.

    Returns (nan, nan) when the value is missing or not in that form, so one
    malformed row does not abort processing of the whole file.
    """
    try:
        width, height = creative_size.split('x')
        return int(width), int(height)
    except (AttributeError, ValueError):  # NaN has no .split; bad shape / non-int parts
        return float('nan'), float('nan')


def _bundle_hash_key(bundle_id):
    """Return the string that is hashed for a bundle id.

    read_csv may parse purely numeric bundle ids as int, or as float when the
    column has gaps. farmhash hashes strings, and the same id should hash
    identically whichever dtype pandas picked, so integral floats are rendered
    without the trailing '.0'. String ids pass through unchanged.
    """
    if isinstance(bundle_id, float) and bundle_id.is_integer():
        return str(int(bundle_id))
    return str(bundle_id)


def process_row(row, sheet_name, embedding_buckets=10000):
    """Turn one raw log row into (continuous, one-hot, embedding) model inputs.

    Args:
        row: one row (pd.Series) of a log CSV whose columns are named after the
            LogFormat worksheet `sheet_name`.
        sheet_name: 'Request' or 'Notification' (see get_label_from_prefix).
        embedding_buckets: number of hash buckets for the bundle id.

    Returns:
        continuous_input_val: [weekday (0=Monday), month, hour, LatUsed,
            LongUsed, bid floor, creative width, creative height].
        ohe_input_val: OS one-hot followed by publisher one-hot.
        embedding_input_val: [bucket] when a bundle id is present, otherwise the
            missing value itself (NaN) -- downstream code sees both shapes.

    Raises:
        ValueError: for any other sheet_name. Those sheets have a different
            column layout and cannot be mapped onto the fields above.
    """
    # Unpack by column name: positional Series indexing (row[0]) is deprecated
    # in pandas >= 2.1 for label-indexed rows.
    if sheet_name == 'Request':
        (timestamp, lat_used, long_used, publisher, bundle_id,
         os_name, bid_floor, width, height) = row[
            ['timestamp', 'LatUsed', 'LongUsed', 'PublisherName', 'BundleId',
             'os', 'bidfloor', 'w', 'h']
        ].tolist()
    elif sheet_name == 'Notification':
        (timestamp, lat_used, long_used, publisher, bundle_id,
         os_name, bid_floor) = row[
            ['timestamp', 'LatUsed', 'LongUsed', 'PublisherName', 'bundleid',
             'OS', 'BidFloor']
        ].tolist()
        # Notification rows carry the creative size as one 'WxH' field.
        width, height = _parse_creative_size(row['CreativeSize'])
    else:
        raise ValueError(f"process_row does not support sheet {sheet_name!r}")

    date_object = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")

    os_one_hot = os_encoder.transform([[preprocess_os(os_name)]]).tolist()[0]
    publisher_one_hot = publisher_encoder.transform([[preprocess_publisher(publisher)]]).tolist()[0]

    continuous_input_val = [
        date_object.weekday(),  # day of the week
        date_object.month,
        date_object.hour,
        lat_used,    # LatUsed
        long_used,   # LongUsed
        bid_floor,
        width,       # creative width
        height,      # creative height
    ]

    ohe_input_val = os_one_hot + publisher_one_hot

    # Hashed app/site bundle id, reduced to a fixed number of embedding buckets.
    if pd.isna(bundle_id):
        embedding_input_val = bundle_id
    else:
        embedding_input_val = [farmhash.hash64(_bundle_hash_key(bundle_id)) % embedding_buckets]

    return continuous_input_val, ohe_input_val, embedding_input_val

@lru_cache(maxsize=None)
def _log_format_field_names(sheet_name, log_format_path='LogFormat 1.8.xlsx'):
    """Return the ordered 'Field Name' column of one LogFormat worksheet.

    Cached so the workbook is parsed once per sheet rather than once per S3
    object. Always make sure this workbook is up to date; after editing it,
    re-run this cell (or restart the kernel) to drop the cache.
    """
    log_format_df = pd.read_excel(log_format_path, sheet_name)
    return tuple(log_format_df['Field Name'])


def process_file(bucket, obj, prefix):
    """Download one gzipped CSV log object and convert every row with process_row.

    Args:
        bucket: source bucket. Unused; kept so the signature matches the
            `processing_function(source_bucket, obj, prefix)` call in
            process_and_transfer_data.
        obj: boto3 S3 object whose `.get()['Body']` streams gzip-compressed,
            header-less CSV text in ISO-8859-1.
        prefix: the prefix the object was listed under; selects the LogFormat
            worksheet (and hence the column names) via get_label_from_prefix.

    Returns:
        list of process_row results, one per CSV row, in file order.
    """
    sheet_name = get_label_from_prefix(prefix)
    field_names = list(_log_format_field_names(sheet_name))

    compressed = obj.get()['Body'].read()
    text = gzip.decompress(compressed).decode('ISO-8859-1')
    del compressed  # raw bytes are not needed once decoded
    data = pd.read_csv(StringIO(text), header=None, names=field_names, low_memory=False)
    del text

    processed_rows = [process_row(row, sheet_name) for _, row in data.iterrows()]

    # Release the raw frame before the caller downloads the next object.
    del data
    gc.collect()

    return processed_rows

def get_processed_keys_from_s3_ledger(bucket_name, ledger_key):
    """Return the set of source keys recorded in the S3 ledger object.

    The ledger is UTF-8 text with one key per line; surrounding whitespace on
    each line is stripped. If the ledger object does not exist yet, an empty
    set is returned.
    """
    s3_resource = boto3.resource('s3')
    try:
        response = s3_resource.Object(bucket_name, ledger_key).get()
        ledger_text = response['Body'].read().decode('utf-8')
        return {entry.strip() for entry in ledger_text.splitlines()}
    except s3_resource.meta.client.exceptions.NoSuchKey:
        # No ledger has been written yet.
        return set()

def append_key_to_s3_ledger(bucket_name, ledger_key, key):
    """Record `key` as processed in the newline-delimited S3 ledger.

    Re-reads the whole ledger, adds `key` and rewrites the object. This is a
    read-modify-write with no locking: concurrent writers can lose each
    other's entries. Keys are written sorted so the ledger content is
    deterministic (set iteration order of strings varies between interpreter
    runs). Blank entries are dropped.
    """
    current_keys = get_processed_keys_from_s3_ledger(bucket_name, ledger_key)
    if key in current_keys:
        return  # already recorded; avoid a redundant PUT

    current_keys.add(key)
    ledger_text = "\n".join(sorted(k for k in current_keys if k))

    s3 = boto3.resource('s3')
    s3.Object(bucket_name, ledger_key).put(Body=ledger_text.encode('utf-8'))
    
def _processed_rows_to_parquet(processed_data_list):
    """Serialise (continuous, one-hot, embedding) tuples into Parquet bytes."""
    frame = pd.DataFrame({
        'continuous': [item[0] for item in processed_data_list],
        'one-hot-encoded': [item[1] for item in processed_data_list],
        'embedding': [item[2] for item in processed_data_list],
    })
    buffer = BytesIO()
    frame.to_parquet(buffer)
    return buffer.getvalue()


def _destination_key(source_bucket_name, source_key):
    """Build '<event_type>/<year>/<month>/<day>/<uuid>.parquet' for a source key.

    Assumes source keys shaped like '<root>/<event_type>/<YYYY>/<MM>/<DD>/...'.
    The UUID is derived from the source location (uuid5) rather than random,
    so re-processing an object -- e.g. after a crash between the upload and the
    ledger update -- overwrites its earlier output instead of duplicating it.
    """
    parts = source_key.split('/')
    if len(parts) < 5:
        raise ValueError(f"Unexpected source key layout: {source_key!r}")
    event_type, year, month, day = parts[1:5]
    stable_id = uuid.uuid5(uuid.NAMESPACE_URL, f's3://{source_bucket_name}/{source_key}')
    return f'{event_type}/{year}/{month}/{day}/{stable_id}.parquet'


def process_and_transfer_data(source_bucket_name, destination_bucket_name, ledger_key, prefixes, processing_function):
    """
    Processes data from source_bucket using the provided processing function
    and saves the processed data to the destination_bucket as Parquet.

    Objects already listed in the ledger are skipped; each newly processed
    object is appended to the ledger after its Parquet file is uploaded.

    Args:
    - source_bucket_name (str): Name of the source S3 bucket.
    - destination_bucket_name (str): Name of the destination S3 bucket (also holds the ledger).
    - ledger_key (str): Key of the ledger object in the destination bucket.
    - prefixes (list): List of prefixes to process in the source bucket.
    - processing_function (callable): Called as processing_function(source_bucket, obj, prefix);
      must return a list of (continuous, one_hot, embedding) tuples.
    """
    s3 = boto3.resource('s3')
    source_bucket = s3.Bucket(source_bucket_name)
    destination_bucket = s3.Bucket(destination_bucket_name)

    # Keys already handled, from the ledger plus anything processed in this run
    # (overlapping prefixes would otherwise list the same object twice).
    processed_keys = get_processed_keys_from_s3_ledger(destination_bucket_name, ledger_key)

    for prefix in prefixes:
        for obj in source_bucket.objects.filter(Prefix=prefix):
            if obj.key in processed_keys:
                print(f"Object {obj.key} has already been processed. Skipping.")
                continue
            if obj.key.endswith('/'):
                # Folder placeholder objects carry no log data.
                continue

            # Compute the destination first so a malformed key fails before the download.
            new_key = _destination_key(source_bucket_name, obj.key)

            processed_data_list = processing_function(source_bucket, obj, prefix)
            parquet_bytes = _processed_rows_to_parquet(processed_data_list)
            del processed_data_list
            gc.collect()

            destination_bucket.put_object(Key=new_key, Body=parquet_bytes)

            # Record the source key only after its output is safely uploaded.
            append_key_to_s3_ledger(destination_bucket_name, ledger_key, obj.key)
            processed_keys.add(obj.key)

In [None]:
# One day of notification and request (getads) logs.
prefixes = [
    'reporting_logs/notify/2023/08/28',
    'reporting_logs/getads/2023/08/28',
]
process_and_transfer_data(
    source_bucket_name='cdigital-logs',
    destination_bucket_name='chalk-sm-processing-data',
    ledger_key='ledger/processed_files.txt',
    prefixes=prefixes,
    processing_function=process_file,
)