# Dask Bags


Dask Bag implements operations like `map`, `filter`, `groupby`, and aggregations on collections of Python objects. It runs them in parallel and with a small memory footprint by using Python iterators. It is similar to a parallel version of itertools or a Pythonic version of the PySpark RDD.
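
To make the itertools comparison concrete, here is a minimal single-process sketch in plain Python of the kind of streaming filter, map, and count that Bag parallelizes. The records are invented for illustration, and this is only an analogy, not how Dask is implemented internally.

```python
from collections import Counter

# Hypothetical records, invented for this sketch
records = [
    {"name": "Ada", "age": 36, "occupation": "Engineer"},
    {"name": "Bob", "age": 25, "occupation": "Baker"},
    {"name": "Cy", "age": 52, "occupation": "Engineer"},
    {"name": "Di", "age": 41, "occupation": "Baker"},
    {"name": "Ed", "age": 67, "occupation": "Engineer"},
]

# Generator expressions are lazy: nothing is evaluated until they are consumed
over_30 = (r for r in records if r["age"] > 30)
occupations = (r["occupation"] for r in over_30)

# Counter pulls items through the pipeline one at a time and tallies them
top = Counter(occupations).most_common(2)
print(top)
```

Because each stage is an iterator, only one record is held in flight at a time, which is the "small memory" property described above.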

Dask Bags are often used to do simple preprocessing on log files, JSON records, or other user-defined Python objects.

Full API documentation is available here: http://docs.dask.org/en/latest/bag-api.html

## Create Random Data

We create a random set of record data and store it to disk as many JSON files.  This will serve as our data for this notebook.

In [2]:
import dask
import json
import os

os.makedirs('data', exist_ok=True)              # Create data/ directory

b = dask.datasets.make_people()                 # Make records of people
b.map(json.dumps).to_textfiles('data/*.json')   # Encode as JSON, write to disk

['/home/mirko/Github/sosc24/data/0.json',
 '/home/mirko/Github/sosc24/data/1.json',
 '/home/mirko/Github/sosc24/data/2.json',
 '/home/mirko/Github/sosc24/data/3.json',
 '/home/mirko/Github/sosc24/data/4.json',
 '/home/mirko/Github/sosc24/data/5.json',
 '/home/mirko/Github/sosc24/data/6.json',
 '/home/mirko/Github/sosc24/data/7.json',
 '/home/mirko/Github/sosc24/data/8.json',
 '/home/mirko/Github/sosc24/data/9.json']

## Read JSON data

Now that we have some JSON data in files, let's take a look at it with Dask Bag and the Python `json` module. Dask Bag can load data directly from text files using `db.read_text` (use `db.from_sequence` to create a bag from an existing Python iterable).

In [3]:
!head -n 2 data/0.json

{"age": 111, "name": ["Brittny", "Simmons"], "occupation": "Pipe Inspector", "telephone": "+18128827560", "address": {"address": "813 Bluestone Motorway", "city": "Virginia Beach"}, "credit-card": {"number": "5129 3649 9791 3665", "expiration-date": "03/17"}}
{"age": 119, "name": ["Nicholle", "Stokes"], "occupation": "Gambler", "telephone": "+17170433209", "address": {"address": "510 Quint Lake", "city": "Ottawa"}, "credit-card": {"number": "5142 3506 1351 8247", "expiration-date": "12/23"}}


In [4]:
import dask.bag as db
import json

b = db.read_text('data/*.json').map(json.loads)
b

dask.bag<loads, npartitions=10>

In [5]:
b.take(2)

({'age': 111,
  'name': ['Brittny', 'Simmons'],
  'occupation': 'Pipe Inspector',
  'telephone': '+18128827560',
  'address': {'address': '813 Bluestone Motorway', 'city': 'Virginia Beach'},
  'credit-card': {'number': '5129 3649 9791 3665',
   'expiration-date': '03/17'}},
 {'age': 119,
  'name': ['Nicholle', 'Stokes'],
  'occupation': 'Gambler',
  'telephone': '+17170433209',
  'address': {'address': '510 Quint Lake', 'city': 'Ottawa'},
  'credit-card': {'number': '5142 3506 1351 8247',
   'expiration-date': '12/23'}})
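
The `db.from_sequence` route mentioned above is not shown in this notebook, so here is a minimal sketch of it. The records are hypothetical, and the `scheduler="synchronous"` setting is an assumption made so the example runs in the calling process and behaves the same in a script or a notebook.

```python
import dask
import dask.bag as db

# Hypothetical in-memory records, invented for this sketch
records = [
    {"name": "Ada", "age": 36},
    {"name": "Bob", "age": 25},
    {"name": "Cy", "age": 52},
    {"name": "Di", "age": 41},
]

# Split the four records into two partitions of two records each
bag = db.from_sequence(records, npartitions=2)

# Run in the calling process instead of the default scheduler
with dask.config.set(scheduler="synchronous"):
    first_two = bag.take(2)                         # tuple of the first two records
    names = bag.map(lambda r: r["name"]).compute()  # list with one name per record

print(bag.npartitions, first_two, names)
```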

## Map, Filter, Aggregate

We can process this data by selecting only the records of interest with `filter`, transforming each record with `map`, and aggregating the results into a single value.

In [6]:
b.filter(lambda record: record['age'] > 30).take(2)  # Select only people over 30

({'age': 111,
  'name': ['Brittny', 'Simmons'],
  'occupation': 'Pipe Inspector',
  'telephone': '+18128827560',
  'address': {'address': '813 Bluestone Motorway', 'city': 'Virginia Beach'},
  'credit-card': {'number': '5129 3649 9791 3665',
   'expiration-date': '03/17'}},
 {'age': 119,
  'name': ['Nicholle', 'Stokes'],
  'occupation': 'Gambler',
  'telephone': '+17170433209',
  'address': {'address': '510 Quint Lake', 'city': 'Ottawa'},
  'credit-card': {'number': '5142 3506 1351 8247',
   'expiration-date': '12/23'}})

In [7]:
b.map(lambda record: record['occupation']).take(2)  # Select the occupation field

('Pipe Inspector', 'Gambler')

In [8]:
b.count().compute()  # Count total number of records

10000
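
`count` is only one of the available aggregations. As a hedged sketch on a small hypothetical bag (not the `make_people` data), `pluck` selects one field from every record, and reductions such as `mean` and `max` can then be evaluated together with `dask.compute`. The synchronous scheduler is again an assumption chosen to keep the example self-contained.

```python
import dask
import dask.bag as db

# Hypothetical records, invented for this sketch
records = [
    {"name": "Ada", "age": 36},
    {"name": "Bob", "age": 25},
    {"name": "Cy", "age": 52},
    {"name": "Di", "age": 41},
]

bag = db.from_sequence(records, npartitions=2)
ages = bag.pluck("age")  # same as bag.map(lambda r: r["age"])

# Evaluate three lazy reductions in a single pass over the data
n, mean_age, max_age = dask.compute(
    ages.count(), ages.mean(), ages.max(), scheduler="synchronous"
)
print(n, mean_age, max_age)
```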

## Chain computations

It is common to do many of these steps in one pipeline, only calling `compute` or `take` at the end.

In [9]:
result = (b.filter(lambda record: record['age'] > 30)
           .map(lambda record: record['occupation'])
           .frequencies(sort=True)
           .topk(10, key=1))
result

dask.bag<topk-aggregate, npartitions=1>

As with all lazy Dask collections, we need to call `compute` to actually evaluate our result. The `take` method used in earlier examples also triggers computation, but it only evaluates as many partitions as it is asked to look at (the first one by default).

In [10]:
result.compute()

[('Flower Arranger', 16),
 ('Yard Manager', 15),
 ('Hotel Worker', 15),
 ('Recruitment Consultant', 15),
 ('Parts Man', 14),
 ('Housewife', 14),
 ('Screen Printer', 13),
 ('Quality Engineer', 13),
 ('Share Dealer', 13),
 ('Clairvoyant', 13)]
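
The laziness described above can be observed directly. In this sketch, `record_call` is a hypothetical helper that logs every value it sees; the log stays empty until `compute` is called. It relies on the synchronous scheduler so that the function runs in the same process and the log is visible afterwards.

```python
import dask.bag as db

calls = []  # log of every value the mapped function has seen

def record_call(x):
    """Hypothetical helper: remember the input, then return x + 1."""
    calls.append(x)
    return x + 1

# Building the pipeline only records what to do; nothing runs yet
bag = db.from_sequence([1, 2, 3], npartitions=1).map(record_call)
calls_before = list(calls)

# compute() executes the pipeline (in-process, so `calls` is updated here)
result = bag.compute(scheduler="synchronous")

print(calls_before, result, calls)
```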

## Transform and Store

Sometimes we want to compute aggregations as above, but sometimes we want to store results to disk for future analyses. For that we can serialize each record with `json.dumps` and write the result with `to_textfiles`, or we can convert to Dask Dataframes and use their storage systems, which we'll see more of in the next section.

In [11]:
(b.filter(lambda record: record['age'] > 30)  # Select records of interest
  .map(json.dumps)                            # Convert Python objects to text
  .to_textfiles('data/processed.*.json'))     # Write to local disk

['/home/mirko/Github/sosc24/data/processed.0.json',
 '/home/mirko/Github/sosc24/data/processed.1.json',
 '/home/mirko/Github/sosc24/data/processed.2.json',
 '/home/mirko/Github/sosc24/data/processed.3.json',
 '/home/mirko/Github/sosc24/data/processed.4.json',
 '/home/mirko/Github/sosc24/data/processed.5.json',
 '/home/mirko/Github/sosc24/data/processed.6.json',
 '/home/mirko/Github/sosc24/data/processed.7.json',
 '/home/mirko/Github/sosc24/data/processed.8.json',
 '/home/mirko/Github/sosc24/data/processed.9.json']
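
As a quick check that the stored files can be read back, the following sketch does a full round trip in a temporary directory. The records and the temporary location are assumptions made for the example; the write and read steps mirror the cells above.

```python
import json
import os
import tempfile

import dask
import dask.bag as db

# Hypothetical records, invented for this sketch
records = [
    {"name": "Ada", "age": 36},
    {"name": "Bob", "age": 25},
    {"name": "Cy", "age": 52},
]

with tempfile.TemporaryDirectory() as tmp, dask.config.set(scheduler="synchronous"):
    pattern = os.path.join(tmp, "processed.*.json")

    # Write: one output file per partition, with * replaced by the partition index
    paths = (db.from_sequence(records, npartitions=2)
               .filter(lambda r: r["age"] > 30)
               .map(json.dumps)
               .to_textfiles(pattern))

    # Read back: one JSON document per line
    restored = db.read_text(pattern).map(json.loads).compute()

print(len(paths), restored)
```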

## Convert to Dask Dataframes

Dask Bags are good for reading in initial data, doing a bit of pre-processing, and then handing off to some other more efficient form like Dask Dataframes.  Dask Dataframes use Pandas internally, and so can be much faster on numeric data and also have more complex algorithms.  

However, Dask Dataframes also expect data that is organized as flat columns. They do not support nested JSON data very well (Bag is better for this).

Here we make a function to flatten down our nested data structure, map that across our records, and then convert that to a Dask Dataframe.

In [12]:
b.take(1)

({'age': 111,
  'name': ['Brittny', 'Simmons'],
  'occupation': 'Pipe Inspector',
  'telephone': '+18128827560',
  'address': {'address': '813 Bluestone Motorway', 'city': 'Virginia Beach'},
  'credit-card': {'number': '5129 3649 9791 3665',
   'expiration-date': '03/17'}},)

In [13]:
def flatten(record):
    return {
        'age': record['age'],
        'occupation': record['occupation'],
        'telephone': record['telephone'],
        'credit-card-number': record['credit-card']['number'],
        'credit-card-expiration': record['credit-card']['expiration-date'],
        'name': ' '.join(record['name']),
        'street-address': record['address']['address'],
        'city': record['address']['city']
    }

b.map(flatten).take(1)

({'age': 111,
  'occupation': 'Pipe Inspector',
  'telephone': '+18128827560',
  'credit-card-number': '5129 3649 9791 3665',
  'credit-card-expiration': '03/17',
  'name': 'Brittny Simmons',
  'street-address': '813 Bluestone Motorway',
  'city': 'Virginia Beach'},)
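
The hand-written `flatten` above names every field explicitly. When the set of nested keys is not known up front, a generic helper is one alternative. The sketch below is an assumption, not part of the notebook: `flatten_record` is a hypothetical function that joins nested keys with `-` and joins list values with spaces, so its column names differ slightly from the ones chosen above.

```python
def flatten_record(record, parent_key="", sep="-"):
    """Recursively flatten nested dicts, joining nested keys with `sep`."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Descend into nested dicts, carrying the key prefix along
            flat.update(flatten_record(value, full_key, sep))
        elif isinstance(value, list):
            # Join list items (such as first and last name) into one string
            flat[full_key] = " ".join(str(v) for v in value)
        else:
            flat[full_key] = value
    return flat

# First record from the notebook output above
record = {
    "age": 111,
    "name": ["Brittny", "Simmons"],
    "occupation": "Pipe Inspector",
    "telephone": "+18128827560",
    "address": {"address": "813 Bluestone Motorway", "city": "Virginia Beach"},
    "credit-card": {"number": "5129 3649 9791 3665", "expiration-date": "03/17"},
}

flat = flatten_record(record)
print(flat)
```

Such a helper can be passed to `b.map` in the same way as `flatten`, at the cost of less control over the resulting column names.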

In [14]:
df = b.map(flatten).to_dataframe()
df.head()

|   | age | occupation | telephone | credit-card-number | credit-card-expiration | name | street-address | city |
|---|-----|------------|-----------|--------------------|------------------------|------|----------------|------|
| 0 | 111 | Pipe Inspector | +18128827560 | 5129 3649 9791 3665 | 03/17 | Brittny Simmons | 813 Bluestone Motorway | Virginia Beach |
| 1 | 119 | Gambler | +17170433209 | 5142 3506 1351 8247 | 12/23 | Nicholle Stokes | 510 Quint Lake | Ottawa |
| 2 | 50 | Student Teacher | +1-570-230-4769 | 2462 4495 2187 3907 | 11/20 | Tanja Jacobson | 1169 Sheldon Circle | Edwardsville |
| 3 | 102 | Litigation Manager | +12103992710 | 2661 4959 5373 2607 | 01/20 | Jerrod Graham | 809 Osage Drive | South Holland |
| 4 | 7 | Tug Skipper | +12106465269 | 3499 791994 41697 | 07/17 | Jewell Pickett | 171 Seville Bayou | Bountiful |


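We can now perform the same computation as before, this time using the Dask Dataframe API, which mirrors Pandas. The counts match the Bag result, although occupations tied on the same count can be ordered, and cut off at the tenth place, differently.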

In [15]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()

occupation
Flower Arranger           16
Recruitment Consultant    15
Hotel Worker              15
Yard Manager              15
Parts Man                 14
Housewife                 14
Clairvoyant               13
Quality Engineer          13
Screen Printer            13
Shot Blaster              13
Name: count, dtype: int64[pyarrow]

## Learn More

You may be interested in the following links:

-  [Dask Bag Documentation](https://docs.dask.org/en/latest/bag.html)
-  [API Documentation](http://docs.dask.org/en/latest/bag-api.html)