In [1]:
# multiprocessing using task and queue
# https://pymotw.com/2/multiprocessing/communication.html

# Use multiprocessing with joinable queue.
## find all the files in the folder specified and create the image_files_arr array
## nfts is the number of files to process per task
## detect number of cores and start workers for one less count
## using the nfts, create appropriate number of task objects
## each worker will get two pieces of info in the data: a prefix number, the slice of the array
### worker is to return a list with the prefix added at start of each file name
## main process waits for all the workers to finish, then prints the result arrays returned by each worker

In [2]:
import multiprocessing, os

In [3]:
image_path = r"/home/rohit/PyWDUbuntu/thesis/Imgs2Detect"
image_files_arr = [os.path.join(image_path, f) for f in os.listdir(image_path) if os.path.isfile(os.path.join(image_path, f))]

In [4]:
len(image_files_arr)

12

In [5]:
nfts = 4
num_jobs = int(len(image_files_arr) / nfts + ( (len(image_files_arr) % nfts)!=0 ) * 1)
print(f"{num_jobs}")

3


In [6]:
image_files_arr

['/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371903.jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371902 (copy).jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/36979 (copy).jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371897.jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/36979.jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/1317156 (copy).jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/148284.jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/148284 (copy).jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371897 (copy).jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/1317156.jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371903 (copy).jpg',
 '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371902.jpg']

In [7]:
if num_jobs > 1:
    for i in range(num_jobs - 1):
        print(f"job {i+1} =\n{image_files_arr[i*nfts : (i+1)*nfts]}")
print(f"job {num_jobs} =\n{image_files_arr[(num_jobs-1)*nfts : ]}")

job 1 =
['/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371903.jpg', '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371902 (copy).jpg', '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/36979 (copy).jpg', '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371897.jpg']
job 2 =
['/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/36979.jpg', '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/1317156 (copy).jpg', '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/148284.jpg', '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/148284 (copy).jpg']
job 3 =
['/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371897 (copy).jpg', '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/1317156.jpg', '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371903 (copy).jpg', '/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371902.jpg']


In [8]:
multiprocessing.cpu_count() - 1

11

In [9]:
class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print(f"Exiting process {proc_name}")
                self.task_queue.task_done()
                break
            print(f"Process {proc_name} picked job = {next_task.job_num}")
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

In [10]:
class Task(object):
    def __init__(self, _job_num, _data):
        self.job_num = _job_num
        self.data = _data
    def __call__(self):
        ans = []
        imgarr, prefix = self.data
        for each_entry in imgarr:
            ans.append("j" + str(prefix) + "_" + each_entry)
        return ans
    def __str__(self):
        pass
        #return '%s * %s' % (self.a, self.b)

In [11]:
if __name__ == '__main__':
    
    image_path = r"/home/rohit/PyWDUbuntu/thesis/Imgs2Detect"
    image_files_arr = [os.path.join(image_path, f) for f in os.listdir(image_path) if os.path.isfile(os.path.join(image_path, f))]
    nfts = 4 # num of files per task

    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
    # Start consumers
    num_consumers = multiprocessing.cpu_count() - 1
    print(f"Creating {num_consumers} consumers")
    consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]
    for w in consumers:
        w.start()
    
    # Enqueue jobs
    num_jobs = int(len(image_files_arr) / nfts + ( (len(image_files_arr) % nfts)!=0 ) * 1)
    abc = [val for val in range(99,99+89*num_jobs,89)]
    if num_jobs > 1:
        for i in range(num_jobs - 1):
            data_for_job = []
            data_for_job.append(image_files_arr[i*nfts : (i+1)*nfts])
            data_for_job.append(abc[i])
            tasks.put(Task(i+1, data_for_job))
    data_for_job = []
    data_for_job.append(image_files_arr[(num_jobs-1)*nfts : ])
    data_for_job.append(abc[-1])
    tasks.put(Task(num_jobs, data_for_job))
    
    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()
    print(f"\n\nAll consumers rejoined\n\n")
    
    # Start printing results
    for i in range(num_jobs):
        result = results.get()
        print(f"Job {i+1} result=\n{result}")

Creating 11 consumers
Process Consumer-2 picked job = 2
Process Consumer-11 picked job = 3
Process Consumer-1 picked job = 1
Exiting process Consumer-5
Exiting process Consumer-4
Exiting process Consumer-6
Exiting process Consumer-11
Exiting process Consumer-3
Exiting process Consumer-2
Exiting process Consumer-10
Exiting process Consumer-9
Exiting process Consumer-1
Exiting process Consumer-7
Exiting process Consumer-8


All consumers rejoined


Job 1 result=
['j188_/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/36979.jpg', 'j188_/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/1317156 (copy).jpg', 'j188_/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/148284.jpg', 'j188_/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/148284 (copy).jpg']
Job 2 result=
['j277_/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371897 (copy).jpg', 'j277_/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/1317156.jpg', 'j277_/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371903 (copy).jpg', 'j277_/home/rohit/PyWDUbuntu/thesis/Imgs2Detect/371902.jpg']
Jo