In [1]:
#!pip install git+https://github.com/Jwink3101/parmapper

In [2]:
import sys; sys.path.append('..')
import cadence as cd


In [6]:
from parmapper import parmap

In [17]:
# data = parmap(
#     do_something,
#     cd.scan_iter(cd.txt),
#     N=2,
#     progress=False
# )

In [19]:
import os,tempfile,multiprocessing as mp
import numpy as np,pandas as pd
from tqdm import tqdm
import random

# default num proc is?
DEFAULT_NUM_PROC = mp.cpu_count() - 1

# boost to max if just 2? (hack for google colab/cloud context)
if mp.cpu_count()==2: DEFAULT_NUM_PROC=2


def pmap_iter(func, objs, args=[], kwargs={}, num_proc=DEFAULT_NUM_PROC, use_threads=False, progress=True, progress_pos=0,desc=None, **y):
    """
    Yields results of func(obj) for each obj in objs
    Uses multiprocessing.Pool(num_proc) for parallelism.
    If use_threads, use ThreadPool instead of Pool.
    Results in any order.
    """

    # check num proc
    num_cpu = mp.cpu_count()
    if num_proc>num_cpu: num_proc=num_cpu
    if num_proc>len(objs): num_proc=len(objs)

    # if parallel
    if not desc: desc=f'Mapping {func.__name__}()'
    if desc and num_cpu>1: desc=f'{desc} [x{num_proc}]'
    if num_proc>1 and len(objs)>1:

        # real objects
        objects = [(func,obj,args,kwargs) for obj in objs]

        # create pool
        pool=mp.Pool(num_proc) if not use_threads else mp.pool.ThreadPool(num_proc)

        # yield iter
        iterr = pool.imap(_pmap_do, objects)

        for res in tqdm(iterr,total=len(objects),desc=desc,position=progress_pos) if progress else iterr:
            yield res

        # Close the pool?
        pool.close()
        pool.join()
    else:
        # yield
        for obj in (tqdm(objs,desc=desc,position=progress_pos) if progress else objs):
            yield func(obj,*args,**kwargs)

def _pmap_do(inp):
    func,obj,args,kwargs = inp
    return func(obj,*args,**kwargs)

def pmap(*x,**y):
    """
    Non iterator version of pmap_iter
    """
    # return as list
    return list(pmap_iter(*x,**y))




"""
Pandas functions
"""

def pmap_df(df, func, num_proc=DEFAULT_NUM_PROC):
    df_split = np.array_split(df, num_proc)
    df = pd.concat(pmap(func, df_split, num_proc=num_proc))
    return df


def pmap_groups(func,df_grouped,use_cache=False,num_proc=DEFAULT_NUM_PROC,shuffle=True,**attrs):


    # get index/groupby col name(s)
    group_key=df_grouped.grouper.names
    # if not using cache
    # if not use_cache or attrs.get('num_proc',1)<2:
    if not use_cache or len(df_grouped)<2 or num_proc<2:
        objs=[
            (func,group_df,group_key,group_name)
            for group_name,group_df in df_grouped
        ]
    else:
        objs=[]
        groups=list(df_grouped)
        for i,(group_name,group_df) in enumerate(tqdm(groups,desc='Preparing input')):
            # print([i,group_name,tmp_path,group_df])
            if use_cache:
                tmpdir=tempfile.mkdtemp()
                tmp_path = os.path.join(tmpdir, str(i)+'.pkl')
                group_df.to_pickle(tmp_path)
            objs+=[(func,tmp_path if use_cache else group_df,group_key,group_name)]

    # desc?
    if not attrs.get('desc'): attrs['desc']=f'Mapping {func.__name__}'

    if shuffle: random.shuffle(objs)

    return pd.concat(
        pmap(
            _do_pmap_group,
            objs,
            num_proc=num_proc,
            **attrs
        )
    ).set_index(group_key)




def _do_pmap_group(obj,*x,**y):
    # unpack
    func,group_df,group_key,group_name = obj
    # load from cache?
    if type(group_df)==str:
        group_df=pd.read_pickle(group_df)
    # run func
    outdf=func(group_df,*x,**y)
    # annotate with groupnames on way out
    if type(group_name) not in {list,tuple}:group_name=[group_name]
    for x,y in zip(group_key,group_name):
        outdf[x]=y
    # return
    return outdf



def pmap_apply_cols(func, df, lim=None, **y):
    cols=list(df.columns)[:lim]
    new_seriess = pmap(
        func,
        [df[col] for col in cols],
        **y
    )
    odf=pd.DataFrame(dict(zip(cols,new_seriess)))
    return odf

def pmap_iter_gen_process(q, iolock):
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
#         sleep(stuff)




In [21]:



#!/usr/bin/env python3
from multiprocessing import Process, Queue
import sys
import logging
import traceback
import inspect

#TODO: Make these more unique
STOP = "STOP"
SHUTDOWN = "SHUTDOWN"
SHUTDOWN_LAST = "SHUTDOWN_LAST"

log = None

class Task:
    def __init__(self, id, fn, inputQueue, outputQueue, multiplicity):
        self.id = id
        self.fn = fn
        self.inputQueue = inputQueue
        self.outputQueue = outputQueue
        self.multiplicity = multiplicity

    def start(self):
        self.process = Process(target=self.main, args=(self.inputQueue, self.outputQueue))
        self.process.start()

    def main(self, inputQueue, outputQueue):
        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

        if inspect.isfunction(self.fn):
            logger = logging.getLogger(str(self.id) + ":" +
                    self.fn.__name__)
        else:
            logger = logging.getLogger(str(self.id) + ":" +
                    type(self.fn).__name__)
        global log
        log = lambda a: logger.debug(a)

        try:
            if hasattr(self.fn, "init"):
                self.fn.init()

            log("Running")

            while True:
                input = self.inputQueue.get()
                log("Input is {}".format(input))
                if input == SHUTDOWN: break
                if input == SHUTDOWN_LAST:
                    self.outputQueue.put(STOP)
                    break
                if input == STOP: 
                    for i in range(self.multiplicity-1):
                        self.inputQueue.put(SHUTDOWN)
                    self.inputQueue.put(SHUTDOWN_LAST)
                    continue

                result = self.fn(input)
                if inspect.isgenerator(result):
                    for x in result:
                        if x == STOP: 
                            self.inputQueue.put(STOP)
                            break
                        self.outputQueue.put(x)
                else:
                    if result == STOP: 
                        self.inputQueue.put(STOP)
                    else:
                        self.outputQueue.put(result)
                    

            log("Shutting down")
            if hasattr(self.fn, "shutdown"):
                self.fn.shutdown()

        except KeyboardInterrupt:
            pass
        except Exception:
            print("For {}".format(self.fn))
            raise


class Pipeline:
    def __init__(self):
        self.tasks = []
        self.inputQueue = Queue(1)
        self.outputQueue = Queue(1)
        self.nextId = 1

    def run(self, arg = None):

        for task in self.tasks:
            task.start()

        self.inputQueue.put(arg)
        while True:
            x = self.outputQueue.get()
            if x == STOP: break

    def add(self, fn, fanOut=1):
        inputQueue = self.inputQueue
        outputQueue = self.outputQueue
        if len(self.tasks):
            inputQueue = Queue(2)
            for task in self.tasks:
                if task.outputQueue == self.outputQueue:
                    task.outputQueue = inputQueue

        for i in range(fanOut):
            task = Task(self.nextId, fn, inputQueue, outputQueue, fanOut)
            self.nextId += 1
            self.tasks.append(task)


In [28]:
import time
def do_something(obj):
    for n in range(obj):
        yield n
    #print(len(obj))
    #time.sleep(1)

In [29]:
data = parmap(
    do_something,
    cd.scan_iter(cd.txt),
    N=2,
    progress=False
)

In [None]:
for x in data:
    print(x)

Iterating over line scansions [x4]:   0%|          | 0/7 [00:00<?, ?it/s]Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/ryan/miniconda3/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/ryan/miniconda3/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/ryan/miniconda3/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/home/ryan/miniconda3/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'generator' object
TypeError: cannot pickle 'generator' object
Traceback (most recent call last):
  File "/home/ryan/miniconda3/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/ryan/miniconda3/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    c

In [7]:


def pmap_iter_gen(
        func,
        objs,
        args=[],
        kwargs={},
        num_proc=DEFAULT_NUM_PROC,
        use_threads=False,
        progress=True,
        progress_pos=0,
        desc=None,
        **y):
    """
    Yields results of func(obj) for each obj in objs
    Uses multiprocessing.Pool(num_proc) for parallelism.
    If use_threads, use ThreadPool instead of Pool.
    Results in any order.
    """

    # check num proc
    num_cpu = mp.cpu_count()
    if num_proc>num_cpu: num_proc=num_cpu
    if num_proc>len(objs): num_proc=len(objs)

    # if parallel
    if not desc: desc=f'Mapping {func.__name__}()'
    if desc and num_cpu>1: desc=f'{desc} [x{num_proc}]'
    if num_proc>1 and len(objs)>1:
        q = mp.Queue(maxsize=NCORE)
        iolock = mp.Lock()
        pool = mp.Pool(
            num_cpu,
            initializer=process,
            initargs=(q, iolock)
        )
        for stuff in range(20):
        q.put(stuff)  # blocks until q below its max size
        with iolock:
        print("queued", stuff)
        for _ in range(NCORE):  # tell workers we're done
        q.put(None)
        pool.close()
        pool.join()
        
        

        # real objects
        objects = [(func,obj,args,kwargs) for obj in objs]

        # create pool
        pool=mp.Pool(num_proc) if not use_threads else mp.pool.ThreadPool(num_proc)

        # yield iter
        iterr = pool.imap(_pmap_do, objects)

        for res in tqdm(iterr,total=len(objects),desc=desc,position=progress_pos) if progress else iterr:
            yield res

        # Close the pool?
        pool.close()
        pool.join()
    else:
        # yield
        for obj in (tqdm(objs,desc=desc,position=progress_pos) if progress else objs):
            yield func(obj,*args,**kwargs)



IndentationError: expected an indented block (<ipython-input-7-0deb9470836f>, line 36)

In [3]:
# def process(q, iolock):
#     from time import sleep
#     while True:
#         stuff = q.get()
#         if stuff is None:
#             break
#         with iolock:
#             print("processing", stuff)
#         sleep(stuff)

# if __name__ == '__main__':
#     q = mp.Queue(maxsize=NCORE)
#     iolock = mp.Lock()
#     pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
#     for stuff in range(20):
#         q.put(stuff)  # blocks until q below its max size
#         with iolock:
#             print("queued", stuff)
#     for _ in range(NCORE):  # tell workers we're done
#         q.put(None)
#     pool.close()
#     pool.join()

In [None]:
def pmap_iter_gen()