Orca yarn test #3546

Merged: 11 commits, Nov 23, 2021
400 changes: 300 additions & 100 deletions python/orca/dev/example/run-example-test-ray-integration.sh

Large diffs are not rendered by default.

17 changes: 11 additions & 6 deletions python/orca/example/learn/bigdl/attention/transformer.py
@@ -14,7 +14,6 @@
# limitations under the License.
#

-
import argparse
import numpy as np
from tensorflow.python.keras.datasets import imdb
@@ -35,7 +34,7 @@
cluster_mode = args.cluster_mode
conf = {"spark.executor.extraJavaOptions": "-Xss512m",
        "spark.driver.extraJavaOptions": "-Xss512m"}
- max_features = 20000
+ max_features = 2000
max_len = 200

if cluster_mode == "local":
@@ -44,12 +43,19 @@
                       driver_memory="20g",
                       conf=conf
                       )
- elif cluster_mode == "yarn":
-     sc = init_orca_context(cluster_mode="yarn-client", num_nodes=8, cores=8,
+ elif cluster_mode.startswith("yarn"):
+     if cluster_mode == "yarn-client":
+         sc = init_orca_context(cluster_mode="yarn-client", num_nodes=8, cores=8,
                               memory="100g",
                               driver_memory="20g",
                               conf=conf
-                          )
+                              )
+     else:
+         sc = init_orca_context(cluster_mode="yarn-cluster", num_nodes=8, cores=8,
+                                memory="100g",
+                                driver_memory="20g",
+                                conf=conf
+                                )
elif cluster_mode == "spark-submit":
sc = init_orca_context(cluster_mode="spark-submit")
else:
@@ -106,4 +112,3 @@

print("finished...")
stop_orca_context()
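This is the pattern the PR applies across the examples: accept yarn-client and yarn-cluster as distinct --cluster_mode values instead of a bare "yarn". Below is a minimal sketch of the dispatch, assuming the flag name used in this diff (the choices list is illustrative, not from the PR); note that init_orca_context also accepts the mode string directly, as predict.py later in this diff does, so the inner branch can be avoided when both yarn modes share the same arguments.

```python
import argparse

from bigdl.orca import init_orca_context

parser = argparse.ArgumentParser()
parser.add_argument('--cluster_mode', type=str, default="local",
                    choices=["local", "yarn-client", "yarn-cluster", "spark-submit"])
args = parser.parse_args()

if args.cluster_mode == "local":
    # cluster_mode defaults to local when not given.
    sc = init_orca_context(cores=8, memory="100g", driver_memory="20g")
elif args.cluster_mode.startswith("yarn"):
    # init_orca_context accepts "yarn-client" / "yarn-cluster" directly,
    # so the mode string can be forwarded unchanged when the remaining
    # arguments are identical for both modes.
    sc = init_orca_context(cluster_mode=args.cluster_mode, num_nodes=8, cores=8,
                           memory="100g", driver_memory="20g")
else:
    sc = init_orca_context(cluster_mode="spark-submit")
```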

10 changes: 7 additions & 3 deletions python/orca/example/learn/bigdl/imageInference/imageInference.py
@@ -20,6 +20,7 @@
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, DoubleType

+ from bigdl.dllib.nncontext import *
from bigdl.dllib.feature.image import *
from bigdl.dllib.nnframes import *
from bigdl.orca.learn.bigdl.estimator import Estimator
@@ -53,7 +54,7 @@ def inference(image_path, model_path, batch_size, sc):
                  help="training data path.")
parser.add_option("--b", "--batch_size", type=int, dest="batch_size", default="56",
                  help="The number of samples per gradient update. Default is 56.")
- parser.add_option('--cluster_mode', type=str, default="local",
+ parser.add_option('--cluster_mode', type=str, dest="cluster_mode", default="local",
help='The mode for the Spark cluster. local, yarn or spark-submit.')

(options, args) = parser.parse_args(sys.argv)
@@ -69,8 +70,11 @@ def inference(image_path, model_path, batch_size, sc):
cluster_mode = options.cluster_mode
if cluster_mode == "local":
    sc = init_orca_context(memory="3g")
- elif cluster_mode == "yarn":
-     sc = init_orca_context(cluster_mode="yarn-client", num_nodes=2, memory="3g")
+ elif cluster_mode.startswith("yarn"):
+     if cluster_mode == "yarn-client":
+         sc = init_orca_context(cluster_mode="yarn-client", num_nodes=2, memory="3g")
+     else:
+         sc = init_orca_context(cluster_mode="yarn-cluster", num_nodes=2, memory="3g")
elif cluster_mode == "spark-submit":
sc = init_orca_context(cluster_mode="spark-submit")
else:
11 changes: 7 additions & 4 deletions python/orca/example/learn/horovod/pytorch_estimator.py
@@ -132,9 +132,13 @@ def train_example(workers_per_node):
if args.cluster_mode == "local":
    init_orca_context(cluster_mode="local", cores=args.cores,
                      num_nodes=args.num_nodes, memory=args.memory)
- elif args.cluster_mode == "yarn":
-     init_orca_context(cluster_mode="yarn-client", cores=args.cores,
-                       num_nodes=args.num_nodes, memory=args.memory)
+ elif args.cluster_mode.startswith("yarn"):
+     if args.cluster_mode == "yarn-client":
+         init_orca_context(cluster_mode="yarn-client", cores=args.cores,
+                           num_nodes=args.num_nodes, memory=args.memory)
+     else:
+         init_orca_context(cluster_mode="yarn-cluster", cores=args.cores,
+                           num_nodes=args.num_nodes, memory=args.memory)
elif args.cluster_mode == "k8s":
if not args.k8s_master or not args.container_image \
or not args.k8s_driver_host or not args.k8s_driver_port:
@@ -150,4 +154,3 @@
        init_orca_context(cluster_mode="spark-submit")
train_example(workers_per_node=args.workers_per_node)
stop_orca_context()

6 changes: 6 additions & 0 deletions python/orca/example/learn/horovod/simple_horovod_pytorch.py
@@ -68,6 +68,12 @@ class AppURLopener(urllib.FancyURLopener):
# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(4)

+ # new_mirror = 'https://ossci-datasets.s3.amazonaws.com/mnist'
+ # datasets.MNIST.resources = [
+ #     ('/'.join([new_mirror, url.split('/')[-1]]), md5)
+ #     for url, md5 in datasets.MNIST.resources
+ # ]
+
kwargs = {}
train_dataset = \
datasets.MNIST('data-%d' % hvd.rank(), train=True, download=True,
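The commented-out block added above is the common workaround for torchvision's unreliable default MNIST download host: it rebases each download URL onto the S3 mirror named in the diff. A sketch of the override as it would run if uncommented (that the mirror is still live, and that datasets.MNIST.resources holds (url, md5) pairs, are assumptions carried over from the commented code):

```python
from torchvision import datasets

# Point every MNIST resource at the mirror, keeping the original
# file name and md5 checksum from each (url, md5) pair.
new_mirror = 'https://ossci-datasets.s3.amazonaws.com/mnist'
datasets.MNIST.resources = [
    ('/'.join([new_mirror, url.split('/')[-1]]), md5)
    for url, md5 in datasets.MNIST.resources
]
```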
1 change: 1 addition & 0 deletions python/orca/example/learn/mxnet/lenet_mnist.py
@@ -121,3 +121,4 @@ def get_metrics(config):
              epochs=opt.epochs, batch_size=opt.batch_size)
estimator.shutdown()
stop_orca_context()

3 changes: 1 addition & 2 deletions python/orca/example/learn/openvino/predict.py
@@ -55,7 +55,7 @@ def crop(img, w, h):

if args.cluster_mode == "local":
    init_orca_context(cores=args.core_num, memory=args.memory)
- elif args.cluster_mode == "yarn":
+ elif args.cluster_mode.startswith("yarn"):
init_orca_context(cluster_mode=args.cluster_mode, cores=args.core_num,
num_nodes=args.executor_num, memory=args.memory)
elif args.cluster_mode == "spark-submit":
@@ -78,4 +78,3 @@ def crop(img, w, h):
assert result[1].shape == (args.data_num, 255, 26, 26)
assert result[2].shape == (args.data_num, 255, 52, 52)
stop_orca_context()

python/orca/example/ray_on_spark/parameter_server/async_parameter_server.py
@@ -24,8 +24,9 @@
import os
import time

+ from python.orca.example.ray_on_spark.parameter_server import model
import ray
- import model
+ #import model

from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca import OrcaContext
@@ -97,7 +98,7 @@ def worker_task(ps, worker_index, batch_size=50):
if __name__ == "__main__":
    args = parser.parse_args()
    cluster_mode = args.cluster_mode
-     if cluster_mode == "yarn":
+     if cluster_mode.startswith("yarn"):
sc = init_orca_context(cluster_mode=cluster_mode,
cores=args.executor_cores,
memory=args.executor_memory,
@@ -106,8 +107,7 @@ def worker_task(ps, worker_index, batch_size=50):
                       driver_memory=args.driver_memory,
                       driver_cores=args.driver_cores,
                       extra_executor_memory_for_ray=args.extra_executor_memory_for_ray,
-                       object_store_memory=args.object_store_memory,
-                       additional_archive="MNIST_data.zip#MNIST_data")
+                       object_store_memory=args.object_store_memory)
ray_ctx = OrcaContext.get_ray_context()
elif cluster_mode == "local":
sc = init_orca_context(cores=args.driver_cores)
@@ -143,4 +143,3 @@ def worker_task(ps, worker_index, batch_size=50):
    time.sleep(1)
ray_ctx.stop()
stop_orca_context()

python/orca/example/ray_on_spark/parameter_server/sync_parameter_server.py
@@ -22,9 +22,10 @@
import argparse
import os

+ from python.orca.example.ray_on_spark.parameter_server import model
import numpy as np
import ray
- import model
+ #import model

from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca import OrcaContext
@@ -88,7 +89,7 @@ def compute_gradients(self, weights):
if __name__ == "__main__":
    args = parser.parse_args()
    cluster_mode = args.cluster_mode
-     if cluster_mode == "yarn":
+     if cluster_mode.startswith("yarn"):
sc = init_orca_context(cluster_mode=cluster_mode,
cores=args.executor_cores,
memory=args.executor_memory,
@@ -139,4 +140,3 @@ def compute_gradients(self, weights):
        i += 1
ray_ctx.stop()
stop_orca_context()

3 changes: 1 addition & 2 deletions python/orca/example/ray_on_spark/rl_pong/rl_pong.py
@@ -210,7 +210,7 @@ def compute_gradient(self, model):

args = parser.parse_args()
cluster_mode = args.cluster_mode
- if cluster_mode == "yarn":
+ if cluster_mode.startswith("yarn"):
sc = init_orca_context(cluster_mode=cluster_mode,
cores=args.executor_cores,
memory=args.executor_memory,
@@ -282,4 +282,3 @@ def compute_gradient(self, model):

ray_ctx.stop()
stop_orca_context()

32 changes: 21 additions & 11 deletions python/orca/example/ray_on_spark/rllib/multiagent_two_trainers.py
@@ -72,16 +72,27 @@
if __name__ == "__main__":
    args = parser.parse_args()
    cluster_mode = args.cluster_mode
- if cluster_mode == "yarn":
-     sc = init_orca_context(cluster_mode="yarn",
-                            cores=args.executor_cores,
-                            memory=args.executor_memory,
-                            init_ray_on_spark=True,
-                            driver_memory=args.driver_memory,
-                            driver_cores=args.driver_cores,
-                            num_executors=args.slave_num,
-                            extra_executor_memory_for_ray=args.extra_executor_memory_for_ray,
-                            object_store_memory=args.object_store_memory)
+ if cluster_mode.startswith("yarn"):
+     if cluster_mode == "yarn-client":
+         sc = init_orca_context(cluster_mode="yarn-client",
+                                cores=args.executor_cores,
+                                memory=args.executor_memory,
+                                init_ray_on_spark=True,
+                                driver_memory=args.driver_memory,
+                                driver_cores=args.driver_cores,
+                                num_executors=args.slave_num,
+                                extra_executor_memory_for_ray=args.extra_executor_memory_for_ray,
+                                object_store_memory=args.object_store_memory)
+     else:
+         sc = init_orca_context(cluster_mode="yarn-cluster",
+                                cores=args.executor_cores,
+                                memory=args.executor_memory,
+                                init_ray_on_spark=True,
+                                driver_memory=args.driver_memory,
+                                driver_cores=args.driver_cores,
+                                num_executors=args.slave_num,
+                                extra_executor_memory_for_ray=args.extra_executor_memory_for_ray,
+                                object_store_memory=args.object_store_memory)
ray_ctx = OrcaContext.get_ray_context()
elif cluster_mode == "local":
sc = init_orca_context(cores=args.driver_cores)
@@ -159,4 +170,3 @@ def policy_mapping_fn(agent_id):

ray_ctx.stop()
stop_orca_context()
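The two yarn branches added above differ only in the cluster_mode string. A shorter equivalent, as a sketch rather than part of this PR (it reuses cluster_mode and args from the surrounding script, and treating any non-client yarn mode as yarn-cluster is an assumption):

```python
from bigdl.orca import init_orca_context

# Resolve the mode once, then make a single call with the shared arguments.
mode = "yarn-client" if cluster_mode == "yarn-client" else "yarn-cluster"
sc = init_orca_context(cluster_mode=mode,
                       cores=args.executor_cores,
                       memory=args.executor_memory,
                       init_ray_on_spark=True,
                       driver_memory=args.driver_memory,
                       driver_cores=args.driver_cores,
                       num_executors=args.slave_num,
                       extra_executor_memory_for_ray=args.extra_executor_memory_for_ray,
                       object_store_memory=args.object_store_memory)
```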

25 changes: 24 additions & 1 deletion python/orca/example/tfpark/estimator/estimator_dataset.py
@@ -19,7 +19,15 @@
from bigdl.dllib.nncontext import init_nncontext
from bigdl.orca.tfpark import TFDataset, TFEstimator
from bigdl.orca.tfpark import ZooOptimizer
+ from bigdl.dllib.utils.common import *

+ import os
+ import argparse
+
+ parser = argparse.ArgumentParser(description="Run the tfpark keras "
+                                              "dataset example.")
+ parser.add_argument('--cluster_mode', type=str, default="local",
+                     help='The mode for the Spark cluster. local, yarn or spark-submit.')

def get_data(dataset):
from bigdl.dllib.feature.dataset import mnist
@@ -29,7 +37,22 @@ def get_data(dataset):


def main():
-     sc = init_nncontext()
+     args = parser.parse_args()
+     cluster_mode = args.cluster_mode
+     if cluster_mode.startswith("yarn"):
+         hadoop_conf = os.environ.get("HADOOP_CONF_DIR")
+         assert hadoop_conf, "Directory path to hadoop conf not found for yarn mode. Please " \
+                             "set the environment variable HADOOP_CONF_DIR"
+         spark_conf = create_spark_conf().set("spark.executor.memory", "5g") \
+             .set("spark.executor.cores", 2) \
+             .set("spark.executor.instances", 2) \
+             .set("spark.driver.memory", "2g")
+         if cluster_mode == "yarn-client":
+             sc = init_nncontext(spark_conf, cluster_mode="yarn-client", hadoop_conf=hadoop_conf)
+         else:
+             sc = init_nncontext(spark_conf, cluster_mode="yarn-cluster", hadoop_conf=hadoop_conf)
+     else:
+         sc = init_nncontext()

def model_fn(features, labels, mode):
from nets import lenet
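The yarn block added to main() here reappears nearly verbatim in estimator_inception.py and gan_train_and_evaluate.py below. A hypothetical shared helper (not in this PR; the name init_context and the shortened assert message are illustrative) would keep the three tfpark examples in sync:

```python
import os

from bigdl.dllib.nncontext import init_nncontext
from bigdl.dllib.utils.common import create_spark_conf


def init_context(cluster_mode):
    # Hypothetical helper factoring out the yarn setup repeated in the
    # three tfpark examples touched by this PR.
    if cluster_mode.startswith("yarn"):
        hadoop_conf = os.environ.get("HADOOP_CONF_DIR")
        assert hadoop_conf, "Set HADOOP_CONF_DIR for yarn modes"
        spark_conf = create_spark_conf().set("spark.executor.memory", "5g") \
            .set("spark.executor.cores", 2) \
            .set("spark.executor.instances", 2) \
            .set("spark.driver.memory", "2g")
        mode = "yarn-client" if cluster_mode == "yarn-client" else "yarn-cluster"
        return init_nncontext(spark_conf, cluster_mode=mode, hadoop_conf=hadoop_conf)
    return init_nncontext()
```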
21 changes: 20 additions & 1 deletion python/orca/example/tfpark/estimator/estimator_inception.py
@@ -14,6 +14,8 @@
# limitations under the License.
#
from optparse import OptionParser
+ import sys
+ import os

import tensorflow as tf

@@ -23,11 +25,26 @@
from bigdl.dllib.feature.image.imageset import *
from bigdl.orca.tfpark import TFDataset, TFEstimator
from bigdl.orca.tfpark import ZooOptimizer
+ from bigdl.dllib.utils.common import *


def main(option):
    batch_size = 16 if not option.batch_size else int(option.batch_size)
-     sc = init_nncontext()
+     cluster_mode = option.cluster_mode
+     if cluster_mode.startswith("yarn"):
+         hadoop_conf = os.environ.get("HADOOP_CONF_DIR")
+         assert hadoop_conf, "Directory path to hadoop conf not found for yarn mode. Please " \
+                             "set the environment variable HADOOP_CONF_DIR"
+         spark_conf = create_spark_conf().set("spark.executor.memory", "5g") \
+             .set("spark.executor.cores", 2) \
+             .set("spark.executor.instances", 2) \
+             .set("spark.driver.memory", "2g")
+         if cluster_mode == "yarn-client":
+             sc = init_nncontext(spark_conf, cluster_mode="yarn-client", hadoop_conf=hadoop_conf)
+         else:
+             sc = init_nncontext(spark_conf, cluster_mode="yarn-cluster", hadoop_conf=hadoop_conf)
+     else:
+         sc = init_nncontext()

def input_fn(mode, params):

@@ -88,6 +105,8 @@ def model_fn(features, labels, mode, params):
parser.add_option("--image-path", dest="image_path")
parser.add_option("--num-classes", dest="num_classes")
parser.add_option("--batch_size", dest="batch_size")
+ parser.add_option('--cluster_mode', type=str, default="local",
+                   help='The mode for the Spark cluster. local, yarn or spark-submit.')

(options, args) = parser.parse_args(sys.argv)
main(options)
27 changes: 25 additions & 2 deletions python/orca/example/tfpark/gan/gan_train_and_evaluate.py
@@ -19,16 +19,23 @@
from bigdl.dllib.nncontext import init_nncontext
from bigdl.orca.tfpark import TFDataset
from bigdl.orca.tfpark import ZooOptimizer
+ from bigdl.dllib.utils.common import *
import numpy as np
import matplotlib.pyplot as plt

from tensorflow_gan.examples.mnist.networks import *
from tensorflow_gan.python.losses.losses_impl import *
import tensorflow_datasets as tfds

+ import os
+ import argparse
+
MODEL_DIR = "/tmp/gan_model"
NOISE_DIM = 64

+ parser = argparse.ArgumentParser()
+ parser.add_argument('--cluster_mode', type=str, default="local",
+                     help='The mode for the Spark cluster. local, yarn or spark-submit.')

def eval():

@@ -53,7 +60,23 @@ def eval():


if __name__ == "__main__":
-     sc = init_nncontext()
+     conf = {}
+     args = parser.parse_args()
+     cluster_mode = args.cluster_mode
+     if cluster_mode.startswith("yarn"):
+         hadoop_conf = os.environ.get("HADOOP_CONF_DIR")
+         assert hadoop_conf, "Directory path to hadoop conf not found for yarn mode. Please " \
+                             "set the environment variable HADOOP_CONF_DIR"
+         spark_conf = create_spark_conf().set("spark.executor.memory", "5g") \
+             .set("spark.executor.cores", 2) \
+             .set("spark.executor.instances", 2) \
+             .set("spark.driver.memory", "2g")
+         if cluster_mode == "yarn-client":
+             sc = init_nncontext(spark_conf, cluster_mode="yarn-client", hadoop_conf=hadoop_conf)
+         else:
+             sc = init_nncontext(spark_conf, cluster_mode="yarn-cluster", hadoop_conf=hadoop_conf)
+     else:
+         sc = init_nncontext()

def input_fn():
def map_func(data):
Expand All @@ -67,7 +90,7 @@ def map_func(data):

ds = tfds.load("mnist", split="train")
ds = ds.map(map_func)
dataset = TFDataset.from_tf_data_dataset(ds, batch_size=36)
dataset = TFDataset.from_tf_data_dataset(ds, batch_size=56)
return dataset

opt = GANEstimator(