# Interactive Dask on SLURMCluster: JupyterLab Tutorial for Distributed Data Processing and AWS Integration

This notebook provides a tutorial on running Dask in a SLURMCluster using a JupyterLab interactive session. The steps include:

1. SLURM Cluster Configuration: Define a SLURM cluster configuration using the `SLURMCluster` object, specifying parameters like the compute queue, number of CPU cores per job, and memory allocation. The `adapt` method then lets the cluster scale the number of workers up and down between a minimum and a maximum, depending on load.

2. Connect to Dask Cluster: A Dask client is connected to the SLURMCluster, enabling interaction with the Dask computation.

3. Display Dask Dashboard in JupyterLab: Use the Dask extension for JupyterLab to embed the Dask Dashboard directly in the session. Instructions are provided to establish the connection to the dashboard.

4. Set AWS Credentials: Set temporary AWS credentials to access an AWS bucket resource defined in the Parallel Works platform, facilitating data transfer.

5. Generate Random Data: Create a Dask DataFrame with randomly generated data, and adjust the number of rows as needed.

6. Write and Read Data to/from AWS Bucket: Write the generated data to the specified AWS bucket and read it back into a Dask DataFrame.

7. Process Data: Perform data processing on the Dask DataFrame, filtering rows and grouping by specific columns.

8. Write Processed Data Back to AWS Bucket: Write the processed data back to the AWS bucket using the `to_csv` method.

The code cells and explanations below walk through each of these steps in order.

In [None]:
import dask
import dask.dataframe as dd
import pandas as pd
from faker import Faker
import os

In [None]:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

### 1. Define SLURM cluster configuration
In this section, we use the [SLURMCluster](https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html) object to deploy Dask within a SLURM cluster. The SLURMCluster is configured with the compute queue, the number of CPU cores per job, and the memory allocated per job. The `job_directives_skip` argument tells Dask to leave specific SLURM directives out of the job script it submits. Here the `--mem` directive must be skipped because memory is not explicitly defined for the nodes in the SLURM configuration file (`/mnt/shared/etc/slurm/slurm.conf`) of the clusters in the Parallel Works platform.

In [None]:
cluster = SLURMCluster(
    queue = 'compute',
    cores = 2,  # Number of CPU cores per job
    memory = '8GB',  # Memory per job
    job_directives_skip = ['--mem'], # Leave the --mem directive out of the submitted job script
)
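
The effect of `job_directives_skip` can be pictured as a substring filter over the `#SBATCH` header lines of the job script. The sketch below is an illustration only and is not dask-jobqueue's own code: the header lines and the `skip_directives` helper are hypothetical. With a live cluster object, `print(cluster.job_script())` shows the script that is actually submitted.

```python
def skip_directives(header_lines, directives_to_skip):
    """Drop every header line that contains one of the given substrings."""
    return [
        line
        for line in header_lines
        if not any(skip in line for skip in directives_to_skip)
    ]


# Hypothetical header lines resembling a SLURM job script
example_header = [
    "#SBATCH -p compute",
    "#SBATCH -n 1",
    "#SBATCH --cpus-per-task=2",
    "#SBATCH --mem=8G",
    "#SBATCH -t 00:30:00",
]

filtered_header = skip_directives(example_header, ["--mem"])
print("\n".join(filtered_header))
```

Because this sketch matches on substrings, `'--mem'` would also remove a line such as `--mem-per-cpu`, so a skip string should be as specific as the situation requires.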


Next, the `adapt` method lets the cluster scale adaptively: workers are requested and released as the workload changes, within the given minimum and maximum.

In [None]:
cluster.adapt(
    minimum = 0, 
    maximum = 2
)
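
With `minimum = 0` and `maximum = 2`, the cluster can drop to zero workers while idle and never asks for more than two. The bounding step can be sketched as below. `bounded_worker_target` is a hypothetical helper written for illustration; Dask's real adaptive logic also weighs task load and timing, which this sketch leaves out.

```python
def bounded_worker_target(requested, minimum=0, maximum=2):
    """Clamp a requested worker count to the [minimum, maximum] range."""
    return max(minimum, min(requested, maximum))


# Requests below, inside, and above the allowed range
for requested in (0, 1, 5):
    print(requested, "->", bounded_worker_target(requested))
```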


Lastly, a Dask client is connected to the cluster.

In [None]:
client = Client(cluster)
client

### 2. Display the Dask Dashboard in JupyterLab
The [Dask extension for JupyterLab](https://github.com/dask/dask-labextension) comes pre-installed in the JupyterLab interactive session. This extension embeds the Dask Dashboard directly in JupyterLab, as demonstrated in this [video](https://www.youtube.com/watch?v=EX_voquHdk0). The connection to the Dashboard goes through a proxy; the [dask-jobqueue documentation](https://jobqueue.dask.org/en/stable/interactive.html) describes this setup in detail.

In this case, simply paste the link generated below into the DASK DASHBOARD URL search bar.

In [None]:
from urllib.parse import urlsplit
port = urlsplit(client.dashboard_link).port
os.environ['DASHBOARD_PORT'] = str(port)

**To connect to the Dashboard, copy the link printed below into the DASK DASHBOARD URL search bar and press Enter.**

In [None]:
!echo https://cloud.parallel.works/me/$openPort/proxy/$DASHBOARD_PORT/status
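
The same URL can be assembled in Python instead of the shell. The sketch below is a minimal illustration: `dashboard_proxy_url` is a hypothetical helper, and the example dashboard link and open port are made-up values standing in for `client.dashboard_link` and the session's `openPort` environment variable.

```python
from urllib.parse import urlsplit


def dashboard_proxy_url(dashboard_link, open_port):
    """Build the proxied dashboard URL from a dashboard link and a session port."""
    port = urlsplit(dashboard_link).port
    if port is None:
        raise ValueError(f"No port found in dashboard link: {dashboard_link!r}")
    return f"https://cloud.parallel.works/me/{open_port}/proxy/{port}/status"


# Hypothetical values; in the notebook these would come from
# client.dashboard_link and os.environ['openPort']
example_link = "http://10.0.0.5:8787/status"
example_open_port = "50123"

print(dashboard_proxy_url(example_link, example_open_port))
```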

### 3. Set AWS credentials
Follow the instructions in [this link](https://docs-staging.parallel.works/docs/storage/transferring-data/obtaining-credentials) to obtain the **temporary credentials** for an AWS bucket resource defined in the Parallel Works platform. AWS credentials are set as environment variables.

In [None]:
# Set AWS credentials (replace the placeholder values below).
# Instructions for obtaining the credentials:
# https://docs-staging.parallel.works/docs/storage/transferring-data/obtaining-credentials

bucket_name='abc'
os.environ['AWS_ACCESS_KEY_ID']='123'
os.environ['AWS_SECRET_ACCESS_KEY']='xyz'
os.environ['AWS_SESSION_TOKEN']='456'

# If the cluster runs on a different cloud service provider (CSP)
# than the S3 bucket, you currently need to pass the endpoint URL
# to the underlying s3fs library in addition to the secrets above.
# The region in the URL must match the region of your bucket.
# This is not necessary if your cluster is in the same CSP and
# region as your bucket. These storage options need to be passed
# to every bucket read or write command below.
storage_options={"client_kwargs": {"endpoint_url": "https://s3-us-east-2.amazonaws.com"}}
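
Temporary credentials expire, and a missing variable tends to surface later as an opaque access error. A quick check before touching the bucket can help; the sketch below is a hypothetical helper, not part of any AWS or Dask API, and is shown here against a made-up dictionary rather than the real environment.

```python
REQUIRED_AWS_VARIABLES = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_SESSION_TOKEN",
)


def missing_aws_variables(environment):
    """Return the required variable names that are absent or empty."""
    return [name for name in REQUIRED_AWS_VARIABLES if not environment.get(name)]


# Made-up environment with the session token left out
example_environment = {
    "AWS_ACCESS_KEY_ID": "123",
    "AWS_SECRET_ACCESS_KEY": "xyz",
}

print(missing_aws_variables(example_environment))
```

In the notebook, the same check would be run as `missing_aws_variables(os.environ)`.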

### 4. Generate random data
In this section, the function `generate_random_data` is defined to create a Dask DataFrame with randomly generated names, ages, and cities, split into two partitions. The number of rows in the generated data can be adjusted.

In [None]:
# Function to generate random data
def generate_random_data(num_rows):
    fake = Faker()
    data = {
        'Name': [fake.name() for _ in range(num_rows)],
        'Age': [fake.random_int(min=18, max=99) for _ in range(num_rows)],
        'City': [fake.city() for _ in range(num_rows)]
    }
    return dd.from_pandas(pd.DataFrame(data), npartitions=2)  # Create Dask DataFrame

In [None]:
num_rows = 100000  # Adjust the number of rows as needed
random_data = generate_random_data(num_rows)
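
For a quick dry run where Faker is unavailable or reproducible values are wanted, the same three columns can be produced with the standard library. This is a stand-in for, not a copy of, the Faker-based function above: `generate_rows_without_faker` and its placeholder name and city lists are hypothetical.

```python
import random


def generate_rows_without_faker(num_rows, seed=0):
    """Build Name/Age/City columns from fixed placeholder lists, reproducibly."""
    rng = random.Random(seed)
    names = ["Ada", "Grace", "Alan", "Edsger"]  # placeholder names
    cities = ["Springfield", "Riverton", "Lakeside"]  # placeholder cities
    return {
        "Name": [rng.choice(names) for _ in range(num_rows)],
        "Age": [rng.randint(18, 99) for _ in range(num_rows)],  # inclusive bounds
        "City": [rng.choice(cities) for _ in range(num_rows)],
    }


sample = generate_rows_without_faker(5)
print(sample)
```

The returned dictionary has the same layout as `data` in `generate_random_data`, so it can be passed to `pd.DataFrame` and then `dd.from_pandas` in the same way.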


### 5. Write data to the AWS bucket
Data generated in the previous step is written to the specified AWS bucket using the `to_csv` method. Because the cluster scales adaptively from zero workers, this call makes Dask submit jobs to the SLURM queue to obtain the workers that perform the transfer. To monitor these jobs, run `watch squeue` in a terminal within the cluster; it refreshes the queue listing so you can follow each job's status.

In [None]:
csv_filename = 'random_data.csv'
random_data.to_csv(f's3://{bucket_name}/{csv_filename}', index=False, single_file=True, storage_options=storage_options)

### 6. Read data from the AWS bucket
In this section, data is read from the AWS bucket into a Dask DataFrame.

In [None]:
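# Pass the same storage options used for writing so the read reaches the bucket endpoint
dask_df = dd.read_csv(f's3://{bucket_name}/{csv_filename}', storage_options=storage_options)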

### 7. Process data
The Dask DataFrame is processed by keeping the rows where the 'Age' column is greater than 21, grouping them by the 'City' column, and counting the rows in each group.

In [None]:
processed_dask_df = dask_df[dask_df['Age'] > 21].groupby('City').size()
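
To make the result of this expression concrete, the same filter and count can be reproduced on a handful of rows with the standard library. The sample rows below are made up for illustration; note that the comparison is strict, so a row with an age of exactly 21 is excluded.

```python
from collections import Counter

# Made-up rows with the same columns as the generated data
sample_rows = [
    {"Name": "A", "Age": 30, "City": "Springfield"},
    {"Name": "B", "Age": 21, "City": "Springfield"},  # excluded: 21 is not > 21
    {"Name": "C", "Age": 45, "City": "Riverton"},
    {"Name": "D", "Age": 19, "City": "Riverton"},  # excluded
    {"Name": "E", "Age": 22, "City": "Springfield"},
]

# Keep rows with Age > 21, then count the remaining rows per city
rows_per_city = Counter(row["City"] for row in sample_rows if row["Age"] > 21)
print(dict(rows_per_city))  # {'Springfield': 2, 'Riverton': 1}
```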

### 8. Write the processed data back to the AWS bucket
The processed data is written back to the AWS bucket using the `to_csv` method.

In [None]:
processed_csv_filename = 'processed_data.csv'
processed_dask_df.to_csv(f's3://{bucket_name}/{processed_csv_filename}', single_file=True, storage_options=storage_options)

Finally, `compute()` is called to bring the processed result into the notebook and display it.

In [None]:
# Trigger computation if needed
processed_dask_df.compute()