[View source on GitHub]: https://github.com/wadmp/wadmp.github.io/blob/master/jupyter_notebooks/add_device_CA.ipynb
[Notebook Viewer]: https://nbviewer.jupyter.org/github/wadmp/wadmp.github.io/blob/master/jupyter_notebooks/add_device_CA.ipynb
[Run in binder]: https://mybinder.org/v2/gh/wadmp/wadmp.github.io/master?filepath=jupyter_notebooks%2Fadd_device_CA.ipynb
[Run in Google Colab]: https://colab.research.google.com/github/wadmp/wadmp.github.io/blob/master/jupyter_notebooks/add_device_CA.ipynb

| [![GitHub logo](https://raw.githubusercontent.com/wadmp/wadmp.github.io/master/images/github_logo.png)][View source on GitHub] | [![Jupyter logo](https://raw.githubusercontent.com/wadmp/wadmp.github.io/master/images/jupyter_logo.png)][Notebook Viewer] | [![binder logo](https://raw.githubusercontent.com/wadmp/wadmp.github.io/master/images/binder_logo.png)][Run in binder] | [![Colab logo](https://raw.githubusercontent.com/wadmp/wadmp.github.io/master/images/colab_logo.png)][Run in Google Colab] |
|:---------------------:|:---------------:|:-------------:|:-------------------:|
| [View source on GitHub] | [Notebook Viewer] | [Run in binder] | [Run in Google Colab] |

## Introduction
WebAccess/DMP will only allow "trusted" devices to connect.
Trust is defined by a "Public-Key Infrastructure" which utilises X.509 certificates over TLS.
This notebook allows you to add a new CA certificate to the "trust store" of a WebAccess/DMP Server.

### Requirements
* You need to have an existing user account on the WA/DMP server.
* You must be a SysAdmin!

### Usage
In the "Global Variables" cell below, change BASE_URL to match the particular WA/DMP instance that you are using.

Run the cells, either one at a time (Shift-Enter is your friend!), or all at once.

When prompted, enter the required User Input.

## Setup

In [None]:
%%capture

# Install packages in the current Jupyter kernel
import sys
!{sys.executable} -m pip install requests
!{sys.executable} -m pip install cryptography

import os
from urllib.parse import urlparse, urlunparse
import requests
import json
import base64
import re
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.backends import default_backend

## Global variables

In [None]:
BASE_URL = 'https://gateway.wadmp.com'
BASE_PATH = 'api'
SESSION = requests.Session()  # Use one HTTPS session for all API calls

## Functions to be used later

In [None]:
def login(username, password):
    """Login to the system, and return a token
    """
    url = f"{BASE_URL}/public/auth/connect/token"
    credentials = {'username': username, 'password': password, 'client_id': 'python', 'grant_type': 'password'}
    print(f"Sending POST request to {url} with:\n"
          f"    credentials={credentials}\n")
    response = SESSION.post(url, data=credentials)

    print(response.status_code)
    try:
        print(json.dumps(response.json(), indent=4, sort_keys=True))
    except ValueError:
        print(response.text)

    if response.status_code == requests.codes['ok']:
        return response.json()["access_token"]
    else:
        print("Failed to login!")
        sys.exit(1)


def get_certs():
    """Gets the device CA certificates in the trust store of the Management Server.
       The returned value is a Base64-encoded string of all of the trusted CA certs, in PEM format.
    """
    url = f"{BASE_URL}/{BASE_PATH}/certs"
    print(f"Sending GET request to {url}\n")
    response = SESSION.get(url)

    print(response.status_code)
    try:
        print(json.dumps(response.json(), indent=4, sort_keys=True))
    except ValueError:
        print(response.text)

    if response.status_code == requests.codes['ok']:
        return response.json()['data']['certs']
    else:
        print("Failed to retrieve the certificates!")
        return None


def put_certs(certs):
    """Replaces the device CA certificates in the trust store of the Management Server.
       certs = Base64-encoded string, comprising ALL trusted CA certificates in PEM format.
    """
    url = f"{BASE_URL}/{BASE_PATH}/certs"
    print(f"Sending PUT request to {url} with:\n"
          f"    certs={certs}\n")
    body = {'certs': certs}
    response = SESSION.put(url, json=body)

    print(response.status_code)
    try:
        print(json.dumps(response.json(), indent=4, sort_keys=True))
    except ValueError:
        print(response.text)

    if response.status_code == requests.codes['ok']:
        return response.json()['success']
    else:
        print("Failed to edit MS definition!")
        return None

## User input

In [None]:
USERNAME = input("Username on WebAccess/DMP:")
PASSWORD = input("Password on WebAccess/DMP:")

## Login to Server

In [None]:
user_token = login(USERNAME, PASSWORD)
SESSION.headers.update({'Authorization': f'Bearer {user_token}'})

## Check the Trust Store

In [None]:
certs_b64 = get_certs()
certs_pem = base64.b64decode(certs_b64).decode('utf-8')  # b64decode() returns a "bytes-like" object

pattern = r"(-----BEGIN CERTIFICATE-----[A-Za-z0-9+/=\s]+-----END CERTIFICATE-----)"

certs_list = re.findall(pattern, certs_pem, re.MULTILINE)
print(f"{len(certs_list)} certs:")
for cert_pem in certs_list:
    # load_pem_x509_certificate() takes a "bytes-like" object
    cert_object = x509.load_pem_x509_certificate(cert_pem.encode('utf-8'), default_backend())
    print(f"   - {cert_object.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value}")

## User input

In [None]:
new_cert_b64 = input("Certificate to be added (as a Base64-encoded string!):")

new_cert_pem = base64.b64decode(new_cert_b64).decode('utf-8')  # b64decode() returns a "bytes-like" object

pattern = r"^(-----BEGIN CERTIFICATE-----[A-Za-z0-9+/=\s]+-----END CERTIFICATE-----)$"

match = re.search(pattern, new_cert_pem)
if match:
    # load_pem_x509_certificate() takes a "bytes-like" object
    cert_object = x509.load_pem_x509_certificate(match.group(1).encode('utf-8'), default_backend())
    print(f"This cert has the following attributes:")
    print(f"   - {cert_object.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value}")
else:
    print("This isn't a valid certificate!")

## Add the new cert to the Trust Store 

In [None]:
new_certs_pem = certs_pem + "\n" + new_cert_pem
new_certs_b64 = base64.b64encode(new_certs_pem.encode('utf-8')).decode('utf-8')  # b64encode() takes a "bytes-like" object and returns bytes
put_certs(new_certs_b64)

## Check the Trust Store again

In [None]:
certs_b64 = get_certs()
certs_pem = base64.b64decode(certs_b64).decode('utf-8')  # b64decode() returns a "bytes-like" object

pattern = r"(-----BEGIN CERTIFICATE-----[A-Za-z0-9+/=\s]+-----END CERTIFICATE-----)"

certs_list = re.findall(pattern, certs_pem, re.MULTILINE)
print(f"{len(certs_list)} certs:")
for cert_pem in certs_list:
    # load_pem_x509_certificate() takes a "bytes-like" object
    cert_object = x509.load_pem_x509_certificate(cert_pem.encode('utf-8'), default_backend())
    print(f"   - {cert_object.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value}")