## Data management on the cloud and dask arrays

The way this works: data is stored in one place (our GCS bucket); we pull it down from there and push results back there.

As much as possible, avoid creating data locally on the machine that is running your notebook. This will help us: 

1. Collaborate more readily -- everyone's code points to one place.
2. Work more rapidly -- worker nodes on the cluster get their own data.
3. Avoid losing data -- GCS is solid, but nodes are kinda ephemeral.

In the following code, we're going to read and write data in GCS using `h5py`, which is a library for reading and writing hdf5 files. The `gcsfs` library lets us interact with GCS as though it were the file-system of the machine that we are working on (and later on as though it were mounted on all nodes of our cluster).

In [None]:
import h5py
import gcsfs

We create a file system object based on our project name on GCP:

In [None]:
fs = gcsfs.GCSFileSystem(project='learning-2-learn-221016')

We can do different file-system operations with this object. For example, we can get a listing of the objects inside of our bucket. Our project is the top-level or root of the file-system:

In [None]:
fs.ls('/')

And we can drill down further into the file-system:

In [None]:
fs.ls('/learning2learn')

In [None]:
fs.ls('/learning2learn/tutorial')

For the tutorial's sake, we have an example of some data stored in our bucket in an hdf5 file. 

As long as the file is open, we can operate on the data within it. For this purpose, we will create a context manager within which the file is open:

In [None]:
with fs.open('learning2learn/tutorial/random.hdf5', 'rb') as f:
    f = h5py.File(f,'r')
    dset = f['/x']

Once outside of the context manager, we can do certain operations with the data:

In [None]:
len(dset)

But not others:

In [None]:
dset[1:10]
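The difference between the two cells above can be illustrated with an ordinary Python file object. This is only an analogy, not h5py itself: the scratch file and variable names are hypothetical, and the exact error h5py raises may differ. Information already held on the object survives closing, but the underlying bytes can no longer be read.

```python
import os
import tempfile

# Write a small scratch file so the example is self-contained
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as handle:
    handle.write(b"0123456789")

with open(path, "rb") as f:
    first = f.read(4)  # reading works while the file is open

# Metadata kept on the Python object is still available after closing
name_after_close = f.name

try:
    f.read(4)  # the data itself is no longer reachable
    read_failed = False
except ValueError:
    read_failed = True

os.remove(path)
print(first, read_failed)
```

The practical consequence is the same in both cases: any slicing that needs the actual data has to happen while the file is still open.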

For data this size, it's reasonable to operate in blocks:

In [None]:
10000000 == 10e6
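The scientific notation here is easy to misread: `10e6` means 10 × 10^6, which is ten million rather than one million. A small sketch of the block arithmetic used in the loop that follows; the 8 bytes per element is an assumption (the dtype of the stored data is not stated here), so treat the memory figure as illustrative only.

```python
chunk_size = int(10e6)   # 10 * 10**6 = 10,000,000 elements per block
total_size = int(10e8)   # 10 * 10**8 = 1,000,000,000 elements in total

# Start index of every block, exactly as the loop iterates over them
starts = range(0, total_size, chunk_size)
n_chunks = len(starts)

# Assumption: 8 bytes per element (e.g. float64); the real dtype may differ
bytes_per_chunk = chunk_size * 8

print(n_chunks, "blocks of", bytes_per_chunk / 1e6, "MB each")
```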

In [None]:
%%time
sums = []
lengths = []

with fs.open('learning2learn/tutorial/random.hdf5', 'rb') as f:
    f = h5py.File(f,'r')
    dset = f['/x']
    # Compute sum of large, O(10e8)-element array, 10e6 numbers at a time
    for i in range(0, int(10e8), int(10e6)):
        chunk = dset[i: i + int(10e6)]  # pull out numpy array
        sums.append(chunk.sum())
        lengths.append(len(chunk))

mean = sum(sums) / sum(lengths)
print(mean)

But that's pretty slow... 

You can probably already see for yourself how we might use the delayed interface to write our own parallelized version of this. In addition to the tools you've already seen, dask provides a specialized API for representing and computing with arrays, which we will introduce here.

Let's start by firing up a cluster:

In [None]:
from dask.distributed import Client, progress
from dask_kubernetes import KubeCluster

In [None]:
cluster = KubeCluster(n_workers=20)
cluster

In [None]:
client = Client(cluster)
client

We'll need the dask `delayed` function and the dask `array` module:

In [None]:
from dask import delayed
import dask.array as da

Because our workers are distributed, we need to take a radically different approach here. Each node in our cluster will have to open the file on its own and access its own part of the data. We do that by first writing a function that knows how to read the data and extract the needed part of it:

In [None]:
def read_chunk(i):
    with fs.open('learning2learn/tutorial/random.hdf5', 'rb') as f:
        f = h5py.File(f, 'r')
        dset = f['/x'] 
        chunk = dset[i: i + int(10e6)]
    return chunk

Then, we create delayed arrays. Each element in this list is just the instructions for creating an array from a delayed function. The arrays have not been materialized yet. 

In [None]:
chunks = []

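# Note: `chunk` is the last block read in the serial loop above. It is used
# here only to supply the dtype, so that cell must have been run first.
for i in range(0, int(10e8), int(10e6)):
    this_chunk = delayed(read_chunk)(i)
    chunks.append(da.from_delayed(this_chunk,
                                  shape=(int(10e6), ),
                                  dtype=chunk.dtype))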


We concatenate all of these arrays together to form one large array:

In [None]:
x = da.concatenate(chunks)
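The same pattern can be tried at a small scale without a cluster or a bucket. The sketch below is an illustration under assumptions, not the tutorial's own code: `make_chunk` is a hypothetical stand-in for `read_chunk` that builds each block in memory, the chunk size is tiny, and the dtype is fixed to `float64`. It runs on dask's default local scheduler.

```python
import numpy as np
import dask.array as da
from dask import delayed

CHUNK = 5  # hypothetical tiny chunk size, for illustration only


def make_chunk(start):
    # Stand-in for read_chunk: build the block locally instead of reading GCS
    return np.arange(start, start + CHUNK, dtype="float64")


# Each element is a lazy dask array wrapping one delayed call
pieces = [
    da.from_delayed(delayed(make_chunk)(start), shape=(CHUNK,), dtype="float64")
    for start in range(0, 20, CHUNK)
]

x_small = da.concatenate(pieces)      # still lazy: nothing has been built yet
result = x_small.mean().compute()     # blocks are created and reduced here

print(x_small.shape, x_small.chunks, result)
```

Because `from_delayed` cannot inspect the data ahead of time, the `shape` and `dtype` passed in have to match what the function actually returns.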

Question: to calculate the mean, do you need to ever have all of the data in one node's memory? Why is that?

In [None]:
m = x.mean()

In [None]:
%%time
the_mean = m.compute()
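One way to think about the question above: a mean can be assembled from per-block sums and per-block counts, so each block only needs to be held in memory long enough to be reduced to two numbers. A minimal pure-Python sketch of that idea follows. The data and block size are made up for illustration, and this is not a description of how dask schedules the reduction internally.

```python
import math
import random

random.seed(0)
data = [random.random() for _ in range(1000)]  # made-up data
block = 100                                    # made-up block size

# Reduce every block to a (sum, count) pair; the block itself can then be dropped
partial = []
for i in range(0, len(data), block):
    piece = data[i: i + block]
    partial.append((sum(piece), len(piece)))

# Combine the small per-block summaries into the overall mean
blockwise_mean = sum(s for s, _ in partial) / sum(n for _, n in partial)
direct_mean = sum(data) / len(data)

# The two agree up to floating-point rounding
agree = math.isclose(blockwise_mean, direct_mean)
print(blockwise_mean, direct_mean, agree)
```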

Our setup allows us to read data directly from our bucket, without needing to further authenticate. To be able to upload data into our bucket, we'll need to authenticate. The method for authentication is `'browser'`, which means that you'll get a link to use for authentication, and then you can enter the resulting token:

    fs = gcsfs.GCSFileSystem(project='learning-2-learn-221016', token='browser')

Once you've entered the token, the following code should work: 

    fs.put('path/to/local/file', '/learning2learn/path/to/remote/file')