# Cloud Data Transfer Speeds Benchmarking Workflow

Add overview of workflow

## Step 0: Load Required Setup Packages & Classes

Enter the following parameters to install packages to the correct conda instance and environment.


`jupyter_conda_path : str`
    - The path to the miniconda installation that this Jupyter notebook is running from. Do not include a trailing `/` at the end of the path.
    
`jupyter_conda_env : str`
    - The conda environment that this Jupyter notebook is running in

In [1]:
import os

# PW_USER is an alternative
my_user_name = os.environ['USER']

# Default location for Conda from Jupyter workflow
jupyter_conda_path = f"/home/{my_user_name}/.miniconda3c"
jupyter_conda_env = "jupyter"

The next cell installs the required workflow setup packages and calls the UI generation script. Any packages missing from the specified environment will be installed for you. Note that if installation is required, this cell will take a few minutes to complete execution.

**NOTE: If you receive an import error for `jupyter-ui-poll`, you will have to manually install the package in a user container terminal with the following commands:**
```
source <jupyter_conda_path>/etc/profile.d/conda.sh
conda activate <jupyter_conda_env>
pip install jupyter-ui-poll
```

In [2]:
import json
import sys

print('Checking conda environment for UI dependencies...')
os.system("bash " + os.getcwd() + f"/jupyter-helpers/install_ui_packages.sh {jupyter_conda_path} {jupyter_conda_env}")
print('All dependencies installed.')

sys.path.insert(0, os.getcwd() + '/jupyter-helpers')
import ui_helpers as ui
import pandas as pd

Checking conda environment for UI dependencies...
All dependencies installed.


## Step 1: Define Workflow Inputs

Run the following cells to provide your workflow inputs. Simple inputs are handled by normal Python variables, while interactive widgets are generated for the more complicated options.  **All inputs must be filled out to proceed with the benchmarking process.**

### Cloud Resources

#### Compute Resources

Before defining anything else, the resources you intend to use with the benchmarking must be defined. Currently, only resources defined in the Parallel Works platform may be used. Also of note are options that will be passed to Dask: you must specify the number of cores and memory per worker node in the cluster. Without these values, Dask will not be able to submit jobs.

In particular, these options are included so that you can form fair comparisons between different cloud service providers (CSPs). Generally, different CSPs won't have worker nodes with the exact same specs, and in order to achieve a fair comparison between two CSPs one cluster must be limited such that it does not exceed the computational power of the other.

Before submitting your options, read and adhere to the following guidelines:

- **When entering the number of CPUs in the following widget, you must know if the worker nodes of your clusters have hyperthreading enabled.** The instance type of a partition in the resource definition will display a number of vCPUs: this number represents the number of CPUs with hyperthreading enabled. **If hyperthreading is disabled, enter the number of vCPUs divided by 2 into this workflow.** If you do not know, assume that hyperthreading is disabled.

<br>

- Supply a path to an existing miniconda installation or the desired installation path. Leave the default option of `~` to install to the home directory of the cluster. **Miniconda must be installed in a directory accessible to the head node and all worker nodes. If you are unsure if your current/desired installation directory is a shared directory, use the default option.**

<br>

- For `SSH Hostname`, go to the `COMPUTE` tab, find your resource in the `Computing Resources` section, and click on the `<user>@<IP>` shown in the upper right-hand corner of the resource's dashboard. Paste this into the `SSH Hostname` field.

<div class="alert alert-block alert-info">
 The resource name is an arbitrary field used only to help you identify when the benchmark is running on that particular cluster; you may set this to whatever you want. Additional information such as memory and cores can be found in the <code>Instance Type</code> field in the resource definition.
</div>
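
The hyperthreading guideline above can be written as a small helper. This is only an illustrative sketch: `cpus_to_enter` is a hypothetical name and is not part of the workflow's code.

```python
def cpus_to_enter(vcpus: int, hyperthreading_enabled: bool = False) -> int:
    """Return the CPU count to type into the resource widget.

    Hypothetical helper: vCPUs are used as-is when hyperthreading is enabled
    and halved otherwise. The default mirrors the guideline to assume that
    hyperthreading is disabled when you are unsure.
    """
    if vcpus < 1:
        raise ValueError("vcpus must be a positive integer")
    if hyperthreading_enabled:
        return vcpus
    # Halve the vCPU count, but never report fewer than one CPU.
    return max(1, vcpus // 2)


print(cpus_to_enter(4))                               # hyperthreading disabled
print(cpus_to_enter(4, hyperthreading_enabled=True))  # hyperthreading enabled
```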

In [3]:
resource = ui.resourceWidgets()
resource.display()
resources = resource.processInput()
print(f'Your resource inputs:\n {resources}')

HBox(children=(Button(description='Add field', style=ButtonStyle()), Button(description='Remove field', style=…

Accordion(children=(VBox(children=(HBox(children=(Label(value='Resource Name: '), Text(value='cluster1'))), HB…

Button(description='Submit', style=ButtonStyle())


-----------------------------------------------------------------------------
If you wish to change information about cloud resources, run this cell again.

Your resource inputs:
 [{'Name': 'gcptestnew', 'SSH': 'jgreen@34.31.86.225', 'MinicondaDir': '~', 'CSP': 'GCP', 'Dask': {'Scheduler': 'SLURM', 'Partition': 'compute', 'CPUs': 2, 'Memory': 16.0}}]


#### Object Stores

This set of inputs is where you enter the cloud object store Uniform Resource Identifiers (URIs). Both public and private buckets are supported. For the latter, ensure that you have access credentials with *at least* read, write, list, and put (copy from local storage to cloud) permissions, as format conversions will need to be made during the benchmarking process.

Be sure to double-check your inputs to ensure that the bucket names and credentials are correct!
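
One way to double-check a storage URI before submitting it is to split it into its scheme and bucket name with the standard library. The sketch below is illustrative only: `split_store_uri` and the scheme-to-CSP table are assumptions made for this example and are not part of `ui_helpers`.

```python
from typing import Tuple
from urllib.parse import urlparse

# Assumed mapping for illustration; extend it for other providers as needed.
SCHEME_TO_CSP = {"gs": "GCP", "s3": "AWS"}


def split_store_uri(uri: str) -> Tuple[str, str]:
    """Return (csp, bucket_name) for a URI such as 'gs://my-bucket'."""
    parsed = urlparse(uri)
    if parsed.scheme not in SCHEME_TO_CSP:
        raise ValueError(f"Unrecognized URI scheme in {uri!r}")
    if not parsed.netloc:
        raise ValueError(f"No bucket name found in {uri!r}")
    return SCHEME_TO_CSP[parsed.scheme], parsed.netloc


print(split_store_uri("gs://cloud-data-benchmarks"))
```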

In [4]:
store = ui.storageWidgets()
store.display()
storage = store.processInput()

# Following line should be commented out if you don't want AWS S3 credentials shown in plain text
print(f'Your storage inputs:\n {storage}')

HBox(children=(Button(description='Add field', style=ButtonStyle()), Button(description='Remove field', style=…

Accordion(children=(VBox(children=(HBox(children=(Label(value='Storage URI: '), Text(value='', placeholder='gc…

Button(description='Submit', style=ButtonStyle())


--------------------------------------------------------------------------------------
If you wish to change information about cloud storage locations, run this cell again.

Your storage inputs:
 [{'Path': 'gs://cloud-data-benchmarks', 'Type': 'Private', 'CSP': 'GCP', 'Credentials': {'token': './.cloud-data-benchmarks.json'}}]


### Datasets

#### User-Supplied Datasets

Below you can specify datasets that you want to be tested in the benchmarking. You can either enter single files or multiple files that belong to a single dataset, provided that the dataset matches one of the supported formats. **Read the following input rules after running the UI cell below this one.**


1. Activate the checkbox if you desire to record your user-defined datasets. **If it is not checked, none of your inputs will be recorded.**

<br>

2. If specifying data in a bucket, input the path to the data within the bucket, *not* the full URI (i.e., use `path/to/file.extension` and not `<URI prefix>://<bucketname>/path/to/file.extension`)

<br>

3. Use absolute paths for data stored in the user container (or a filesystem mounted in the user container): `/path/to/data.extension`

<br>

4. Use globstrings (`path/to/files/*`) to specify datasets that are split up into multiple subfiles.
    - If using a globstring, ensure that *only* files that belong to the dataset exist in that directory. The workflow will take all files in the directory before the `*` and attempt to gather them into a single dataset.
    - **Globstrings are NOT supported for NetCDF files**
    
<br>

5. If you have a dataset stored in multiple cloud storage locations that will be used in the benchmarking, you must define an input for each of the locations. That is, you must define each location of the data separately.
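
To make rules 2 through 4 concrete, the sketch below shows how a bucket-relative path relates to its full URI, how a user container path can be recognized, and which directory a globstring would gather files from. All three function names are hypothetical and exist only for this example; the workflow's own path handling may differ.

```python
import posixpath


def full_uri(bucket_uri: str, path_in_bucket: str) -> str:
    """Join a bucket URI and a bucket-relative path (rule 2)."""
    return bucket_uri.rstrip("/") + "/" + path_in_bucket.lstrip("/")


def is_container_path(path: str) -> bool:
    """Absolute paths are treated as user container paths (rule 3)."""
    return path.startswith("/")


def glob_directory(path: str) -> str:
    """Return the directory that precedes the first `*` (rule 4)."""
    if "*" not in path:
        raise ValueError("path is not a globstring")
    # Everything before the first `*`, reduced to its directory portion.
    return posixpath.dirname(path.split("*", 1)[0])


print(full_uri("gs://cloud-data-benchmarks", "path/to/file.extension"))
print(is_container_path("/path/to/data.extension"))
print(glob_directory("path/to/files/*"))
```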

In [5]:
userdata = ui.userdataWidgets(storage=storage)
userdata.display()
user_files = userdata.processInput()
print(f'Your data inputs:\n {user_files}')

Checkbox(value=False, description='Provide datasets to workflow?')

HBox(children=(Button(description='Add field', style=ButtonStyle()), Button(description='Remove field', style=…

Accordion(children=(VBox(children=(HBox(children=(Label(value='Data Format'), Dropdown(options=('NetCDF4', 'CS…

Button(description='Submit', style=ButtonStyle())


---------------------------------------------------------------------------
If you wish to change information about your input data, run this cell again.

Your data inputs:
 [{'Format': 'NetCDF4', 'SourcePath': 'gs://cloud-data-benchmarks/ETOPO1_Ice_g_gmt4.nc', 'DataVars': ['*'], 'Type': 'Private', 'CSP': 'GCP', 'Credentials': {'token': './.cloud-data-benchmarks.json'}}]


#### Randomly-Generated Datasets

Another option to supply data to the benchmarking is to create randomly-generated datasets. CSV datasets can be as large as you want and provide a great option if you are new to the world of cloud-native data formats. There are currently two supported randomly-generated data formats: CSV and NetCDF4. Since NetCDF4 is a gridded data format, options to specify the number of coordinate axes and data variables are also included.

<div class="alert alert-block alert-info">
Randomly-generated NetCDF4 file sizes are limited by available disk space in the cluster you are generating the file with. Ensure that you have adequate disk space in your cluster, or the file will not fully generate.
    </div>

In [6]:
randgen = ui.randgenWidgets(resources=resources)
randgen.display()
randfiles = randgen.processInput()
print(f'Your randomly-generated file options:\n {randfiles}')

Accordion(children=(HBox(children=(Label(value='Resource to write random files with: '), Dropdown(options=('gc…

Button(description='Submit', style=ButtonStyle())


-------------------------------------------------------------------------------
If you wish to change the randomly generated file options, run this cell again.

Your randomly-generated file options:
 [{'Format': 'CSV', 'Generate': False, 'SizeGB': 0.0}, {'Format': 'NetCDF4', 'Generate': False, 'SizeGB': 0.0, 'Data Variables': 1.0, 'Float Coords': 2.0}, {'Resource': 'gcptestnew'}]


### Benchmark Options

#### Legacy to Cloud-Native Conversion Options

There are a number of different options to choose from when legacy files (e.g., CSV, NetCDF) are converted to cloud-native (e.g., Parquet, Zarr). In fact, this input field is where you have the most control over the type of results you want to see. You may specify different compression algorithms, compression levels, and chunksizes to use when cloud-native files are written to cloud storage, which each have different effects on how fast files will be written and read.

Note that for large chunksizes the workflow may fail. This is an intended consequence, as the workflow is designed to let you explore the effects of different options on your data. **In general, it is recommended that you keep chunksizes in the range of 50-150 megabytes**: anything higher will result in either poor performance or out-of-memory errors. Similarly, some compression algorithms will result in poor performance depending on the data type.

The widget below allows you to define different sets of options to convert files with. To make multiple selections in the compression algorithm and dataset fields, simply hold `SHIFT` and/or `CTRL` (or `COMMAND` on Mac). Using these option sets, the files defined in each set will be converted using the specified compression & chunksize information. The following rules apply:

- An individual dataset will be converted `(# of compression algorithms in option set 1)*...*(# of compression algorithms in option set n)` times
- The total number of conversions is equal to the sum of individual dataset conversions
- **Datasets will only be converted if they are *highlighted* in at least one option set. That is, you must click on a dataset from the datasets list if you want the conversion options to apply to it.**
- **If you want to keep the internal chunking scheme of a NetCDF4 dataset, set chunksize to 0**

Gridded data chunk dimensions are computed as n-dimensional cubes (i.e., a data variable described by 3 dimensions will have a 3-dimensional chunk of uniform dimension lengths, and so on for higher-order cases).
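
As a rough illustration of cubic chunking, the side length of an n-dimensional chunk can be estimated from the target chunk size and the size of one array element. This is a minimal sketch under stated assumptions (1 MB taken as 10^6 bytes, a fixed-width element type, and a hypothetical function name); the workflow's own calculation may differ.

```python
def cubic_chunk_side(chunksize_mb: float, itemsize_bytes: int, ndim: int) -> int:
    """Largest integer side length whose ndim-cube fits in the target size.

    Assumes 1 MB = 10**6 bytes and that every element occupies
    `itemsize_bytes` bytes (for example, 8 for 64-bit floats).
    """
    if chunksize_mb <= 0 or itemsize_bytes < 1 or ndim < 1:
        raise ValueError("chunksize_mb, itemsize_bytes and ndim must be positive")
    max_elements = int(chunksize_mb * 1e6) // itemsize_bytes
    # Start from the floating-point root, then correct any rounding error.
    side = max(1, int(round(max_elements ** (1.0 / ndim))))
    while side > 1 and side ** ndim > max_elements:
        side -= 1
    while (side + 1) ** ndim <= max_elements:
        side += 1
    return side


# Side lengths for a 100 MB chunk of 64-bit floats in 2 and 3 dimensions.
print(cubic_chunk_side(100, 8, 2))
print(cubic_chunk_side(100, 8, 3))
```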

In [7]:
convertOptions = ui.convertOptions(user_files, randfiles)
convertOptions.display()
convert_options = convertOptions.processInput()
print(f'Your cloud-native data options:\n {convert_options}')

HBox(children=(Button(description='Add field', style=ButtonStyle()), Button(description='Remove field', style=…

Accordion(children=(VBox(children=(HBox(children=(Label(value='Data Format Type: '), Dropdown(options=('Gridde…

Button(description='Submit', style=ButtonStyle())


---------------------------------------------------------------------------
If you wish to change information about your conversion options, run this cell again.

Your cloud-native data options:
 [{'Algorithms': ('lz4',), 'Level': 5, 'Chunksize': 0.0, 'Datasets': ('ETOPO1_Ice_g_gmt4.nc',)}]


#### Core Options

Finally, there are three very important options that apply to the core of the benchmarks. **All of the following parameters must be nonnegative:**

- `workers_per_node : int` - The number of Dask workers (parallel operations) each worker node of a cluster should spawn while carrying out a computation. Writing and reading from cloud storage is almost always bandwidth limited; in general, the more individual streams of data flowing into your worker nodes, the slower operations using data stored in the cloud will be. 

    This parameter is included to give you control of this behavior. Larger instance types often include increased network bandwidth compared to smaller ones (varies by CSP), and are able to support more parallel operations than the latter before hitting the instance's bandwidth limit.

<br>

- `worker_step : int` - When performing file reads (option not yet implemented in conversions), the workflow will loop through a range of Dask workers (or number of parallel reads). The loop starts at the maximum defined worker amount and decreases by `worker_step` until the lowest possible number of workers is reached. This value must be a multiple of `workers_per_node`.

<br>

- `tests : int` - The number of times each individual file will be read for measurement. Entering a number greater than 1 will take much longer to run, but results will reflect the volatility in network performance you can expect when using the data for computations.

In [8]:
workers_per_node = 2
worker_step = 3 * workers_per_node
tests = 2
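
The worker loop described for `worker_step` can be pictured with a few lines of Python. This is a sketch rather than the workflow's own code: `worker_counts` is a hypothetical helper, and the maximum worker count is assumed to be the number of worker nodes multiplied by `workers_per_node`.

```python
from typing import List


def worker_counts(n_nodes: int, workers_per_node: int, worker_step: int) -> List[int]:
    """Worker counts visited by the read loop, from largest to smallest."""
    if min(n_nodes, workers_per_node, worker_step) < 1:
        raise ValueError("all arguments must be positive integers")
    if worker_step % workers_per_node != 0:
        raise ValueError("worker_step must be a multiple of workers_per_node")
    # Assumption: the loop starts with every node running all of its workers.
    max_workers = n_nodes * workers_per_node
    return list(range(max_workers, 0, -worker_step))


# Ten worker nodes is an assumed cluster size for this example.
print(worker_counts(n_nodes=10, workers_per_node=2, worker_step=6))
```

With ten two-worker nodes and a step of 6, the loop visits 20, 14, 8, and 2 workers, which is the same sequence that appears in the read log in Step 3.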

## Step 2: Notebook Setup

Executing the following cell will write all of your inputs to `ipynb_inputs.json`, install miniconda3 and the "cloud-data" Python environment to all resources, and write randomly-generated files to all cloud storage locations (if any files were specified). If writing randomly-generated files, especially large ones, the execution of this cell may take a while.

During this setup, clusters will be limited such that operations performed with each have identical Dask workers per node, cores per Dask worker, and memory per Dask worker. This may result in worker nodes of a particular cluster not using a large portion of their resources: this is intended behavior designed to allow for fair comparisons between different clusters.

<div class="alert alert-block alert-info">
While randomly-generated files are written in parallel by default, if you wish to speed up the execution of this cell, consider creating/choosing a resource with more powerful worker nodes.
    </div>
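
The limiting step can be thought of as taking the smallest per-node specification across all clusters and dividing it among the Dask workers on a node. The sketch below is an assumption about how such limits could be derived from the `resources` structure printed in Step 1; `common_worker_limits` is a hypothetical name, and the setup scripts may compute these values differently.

```python
from typing import Dict, List


def common_worker_limits(resources: List[dict], workers_per_node: int) -> Dict[str, float]:
    """Per-worker limits that every cluster in `resources` can satisfy."""
    if not resources:
        raise ValueError("at least one resource is required")
    min_cpus = min(r["Dask"]["CPUs"] for r in resources)
    min_memory = min(r["Dask"]["Memory"] for r in resources)
    if workers_per_node < 1 or workers_per_node > min_cpus:
        raise ValueError(f"workers_per_node must be between 1 and {min_cpus}")
    return {
        "cores_per_worker": min_cpus // workers_per_node,
        # Memory stays in whatever unit was entered in the resource widget.
        "memory_per_worker": min_memory / workers_per_node,
    }


# Two made-up clusters with different node sizes.
example_resources = [
    {"Name": "cluster-a", "Dask": {"CPUs": 2, "Memory": 16.0}},
    {"Name": "cluster-b", "Dask": {"CPUs": 4, "Memory": 32.0}},
]
print(common_worker_limits(example_resources, workers_per_node=2))
```

In this example the larger cluster is held to the smaller cluster's node size, so each of its nodes leaves half of its cores and memory unused.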

In [14]:
print('Setting up workflow...')

user_input = json.dumps({"RESOURCES" : resources,
                         "STORAGE" : storage,
                         "USERFILES" : user_files,
                         "RANDFILES" : randfiles,
                         "CONVERTOPTS" : convert_options,
                         "GLOBALOPTS" : {'worker_step' : worker_step,
                                         'tests' : tests,
                                         'Dask': {'Workers': workers_per_node},
                                         'local_conda_sh' : jupyter_conda_path}
                        })

minimum_cpus = min([v['Dask']['CPUs'] for v in resources])
if workers_per_node > minimum_cpus:
    print(f'Choose a value of \'workers_per_node\' that does not exceed {minimum_cpus}')

else:
    with open('ipynb_inputs.json', 'w') as outfile:
        outfile.write(user_input)

    os.system("bash workflow_setup.sh \"ipynb_inputs.json\"")

    print('Workflow setup complete.')

Setting up workflow...
Will install miniconda3 to "/home/jgreen/.miniconda3"
Installing Miniconda-23.5.2 on "gcptestnew"...
Miniconda is already installed in "/home/jgreen/.miniconda3"!
Finished installing Miniconda on "gcptestnew".

Building "cloud-data" environment on "gcptestnew"...
Environment already exists!
Finished building "cloud-data" environment on "gcptestnew".

Done installing Miniconda-23.5.2 and building `cloud-data` on all requested resources.


Workflow setup complete.


## Step 3: Run Benchmarking

### Convert File to Cloud-Native

Since one of the major goals of this benchmarking is testing legacy formats against cloud-native ones, we must convert your legacy-formatted data (CSV and NetCDF4) into their corresponding cloud-native formats. This cell will execute and time the conversion process, writing each new format in parallel. The conversion will be done using each cluster's full amount of resources, so be mindful of this feature when using clusters that are expensive to operate.

In [15]:
os.system("bash benchmarks-core/run_benchmark_step.sh \"ipynb_inputs.json\" \"convert-data.py\" \"conversions.csv\"")
df_conversions = pd.read_csv(os.getcwd() + '/results/csv-files/conversions.csv')
df_conversions

Waiting for worker nodes to start up...
Workers active.


Converting files in "gs://cloud-data-benchmarks" with "gcptestnew"...

Converting data variables "Z1" from "ETOPO1_Ice_g_gmt4.nc" with 134.203968MB chunks & level 5 lz4 compression to Zarr...
Written to "gs://cloud-data-benchmarks/cloud-data-transfer-benchmarking/cloudnativefiles/ETOPO1_Ice_g_gmt4.nc_134MB_lz4_lvl5.zarr"
Done converting files in "gs://cloud-data-benchmarks".
Shutting down worker nodes...
Workers shut down. (this may take a while to register in the platform UI)


Unnamed: 0,ncores,resource,resource_csp,bucket,bucket_csp,conversionType,orig_dataset_name,data_vars,orig_mem_size_bytes,orig_cloud_size_bytes,compr_alg,compr_lvl,chunksize_MB,conversion_time_seconds
0,20,gcptestnew,GCP,gs://cloud-data-benchmarks,GCP,NetCDF-to-Zarr,ETOPO1_Ice_g_gmt4.nc,*,1866499208,362236855,lz4,5,134.203968,8.324833631515503
1,ncores,resource,resource_csp,bucket,bucket_csp,conversionType,orig_dataset_name,data_vars,orig_mem_size_bytes,orig_cloud_size_bytes,compr_alg,compr_lvl,chunksize_MB,conversion_time_seconds
2,20,gcptestnew,GCP,gs://cloud-data-benchmarks,GCP,NetCDF-to-Zarr,ETOPO1_Ice_g_gmt4.nc,*,1866499208,362236855,lz4,5,134.203968,12.298370361328125


### File Reads

The last computation-intensive test in the benchmarking is reading and timing files from cloud storage. This will give you an idea of what data transfer throughput you can expect when using cloud storage and different data formats in other workflows.

In [40]:
1866499208 / 1e6 / 3.487549

535.1893860129277
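
The scratch calculation above reproduces the `throughput_MBps` value reported for the first read: the in-memory size in bytes, divided by 10^6, divided by the read time in seconds. A small helper makes the same arithmetic reusable; `throughput_mbps` is a hypothetical name used only for this sketch.

```python
def throughput_mbps(mem_bytes: int, read_time_seconds: float) -> float:
    """Throughput in MB/s, taking 1 MB as 10**6 bytes."""
    if read_time_seconds <= 0:
        raise ValueError("read_time_seconds must be positive")
    return mem_bytes / 1e6 / read_time_seconds


# Same inputs as the scratch cell: in-memory size and read time of one read.
print(round(throughput_mbps(1866499208, 3.487549), 3))  # 535.189
```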

In [39]:
os.system("bash benchmarks-core/run_benchmark_step.sh \"ipynb_inputs.json\" \"read-data.py\" \"reads.csv\"")
df_reads = pd.read_csv(os.getcwd() + '/results/csv-files/reads.csv')
df_reads



Reading files in "gs://cloud-data-benchmarks" with "gcptestnew"...

Reading the variable "Z1" from "ETOPO1_Ice_g_gmt4.nc"...
Active Workers: 20
Active Workers: 14
Active Workers: 8
Active Workers: 2

Reading the variable "Z1" from "cloud-data-transfer-benchmarking/cloudnativefiles/ETOPO1_Ice_g_gmt4.nc_134MB_lz4_lvl5.zarr"...
Active Workers: 20
Active Workers: 14
Active Workers: 8
Active Workers: 2
Shutting down worker nodes...
Workers shut down. (this may take a while to register in the platform UI)


Unnamed: 0,nworkers,nthreads,ncores,resource,resource_csp,bucket,bucket_csp,fileFormat,original_dataset_name,data_variable,mem_bytes,cloud_bytes,chunksize,read_time_seconds,throughput_MBps,throughput_std_dev
0,20,20,20,gcptestnew,GCP,gs://cloud-data-benchmarks,GCP,NetCDF4,ETOPO1_Ice_g_gmt4.nc,Z1,1866499208,362236855,134203968,3.487549,535.189383,98.212107
1,14,14,14,gcptestnew,GCP,gs://cloud-data-benchmarks,GCP,NetCDF4,ETOPO1_Ice_g_gmt4.nc,Z1,1866499208,362236855,134203968,1.782855,1046.915614,208.670067
2,8,8,8,gcptestnew,GCP,gs://cloud-data-benchmarks,GCP,NetCDF4,ETOPO1_Ice_g_gmt4.nc,Z1,1866499208,362236855,134203968,2.129929,876.320081,32.736628
3,2,2,2,gcptestnew,GCP,gs://cloud-data-benchmarks,GCP,NetCDF4,ETOPO1_Ice_g_gmt4.nc,Z1,1866499208,362236855,134203968,6.051105,308.455928,1.644334
4,20,20,20,gcptestnew,GCP,gs://cloud-data-benchmarks,GCP,Zarr,ETOPO1_Ice_g_gmt4.nc_134MB_lz4_lvl5.zarr,Z1,1866499208,271346855,134203968,0.980424,1903.768134,1582.484025
5,14,14,14,gcptestnew,GCP,gs://cloud-data-benchmarks,GCP,Zarr,ETOPO1_Ice_g_gmt4.nc_134MB_lz4_lvl5.zarr,Z1,1866499208,271346855,134203968,1.198084,1557.903434,816.141956
6,8,8,8,gcptestnew,GCP,gs://cloud-data-benchmarks,GCP,Zarr,ETOPO1_Ice_g_gmt4.nc_134MB_lz4_lvl5.zarr,Z1,1866499208,271346855,134203968,1.160945,1607.740719,91.064208
7,2,2,2,gcptestnew,GCP,gs://cloud-data-benchmarks,GCP,Zarr,ETOPO1_Ice_g_gmt4.nc_134MB_lz4_lvl5.zarr,Z1,1866499208,271346855,134203968,3.47132,537.69156,3.248137


## TODO: Step 4: Visualize Results

**Feature not ready**

## Step 5: Remove Benchmarking Files from Cloud Resources (Optional)

Running the following cell will remove all files that the benchmark has copied/written to both clusters and cloud storage. It will not delete any of the original user-supplied datasets.

In [13]:
os.system("bash postprocessing/remove-benchmark-files.sh \"ipynb_inputs.json\"")

Removing gs://cloud-data-benchmarks/cloud-data-transfer-benchmarking/cloudnativefiles/ETOPO1_Ice_g_gmt4.nc_134.203968MB_lz4_5.zarr/Z1/0.0#1692395790148016...
Removing gs://cloud-data-benchmarks/cloud-data-transfer-benchmarking/cloudnativefiles/ETOPO1_Ice_g_gmt4.nc_134.203968MB_lz4_5.zarr/.zattrs#1692395785070895...
Removing gs://cloud-data-benchmarks/cloud-data-transfer-benchmarking/cloudnativefiles/ETOPO1_Ice_g_gmt4.nc_134.203968MB_lz4_5.zarr/.zgroup#1692395784544351...
Removing gs://cloud-data-benchmarks/cloud-data-transfer-benchmarking/cloudnativefiles/ETOPO1_Ice_g_gmt4.nc_134.203968MB_lz4_5.zarr/Z1/0.1#1692395791845648...
Removing gs://cloud-data-benchmarks/cloud-data-transfer-benchmarking/cloudnativefiles/ETOPO1_Ice_g_gmt4.nc_134.203968MB_lz4_5.zarr/Z1/1.0#1692395792047268...
Removing gs://cloud-data-benchmarks/cloud-data-transfer-benchmarking/cloudnativefiles/ETOPO1_Ice_g_gmt4.nc_134.203968MB_lz4_5.zarr/Z1/2.1#1692395792126024...
Removing gs://cloud-data-benchmarks/cloud-data-tra

0