# Distributed DuckDB on Dask

Set up a Dask cluster on EC2 to run DuckDB operations in a data-parallel fashion. The same query is sent to every worker, and each worker queries a different Parquet file.

This notebook should run on a dedicated Jupyter notebook server, and will serve as the Dask **Client**. We will use the Dask Cloud Provider module to launch a cluster on EC2, which will be composed of a **Scheduler** instance and multiple **Worker** instances.

The client (where this notebook is running) issues commands to the scheduler, and the scheduler distributes the work to the workers.

The Dask web dashboard runs on the scheduler, not the client. To access it, create a separate tunnel from your localhost to port 8787 on the scheduler. The dashboard is useful for understanding Dask scheduling and performance.
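
One way to build such a tunnel is plain SSH local port forwarding. The sketch below only assembles the command. It assumes a hypothetical gateway host (for example, the notebook server) that you can reach over SSH and that can reach the scheduler's private IP; the host name, user name, and key path are placeholders, not values from this setup.

```python
import shlex


def dashboard_tunnel_command(
    scheduler_ip, gateway_host, key_path, user="ubuntu", local_port=8787
):
    """Build an ssh command forwarding localhost:local_port to the dashboard.

    gateway_host, user, and key_path are hypothetical placeholders.
    """
    return [
        "ssh",
        "-i", key_path,
        "-N",  # forward ports only, do not run a remote command
        "-L", f"{local_port}:{scheduler_ip}:8787",
        f"{user}@{gateway_host}",
    ]


cmd = dashboard_tunnel_command(
    scheduler_ip="10.5.45.35",
    gateway_host="notebook-server.example.com",
    key_path="~/.ssh/example-key.pem",
)
print(shlex.join(cmd))
```

Run the printed command in a terminal on your own machine, then open `http://localhost:8787/status` in a browser.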

In [1]:
import boto3
from botocore.exceptions import ClientError
import configparser
import contextlib
from dask_cloudprovider.aws import EC2Cluster
from dask.distributed import Client
import duckdb
import os
import pandas

## Utility Functions

In [2]:
# based on function of the same name from:
# https://cloudprovider.dask.org/en/latest/aws.html#elastic-compute-cloud-ec2


def get_aws_credentials(
    profile, config_path="~/.aws/config", creds_path="~/.aws/credentials"
):
    """Read in your AWS credentials file and convert to environment variables."""
    parser = configparser.RawConfigParser()

    if config_path != "":
        parser.read(os.path.expanduser(config_path))
        config = parser.items("default")
    else:
        config = []

    if creds_path != "":
        parser.read(os.path.expanduser(creds_path))
        credentials = parser.items(profile)
    else:
        credentials = []

    all_credentials = {key.upper(): value for key, value in [*config, *credentials]}
    with contextlib.suppress(KeyError):
        all_credentials["AWS_REGION"] = all_credentials.pop("REGION")

    return all_credentials
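
To see what the key conversion does, here is a small self-contained sketch that applies the same steps to an in-memory credentials file. The profile name and all values are made up for illustration.

```python
import configparser
import contextlib

# Hypothetical credentials file contents; none of these values are real.
sample_credentials = """
[my-profile]
aws_access_key_id = AKIAEXAMPLE
aws_secret_access_key = example-secret
aws_session_token = example-token
region = us-east-1
"""

parser = configparser.RawConfigParser()
parser.read_string(sample_credentials)

# Upper-case every key so it matches the usual environment variable names.
env_style = {key.upper(): value for key, value in parser.items("my-profile")}

# "region" has no AWS_ prefix in the file, so rename it if it is present.
with contextlib.suppress(KeyError):
    env_style["AWS_REGION"] = env_style.pop("REGION")

print(sorted(env_style))
```

Note that `AWS_REGION` only appears in the result when the profile (or the config file, if one is read) contains a `region` entry.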

Functions to get the IDs of an EFS volume, Security Group, and VPC.

In [3]:
def get_efs_id_by_name(efs_name, session):
    client = session.client("efs", region_name="us-east-1")

    response = client.describe_file_systems()

    for file_system in response["FileSystems"]:
        if "Name" in file_system and file_system["Name"] == efs_name:
            return file_system["FileSystemId"]

    return None


def get_sg_id_by_name(sg_name, session, vpc_id):
    client = session.client("ec2")

    response = client.describe_security_groups(
        Filters=[
            {"Name": "group-name", "Values": [sg_name]},
            {"Name": "vpc-id", "Values": [vpc_id]},
        ]
    )

    return response["SecurityGroups"][0]["GroupId"]


# For this particular account, there is not a 'default' VPC, which would
# normally be used by Dask Cloud Provider. But there is a single VPC, so we get
# it by the 0th index.
def get_vpc_id(session):
    client = session.client("ec2")
    vpc_id = client.describe_vpcs()["Vpcs"][0]["VpcId"]
    return vpc_id

The security group used by the EFS volume needs a rule added that allows the instances Dask will create to use it.

In [4]:
def add_nfs_rule_to_security_group(
    session, vpc_id, security_group_id, source_security_group_id, description
):
    """
    Add an inbound rule to allow NFS connections from a specified security group.

    :param session: boto3 session
    :param vpc_id: ID of the VPC
    :param security_group_id: ID of the security group to modify
    :param source_security_group_id: ID of the source security group
    :param description: text description of inbound rule
    """
    ec2 = session.client("ec2")

    try:
        # Get the security group
        response = ec2.describe_security_groups(
            Filters=[
                {"Name": "vpc-id", "Values": [vpc_id]},
                {"Name": "group-id", "Values": [security_group_id]},
            ]
        )

        if not response["SecurityGroups"]:
            print(f"Security group {security_group_id} not found in VPC {vpc_id}")
            return

        security_group = response["SecurityGroups"][0]

        # Check if the rule already exists
        rule_exists = any(
            rule.get("IpProtocol") == "tcp"
            and rule.get("FromPort") == 2049
            and rule.get("ToPort") == 2049
            and any(
                pair.get("GroupId") == source_security_group_id
                and pair.get("Description") == description
                for pair in rule.get("UserIdGroupPairs", [])
            )
            for rule in security_group.get("IpPermissions", [])
        )

        if rule_exists:
            print(
                f"NFS rule with description '{description}' already exists in security group {security_group_id}"
            )
            return

        # Add the inbound rule
        ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            IpPermissions=[
                {
                    "IpProtocol": "tcp",
                    "FromPort": 2049,
                    "ToPort": 2049,
                    "UserIdGroupPairs": [
                        {
                            "GroupId": source_security_group_id,
                            "Description": description,
                        }
                    ],
                }
            ],
        )

        print(f"Added NFS inbound rule to security group {security_group_id}")

    except ClientError as e:
        if e.response["Error"]["Code"] == "InvalidPermission.Duplicate":
            print(
                f"NFS rule for cluster access already exists in EFS security group {security_group_id}, but with a different description. Did not add new rule."
            )
        else:
            print(f"Error: {e}")
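
The duplicate check above can be exercised without touching AWS. The sketch below pulls the same logic into a standalone helper (the name `has_nfs_rule` is hypothetical) and runs it against a hand-written dictionary shaped like one entry of a `describe_security_groups` response. The group IDs are placeholders.

```python
def has_nfs_rule(security_group, source_security_group_id, description):
    """Return True if TCP 2049 is already allowed from the source group
    by a rule carrying exactly this description."""
    for rule in security_group.get("IpPermissions", []):
        is_nfs = (
            rule.get("IpProtocol") == "tcp"
            and rule.get("FromPort") == 2049
            and rule.get("ToPort") == 2049
        )
        if not is_nfs:
            continue
        for pair in rule.get("UserIdGroupPairs", []):
            if (
                pair.get("GroupId") == source_security_group_id
                and pair.get("Description") == description
            ):
                return True
    return False


# Placeholder data mimicking one security group from the API response.
sample_group = {
    "GroupId": "sg-0aaaaaaaaaaaaaaaa",
    "IpPermissions": [
        {
            "IpProtocol": "tcp",
            "FromPort": 2049,
            "ToPort": 2049,
            "UserIdGroupPairs": [
                {
                    "GroupId": "sg-0bbbbbbbbbbbbbbbb",
                    "Description": "from Dask EC2Cluster",
                }
            ],
        }
    ],
}

print(has_nfs_rule(sample_group, "sg-0bbbbbbbbbbbbbbbb", "from Dask EC2Cluster"))
print(has_nfs_rule(sample_group, "sg-0bbbbbbbbbbbbbbbb", "another description"))
```

The second call returns `False` even though the port and source group match. In that situation the real function attempts to add the rule and falls through to the `InvalidPermission.Duplicate` branch.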

## Global Parameters

In [5]:
aws_profile_name = "cmr-sit-johnathan"  # change to your profile name
efs_name = "bigstac-duckdb-01"  # pre-created
sg_name_efs = "bigstac-nfs"  # pre-created
sg_name_dask = "bigstac-dask"  # specified in cdk_dask_sg.py
ssh_key_name = "bigstac-johnathan"  # pre-created

## Get Credentials

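Credentials should be placed in `~/.aws/credentials`. Short-term keys are recommended.  
You can use JupyterLab to open a terminal, create the file, and paste your credentials into it.  
Set `aws_profile_name` above to the name of your profile in the credentials file.  
Because the cell below passes an empty `config_path`, the config file is not read, so the profile must also contain `aws_session_token` and `region` entries.  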

In [6]:
creds = get_aws_credentials(aws_profile_name, "")
# Create boto3 session for functions that will accept it
session = boto3.Session(
    region_name="us-east-1",
    aws_access_key_id=creds["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=creds["AWS_SECRET_ACCESS_KEY"],
    aws_session_token=creds["AWS_SESSION_TOKEN"],
)

# Launching the EC2Cluster relies on instance profile permissions, credentials
# in the "default" profile, or environment variables. We'll use the latter:
os.environ["AWS_ACCESS_KEY_ID"] = creds["AWS_ACCESS_KEY_ID"]
os.environ["AWS_SECRET_ACCESS_KEY"] = creds["AWS_SECRET_ACCESS_KEY"]
os.environ["AWS_SESSION_TOKEN"] = creds["AWS_SESSION_TOKEN"]
os.environ["AWS_REGION"] = creds["AWS_REGION"]

## Start Cluster

Get resource IDs from our AWS account.

In [7]:
vpc_id = get_vpc_id(session)
efs_id = get_efs_id_by_name(efs_name, session)
nfs_sg_id = get_sg_id_by_name(sg_name_efs, session, vpc_id)
cluster_sg_id = get_sg_id_by_name(sg_name_dask, session, vpc_id)

Create a rule that allows the Dask instances to mount the EFS volume.

In [8]:
rule_description = "from Dask EC2Cluster"
add_nfs_rule_to_security_group(
    session, vpc_id, nfs_sg_id, cluster_sg_id, rule_description
)

Added NFS inbound rule to security group sg-


Configure the Dask-managed instances to mount the EFS volume. These additional bootstrapping commands will be run by cloud-init after the default commands created by Dask Cloud Provider.

In [9]:
bootstrap_script = [
    "mkdir -p /mnt/efs",
    "apt-get install nfs-common -y",
    f"mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport {efs_id}.efs.us-east-1.amazonaws.com:/ /mnt/efs",
]

Bind mount the EFS volume into the Docker container that will run the Dask scheduler and workers. Also install some extra necessary packages with `pip`.

In [10]:
docker_args = (
    "--mount type=bind,src=/mnt/efs,dst=/mnt/efs,ro "
    '-e EXTRA_PIP_PACKAGES="duckdb pyarrow"'
)

Start the cluster. This will take a couple of minutes.

In [11]:
cluster = EC2Cluster(
    n_workers=2,
    scheduler_instance_type="c5.xlarge",
    worker_instance_type="c5.xlarge",
    # One thread per worker ensures tasks get split across multiple workers
    worker_options={"nthreads": 1, "memory_limit": "7GiB"},
    # debug=True,
    key_name=ssh_key_name,
    region=creds["AWS_REGION"],
    vpc=vpc_id,
    security_groups=[cluster_sg_id],
    extra_bootstrap=bootstrap_script,
    docker_args=docker_args,
    # Default expects public IPs
    use_private_ip=True,
    # TLS doesn't currently work: https://github.com/dask/dask-cloudprovider/issues/249
    security=False,
)

Creating scheduler instance
Created instance i-0d190df1b812ce655 as dask-eda6680f-scheduler
Waiting for scheduler to run at 10.5.45.35:8786
Scheduler is running
Creating worker instance
Creating worker instance


Created instance i-015d9a6692c3c8a47 as dask-eda6680f-worker-b8542521
Created instance i-08a9996da6e441243 as dask-eda6680f-worker-3802ecce


In [13]:
cluster

Cluster
Dashboard: http://10.5.45.35:8787/status
Workers: 2
Total threads: 2
Total memory: 14.00 GiB

Scheduler
Comm: tcp://10.5.45.35:8786
Dashboard: http://10.5.45.35:8787/status
Workers: 2
Total threads: 2
Started: 1 minute ago
Total memory: 14.00 GiB

Worker
Comm: tcp://10.5.44.194:33113
Dashboard: http://10.5.44.194:37081/status
Nanny: tcp://10.5.44.194:34827
Total threads: 1
Memory: 7.00 GiB
Local directory: /tmp/dask-scratch-space/worker-cfusj0ml

Worker
Comm: tcp://10.5.46.251:38885
Dashboard: http://10.5.46.251:38323/status
Nanny: tcp://10.5.46.251:41597
Total threads: 1
Memory: 7.00 GiB
Local directory: /tmp/dask-scratch-space/worker-b_yovnqg


## Prepare DuckDB Query

Below we create a function that executes a DuckDB query. Each worker runs the same query but receives a different Parquet file to query. The query filters by the bounding box of California and specifies a sort order. We limit the results to the first 2000, which is the upper limit for the current CMR API.

In [16]:
def ddb_worker_func(filename):
    df = duckdb.query(
        (
            f"SELECT GranuleUR FROM read_parquet('{filename}') "
            "WHERE (-124.409202 <= MBREast AND -114.119061 >= MBRWest AND "
            "32.531669 <= MBRNorth AND 41.99954 >= MBRSouth) "
            "ORDER BY GranuleUR LIMIT 2000"
        )
    ).df()
    return df
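
The `WHERE` clause above is a standard rectangle-overlap test: a granule matches when its minimum bounding rectangle (MBR) intersects the California box on both axes. The sketch below restates the same four comparisons in plain Python. The function name and sample boxes are illustrative, and it assumes each box has west <= east, so boxes crossing the antimeridian are not handled.

```python
# California bounding box, copied from the query above.
CA_WEST, CA_SOUTH, CA_EAST, CA_NORTH = -124.409202, 32.531669, -114.119061, 41.99954


def overlaps_california(mbr_west, mbr_south, mbr_east, mbr_north):
    """Mirror the SQL predicate: True when the two rectangles intersect."""
    return (
        CA_WEST <= mbr_east
        and CA_EAST >= mbr_west
        and CA_SOUTH <= mbr_north
        and CA_NORTH >= mbr_south
    )


# A box roughly around Los Angeles intersects the California box.
print(overlaps_california(-119.0, 33.0, -117.0, 35.0))
# A box roughly around New York does not.
print(overlaps_california(-75.0, 40.0, -73.0, 42.0))
```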

Provide the workers with a list of input files.

In [17]:
ddb_inputs = ["/mnt/efs/17m_set1.parquet", "/mnt/efs/17m_set2.parquet"]

## Run Query on Dask Cluster

In [18]:
client = cluster.get_client()


+---------+--------+-----------+---------+
| Package | Client | Scheduler | Workers |
+---------+--------+-----------+---------+
| lz4     | 4.4.3  | 4.3.3     | 4.3.3   |
| toolz   | 1.0.0  | 0.12.0    | 0.12.0  |
+---------+--------+-----------+---------+


In [19]:
%%time
the_future = client.map(ddb_worker_func, ddb_inputs)
results = client.gather(the_future)

CPU times: user 8.86 ms, sys: 596 μs, total: 9.46 ms
Wall time: 1.94 s
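
At this point `results` is a list with one DataFrame per input file, each holding up to 2000 sorted rows. To get a single answer equivalent to one global `ORDER BY ... LIMIT 2000`, the partial results still have to be combined, re-sorted, and truncated. A minimal sketch of that step, using small stand-in frames instead of the real results (the helper name `merge_top_n` is hypothetical):

```python
import pandas as pd


def merge_top_n(frames, column="GranuleUR", limit=2000):
    """Combine per-file results, re-sort, and keep the first `limit` rows."""
    combined = pd.concat(frames, ignore_index=True)
    return combined.sort_values(column).head(limit).reset_index(drop=True)


# Stand-ins for the per-worker DataFrames returned by client.gather().
part_a = pd.DataFrame({"GranuleUR": ["G1", "G4", "G5"]})
part_b = pd.DataFrame({"GranuleUR": ["G2", "G3", "G6"]})

merged = merge_top_n([part_a, part_b], limit=4)
print(merged["GranuleUR"].tolist())  # ['G1', 'G2', 'G3', 'G4']
```

This works because each worker already returns its own first 2000 rows, so the global first 2000 must be among them. The sketch assumes pandas sorts the strings in the same order as DuckDB, which holds for plain ASCII identifiers under DuckDB's default collation.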


## Run Same Query on Notebook Server

For this run, the Jupyter server is the same instance type as the Dask workers: `c5.xlarge`, which has 4 vCPUs and 8 GB of RAM.

Instead of two files, it will read the same data from a single file.

In [20]:
single_17m_file = "~/efs/StartTime_17m.parquet"

In [23]:
%%time
notebook_result = duckdb.query(
    (
        f"SELECT GranuleUR FROM read_parquet('{single_17m_file}') "
        "WHERE (-124.409202 <= MBREast AND -114.119061 >= MBRWest AND "
        "32.531669 <= MBRNorth AND 41.99954 >= MBRSouth) "
        "ORDER BY GranuleUR LIMIT 2000"
    )
).df()

CPU times: user 3.29 s, sys: 400 ms, total: 3.69 s
Wall time: 3.08 s


## Shut Down Cluster

In [24]:
client.shutdown()
client.close()

Terminated dask-eda6680f-worker-b8542521 (i-015d9a6692c3c8a47)
Terminated dask-eda6680f-worker-3802ecce (i-08a9996da6e441243)
Terminated dask-eda6680f-scheduler (i-0d190df1b812ce655)
