Skip to content

Commit

Permalink
Add error class, tidy up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
micw523 committed Feb 9, 2019
1 parent fe2f853 commit 8342a8e
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 111 deletions.
288 changes: 194 additions & 94 deletions kubernetes/e2e_test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,180 +17,280 @@
from kubernetes import utils, client
from kubernetes.e2e_test import base


class TestUtils(unittest.TestCase):

@classmethod
def setUpClass(cls):
cls.config = base.get_e2e_configuration()

def test_app_yaml(self):
"""
Test for creating and deleting a deployment on default namespace.
Using apps API
Using yaml file apps-deployment.yaml
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/apps-deployment.yaml")
self.assertEqual("apps/v1beta1",
k8s_api.get_api_resources().group_version)
k8s_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/apps-deployment.yaml")
self.assertEqual("apps/v1beta1",
k8s_api.get_api_resources().group_version)
dep = k8s_api.read_namespaced_deployment(name="nginx-app",
namespace="default")
namespace="default")
self.assertIsNotNone(dep)
resp = k8s_api.delete_namespaced_deployment(
name="nginx-app", namespace="default",
k8s_api.delete_namespaced_deployment(
name="nginx-app", namespace="default",
body={})

def test_extension_yaml(self):
"""
Test for creating and deleting a deployment on default namespace.
Using extensions API
Using yaml file extensions-deployment.yaml
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/extensions-deployment.yaml")
self.assertEqual("extensions/v1beta1",
k8s_api.get_api_resources().group_version)
dep = k8s_api.read_namespaced_deployment(name="nginx-deployment",
namespace="default")
k8s_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/extensions-deployment.yaml")
self.assertEqual("extensions/v1beta1",
k8s_api.get_api_resources().group_version)
dep = k8s_api.read_namespaced_deployment(name="nginx-deployment",
namespace="default")
self.assertIsNotNone(dep)
resp = k8s_api.delete_namespaced_deployment(
name="nginx-deployment", namespace="default",
k8s_api.delete_namespaced_deployment(
name="nginx-deployment", namespace="default",
body={})

def test_core_pod_yaml(self):
"""
Test for creating and deleting a pod on default namespace.
Using core API
Using yaml file core-pod.yaml
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/core-pod.yaml")
self.assertEqual("v1",
k8s_api.get_api_resources().group_version)
pod = k8s_api.read_namespaced_pod(name="myapp-pod",
namespace="default")
k8s_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/core-pod.yaml")
self.assertEqual("v1",
k8s_api.get_api_resources().group_version)
pod = k8s_api.read_namespaced_pod(name="myapp-pod",
namespace="default")
self.assertIsNotNone(pod)
resp = k8s_api.delete_namespaced_pod(
k8s_api.delete_namespaced_pod(
name="myapp-pod", namespace="default",
body={})

def test_core_service_yaml(self):
"""
Test for creating and deleting a service on default namespace.
Using core API
Using yaml file core-service.yaml
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/core-service.yaml")
self.assertEqual("v1",
k8s_api.get_api_resources().group_version)
k8s_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/core-service.yaml")
self.assertEqual("v1",
k8s_api.get_api_resources().group_version)
svc = k8s_api.read_namespaced_service(name="my-service",
namespace="default")
namespace="default")
self.assertIsNotNone(svc)
resp = k8s_api.delete_namespaced_service(
k8s_api.delete_namespaced_service(
name="my-service", namespace="default",
body={})

def test_core_namespace_yaml(self):
"""
Test for creating and deleting namespace
Using core API
Using yaml file core-namespace.yaml
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/core-namespace.yaml")
self.assertEqual("v1",
k8s_api.get_api_resources().group_version)
k8s_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/core-namespace.yaml")
self.assertEqual("v1",
k8s_api.get_api_resources().group_version)
nmsp = k8s_api.read_namespace(name="development")
self.assertIsNotNone(nmsp)
resp = k8s_api.delete_namespace(name="development", body={})
k8s_api.delete_namespace(name="development", body={})

def test_deployment_in_namespace(self):
"""
Test: Create a namespace. Create and delete a deployment on
the namespace just created. Delete the namespace created at the
beginning of the test.
Using core, extensions API
Using yaml file core-namespace-dep.yaml
Using yaml file extensions-deployment-dep.yaml
"""
k8s_client = client.ApiClient(configuration=self.config)
core_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/core-namespace-dep.yaml")
self.assertEqual("v1",
core_api.get_api_resources().group_version)
core_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/core-namespace-dep.yaml")
self.assertEqual("v1",
core_api.get_api_resources().group_version)
nmsp = core_api.read_namespace(name="dep")
self.assertIsNotNone(nmsp)
dep_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/extensions-deployment-dep.yaml")
dep = dep_api.read_namespaced_deployment(name="nginx-deployment",
namespace="dep")
dep_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/extensions-deployment-dep.yaml")
dep = dep_api.read_namespaced_deployment(name="nginx-deployment",
namespace="dep")
self.assertIsNotNone(dep)
resp = dep_api.delete_namespaced_deployment(
name="nginx-deployment", namespace="dep",
dep_api.delete_namespaced_deployment(
name="nginx-deployment", namespace="dep",
body={})
resp = core_api.delete_namespace(name="dep", body={})
core_api.delete_namespace(name="dep", body={})

def test_api_service_with_conflict(self):
"""
Test: Create an APIService. Recreate the APIService to test conflict
handling. Delete the APIService.
Check the program raises CreationFailedError.
Using apiregistration API
Using yaml file api-service.yaml
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/api-service.yaml")
self.assertEqual("apiregistration.k8s.io/v1beta1",
k8s_api.get_api_resources().group_version)
k8s_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/api-service.yaml")
self.assertEqual("apiregistration.k8s.io/v1beta1",
k8s_api.get_api_resources().group_version)
svc = k8s_api.read_api_service(
name="v1alpha1.wardle.k8s.io")
self.assertIsNotNone(svc)
svc_conflict = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/api-service.yaml")
self.assertEqual([], svc_conflict)
resp = k8s_api.delete_api_service(
with self.assertRaises(utils.FailToCreateError):
utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/api-service.yaml")
k8s_api.delete_api_service(
name="v1alpha1.wardle.k8s.io", body={})

def test_list(self):
"""
Test for creating and deleting a service and a deployment using
the List kind.
Using core, extensions API
Using yaml file list.yaml
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/list.yaml")
k8s_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/list.yaml")
svc_api = k8s_api[0]
self.assertEqual("v1", svc_api.get_api_resources().group_version)
svc = svc_api.read_namespaced_service(name="list-service-test",
namespace="default")
namespace="default")
self.assertIsNotNone(svc)
ext_api = k8s_api[1]
self.assertEqual("extensions/v1beta1",
ext_api.get_api_resources().group_version)
self.assertEqual("extensions/v1beta1",
ext_api.get_api_resources().group_version)
dep = ext_api.read_namespaced_deployment(name="list-deployment-test",
namespace="default")
namespace="default")
self.assertIsNotNone(dep)
ext_api.delete_namespaced_deployment(name="list-deployment-test",
namespace="default", body={})
namespace="default", body={})
svc_api.delete_namespaced_service(name="list-service-test",
namespace="default", body={})
namespace="default", body={})

def test_multi_resource(self):
"""
Test for handling a multi-resource yaml file. The yaml file contains
a service and a replication controller. These two objects are
created and deleted.
Using core API
Using yaml file multi-resource-yaml.yaml
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
k8s_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/multi-resource-yaml.yaml")
k8s_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/multi-resource-yaml.yaml")
svc_api = k8s_api[0]
self.assertEqual("v1", svc_api.get_api_resources().group_version)
svc = svc_api.read_namespaced_service(name="mock",
namespace="default")
namespace="default")
self.assertIsNotNone(svc)
ctr_api = k8s_api[1]
self.assertEqual("v1", ctr_api.get_api_resources().group_version)
ctr = ctr_api.read_namespaced_replication_controller(
name="mock", namespace="default")
self.assertIsNotNone(ctr)
ctr_api.delete_namespaced_replication_controller(name="mock",
namespace="default", body={})
ctr_api.delete_namespaced_replication_controller(
name="mock", namespace="default", body={})
svc_api.delete_namespaced_service(name="mock",
namespace="default", body={})
namespace="default", body={})

def test_multi_resource_with_conflict(self):
"""
Test for handling conflict.
Create a service from the first yaml file. Attempt to create the
same service and a replication controller using a second yaml file.
The second yaml file is a multi-part file.
The service and the replication controller should be created,
while error is reported when attempting to create the service again.
Since output_list is disabled, a single api object will be returned.
Both object are deleted at the end of the test.
Using core API
Using yaml file yaml-conflict-first.yaml
Using yaml file yaml-conflict-multi.yaml
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
svc_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/yaml-conflict-first.yaml")
svc_api = utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/yaml-conflict-first.yaml")
self.assertEqual("v1", svc_api.get_api_resources().group_version)
svc = svc_api.read_namespaced_service(name="mock-2",
namespace="default")
namespace="default")
self.assertIsNotNone(svc)
ctr_api = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/yaml-conflict-multi.yaml")
self.assertEqual("v1", ctr_api.get_api_resources().group_version)
with self.assertRaises(utils.FailToCreateError):
utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/yaml-conflict-multi.yaml")
ctr_api = client.CoreV1Api(k8s_client)
ctr = ctr_api.read_namespaced_replication_controller(
name="mock-2", namespace="default")
self.assertIsNotNone(ctr)
ctr_api.delete_namespaced_replication_controller(name="mock-2",
namespace="default", body={})
ctr_api.delete_namespaced_replication_controller(
name="mock-2", namespace="default", body={})
svc_api.delete_namespaced_service(name="mock-2",
namespace="default", body={})
namespace="default", body={})

def test_svc_list_with_conflict_no_kind(self):
"""
Test for handling yaml-compatible json files and a special kind
of list. The kind field of the yaml file is ServiceList while the
individual components does not contain the kind field.
Two replicates exist in the json file. An exception will be raised.
The kind should be inferred as Service and all these services
are created and deleted.
Using core API
Using json file multi-resource-svclist.json
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
svc_apis = utils.create_from_yaml(k8s_client,
"kubernetes/e2e_test/test_yaml/multi-resource-svclist.json")
svc_api_0 = svc_apis[0]
self.assertEqual("v1", svc_api_0.get_api_resources().group_version)
svc_0 = svc_api_0.read_namespaced_service(name="mock-3",
namespace="default")
with self.assertRaises(utils.FailToCreateError):
utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/multi-resource-svclist.json")
svc_api = client.CoreV1Api(k8s_client)
svc_0 = svc_api.read_namespaced_service(name="mock-3",
namespace="default")
self.assertIsNotNone(svc_0)
svc_api_1 = svc_apis[1]
self.assertEqual("v1", svc_api_1.get_api_resources().group_version)
svc_1 = svc_api_1.read_namespaced_service(name="mock-4",
namespace="default")
svc_1 = svc_api.read_namespaced_service(name="mock-4",
namespace="default")
self.assertIsNotNone(svc_1)
svc_api_0.delete_namespaced_service(name="mock-3",
namespace="default", body={})
svc_api_1.delete_namespaced_service(name="mock-4",
namespace="default", body={})
svc_api.delete_namespaced_service(name="mock-3",
namespace="default", body={})
svc_api.delete_namespaced_service(name="mock-4",
namespace="default", body={})

def test_double_fail(self):
"""
Test for a file that contains triple occurences of the same object.
Test if the correct exception is raised and correct message lines are
being shown.
"""
k8s_client = client.api_client.ApiClient(configuration=self.config)
with self.assertRaises(utils.FailToCreateError) as cm:
utils.create_from_yaml(
k8s_client, "kubernetes/e2e_test/test_yaml/triple-nginx.yaml")
exp_error = ('Error from server (Conflict): '
'deployments.extensions "triple-nginx" '
'already exists\n'
)
exp_error += exp_error
self.assertEqual(exp_error, str(cm.exception))
k8s_api = client.ExtensionsV1beta1Api(k8s_client)
dep = k8s_api.read_namespaced_deployment(name="triple-nginx",
namespace="default")
self.assertIsNotNone(dep)
k8s_api.delete_namespaced_deployment(
name="triple-nginx", namespace="default",
body={})
Loading

0 comments on commit 8342a8e

Please sign in to comment.