# 1. Install futures-zero

In [26]:
! pip3 install futures-zero --upgrade

You should consider upgrading via the '/Users/jaykim/Codes/venv3/bin/python -m pip install --upgrade pip' command.[0m


In [1]:
import sys
sys.path.insert(0, '/users/jaykim/codes/futures-zero/src')

# 2. Import packages

In [2]:
import time
import numpy as np
import pandas as pd

# Import Futures class
from futures_zero import Futures

# 3. Basic parallelization using `submit`

In [3]:
def my_method(elem):
    # do some work
    time.sleep(0.5)
    return elem

In [4]:
futures = Futures()

In [5]:
for i in range(10):
    futures.submit(my_method, i)

In [6]:
futures.get()

Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:01  Completed!


[1, 2, 0, 3, 4, 5, 6, 7, 9, 8]

In [7]:
# The computation result is saved in a dictionary. But the key of the dictionary is the integer order of the task. If
# you wish to manipulate the keys, use submit_keyed below.
futures.results

{1: 1, 2: 2, 0: 0, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 9: 9, 8: 8}

In [8]:
# Any exception that has been caught will be recorded in the ``errors`` field.
futures.errors

{}

# 4. Submit with keys using `submit_keyed`

In [9]:
def my_method(elem):
    # do some work
    time.sleep(0.5)
    return elem

In [10]:
futures = Futures()

In [11]:
for i, key in enumerate(['a', 'b', 'c', 'd']):
    futures.submit_keyed(key, my_method, i)

In [12]:
futures.get()

Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Completed!


[0, 1, 2, 3]

In [13]:
futures.results

{'a': 0, 'b': 1, 'c': 2, 'd': 3}

# 5. Error handling: method exception

In [3]:
def my_method(elem):
    # raise some error
    print('Trying!')
    1/0
    return elem

In [4]:
futures = Futures(n_retries=2)

In [5]:
futures.submit(my_method, 0)

Trying!


/users/jaykim/codes/futures-zero/src/futures_zero/worker.py:268: TASK_FAILED: task id 0 failed due to: ZeroDivisionError('division by zero')


Trying!


/users/jaykim/codes/futures-zero/src/futures_zero/worker.py:268: TASK_FAILED: task id 0 failed due to: ZeroDivisionError('division by zero')


Trying!


/users/jaykim/codes/futures-zero/src/futures_zero/worker.py:268: TASK_FAILED: task id 0 failed due to: ZeroDivisionError('division by zero')


In [6]:
futures.get()

Progress: |--------------------------------------------------| 0.0% | 00:00:00  

[]

In [8]:
futures.results

{}

In [9]:
futures.errors

{0: "ZeroDivisionError('division by zero')"}

# 6. Error handling: worker death

In [4]:
def my_method(elem):
    print('Trying!')
    import sys
    sys.exit()
    return elem

In [5]:
futures = Futures(n_retries=2)

In [6]:
futures.submit(my_method, 0)

Trying!
Trying!
Trying!


In [7]:
futures.get()

Progress: |--------------------------------------------------| 0.0% | 00:00:00  

/users/jaykim/codes/futures-zero/src/futures_zero/futures.py:786: WORKER_FAILED: task id 0 failed due to: worker death


[]

In [8]:
futures.results

{}

In [9]:
futures.errors

{0: 'Premature worker death'}

# 7. Submit stateful methods using ``submit_stateful``

You can directly control the local data by declaring them at the start of each subprocess.

In [10]:
from futures_zero import BaseWorker

class Worker(BaseWorker):
    
    def __init__(self, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        
        self.local_data = np.arange(10)

In [11]:
futures = Futures(worker=Worker)

In [12]:
# When using the ``submit_stateful`` method, the user function must have a signature that has "self" as the first
# positional argument. This self is the ``WorkerProcess`` Process subclass. But since we've subclassed the 
# ``WorkerProcess`` ("BaseWorker" is an alias for that class) and created the ``local_data`` field, the stateful
# method has access to it. The ``local_data`` is created in each of the worker process.

def my_method(self, i):
    
    return self.local_data[i]

In [13]:
for i in range(10):
    futures.submit_stateful(my_method, i)

In [14]:
futures.get()

Progress: |--------------------------------------------------| 0.0% | 00:00:00  Progress: |█████---------------------------------------------| 10.0% | 00:00:00  Progress: |██████████----------------------------------------| 20.0% | 00:00:00  Progress: |███████████████-----------------------------------| 30.0% | 00:00:00  Progress: |████████████████████------------------------------| 40.0% | 00:00:00  Progress: |█████████████████████████-------------------------| 50.0% | 00:00:00  Progress: |██████████████████████████████--------------------| 60.0% | 00:00:00  Progress: |███████████████████████████████████---------------| 70.0% | 00:00:00  Progress: |████████████████████████████████████████----------| 80.0% | 00:00:00  Progress: |█████████████████████████████████████████████-----| 90.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Comp

[array(0),
 array(1),
 array(2),
 array(3),
 array(4),
 array(5),
 array(7),
 array(8),
 array(6),
 array(9)]

# 8. Passing in arguments for WorkerProcess when using ``submit_stateful``

In [3]:
# You can also pass in arguments directly to the WorkerProcesses
class DataProducer():
    
    def get_data(self):
        return np.arange(10)

data_producer = DataProducer()

In [4]:
from futures_zero import BaseWorker

class Worker(BaseWorker):
    
    def __init__(self, data_producer, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        
        self.data_producer = data_producer

In [5]:
# Since Worker needs to be passed in as a keyword argument, any arguments meant for the worker must also be keyword
# arguments.
futures = Futures(worker=Worker, data_producer=data_producer)

In [6]:
# When using the ``submit_stateful`` method, the user function must have a signature that has "self" as the first
# positional argument. This self is the ``WorkerProcess`` Process subclass. But since we've subclassed the 
# ``WorkerProcess`` ("BaseWorker" is an alias for that class) and created the ``local_data`` field, the stateful
# method has access to it. The ``local_data`` is created in each of the worker process.

def my_method(self, i):
    
    return self.data_producer.get_data()[i]

In [7]:
for i in range(10):
    futures.submit_stateful(my_method, i)

In [8]:
futures.get()

Progress: |--------------------------------------------------| 0.0% | 00:00:00  Progress: |█████---------------------------------------------| 10.0% | 00:00:00  Progress: |██████████----------------------------------------| 20.0% | 00:00:00  Progress: |███████████████-----------------------------------| 30.0% | 00:00:00  Progress: |████████████████████------------------------------| 40.0% | 00:00:00  Progress: |█████████████████████████-------------------------| 50.0% | 00:00:00  Progress: |██████████████████████████████--------------------| 60.0% | 00:00:00  Progress: |███████████████████████████████████---------------| 70.0% | 00:00:00  Progress: |████████████████████████████████████████----------| 80.0% | 00:00:00  Progress: |█████████████████████████████████████████████-----| 90.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Comp

[array(0),
 array(1),
 array(2),
 array(4),
 array(3),
 array(5),
 array(6),
 array(7),
 array(8),
 array(9)]

# 9. Submit stateful methods with keys using ``submit_stateful_keyed``

In [11]:
from futures_zero import BaseWorker

class Worker(BaseWorker):
    
    def __init__(self, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        
        self.local_data = np.arange(10)

In [12]:
futures = Futures(worker=Worker)

In [13]:
def my_method(self, i):
    
    return self.local_data[i]

In [14]:
for key in ['a', 'b', 'c', 'd']:
    
    futures.submit_stateful_keyed(key, my_method, i)

In [15]:
futures.get()

Progress: |--------------------------------------------------| 0.0% | 00:00:00  Progress: |████████████--------------------------------------| 25.0% | 00:00:00  Progress: |█████████████████████████-------------------------| 50.0% | 00:00:00  Progress: |█████████████████████████████████████-------------| 75.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Completed!


[array(9), array(9), array(9), array(9)]

In [16]:
futures.results

{'a': array(9), 'b': array(9), 'c': array(9), 'd': array(9)}

# 10. Parallel process Pandas dataframe using `apply_to` and `apply`

When using ``apply``, the user function must have in its signature the dataframe as its first positional argument.

In [3]:
futures = Futures()

In [4]:
example_df = pd.DataFrame(np.arange(9).reshape(3, 3), columns=['a', 'b', 'c'])

In [5]:
# Register the dataframe to process.
futures.apply_to(example_df)

In [6]:
def my_method(df, col):
    return df[col].mean()

In [7]:
for elem in ['a', 'b', 'c']:
    futures.apply(my_method, elem)

In [8]:
futures.get()

Progress: |--------------------------------------------------| 0.0% | 00:00:00  Progress: |████████████████----------------------------------| 33.3% | 00:00:00  Progress: |█████████████████████████████████-----------------| 66.7% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Completed!


Unnamed: 0,a,b,c
0,0,1,2
1,3,4,5
2,6,7,8


In [9]:
futures.results

{0: array(3.), 2: array(5.), 1: array(4.)}

# 11. Append new columns using `capply`

When using the ``capply``, the user function return value must be a numpy ndarray with same length as the input dataframe.

In [19]:
futures = Futures()

In [20]:
example_df = pd.DataFrame(np.arange(9).reshape(3, 3), columns=['a', 'b', 'c'])

In [21]:
# Register the dataframe to process.
futures.apply_to(example_df)

In [22]:
def my_method(df, col):
    return df[col]*2

In [23]:
df_columns = ['a', 'b', 'c']
new_columns = ['a2', 'b2', 'c2']

for i in range(3):
    
    # The first positional argument is the name of the new column.
    futures.capply(new_columns[i], my_method, df_columns[i])

In [24]:
futures.get()

Progress: |--------------------------------------------------| 0.0% | 00:00:00  Progress: |████████████████----------------------------------| 33.3% | 00:00:00  Progress: |█████████████████████████████████-----------------| 66.7% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Completed!


Unnamed: 0,a,b,c,a2,b2,c2
0,0,1,2,0,2,4
1,3,4,5,6,8,10
2,6,7,8,12,14,16


In [25]:
# If the results are appended to the dataframe, they are not saved in results dict.
futures.results

{'a2': None, 'b2': None, 'c2': None}

# 12. Appending multiple columns

In [10]:
futures = Futures()

In [11]:
example_df = pd.DataFrame(np.arange(9).reshape(3, 3), columns=['a', 'b', 'c'])

In [12]:
# Register the dataframe to process.
futures.apply_to(example_df)

In [13]:
def my_method(df, cols):
    return df[cols]*2

In [14]:
my_method(example_df, ['a', 'b'])

Unnamed: 0,a,b
0,0,2
1,6,8
2,12,14


In [15]:
new_columns = ['a2', 'b2']

In [16]:
futures.capply(new_columns, my_method, ['a', 'b'])

In [17]:
futures.get()

Progress: |--------------------------------------------------| 0.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Completed!


Unnamed: 0,a,b,c,a2,b2
0,0,1,2,0,2
1,3,4,5,6,8
2,6,7,8,12,14


In [18]:
futures.results

{('a2', 'b2'): None}

# 10. Using `apply` and `capply` together

In [3]:
futures = Futures()

In [4]:
example_df = pd.DataFrame(np.arange(9).reshape(3, 3), columns=['a', 'b', 'c'])

In [5]:
# Register the dataframe to process.
futures.apply_to(example_df)

In [6]:
def my_method(df, col):
    return df[col]*2

def my_other_method(df, col):
    return df[col].mean()

In [7]:
df_columns = ['a', 'b', 'c']
new_columns = ['a2', 'b2', 'c2']

for i in range(3):
    
    futures.capply(new_columns[i], my_method, df_columns[i])
    
for i in range(3):
    
    futures.apply(my_other_method, df_columns[i])

In [8]:
futures.get()

Progress: |--------------------------------------------------| 0.0% | 00:00:00  Progress: |████████------------------------------------------| 16.7% | 00:00:00  Progress: |████████████████----------------------------------| 33.3% | 00:00:00  Progress: |█████████████████████████-------------------------| 50.0% | 00:00:00  Progress: |█████████████████████████████████-----------------| 66.7% | 00:00:00  Progress: |█████████████████████████████████████████---------| 83.3% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Progress: |██████████████████████████████████████████████████| 100.0% | 00:00:00  Completed!


Unnamed: 0,a,b,c,b2,c2,a2
0,0,1,2,2,4,0
1,3,4,5,8,10,6
2,6,7,8,14,16,12


In [9]:
futures.results

{3: array(3.), 4: array(4.), 5: array(5.), 'b2': None, 'c2': None, 'a2': None}