In [20]:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
import base64
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from kubernetes import client, config
import subprocess

In [21]:
import subprocess

def execute_kubectl_command(command):
    """Run an external command and report the outcome on stdout.

    Args:
        command: Sequence of strings forming the argument vector, e.g.
            ``["kubectl", "get", "pods"]``. It is passed to
            ``subprocess.run`` as-is (no shell is involved).

    Returns:
        bool: True when the command exited with status 0, False when it
        exited non-zero or could not be launched at all.
    """
    printable = ' '.join(command)
    try:
        # check=True turns a non-zero exit status into CalledProcessError.
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error: Failed to execute command '{printable}'.")
        print(f"Error message: {e}")
        return False
    except OSError as e:
        # Raised when the executable is missing or not runnable; without this
        # branch the traceback would bypass the friendly error report above.
        print(f"Error: Could not launch command '{printable}'.")
        print(f"Error message: {e}")
        return False

    print(f"Command '{printable}' executed successfully.")
    return True

In [26]:
def generate_rsa_key(subject: str, organization=None, key_size=2048):
    """Generate an RSA private key and a certificate signing request for it.

    Args:
        subject: Value stored in the CSR subject's Common Name attribute.
        organization: Optional value for the Organization Name attribute;
            the attribute is left out of the subject when this is None.
        key_size: RSA key size in bits.

    Returns:
        tuple[str, str]: ``(base64_pem, base64_csr)`` where ``base64_pem`` is
        the base64 encoding of the PEM private key and ``base64_csr`` is the
        base64 encoding of the PEM CSR.

    Note:
        The private key is serialized WITHOUT encryption, so the first
        returned value is secret material and should not be logged or
        committed.
    """
    # Generate the RSA private key.
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
        backend=default_backend()
    )

    # Serialize the private key to unencrypted PEM.
    private_key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption()
    )

    # Assemble the subject: CN is always present, O only when supplied.
    subject_attributes = [
        x509.NameAttribute(NameOID.COMMON_NAME, subject),
    ]
    if organization is not None:
        subject_attributes.append(
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization)
        )

    # Build the CSR and sign it with the freshly generated key (SHA-256).
    csr = (
        x509.CertificateSigningRequestBuilder()
        .subject_name(x509.Name(subject_attributes))
        .sign(private_key, hashes.SHA256(), default_backend())
    )
    csr_pem = csr.public_bytes(serialization.Encoding.PEM)

    # Both artifacts are returned as base64 text of their PEM bytes.
    base64_pem = base64.b64encode(private_key_pem).decode('utf-8')
    base64_csr = base64.b64encode(csr_pem).decode('utf-8')

    return base64_pem, base64_csr

def create_and_approve_csr(username, base64_csr, expiration_seconds,
                           max_attempts=10, poll_interval=1.0):
    """Create a Kubernetes CSR, approve it and fetch the issued certificate.

    The CSR object is created through the Kubernetes Python client, then
    approved and read back with ``kubectl``.

    Args:
        username: Used as the name of the CertificateSigningRequest object.
        base64_csr: Base64-encoded PEM CSR placed in ``spec.request``.
        expiration_seconds: Value for ``spec.expirationSeconds``.
        max_attempts: How many times to read ``.status.certificate`` before
            giving up.
        poll_interval: Seconds to wait between read attempts.

    Returns:
        str | None: The content of ``.status.certificate`` as printed by
        kubectl, or None when creating, approving or fetching failed or no
        certificate appeared within ``max_attempts`` reads.
    """
    import time  # local import: only needed for the polling delay below

    config.load_kube_config()
    csr_name = f"{username}"
    api_instance = client.CertificatesV1Api()

    # Define the Certificate Signing Request (CSR) object
    csr_manifest = {
        "apiVersion": "certificates.k8s.io/v1",
        "kind": "CertificateSigningRequest",
        "metadata": {
            "name": csr_name,
        },
        "spec": {
            "groups": [
                "system:authenticated"
            ],
            "request": base64_csr,
            "signerName": "kubernetes.io/kube-apiserver-client",
            "expirationSeconds": expiration_seconds,
            "usages": [
                "digital signature",
                "key encipherment",
                "client auth"
            ]
        }
    }

    # Create the CSR
    try:
        api_instance.create_certificate_signing_request(body=csr_manifest)
    except Exception as e:
        print("encountered", e)
        return None

    # Approve the CSR; without approval there is nothing to fetch, so stop.
    try:
        subprocess.run(["kubectl", "certificate", "approve", csr_name], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error message: {e}")
        return None

    # No quotes inside the jsonpath template: the arguments are not parsed by
    # a shell, so quotes would be emitted literally and need stripping again.
    get_command = [
        "kubectl", "get", "csr", csr_name,
        "-o", "jsonpath={.status.certificate}",
    ]

    # The certificate field may still be empty right after approval, so
    # re-read it a few times instead of returning an empty string.
    for attempt in range(max_attempts):
        try:
            result = subprocess.run(get_command, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Error: Failed to get certificate from CSR '{csr_name}'.")
            print(f"Error message: {e}")
            return None

        certificate_base64 = result.stdout.strip()
        if certificate_base64:
            return certificate_base64

        if attempt < max_attempts - 1:
            time.sleep(poll_interval)

    print(f"Error: No certificate was issued for CSR '{csr_name}' after {max_attempts} attempts.")
    return None

In [28]:
# Request a client certificate for USERNAME and print the matching
# kubeconfig "users" entry.
USERNAME = "ufuk"
EXP_SECONDS = 60 * 60 * 24 * 365 # one year

base64_pem, base64_csr = generate_rsa_key(USERNAME)
base64_certificate = create_and_approve_csr(USERNAME, base64_csr, EXP_SECONDS)

# Treat both None (request failed) and an empty string (certificate not
# issued) as failure, so a broken kubeconfig entry is never emitted.
certificate_issued = bool(base64_certificate)
print(certificate_issued)

kubeconfig_user_entry = f'''
- name: {USERNAME}
  user:
    client-certificate-data: {base64_certificate}
    client-key-data: {base64_pem}
'''

if certificate_issued:
    # NOTE(review): this prints the unencrypted private key into the cell
    # output; clear the output before sharing or committing the notebook.
    print(kubeconfig_user_entry)
else:
    print(f"No certificate was issued for '{USERNAME}'; kubeconfig entry not printed.")

True True
certificatesigningrequest.certificates.k8s.io/ufuk approved
True

- name: ufuk
  user:
    client-certificate-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURCRENDQWV5Z0F3SUJBZ0lRZEJTTkQ2eFlxTVloRXhOYmhubnJnVEFOQmdrcWhraUc5dzBCQVFzRkFEQVYKTVJNd0VRWURWUVFERXdwdGFXNXBhM1ZpWlVOQk1CNFhEVEkwTURJeU56RXhOVE15TUZvWERUSTFNREl5TmpFeApOVE15TUZvd0R6RU5NQXNHQTFVRUF4TUVkV1oxYXpDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDCkFRb0NnZ0VCQU4yN2JwNDNoelhzWUE5RHlVSmE4eFNSR1hZelA1RUVvb2lGWEhmR0hxMmJxUUhMRHFqZHh2QmYKaTJwcEhCRWoxOFdqZ3hWdWpWd0pqdE8xSnplUmNwcWZLUHVzcGNBMUo0eTJDOCt2a2FKSUJsZVBjQWpEMGpteQowakhxdTkxK3NQTTRFcDVZVWxxRTdXcWM4Tmxad01yUEpPcU8xQ0QzaVIyaDFzTTZlWXA0QmVaWGxJZ1JreVlNCjVmMFJibkVUcEN5WC9TcEN4cktDRnVCSS9MSDdMRGowbjVPSmlFQVphOEFQL3U5VkphR2hCRVZwVTRpdE1pV3kKL3RzWTBxMU5DV2dWTWExZHRSQ3gwVWpSSTVqblN1Sk5tWVZvTXNTSkp4YkxMTnBOMWltUkRXYnRUMVZ3OFplbwoxMUxOQ0RYTmRyOUl3NHVCckFLQUhaME1POFdWdlVVQ0F3RUFBYU5XTUZRd0RnWURWUjBQQVFIL0JBUURBZ1dnCk1CTUdBMVVkSlFRTU1Bb0dDQ3NHQVFVRkJ3TUNNQXdHQTFVZEV3RUIvd1

'LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURCVENDQWUyZ0F3SUJBZ0lRVUtEbi9adkZSQzVRbmczdnYwMnZlakFOQmdrcWhraUc5dzBCQVFzRkFEQVYKTVJNd0VRWURWUVFERXdwdGFXNXBhM1ZpWlVOQk1CNFhEVEkwTURJeU1qRTFORFl3TjFvWERUSTBNREl5T0RFMwpORFl3TjFvd0VERU9NQXdHQTFVRUF4TUZZV3hwWTJVd2dnRWlNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJEd0F3CmdnRUtBb0lCQVFEUVBQZnZ3S0E2SnlHVVdDQjBSYnVHbmdMQURVZW4xR3Vkc1RybUcyampMQlM5WTVMYmZZQ3UKYlpqU1M1MmVtaXFXKy9DaEJ2RUVOUTZhZitCalZ0TE9YMTg2WDJwZWdDeGVuMVRWNDdXKzJ6NjdmYWlpVjA3ZwozeC9pTURVS2FRN3pJU015ODNaKzEyZU1tdnh4UUNzUFhYajdGUFphc0dyay8yaktncHJNV09Sd0VyR0N3Rmk0ClNvTXRVdVRoeHNuYS9lUVlDWU1pUDJLRXovSjM5ZVh2NUdLVFZNbWMrN2hJMjZPclYzTEQxeVpJaEpaKzZkdTUKakhPd2lzenRDdytUMnEwNjFQN2hPUnZBOGV2VmFpTlV3ZzFocmg2ZlBlcjFYUnVUcnlJS1kwS2xlQnllc09JQgpIVkdlcWFzZjJyWjM1WVdXNUFJU1BTUXFsOTlsTzJXVEFnTUJBQUdqVmpCVU1BNEdBMVVkRHdFQi93UUVBd0lGCm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFNQmdOVkhSTUJBZjhFQWpBQU1COEdBMVVkSXdRWU1CYUEKRkZuY1ZWN0J2UHhSUVF2K2FldzkyR3E5S0IzTE1BMEdDU3FHU0liM0RRRUJDd1VBQTRJQkFRQWdLMVhrckY1dQoxZzVkK3l

In [4]:
# Debug dump of the base64 CSR followed by the base64 private key, one per
# line. NOTE(review): the second value is the unencrypted private key, so
# clear this cell's output before sharing the notebook.
for encoded_value in (base64_csr, base64_pem):
    print(encoded_value)

LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQ1Z6Q0NBVDhDQVFBd0VqRVFNQTRHQTFVRUF3d0hkR1Z1WVc1ME1UQ0NBU0l3RFFZSktvWklodmNOQVFFQgpCUUFEZ2dFUEFEQ0NBUW9DZ2dFQkFOSWlBOFQwL1JGSUtZNzRmL3FSUmxad1U1NC8ySHJsZG5vYXFQcW1wMGc3CmxUQm5YbE9lblF3bVk3dURNYlkyc2ZQUmJZcWRrT09QNmw4UTNBNm5XMFg0aWtJbkNlZWlyWmhoaC9ieFk3WDEKWVN4TkZ0elNDS1VkdkhXMUsvNE1zVERZcHdvejJZQ3ZZSGw3aVo5U3hvVUpveTV0aUg4MlFVa3FNTFk0THNoeAoraU4vSzBGVWZWZW1EMU5TOXFBc0JiRURlNnR6dkdaOFBZMVBPR3k2bFgwd3kzVHRHM1I1Wm5UZXcwR0s5VlhoCnZPSjBYek82WXBvVzRaWXRvNk85ZUdRS2gzTGxiRXg2QngwTmttNzV4L2xyd0J1T0FtUDdRelFNYXljMDY3MjgKems1SGlJdEdyQ1d3MVlsbTB0d0VpZGFMdlVQTzNHM2RtUnBNcnB5L01UVUNBd0VBQWFBQU1BMEdDU3FHU0liMwpEUUVCQ3dVQUE0SUJBUUJqMTZLdlR0VDRrTEE2U1JjUTliaHRpMjFrK28vUW5ENUdHaUZRd1FUTGRCWmpjNDdWCkRRb0gvN1I3RW9wT3VJeUV0TFM2dXp2bDBreGVQQ054ZWFhaFBiNG1nUkU1N2ppUW50b0lNY3VGWVV5a1ZjV2cKeXNBL0d4WGJaS2Q3bmRHcldlOEkvaVFUKzhWNWJKQTBpMlZ1dmg2WkdrOGNaLzJ3bGgrTnZkM0pTeUdGSGs1Lwo0b3pETGp1TEJ2aG9TS2FHcEVyekNaTldOcThVTXgwc3VEY0hLT3ZMYkVGTzRCTFI4ZzJwVjYrOTdNWkhEZHNh

In [5]:
# Load the cluster connection settings for the kubernetes client. No
# arguments are passed, so the client's default kubeconfig lookup applies.
config.load_kube_config()

In [None]:


# Create a single-container nginx pod named "example-pod" in the "default"
# namespace. This cell does not load the kube config itself; it relies on
# config.load_kube_config() having been called earlier in the notebook.

# Core API client used for pod operations
api_instance = client.CoreV1Api()

# Build the pod specification from the inside out
container_port = {"containerPort": 80}
example_container = {
    "name": "example-container",
    "image": "nginx",
    "ports": [container_port],
}
pod_manifest = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "example-pod"},
    "spec": {"containers": [example_container]},
}

# Submit the pod to the cluster
api_response = api_instance.create_namespaced_pod(
    namespace="default",
    body=pod_manifest,
)

# Report the status object returned by the API server
status_text = str(api_response.status)
print("Pod created. Status='%s'" % status_text)
