In [1]:
import pandas as pd
from PIL import Image
from io import BytesIO
import requests
import numpy as np

In [2]:
# Load the sample table of image URLs; only the `url` column is used further down.
df = pd.read_csv('../sample_url.csv')  # CSV taken from the Kaggle example data
df

Unnamed: 0,id,url
0,000088da12d664db,https://lh3.googleusercontent.com/-k45wfamuhT8...
1,0001623c6d808702,https://lh3.googleusercontent.com/-OQ0ywv8KVIA...
2,0001bbb682d45002,https://lh3.googleusercontent.com/-kloLenz1xZk...
3,0002362830cfe3a3,https://lh3.googleusercontent.com/-N6z79jNZYTg...
4,000270c9100de789,https://lh3.googleusercontent.com/-keriHaVOq1U...
5,0002b0fab5d3ccc4,https://lh3.googleusercontent.com/-ciWklpsrab8...
6,000396be3c24830a,https://lh3.googleusercontent.com/-6W9F179t59Q...
7,000506dc6ab3a40e,https://lh3.googleusercontent.com/-_XHsAXB2LZA...
8,0005292fc4b005a3,https://lh3.googleusercontent.com/-RBZ4F1ZKNc0...
9,0005456a82264bc8,https://lh3.googleusercontent.com/-MRK7_uiKO6A...


In [3]:
from multiprocessing import Queue, Process
from tensorflow.python.client import device_lib

In [4]:
def get_available_gpu():
    """Return the names of all local devices that TensorFlow reports as GPUs.

    Returns:
        list: the ``name`` of every entry of ``device_lib.list_local_devices()``
        whose ``device_type`` equals ``"GPU"``; empty when there is none.
    """
    gpu_names = []
    for device in device_lib.list_local_devices():
        if device.device_type == "GPU":
            gpu_names.append(device.name)
    return gpu_names

In [5]:
# Show which GPU devices TensorFlow can see on this machine (an empty list means none).
get_available_gpu()

[]

In [6]:
class ModelWorker(Process):
    """Child process that runs ResNet50 (without its top) over one slice of the data.

    The model is created inside ``run()`` so that the CUDA environment variables
    are set in the child before Keras is imported there.

    Args:
        data: sliceable collection of inputs handed to ``DataGenerator``. Plain
            lists are accepted, as are objects whose slices provide ``.tolist()``
            (e.g. a pandas Series or a numpy array).
        idx: ``(start, stop)`` pair selecting ``data[start:stop]`` for this worker.
        gpuid: value written to ``CUDA_VISIBLE_DEVICES``; it also tags the result.
            Callers in this notebook pass ``-1`` for a CPU-only run.
        img_load_workes: number of generator workers given to ``predict_generator``
            (the misspelt name is kept so keyword callers keep working).
        queue: queue that receives the ``(gpuid, result)`` tuple when prediction ends.
        weights: forwarded to ``ResNet50(weights=...)``. Defaults to ``'imagenet'``
            instead of an absolute path inside one user's home directory; Keras is
            expected to resolve it through its own weight cache (downloading on
            first use). Pass an explicit ``.h5`` path to use a specific file.
        batch_size: batch size given to ``DataGenerator`` and used for the step count.
    """

    def __init__(self, data, idx, gpuid, img_load_workes, queue,
                 weights='imagenet', batch_size=100):
        Process.__init__(self, name="ModelProcessor")
        self._gpuid = gpuid
        self._data = data
        self._idx = idx
        self._img_load_workers = img_load_workes
        self.queue = queue
        self._weights = weights
        self._batch_size = batch_size

    def run(self):
        """Predict on this worker's slice and put ``(gpuid, result)`` on the queue."""
        import os
        # Restrict the devices visible to this process before Keras is imported.
        os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
        os.environ['CUDA_VISIBLE_DEVICES'] = str(self._gpuid)

        from keras.applications.resnet50 import ResNet50
        resnet = ResNet50(weights=self._weights, include_top=False)
        print("resnet init done", self._gpuid)

        from custom_data_generator import DataGenerator
        start, stop = self._idx[0], self._idx[1]
        data_size = stop - start
        batch_size = self._batch_size

        # Ceiling division: a final partial batch still needs its own step.
        step = int((data_size + batch_size - 1) // batch_size)

        print(data_size, batch_size, step)

        # Accept plain lists as well as Series/arrays: only the latter offer
        # .tolist(), and calling it on a list slice would kill the worker with an
        # AttributeError while the parent keeps waiting on the queue.
        chunk = self._data[start:stop]
        paths = chunk.tolist() if hasattr(chunk, 'tolist') else list(chunk)
        pred_generator = DataGenerator(paths, batch_size)

        result = resnet.predict_generator(pred_generator,
                                          steps=step,
                                          workers=self._img_load_workers,
                                          max_queue_size=20,
                                          use_multiprocessing=True
                                          )
        print(result.shape, self._gpuid)
        # Tag the result with the gpuid so the consumer can tell the slices apart.
        self.queue.put((self._gpuid, result))

        print('prediction done', self._gpuid)

In [7]:
def _collect_worker_results(worker_list, result_queue, poll_interval):
    """Gather one result tuple per worker, raising if a worker dies without one."""
    from queue import Empty  # multiprocessing queues raise queue.Empty on timeout

    def _abort(reason):
        # Do not leave surviving siblings running once the batch cannot complete.
        for worker in worker_list:
            if worker.is_alive():
                worker.terminate()
        raise RuntimeError(reason)

    results = []
    while len(results) < len(worker_list):
        try:
            results.append(result_queue.get(timeout=poll_interval))
            continue
        except Empty:
            pass

        # Nothing arrived within the interval: make sure someone can still deliver.
        bad_codes = [w.exitcode for w in worker_list if w.exitcode not in (None, 0)]
        if bad_codes:
            _abort('worker process failed (exit codes: {})'.format(bad_codes))
        if not any(w.is_alive() for w in worker_list):
            # Every worker has exited cleanly; give an in-flight result one last chance.
            try:
                results.append(result_queue.get(timeout=poll_interval))
            except Empty:
                _abort('all worker processes exited without delivering a result')
    return results


def multi_gpu_predictor_from_image_path(image_path, gpus, cpu_core=8, poll_interval=1.0):
    """Run ``ModelWorker`` processes over ``image_path`` and stack their outputs.

    The input is split into ``gpus`` contiguous slices, one worker per GPU. When no
    GPU is available a single worker with gpuid ``-1`` processes everything.

    Args:
        image_path: sliceable collection of inputs; ``len()`` and slicing are used here,
            the slices themselves are consumed by ``ModelWorker``.
        gpus: number of GPUs requested; reduced to the number actually available.
        cpu_core: total number of image-loading workers to share between the model
            workers (default 8, the value that used to be hard-coded).
        poll_interval: seconds to wait for a result before checking that the
            workers are still alive.

    Returns:
        numpy.ndarray: the worker results concatenated in gpuid order and reshaped
        to ``(first_axis, last_axis)``.

    Raises:
        RuntimeError: if a worker exits with a non-zero code, or all workers exit,
            before every result has been received (a bare ``queue.get()`` would
            block forever in that situation).
    """
    # Query the devices once; every call repeats the device enumeration.
    available = len(get_available_gpu())
    if available < gpus:
        gpus = available
        print('Available gpu number exceed, you can use only {} gpus'.format(gpus))

    result_queue = Queue()
    total = len(image_path)

    worker_list = []
    if gpus == 0:
        print('CPU used')
        worker_list.append(ModelWorker(image_path, (0, total), -1, cpu_core, result_queue))
    else:
        # gpus + 1 evenly spaced boundaries -> gpus contiguous slices.
        bounds = np.linspace(0, total, num=gpus + 1).astype(int)
        # Share the loader workers between the model workers, but never drop to zero.
        img_load_workers = max(1, cpu_core // gpus)
        for gpuid in range(gpus):
            worker_list.append(
                ModelWorker(image_path, (bounds[gpuid], bounds[gpuid + 1]),
                            gpuid, img_load_workers, result_queue)
            )

    for worker in worker_list:
        worker.start()

    # Read every result before joining the workers.
    results = _collect_worker_results(worker_list, result_queue, poll_interval)

    for worker in worker_list:
        worker.join()

    # Results arrive in completion order; gpuid order matches the input order.
    results.sort(key=lambda tup: tup[0])
    arr = np.concatenate([tup[1] for tup in results])
    shape = arr.shape
    print(shape)
    # NOTE(review): this only works when the axes between the first and the last
    # have total size 1 (e.g. (n, 1, 1, features)); otherwise reshape raises
    # ValueError. Confirm against the output shape of the model in use.
    arr = arr.reshape(shape[0], shape[-1])

    return arr

In [8]:
# Small test batch: at most the first 100 entries of the `url` column.
sample = df.url[:100]

In [None]:
%%time
# Ask for 2 GPUs; the function lowers this to the number actually available
# (possibly 0, in which case a single CPU worker handles the whole sample).
multi_gpu_predictor_from_image_path(sample,2)

Available gpu number exceed, you can use only 0 gpus
CPU used


Using TensorFlow backend.


In [9]:
# Stand-alone CPU run of a single worker (gpuid -1), without the multi-GPU wrapper.
# The Series is handed over as-is: ModelWorker.run() slices it and converts the
# slice to a list itself. The slice bounds follow the real sample length rather
# than a fixed 100.
queue = Queue()
worker = ModelWorker(sample, (0, len(sample)), -1, 2, queue)

In [None]:
# Start the worker, wait for the (gpuid, prediction) tuple it puts on the queue,
# then reap the process. The result is read before join(), as the multiprocessing
# documentation advises for processes that put items on a queue.
# NOTE: get() has no timeout, so this cell blocks indefinitely if the worker
# exits without putting a result.
worker.start()
result = queue.get()
worker.join()

Using TensorFlow backend.
