
Add dask support #21

Merged: 18 commits merged into main from 8-dask-support on Mar 24, 2023
Conversation

@PythonFZ (Member) commented on Feb 9, 2023:

Open this PR: Binder

Install via pip install znflow[dask] to deploy the ZnFlow graph with Dask, either locally or on supported Dask workers.
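
A minimal sketch of what that looks like with a local Dask cluster (the LocalCluster parameters and the graph body are placeholders; the Deployment calls match the example further below):

from dask.distributed import Client, LocalCluster

import znflow

# Spin up a throwaway local cluster; Client("tcp://scheduler:8786") would
# connect to an existing Dask deployment instead.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

with znflow.DiGraph() as graph:
    ...  # build your znflow nodes here

deployment = znflow.deployment.Deployment(graph=graph, client=client)
deployment.submit_graph()
deployment.get_results(graph.nodes)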

@PythonFZ linked an issue on Feb 9, 2023 that may be closed by this pull request.
@PythonFZ (Member, Author) commented:

Check large graphs:

import dataclasses
import random
import time

import znflow
from dask.distributed import Client

@dataclasses.dataclass
class Node(znflow.Node):
    inputs: float
    outputs: float = None  # filled in by run()

    def run(self):
        time.sleep(0.1)  # simulate some work
        self.outputs = self.inputs * 2

@dataclasses.dataclass
class SumNodes(znflow.Node):
    inputs: list
    outputs: float = None  # filled in by run()

    def run(self):
        time.sleep(0.1)  # simulate some work
        self.outputs = sum(self.inputs)

k = 5
j = 5
i = 5

# Build a three-level reduction: k * j * i leaf Nodes, summed level by level.
with znflow.DiGraph() as graph:
    kdx_nodes = []
    for kdx in range(k):
        jdx_nodes = []
        for jdx in range(j):
            idx_nodes = []
            for idx in range(i):
                idx_nodes.append(Node(inputs=random.random()))
            jdx_nodes.append(SumNodes(inputs=[x.outputs for x in idx_nodes]))
        kdx_nodes.append(SumNodes(inputs=[x.outputs for x in jdx_nodes]))

    end_node = SumNodes(inputs=[x.outputs for x in kdx_nodes])

client = Client()  # local Dask client; pass a scheduler address to use an existing cluster
deployment = znflow.deployment.Deployment(graph=graph, client=client)
deployment.submit_graph()
deployment.get_results(graph.nodes)
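
With k = j = i = 5 this builds 125 leaf Nodes plus 25 + 5 + 1 SumNodes, i.e. 156 nodes in total.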

@SamTov (Member) left a comment:

This looks really nice. Just a general comment on Dask at large scale. The issue we came across was that Dask will deploy as many jobs as possible onto a single worker. So if you set your cluster parameters to request 5 GB of memory because that is how much one job requires, the limit is violated immediately: that memory is given to one worker, which then runs as many jobs as it can fit.

The way I am trying to avoid this is to assign worker resources that limit the number of tasks a worker can run, e.g. 1 GPU per model training means only one model can run on a worker at a time, or, for espresso, 1 "espresso". But in the case you have here, if your task was a large matrix computation and you heavily parallelised over nodes, I think you would hit a dead worker pretty fast.
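
For reference, a sketch of that worker-resources pattern in plain dask.distributed (the "GPU" resource name, the scheduler address, and train are illustrative placeholders, not part of this PR):

# Start workers that each advertise one abstract "GPU" resource, e.g.
#   dask worker tcp://scheduler:8786 --resources "GPU=1"
from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder scheduler address

def train(seed):
    ...  # placeholder for a memory- or GPU-hungry task
    return seed

# Every task claims one "GPU", so at most one of these runs per worker at a time.
futures = [client.submit(train, s, resources={"GPU": 1}) for s in range(10)]
results = client.gather(futures)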

README.md: review comment (outdated, resolved)
@PythonFZ (Member, Author) commented:

Not a problem anymore. I can run 3600 nodes with Dask in ~20 s.

@PythonFZ mentioned this pull request on Mar 24, 2023
@PythonFZ merged commit 0169cd7 into main on Mar 24, 2023
@PythonFZ deleted the 8-dask-support branch on Mar 24, 2023 at 15:46
Labels: none
Projects: none
Linked issues that may be closed by merging: Dask support
Participants: 2