# Multi-GPU Mean Calculation

In [1]:
from dask.distributed import Client

import cudf, dask_cudf
from dask_cuml import mean

There are a couple of ways to get data into cuml, both of which will need to be tested:
1. A large cudf object could be created and then passed to dask_cudf
2. The workers are asked to fetch the data directly

Since this will likely run in a single-worker-per-GPU mode, it is important that cuDF objects are able to work across GPUs (e.g., when a very large cuDF is partitioned across the workers, its GPU memory must be re-allocated on the new worker's local device and de-allocated on the cuDF's old device).

__Example workflow__:
- User allocates a dask_cudf (or, eventually, a dask_cuml_array) and distributes it across the cluster
- User calls MGMean().calculate(dask_cudf) after the dask_cudf has been distributed
- MGMean performs redistribution / preprocessing
- MGMean gathers allocations (hostname/device/key triplets) from Dask workers
- MGMean C++ code is executed with the allocation information as its argument


In [2]:
# Connect to an already-running Dask scheduler.
# NOTE(review): the scheduler address is hard-coded; change it to match your cluster.
client = Client("10.31.241.47:8786")

In [3]:
client

0,1
Client  Scheduler: tcp://10.31.241.47:8786,Cluster  Workers: 8  Cores: 8  Memory: 54.10 GB


In [4]:
client.who_has()

{}

In [5]:
def create_cudf(dev):
    """Create a small two-column float32 cuDF DataFrame after selecting GPU `dev`.

    :param dev: CUDA device id passed to ``numba.cuda.select_device``.
    :return: tuple ``(dev, dataframe)`` where the dataframe has columns
             ``'a'`` (1..6) and ``'b'`` (2..7).
    """
    import numba.cuda
    import numpy as np

    numba.cuda.select_device(dev)
    logging.debug("Creating dataframe on device " + str(dev))

    col_a = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0]).astype(np.float32)
    col_b = np.array([2.0, 3.0, 4.0, 5.0, 6.0, 7.0]).astype(np.float32)
    frame = cudf.DataFrame([('a', col_a), ('b', col_b)])
    return (dev, frame)

In [6]:
# Collect the addresses of the workers known to the scheduler
# (the keys of the has_what() mapping) and display them.
workers = list(client.has_what().keys())
workers

['tcp://10.31.241.47:39120',
 'tcp://10.31.241.47:34692',
 'tcp://10.31.241.47:41735',
 'tcp://10.31.241.47:36521',
 'tcp://10.31.241.47:32818',
 'tcp://10.31.241.47:43366',
 'tcp://10.31.241.47:39638',
 'tcp://10.31.241.47:45428']

In [7]:
import logging

# Configure the root logger to emit DEBUG and above as "LEVEL:message".
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

In [28]:
# Copyright (c) 2018, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from cuml import MGMean as cumlMGMean

from tornado import gen
import dask_cudf, cudf

import logging

import time

from dask.distributed import get_worker, get_client

from dask import delayed
from collections import defaultdict
from dask.distributed import wait, default_client
import dask.dataframe as dd
import dask.array as da

from toolz import first, assoc
from distributed import Client


def parse_host_port(address):
    """Split a worker address into a ``(host, port)`` tuple.

    An optional ``scheme://`` prefix is dropped. The port is taken from the
    text after the LAST colon, so hosts that themselves contain colons
    (e.g. a bracketed IPv6 literal such as ``[::1]:8786``) are handled; the
    brackets are kept as part of the host.

    :param address: string such as ``'tcp://10.0.0.1:8786'`` or ``'10.0.0.1:8786'``.
    :return: ``(host, port)`` with ``port`` converted to ``int``.
    :raises ValueError: if there is no ``:port`` suffix or the port is not an integer.
    """
    if '://' in address:
        address = address.rsplit('://', 1)[1]
    # rsplit: only the final colon separates host from port.
    host, port = address.rsplit(':', 1)
    return host, int(port)

def to_gpu_matrix(inp):
    """Convert a ``(dev, dataframe)`` pair into the buffers used by the mean kernel.

    Selects device `dev`, converts the dataframe to a column-major ('F') GPU
    matrix, and builds a zero-filled output series with one entry per column.

    :param inp: tuple ``(dev, df)``; ``df`` must provide ``as_gpu_matrix``.
    :return: tuple ``(dev, gpu_matrix, output_series, output_gpu_array)``.
    """
    import numba.cuda

    dev, df = inp
    numba.cuda.select_device(dev)
    check_device(dev)

    gpu_mat = df.as_gpu_matrix(order='F')
    logging.debug("GPU: " + str(gpu_mat))
    logging.debug("CTYPES: "+ str(gpu_mat.device_ctypes_pointer))

    out_series = build_output_series(gpu_mat, dev)
    out_array = to_gpu_array(out_series)
    return (dev, gpu_mat, out_series, out_array)

def build_output_series(gpu_matrix, dev):
    """Allocate the zero-initialised series that receives the per-column result.

    :param gpu_matrix: object exposing ``shape`` and ``dtype``; ``shape[1]``
                       determines the series length.
    :param dev: CUDA device id to select before allocating.
    :return: ``cudf.Series`` of zeros with ``gpu_matrix.shape[1]`` entries and
             the matrix's dtype.
    """
    import numba.cuda
    import numpy as np

    numba.cuda.select_device(dev)
    check_device(dev)

    n_cols = gpu_matrix.shape[1]
    zeros = np.zeros(n_cols, dtype=gpu_matrix.dtype)
    return cudf.Series(zeros)

def to_gpu_array(mean_):
    """Return ``mean_._column._data.to_gpu_array()`` for the given series.

    :param mean_: object whose ``_column._data`` exposes ``to_gpu_array()``
                  (a cudf Series in this notebook).
    """
    backing_buffer = mean_._column._data
    return backing_buffer.to_gpu_array()

from threading import Thread
import numba.cuda
import time


def check_device(dev):
    """Log a warning when the calling thread's current CUDA device is not `dev`.

    This only reports the mismatch; it does not switch devices.

    :param dev: expected CUDA device id.
    """
    if dev != numba.cuda.get_current_device().id:
        # logging.warn is a deprecated alias that newer Python versions no
        # longer provide; logging.warning is the supported spelling.
        logging.warning("Current device " +
                        str(numba.cuda.get_current_device()) +
                        " does not match expected " + str(dev) +
                        ". This could result in lowered performance.")


from threading import Lock

class IPCThread(Thread):
    """
    This mechanism gets around Numba's restriction of CUDA contexts being thread-local
    by creating a thread that can select its own device. This allows the user of IPC
    handles to open them up directly on the same device as the owner (bypassing the
    need for peer access.)

    Lifecycle: ``start()`` -> ``info()`` (blocks until the handles are open, or
    raises if opening failed) -> ``close()`` -> ``join()``.

    Attributes set by the thread:
      - ``arrs``: arrays returned by ``ipc.open()`` for each handle.
      - ``ptr_info``: ``__cuda_array_interface__`` of each opened array.
      - ``running``: True while the handles are open and usable.
      - ``error``: the exception raised while opening, or None.
    """

    def __init__(self, ipcs, device):
        """
        :param ipcs: iterable of IPC handle objects providing ``open()``/``close()``.
        :param device: CUDA device id the thread selects before opening.
        """
        Thread.__init__(self)

        self.lock = Lock()
        self.ipcs = ipcs
        self.device = device
        self.running = False

        # Initialised up front so a failed open can never leave these undefined.
        self.arrs = []
        self.ptr_info = None
        self.error = None
        # Sticky flag: a close() that arrives before the handles are open is
        # still honoured once run() reaches its wait loop.
        self._close_requested = False

    def run(self):
        """Open the handles on ``self.device``, idle until closed, then close them."""
        numba.cuda.select_device(self.device)
        logging.debug("Opening: " + str(self.device) + str(numba.cuda.get_current_device()))

        check_device(self.device)

        opened = []  # handles that were opened successfully and must be closed
        with self.lock:
            try:
                for ipc in self.ipcs:
                    self.arrs.append(ipc.open())
                    opened.append(ipc)
                self.ptr_info = [x.__cuda_array_interface__ for x in self.arrs]
                self.running = True
            except Exception as e:
                logging.error("Error opening ipc_handle on device " + str(self.device) + ": " + str(e))
                # Publish the failure so info() raises instead of spinning forever.
                self.error = e

        # Stay alive until close() is requested (or skip the wait entirely when
        # opening failed).
        while self.error is None and not self._close_requested:
            time.sleep(0.0001)

        try:
            logging.debug("Closing: " + str(self.device) + str(numba.cuda.get_current_device()))
        except Exception as e:
            logging.error("Error closing ipc_handle on device " + str(self.device) + ": " + str(e))

        with self.lock:
            self.running = False
            for ipc in opened:
                # Close each handle independently so one failure does not leak the rest.
                try:
                    ipc.close()
                except Exception as e:
                    logging.error("Error closing ipc_handle on device " + str(self.device) + ": " + str(e))

    def close(self):
        """
        This should be called before calling join(). Otherwise, IPC handles may not be
        properly cleaned up.
        """
        with self.lock:
            self._close_requested = True
            self.running = False

    def info(self):
        """
        Warning: this method is invoked from the calling thread. Make
        sure the context in the thread reading the memory is tied to
        self.device, otherwise an expensive peer access might take
        place underneath.

        Blocks until the handles have been opened. Do not call after close().

        :return: list with the ``__cuda_array_interface__`` of each opened array.
        :raises RuntimeError: if the handles could not be opened.
        """
        while not self.running and self.error is None:
            time.sleep(0.0001)

        if self.error is not None:
            raise RuntimeError("Error opening ipc_handle on device " +
                               str(self.device) + ": " + str(self.error))

        return self.ptr_info


def build_alloc_info(data):
    """Collect the CUDA array interface dicts for a worker-local data tuple.

    :param data: 4-tuple ``(dev, gpu_matrix, series, gpu_array)``.
    :return: ``[gpu_matrix.__cuda_array_interface__, gpu_array.__cuda_array_interface__]``.
    """
    _, gpu_matrix, _, gpu_array = data
    return [buf.__cuda_array_interface__ for buf in (gpu_matrix, gpu_array)]

def get_ipc_handles(data):
    """Export CUDA IPC handles for a worker's input matrix and output array.

    Each handle is created exactly once: the input handle from ``gpu_matrix``
    and the output handle from ``gpu_array``.

    :param data: 4-tuple ``(dev, gpu_matrix, series, gpu_array)``.
    :return: ``(dev, in_handle, out_handle)``; on any error the error is logged
             and ``(dev, None, None)`` is returned instead.
    """
    # Kept from the original; presumably makes sure numba.cuda is loaded in the
    # executing process before the device check -- TODO confirm.
    import numba.cuda
    dev, gpu_matrix, series, gpu_array = data
    check_device(dev)

    try:
        in_handle = gpu_matrix.get_ipc_handle()
        out_handle = gpu_array.get_ipc_handle()
    except Exception as e:
        logging.error("Error: " + str(e))
        return (dev, None, None)

    return (dev, in_handle, out_handle)


# Run on a single worker on each unique host
def calc_mean(data):
    """Run the multi-GPU mean on this worker over remote and local buffers.

    One ``IPCThread`` is started per ``(dev, in_handle, out_handle)`` triple so
    each pair of handles is opened by a thread that selected its own device.
    The allocation info of the opened handles, followed by that of the local
    arrays, is passed to ``cumlMGMean().calculate``.

    :param data: tuple ``(ipcs, raw_arrs)``: a list of
                 ``(dev, in_handle, out_handle)`` triples and a list of local
                 ``(dev, gpu_matrix, series, gpu_array)`` tuples.
    :return: ``(open_ipc_threads, raw_arrs)``; the threads are still running and
             must be closed and joined by the caller.
    """
    import numba.cuda
    print("begin calc_mean_device: " + str(numba.cuda.get_current_device()))

    handle_triples, local_arrays = data

    # Start one opener thread per remote buffer pair.
    ipc_threads = []
    for dev, inp, outp in handle_triples:
        opener = IPCThread([inp, outp], dev)
        opener.start()
        ipc_threads.append(opener)

    logging.debug("calc_mean_device: " + str(numba.cuda.get_current_device()))
    mean_op = cumlMGMean()

    # info() blocks until the corresponding thread has opened its handles.
    alloc_info = []
    for opener in ipc_threads:
        alloc_info.append(opener.info())
    for local in local_arrays:
        alloc_info.append(build_alloc_info(local))

    logging.debug("calc_mean_device: " + str(numba.cuda.get_current_device()))
    mean_op.calculate(alloc_info)

    logging.debug("end calc_mean_device: " + str(numba.cuda.get_current_device()))
    return ipc_threads, local_arrays
    
class MGMean(object):
    """Driver that computes a column-wise mean over GPU data spread across Dask workers.

    ``calculate`` converts every partition to a GPU matrix on the worker that
    holds it, picks one worker per host to run ``calc_mean``, and gives that
    worker access to the other same-host workers' buffers through CUDA IPC
    handles.
    """

    def calculate(self, futures):
        """Compute the mean and return the printed output series of each partition.

        :param futures: list of futures, each resolving to a ``(dev, dataframe)``
                        tuple as consumed by ``to_gpu_matrix``, or a dask DataFrame.
        :return: list of strings, ``str(series)`` of the output series held by
                 each partition.
        """
        client = default_client()

        # Keep the futures around so the GPU memory doesn't get
        # deallocated on the workers.
        gpu_futures = client.sync(self._get_mg_info, futures)

        who_has = client.who_has(gpu_futures)

        workers = [holders[0] for holders in who_has.values()]
        hosts_dict = build_host_dict(workers)

        f = []
        for host, ports in hosts_dict.items():
            # `ports` is a set; random.sample() rejects sets on newer Python
            # versions, so draw from a sorted list instead.
            exec_node = (host, random.choice(sorted(ports)))

            logging.debug("Chosen exec node is "  + str(exec_node))

            # CUDA IPC handles can only be opened by processes on the same
            # machine, so only consider data that lives on this host.
            host_data = [(worker, future) for worker, future in gpu_futures
                         if worker[0] == host]

            # Don't build an ipc_handle for the exec node (it can just grab
            # its local data).
            ipc_handles = [client.submit(get_ipc_handles, future, workers=[worker])
                           for worker, future in host_data if worker != exec_node]
            raw_arrays = [future for worker, future in host_data if worker == exec_node]

            logging.debug(str(ipc_handles))
            logging.debug(str(raw_arrays))

            f.append(client.submit(calc_mean, (ipc_handles, raw_arrays), workers = [exec_node]))

        wait(f)

        def close_threads(d):
            logging.debug(str("Closing threads!"))
            ipc_threads, rawarrays = d
            for t in ipc_threads:
                t.close()

        d = [client.submit(close_threads, future) for future in f]
        wait(d)

        def join_threads(d):
            logging.debug(str("Joining threads!"))
            ipc_threads, rawarrays = d
            for t in ipc_threads:
                t.join()

        d = [client.submit(join_threads, future) for future in f]
        # Wait for the joins as well; otherwise these futures are dropped when
        # this method returns and the join tasks may never run.
        wait(d)

        def print_it(data):
            dev, gpu_mat, series, gpu_arr = data
            return str(series)

        return client.gather([client.submit(print_it, future) for worker, future in gpu_futures])


    @gen.coroutine
    def _get_mg_info(self, futures):
        """Convert each input partition to GPU buffers on the worker that holds it.

        :param futures: list of futures, or a dask DataFrame whose partitions
                        are computed first.
                        NOTE(review): ``to_gpu_matrix`` unpacks each part as
                        ``(dev, df)``; confirm DataFrame partitions have that shape.
        :return: (via ``gen.Return``) list of ``((host, port), future)`` pairs,
                 each future resolving to the ``to_gpu_matrix`` result.
        """
        client = default_client()

        if isinstance(futures, dd.DataFrame):
            data_parts = futures.to_delayed()
            parts = list(map(delayed, data_parts))
            parts = client.compute(parts)  # Start computation in the background
            yield wait(parts)
            for part in parts:
                if part.status == 'error':
                    yield part  # trigger error locally
            # From here on work with the computed futures, not the Delayed
            # objects: only futures have a location that who_has() can report.
            data_parts = parts
        else:
            data_parts = futures

        key_to_part_dict = dict([(str(part.key), part) for part in data_parts])

        who_has = yield client.who_has(data_parts)
        worker_map = []
        for key, workers in who_has.items():
            worker = parse_host_port(first(workers))
            worker_map.append((worker, key_to_part_dict[key]))

        gpu_data = [(worker, client.submit(to_gpu_matrix, part, workers=[worker]))
                    for worker, part in worker_map]

        yield wait(gpu_data)

        raise gen.Return(gpu_data)

In [29]:
from dask.distributed import wait
import random

def parse_host_port(address):
    """Split a worker address into a ``(host, port)`` tuple.

    An optional ``scheme://`` prefix is dropped. The port is taken from the
    text after the LAST colon, so hosts that themselves contain colons
    (e.g. a bracketed IPv6 literal such as ``[::1]:8786``) are handled; the
    brackets are kept as part of the host.

    :param address: string such as ``'tcp://10.0.0.1:8786'`` or ``'10.0.0.1:8786'``.
    :return: ``(host, port)`` with ``port`` converted to ``int``.
    :raises ValueError: if there is no ``:port`` suffix or the port is not an integer.
    """
    if '://' in address:
        address = address.rsplit('://', 1)[1]
    # rsplit: only the final colon separates host from port.
    host, port = address.rsplit(':', 1)
    return host, int(port)

def build_host_dict(workers):
    """Group worker addresses by host.

    :param workers: iterable of worker address strings understood by
                    ``parse_host_port``.
    :return: dict mapping each host to the set of ports seen for it.
    """
    hosts_dict = {}
    for host, port in {parse_host_port(addr) for addr in workers}:
        hosts_dict.setdefault(host, set()).add(port)

    return hosts_dict
    

def assign_gpus(client):
    """
    Supports a multi-GPU & multi-Node environment by assigning a single local GPU
    to each worker in the cluster. This is necessary due to Numba's restriction
    that only a single CUDA context (and thus a single device) can be active on a
    thread at a time.

    The GPU assignments are valid as long as the future returned from this function
    is held in scope. This allows any functions that need to allocate GPU data to
    utilize the CUDA context on the same device, otherwise data could be lost.

    :param client: Dask distributed client connected to the cluster.
    :return: list of futures, one per assigned device id, each scattered to a
             distinct worker of the host that owns the device.
    """

    workers = list(client.has_what().keys())
    hosts_dict = build_host_dict(workers)

    # random.sample()/random.choice() need a sequence; build_host_dict yields
    # sets of ports, which newer Python versions reject.
    host_ports = dict((host, sorted(ports)) for host, ports in hosts_dict.items())

    def get_gpu_info():
        import numba.cuda
        return [x.id for x in numba.cuda.gpus]

    # pure=False: the result depends on which host runs the task. With the
    # default (pure) submission every host would share one task key, because
    # the function takes no arguments, and so share one result.
    gpu_info = dict([(host,
                      client.submit(get_gpu_info,
                                    workers = [(host, random.choice(ports))],
                                    pure = False))
                     for host, ports in host_ports.items()])
    wait(list(gpu_info.values()))

    # Scatter out a GPU device ID to workers
    f = []
    for host, future in gpu_info.items():
        gpu_ids = future.result()
        candidates = host_ports[host]
        # One worker per device; surplus workers or devices are left unassigned.
        ports = random.sample(candidates, min(len(gpu_ids), len(candidates)))

        f.extend([client.scatter(device_id, workers = [(host, port)])
                  for device_id, port in zip(gpu_ids, ports)])
    wait(f)

    return f

In [56]:
# Keep the returned futures referenced: per assign_gpus' docstring the GPU
# assignments are only valid while these futures are held in scope.
assignments = assign_gpus(client)

In [57]:
client.who_has()

{'create_cudf-1061f27a156e69abc48b9d8f28ab094c': ('tcp://10.31.241.47:39638',),
 'create_cudf-12b4ebdab71e5fed1115fb6601d9bd3b': ('tcp://10.31.241.47:43366',),
 'create_cudf-32018a29c137d10bc927cfc09f688d6a': ('tcp://10.31.241.47:34692',),
 'create_cudf-529975090709e867d21a78db55045ea2': ('tcp://10.31.241.47:32818',),
 'create_cudf-92e5a84baff59ec4abc4fe01cd531ad4': ('tcp://10.31.241.47:45428',),
 'create_cudf-af457634ff7347c0060d1cf27548c9a9': ('tcp://10.31.241.47:41735',),
 'create_cudf-b63b42d1f3ea3d90a7cf277f9ba12bb3': ('tcp://10.31.241.47:39120',),
 'create_cudf-be4ade7a2625c915b15a25eba948b141': ('tcp://10.31.241.47:36521',),
 'int-06e5a71c9839bd98760be56f629b24cc': ('tcp://10.31.241.47:45428',
  'tcp://10.31.241.47:39120',
  'tcp://10.31.241.47:43366'),
 'int-58e78e1b34eb49a68c65b54815d1b158': ('tcp://10.31.241.47:39638',
  'tcp://10.31.241.47:36521',
  'tcp://10.31.241.47:41735'),
 'int-5c8a950061aa331153f4a172bbcbfd1b': ('tcp://10.31.241.47:39638',
  'tcp://10.31.241.47:39120'

In [58]:
# Create one sample dataframe per worker, passing each worker a device-id future.
# NOTE(review): assign_gpus scatters each device id to a randomly sampled
# worker, while this zip pairs the ids with `workers` in list order, so an id
# may end up on a different worker than the one it was scattered to -- confirm
# this is intended.
res = [client.submit(create_cudf, future, workers = [worker]) for future, worker in zip(assignments, workers)]
wait(res)

DoneAndNotDoneFutures(done={<Future: status: finished, type: tuple, key: create_cudf-529975090709e867d21a78db55045ea2>, <Future: status: finished, type: tuple, key: create_cudf-1061f27a156e69abc48b9d8f28ab094c>, <Future: status: finished, type: tuple, key: create_cudf-af457634ff7347c0060d1cf27548c9a9>, <Future: status: finished, type: tuple, key: create_cudf-12b4ebdab71e5fed1115fb6601d9bd3b>, <Future: status: finished, type: tuple, key: create_cudf-92e5a84baff59ec4abc4fe01cd531ad4>, <Future: status: finished, type: tuple, key: create_cudf-32018a29c137d10bc927cfc09f688d6a>, <Future: status: finished, type: tuple, key: create_cudf-be4ade7a2625c915b15a25eba948b141>, <Future: status: finished, type: tuple, key: create_cudf-b63b42d1f3ea3d90a7cf277f9ba12bb3>}, not_done=set())

In [59]:
client.who_has()

{'create_cudf-1061f27a156e69abc48b9d8f28ab094c': ('tcp://10.31.241.47:39638',),
 'create_cudf-12b4ebdab71e5fed1115fb6601d9bd3b': ('tcp://10.31.241.47:43366',),
 'create_cudf-32018a29c137d10bc927cfc09f688d6a': ('tcp://10.31.241.47:34692',),
 'create_cudf-529975090709e867d21a78db55045ea2': ('tcp://10.31.241.47:32818',),
 'create_cudf-92e5a84baff59ec4abc4fe01cd531ad4': ('tcp://10.31.241.47:45428',),
 'create_cudf-af457634ff7347c0060d1cf27548c9a9': ('tcp://10.31.241.47:41735',),
 'create_cudf-b63b42d1f3ea3d90a7cf277f9ba12bb3': ('tcp://10.31.241.47:39120',),
 'create_cudf-be4ade7a2625c915b15a25eba948b141': ('tcp://10.31.241.47:36521',),
 'int-06e5a71c9839bd98760be56f629b24cc': ('tcp://10.31.241.47:45428',
  'tcp://10.31.241.47:39120',
  'tcp://10.31.241.47:43366'),
 'int-58e78e1b34eb49a68c65b54815d1b158': ('tcp://10.31.241.47:39638',
  'tcp://10.31.241.47:36521',
  'tcp://10.31.241.47:41735'),
 'int-5c8a950061aa331153f4a172bbcbfd1b': ('tcp://10.31.241.47:39638',
  'tcp://10.31.241.47:39120'

In [82]:
m = MGMean()

In [83]:
# Runs the multi-GPU mean; the return value (one printed output series per
# partition) is displayed as the cell output but not bound to a name.
m.calculate(res)

DEBUG:Chosen exec node is ('10.31.241.47', 32818)
DEBUG:[<Future: status: pending, key: get_ipc_handles-2f54e0237a42be03bb830d81ab3bed47>, <Future: status: pending, key: get_ipc_handles-3c5f2e6cfd750509650928c3342dedb0>, <Future: status: pending, key: get_ipc_handles-86be7d40d5944f014e1dedcbbdf38b73>, <Future: status: pending, key: get_ipc_handles-6395a9d5c900b02427322665b8c2ae51>, <Future: status: pending, key: get_ipc_handles-8987247991c6885caec835d903c82053>, <Future: status: pending, key: get_ipc_handles-48cc9e14dcb666d1f084708291a1410a>, <Future: status: pending, key: get_ipc_handles-4dbc566ab0124746f0707240d2c58f3c>]
DEBUG:[<Future: status: finished, type: tuple, key: to_gpu_matrix-bfa84d1b44f20bfa00c5fb1dc2f35882>]


['      \n0  3.5\n1  4.5',
 '      \n0  3.5\n1  4.5',
 '      \n0  3.5\n1  4.5',
 '      \n0  3.5\n1  4.5',
 '      \n0  3.5\n1  4.5',
 '      \n0  3.5\n1  4.5',
 '      \n0  3.5\n1  4.5',
 '      \n0  3.5\n1  4.5']

In [None]:
client.who_has()

In [104]:
# NOTE(review): `result` is never assigned in this notebook (this cell raised
# NameError); bind the return value of m.calculate(res) to `result` first.
print(str(result))

NameError: name 'result' is not defined

In [None]:
client.who_has()

