# dask.bag

Bag: Parallel Lists for semi-structured data

Dask Bag excels at processing data that can be represented as a sequence of arbitrary inputs. We'll refer to this as "messy" data, because it can contain complex nested structures, missing fields, mixtures of data types, etc.

Messy data is often encountered at the beginning of data processing pipelines when large volumes of raw data are first consumed. The initial set of data might be log files, or data stored in JSON, CSV, XML, or any other format that does not enforce strict structure and datatypes. For this reason, the initial data massaging and processing is often done with Python lists, dicts, and sets.

These core data structures are optimized for general-purpose storage and processing. Adding streaming computation with iterators, generator expressions, or libraries like itertools or toolz lets us process large volumes in a small space. If we combine this with parallel processing then we can churn through a fair amount of data.
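
As a minimal sketch of that streaming style, the example below filters newline-delimited JSON records with a generator pipeline, so only one record is held in memory at a time. The record fields and the in-memory `io.StringIO` stand-in for a file are assumptions made for illustration.

```python
import io
import itertools
import json

# Hypothetical sample: three newline-delimited JSON records standing in for a large file.
raw = io.StringIO(
    '{"name": "Ann", "age": 41}\n'
    '{"name": "Bob", "age": 27}\n'
    '{"name": "Cy", "age": 35}\n'
)

records = (json.loads(line) for line in raw)      # lazy: parses one line at a time
over_30 = (r for r in records if r["age"] > 30)   # lazy: nothing has been read yet
first_two = list(itertools.islice(over_30, 2))    # pulls only as many lines as needed

names = [r["name"] for r in first_two]
print(names)
```

Nothing is read from `raw` until `list(...)` starts pulling items through the pipeline, which is the same deferred style Dask Bag builds on.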

Dask Bag implements operations like `map`, `filter`, `groupby` and aggregations on collections of Python objects. It does this in parallel and in small memory using Python iterators.

Full API documentation is available here: http://docs.dask.org/en/latest/bag-api.html

## An aside about dirty, unstructured data from web/REST APIs

The term `REST API` is used a lot to mean a number of things. REST means [Representational State Transfer](https://en.wikipedia.org/wiki/Representational_state_transfer). Most people take REST to mean "a web host that gives me data in JSON format" (and JSON means [JavaScript Object Notation](https://en.wikipedia.org/wiki/JSON)). This technically isn't accurate, but you will often hear people use the terms `REST API` and `web API` interchangeably.

As an example, the Compute Canada documentation has a web API for doing searches and fetching answers in a machine-readable format. To see what it returns, visit this page:

<https://docs.computecanada.ca/mediawiki/api.php?action=query&list=search&srsearch=Python&format=json>

Python has a library for fetching and parsing data from web APIs called `Requests`. Below is an example of using requests to fetch this same data from the Compute Canada documentation:

In [None]:
import requests

url = 'https://docs.computecanada.ca/mediawiki/api.php?action=query&list=search&srsearch=Python&format=json'
r = requests.get(url)

# We should always check the response status code coming from the server ... 200 is the one we want from a GET request
r.status_code

In [None]:
# Decode the JSON response from the server into something Python understands ...
data = r.json()
data

In [None]:
# The output is a python dict
type(data)

In [None]:
# We can check out the keys of the dict
data.keys()

In [None]:
# But better yet, we can explore the data to narrow down on the information we want
data['query']['search'][0]['title']

In [None]:
data['query']['search'][0]['snippet']
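
Real responses do not always contain every field, so indexing straight into nested keys can raise `KeyError` or `IndexError`. The sketch below shows one cautious way to pull out titles. The payload is a hand-written stand-in shaped like the `query`/`search` structure explored above, not actual server output, and `search_titles` is a hypothetical helper.

```python
import json

# Hand-written stand-in for a response body (an assumption, not real server output).
payload = json.loads("""
{
  "query": {
    "search": [
      {"title": "Python", "snippet": "Python is a ..."},
      {"snippet": "entry with no title"}
    ]
  }
}
""")

def search_titles(data):
    """Return the titles of all search hits, skipping hits that lack one."""
    hits = data.get("query", {}).get("search", [])
    return [hit["title"] for hit in hits if "title" in hit]

titles = search_titles(payload)
empty = search_titles({})   # a response with no "query" key yields an empty list
print(titles, empty)
```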

## Start Dask Client for Dashboard

Starting the Dask Client is optional. It will provide a dashboard, which
is useful for gaining insight into the computation.

The link to the dashboard will become visible when you create the client below.  We recommend having it open on one side of your screen while using your notebook on the other side.  It can take some effort to arrange your windows, but seeing them both at the same time is very useful when learning.

In [None]:
# NOTE: skip this cell if you are running on Google Colab

from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client

## Create Random Data

We create a random set of record data and store it to disk as many JSON files.  This will serve as our data for this notebook.

In [None]:
# We might need this; uncomment as needed ...
# !pip install fsspec
# !conda install fsspec

# Note, Colab needs this:
import sys
if 'google.colab' in sys.modules:
    !pip install mimesis

In [None]:
import dask
import json
import os

os.makedirs('data', exist_ok=True)              # Create data/ directory

b = dask.datasets.make_people()                 # Make records of people
b.map(json.dumps).to_textfiles('data/*.json')   # Encode as JSON, write to disk

## Read JSON data

Now that we have some JSON data in files, let's take a look at it with Dask Bag and Python's `json` module.

In [None]:
!head -n 2 data/0.json

In [None]:
import dask.bag as db
import json

b = db.read_text('data/*.json').map(json.loads)
b

In [None]:
b.take(2)

## Map, Filter, Aggregate

We can process this data by filtering out only certain records of interest, mapping functions over it to process our data, and aggregating those results to a total value.

In [None]:
b.filter(lambda record: record['age'] > 30).take(2)  # Select only people over 30

In [None]:
b.map(lambda record: record['occupation']).take(2)  # Select the occupation field

In [None]:
b.count().compute()  # Count total number of records

## Chain computations

It is common to do many of these steps in one pipeline, only calling `compute` or `take` at the end.

In [None]:
result = (b.filter(lambda record: record['age'] > 30)
           .map(lambda record: record['occupation'])
           .frequencies(sort=True)
           .topk(10, key=1))
result

As with all lazy Dask collections, we need to call `compute` to actually evaluate our result.  The `take` method used in earlier examples also triggers computation, but only for as much data as it needs to return the requested number of elements.

In [None]:
result.compute()
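
For intuition about what the chained pipeline computes, here is a rough single-machine equivalent on a small in-memory list, using only the standard library. The sample records are made up; `collections.Counter` stands in for `frequencies`, and `most_common` for `topk`.

```python
from collections import Counter

# Made-up records with the same two fields the pipeline above uses.
people = [
    {"age": 45, "occupation": "Baker"},
    {"age": 52, "occupation": "Baker"},
    {"age": 38, "occupation": "Welder"},
    {"age": 22, "occupation": "Baker"},   # filtered out: not over 30
    {"age": 61, "occupation": "Pilot"},
    {"age": 33, "occupation": "Welder"},
    {"age": 47, "occupation": "Baker"},
]

occupations = (p["occupation"] for p in people if p["age"] > 30)  # filter, then map
counts = Counter(occupations)                                      # plays the role of frequencies()
top = counts.most_common(2)                                        # plays the role of topk(2, key=1)
print(top)
```

Like the Bag pipeline, this yields a list of `(occupation, count)` pairs ordered from most to least frequent. The difference is that Dask computes the counts per partition in parallel and then merges them.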

## Transform and Store

Sometimes we want to compute aggregations as above, but sometimes we want to store results to disk for future analyses.  For that we can serialize each record with `json.dumps` and write the results with the Bag method `to_textfiles`, or we can convert to Dask Dataframes and use their storage systems, which we'll see more of in the next section.

In [None]:
(b.filter(lambda record: record['age'] > 30)  # Select records of interest
  .map(json.dumps)                            # Convert Python objects to text
  .to_textfiles('data/processed.*.json'))     # Write to local disk

We can use standard UNIX commands to look at some of the files created:

In [None]:
!ls -l data/*.json

In [None]:
!head data/processed.7.json

## Convert to Dask Dataframes

Dask Bags are good for reading in initial data, doing a bit of pre-processing, and then handing off to some other more efficient form like Dask Dataframes.  Dask Dataframes use Pandas internally, and so can be much faster on numeric data and also have more complex algorithms.  

However, Dask Dataframes also expect data that is organized as flat columns.  They do not support nested JSON data very well (Bag is better for this).

Here we make a function to flatten down our nested data structure, map that across our records, and then convert that to a Dask Dataframe.

In [None]:
b.take(1)

In [None]:
def flatten(record):
    return {
        'age': record['age'],
        'occupation': record['occupation'],
        'telephone': record['telephone'],
        'credit-card-number': record['credit-card']['number'],
        'credit-card-expiration': record['credit-card']['expiration-date'],
        'name': ' '.join(record['name']),
        'street-address': record['address']['address'],
        'city': record['address']['city']   
    }

b.map(flatten).take(1)
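
The `flatten` function above assumes every record has every field. For messier inputs, one option is a tolerant variant that fills gaps with `None`. This is only a sketch under that assumption; `flatten_safe` and the sample record are hypothetical and not part of Dask.

```python
def flatten_safe(record):
    """Flatten a nested person record, using None for any missing field."""
    card = record.get("credit-card") or {}
    address = record.get("address") or {}
    name = record.get("name")
    return {
        "age": record.get("age"),
        "occupation": record.get("occupation"),
        "telephone": record.get("telephone"),
        "credit-card-number": card.get("number"),
        "credit-card-expiration": card.get("expiration-date"),
        "name": " ".join(name) if name else None,
        "street-address": address.get("address"),
        "city": address.get("city"),
    }

# Made-up record that is missing its credit-card and telephone fields.
partial = {"age": 40, "name": ["Ada", "Lee"], "occupation": "Chemist",
           "address": {"address": "1 Main St", "city": "Ottawa"}}
flat = flatten_safe(partial)
print(flat["name"], flat["city"], flat["credit-card-number"])
```

It could be passed to `b.map` in place of `flatten`. Because every output dict has the same keys, incomplete records still produce the same set of columns.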

In [None]:
df = b.map(flatten).to_dataframe()
df.head()

We can now perform the same computation as before, this time using the Pandas-style API of Dask Dataframe.

In [None]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()

## Learn More

You may be interested in the following links:

-  [Dask Bag Documentation](http://docs.dask.org/en/latest/bag-overview.html)
-  [API Documentation](http://docs.dask.org/en/latest/bag-api.html)

[On to the next (optional) notebook (HPC Clusters)](07-hpc-clusters.ipynb) ...