
Implement DaskExecutor #9

Merged

Conversation


@deepyaman commented Oct 28, 2021

Description

Development notes

I tested this on the pandas-iris starter pipeline. The modifications I made were to:

  • set tags=["executor:kedro.symphony.executor.dask_executor.DaskExecutor"] for the DS pipeline (see the sketch after this list)
  • add print("Model accuracy on test set: %0.2f%%", accuracy * 100) to the report_accuracy node, because the logger output wasn't showing up on the worker (print doesn't interpolate %-style placeholders, which is why the literal format string appears in the worker log below)

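As an illustration of the first change, the tag could be applied roughly as follows in the pandas-iris starter's data science pipeline. This is a hedged sketch rather than the actual diff: the node shown is a simplified stand-in, and it assumes the Pipeline constructor's tags argument (tagging individual nodes would work just as well).

    # Sketch only: attach the executor tag to a pipeline so the DaskExecutor
    # picks up its nodes. The node below is a simplified stand-in.
    from kedro.pipeline import Pipeline, node


    def report_accuracy(accuracy: float) -> None:
        # Stand-in for the starter's report_accuracy node.
        print("Model accuracy on test set: %0.2f%%", accuracy * 100)


    def create_pipeline(**kwargs) -> Pipeline:
        return Pipeline(
            [node(report_accuracy, "accuracy", None, name="report_accuracy")],
            tags=["executor:kedro.symphony.executor.dask_executor.DaskExecutor"],
        )
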
I started up a local Dask cluster following the "What to do" section of https://towardsdatascience.com/just-start-with-the-dask-localcluster-saturn-cloud-7dbfe5a89c9a, running export PYTHONPATH=/Users/deepyaman_datta/new-kedro-project/src in each shell first as a quick hack. (Per https://stackoverflow.com/a/39994128, MRocklin suggests Client.upload_file as a better way to ship code to the workers, which makes sense; see the sketch below.)
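
For reference, the Client.upload_file approach would look roughly like this. It's a sketch under the assumption that the project source has first been packaged into a single .zip (or .egg) file, since upload_file ships one file to every worker; the path is illustrative.

    # Sketch: ship project code to workers via Client.upload_file instead of
    # exporting PYTHONPATH in every shell. The .zip path is hypothetical.
    from dask.distributed import Client

    client = Client("tcp://127.0.0.1:8786")
    client.upload_file("/Users/deepyaman_datta/new-kedro-project/src/new_kedro_project.zip")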

And then kedro run!

dask-scheduler console:

(kedro) WASH-178551-C02X31K9JHD4:kedro deepyaman_datta$ export PYTHONPATH=/Users/deepyaman_datta/new-kedro-project/src
(kedro) WASH-178551-C02X31K9JHD4:kedro deepyaman_datta$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.17.20.117:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:62838', name: tcp://127.0.0.1:62838, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:62838
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-02393ed6-37ad-11ec-adc3-acde48001122
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-worker-02eb9748-37ad-11ec-adba-acde48001122
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-02ed214e-37ad-11ec-adc3-acde48001122
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-02ef2c00-37ad-11ec-adc3-acde48001122
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-02ef2c00-37ad-11ec-adc3-acde48001122
distributed.scheduler - INFO - Remove client Client-02ef2c00-37ad-11ec-adc3-acde48001122
distributed.scheduler - INFO - Close client connection: Client-02ef2c00-37ad-11ec-adc3-acde48001122
distributed.scheduler - INFO - Remove client Client-02ed214e-37ad-11ec-adc3-acde48001122
distributed.scheduler - INFO - Remove client Client-02393ed6-37ad-11ec-adc3-acde48001122
distributed.scheduler - INFO - Close client connection: Client-02ed214e-37ad-11ec-adc3-acde48001122
distributed.scheduler - INFO - Close client connection: Client-02393ed6-37ad-11ec-adc3-acde48001122

dask-worker console:

(kedro) WASH-178551-C02X31K9JHD4:kedro deepyaman_datta$ export PYTHONPATH=/Users/deepyaman_datta/new-kedro-project/src
(kedro) WASH-178551-C02X31K9JHD4:kedro deepyaman_datta$ dask-worker tcp://127.0.0.1:8786
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:62837'
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:62838
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:62838
distributed.worker - INFO -          dashboard at:            127.0.0.1:62839
distributed.worker - INFO - Waiting to connect to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          8
distributed.worker - INFO -                Memory:                   17.18 GB
distributed.worker - INFO -       Local Directory: /Users/deepyaman_datta/kedro/dask-worker-space/dask-worker-space/worker-oqend4ii
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
Model accuracy on test set: %0.2f%% 90.0

A lot of the Dask code is taken from https://github.com/hhuuggoo/kedro-dask-example/blob/416034880d9cf3500409fd49ff8053870ce6e15e/src/kedro_dask_example/io/__init__.py.
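At a high level, the borrowed pattern is to hand Kedro's node dependency graph to a distributed Client and let Dask schedule the work through futures. The snippet below is an illustrative sketch of that idea only, not the PR's actual DaskExecutor code; _run_node, the single-future-per-node assumption, and the omission of catalog loading are all simplifications.

    # Illustrative sketch: submit Kedro nodes to a Dask cluster as futures so
    # Dask resolves inter-node dependencies. Not the PR's actual code.
    from dask.distributed import Client
    from kedro.pipeline import Pipeline


    def _run_node(node, *input_values):
        # Simplification: call the node's function directly on resolved inputs.
        return node.func(*input_values)


    def submit(pipeline: Pipeline, client: Client) -> dict:
        futures = {}  # dataset name -> Dask future
        for node in pipeline.nodes:  # Pipeline.nodes is topologically sorted
            # Inputs loaded from the data catalog are ignored here for brevity.
            deps = [futures[name] for name in node.inputs if name in futures]
            future = client.submit(_run_node, node, *deps)
            for output in node.outputs:
                futures[output] = future
        return futures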

TODO:

  • Test whether we can return to the non-Dask workflow.

Checklist

  • Read the contributing guidelines
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added a description of this change and added my name to the list of supporting contributions in the RELEASE.md file
  • Added tests to cover my changes

Notice

  • I acknowledge and agree that, by checking this box and clicking "Submit Pull Request":

      • I submit this contribution under the Apache 2.0 license and represent that I am entitled to do so on behalf of myself, my employer, or relevant third parties, as applicable.

      • I certify that (a) this contribution is my original creation and / or (b) to the extent it is not my original creation, I am authorised to submit this contribution on behalf of the original creator(s) or their licensees.

      • I certify that the use of this contribution as authorised by the Apache 2.0 license does not violate the intellectual property rights of anyone else.

@deepyaman merged commit 5998099 into quantumhack/kedro-executor-scheduler Oct 28, 2021
@deepyaman deleted the feature/dask-executor branch October 28, 2021 08:50