In [1]:
# Notebook setup: IPython extensions, database selection, and the Snorkel session.

# autoreload mode 2: re-import edited modules automatically before each cell executes.
%load_ext autoreload
%autoreload 2
# Load the line_profiler extension (line-level profiling magics such as `%lprun`).
%load_ext line_profiler

import os
# Point Snorkel at the `snorkel-biocorpus-sample` Postgres database.
# NOTE(review): presumably snorkel reads SNORKELDB when it is imported, so this
# assignment has to stay above the `from snorkel import ...` line below - confirm.
os.environ['SNORKELDB'] = 'postgres:///snorkel-biocorpus-sample'

from snorkel import SnorkelSession
# Session object used by later cells in this notebook (e.g. to add and commit the corpus).
session = SnorkelSession()

# Test 1: Simple non-DB, non-Snorkel

In [2]:
# Inputs 0..99 and the gold output set: every input incremented by one (1..100).
xs = range(100)
y_set_gold = {value + 1 for value in xs}

In [3]:
from snorkel.udf import UDF, UDFRunner, UDFRunnerMP
from time import sleep

class AddOne(UDF):
    """Toy UDF: for each input `x`, pause one second and emit `x + 1`."""

    def apply(self, x):
        """Generator yielding exactly one value, `x + 1`, after a one-second sleep.

        NOTE(review): the sleep presumably stands in for real per-item work so
        the runners' wall-clock times are easy to compare - confirm.
        """
        sleep(1)
        incremented = x + 1
        yield incremented

### First, sanity check: `UDFRunner`

In [None]:
addone_runner_st = UDFRunner(AddOne())
y_set            = set()
%time addone_runner_st.run(xs, y_set=y_set)

In [None]:
# Displays True when the collected outputs match the gold set exactly.
y_set_gold == y_set

### Next, try parallelism

In [None]:
from multiprocessing import Queue

addone_runner = UDFRunnerMP(AddOne)
y_set         = Queue()
%time addone_runner.run(xs, parallelism=10, y_set=y_set)

In [None]:
# `Empty` lives in `queue` on Python 3 and in `Queue` on Python 2; support both.
try:
    from queue import Empty
except ImportError:
    from Queue import Empty

# Drain the multiprocessing output queue into a plain set so it can be compared
# against the gold set.
y_set_out = set()
while True:
    try:
        # Use a short blocking wait instead of a non-blocking get: items put on a
        # multiprocessing.Queue travel through a pipe, so a non-blocking get can
        # raise Empty while results are still in flight and truncate the output.
        y_set_out.add(y_set.get(timeout=0.1))
    except Empty:
        # Nothing arrived within the timeout: treat the queue as fully drained.
        break

# Last expression of the cell: displays True when the parallel run produced
# exactly the expected outputs.
y_set_out == y_set_gold

# Test 2: PubTator Parsing

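Approach: create the Session inside the UDF itself (i.e. within each worker process) rather than sharing the one created in the parent process. See the SQLAlchemy notes on using connection pools with multiprocessing: http://docs.sqlalchemy.org/en/latest/core/pooling.html#using-connection-pools-with-multiprocessing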

One simple hypothesis: adding objects to the sets (e.g. `CandidateSet`) is what is causing the issues.

In [None]:
from snorkel.models import Corpus
# Create the corpus record for the PubTator documents and persist it.
corpus = Corpus(name='PubTator Annotated')
session.add(corpus)
session.commit()

# Show the corpus. A bare expression is only rendered when it is the last
# statement of the cell, so it goes here, after the commit.
corpus

In [4]:
from pubtator_parsers import PubtatorDocParser, pubtator_doc_generator

corpus_parser = UDFRunnerMP(PubtatorDocParser)
doc_generator = pubtator_doc_generator('bioconcepts2pubtator_offsets.sample')

%time corpus_parser.run(doc_generator, parallelism=10)

10 / 10 threads done.

CPU times: user 148 ms, sys: 116 ms, total: 264 ms
Wall time: 21.3 s
