Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E2e test in k8s cluster and Namespace option #1774

Merged
merged 10 commits into from
Mar 15, 2023
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ jobs:
- name: Run tests
run: |
pip install pytest
python3 -m pip install -e sdk/python; pytest sdk/python/test --log-cli-level=info
python3 -m pip install -e sdk/python; pytest sdk/python/test --log-cli-level=info --namespace=default
env:
GANG_SCHEDULER_NAME: ${{ matrix.gang-scheduler-name }}
2 changes: 1 addition & 1 deletion hack/python-sdk/gen-sdk.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ echo "Generating swagger file ..."
go run "${repo_root}"/hack/swagger/main.go ${VERSION} >"${SWAGGER_CODEGEN_FILE}"

echo "Removing previously generated files ..."
rm -rf "${SDK_OUTPUT_PATH}"/docs/V1*.md "${SDK_OUTPUT_PATH}"/kubeflow/training/models "${SDK_OUTPUT_PATH}"/kubeflow/training/*.py "${SDK_OUTPUT_PATH}"/test/*.py
rm -rf "${SDK_OUTPUT_PATH}"/docs/V1*.md "${SDK_OUTPUT_PATH}"/kubeflow/training/models "${SDK_OUTPUT_PATH}"/kubeflow/training/*.py "${SDK_OUTPUT_PATH}"/test/test_*.py
echo "Generating Python SDK for Training Operator ..."
java -jar "${SWAGGER_CODEGEN_JAR}" generate -i "${repo_root}"/hack/python-sdk/swagger.json -g python -o "${SDK_OUTPUT_PATH}" -c "${SWAGGER_CODEGEN_CONF}"

Expand Down
9 changes: 9 additions & 0 deletions sdk/python/test/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import pytest

def pytest_addoption(parser):
parser.addoption("--namespace", action="store", default="default")


@pytest.fixture
def job_namespace(request):
return request.config.getoption("--namespace")
38 changes: 19 additions & 19 deletions sdk/python/test/e2e/test_e2e_mpijob.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,16 @@
logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)

TRAINING_CLIENT = TrainingClient(config_file=os.getenv("KUBECONFIG", "~/.kube/config"))
TRAINING_CLIENT = TrainingClient()
JOB_NAME = "mpijob-mxnet-ci-test"
JOB_NAMESPACE = "default"
CONTAINER_NAME = "mpi"
GANG_SCHEDULER_NAME = os.getenv(TEST_GANG_SCHEDULER_NAME_ENV_KEY)


@pytest.mark.skipif(
GANG_SCHEDULER_NAME in NONE_GANG_SCHEDULERS, reason="For gang-scheduling",
)
def test_sdk_e2e_with_gang_scheduling():
def test_sdk_e2e_with_gang_scheduling(job_namespace):
launcher_container, worker_container = generate_containers()

launcher = V1ReplicaSpec(
Expand All @@ -68,39 +67,39 @@ def test_sdk_e2e_with_gang_scheduling():
)),
)

mpijob = generate_mpijob(launcher, worker, V1SchedulingPolicy(min_available=10))
patched_mpijob = generate_mpijob(launcher, worker, V1SchedulingPolicy(min_available=2))
mpijob = generate_mpijob(launcher, worker, V1SchedulingPolicy(min_available=10), job_namespace)
patched_mpijob = generate_mpijob(launcher, worker, V1SchedulingPolicy(min_available=2), job_namespace)

TRAINING_CLIENT.create_mpijob(mpijob, JOB_NAMESPACE)
TRAINING_CLIENT.create_mpijob(mpijob, job_namespace)
logging.info(f"List of created {constants.MPIJOB_KIND}s")
logging.info(TRAINING_CLIENT.list_mpijobs(JOB_NAMESPACE))
logging.info(TRAINING_CLIENT.list_mpijobs(job_namespace))

verify_unschedulable_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
JOB_NAMESPACE,
job_namespace,
constants.MPIJOB_KIND,
)

TRAINING_CLIENT.patch_mpijob(patched_mpijob, JOB_NAME, JOB_NAMESPACE)
TRAINING_CLIENT.patch_mpijob(patched_mpijob, JOB_NAME, job_namespace)
logging.info(f"List of patched {constants.MPIJOB_KIND}s")
logging.info(TRAINING_CLIENT.list_mpijobs(JOB_NAMESPACE))
logging.info(TRAINING_CLIENT.list_mpijobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
JOB_NAMESPACE,
job_namespace,
constants.MPIJOB_KIND,
CONTAINER_NAME,
)

TRAINING_CLIENT.delete_mpijob(JOB_NAME, JOB_NAMESPACE)
TRAINING_CLIENT.delete_mpijob(JOB_NAME, job_namespace)


@pytest.mark.skipif(
GANG_SCHEDULER_NAME in GANG_SCHEDULERS, reason="For plain scheduling",
)
def test_sdk_e2e():
def test_sdk_e2e(job_namespace):
launcher_container, worker_container = generate_containers()

launcher = V1ReplicaSpec(
Expand All @@ -115,32 +114,33 @@ def test_sdk_e2e():
template=V1PodTemplateSpec(spec=V1PodSpec(containers=[worker_container])),
)

mpijob = generate_mpijob(launcher, worker, None)
mpijob = generate_mpijob(launcher, worker, job_namespace=job_namespace)

TRAINING_CLIENT.create_mpijob(mpijob, JOB_NAMESPACE)
TRAINING_CLIENT.create_mpijob(mpijob, job_namespace)
logging.info(f"List of created {constants.MPIJOB_KIND}s")
logging.info(TRAINING_CLIENT.list_mpijobs(JOB_NAMESPACE))
logging.info(TRAINING_CLIENT.list_mpijobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
JOB_NAMESPACE,
job_namespace,
constants.MPIJOB_KIND,
CONTAINER_NAME,
)

TRAINING_CLIENT.delete_mpijob(JOB_NAME, JOB_NAMESPACE)
TRAINING_CLIENT.delete_mpijob(JOB_NAME, job_namespace)


def generate_mpijob(
launcher: V1ReplicaSpec,
worker: V1ReplicaSpec,
scheduling_policy: V1SchedulingPolicy = None,
job_namespace: str = "default",
) -> KubeflowOrgV1MPIJob:
return KubeflowOrgV1MPIJob(
api_version="kubeflow.org/v1",
kind="MPIJob",
metadata=V1ObjectMeta(name=JOB_NAME, namespace=JOB_NAMESPACE),
metadata=V1ObjectMeta(name=JOB_NAME, namespace=job_namespace),
spec=KubeflowOrgV1MPIJobSpec(
slots_per_worker=1,
run_policy=V1RunPolicy(
Expand Down
38 changes: 19 additions & 19 deletions sdk/python/test/e2e/test_e2e_mxjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,16 @@
logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)

TRAINING_CLIENT = TrainingClient(config_file=os.getenv("KUBECONFIG", "~/.kube/config"))
TRAINING_CLIENT = TrainingClient()
JOB_NAME = "mxjob-mnist-ci-test"
JOB_NAMESPACE = "default"
CONTAINER_NAME = "mxnet"
GANG_SCHEDULER_NAME = os.getenv(TEST_GANG_SCHEDULER_NAME_ENV_KEY)


@pytest.mark.skipif(
GANG_SCHEDULER_NAME in NONE_GANG_SCHEDULERS, reason="For gang-scheduling",
)
def test_sdk_e2e_with_gang_scheduling():
def test_sdk_e2e_with_gang_scheduling(job_namespace):
worker_container, server_container, scheduler_container = generate_containers()

worker = V1ReplicaSpec(
Expand Down Expand Up @@ -78,39 +77,39 @@ def test_sdk_e2e_with_gang_scheduling():
)),
)

unschedulable_mxjob = generate_mxjob(scheduler, server, worker, V1SchedulingPolicy(min_available=10))
schedulable_mxjob = generate_mxjob(scheduler, server, worker, V1SchedulingPolicy(min_available=3))
unschedulable_mxjob = generate_mxjob(scheduler, server, worker, V1SchedulingPolicy(min_available=10), job_namespace)
schedulable_mxjob = generate_mxjob(scheduler, server, worker, V1SchedulingPolicy(min_available=3), job_namespace)

TRAINING_CLIENT.create_mxjob(unschedulable_mxjob, JOB_NAMESPACE)
TRAINING_CLIENT.create_mxjob(unschedulable_mxjob, job_namespace)
logging.info(f"List of created {constants.MXJOB_KIND}s")
logging.info(TRAINING_CLIENT.list_mxjobs(JOB_NAMESPACE))
logging.info(TRAINING_CLIENT.list_mxjobs(job_namespace))

verify_unschedulable_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
JOB_NAMESPACE,
job_namespace,
constants.MXJOB_KIND,
)

TRAINING_CLIENT.patch_mxjob(schedulable_mxjob, JOB_NAME, JOB_NAMESPACE)
TRAINING_CLIENT.patch_mxjob(schedulable_mxjob, JOB_NAME, job_namespace)
logging.info(f"List of patched {constants.MXJOB_KIND}s")
logging.info(TRAINING_CLIENT.list_mxjobs(JOB_NAMESPACE))
logging.info(TRAINING_CLIENT.list_mxjobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
JOB_NAMESPACE,
job_namespace,
constants.MXJOB_KIND,
CONTAINER_NAME,
)

TRAINING_CLIENT.delete_mxjob(JOB_NAME, JOB_NAMESPACE)
TRAINING_CLIENT.delete_mxjob(JOB_NAME, job_namespace)


@pytest.mark.skipif(
GANG_SCHEDULER_NAME in GANG_SCHEDULERS, reason="For plain scheduling",
)
def test_sdk_e2e():
def test_sdk_e2e(job_namespace):
worker_container, server_container, scheduler_container = generate_containers()

worker = V1ReplicaSpec(
Expand All @@ -131,33 +130,34 @@ def test_sdk_e2e():
template=V1PodTemplateSpec(spec=V1PodSpec(containers=[scheduler_container])),
)

mxjob = generate_mxjob(scheduler, server, worker)
mxjob = generate_mxjob(scheduler, server, worker, job_namespace=job_namespace)

TRAINING_CLIENT.create_mxjob(mxjob, JOB_NAMESPACE)
TRAINING_CLIENT.create_mxjob(mxjob, job_namespace)
logging.info(f"List of created {constants.MXJOB_KIND}s")
logging.info(TRAINING_CLIENT.list_mxjobs(JOB_NAMESPACE))
logging.info(TRAINING_CLIENT.list_mxjobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
JOB_NAMESPACE,
job_namespace,
constants.MXJOB_KIND,
CONTAINER_NAME,
)

TRAINING_CLIENT.delete_mxjob(JOB_NAME, JOB_NAMESPACE)
TRAINING_CLIENT.delete_mxjob(JOB_NAME, job_namespace)


def generate_mxjob(
scheduler: V1ReplicaSpec,
server: V1ReplicaSpec,
worker: V1ReplicaSpec,
scheduling_policy: V1SchedulingPolicy = None,
job_namespace: str = "default",
) -> KubeflowOrgV1MXJob:
return KubeflowOrgV1MXJob(
api_version="kubeflow.org/v1",
kind="MXJob",
metadata=V1ObjectMeta(name=JOB_NAME, namespace=JOB_NAMESPACE),
metadata=V1ObjectMeta(name=JOB_NAME, namespace=job_namespace),
spec=KubeflowOrgV1MXJobSpec(
job_mode="MXTrain",
run_policy=V1RunPolicy(
Expand Down
38 changes: 19 additions & 19 deletions sdk/python/test/e2e/test_e2e_paddlejob.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,16 @@
logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)

TRAINING_CLIENT = TrainingClient(config_file=os.getenv("KUBECONFIG", "~/.kube/config"))
TRAINING_CLIENT = TrainingClient()
JOB_NAME = "paddlejob-cpu-ci-test"
JOB_NAMESPACE = "default"
CONTAINER_NAME = "paddle"
GANG_SCHEDULER_NAME = os.getenv(TEST_GANG_SCHEDULER_NAME_ENV_KEY)


@pytest.mark.skipif(
GANG_SCHEDULER_NAME in NONE_GANG_SCHEDULERS, reason="For gang-scheduling",
)
def test_sdk_e2e_with_gang_scheduling():
def test_sdk_e2e_with_gang_scheduling(job_namespace):
container = generate_container()

worker = V1ReplicaSpec(
Expand All @@ -58,39 +57,39 @@ def test_sdk_e2e_with_gang_scheduling():
)),
)

unschedulable_paddlejob = generate_paddlejob(worker, V1SchedulingPolicy(min_available=10))
schedulable_paddlejob = generate_paddlejob(worker, V1SchedulingPolicy(min_available=2))
unschedulable_paddlejob = generate_paddlejob(worker, V1SchedulingPolicy(min_available=10), job_namespace)
schedulable_paddlejob = generate_paddlejob(worker, V1SchedulingPolicy(min_available=2), job_namespace)

TRAINING_CLIENT.create_paddlejob(unschedulable_paddlejob, JOB_NAMESPACE)
TRAINING_CLIENT.create_paddlejob(unschedulable_paddlejob, job_namespace)
logging.info(f"List of created {constants.PADDLEJOB_KIND}s")
logging.info(TRAINING_CLIENT.list_paddlejobs(JOB_NAMESPACE))
logging.info(TRAINING_CLIENT.list_paddlejobs(job_namespace))

verify_unschedulable_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
JOB_NAMESPACE,
job_namespace,
constants.PADDLEJOB_KIND,
)

TRAINING_CLIENT.patch_paddlejob(schedulable_paddlejob, JOB_NAME, JOB_NAMESPACE)
TRAINING_CLIENT.patch_paddlejob(schedulable_paddlejob, JOB_NAME, job_namespace)
logging.info(f"List of patched {constants.PADDLEJOB_KIND}s")
logging.info(TRAINING_CLIENT.list_paddlejobs(JOB_NAMESPACE))
logging.info(TRAINING_CLIENT.list_paddlejobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
JOB_NAMESPACE,
job_namespace,
constants.PADDLEJOB_KIND,
CONTAINER_NAME,
)

TRAINING_CLIENT.delete_paddlejob(JOB_NAME, JOB_NAMESPACE)
TRAINING_CLIENT.delete_paddlejob(JOB_NAME, job_namespace)


@pytest.mark.skipif(
GANG_SCHEDULER_NAME in GANG_SCHEDULERS, reason="For plain scheduling",
)
def test_sdk_e2e():
def test_sdk_e2e(job_namespace):
container = generate_container()

worker = V1ReplicaSpec(
Expand All @@ -99,31 +98,32 @@ def test_sdk_e2e():
template=V1PodTemplateSpec(spec=V1PodSpec(containers=[container])),
)

paddlejob = generate_paddlejob(worker)
paddlejob = generate_paddlejob(worker, job_namespace=job_namespace)

TRAINING_CLIENT.create_paddlejob(paddlejob, JOB_NAMESPACE)
TRAINING_CLIENT.create_paddlejob(paddlejob, job_namespace)
logging.info(f"List of created {constants.PADDLEJOB_KIND}s")
logging.info(TRAINING_CLIENT.list_paddlejobs(JOB_NAMESPACE))
logging.info(TRAINING_CLIENT.list_paddlejobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
JOB_NAMESPACE,
job_namespace,
constants.PADDLEJOB_KIND,
CONTAINER_NAME,
)

TRAINING_CLIENT.delete_paddlejob(JOB_NAME, JOB_NAMESPACE)
TRAINING_CLIENT.delete_paddlejob(JOB_NAME, job_namespace)


def generate_paddlejob(
worker: V1ReplicaSpec,
scheduling_policy: V1SchedulingPolicy = None,
job_namespace: str = "default",
) -> KubeflowOrgV1PaddleJob:
return KubeflowOrgV1PaddleJob(
api_version="kubeflow.org/v1",
kind="PaddleJob",
metadata=V1ObjectMeta(name=JOB_NAME, namespace=JOB_NAMESPACE),
metadata=V1ObjectMeta(name=JOB_NAME, namespace=job_namespace),
spec=KubeflowOrgV1PaddleJobSpec(
run_policy=V1RunPolicy(
scheduling_policy=scheduling_policy,
Expand Down
Loading