# dask-awkward Proof-of-Concept: October 2021

**_This notebook uses an early alpha version of dask-awkward; it is not a stable notebook, and the features shown are not fully tested._**

In [None]:
import awkward as ak
import dask_awkward as dak
import dask_awkward.data

## I/O

So far we've implemented two very simple readers: JSON and Parquet. The JSON reader partitions the data so that each file or string in a list of sources becomes one chunk/partition of the complete `Array` collection. The Parquet reader can partition via a user-provided `row_groups` argument or by using a list of files. For this small demo we'll use the JSON reader.
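
The one-source-per-partition rule can be illustrated without dask-awkward at all. The following is a minimal standard-library sketch, not the reader's actual implementation; the `sources` data and the `partition_sources` helper are hypothetical names made up for illustration.

```python
import json

# Hypothetical stand-ins for a list of JSON sources (one string per partition).
sources = [
    "[[1, 2, 3], [4]]",
    "[[5], [6, 7]]",
    "[[8, 9]]",
]


def partition_sources(sources):
    """Parse each source independently so that it forms exactly one partition."""
    return [json.loads(s) for s in sources]


partitions = partition_sources(sources)

# The number of partitions equals the number of sources.
print(len(partitions))

# Concatenating the partitions in order recovers the complete array.
complete = [row for part in partitions for row in part]
print(complete)
```

In this picture a lazy collection only needs to remember how to produce each partition; nothing has to be parsed until a result is requested.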

We've created a small `dask_awkward.data` module that provides some simple JSON data for testing and demos:

In [None]:
data = dask_awkward.data.json_data("numbers")
len(data)  # 3 JSON formatted strings

In [None]:
# This function is designed to be the dask version of ak.from_json
daa = dak.from_json(data)

The `daa` variable will be our Dask awkward array.

In [None]:
# gives us an Array collection
daa

In [None]:
daa.visualize()

In [None]:
# Sanity check; let's compare to the concrete awkward array version
a0 = ak.from_json(data[0])
a1 = ak.from_json(data[1])
a2 = ak.from_json(data[2])

In [None]:
ak.all(ak.concatenate([a0, a1, a2]) == daa.compute())

The `caa` variable will be our concrete awkward array.

In [None]:
caa = ak.concatenate([a0, a1, a2])

In [None]:
ak.min(caa)

In [None]:
dak.min(daa)

We currently have very minimal metadata support; in the case where `axis=None` is passed to `dak.min`, we know the result will be a scalar. We can see the constructed task graph:

In [None]:
dak.min(daa).visualize()

In [None]:
dak.min(daa).compute()

Now with an axis argument (fewer steps in the graph!):

In [None]:
dak.min(daa, axis=1).visualize()

In [None]:
dak.min(daa, axis=1).compute()

In [None]:
ak.all(ak.min(caa, axis=1) == dak.min(daa, axis=1).compute())
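
The difference between the two graphs can be reasoned about with plain Python lists. With `axis=None`, each partition first yields its own minimum and those partial results must then be combined. With `axis=1`, every row is reduced on its own, so presumably no cross-partition step is needed. The sketch below works under those assumptions; the `partitions` data and the helper names are made up, and this is not how dask-awkward implements its reductions.

```python
# Made-up data: three partitions, each a list of non-empty rows.
partitions = [
    [[3, 1, 2], [9]],
    [[7], [4, 5]],
    [[8, 6]],
]


def min_all(partitions):
    """Global minimum: reduce within each partition, then across partitions."""
    # Step 1: one partial minimum per partition (independent tasks).
    partials = [min(x for row in part for x in row) for part in partitions]
    # Step 2: combine the partial results into a single scalar.
    return min(partials)


def min_axis1(partitions):
    """Row-wise minimum: each partition is handled entirely on its own."""
    return [[min(row) for row in part] for part in partitions]


global_min = min_all(partitions)
row_mins = [m for part in min_axis1(partitions) for m in part]

print(global_min)
print(row_mins)
```

The row-wise result keeps the same partitioning as its input, whereas the global result collapses all partitions into one value.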

Now we'll look at some data with labels. The data is structured so that each element has an `analysis` record, which in turn contains the fields `x{1,2}`, `y{1,2}`, `z{1,2}`, and `t{1,2}` with randomly varying lengths. We'll use this to showcase the attribute access functionality.

In [None]:
daa = dak.from_json(dask_awkward.data.json_data(kind="records"))

In [None]:
daa

In [None]:
daa.compute().to_list()

Let's define a function that squares the input:

In [None]:
def sq(x):
    return x * x

Now square all of the `x1` and `y1` values via the collection's `map_partitions` method.

In [None]:
x1sq = daa.analysis.x1.map_partitions(sq)
y1sq = daa.analysis.y1.map_partitions(sq)

The module-level, more general `map_partitions` function can be used with multiple collections. Let's map `np.add` over both sets of squares to add them, then get the max of the result.

In [None]:
import numpy as np

calc = dak.map_partitions(np.add, x1sq, y1sq)
res = dak.max(calc)

In [None]:
res.visualize()

From the task graph visualization we can see all of the steps:
- reading the JSON
- `getitem` for the `.` access (`.analysis.x1` and `.analysis.y1`)
- our `sq` function
- `add` from the last `map_partitions` call
- finally, the `max` call, which yields multiple nodes in the graph (calculating the max on each partition, followed by a naïve reduction).
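
The steps above can be mimicked with plain Python lists to make the per-partition structure explicit. This is only a sketch under stated assumptions: the `partitions` records are made up, `get_field`, `map_elementwise`, and `partition_max` are hypothetical helpers, and none of this reflects dask-awkward's internals.

```python
from operator import add

# Made-up data: two partitions of records; x1 and y1 have equal lengths per record.
partitions = [
    [{"analysis": {"x1": [1, 2], "y1": [3, 4]}}],
    [
        {"analysis": {"x1": [0], "y1": [5]}},
        {"analysis": {"x1": [2, 2], "y1": [1, 6]}},
    ],
]


def sq(x):
    return x * x


def get_field(part, name):
    """Stand-in for the getitem step: pull one field out of every record."""
    return [rec["analysis"][name] for rec in part]


def map_elementwise(fn, *parts):
    """Apply fn element by element across equally shaped lists of rows."""
    return [[fn(*vals) for vals in zip(*rows)] for rows in zip(*parts)]


def partition_max(part):
    """Maximum over every value in a single partition."""
    return max(v for row in part for v in row)


partials = []
for part in partitions:
    x1sq = map_elementwise(sq, get_field(part, "x1"))  # getitem, then sq
    y1sq = map_elementwise(sq, get_field(part, "y1"))  # getitem, then sq
    total = map_elementwise(add, x1sq, y1sq)           # add
    partials.append(partition_max(total))              # max on the partition

# Naive reduction over the per-partition maxima.
result = max(partials)
print(result)
```

Every step except the final `max(partials)` touches one partition at a time, which is why only the last stage of the graph needs to bring partitions together.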

In [None]:
res.compute()

Sanity check, using the simplest form provided by the complete awkward API (which we will eventually have in dask-awkward!):

In [None]:
caa = daa.compute()
ak.max(caa.analysis.x1 ** 2 + caa.analysis.y1 ** 2)