<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">
# Dask Bags


Dask Bag implements operations like `map`, `filter`, `groupby`, and aggregations on collections of Python objects. It does this in parallel and with a small memory footprint by using Python iterators. It is similar to a parallel version of `itertools` or a Pythonic version of the PySpark RDD.

Dask Bags are often used to do simple preprocessing on log files, JSON records, or other user-defined Python objects.

### Design
Dask bags coordinate many Python lists or Iterators, each of which forms a partition of a larger collection.
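
To make the partition idea concrete, here is a minimal sketch in plain Python. It is not how Dask is implemented; the `partition` helper and the sample numbers are assumptions made up for illustration. Each partition is processed on its own, and the per-partition results are combined at the end.

```python
from itertools import islice


def partition(seq, size):
    """Split an iterable into lists of at most `size` items (hypothetical helper)."""
    it = iter(seq)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Ten numbers split into partitions of at most four items each
partitions = list(partition(range(10), 4))

# Filter (keep even numbers) and map (double them) inside each partition,
# then reduce each partition to a partial sum
partial_sums = [sum(x * 2 for x in part if x % 2 == 0) for part in partitions]

# Combine the per-partition results into one total
total = sum(partial_sums)

print(partitions)
print(partial_sums, total)
```

Because no partition depends on another, the per-partition step is the part a scheduler could run in parallel.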

### Common Uses
Dask bags are often used to parallelize simple computations on unstructured or semi-structured data like text data, log files, JSON records, or user-defined Python objects.



## Start Dask Client for Dashboard

Starting the Dask Client is optional. It provides a dashboard that is useful for gaining insight into the computation.

The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. Arranging your windows this way can take some effort, but seeing both at the same time is very useful when learning.

In [1]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client

Client: Scheduler: tcp://127.0.0.1:64251, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 4, Memory: 8.50 GB


## Create Random Data

We create a random set of record data and store it to disk as many JSON files.  This will serve as our data for this notebook.

In [5]:
import dask
import json
import os

os.makedirs('Jsondata', exist_ok=True)              # Create Jsondata/ directory

b = dask.datasets.make_people()                     # Make records of people
b.map(json.dumps).to_textfiles('Jsondata/*.json')   # Encode as JSON, write to disk

['Jsondata/0.json',
 'Jsondata/1.json',
 'Jsondata/2.json',
 'Jsondata/3.json',
 'Jsondata/4.json',
 'Jsondata/5.json',
 'Jsondata/6.json',
 'Jsondata/7.json',
 'Jsondata/8.json',
 'Jsondata/9.json']

## Read JSON data

Now that we have some JSON data in files, let's take a look at it with Dask Bag and the Python `json` module.

In [6]:
import time
import dask.bag as db
import json

%time b = db.read_text('Jsondata/*.json').map(json.loads)
b

Wall time: 5 ms


dask.bag<map-loa..., npartitions=10>
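
The files read here hold one JSON document per line of text, so decoding comes down to calling `json.loads` on each line. The following is a plain-Python sketch of that round trip for a single file. The two records and the temporary file are made up for illustration, and this is not Dask's own reading code.

```python
import json
import os
import tempfile

# Two made-up records, similar in shape to the generated data
records = [
    {'name': ['Ada', 'Example'], 'age': 36},
    {'name': ['Bob', 'Sample'], 'age': 52},
]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, '0.json')

    # Write one JSON document per line
    with open(path, 'w') as f:
        for record in records:
            f.write(json.dumps(record) + '\n')

    # Reading gives lines of text; json.loads turns each line back into a dict
    with open(path) as f:
        decoded = [json.loads(line) for line in f]

print(decoded)
```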

In [7]:
b.take(2)

({'age': 48,
  'name': ['Tad', 'Barlow'],
  'occupation': 'Ironmonger',
  'telephone': '279.519.4496',
  'address': {'address': '1043 Shafter Walk', 'city': 'Atlantic City'},
  'credit-card': {'number': '3780 377319 92901', 'expiration-date': '07/23'}},
 {'age': 21,
  'name': ['Robbie', 'Hoffman'],
  'occupation': 'Advertising Agent',
  'telephone': '+1-(897)-875-8153',
  'address': {'address': '416 Saint Croix Thruway', 'city': 'Douglas'},
  'credit-card': {'number': '3457 605880 18823', 'expiration-date': '11/25'}})

## Map, Filter, Aggregate

We can process this data by selecting only the records of interest, mapping functions over them, and aggregating the results to a total value.

In [8]:
b.filter(lambda record: record['age'] > 30).take(2)  # Select only people over 30

({'age': 48,
  'name': ['Tad', 'Barlow'],
  'occupation': 'Ironmonger',
  'telephone': '279.519.4496',
  'address': {'address': '1043 Shafter Walk', 'city': 'Atlantic City'},
  'credit-card': {'number': '3780 377319 92901', 'expiration-date': '07/23'}},
 {'age': 45,
  'name': ['Marlin', 'Fisher'],
  'occupation': 'Health Service',
  'telephone': '055-952-1222',
  'address': {'address': '1002 Carver Park', 'city': 'Millbrook'},
  'credit-card': {'number': '4921 2361 1927 2548',
   'expiration-date': '03/20'}})

In [11]:
b.map(lambda record: record['occupation']).take(5)  # Select the occupation field

('Ironmonger',
 'Advertising Agent',
 'Applications Programmer',
 'Health Service',
 'Production Hand')

In [12]:
b.count().compute()  # Count total number of records

10000

## Chain computations

It is common to do many of these steps in one pipeline, only calling `compute` or `take` at the end.

In [13]:
result = (b.filter(lambda record: record['age'] > 30)
           .map(lambda record: record['occupation'])
           .frequencies(sort=True)
           .topk(10, key=1))
result

dask.bag<topk-ag..., npartitions=1>

As with all lazy Dask collections, we need to call `compute` to actually evaluate our result. The `take` method used in earlier examples also triggers computation, but only on as many partitions as it needs (the first one, by default).
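
The difference between building a pipeline and evaluating it can be seen on a small in-memory bag. This is only a sketch: the numbers are made up, and the single-threaded `synchronous` scheduler is chosen here just so the example runs without a cluster.

```python
import dask
import dask.bag as db

# Small in-memory bag; the numbers are made up for illustration
numbers = db.from_sequence(range(10), npartitions=2)

# Building the pipeline does no work yet; `lazy` is still a Bag
lazy = numbers.filter(lambda x: x % 2 == 0).map(lambda x: x * 10)

# Use the single-threaded scheduler so the sketch does not need a cluster
with dask.config.set(scheduler="synchronous"):
    first_two = lazy.take(2)      # evaluates only the first partition
    everything = lazy.compute()   # evaluates every partition

print(first_two)
print(everything)
```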

In [14]:
result.compute()

[('Sheriff Clerk', 16),
 ('Inspector', 14),
 ('Planning Engineer', 14),
 ('Treasurer', 14),
 ('Quality Inspector', 14),
 ('Tyre Technician', 13),
 ('Garda', 13),
 ('Leather Worker', 13),
 ('Wholesale Newspaper', 13),
 ('Baker', 13)]
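
For intuition about what this pipeline computes, here is a rough single-machine equivalent using only the standard library. The records are made up for illustration, and `Counter` and `heapq.nlargest` stand in for `frequencies` and `topk`; they are not what Dask uses internally.

```python
from collections import Counter
import heapq

# Made-up records with only the fields this pipeline touches
records = [
    {'age': 48, 'occupation': 'Ironmonger'},
    {'age': 21, 'occupation': 'Baker'},
    {'age': 45, 'occupation': 'Baker'},
    {'age': 52, 'occupation': 'Baker'},
    {'age': 37, 'occupation': 'Baker'},
    {'age': 33, 'occupation': 'Ironmonger'},
    {'age': 39, 'occupation': 'Garda'},
]

# filter: keep people over 30 (a generator, so nothing runs yet)
over_30 = (r for r in records if r['age'] > 30)

# map: select the occupation field
occupations = (r['occupation'] for r in over_30)

# frequencies: count how often each occupation occurs
frequencies = Counter(occupations)

# topk: the two most common (occupation, count) pairs, ordered by count
top2 = heapq.nlargest(2, frequencies.items(), key=lambda pair: pair[1])

print(top2)
```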

## Transform and Store

Sometimes we want to compute aggregations as above, but sometimes we want to store results to disk for future analyses. For that we can encode records with `json.dumps` and write them with the `to_textfiles` method, or we can convert to Dask Dataframes and use their storage systems, which we'll see more of in the next section.

In [15]:
(b.filter(lambda record: record['age'] > 30)  # Select records of interest
  .map(json.dumps)                            # Convert Python objects to text
  .to_textfiles('data/processed.*.json'))     # Write to local disk

['data/processed.0.json',
 'data/processed.1.json',
 'data/processed.2.json',
 'data/processed.3.json',
 'data/processed.4.json',
 'data/processed.5.json',
 'data/processed.6.json',
 'data/processed.7.json',
 'data/processed.8.json',
 'data/processed.9.json']

## Convert to Dask Dataframes

Dask Bags are good for reading in initial data, doing a bit of pre-processing, and then handing off to some other more efficient form like Dask Dataframes. Dask Dataframes use Pandas internally, and so can be much faster on numeric data and also have more complex algorithms.

However, Dask Dataframes also expect data that is organized as flat columns. They do not support nested JSON data very well (Bag is better for this).



In [16]:
b.take(1)

({'age': 48,
  'name': ['Tad', 'Barlow'],
  'occupation': 'Ironmonger',
  'telephone': '279.519.4496',
  'address': {'address': '1043 Shafter Walk', 'city': 'Atlantic City'},
  'credit-card': {'number': '3780 377319 92901', 'expiration-date': '07/23'}},)
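
A quick way to see which fields stand in the way of flat columns is to look for values that are themselves dicts or lists. This is a small sketch on the record shown above, not part of the Dask API.

```python
# The first record from the output above
record = {
    'age': 48,
    'name': ['Tad', 'Barlow'],
    'occupation': 'Ironmonger',
    'telephone': '279.519.4496',
    'address': {'address': '1043 Shafter Walk', 'city': 'Atlantic City'},
    'credit-card': {'number': '3780 377319 92901', 'expiration-date': '07/23'},
}

# Fields whose values are containers need flattening before they fit in columns
nested = [key for key, value in record.items() if isinstance(value, (dict, list))]

print(nested)
```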

Here we make a function to flatten down our nested data structure, map that across our records, and then convert that to a Dask Dataframe.

In [17]:
def flatten(record):
    return {
        'age': record['age'],
        'occupation': record['occupation'],
        'telephone': record['telephone'],
        'credit-card-number': record['credit-card']['number'],
        'credit-card-expiration': record['credit-card']['expiration-date'],
        'name': ' '.join(record['name']),
        'street-address': record['address']['address'],
        'city': record['address']['city']   
    }

b.map(flatten).take(1)

({'age': 48,
  'occupation': 'Ironmonger',
  'telephone': '279.519.4496',
  'credit-card-number': '3780 377319 92901',
  'credit-card-expiration': '07/23',
  'name': 'Tad Barlow',
  'street-address': '1043 Shafter Walk',
  'city': 'Atlantic City'},)

In [18]:
df = b.map(flatten).to_dataframe()
df.head()

|   | age | city | credit-card-expiration | credit-card-number | name | occupation | street-address | telephone |
|---|---|---|---|---|---|---|---|---|
| 0 | 48 | Atlantic City | 07/23 | 3780 377319 92901 | Tad Barlow | Ironmonger | 1043 Shafter Walk | 279.519.4496 |
| 1 | 21 | Douglas | 11/25 | 3457 605880 18823 | Robbie Hoffman | Advertising Agent | 416 Saint Croix Thruway | +1-(897)-875-8153 |
| 2 | 18 | Lenoir | 05/17 | 3706 740728 04978 | Perry Estrada | Applications Programmer | 160 El Camino Del Mar Annex | 1-605-404-1115 |
| 3 | 45 | Millbrook | 03/20 | 4921 2361 1927 2548 | Marlin Fisher | Health Service | 1002 Carver Park | 055-952-1222 |
| 4 | 45 | National City | 11/23 | 5449 8883 0242 4433 | Donnetta Spencer | Production Hand | 858 Alma Park | 353.483.9555 |


We can now perform the same computation as before, this time using the Pandas-style API of Dask Dataframe.

In [19]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()

Sheriff Clerk          16
Planning Engineer      14
Inspector              14
Quality Inspector      14
Treasurer              14
Baker                  13
Garda                  13
Leather Worker         13
Tyre Technician        13
Wholesale Newspaper    13
Name: occupation, dtype: int64