In [1]:
from kubernetes import client, config
import os


config.load_kube_config()
v1 = client.CoreV1Api()

#using CoreV1Api.list_namespaced_pod to list pod in airflow namespace
ret = v1.list_namespaced_pod(namespace="airflow", watch=False)
for pod in ret.items:
    print(f"Name: {pod.metadata.name}, Namespace: {pod.metadata.namespace} IP: {pod.status.pod_ip}")

Name: airflow-postgresql-0, Namespace: airflow IP: 10.244.2.3
Name: airflow-scheduler-7ccc8bbfc6-dbplw, Namespace: airflow IP: 10.244.1.3
Name: airflow-statsd-5bcb9dd76-m86ld, Namespace: airflow IP: 10.244.1.2
Name: airflow-webserver-868856894f-fc562, Namespace: airflow IP: 10.244.2.2


In [4]:
deployment_name = "my-deploy"
deployment_manifest = {
    "apiVersion": "apps/v1",
    "kind": "Deployment",
    "metadata": {"name": deployment_name, "namespace": "airflow"},
    "spec": {"replicas": 3,
             "selector": {
                "matchLabels": {
                    "app": "nginx"
                }},
        "template": {"metadata": {"labels": {"app": "nginx"}},
            "spec": {"containers": [
                {"name": "nginx", "image": "nginx:1.21.6", "ports": [{"containerPort": 80}]}]
            }
        },
    }
}

from kubernetes import client, config
import os, time
from kubernetes.client.rest import ApiException

config.load_kube_config()
v1 = client.AppsV1Api()

#using AppsV1Api.create_namespaced_deployment to create deployment
response = v1.create_namespaced_deployment(body=deployment_manifest, namespace="airflow")
while True:
    try:
        response = v1.read_namespaced_deployment_status(name=deployment_name, namespace="airflow")
        if response.status.available_replicas != 3:
            print("Waiting for Deployment to become ready...")
            time.sleep(5)
        else:
            break
    except ApiException as e:
        print(f"Exception when calling AppsV1Api -> read_namespaced_deployment_status: {e}\n")

Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become ready...
Waiting for Deployment to become r

KeyboardInterrupt: 

In [5]:
#example of watch stream of event in namespace
from kubernetes import client, watch, config
from functools import partial
config.load_kube_config()

v1 = client.CoreV1Api()
count = 10
w = watch.Watch()
for event in w.stream(partial(v1.list_namespaced_event, namespace="airflow"), timeout_seconds=10):
    print(f"Event - Message: {event['object']['message']} at {event['object']['metadata']['creationTimestamp']}")
    count -= 1
    if not count:
        w.stop()
print("Finished namespace stream.")

Finished namespace stream.


In [7]:
from kubernetes import dynamic
from kubernetes.client import api_client  # Careful - different import - not the same as previous client!
from kubernetes import config
import datetime

config.load_kube_config()

dynamic_client = dynamic.DynamicClient(api_client.ApiClient())

api = dynamic_client.resources.get(api_version="apps/v1", kind="Deployment")

# Even though the Deployment manifest was previously created with class model, it still behaves as dictionary:
deployment_manifest["spec"]["template"]["metadata"]["annotations"] = {
    "kubectl.kubernetes.io/restartedAt": datetime.datetime.utcnow().isoformat()
}

#patch to update fields of resource
deployment_patched = api.patch(body=deployment_manifest, name=deployment_name, namespace="airflow")

In [9]:
from kubernetes import client, config

config.load_kube_config()

api_client = client.ApiClient()
apps_v1 = client.AppsV1Api(api_client)

# The body can be of different patch types - https://github.com/kubernetes-client/python/issues/1206#issuecomment-668118057
api_response = apps_v1.patch_namespaced_deployment_scale(deployment_name, "airflow", {"spec": {"replicas": 5}})

In [11]:
from kubernetes.stream import stream
from kubernetes import config

config.load_kube_config()

def pod_exec(name, namespace, command, api_instance):
    exec_command = ["/bin/sh", "-c", command]

    resp = stream(api_instance.connect_get_namespaced_pod_exec,
                  name,
                  namespace,
                  command=exec_command,
                  stderr=True, stdin=False,
                  stdout=True, tty=False,
                  _preload_content=False)

    while resp.is_open():
        resp.update(timeout=1)
        if resp.peek_stdout():
            print(f"STDOUT: \n{resp.read_stdout()}")
        if resp.peek_stderr():
            print(f"STDERR: \n{resp.read_stderr()}")

    resp.close()

    if resp.returncode != 0:
        raise Exception("Script failed")

pod = "airflow-webserver-868856894f-fc562"
api_client = client.ApiClient()
v1 = client.CoreV1Api(api_client)

pod_exec(pod, "airflow", "env", v1)

# STDOUT:
# KUBERNETES_SERVICE_PORT=443
# KUBERNETES_PORT=tcp://10.96.0.1:443
# HOSTNAME=example
# HOME=/root
# ...

STDOUT: 
KUBERNETES_PORT=tcp://10.96.0.1:443
KUBERNETES_SERVICE_PORT=443
LANGUAGE=C.UTF-8
DUMB_INIT_SETSID=1
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW_STATSD_SERVICE_PORT_STATSD_INGEST=9125
HOSTNAME=airflow-webserver-868856894f-fc562
AIRFLOW_WEBSERVER_SERVICE_HOST=10.96.159.153
AIRFLOW_WEBSERVER_PORT_8080_TCP_ADDR=10.96.159.153
PYTHON_PIP_VERSION=22.0.4
RUNTIME_APT_DEPS=       apt-transport-https        apt-utils        ca-certificates        curl        dumb-init        freetds-bin        gosu        krb5-user        ldap-utils        libldap-2.4-2        libsasl2-2        libsasl2-modules        libssl1.1        locales         lsb-release        netcat        openssh-client        rsync        sasl2-bin        sqlite3        sudo        unixodbc
AIRFLOW_INSTALLATION_METHOD=
AIRFLOW_USER_HOME_DIR=/home/airflow
AIRFLOW_VAR_MY_S3_BUCKET=my_s3_name
HOME=/home/airflow
INSTALL_POSTGRES_CLIENT=true
AIRFLOW_WEBSERVER_PORT_8080_TCP_PORT=8080
AIRFLOW_STATSD_PORT_9102_TCP_ADDR=10.96.233.143
A

In [12]:
from kubernetes import client, config

config.load_kube_config()

api_client = client.ApiClient()
custom_api = client.CustomObjectsApi(api_client)

response = custom_api.list_cluster_custom_object("metrics.k8s.io", "v1beta1", "nodes")  # also works with "pods" instead of "nodes"

for node in response["items"]:
    print(f"{node['metadata']['name']: <30} CPU: {node['usage']['cpu']: <10} Memory: {node['usage']['memory']}")


ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '08773f8a-9867-4195-a26d-dd8751e3af8d', 'Cache-Control': 'no-cache, private', 'Content-Type': 'text/plain; charset=utf-8', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': 'c46f06e1-4cec-4a85-9245-6a3f410fdede', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'd9c6796e-34b4-496a-bec6-b8e16aa88d82', 'Date': 'Mon, 27 Jun 2022 08:17:47 GMT', 'Content-Length': '19'})
HTTP response body: 404 page not found

