Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Skeleton for Streaming API #4126

Merged
merged 6 commits into from
Feb 26, 2019

Conversation

jliagouris
Copy link
Contributor

What do these changes do?

Related issue number

@robertnishihara
Copy link
Collaborator

Replaces #4022.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/12223/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/12239/
Test FAILed.

@robertnishihara
Copy link
Collaborator

@jliagouris there is an error, but this looks fixable (seems to be happening in Python 2)

2.45s$ python -m pytest -v --durations=10 python/ray/tests/test_logical_graph.py
============================= test session starts ==============================
platform darwin -- Python 2.7.15, pytest-4.3.0, py-1.8.0, pluggy-0.8.1 -- /Users/travis/miniconda/bin/python
cachedir: .pytest_cache
rootdir: /Users/travis/build/ray-project/ray/python, inifile:
plugins: timeout-1.3.3, flaky-3.5.3
collected 5 items                                                              
python/ray/tests/test_logical_graph.py::test_parallelism PASSED          [ 20%]
python/ray/tests/test_logical_graph.py::test_partitioning PASSED         [ 40%]
python/ray/tests/test_logical_graph.py::test_forking FAILED              [ 60%]
python/ray/tests/test_logical_graph.py::test_channel_generation PASSED   [ 80%]
python/ray/tests/test_logical_graph.py::test_wordcount PASSED            [100%]
=================================== FAILURES ===================================
_________________________________ test_forking _________________________________
    def test_forking():
        """Tests stream forking."""
        env = Environment()
        # Try forking a stream
        stream = env.source(None).map(None).set_parallelism(2)
        # First branch with a shuffle partitioning strategy
        _ = stream.shuffle().key_by(0).sum(1)
        # Second branch with the default partitioning strategy
        _ = stream.key_by(1).sum(2)
        env._collect_garbage()
        # Operator ids
        source_id = None
        map_id = None
        keyby1_id = None
        keyby2_id = None
        sum1_id = None
        sum2_id = None
        # Collect ids
        for id, operator in env.operators.items():
            if operator.type == OpType.Source:
                source_id = id
            elif operator.type == OpType.Map:
                map_id = id
            elif operator.type == OpType.KeyBy:
                if operator.other_args == 0:
                    keyby1_id = id
                else:
                    assert operator.other_args == 1, (operator.other_args, 1)
                    keyby2_id = id
            elif operator.type == OpType.Sum:
                if operator.other_args == 1:
                    sum1_id = id
                else:
                    assert operator.other_args == 2, (operator.other_args, 2)
                    sum2_id = id
        # Check generated streams and their partitioning
        for source, destination in env.logical_topo.edges:
            if source == source_id:
                assert destination == map_id, (destination, map_id)
            elif source == map_id:
                operator = env.operators[map_id]
                key_index = env.operators[destination].other_args
                p_scheme = operator.partitioning_strategies[destination]
                strategy = p_scheme.strategy
                if key_index == 0:  # This must be the first branch
                    assert strategy == PStrategy.Shuffle, (strategy,
                                                           PStrategy.Shuffle)
                    assert destination == keyby1_id, (destination, keyby1_id)
                else:  # This must be the second branch
                    assert key_index == 1, (key_index, 1)
                    assert strategy == PStrategy.Forward, (strategy,
                                                           PStrategy.Forward)
                    assert destination == keyby2_id, (destination, keyby2_id)
            elif source == keyby1_id or source == keyby2_id:
                operator = env.operators[source]
                p_scheme = operator.partitioning_strategies[destination]
                key_index = env.operators[destination].other_args
                if key_index == 1:  # This must be the first branch
                    assert strategy == PStrategy.Forward, (strategy,
                                                           PStrategy.Forward)
                    assert destination == sum1_id, (destination, sum1_id)
                else:  # This must be the second branch
                    assert key_index == 2, (key_index, 2)
>                   assert strategy == PStrategy.Forward, (strategy,
                                                           PStrategy.Forward)
E                   UnboundLocalError: local variable 'strategy' referenced before assignment

@robertnishihara
Copy link
Collaborator

It seems like strategy isn't always defined.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/12309/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/12314/
Test FAILed.

@robertnishihara robertnishihara merged commit 89ce4c5 into ray-project:master Feb 26, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants