In [1]:
import pandas as pd
import os
from multiprocessing import  Pool
import multiprocessing
import datetime

from functools import partial
import numpy as np

# Folder that holds the input and output parquet files.
DATA_FOLDER = "data"

# Base file names (without size suffix or extension) of the dataset that is
# read and of the dataset that is written.
INPUT_DATASET = "batchdf"
OUTPUT_DATASET = "dataset"

# File-name suffixes appended to the base name; SELECTED_SIZE picks one.
# NOTE(review): the suffixes look like sample fractions (1 %, 10 %, full) — confirm.
SIZES = [
    '-0.01',
    '-0.1',
    '',
]
SELECTED_SIZE = 0

In [2]:
# Maps a measure group (a geographic area) to the cloud regions of each
# provider that belong to it and to the IANA timezone used for local-time
# features. An empty region list means the provider has no region in that group.
measure_groups = {
    'South America': {
        'providers': {
            'AWS': ['sa-east-1'],
            'AZURE': ['brazilsouth'],
            'GCP': ['southamerica-east1'],
        },
        'timezone': 'America/Sao_Paulo',
    },
    'Canada': {
        'providers': {
            'AWS': ['ca-central-1'],
            'AZURE': [],
            'GCP': ['northamerica-northeast1'],
        },
        'timezone': 'America/Montreal',
    },
    'East US': {
        'providers': {
            'AWS': ['us-east-1'],
            'AZURE': ['eastus'],
            'GCP': ['us-east4'],
        },
        'timezone': 'America/New_York',
    },
    'West US': {
        'providers': {
            'AWS': ['us-west-1', 'us-west-2'],
            'AZURE': ['westus', 'westus2'],
            'GCP': ['us-west2', 'us-west4'],
        },
        'timezone': 'America/Los_Angeles',
    },
    'United Kingdom': {
        'providers': {
            'AWS': ['eu-west-2'],
            'AZURE': ['uksouth'],
            'GCP': ['europe-west2'],
        },
        'timezone': 'Europe/London',
    },
    'Germany': {
        'providers': {
            'AWS': ['eu-central-1'],
            'AZURE': ['germanywestcentral'],
            'GCP': ['europe-west3'],
        },
        'timezone': 'Europe/Berlin',
    },
    'India': {
        'providers': {
            'AWS': ['ap-south-1'],
            'AZURE': ['centralindia'],
            'GCP': ['asia-south1'],
        },
        'timezone': 'Asia/Kolkata',
    },
    'Japan': {
        'providers': {
            'AWS': ['ap-northeast-1'],
            'AZURE': ['japaneast'],
            'GCP': ['asia-northeast1'],
        },
        'timezone': 'Asia/Tokyo',
    },
    'Australia': {
        'providers': {
            'AWS': ['ap-southeast-2'],
            'AZURE': ['australiaeast'],
            'GCP': ['australia-southeast1'],
        },
        'timezone': 'Australia/Sydney',
    },
}

In [3]:
def DATASET_PATH(DATASET_NAME):
    """Return the parquet path of a dataset inside DATA_FOLDER.

    The file name is the given base name, followed by the size suffix
    currently selected through SIZES[SELECTED_SIZE], followed by ".parquet".
    """
    size_suffix = SIZES[SELECTED_SIZE]
    parquet_name = "{}{}.parquet".format(DATASET_NAME, size_suffix)
    return os.path.join(DATA_FOLDER, parquet_name)

In [4]:
# Load the input parquet file and let pandas pick its best-fitting
# (nullable) dtypes for every column.
input_path = DATASET_PATH(INPUT_DATASET)
dataset = pd.read_parquet(input_path).convert_dtypes()

In [5]:
# Parse the invocation timestamp columns into datetimes; both columns
# share the same compact format.
invocation_format = '%Y%m%dT%H%M%S%f'
for invocation_column in ('driver_invocation', 'workload_invocation'):
    dataset[invocation_column] = pd.to_datetime(
        dataset[invocation_column], format=invocation_format)

In [6]:
# Keep only rows whose driver invocation falls inside the half-open
# window [data_from, data_to).
data_from = datetime.datetime(2021, 8, 1)
data_to = datetime.datetime(2021, 10, 1)

in_window = (
    (dataset['driver_invocation'] >= data_from)
    & (dataset['driver_invocation'] < data_to)
)
dataset = dataset[in_window]

In [7]:
# Derive day-of-week and time-of-day (HHMM) features straight from the
# driver invocation timestamp, without any timezone conversion.
driver_ts = dataset['driver_invocation']
dataset['dow_utc'] = driver_ts.dt.day_name()
dataset['tod_utc'] = driver_ts.dt.strftime('%H%M')

In [8]:
# Tag every row with the timezone and the measure group of its region.
# Rows whose region is not listed in measure_groups are left untouched.
for mg, group_spec in measure_groups.items():
    # Flatten the per-provider region lists of this measure group.
    regions = [
        region
        for provider_regions in group_spec['providers'].values()
        for region in provider_regions
    ]

    in_group = dataset['region'].isin(regions)
    dataset.loc[in_group, 'timezone'] = group_spec['timezone']
    dataset.loc[in_group, 'measure group'] = mg

In [9]:
def get_local_dow_of_the_week(df):
    """Return the local day name of one row's driver invocation.

    `df` is a single row (or any mapping) providing a timezone-naive
    'driver_invocation' timestamp, which is interpreted as UTC, and a
    'timezone' name to convert it to.
    """
    utc_moment = df['driver_invocation'].tz_localize('utc')
    local_moment = utc_moment.tz_convert(df['timezone'])
    return local_moment.day_name()

def get_local_tod_of_the_week(df):
    """Return the local time of day ('HHMM') of one row's driver invocation.

    `df` is a single row (or any mapping) providing a timezone-naive
    'driver_invocation' timestamp, which is interpreted as UTC, and a
    'timezone' name to convert it to.
    """
    utc_moment = df['driver_invocation'].tz_localize('utc')
    local_moment = utc_moment.tz_convert(df['timezone'])
    return local_moment.strftime('%H%M')

def parallelize(data, func, num_of_processes=8):
    """Apply `func` to row chunks of `data` in a process pool and concatenate.

    Parameters
    ----------
    data : pandas DataFrame or Series
        Object to split along its rows.
    func : callable
        Called once per chunk; must be picklable and return a pandas object
        that `pd.concat` can combine.
    num_of_processes : int, default 8
        Number of worker processes and (at most) number of chunks.

    Returns
    -------
    The concatenation of the per-chunk results, in the original row order.
    """
    # Split positions rather than the frame itself: np.array_split on a
    # DataFrame relies on the deprecated DataFrame.swapaxes.
    position_chunks = np.array_split(np.arange(len(data)), num_of_processes)
    # Chunks are contiguous, so plain slices avoid copying through fancy
    # indexing. Empty chunks (more workers than rows) are skipped.
    data_split = [
        data.iloc[positions[0]:positions[-1] + 1]
        for positions in position_chunks
        if len(positions)
    ]
    if not data_split:
        # Empty input: still run func once so the result keeps its shape.
        data_split = [data]

    # The context manager tears the pool down even when a worker raises.
    with Pool(num_of_processes) as pool:
        return pd.concat(pool.map(func, data_split))

def run_on_subset(func, data_subset):
    """Apply `func` to every row of `data_subset` and return the results."""
    row_results = data_subset.apply(func, axis='columns')
    return row_results

def parallelize_on_rows(data, func):
    """Apply `func` to every row of `data`, using one process per CPU."""
    worker_count = multiprocessing.cpu_count()
    apply_to_rows = partial(run_on_subset, func)
    return parallelize(data, apply_to_rows, worker_count)

In [10]:
# Compute day-of-week and time-of-day in each row's own timezone; both
# helpers only need the timestamp and the timezone columns.
localization_input = dataset[['driver_invocation', 'timezone']]
dataset['local_dow'] = parallelize_on_rows(localization_input, get_local_dow_of_the_week)
dataset['local_tod'] = parallelize_on_rows(localization_input, get_local_tod_of_the_week)

In [11]:
# Store the local day-of-week / time-of-day strings as categoricals.
dataset = dataset.astype({'local_dow': 'category', 'local_tod': 'category'})

In [12]:
# Re-run pandas' dtype inference over the whole frame, which by now also
# holds the columns added in the cells above.
dataset = dataset.convert_dtypes()

In [13]:
# Explicit dtype per column, applied after pandas' automatic conversion.
# String-like columns become categoricals; the id columns are plain int64
# and `newcontainer` a plain bool.
# Each column is listed exactly once: a repeated key in a dict literal
# silently overrides the earlier entry.
custom_dtypes = {
    'SAAFMemoryDeltaError': 'category',
    'SAAFMemoryError': 'category',
    'vendorId': 'category',
    'platform': 'category',
    'payload': 'category',
    'functionRegion': 'category',
    'linuxVersion': 'category',
    'lang': 'category',
    'functionName': 'category',
    'cpuType': 'category',
    'provider': 'category',
    'timezone': 'category',
    'region': 'category',
    'dow_utc': 'category',
    'tod_utc': 'category',
    'measure group': 'category',
    'cpuModel': 'category',
    '2_thread_id': 'int64',
    '1_run_id': 'int64',
    'newcontainer': 'bool',
    'version': 'category',
    'containerID': 'category',
}

In [14]:
# Cast every column listed in custom_dtypes to its target dtype in one go;
# columns not listed keep their current dtype.
dataset = dataset.astype(custom_dtypes)

In [15]:
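# Optional debugging aid (disabled): uncomment to print the dtype, summary
# statistics and memory usage of every column.
# for col in dataset.columns:
#     print(f"--- {col} ---")
#     print(dataset[col].dtypes)
#     print(dataset[col].describe())
#     print(dataset[col].memory_usage())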

In [16]:
# Drop Columns
drop_cols = ['message', 'payload', 'platform', 'lang', 'cpuNiceDelta', 'cpuIrq', 'version', 'vendorId', 'functionRegion', ]
dataset = dataset.drop(drop_cols, axis = 1)

In [17]:
# Put the rows into a deterministic order and renumber the index, then
# show the total memory footprint (in bytes) as the cell output.
sort_keys = [
    'driver_invocation',
    'provider',
    'region',
    'workload_invocation',
    '1_run_id',
]
dataset = dataset.sort_values(sort_keys, ignore_index=True)
dataset.memory_usage().sum()

187645892

In [18]:
# Persist the processed dataset next to the input, without the row index.
output_path = DATASET_PATH(OUTPUT_DATASET)
dataset.to_parquet(output_path, index=False)