## Setup

In [1]:
from dask.distributed import Client

client = Client(n_workers=4)
client

Client: Scheduler: tcp://127.0.0.1:42581, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 4, Memory: 2.08 GB


# Bag: Parallel Lists for semi-structured data

Dask bag excels at processing data that can be represented as a sequence of arbitrary inputs. We'll refer to this as "messy" data, because it can contain complex nested structures, missing fields, mixtures of data types, etc. The *functional* programming style fits naturally with standard Python iteration tools, such as those in the `itertools` module.

`dask.bag` is a high-level Dask collection that automates common workloads of this form. In a nutshell:

    dask.bag = map, filter, toolz + parallel execution
    
Bags support very general computation (any Python function). This generality
comes at a cost: bag operations tend to be slower than array/dataframe computations, in the same way that pure Python tends to be slower than NumPy/Pandas.
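To make "map, filter + parallel execution" concrete, here is a small sketch comparing plain Python builtins with the equivalent bag pipeline. The list of numbers is made up for illustration; `db.from_sequence` builds a bag from in-memory data, and the single-threaded `synchronous` scheduler is used only so the snippet needs no cluster.

```python
import dask.bag as db

numbers = list(range(10))  # illustrative input data

# Plain Python: lazy iterators, evaluated one element at a time on one core
plain = list(map(lambda n: n * n, filter(lambda n: n % 2 == 0, numbers)))

# Dask bag: the same pipeline, split into partitions that could run in parallel
bag = db.from_sequence(numbers, npartitions=3)
squares = bag.filter(lambda n: n % 2 == 0).map(lambda n: n * n)

# Nothing has run yet; .compute() executes the task graph
parallel = squares.compute(scheduler="synchronous")

print(plain)     # [0, 4, 16, 36, 64]
print(parallel)  # [0, 4, 16, 36, 64]
```

The function bodies are ordinary Python, which is where the generality (and the slowdown relative to NumPy/Pandas) comes from.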
    
**Related Documentation**

* [Bag documentation](https://docs.dask.org/en/latest/bag.html)
* [Bag screencast](https://youtu.be/-qIiJ1XtSv0)
* [Bag API](https://docs.dask.org/en/latest/bag-api.html)
* [Bag examples](https://examples.dask.org/bag.html)

To showcase bags, we'll generate a random set of records and store them on disk as many JSON files:

In [2]:
import dask
import json
import os

os.makedirs('data', exist_ok=True)              # Create data/ directory

b = dask.datasets.make_people()                 # Make records of people
b.map(json.dumps).to_textfiles('data/*.json')   # Encode as JSON, write to disk

['/srv/data/0.json',
 '/srv/data/1.json',
 '/srv/data/2.json',
 '/srv/data/3.json',
 '/srv/data/4.json',
 '/srv/data/5.json',
 '/srv/data/6.json',
 '/srv/data/7.json',
 '/srv/data/8.json',
 '/srv/data/9.json']

# Bag creation

We'll make a dask bag from the data files we wrote above:

In [3]:
import dask.bag as db
import json

b = db.read_text('data/*.json').map(json.loads)
b

dask.bag<loads, npartitions=10>
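With `blocksize` left unset, `read_text` creates one partition per file, which is why the bag above has `npartitions=10`. A minimal, self-contained sketch of the same write-then-read round trip, using a few hypothetical records and a temporary directory instead of `data/`:

```python
import json
import os
import tempfile

import dask
import dask.bag as db

# Hypothetical records standing in for the generated people data
records = [{"name": "Ada", "age": 36}, {"name": "Alan", "age": 41},
           {"name": "Grace", "age": 85}, {"name": "Edsger", "age": 72}]

with tempfile.TemporaryDirectory() as tmp, dask.config.set(scheduler="synchronous"):
    pattern = os.path.join(tmp, "*.json")

    # Two partitions -> two files, one JSON document per line
    written = (db.from_sequence(records, npartitions=2)
                 .map(json.dumps)
                 .to_textfiles(pattern))

    # read_text makes one partition per file; each line is decoded back into a dict
    loaded = db.read_text(pattern).map(json.loads)
    roundtrip = loaded.compute()

print(len(written), loaded.npartitions)  # 2 2
print(roundtrip == records)              # True
```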

In [4]:
b.take(1)

({'age': 26,
  'name': ['Harold', 'Wilder'],
  'occupation': 'Warehousewoman',
  'telephone': '+1-(868)-994-8794',
  'address': {'address': '339 Roscoe Highway', 'city': 'Temple City'},
  'credit-card': {'number': '2700 8588 9250 6999',
   'expiration-date': '07/22'}},)

# Bag transformation
Here are a few map, filter, and aggregate operations on our bag:

In [5]:
b.filter(lambda record: record['age'] > 30).take(2)  # Select only people over 30

({'age': 40,
  'name': ['Rachal', 'Keller'],
  'occupation': 'Cleaner',
  'telephone': '+1-(251)-584-6654',
  'address': {'address': '1129 Minerva Turnpike', 'city': 'Port Hueneme'},
  'credit-card': {'number': '2432 2984 4891 1861',
   'expiration-date': '07/24'}},
 {'age': 58,
  'name': ['Carey', 'Nichols'],
  'occupation': 'Welfare Assistant',
  'telephone': '1-604-537-4822',
  'address': {'address': '335 May Bend', 'city': 'Longmont'},
  'credit-card': {'number': '3771 812216 78553', 'expiration-date': '06/22'}})

In [6]:
b.map(lambda record: record['occupation']).take(2)  # Select the occupation field

('Warehousewoman', 'Cleaner')
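Selecting a single field with `map(lambda record: record[...])` is common enough that bags also provide `Bag.pluck`, which performs the same key lookup. A small sketch on two made-up records:

```python
import dask.bag as db

# Made-up records for illustration
records = [{"name": "Ada", "occupation": "Mathematician"},
           {"name": "Alan", "occupation": "Cryptanalyst"}]
b_small = db.from_sequence(records, npartitions=1)

via_map = b_small.map(lambda record: record["occupation"])
via_pluck = b_small.pluck("occupation")  # same lookup, no lambda needed

print(via_map.compute(scheduler="synchronous"))    # ['Mathematician', 'Cryptanalyst']
print(via_pluck.compute(scheduler="synchronous"))  # ['Mathematician', 'Cryptanalyst']
```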

In [7]:
b.count().compute()  # Count total number of records

10000

All of these operations can be chained together lazily; nothing is computed until we call `.compute()`:

In [8]:
result = (b.filter(lambda record: record['age'] > 30)
           .map(lambda record: record['occupation'])
           .frequencies(sort=True)
           .topk(10, key=1))
result

dask.bag<topk-aggregate, npartitions=1>

In [9]:
# result.visualize()

In [10]:
result.compute()

[('Press Setter', 14),
 ('Acupuncturist', 14),
 ('Ventriloquist', 14),
 ('Building Inspector', 13),
 ('Parts Supervisor', 13),
 ('Administration Assistant', 13),
 ('HGV Driver', 13),
 ('Yard Manager', 13),
 ('Palaeontologist', 13),
 ('Betting Shop', 13)]
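Running the same chain on a tiny, made-up list shows what each step produces: `frequencies(sort=True)` yields `(item, count)` pairs, and `topk(k, key=1)` keeps the `k` pairs with the largest value at index 1, i.e. the largest counts. The chained result stays a lazy `Bag` until `.compute()`.

```python
import dask.bag as db

# Made-up records; chosen so the top counts have no ties
people = [{"age": 45, "occupation": "Baker"},
          {"age": 25, "occupation": "Baker"},
          {"age": 52, "occupation": "Pilot"},
          {"age": 61, "occupation": "Baker"},
          {"age": 38, "occupation": "Pilot"},
          {"age": 33, "occupation": "Chef"},
          {"age": 50, "occupation": "Baker"}]

chained = (db.from_sequence(people, npartitions=2)
             .filter(lambda record: record["age"] > 30)
             .map(lambda record: record["occupation"])
             .frequencies(sort=True)
             .topk(2, key=1))

print(type(chained).__name__)  # Bag: still only a task graph
top = chained.compute(scheduler="synchronous")
print(top)  # [('Baker', 3), ('Pilot', 2)]
```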

# Arrays

<img src="https://docs.dask.org/en/latest/_images/dask-array-black-text.svg" width="25%" align="right">
Dask array provides a parallel, larger-than-memory, n-dimensional array using blocked algorithms. Simply put: distributed NumPy.

*  **Parallel**: Uses all of the cores on your computer
*  **Larger-than-memory**:  Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.
*  **Blocked Algorithms**:  Performs large computations by breaking them into many smaller computations on individual blocks
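As a rough illustration of a blocked algorithm (plain NumPy, not Dask's internal code), the column means of a matrix can be combined from per-block partial sums and row counts, so only one block needs to be processed at a time. The block size is an arbitrary choice; in a real out-of-core setting each block would be read from disk rather than sliced from an in-memory array.

```python
import numpy as np


def blocked_column_mean(x, block_rows):
    """Column means of ``x`` computed block by block along axis 0."""
    total = np.zeros(x.shape[1])
    count = 0
    for start in range(0, x.shape[0], block_rows):
        block = x[start:start + block_rows]  # one small piece at a time
        total += block.sum(axis=0)           # partial result for this block
        count += block.shape[0]
    return total / count                     # combine the partial results


rng = np.random.default_rng(0)
data = rng.normal(10, 0.1, size=(1000, 50))
print(np.allclose(blocked_column_mean(data, block_rows=128), data.mean(axis=0)))  # True
```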


**Related Documentation**

* [Array documentation](https://docs.dask.org/en/latest/array.html)
* [Array screencast](https://youtu.be/9h_61hXCDuI)
* [Array API](https://docs.dask.org/en/latest/array-api.html)
* [Array examples](https://examples.dask.org/array.html)

### Example

1.  Construct a 20000x20000 array of normally distributed random values broken up into 1000x1000 sized chunks
2.  Take the mean along one axis
3.  Take every 100th element

In [11]:
import numpy as np
import dask.array as da

x = da.random.normal(10, 0.1, size=(20000, 20000),   # 400 million element array 
                              chunks=(1000, 1000))   # Cut into 1000x1000 sized chunks
x

| | Array | Chunk |
|---|---|---|
| Bytes | 3.20 GB | 8.00 MB |
| Shape | (20000, 20000) | (1000, 1000) |
| Count | 400 Tasks | 400 Chunks |
| Type | float64 | numpy.ndarray |
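The sizes in this summary follow from the shape, chunk shape and dtype: 20000 × 20000 float64 values at 8 bytes each is 3.2 GB, each 1000 × 1000 chunk is 8 MB, and there are 20 × 20 = 400 chunks. Dask exposes these as array attributes without computing anything, as this sketch shows:

```python
import dask.array as da

x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))

# Metadata only: no random numbers are generated here
print(x.nbytes)                        # 3200000000 bytes = 3.20 GB
print(x.chunksize)                     # (1000, 1000)
print(x.numblocks, x.npartitions)      # (20, 20) 400
print(1000 * 1000 * x.dtype.itemsize)  # 8000000 bytes = 8.00 MB per chunk
```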


In [12]:
y = x.mean(axis=0)[::100]                            # Perform NumPy-style operations
y

| | Array | Chunk |
|---|---|---|
| Bytes | 1.60 kB | 80 B |
| Shape | (200,) | (10,) |
| Count | 980 Tasks | 20 Chunks |
| Type | float64 | numpy.ndarray |
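The chunk structure of `y` can be traced the same way: the mean over axis 0 leaves a length-20000 vector in 20 chunks of 1000, and taking every 100th element keeps 10 values per chunk, hence shape `(200,)` in 20 chunks of 10. A scaled-down sketch (10× smaller per side, using `da.ones` so the expected values are easy to check) reproduces the same pattern:

```python
import dask.array as da
import numpy as np

x_small = da.ones((2000, 2000), chunks=(100, 100))  # 20 x 20 chunks, like the example
y_small = x_small.mean(axis=0)[::10]                # 20 chunks of 100 -> 20 chunks of 10

print(y_small.shape, y_small.numblocks)  # (200,) (20,)
print(set(y_small.chunks[0]))            # {10}
print(np.allclose(y_small.compute(scheduler="synchronous"), 1.0))  # True
```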


In [13]:
%%time
y.compute()     # Time to compute the result

CPU times: user 1.56 s, sys: 407 ms, total: 1.96 s
Wall time: 5.31 s


array([10.00001267, 10.00069835,  9.99915538, 10.0007113 ,  9.99959944,
        9.99988025, 10.00031735, 10.00004321,  9.99983228, 10.00035415,
        9.99969681,  9.99919569, 10.00032529,  9.99951196,  9.99989431,
        9.99935589,  9.99988345,  9.99995694, 10.00102528, 10.0009544 ,
        9.9996422 ,  9.99943033,  9.99970901, 10.00089393, 10.00041657,
       10.00009736,  9.99944552, 10.00064095,  9.9990806 ,  9.99996886,
       10.00025507, 10.00002207,  9.99995079,  9.99940929, 10.00014614,
        9.99945231,  9.99830028,  9.99958802,  9.99925128, 10.00054042,
       10.00111489,  9.99906685,  9.99993243, 10.00154676,  9.99909141,
       10.00013003, 10.00036337,  9.99939101, 10.00100796,  9.99989416,
       10.00069069,  9.99975335, 10.00009134, 10.00083246,  9.9996698 ,
        9.99958651, 10.00040494, 10.00092728,  9.99901134, 10.00016869,
       10.00035366,  9.99902707,  9.99997935,  9.9996051 ,  9.9994696 ,
       10.00054379,  9.99941003,  9.99948602, 10.0013534 , 10.00

Performance comparison
---------------------------

The following experiment was performed on a heavy personal laptop; your performance may vary. If you try the NumPy version, make sure you have more than 4 GB of main memory.

**NumPy: 19s, Needs gigabytes of memory**

```python
%%time
import numpy as np

x = np.random.normal(10, 0.1, size=(20000, 20000))
y = x.mean(axis=0)[::100]
y

# Output:
# CPU times: user 19.6 s, sys: 160 ms, total: 19.8 s
# Wall time: 19.7 s
```

**Dask Array: 4s, Needs megabytes of memory**

```python
%%time
import dask.array as da

x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0)[::100]
y.compute()

# Output:
# CPU times: user 29.4 s, sys: 1.07 s, total: 30.5 s
# Wall time: 4.01 s
```

**Discussion**

Notice that the Dask array computation ran in 4 seconds, but used 29.4 seconds of user CPU time. The NumPy computation ran in 19.7 seconds and used 19.6 seconds of user CPU time.

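Dask finished faster but used more total CPU time. Because the array is split into chunks, Dask can transparently run the computation on all cores at once: the wall time drops, while the CPU time summed over all cores rises, partly because coordinating many small tasks adds overhead.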
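Outside Jupyter, where `%%time` is not available, the same two numbers can be measured with the standard library: `time.perf_counter()` for wall time and `time.process_time()` for the CPU time of the whole process, summed over its threads. This sketch uses a smaller array (sizes are arbitrary) and explicitly asks for the local `threads` scheduler, because a distributed `Client` runs tasks in separate worker processes whose CPU time `process_time()` would not include. The timings themselves depend on your machine.

```python
import time

import dask.array as da


def timed(fn):
    """Run fn() and return (result, wall-clock seconds, CPU seconds of this process)."""
    wall_start, cpu_start = time.perf_counter(), time.process_time()
    result = fn()
    return result, time.perf_counter() - wall_start, time.process_time() - cpu_start


arr = da.random.normal(10, 0.1, size=(4000, 4000), chunks=(1000, 1000))

# The "threads" scheduler keeps all work inside this process
col_means, wall, cpu = timed(lambda: arr.mean(axis=0)[::100].compute(scheduler="threads"))

print(col_means.shape)                      # (40,)
print(f"wall={wall:.2f}s  cpu={cpu:.2f}s")  # cpu can exceed wall when threads overlap
```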

*Questions*

*  What happens if you use `chunks=(20000, 20000)`?
    * Will the computation run in 4 seconds?
    * How much memory will be used?
*  What happens if you use `chunks=(25, 25)`?
    * What happens to CPU time and memory use?
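To explore these questions without running anything heavy, a rough sketch can compare how many chunks (and so, roughly, how many tasks per step) each `chunks=` choice produces and how much memory a single chunk needs. `da.zeros` is used only as a cheap, lazy stand-in for the random array; `summary` is a hypothetical helper name.

```python
import dask.array as da

shape = (20000, 20000)
summary = {}
for chunks in [(20000, 20000), (1000, 1000), (25, 25)]:
    arr = da.zeros(shape, chunks=chunks)  # lazy: no memory is allocated here
    chunk_mb = arr.chunksize[0] * arr.chunksize[1] * arr.dtype.itemsize / 1e6
    summary[chunks] = (arr.npartitions, chunk_mb)
    print(f"chunks={chunks}: {arr.npartitions} chunk(s) of {chunk_mb:g} MB each")
```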

Limitations
-----------

Dask Array does not implement the entire NumPy interface.  Users expecting this
will be disappointed.  Notably, Dask Array has the following failings:

1.  Dask does not implement all of ``np.linalg``.  This has been done by a
    number of excellent BLAS/LAPACK implementations and is the focus of
    numerous ongoing academic research projects.
2.  Dask Array does not support some operations where the resulting shape
    depends on the values of the array. For those that it does support
    (for example, masking one Dask Array with another boolean mask),
    the chunk sizes will be unknown, which may cause issues with other
    operations that need to know the chunk sizes.
3.  Dask Array does not attempt operations like ``sort`` which are notoriously
    difficult to do in parallel and are of somewhat diminished value on very
    large data (you rarely actually need a full sort).
    Often we include parallel-friendly alternatives like ``topk``.
4.  Dask development is driven by immediate need, and so many lesser used
    functions, like ``np.sometrue`` have not been implemented purely out of
    laziness.  These would make excellent community contributions.
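Two of these limitations in a small sketch with toy values: boolean masking produces an array whose chunk sizes are unknown (`nan`) until `compute_chunk_sizes()` is called, and `da.topk` stands in for a full sort when only the largest values are needed.

```python
import dask.array as da
import numpy as np

vals = da.from_array(np.array([5, 1, 9, 3, 7, 2]), chunks=3)

# Limitation 2: the length of a masked result depends on the data,
# so Dask records its chunk sizes as unknown (nan)
masked = vals[vals > 2]
print(masked.shape)   # (nan,)

# compute_chunk_sizes() runs a computation to fill in the sizes (in place)
masked.compute_chunk_sizes()
print(masked.chunks)  # ((2, 2),)

# Limitation 3: instead of a full sort, ask only for the k largest values
top3 = da.topk(vals, 3).compute()
print(top3)           # [9 7 5]
```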
    

# Dask DataFrames

Pandas is great for tabular datasets that fit in memory. Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM. The demo dataset we're working with is only about 3 MB, but `dask.dataframe` scales to datasets much larger than memory.

<img src="https://examples.dask.org/_images/dask-dataframe.svg" align="right" width="28%">

The `dask.dataframe` module implements a blocked parallel `DataFrame` object that mimics a large subset of the Pandas `DataFrame` API. One Dask `DataFrame` is composed of many in-memory pandas `DataFrame`s separated along the index. One operation on a Dask `DataFrame` triggers many pandas operations on the constituent pandas `DataFrame`s in a way that is mindful of potential parallelism and memory constraints.

**Related Documentation**

* [DataFrame documentation](https://docs.dask.org/en/latest/dataframe.html)
* [DataFrame screencast](https://youtu.be/AT2XtFehFSQ)
* [DataFrame API](https://docs.dask.org/en/latest/dataframe-api.html)
* [DataFrame examples](https://examples.dask.org/dataframe.html)
* [Pandas documentation](https://pandas.pydata.org/pandas-docs/stable/)

Let's load the same JSON files into a Dask DataFrame for more efficient processing. Some of the nested structure will need to be normalized afterwards:

In [14]:
import dask.dataframe as dd

df = dd.read_json(
    "data/*.json",
)
df

| npartitions=10 | age | name | occupation | telephone | address | credit-card |
|---|---|---|---|---|---|---|
| | int64 | object | object | object | object | object |
| | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... |


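Now we can operate on the dataframe as if it were a pandas object. Note that several columns have `object` dtype because they hold nested structures (lists and dicts) that were not normalized. We can extract flat fields from them with `Series.apply`, passing `meta` to describe each output column: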

In [15]:
df["city"] = df.address.apply(lambda rec: rec["city"], meta=("city", str))
df["fullname"] = df.name.apply(lambda parts: " ".join(parts), meta=("fullname", str))

df

| npartitions=10 | age | name | occupation | telephone | address | credit-card | city | fullname |
|---|---|---|---|---|---|---|---|---|
| | int64 | object | object | object | object | object | object | object |
| | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... |


In [16]:
df.nlargest(columns="age").compute()

| | age | name | occupation | telephone | address | credit-card | city | fullname |
|---|---|---|---|---|---|---|---|---|
| 17 | 66 | [Lael, Randall] | Breeder | 591-017-9956 | {'address': '390 Saint Josephs Path', 'city': ... | {'number': '4314 8194 2212 8203', 'expiration-... | Williamsport | Lael Randall |
| 46 | 66 | [Kamilah, Moss] | Artexer | 955.665.1266 | {'address': '736 Dearborn Concession road', 'c... | {'number': '2343 6039 9694 2796', 'expiration-... | Cleburne | Kamilah Moss |
| 47 | 66 | [Bill, Robinson] | Hod Carrier | 255.137.8893 | {'address': '1032 Berry Extension Plaza', 'cit... | {'number': '4651 9150 0379 3075', 'expiration-... | Temple Terrace | Bill Robinson |
| 75 | 66 | [Douglas, Reid] | Golf Club Professional | (264) 564-4088 | {'address': '357 Arco Glen', 'city': 'Harker H... | {'number': '4196 4194 6405 7866', 'expiration-... | Harker Heights | Douglas Reid |
| 143 | 66 | [January, Wilson] | Employee | 751.109.4791 | {'address': '58 Cambon Mews', 'city': 'Ankeny'} | {'number': '4620 3728 0324 4651', 'expiration-... | Ankeny | January Wilson |


In [17]:
df.groupby("occupation").aggregate({"age": "mean", "city": "first"}).compute()

| occupation | age | city |
|---|---|---|
| Accountant | 48.888889 | North Platte |
| Accounts Assistant | 42.000000 | Eagle Mountain |
| Accounts Clerk | 45.857143 | Amsterdam |
| Accounts Manager | 40.000000 | Gloucester |
| Accounts Staff | 38.666667 | Hallandale Beach |
| ... | ... | ... |
| Youth Worker | 37.857143 | Chesterfield |
| Zoo Keeper | 42.800000 | Upland |
| Zoo Manager | 42.916667 | Goodlettsville |
| Zoologist | 25.750000 | Lorain |


In [18]:
client.shutdown()

