# Loading And Storing Data From And Into S3 With And Without Dask

This test generates a large random dataset, uploads it to S3, and then processes it with and without Dask. It then verifies that:
* The data was handled properly and both runs produced equal results.
* The dataset artifact stored in S3 is loadable and equal between the two runs.

## General Configurations

In [1]:
import os
import shutil
import sys

sys.path.append(os.path.dirname(os.path.abspath("../")))

from utils import S3Client

# AWS credentials (read from the environment; secrets must never be hardcoded in the notebook):
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
assert AWS_ACCESS_KEY_ID != "" and AWS_SECRET_ACCESS_KEY != "", "AWS credentials must be set in the environment."

# Path to store the generated data:
LOCAL_DATA_PATH = "./data"
S3_BUCKET = os.environ.get("S3_BUCKET", "testbucket-igz-temp")
S3_PROJECT_DIRECTORY = "test-dask-s3"
# S3 keys always use forward slashes, so the path is built without `os.path.join`:
S3_DATA_PATH = f"{S3_PROJECT_DIRECTORY}/data"

# Number of samples of generated data (number of rows in the data table):
N_SAMPLES = 2_000_000

# Number of features of the generated data (number of columns in the data table):
N_FEATURES = 100

# The amount of parquet partitions to have of the generated data:
N_PARTITIONS = 20
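
The configuration cell expects the AWS credentials to be available as environment variables. A minimal sketch of a stricter lookup, assuming a hypothetical helper named `require_env` (it is not part of the notebook's `utils` module or of MLRun), could raise a descriptive error instead of relying on a bare `assert`:

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if it is unset or empty.

    Hypothetical helper for illustration only.
    """
    value = os.environ.get(name, "")
    if not value:
        raise RuntimeError(f"Environment variable {name!r} must be set before running this test.")
    return value


# Demonstration with a placeholder variable (the name and value are made up):
os.environ["EXAMPLE_SETTING"] = "example-value"
example_value = require_env("EXAMPLE_SETTING")
```

In the notebook, the same helper could be called with `"AWS_ACCESS_KEY_ID"` and `"AWS_SECRET_ACCESS_KEY"`.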

## 1. Generate Data

1. Generate random data.
2. Turn the data into a `pandas.DataFrame`, naming the columns `feature_{i}` and adding the partitioning column (`year`).

In [2]:
import numpy as np
import pandas as pd


def generate_data(
    output_path: str,
    n_samples: int, 
    n_features: int, 
    n_partitions: int,
):
    # Generate data:
    data = np.random.random(size=(n_samples, n_features))
    
    # Create a dataframe:
    data = pd.DataFrame(
        data=data, 
        columns=[f"feature_{i}" for i in range(n_features)]
    )
    data["year"] = np.random.randint(2000, 2000 + n_partitions, size=n_samples)
    
    # Save to parquets:
    data.to_parquet(output_path, partition_cols=["year"])
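
`np.random.randint(low, high)` samples from the half-open interval `[low, high)`, so the `year` column takes at most `n_partitions` distinct values, and partitioning by `year` yields one `year=<value>` directory per distinct value (as the upload log shows). A small sketch of that mapping, where the seed and the reduced sample count are assumptions chosen for illustration:

```python
import numpy as np

n_partitions = 20
n_samples = 1_000  # much smaller than the notebook's N_SAMPLES

# Seed only so that this sketch is reproducible (the notebook itself is unseeded):
np.random.seed(0)
years = np.random.randint(2000, 2000 + n_partitions, size=n_samples)

# Hive-style partition directory names, one per distinct year value:
partition_dirs = sorted(f"year={year}" for year in np.unique(years))
```

The upper bound `2000 + n_partitions` is never drawn, so the directory names range over `year=2000` up to at most `year=2019` for 20 partitions.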

Generate the data. This requires write permissions to the local directory and to the S3 bucket.

In [3]:
# Delete past generated data (in case there was a failure):
if os.path.exists(LOCAL_DATA_PATH):
    shutil.rmtree(os.path.abspath(LOCAL_DATA_PATH))

# Generate new data:
generate_data(
    output_path=LOCAL_DATA_PATH,
    n_samples=N_SAMPLES, 
    n_features=N_FEATURES, 
    n_partitions=N_PARTITIONS,
)

# Upload it to S3:
s3_client = S3Client(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
s3_client.upload(
    bucket=S3_BUCKET,
    local_path=LOCAL_DATA_PATH,
    s3_path=S3_DATA_PATH,
    replace=False,
)

# Delete new generated data (data will be loaded from S3):
shutil.rmtree(os.path.abspath(LOCAL_DATA_PATH))

Uploading:   0%|          | 0/20 [00:00<?, ?it/s]

Uploading './data/year=2000/bb4627c3967b48aebd1c19efbd11e40b.parquet' to test-dask-s3/data/year=2000/bb4627c3967b48aebd1c19efbd11e40b.parquet
Uploading './data/year=2001/9ac7f5ce587249a39dd2fcf415025c60.parquet' to test-dask-s3/data/year=2001/9ac7f5ce587249a39dd2fcf415025c60.parquet
Uploading './data/year=2002/ede50f0d863841379ada5c7afce83e84.parquet' to test-dask-s3/data/year=2002/ede50f0d863841379ada5c7afce83e84.parquet
Uploading './data/year=2003/3600932d2ff04cc38036fac84279b467.parquet' to test-dask-s3/data/year=2003/3600932d2ff04cc38036fac84279b467.parquet
Uploading './data/year=2004/0f2df1a58b7746c492ceb5c55e8e2cc2.parquet' to test-dask-s3/data/year=2004/0f2df1a58b7746c492ceb5c55e8e2cc2.parquet
Uploading './data/year=2005/4bdff42d9d7249cc829e648217006463.parquet' to test-dask-s3/data/year=2005/4bdff42d9d7249cc829e648217006463.parquet
Uploading './data/year=2006/052509ff6e254055a64d3068de5d5c85.parquet' to test-dask-s3/data/year=2006/052509ff6e254055a64d3068de5d5c85.parquet
Upload
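
The log above shows each local file under `./data` being uploaded to the same relative location under `test-dask-s3/data`. The internals of `S3Client.upload` are not shown in this notebook, so the following is only a sketch of how such a mapping could be computed; `to_s3_key` and the file name `part.parquet` are hypothetical:

```python
import posixpath
from pathlib import PurePosixPath


def to_s3_key(local_file: str, local_root: str, s3_prefix: str) -> str:
    """Map a file under `local_root` to an object key under `s3_prefix`."""
    # Path of the file relative to the local root, e.g. "year=2000/part.parquet":
    relative = PurePosixPath(local_file).relative_to(PurePosixPath(local_root))
    # S3 keys use forward slashes regardless of the local operating system:
    return posixpath.join(s3_prefix, *relative.parts)


key = to_s3_key(
    local_file="./data/year=2000/part.parquet",
    local_root="./data",
    s3_prefix="test-dask-s3/data",
)
```

Keeping the `year=<value>` directory in the key preserves the partition layout, which is what lets the dataset be read back as a partitioned parquet directory.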

## 2. Data Processing Code

1. Read the data into a pandas or Dask `DataFrame` using the `as_df` method of MLRun's `DataItem`.
2. Do some calculations.

The calculations are accumulated into a single value that is logged as a result, alongside a single column of data (the means, in this case) that is stored in S3 as a dataset artifact.
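
The accumulation into a single value can be tried locally on a tiny table before running the full job. This is a sketch of the pandas (non-Dask) path only, with made-up values standing in for the generated data:

```python
import pandas as pd

# Tiny stand-in for the generated table (values are made up for illustration):
data = pd.DataFrame({"feature_0": [1.0, 2.0, 3.0], "feature_1": [4.0, 5.0, 6.0]})

# Each statistic is a Series with one entry per column; `sum` adds the entries up:
sum_value = sum(data.sum())    # 6 + 15
mean_value = sum(data.mean())  # 2 + 5
var_value = sum(data.var())    # 1 + 1 (sample variance, ddof=1)

# The single scalar that would be logged as the result:
result = sum_value + mean_value + var_value
```

For this table the three partial sums are 21, 7 and 2, so `result` is 30.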

In [4]:
# mlrun: start-code

In [5]:
import pandas as pd
import dask
import dask.dataframe  # `import dask` alone does not guarantee the submodule is loaded
import mlrun


@mlrun.handler(outputs=["result", "means"])
def process_data(context: mlrun.MLClientCtx, data_path: mlrun.DataItem):
    # Check for a dask client:
    dask_function = context.get_param("dask_function", None)
    dask_client = mlrun.import_function(dask_function).client if dask_function else None
    
    # Get the data:
    data = data_path.as_df(
        df_module=dask.dataframe if dask_client else pd,
        format="parquet"
    )
    
    # Do some random calculations:
    if dask_client:
        data = dask_client.persist(data)
    sum_value = data.sum()
    mean_value = data.mean()
    var_value = data.var()
    if dask_client:
        sum_value = dask.delayed(sum)(sum_value)
        mean_value = dask.delayed(sum)(mean_value)
        var_value = dask.delayed(sum)(var_value)
    else:
        sum_value = sum(sum_value)
        mean_value = sum(mean_value)
        var_value = sum(var_value)
    result = sum_value + mean_value + var_value
    means = data.mean()
    for column in data.columns:
        means = means + means * means
    if dask_client:
        result = result.compute()
        means = means.compute()
    
    # Log the values:
    return result, means

In [6]:
# mlrun: end-code
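
The loop in `process_data` applies `means = means + means * means` once per column. As a rough sanity check, and assuming a starting mean near 0.5 (the expected mean of uniform `[0, 1)` samples), the same update on a plain Python float grows past the float64 range well before 100 iterations:

```python
import math

x = 0.5  # assumed starting mean of a uniform [0, 1) feature column
iterations_until_inf = None

# One update per feature column (N_FEATURES = 100 in this notebook):
for i in range(1, 101):
    x = x + x * x
    if iterations_until_inf is None and math.isinf(x):
        iterations_until_inf = i
```

If the real column means behave the same way, the stored `means` values may all saturate at infinity, in which case comparing the two `means` arrays mostly confirms that both runs overflow identically rather than comparing finite values.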

## 3. Create a Project

1. Create the MLRun project.
2. Use MLRun's `code_to_function` to create an MLRun function of the processing code.

In [7]:
import os
import shutil
import time

import mlrun

In [8]:
# Create the project:
project = mlrun.get_or_create_project(
    name=S3_PROJECT_DIRECTORY, 
    context="./",
    user_project=False
)

# Add the S3 credentials:
project.set_secrets(
    secrets={
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
    }
)

> 2022-12-18 11:03:52,095 [info] loaded project test-dask-s3 from MLRun DB


In [9]:
# Create the processing function:
process_data_function = mlrun.code_to_function(
    name="process_data",
    kind="job",
    image="mlrun/mlrun",
    handler="process_data",
)

# Assign the function to the project:
project.set_function(process_data_function)

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7efd66c31550>

## 4. Run Without Dask

Run the processing without Dask, timing it and storing the result and the means.

In [10]:
without_dask_time = time.time()
without_dask_run = process_data_function.run(
    name="without_dask",
    inputs={
        "data_path": f"s3://{S3_BUCKET}/{S3_PROJECT_DIRECTORY}/data/",
    },
    artifact_path=f"s3://{S3_BUCKET}/{S3_PROJECT_DIRECTORY}",
)
without_dask_time = time.time() - without_dask_time
without_dask_result = without_dask_run.status.results['result']
without_dask_means = np.array(without_dask_run.artifact('means').as_df()["0"])

> 2022-12-18 11:03:59,049 [info] starting run without_dask uid=66ffa9e041f1477ca2fe42d44aef9bca DB=http://mlrun-api:8080
> 2022-12-18 11:03:59,260 [info] Job is running in the background, pod: without-dask-28lzr
> 2022-12-18 11:17:47,839 [info] To track results use the CLI: {'info_cmd': 'mlrun get run 66ffa9e041f1477ca2fe42d44aef9bca -p test-dask-s3', 'logs_cmd': 'mlrun logs 66ffa9e041f1477ca2fe42d44aef9bca -p test-dask-s3'}
> 2022-12-18 11:17:47,840 [info] Or click for UI: {'ui_url': 'https://dashboard.default-tenant.app.dev6.lab.iguazeng.com/mlprojects/test-dask-s3/jobs/monitor/66ffa9e041f1477ca2fe42d44aef9bca/overview'}
> 2022-12-18 11:17:47,840 [info] run executed, status=completed
invalid value encountered in subtract
final state: completed


| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| test-dask-s3 | ...4aef9bca | 0 | Dec 18 11:04:04 | completed | without_dask | v3io_user=guyl, kind=job, owner=guyl, mlrun/client_version=1.2.1-rc4, host=without-dask-28lzr | data_path | dask_function=None | result=100004548.27844404 | means |





> 2022-12-18 11:17:48,151 [info] run executed, status=completed
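
The run above is timed by subtracting two `time.time()` readings. As an alternative sketch, the same measurement can be wrapped in a small context manager; `timed` and the `timings` dictionary are hypothetical names, and `time.perf_counter` is used because it is a monotonic clock intended for measuring durations:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(timings: dict, label: str):
    """Store the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even if the block raises, so failed runs are still timed:
        timings[label] = time.perf_counter() - start


timings = {}
with timed(timings, "example"):
    time.sleep(0.01)  # stand-in for `process_data_function.run(...)`
```

Using one dictionary for both runs keeps the two durations side by side for the later comparison.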


## 5. Run With Dask

1. Create the Dask function.
2. Configure it.
3. Run the data processing with Dask while timing it and storing the result and means.

In [11]:
# Create the dask function:
dask_function = mlrun.new_function(name="my_dask", kind="dask", image="mlrun/mlrun")

# Configure the dask function specs:
dask_function.spec.remote = True
dask_function.spec.replicas = 5
dask_function.spec.service_type = 'NodePort'
dask_function.with_limits(mem="6G")
dask_function.spec.nthreads = 5

# Assign the function to the project:
project.set_function(dask_function)

# Save:
dask_function.save()

'db://test-dask-s3/my-dask'

In [12]:
dask_function.client

> 2022-12-18 11:18:01,379 [info] trying dask client at: tcp://mlrun-my-dask-f4f760b7-d.default-tenant:8786
> 2022-12-18 11:18:01,423 [info] using remote dask scheduler (mlrun-my-dask-f4f760b7-d) at: tcp://mlrun-my-dask-f4f760b7-d.default-tenant:8786


Mismatched versions found

+-------------+--------+-----------+---------+
| Package     | client | scheduler | workers |
+-------------+--------+-----------+---------+
| blosc       | 1.7.0  | None      | None    |
| cloudpickle | 2.0.0  | 2.2.0     | None    |
| lz4         | 3.1.0  | None      | None    |
| msgpack     | 1.0.3  | 1.0.4     | None    |
| toolz       | 0.11.2 | 0.12.0    | None    |
| tornado     | 6.1    | 6.2       | None    |
+-------------+--------+-----------+---------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6


Connection method: Direct
Dashboard: http://mlrun-my-dask-f4f760b7-d.default-tenant:8787/status

Comm: tcp://10.201.103.45:8786
Workers: 0
Dashboard: http://10.201.103.45:8787/status
Total threads: 0
Started: Just now
Total memory: 0 B


In [13]:
with_dask_time = time.time()
with_dask_run = process_data_function.run(
    name="with_dask",
    inputs={
        "data_path": f"s3://{S3_BUCKET}/{S3_PROJECT_DIRECTORY}/data/",
    },
    params={
        "dask_function": "db://" + dask_function.uri,
    },
    artifact_path=f"s3://{S3_BUCKET}/{S3_PROJECT_DIRECTORY}",
)
with_dask_time = time.time() - with_dask_time
with_dask_result = with_dask_run.status.results['result']
with_dask_means = np.array(with_dask_run.artifact('means').as_df()["0"])

> 2022-12-18 11:18:01,478 [info] starting run with_dask uid=0ac62c65edbd4973b54472fddd908052 DB=http://mlrun-api:8080
> 2022-12-18 11:18:01,665 [info] Job is running in the background, pod: with-dask-l5cvp
> 2022-12-18 11:18:08,486 [info] trying dask client at: tcp://mlrun-my-dask-f4f760b7-d.default-tenant:8786
> 2022-12-18 11:18:08,497 [info] using remote dask scheduler (mlrun-my-dask-f4f760b7-d) at: tcp://mlrun-my-dask-f4f760b7-d.default-tenant:8786
remote dashboard: default-tenant.app.dev6.lab.iguazeng.com:31276
> 2022-12-18 11:22:45,040 [info] To track results use the CLI: {'info_cmd': 'mlrun get run 0ac62c65edbd4973b54472fddd908052 -p test-dask-s3', 'logs_cmd': 'mlrun logs 0ac62c65edbd4973b54472fddd908052 -p test-dask-s3'}
> 2022-12-18 11:22:45,040 [info] Or click for UI: {'ui_url': 'https://dashboard.default-tenant.app.dev6.lab.iguazeng.com/mlprojects/test-dask-s3/jobs/monitor/0ac62c65edbd4973b54472fddd908052/overview'}
> 2022-12-18 11:22:45,041 [info] run executed, status=comple

| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| test-dask-s3 | ...dd908052 | 0 | Dec 18 11:18:06 | completed | with_dask | v3io_user=guyl, kind=job, owner=guyl, mlrun/client_version=1.2.1-rc4, host=with-dask-l5cvp | data_path | dask_function=db://test-dask-s3/my-dask | result=100004548.27844404 | means |





> 2022-12-18 11:22:53,567 [info] run executed, status=completed


## 6. Compare Runtimes

1. Delete the test data from S3.
2. Print the collected timings and results.
3. Verify that the Dask run took less time and produced a result and means that are almost equal to those of the run without Dask.

In [14]:
# Delete the project and data in S3:
s3_client.delete(
    bucket=S3_BUCKET,
    s3_path=S3_PROJECT_DIRECTORY,
)

Deleting:   0%|          | 0/22 [00:00<?, ?it/s]

Deleting 'test-dask-s3/data/year=2000/bb4627c3967b48aebd1c19efbd11e40b.parquet'
Deleting 'test-dask-s3/data/year=2001/9ac7f5ce587249a39dd2fcf415025c60.parquet'
Deleting 'test-dask-s3/data/year=2002/ede50f0d863841379ada5c7afce83e84.parquet'
Deleting 'test-dask-s3/data/year=2003/3600932d2ff04cc38036fac84279b467.parquet'
Deleting 'test-dask-s3/data/year=2004/0f2df1a58b7746c492ceb5c55e8e2cc2.parquet'
Deleting 'test-dask-s3/data/year=2005/4bdff42d9d7249cc829e648217006463.parquet'
Deleting 'test-dask-s3/data/year=2006/052509ff6e254055a64d3068de5d5c85.parquet'
Deleting 'test-dask-s3/data/year=2007/a6df52a9d3274dba810fb251a07d2b9c.parquet'
Deleting 'test-dask-s3/data/year=2008/2e2ea6bcc0dc4847b3a1e550ed315fb7.parquet'
Deleting 'test-dask-s3/data/year=2009/e6884c4305c5480eb385d46edce0d9fa.parquet'
Deleting 'test-dask-s3/data/year=2010/939a791c5cf34cefa6df3cfd83e06838.parquet'
Deleting 'test-dask-s3/data/year=2011/c307bf7d109e4a5f8f8ac84a245df151.parquet'
Deleting 'test-dask-s3/data/year=2012/a6

In [15]:
# Print the test's collected results:
print(
    f"Without dask:\n" 
    f"\t{'%.2f' % without_dask_time} Seconds\n"
    f"\tResult: {without_dask_result}"
)
print(
    f"With dask:\n"
    f"\t{'%.2f' % with_dask_time} Seconds\n"
    f"\tResult: {with_dask_result}\n"
)

# Verification:
assert with_dask_time < without_dask_time
assert np.isclose(without_dask_result, with_dask_result)
assert np.allclose(without_dask_means, with_dask_means)

# Summary message:
print(f"Overall x{'%.2f' % (without_dask_time / with_dask_time)} faster!")

Without dask:
	829.11 Seconds
	Result: 100004548.27844404
With dask:
	292.09 Seconds
	Result: 100004548.27844404

Overall x2.84 faster!
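
The reported speedup is the ratio of the two wall-clock times. A minimal sketch that reproduces the summary line from the timings printed above (`speedup` is a hypothetical helper name):

```python
def speedup(baseline_seconds: float, candidate_seconds: float) -> float:
    """Return how many times faster the candidate run was than the baseline."""
    if candidate_seconds <= 0:
        raise ValueError("candidate_seconds must be positive")
    return baseline_seconds / candidate_seconds


# Timings taken from the output above (without Dask, with Dask):
factor = speedup(829.11, 292.09)
summary = f"Overall x{factor:.2f} faster!"
```

Note that both timings include job scheduling and pod startup, so the ratio describes the end-to-end runs rather than the computation alone.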
