# Dask Data Structures

Dask provides several Pythonic data structures designed to handle and manipulate data that exceeds our local memory capacity:

- `dask.bag`: A distributed generic Python list of objects, analogous to a PySpark RDD.
- `dask.array`: Distributed NumPy arrays.
- `dask.dataframe`: Distributed pandas dataframes, offering functionality similar to pandas but capable of handling larger-than-memory datasets.

All these high-level data structure APIs are optimized to leverage the Directed Acyclic Graph (DAG) optimization features of the Dask scheduler. Consequently, they rely on lazy computation, allowing for efficient execution of operations on large datasets.


## Start the Dask cluster

In [None]:
from dask.distributed import Client
 
# use the provided master
client = Client('dask-scheduler:8786')
    
# print the status of the client    
client

## Dask Bag

Bags are highly versatile and flexible data structures in Dask.

Dask Bags offer a level of flexibility similar to PySpark's RDDs. They serve as parallelized collections of objects, similar to Python's built-in `list`, capable of holding any Python objects, whether they're custom classes or built-in types.

This flexibility allows for the storage of complex data structures like raw text or nested JSON data, which can be easily navigated.

Due to this versatility, Dask bags are commonly employed to parallelize simple computations on unstructured or semi-structured data, such as text data, log files, JSON records, or user-defined Python objects. They facilitate MapReduce-like approaches for loading, inspecting, filtering, and processing arbitrary datasets, whether structured or unstructured.

Dask Bag provides operations such as `map`, `filter`, `groupby`, and aggregations on collections of Python objects. It accomplishes these tasks in parallel using Python iterators, resembling a parallel version of `itertools`.

After completing an initial stage of data preparation with Dask Bag, it's often customary to reduce and convert the data into more suitable data structures, such as Dask Arrays or Dask DataFrames, which will be covered in subsequent sections.

### Create and Take from a Bag

We can create a `Bag` from various data sources, including Python sequences, files, and cloud storage services like Amazon AWS S3, among others. 
For a comprehensive overview on accessing remote data from Distributed File Systems, S3, and other sources, please refer to the official documentation [here](https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html).

Additionally, we can create a Bag from a function declared as `delayed`. 
This approach allows us to generate data within a distributed application and subsequently access it using the Bag API before computing a result.
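
As a minimal sketch of this pattern, the hypothetical `make_partition` function below stands in for any data-producing task. `db.from_delayed` turns each delayed result into one partition of the Bag. The `scheduler="synchronous"` argument is an assumption made only so the snippet runs without a cluster.

```python
import dask
import dask.bag as db

# hypothetical data-producing function; each call yields one partition
@dask.delayed
def make_partition(start, stop):
    return list(range(start, stop))

# two delayed objects -> a Bag with two partitions
parts = [make_partition(0, 5), make_partition(5, 10)]
b = db.from_delayed(parts)

# nothing has run yet; the data is generated and summed only here
total = b.sum().compute(scheduler="synchronous")
print(b.npartitions, total)
```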

The data within the Bag is partitioned into blocks, typically with multiple items per block. The number of partitions (`npartitions`) depends on factors such as the dataset size, cluster resources, and our specified partitioning strategy.
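
To make the notion of partitions concrete, the sketch below counts how many items each partition holds. It assumes a small in-memory sequence and a local synchronous scheduler; the exact split is chosen by Dask and may differ from what you expect.

```python
import dask.bag as db

b = db.from_sequence(range(10), npartitions=4)

# map_partitions receives a whole partition at a time;
# here each partition is reduced to a one-element list holding its size
sizes = b.map_partitions(lambda part: [len(list(part))]) \
         .compute(scheduler="synchronous")

print(b.npartitions, sizes)
```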


Let's start by creating a simple data `bag` from a Python list.

In Python, we can easily create data from a list. 
Since Python is a dynamically typed language, the list can contain a variety of data types such as integers, strings, or even objects.

For example, we can create a simple list of integers:

In [None]:
import dask.bag as db

# create a Dask Bag from a Python sequence
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# create a Dask Bag from the sequence with 4 partitions
b = db.from_sequence(data, npartitions=4)

As previously mentioned, Dask data structures embody the lazy programming paradigm.
The data is thus not yet stored on the cluster, as we have not performed an operation such as `compute`.

In general, we don't want to retrieve the entire data stored on the cluster, but we might want to inspect a few elements.
We can do that with the `take(n_elements)` method.
The returned data will be a tuple containing the first `n_elements` of the Bag.

In [None]:
# retrieve the first 3 elements of the Bag
b.take(3)

Text files can be loaded into a Bag with `db.read_text`, by providing either a list of file paths or a glob pattern using the `*` wildcard.

By default, the resulting Dask Bag will have one item per line and one file per partition. 
It's important to be mindful of partitioning when working with large datasets to avoid inefficient processing.

Dask provides automatic handling of standard compression libraries (e.g., gzip, bz2, xz) when reading text files. The compression type can be inferred from the file name extension or explicitly specified using the `compression='gzip'` keyword argument.

For example, to load a number of compressed files from a local folder into a Dask Bag:


In [None]:
# list the files provided
! ls datasets/accounts_json/. 

In [None]:
import os

# read the gzipped text files matching the pattern, 4 files per partition
# (gzip compression is inferred from the .gz extension)
b = db.read_text(os.path.join('datasets',
                              'accounts_json',
                              'accounts.*.json.gz'),
                 files_per_partition=4)

# take the first element from the bag
example = b.take(1)

# print the type of the example variable
print(type(example))

# print the example variable
print(example)
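
To experiment with `read_text` without the provided dataset, the following sketch writes two small gzipped files to a temporary directory and reads them back. The file names and records are made up for illustration, and a local synchronous scheduler is assumed.

```python
import gzip
import json
import os
import tempfile

import dask.bag as db

# write two small gzipped files with one JSON record per line (made-up data)
tmpdir = tempfile.mkdtemp()
for i in range(2):
    path = os.path.join(tmpdir, f"sample.{i}.json.gz")
    with gzip.open(path, "wt", encoding="utf-8") as f:
        for j in range(3):
            f.write(json.dumps({"file": i, "row": j}) + "\n")

# gzip compression is inferred from the .gz extension;
# by default each file becomes one partition
lines = db.read_text(os.path.join(tmpdir, "sample.*.json.gz"))

# each item is one raw line of text, so decode it with json.loads
records = lines.map(json.loads).compute(scheduler="synchronous")
print(lines.npartitions, len(records))
```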

`Bag` objects in Dask support standard functional APIs, such as `map`, `filter`, `groupby`, and more. These operations create new bags, allowing for chaining multiple operations together to manipulate the data until the desired result is obtained.

To execute the computations on a `Bag` object and obtain the final result, we can use the `.compute()` method, similar to how we handle `delayed` objects.

Since a `Bag` is inherently a delayed object, there's no need to explicitly specify that the functions applied to the dataset are further delayed. Dask handles the delayed execution transparently, allowing for efficient processing of large datasets.

In [None]:
# a silly function to check if a number is even
def is_even(n):
    return n % 2 == 0

# create a bag from a sequence of numbers
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# 1. filter the bag elements to retain only the even ones
# 2. then, map a lambda function to square each even number
c = b.filter(is_even).map(lambda x: x ** 2)

# print the resulting bag
c

In [None]:
# visualize the computational graph
c.visualize()

In [None]:
# compute the results
c.compute()

One of the crucial parameters when optimizing a computing task on a cluster is ensuring the effective utilization of available computing resources. This involves rewriting the computational task to be distributed efficiently and optimizing the number of partitions used to store data and perform map-like data transformations.

### Considerations for Partitioning

Consider the following ("extreme") scenarios:
- Having only one partition with 100 available CPUs
- Having 10,000 tiny partitions with 3 available CPUs

#### Inefficient Single Partition
Having only one partition with a large number of available CPUs would be inefficient as there would be no parallelization at all.

#### Excessive Tiny Partitions
Conversely, spreading the data over an excessive number of tiny partitions when only a few CPUs are available would likely be inefficient, because the overhead of scheduling and starting a task for each tiny partition outweighs the work done on it.

#### Optimal Partitioning Strategy
The optimal number of partitions depends on factors such as the amount of shuffling required, especially for operations like `groupby`. As a starting point, a moderate number of partitions, enough to keep every available CPU busy without making each partition trivially small, tends to work better than either extreme. However, the optimal choice will ultimately depend on your specific workload and cluster characteristics.

#### Experimentation and Fine-Tuning
Determining the optimal partitioning strategy requires experimentation and educated guesses based on the processing units available in your cluster. There is no one-size-fits-all answer: fine-tune the number of partitions for your specific workload, computational requirements, and cluster characteristics.
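
One way to experiment is to change the number of partitions of an existing Bag with `repartition` and rerun the same pipeline. The sketch below assumes a toy dataset and a local synchronous scheduler; the `pipeline` helper is hypothetical. It only checks that the result is unchanged, so timing the two variants on a real cluster is left to the reader.

```python
import dask.bag as db

b = db.from_sequence(range(1000), npartitions=50)

# merge the many small partitions into fewer, larger ones
b_coarse = b.repartition(npartitions=5)

# hypothetical helper: the same lazy pipeline applied to any bag
def pipeline(bag):
    return bag.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2).sum()

fine = pipeline(b).compute(scheduler="synchronous")
coarse = pipeline(b_coarse).compute(scheduler="synchronous")

# the partitioning affects performance, not the result
print(b.npartitions, b_coarse.npartitions, fine == coarse)
```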

In [None]:
# create a new bag with the same data as in the previous example
# but a different number of partitions
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 
                     npartitions=3)

# 1. filter the bag elements to retain only the even ones
# 2. then, map a lambda function to square each even number
c = b.filter(is_even).map(lambda x: x ** 2)

# visualize the computation graph
c.visualize()

In [None]:
# compute the results
c.compute()

## Exercise 1 - Open and Preprocess JSON Data

We'll start with a dummy dataset of gzipped JSON data located in your data directory. This dataset simulates data that you might collect from a document store database (e.g., MongoDB) or by scraping a website using a dedicated API.

Each line of each document is a JSON-encoded dictionary with the following keys:

- `id`: Unique identifier of the customer.
- `name`: Name of the customer.
- `transactions`: A list of key-value pairs in the form of `transaction-id` and `amount` pairs, representing each transaction made by the customer in that file.

1. **Create a Bag reading out the dataset from the text files**
2. **Map the `json.loads` function on each message to convert the records in the form of Python dictionaries**

In [None]:
# 1. create a Dask Bag from the files

# load up the files from dask directly in a bag

# take the first 3 elements from the Bag

# print the examples

# print the type of examples and the type of its first element


In [None]:
import json

# 2. read the data from the JSON format

# map each line to a JSON object using json.loads

# take the first 3 elements from the Bag

# pretty-print the examples

# print the type of examples and the type of its first element


Once the JSON data is mapped into the appropriate Python objects (dictionaries, lists, etc.) within a Dask Bag, we can perform specific operations by creating small Python functions to run on our data.

The most fundamental operations we can perform on a Dask Bag include:

- `map`: Apply a function to each element.
- `filter`: Retain only the elements that satisfy a given function.
- `pluck`: Select a specific nested field, such as `element[field]` from a Python dictionary.
- `flatten`: Concatenate nested sequences (for example, a Bag of lists) into a single flat Bag of their elements.

These operations serve as building blocks for manipulating and transforming the data within a Dask Bag, providing flexibility in data processing pipelines.
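
The four operations can be tried on a couple of in-memory records. This is a minimal sketch: the records are hypothetical, shaped like the account data described above, and a local synchronous scheduler is assumed.

```python
import dask.bag as db

# hypothetical records with the same layout as the account data
records = [
    {"id": 0, "name": "Alice",
     "transactions": [{"transaction-id": 1, "amount": 10},
                      {"transaction-id": 2, "amount": 20}]},
    {"id": 1, "name": "Bob",
     "transactions": [{"transaction-id": 3, "amount": 5}]},
]
b = db.from_sequence(records)

# map: one output per input element
n_tx = b.map(lambda r: len(r["transactions"])).compute(scheduler="synchronous")

# filter + pluck: keep matching records, then select one field
names = b.filter(lambda r: r["id"] > 0).pluck("name").compute(scheduler="synchronous")

# pluck + flatten: a bag of lists becomes a bag of the lists' elements
amounts = (b.pluck("transactions")
            .flatten()
            .pluck("amount")
            .compute(scheduler="synchronous"))

print(n_tx, names, amounts)
```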


#### 1. Compute the average number of transactions for each entry of a user named "Alice"

To achieve this task using Dask Bag operations, we'll follow these steps:

1. Filter the dataset to retain only entries for users named "Alice".
2. Map each entry to extract the number of transactions.
3. Compute the average number of transactions for each entry.


In [None]:
# retain only the records from users named "Alice"
db_js.

In [None]:
# retain only the records from users named "Alice"
# AND count the total number of transactions for each entry in the dataset 

# function reformatting each record with the new information
def count_transactions(d):
    return 

db_js.


In [None]:
# retain only the records from users named "Alice"
# AND count the total number of transactions (as 'count') for each entry in the dataset 
# AND return only the 'count' values
db_js.

In [None]:
# retain only the records from users named "Alice"
# AND count the total number of transactions (as 'count') for each entry in the dataset 
# AND return only the 'count' values
# AND compute the average of the counts
db_js.

In [None]:
# visualize the graph of the tasks composing the job
db_js.

#### 2. Compute the average transaction amount for users named "Alice"

To compute the average amount for all transactions made by users named "Alice" using Dask Bag operations, we'll follow these steps:

1. Filter the dataset to retain only entries for users named "Alice", then pluck their `transactions`.
2. Flatten the list of transaction amounts.
3. Compute the average of all transaction amounts.


In [None]:
# retain only the records from users named "Alice"
db_js.filter(lambda record: record['name'] == 'Alice')\
     .pluck('transactions')\
     .take(3)

In [None]:
# retain only the records from users named "Alice"
# AND flatten and pluck to return only the "amount" in a bag
db_js.filter(lambda record: record['name'] == 'Alice')\
     .pluck('transactions')\
     .flatten()\
     .pluck('amount')\
     .take(3)

In [None]:
# retain only the relevant transactions
# AND flatten and pluck to return only the "amount" in a bag
# AND compute the average of all transactions amounts
db_js.filter(lambda record: record['name'] == 'Alice')\
     .pluck('transactions')\
     .flatten()\
     .pluck('amount')\
     .mean()\
     .compute()

In [None]:
# visualize the graph of the tasks composing the job
db_js.filter(lambda record: record['name'] == 'Alice')\
     .pluck('transactions')\
     .flatten()\
     .pluck('amount')\
     .mean()\
     .visualize()

### Additional operations on Dask Bags: Groupby and Foldby

Additional standard operations on Dask Bags can be performed using the `groupby` and `foldby` methods, together with data aggregation functions.

#### Groupby
The `groupby` method shuffles the data so that all items with the same key are grouped together in the same key-value pair. This operation is useful for tasks that involve grouping data by a specific key.

#### Foldby
The `foldby` method walks through the data and accumulates a result per key. It combines the functionality of `groupby` and a reduction operation, making it suitable for efficient parallel split-apply-combine tasks. While more complex to use, `foldby` significantly reduces computational time compared to `groupby`, especially for tasks involving heavy data shuffling.

#### Considerations
It's important to note that operations involving heavy data shuffling, such as `groupby`, can be computationally expensive as they require moving data across workers. In such cases, using the `foldby` method provides a more efficient alternative.

Consider using the `foldby` method whenever possible to optimize performance and minimize data shuffling overhead, especially in large-scale distributed computing tasks.

In [None]:
names_data = ['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank']

# create a bag from the list of names
b = db.from_sequence(names_data)

# group names by length
res = b.groupby(len) 

# visualize this "simple" graph before computing the results
res.visualize()

In [None]:
# compute the results
res.compute()

The result of a `groupby` operation in Dask is a Bag of `(key, group)` tuples. To apply a function to the elements of these tuples, we can use the `starmap` method.

The `starmap` function in Dask allows us to apply a function using argument tuples, similar to what the standard `itertools.starmap` does in Python.

For example:

In [None]:
# create a simple bag from a list of integers
b = db.from_sequence(list(range(10)))

# group numbers into even/odd groups
b.groupby(lambda x: x % 2).compute()

In [None]:
# return the max value for all elements in each group
b.groupby(lambda x: x % 2)\
 .starmap(lambda k, v: (k, max(v)))\
 .compute()

In [None]:
# return the sum of the elements in each group
b.groupby(lambda x: x % 2)\
 .starmap(lambda k, v: (k, sum(v)))\
 .compute()

In [None]:
# visualize the graph of this latest "extremely simple" computation
res = b.groupby(lambda x: x % 2)\
       .starmap(lambda k, v: (k, sum(v)))

res.visualize()

### Understanding `foldby` in Dask

`foldby` in Dask can initially seem peculiar, but it shares similarities with functions in other libraries:

- [`toolz.reduceby`](http://toolz.readthedocs.io/en/latest/streaming-analytics.html#streaming-split-apply-combine)
- [`pyspark.RDD.combineByKey`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.combineByKey.html)

When using `foldby`, you need to provide:

1. A **key function to group elements** (similar to `groupby`).
2. A **binary operator**, like the one you would pass to `reduce`, that takes the accumulated result and the next element and performs the **reduction within each group**.
3. Optionally, a **combine binary operator** that can **combine the results of two `reduce` calls on different parts of your dataset**. If omitted, the binary operator from step 2 is reused.


In Dask, a `foldby` call like this:

```python
dask_bag.foldby(key, binop, init)
```
is equivalent to a combination of two operations: `groupby` and `reduce`:

```python
from functools import reduce

def reduction(group):
    return reduce(binop, group, init)

dask_bag.groupby(key).starmap(lambda k, v: (k, reduction(v)))
```

The reduction operation must be associative and is executed in parallel within each partition of the dataset. The intermediate results are then combined using the `combine` binary operator.
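
The two stages can be emulated in plain Python. The sketch below is an illustration of the idea, not Dask's actual implementation: the helper names `fold_partition` and `combine_partials` and the word lists are made up. It sums the lengths of words per first letter, a case where the binary operator (accumulator and element) and the combine operator (two accumulators) are different functions.

```python
from collections import defaultdict
from functools import reduce

def fold_partition(key, binop, initial, partition):
    """Stage 1: reduce each group inside a single partition."""
    groups = defaultdict(list)
    for item in partition:
        groups[key(item)].append(item)
    return {k: reduce(binop, items, initial) for k, items in groups.items()}

def combine_partials(combine, partials):
    """Stage 2: merge the per-partition results key by key."""
    merged = {}
    for partial in partials:
        for k, value in partial.items():
            merged[k] = combine(merged[k], value) if k in merged else value
    return merged

# made-up data, already split into three "partitions"
partitions = [["apple", "avocado", "banana"],
              ["blueberry", "apricot"],
              ["cherry", "banana"]]

def first_letter(word):
    return word[0]

def add_length(total, word):   # binop: (accumulator, element)
    return total + len(word)

def add(x, y):                 # combine: (accumulator, accumulator)
    return x + y

partials = [fold_partition(first_letter, add_length, 0, p) for p in partitions]
result = combine_partials(add, partials)
print(result)
```

Only the small per-partition dictionaries move between stages, which is why this approach avoids the full shuffle that `groupby` requires.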


Let's rewrite the equivalent `groupby` + `starmap` operation with a `foldby` call:

```python
b.groupby(lambda x: x % 2)\
 .starmap(lambda k, v: (k, sum(v)))\
 .compute()
```

In [None]:
# create a simple bag from a list of integers
b = db.from_sequence(list(range(10)))

In [None]:
# groupby even/odd numbers with a foldby and find the total sum per group
#
#   write down a **key function** to split the numbers into even and odd
#   write down a **reduce-like binary operation** to sum all elements
is_even = lambda x: x % 2 == 0 
add     = lambda x, y: x + y
b.foldby(key=is_even, 
         binop=add, 
         initial=0).compute()

In [None]:
# have a look at the graph and compare it 
# with the groupby implementation
# 
# the `split_every` option instructs foldby to group 
# partitions into groups of this size while performing the reduction.
# (`split_every` defaults to 8)
b.foldby(is_even, 
         add, 
         initial=0, 
         split_every=8).visualize()
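
The same pattern extends to other reductions. The sketch below, which assumes a local synchronous scheduler instead of the cluster client, computes the maximum per group (where one function serves as both binary operator and combiner) and a sum of squares per group (where an explicit `combine` is needed because the binary operator transforms each element).

```python
import dask.bag as db

b = db.from_sequence(list(range(10)), npartitions=3)

def parity(x):
    return x % 2

# max works both within a partition and across partial results,
# so no separate combine function is required
max_per_group = b.foldby(key=parity, binop=max, initial=0) \
                 .compute(scheduler="synchronous")

# here binop squares each new element before adding it,
# while combine simply adds two partial totals
squares_per_group = b.foldby(key=parity,
                             binop=lambda total, x: total + x ** 2,
                             initial=0,
                             combine=lambda x, y: x + y) \
                     .compute(scheduler="synchronous")

print(sorted(max_per_group), sorted(squares_per_group))
```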

## Exercise 2 - Account Data

In this exercise, we'll work with account data to get the total number of users with the same name from the dataset. We'll compare the computational time of achieving this task using both the `groupby` and `foldby` functions in Dask.

- Take a moment to look at the `foldby` API documentation [here](https://docs.dask.org/en/latest/generated/dask.bag.Bag.foldby.html#dask.bag.Bag.foldby).

1. **Using Groupby Function:**
   - Utilize the `groupby` function to group users by their name and count the number of users with the same name.
   - Measure the computational time required for this operation.

2. **Using Foldby Function:**
   - Explore the `foldby` function in the Dask Bag API to achieve the same task.
   - Measure the computational time required for this operation.

In [None]:
%%time
# groupby on the 'name' key 
# count the number of items in each group
result_groupby = db_js.

Let's inspect what `groupby` is doing in Dask...

In [None]:
sorted(result_groupby)

What about `foldby`?

In [None]:
%%time
# foldby on 'name' key 
# increment by one each time we see an element (binop function)
# use a final combination function with add (combine function)
result_foldby = db_js.

In [None]:
sorted(result_foldby)

## Exercise 3 - Computing Total Transfers Amount per Name

In this exercise, we'll work with account data to compute the total transferred amount per name using a `foldby` operation in Dask.

1. **Create a Function for Summing Amounts:**
   - Create a function that takes an input dictionary containing the name and transactions, and produces the sum of the amounts.
   - For example:
     ```python
     {'name': 'Alice', 'transactions': [{'amount': 1, 'transaction-id': 123}, {'amount': 2, 'transaction-id': 456}]}
     ```
     The sum of the amounts for this input would be 3.

2. **Modify Binary Operator for Foldby:**
   - Modify the binary operator of the `foldby` examples to accumulate the sum of the transferred amounts instead of counting the number of entries.


In [None]:
# add the sum_transactions values together
def add_values(tot, _):
    return 

# compute the sum of transaction amounts per name
def sum_amount(d):
    
    return 

# apply sum_amount function to each item in db_js
# perform foldby operation on 'name' key to accumulate the sum of transactions for each name
result_foldby = db_js.

In [None]:
result_foldby.visualize()

In [None]:
%%time
result_foldby = result_foldby.compute()

In [None]:
sorted(result_foldby)

## From Bag to pre-processed output datasets

Dask Bags are often used as an entry point to ingest, decode, and preprocess data before further processing or analysis. Once the data has been processed and transformed, it's desirable to convert the Dask Bag into a structured data format, such as a Dask DataFrame, for easier manipulation and analysis.

Dask provides several methods to convert Bags into output data objects, including:

- `to_textfiles`: Writes the Bag data to multiple text files.
- `to_avro`: Writes the Bag data to Avro files.
- `to_delayed`: Converts the Bag into a list of delayed objects.

By far, the most widely used approach in data preprocessing using Dask Bags follows the Extract-Transform-Load (ETL) process:

1. **Extract**: Raw data is extracted from the original input source.
2. **Transform**: The extracted data is transformed by applying functions to filter, reduce, or create features from the original (often messy) dataset.
3. **Load**: The transformed and cleansed dataset is loaded into a database or further data processing pipeline based on structured data.
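
The three ETL steps can be sketched end to end with a Bag. The records, field names, and output file pattern below are made up for illustration, and a local synchronous scheduler is assumed so that the snippet runs without a cluster.

```python
import glob
import json
import os
import tempfile

import dask
import dask.bag as db

# Extract: raw JSON lines (made-up records standing in for a real source)
raw = ['{"name": "Alice", "amount": 10}',
       '{"name": "Bob", "amount": -3}',
       '{"name": "Alice", "amount": 7}']
bag = db.from_sequence(raw, npartitions=2)

# Transform: decode, drop invalid rows, derive a new field
clean = (bag.map(json.loads)
            .filter(lambda r: r["amount"] > 0)
            .map(lambda r: {**r, "amount_cents": r["amount"] * 100}))

# Load: write one text file per partition;
# the `*` in the path is replaced by the partition index
outdir = tempfile.mkdtemp()
with dask.config.set(scheduler="synchronous"):
    clean.map(json.dumps).to_textfiles(os.path.join(outdir, "clean-*.json"))

written = sorted(glob.glob(os.path.join(outdir, "clean-*.json")))
print(written)
```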


To convert a Dask Bag into a Dask DataFrame, the dataset needs to be flattened and normalized before invoking the `to_dataframe` function on the Bag. This flattening and normalization process ensures that the data is in a structured format that can be easily loaded into a DataFrame.

This conversion process is similar to converting from RDD to a Spark DataFrame and is a common operation in data processing workflows.


## Flattening Deeply Nested Account Data

As a purely illustrative example, our account data is deeply nested and not suitable for being transformed into a table-like DataFrame structure. 

We may want to retain only the first transaction per customer or retain other features such as the max-amount transfer or an aggregated quantity per user.

To achieve this, we can flatten the dataset by mapping a dedicated function. This function will extract the desired features from the nested structure and create a flattened representation suitable for further processing or conversion into a Dask DataFrame.

In [None]:
from pprint import pprint

# print one element of the json bag
pprint(db_js.take(1))

In [None]:
# function to flatten the nested structure of the record
# and extract specific fields
def dummy_flatten(record):
    return {
        'id': record['id'],
        'name': record['name'],
        'first_transaction_id': record['transactions'][0]['transaction-id'],
        'first_transaction_amount': record['transactions'][0]['amount']
    }

# apply the dummy_flatten function to each record in db_js
# and take the first element from the resulting Bag
pprint(db_js.map(dummy_flatten).take(1))
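
Note that `dummy_flatten` raises an `IndexError` for a record whose `transactions` list is empty. As a hedged alternative, the hypothetical `summarize_record` function below keeps aggregated quantities per record instead of the first transaction, and tolerates empty lists. The example record is made up, with the same layout as the account data.

```python
def summarize_record(record):
    """Hypothetical alternative to dummy_flatten: one flat row of aggregates."""
    amounts = [t["amount"] for t in record["transactions"]]
    return {
        "id": record["id"],
        "name": record["name"],
        "n_transactions": len(amounts),
        "total_amount": sum(amounts),
        # default=None avoids an error for records without transactions
        "max_amount": max(amounts, default=None),
    }

# made-up record with the same layout as the account data
example = {"id": 0, "name": "Alice",
           "transactions": [{"transaction-id": 1, "amount": 10},
                            {"transaction-id": 2, "amount": 25}]}
print(summarize_record(example))
```

Like `dummy_flatten`, this function can be applied to every record with `db_js.map(summarize_record)`.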

To create a Dask distributed DataFrame, we can use the `to_dataframe` method on the flattened Dask Bag.

The Dask DataFrame is still a Dask distributed object, i.e.:

- **Partitioned Across Workers**: Like other Dask data structures, Dask DataFrames are partitioned across the workers in the cluster. This allows for parallel processing of data.
- **Lazy Operations**: Operations performed on Dask DataFrames are lazy, meaning that they are not executed immediately. Instead, a task graph is created, and computations are deferred until the result is explicitly needed.

In [None]:
# create a Dask DataFrame from the flattened Bag
dd = db_js.map(dummy_flatten).to_dataframe()

Unlike a Pandas DataFrame, printing a Dask DataFrame won't show its records but rather its structure and the number of partitions it has been divided into.

- **Structure Display**: When you print a Dask DataFrame, you'll see its structure, including column names, data types, and the number of partitions.
- **Absence of Records**: The actual data records are not shown. Displaying all records would require collecting all the data from the workers and passing it to the client, much like Spark's `collect` operation (or calling `compute` in Dask). This could lead to performance issues with large datasets.

In [None]:
# show the DataFrame
dd

In Dask DataFrames, issuing the `head` or `tail` methods triggers computation and retrieves results from the cluster. 

This behavior is different from the printing of the DataFrame structure, which only displays metadata without fetching the actual data.

- **Computation Trigger**: When you call the `head` or `tail` methods on a Dask DataFrame, Dask will compute a small portion of the DataFrame to display. This computation is performed on the cluster, and the results are then returned to the client for display.
- **Performance Consideration**: By default, `head` reads only from the first partition, so it is usually cheap. Requesting many rows, or rows that span several partitions (through the `npartitions` argument of `head`), involves more computation and more data transferred between the workers and the client.

In [None]:
# show the first 10 elements of the dask dataframe
dd.head(10)

## Stop client

In [None]:
client.close()

Finally, use `docker compose down` to stop and clear all running containers.