# Distributed SQL queries with Dask-SQL

In this guide, you'll use [Dask-SQL](https://dask-sql.readthedocs.io/en/latest/), an open-source library that adds a SQL query layer on top of Dask. This allows you to query and transform Dask DataFrames using common SQL operations. You can download {download}`this Jupyter notebook <dask-sql.ipynb>` to follow along in your own JupyterLab session.

## Before you start

You'll first need to create consistent local and remote software environments
with `dask`, `coiled`, and the necessary dependencies installed.
If you are unfamiliar with creating software environments, you can first
follow the [tutorial on setting up a custom software environment](https://docs.coiled.io/user_guide/tutorials/matching_coiled_senvs.html).

Start by installing `dask-sql` and [coiled-runtime](https://docs.coiled.io/user_guide/software_environment.html#coiled-runtime), a Dask meta-package.
Save the following file as `environment.yml`. You can get the most up-to-date version of coiled-runtime from the latest
[tag](https://github.com/coiled/coiled-runtime/tags) in the public coiled-runtime repository.

```yaml
channels:
  - conda-forge
dependencies:
  - python=3.9
  - coiled-runtime=<x.x.x>
  - dask-sql=<x.x.x>
```

Next, create a local software environment using the `environment.yml` file:

```bash
conda env create -f environment.yml -n dask-sql-example
conda activate dask-sql-example
```

When you create a cluster, Coiled will automatically replicate your local `dask-sql-example` environment in your cluster (see [package sync](https://docs.coiled.io/user_guide/package_sync.html)).

## Launch your cluster

You'll start by creating a Coiled cluster:

```python
import coiled

cluster = coiled.Cluster(
    name="dask-sql-example",
    n_workers=5,
)
```

Then connect Dask to your remote Coiled cluster:

```python
from dask.distributed import Client

# connect to the remote cluster
client = Client(cluster)
```

## Getting started with Dask-SQL

The main interface for interacting with Dask-SQL is the `dask_sql.Context` object. It allows you to register Dask DataFrames as data sources and can convert SQL queries to Dask DataFrame operations.

```python
from dask_sql import Context

c = Context()
```

You'll use `dask.datasets.timeseries`, which generates a Dask DataFrame of random time series data with `id`, `name`, `x`, and `y` columns:

```python
import dask

# Load a synthetic dataset: one row per second from 2000 to 2005,
# split into two-week partitions
df = dask.datasets.timeseries(
    "2000", "2005", partition_freq="2w"
)
```

You can then use the `dask_sql.Context` to assign a table name to this Dask DataFrame, and use that table name for SQL queries:

```python
# Register the Dask DataFrame df as a table named "timeseries";
# persist=True keeps the data in cluster memory to reduce overhead across multiple queries
c.create_table("timeseries", df, persist=True)

# Build a SQL query on the "timeseries" table
result = c.sql("SELECT count(1) FROM timeseries")
```

Note that `c.sql` returns another Dask DataFrame, and no computation has run yet. Like other Dask operations, the query is lazily evaluated. Use `compute()` to run the computation on your cluster:

```python
result.compute()
```

|   | COUNT(1)  |
|---|-----------|
| 0 | 157248000 |


You've run your first SQL query with Dask-SQL! Let’s try out some more complex queries.

## More complex SQL examples

Dask-SQL can also run more complex SQL statements, for example, a groupby-aggregation:

```python
c.sql('SELECT max(y) as "max", name FROM timeseries GROUP BY name').compute()
```

|   | max      | name    |
|---|----------|---------|
| 0 | 1.0      | Alice   |
| 1 | 1.0      | Bob     |
| 2 | 1.0      | Charlie |
| 3 | 0.999999 | Dan     |
| 4 | 1.0      | Edith   |
| 5 | 1.0      | Frank   |
| 6 | 1.0      | George  |
| 7 | 1.0      | Hannah  |
| 8 | 1.0      | Ingrid  |
| 9 | 0.999999 | Jerry   |


The equivalent operation using the Dask DataFrame API would be `df.groupby("name").y.max().compute()`. You can build up complexity by adding a `WHERE` clause to filter for certain values of `x`:

```python
c.sql("""
    SELECT name, AVG(y) as "average"
    FROM timeseries
    WHERE x > 0.2
    GROUP BY name
""").compute()
```

|   | name    | average       |
|---|---------|---------------|
| 0 | Alice   | -0.0004057128 |
| 1 | Bob     | 0.0003807399  |
| 2 | Charlie | 0.0001664932  |
| 3 | Dan     | 0.0002467381  |
| 4 | Edith   | 9.742376e-05  |
| 5 | Frank   | -9.143104e-05 |
| 6 | George  | -0.000214639  |
| 7 | Hannah  | -6.085306e-05 |
| 8 | Ingrid  | 2.383526e-05  |
| 9 | Jerry   | -0.0002286118 |


Once you're done, you can shut down the cluster (it will shut down automatically after 20 minutes of inactivity):

```python
# Close the client first, then the cluster it is connected to
client.close()
cluster.close()
```

## Next Steps

For a more in-depth look at what you can do with Dask-SQL, see the [Operating on Dask DataFrames with SQL](https://examples.dask.org/sql.html) how-to guide. You can also reference the [Dask-SQL docs](https://dask-sql.readthedocs.io/) or [GitHub repo](https://github.com/dask-contrib/dask-sql).