In [1]:
# Development convenience: (re)load IPython's autoreload extension and switch it
# to mode 2 so that edits to imported modules are picked up without restarting
# the kernel.
%reload_ext autoreload
%autoreload 2

In [2]:
import logging

from mleko.utils.custom_logger import CustomLogger
from mleko.data.converters import CsvToArrowConverter
from mleko.data.sources import S3DataSource
from mleko.pipeline.pipeline import Pipeline
from mleko.pipeline.steps.convert import ConvertStep
from mleko.pipeline.steps.ingest import IngestStep


# Set the global log level to DEBUG through the project's CustomLogger.
CustomLogger.set_global_log_level(logging.DEBUG)

In [3]:
# --- Notebook configuration -------------------------------------------------
# Dataset identifier; it is reused as the last segment of the S3 key prefix and
# in the local `data/<DATASET_NAME>/...` directories built further down.
DATASET_NAME = "00_uk_plip_merchant"

# S3 location of the dataset: bucket plus key prefix ending in the dataset name.
S3_BUCKET_NAME = "eu-production-klarna-data-redshift-unload-eu"
S3_KEY_PREFIX = (
    "access-purpose/fraud_modelling_eu/fraud-strategy-uk-au-nz/"
    f"user_erik_bavenstrand/{DATASET_NAME}"
)

# AWS profile name handed to the S3 data source (`aws_profile_name`).
AWS_PROFILE_NAME = "iam-sync/lakehouse-redshift/lakehouse-redshift.IdP_fraud_modelling_eu@922587933573"

In [4]:
# Data source pointing at the S3 bucket/prefix configured above; its local
# destination is the dataset's "raw" directory.
s3DataSource = S3DataSource(
    s3_bucket_name=S3_BUCKET_NAME,
    s3_key_prefix=S3_KEY_PREFIX,
    aws_profile_name=AWS_PROFILE_NAME,
    destination_dir=f"data/{DATASET_NAME}/raw",
    num_workers=64,
)

# CSV -> Arrow converter writing into the dataset's "converted" directory, with
# float downcasting enabled.
csvToArrowConverter = CsvToArrowConverter(
    output_directory=f"data/{DATASET_NAME}/converted",
    downcast_float=True,
)

In [5]:
# Assemble the two-step pipeline: an ingest step wrapping the S3 data source,
# followed by a convert step wrapping the CSV -> Arrow converter.
pipeline = Pipeline(
    steps=[
        IngestStep(s3DataSource),
        ConvertStep(csvToArrowConverter),
    ]
)

# Bare last expression so the notebook displays the pipeline's repr.
pipeline

Pipeline:
  1. IngestStep(data_source=S3DataSource(aws_profile_name='iam-sync/lakehouse-redshift/lakehouse-redshift.IdP_fraud_modelling_eu@922587933573', aws_region_name='eu-west-1', num_workers=64, manifest_file_name='manifest', check_s3_timestamps=True, destination_dir='data/00_uk_plip_merchant/raw', s3_bucket_name='eu-production-klarna-data-redshift-unload-eu', s3_key_prefix='access-purpose/fraud_modelling_eu/fraud-strategy-uk-au-nz/user_erik_bavenstrand/00_uk_plip_merchant'))
  2. ConvertStep(converter=CsvToArrowConverter(forced_datetime_columns=(), forced_numerical_columns=(), forced_categorical_columns=(), forced_boolean_columns=(), drop_columns=(), na_values=('-9998', '-9998.0', '-9999', '-9999.0', '-99', '-99.0', 'nan', 'none', 'non', 'Nan', 'None', 'Non', '', 'N/A', 'N/a', 'unknown', 'missing'), true_values=('t', 'True', 'true', '1'), false_values=('f', 'False', 'false', '0'), downcast_float=True, random_state=1337, workers=12, max_cache_entries=1, output_directory='data/00_uk

In [6]:
# Run the pipeline and keep the `data` attribute of the result it returns.
# NOTE(review): presumably `df` is the converted dataset as a dataframe-like
# object — confirm against the Pipeline.run() return type.
df = pipeline.run().data

[2023-05-02 20:00:04] [INFO] Local dataset is up to date with S3 bucket contents, skipping download. (sources.py:130)
[2023-05-02 20:00:21] [INFO] Cache Hit (LRUCache) CsvToArrowConverter.convert: Using cached output. (cache.py:81)
