Skip to content

Commit

Permalink
Merge pull request #40 from jkp/master
Browse files Browse the repository at this point in the history
Make Pythonflow thread-compatible.
  • Loading branch information
tillahoffmann committed Feb 13, 2019
2 parents 95811e7 + ee0564d commit 3b57f2b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
14 changes: 8 additions & 6 deletions pythonflow/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import functools
import importlib
import operator
import threading
import traceback
import uuid

Expand All @@ -34,16 +35,17 @@ def __init__(self):
self.operations = {}
self.dependencies = []

default_graph = None
_globals = threading.local()

def __enter__(self):
assert self.default_graph is None, "cannot have more than one default graph"
Graph.default_graph = self
assert getattr(self._globals, 'default_graph', None) is None, \
"cannot have more than one default graph"
Graph._globals.default_graph = self
return self

def __exit__(self, *args):
assert self.default_graph is self
Graph.default_graph = None
assert self._globals.default_graph is self
Graph._globals.default_graph = None

def normalize_operation(self, operation): # pylint:disable=W0621
"""
Expand Down Expand Up @@ -183,7 +185,7 @@ def get_active_graph(graph=None):
ValueError
If no `Graph` instance can be obtained.
"""
graph = graph or Graph.default_graph
graph = graph or Graph._globals.default_graph
if not graph:
raise ValueError("`graph` must be given explicitly or a default graph must be set")
return graph
Expand Down
16 changes: 16 additions & 0 deletions tests/test_pythonflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pickle
import random
import tempfile
import threading
import uuid

import pytest
Expand Down Expand Up @@ -535,3 +536,18 @@ def test_placeholder_with_kwargs():
b, c = a

assert graph([b, c], {a: [1, 2]}) == (1, 2)


def test_thread_compatibility():
def work(event):
with pf.Graph() as graph:
event.wait()
event = threading.Event()
thread = threading.Thread(target=work, args=(event,))
thread.start()
try:
with pf.Graph() as graph:
event.set()
except AssertionError as e:
event.set()
raise e
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "pythonflow",
"description": "Dataflow programming for python",
"version": "0.2.3",
"version": "0.3.0",
"author": "Till Hoffmann",
"author_email": "till@spotify.com",
"license": "License :: OSI Approved :: Apache Software License",
Expand Down

0 comments on commit 3b57f2b

Please sign in to comment.