# Snowflake + Dask

This notebook shows how to load data from a Snowflake table or query into a Dask dataframe.

## Connect to Snowflake

See [README](README.md) for more details on how to set up the credentials file.

In [None]:
import yaml
import snowflake.connector

# read the credentials file and close it when done
with open('/home/jovyan/snowflake_creds.yml') as f:
    creds = yaml.safe_load(f)

# get connection info
conn_info = {
    'warehouse': 'COMPUTE_WH',
    'database': 'NYC_TAXI',
    'schema': 'PUBLIC',
    **creds,
}
conn = snowflake.connector.connect(**conn_info)

## Set up the query

We need to set up a query that will return chunks of the full result based on a column in the data. These become our partitions in a Dask dataframe. We use a [binding for the Snowflake query](https://docs.snowflake.com/en/user-guide/python-connector-example.html#binding-data) so that we can pass different values at execution time.

In [None]:
query = """
SELECT *
FROM taxi_yellow
WHERE
    date_trunc('MONTH', tpep_pickup_datetime) = '2020-01-01'
    AND day(tpep_pickup_datetime) = %s
"""

Validate the query with pandas by loading a single day (day 1) and checking the row count and memory footprint.

In [None]:
cur = conn.cursor().execute(query, (1,))  # bind values are passed as a sequence
df = cur.fetch_pandas_all()
len(df), df.memory_usage().sum() / 1e6  # memory size in MB
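
The Snowflake connector's default `pyformat` parameter style also accepts named placeholders of the form `%(name)s`, bound from a dict, which can be easier to read once a query takes more than one value. The following is a minimal sketch of the same query written that way. The names `named_query` and `params_for_day` are illustrative and not part of the original notebook.

```python
# Same query as above, but with named placeholders instead of a positional %s
named_query = """
SELECT *
FROM taxi_yellow
WHERE
    date_trunc('MONTH', tpep_pickup_datetime) = %(month_start)s
    AND day(tpep_pickup_datetime) = %(day)s
"""


def params_for_day(day, month_start="2020-01-01"):
    """Build the dict of bind values for one day's chunk (hypothetical helper)."""
    return {"month_start": month_start, "day": day}


# With an open connection this would run as:
#   cur = conn.cursor().execute(named_query, params_for_day(1))
```

Binding the month as well as the day means the same query text can be reused for other months without editing the SQL string.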

## Initialize Dask cluster

In [None]:
from dask.distributed import Client, wait
from dask_saturn import SaturnCluster
import time

n_workers = 3
cluster = SaturnCluster(n_workers=n_workers, scheduler_size='medium', worker_size='large', nthreads=2)
client = Client(cluster)
cluster

If you initialized your cluster here in this notebook, it might take a few minutes for all your nodes to become available. You can run the cell below to block until all nodes are ready.

> **Pro tip:** Create and/or start your cluster from the "Dask" page in Saturn if you want to get a head start!

In [None]:
while len(client.scheduler_info()['workers']) < n_workers:
    print('Waiting for workers, got', len(client.scheduler_info()['workers']))
    time.sleep(30)
print('Done!')
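
The loop above waits indefinitely if a worker never comes up. Below is a small sketch of the same polling pattern with a timeout, written as a generic helper so it can be tried without a cluster. `wait_until` is a hypothetical name, and the default timeout and interval are arbitrary choices. `dask.distributed` also provides `Client.wait_for_workers`, which blocks in a similar way.

```python
import time


def wait_until(predicate, timeout=600, interval=30):
    """Poll predicate() until it returns True or `timeout` seconds pass.

    Returns True if the predicate became true, False if the timeout expired.
    """
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True


# With a live cluster this might be called as:
#   ready = wait_until(lambda: len(client.scheduler_info()['workers']) >= n_workers)
```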

## Load larger data with Dask!

We set up a function with `dask.delayed`. `@delayed` is a decorator that turns a Python function into one suitable for running on the Dask cluster. Calling a delayed function does not run it immediately; instead it returns a `Delayed` object that represents what the return value of the function will be. `dask.dataframe.from_delayed` takes a list of these delayed objects and concatenates them into a Dask dataframe, with one partition per delayed object.

In [None]:
from dask import delayed
import dask.dataframe as dd

In [None]:
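@delayed
def load(conn_info, query, day):
    # each task opens its own connection on the worker that runs it
    conn = snowflake.connector.connect(**conn_info)
    try:
        cur = conn.cursor().execute(query, (day,))
        return cur.fetch_pandas_all()
    finally:
        conn.close()  # release the connection once the data is fetched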

The cell below builds `ddf` with 31 partitions, one for each day of January 2020.

In [None]:
ddf = dd.from_delayed([load(conn_info, query, day) for day in range(1, 32)])
ddf
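
The `range(1, 32)` above is correct for a 31-day month such as January. For other months, the list of days can be derived from the calendar instead of hard-coded. This is a minimal sketch using only the standard library; `days_in_month` is a hypothetical helper, and the month literal in the query would need to change to match.

```python
import calendar


def days_in_month(year, month):
    """Return the day numbers 1..N for the given month (hypothetical helper)."""
    _, n_days = calendar.monthrange(year, month)
    return list(range(1, n_days + 1))


days = days_in_month(2020, 2)  # February 2020 falls in a leap year

# With the cluster and `load` function from this notebook, this might be used as:
#   ddf = dd.from_delayed([load(conn_info, query, day) for day in days])
```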

Use `repartition()` to split the data into more, smaller partitions and introduce more parallelism.

In [None]:
ddf = ddf.repartition(npartitions=100)  # the first positional argument is `divisions`, so pass by keyword
ddf
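
The partition count of 100 is a choice, not a requirement. One way to pick a number is to aim for a target partition size; the sketch below assumes roughly 100 MB per partition, a commonly cited rule of thumb rather than something stated in this notebook. `target_npartitions` is a hypothetical helper.

```python
import math


def target_npartitions(total_mb, target_partition_mb=100):
    """Number of partitions needed so each is at most about target_partition_mb."""
    return max(1, math.ceil(total_mb / target_partition_mb))


# Example: a 2.5 GB dataframe split into ~100 MB partitions
n = target_npartitions(2500)
```

Dask's `repartition` also accepts a `partition_size` argument (for example `partition_size="100MB"`), which lets Dask choose the count itself at the cost of computing the data's memory usage first.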

The cell below executes the Snowflake queries across the cluster and computes the total number of rows and the total memory footprint (in MB) of the dataframe.

In [None]:
len(ddf), ddf.memory_usage().sum().compute() / 1e6

Each partition of the Dask dataframe is a pandas dataframe.

In [None]:
ddf_part = ddf.partitions[0].compute()
type(ddf_part)

If we are performing a lot of operations using this Dask dataframe (such as training a machine learning model), and the data will fit in the memory of the _cluster_, we should `persist()` the dataframe to perform all the loading up-front.

In [None]:
from dask.distributed import wait

ddf = ddf.persist()
_ = wait(ddf)