![Egeria Logo](https://raw.githubusercontent.com/odpi/egeria/master/assets/img/ODPi_Egeria_Logo_color.png)

### Egeria Hands-On Lab
# Welcome to the Conformance Test Suite Lab

## Introduction

Egeria is an open source project that provides open standards and implementation libraries to connect tools, catalogs and platforms together so they can share information (called metadata) about data and the technology that supports it.

In this hands-on lab you will get a chance to work with the conformance test suite that is used to validate that a technology can successfully join an open metadata repository cohort.

## About the Conformance Suite 

The Conformance Suite can be used to test a Platform or Repository Connector and record which Conformance Profiles it supports. The Conformance Suite has different Workbenches that are used to test different types of system.

Initially our focus will be on the Repository Conformance Workbench. This workbench is used to test an OMRS Repository Connector and record which of the Repository Conformance Profiles it supports.

There are 13 repository conformance profiles in this workbench. One of them is mandatory: any repository connector must fully support that profile in order to be certified as conformant. The other profiles are optional; a repository connector can be certified as conformant even if it does not provide the function required by an optional profile, so long as it responds appropriately to requests.

## Configuring and running the Conformance Suite 

We'll come back to the profiles later, but for now let's configure and run the Conformance Suite.

We're going to need a pair of OMAG Servers - one to run the repository under test, the other to run the workbench. The servers need to join the same cohort.

![CTS-Cohort.png](../images/CTS-Cohort.png)
> **Figure 1:** Cohort for conformance testing

When the server running the workbench sees the cohort registration of the server under test, it runs the workbench tests against that server's repository.

## Starting up the Egeria platforms

We'll start one OMAG Server Platform on which to run both the servers.
We also need Apache Zookeeper and Apache Kafka.  


In [None]:
%run ../common/globals.ipynb

import requests
import pprint
import json
import os
import time

# Disable warnings about self-signed certificates
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

ctsPlatformURL = os.environ.get('ctsPlatformURL','https://localhost:9445')

def checkServerPlatform(testPlatformName, testPlatformURL):
    # A platform that is down refuses the connection, so treat request exceptions as "down"
    try:
        response = requests.get(testPlatformURL + "/open-metadata/platform-services/users/garygeeke/server-platform/origin/")
        active = (response.status_code == 200)
    except requests.exceptions.RequestException:
        active = False
    if active:
        print("   ...", testPlatformName, "at", testPlatformURL, "is active - ready to begin")
    else:
        print("   ...", testPlatformName, "at", testPlatformURL, "is down - start it before proceeding")

print ("\nChecking OMAG Server Platform availability...")
checkServerPlatform("CTS OMAG Server Platform", ctsPlatformURL)

print ("Done.")

## Configuring the Servers

We're going to configure both the servers in the diagram above.

Let's start by creating some definitions that are used throughout the rest of the lab.

Knowing both server names up front will be handy for when we configure the workbench.

To configure the servers we'll need a common cohort name and event bus configuration. 
We can let the CTS server default to using a local in-memory repository.
The CTS server does not need to run any Access Services.

In [None]:
ctsServerName    = "CTS_Server"
sutServerName    = "SUT_Server"
devCohort        = "devCohort"

We'll need to pass a couple of JSON request bodies - so let's set up a reusable header:

In [None]:
jsonContentHeader = {'content-type':'application/json'}

We'll need a JSON request body for configuration of the event bus.

In [None]:
eventBusURLroot   = os.environ.get('eventBusURLroot', 'localhost:9092')

eventBusBody      = {
    "producer": {
        "bootstrap.servers": eventBusURLroot
    },
    "consumer":{
        "bootstrap.servers": eventBusURLroot
    }
}

We'll also need a JSON request body for configuration of the workbench.
Its `maxSearchResults` property sets the page size used in searches.

In [None]:
workbenchConfigBody = {
    "class"                  : "RepositoryConformanceWorkbenchConfig",
    "tutRepositoryServerName": sutServerName ,
    "maxSearchResults"       : 10
}

We also need a userId for the configuration commands. You could change this to a name you choose.

In [None]:
adminUserId      = "garygeeke"

We can perform configuration operations through the administrative interface provided by the ctsPlatformURL.

The URLs for the configuration REST APIs have a common structure and begin with the following root:

In [None]:
adminPlatformURL = ctsPlatformURL

adminCommandURLRoot = adminPlatformURL + '/open-metadata/admin-services/users/' + adminUserId + '/servers/'

What follows are descriptions and coded requests to configure each server.  There are a lot of common steps 
involved in configuring a metadata server, so we first define some simple 
functions that can be re-used in later steps for configuring each server.

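Each configuration function returns True or False to indicate whether it was successful. The `getResult` function instead returns the JSON response, or None if the request failed.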

In [None]:
def postAndPrintResult(url, json=None, headers=None):
    print("   ...... (POST", url, ")")
    response = requests.post(url, json=json, headers=headers)
    if response.status_code == 200:
        print("   ...... Success. Response: ", response.json())
        return True
    else:
        print("   ...... Failed. Response: ", response.json())
        return False
    
def getAndPrintResult(url, json=None, headers=None):
    print("   ...... (GET", url, ")")
    response = requests.get(url, json=json, headers=headers)
    if response.status_code == 200:
        print("   ...... Success. Response: ", response.json())
        return True
    else:
        print("   ...... Failed. Response: ", response.json())
        return False

def getResult(url, json=None, headers=None):
    print("\n   ...... (GET", url, ")")
    try:
        response = requests.get(url, json=json, headers=headers)
        if response.status_code == 200:
            if response.json()['relatedHTTPCode'] == 200:
                return response.json()
        return None
    except requests.exceptions.RequestException as e:
        print ("   ...... FAILED - http request threw an exception: ", e)
        return None    

def configurePlatformURL(serverName, serverPlatform):
    print("\n   ... Configuring the platform the server will run on...")
    url = adminCommandURLRoot + serverName + '/server-url-root?url=' + serverPlatform
    return postAndPrintResult(url)

def configureServerType(serverName, serverType):
    print ("\n   ... Configuring the server's type...")
    url = adminCommandURLRoot + serverName + '/server-type?typeName=' + serverType
    return postAndPrintResult(url)

def configureUserId(serverName, userId):
    print ("\n   ... Configuring the server's userId...")
    url = adminCommandURLRoot + serverName + '/server-user-id?id=' + userId
    return postAndPrintResult(url)

def configurePassword(serverName, password):
    print ("\n   ... Configuring the server's password (optional)...")
    url = adminCommandURLRoot + serverName + '/server-user-password?password=' + password
    return postAndPrintResult(url)

def configureMetadataRepository(serverName, repositoryType):
    print ("\n   ... Configuring the metadata repository...")
    url = adminCommandURLRoot + serverName + '/local-repository/mode/' + repositoryType
    return postAndPrintResult(url)

def configureDescriptiveName(serverName, collectionName):
    print ("\n   ... Configuring the short descriptive name of the metadata stored in this server...")
    url = adminCommandURLRoot + serverName + '/local-repository/metadata-collection-name/' + collectionName
    return postAndPrintResult(url)

def configureEventBus(serverName, busBody):
    print ("\n   ... Configuring the event bus for this server...")
    url = adminCommandURLRoot + serverName + '/event-bus'
    return postAndPrintResult(url, json=busBody, headers=jsonContentHeader)

def configureCohortMembership(serverName, cohortName):
    print ("\n   ... Configuring the membership of the cohort...")
    url = adminCommandURLRoot + serverName + '/cohorts/' + cohortName
    return postAndPrintResult(url)
    
def configureRepositoryWorkbench(serverName, workbenchBody):
    print ("\n   ... Configuring the repository workbench for this server...")
    url = adminCommandURLRoot + serverName + '/conformance-suite-workbenches/repository-workbench/repositories'
    return postAndPrintResult(url, json=workbenchBody, headers=jsonContentHeader)
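
The helper functions above build each request URL by plain string concatenation, which is fine for the simple names used in this lab. As a minimal sketch (not part of the Egeria API), the hypothetical helper `buildAdminURL` below shows one way to percent-encode the server name and any query parameters, in case a value such as a password contains reserved characters like `&` or `#`. The function name, its parameters and the example password are assumptions made for illustration.

```python
from urllib.parse import quote, urlencode

def buildAdminURL(commandRoot, serverName, path, **queryParams):
    # Hypothetical helper: percent-encode the server name so it is safe as a path segment
    url = commandRoot + quote(serverName, safe="") + path
    # Encode query parameters (if any) so reserved characters survive the request
    if queryParams:
        url += "?" + urlencode(queryParams)
    return url

# Example root using the default platform URL and userId from this lab
exampleRoot = "https://localhost:9445/open-metadata/admin-services/users/garygeeke/servers/"
print(buildAdminURL(exampleRoot, "CTS_Server", "/server-user-password", password="p&ss#1"))
```

Without the encoding, the `&` in this made-up password would be read as the start of a second query parameter and the `#` as a fragment marker.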


## Configuring the CTS Server

We're going to configure the CTS Server from the diagram above. The CTS Server is the one that runs the repository workbench.

The server will default to using a local in-memory repository.
The CTS server does not need to run any Access Services.

Notice that when we configure the CTS Server to run the repository workbench, we provide the name of the server under test.

First we introduce a 'success' variable which is used to monitor progress in the subsequent cells.

In [None]:
success = True

In [None]:
ctsServerType          = "Conformance Suite Server"
ctsServerUserId        = "CTS1npa"
ctsServerPassword      = "CTS1passw0rd"
ctsServerPlatform      = ctsPlatformURL


print("Configuring " + ctsServerName + "...")

if (success):
    success = configurePlatformURL(ctsServerName, ctsServerPlatform)
if (success):
    success = configureServerType(ctsServerName, ctsServerType)
if (success):
    success = configureUserId(ctsServerName, ctsServerUserId)
if (success):
    success = configurePassword(ctsServerName, ctsServerPassword)
if (success):
    success = configureEventBus(ctsServerName, eventBusBody)
if (success):
    success = configureCohortMembership(ctsServerName, devCohort)
if (success):
    success = configureRepositoryWorkbench(ctsServerName, workbenchConfigBody)

if (success):
    print("\nDone.")
else:
    print("\nFAILED: please check the messages above and correct before proceeding")
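
The chain of `if (success):` checks runs each step only while every earlier step has succeeded. As a sketch of the same control flow, the hypothetical `runSteps` helper below takes a list of (function, arguments) pairs and stops at the first one that returns False. The stand-in step functions exist only for illustration; in the notebook the entries would be the `configure...` functions defined earlier.

```python
def runSteps(steps):
    # Run each (function, args) pair in order; stop at the first that returns False
    for step, args in steps:
        if not step(*args):
            return False
    return True

# Stand-in steps for illustration; they record their calls instead of issuing REST requests
calls = []

def okStep(name):
    calls.append(name)
    return True

def failingStep(name):
    calls.append(name)
    return False

result = runSteps([(okStep, ("platform-url",)),
                   (failingStep, ("server-type",)),
                   (okStep, ("user-id",))])
print(result, calls)
```

The third step is never called, which mirrors how the notebook skips the remaining configuration calls once `success` becomes False.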

## Configuring the SUT Server (Server Under Test)

Next we're going to configure the SUT Server from the diagram above. The SUT Server is the one that hosts the repository that is being tested. The SUT Server will run on the same platform as the CTS Server.

We explicitly configure the SUT Server to use a local in-memory repository.
The SUT server does not need to run any Access Services.

Notice that the SUT Server's name is the one we gave to the repository workbench when we configured the CTS Server.

In [None]:
sutServerType                  = "Metadata Repository Server"
sutServerUserId                = "SUTnpa"
sutServerPassword              = "SUTpassw0rd"
metadataCollectionName         = "SUT_MDR"
metadataRepositoryTypeInMemory = "in-memory-repository"
metadataRepositoryTypeGraph    = "local-graph-repository"

print("Configuring " + sutServerName + "...")

if (success):
    success = configurePlatformURL(sutServerName, ctsServerPlatform)
if (success):
    success = configureServerType(sutServerName, sutServerType)
if (success):
    success = configureUserId(sutServerName, sutServerUserId)
if (success):
    success = configurePassword(sutServerName, sutServerPassword)
if (success):
    success = configureMetadataRepository(sutServerName, metadataRepositoryTypeInMemory)
if (success):
    success = configureDescriptiveName(sutServerName, metadataCollectionName)
if (success):
    success = configureEventBus(sutServerName, eventBusBody)
if (success):
    success = configureCohortMembership(sutServerName, devCohort)

if (success):
    print("\nDone.")
else:
    print("\nFAILED: please check the messages above and correct before proceeding")

The commands below deploy the server configuration documents to the server platforms where the
servers will run.

In [None]:
def deployServerToPlatform(serverName, platformURL):
    print("   ... deploying", serverName, "to the", platformURL, "platform...")
    url = adminCommandURLRoot + serverName + '/configuration/deploy'
    platformTarget = {
        "class": "URLRequestBody",
        "urlRoot": platformURL
    }
    try:
        return postAndPrintResult(url, json=platformTarget, headers=jsonContentHeader)
    except requests.exceptions.RequestException as e:
        print ("   ...... FAILED - http request threw an exception: ", e)
        return False    


print("\nDeploying server configuration documents to appropriate platforms...")
    
if (success):
    success = deployServerToPlatform(ctsServerName, ctsPlatformURL)
if (success):
    success = deployServerToPlatform(sutServerName, ctsPlatformURL)

if (success):
    print("\nDone.")
else:
    print("\nFAILED: please check the messages above and correct before proceeding")

## Starting the servers

We'll need to define the URL for the OMRS operational services API.

In [None]:
operationalServicesURLcore = "/open-metadata/admin-services/users/" + adminUserId

Start the CTS Server, followed by the SUT Server.

When the CTS Server sees the cohort registration for the SUT Server it will start to run the workbench.

In [None]:
def startServer(serverName, platformURL):
    print("   ... starting server", serverName, "...")
    url = platformURL + operationalServicesURLcore + '/servers/' + serverName + '/instance'
    return postAndPrintResult(url)

print ("\nStarting the CTS server ...")

if (success):
    success = startServer(ctsServerName, ctsPlatformURL)
    
# Pause to allow server to initialize fully    
time.sleep(4)

print ("\nStarting the SUT server ...")

if (success):
    success = startServer(sutServerName, ctsPlatformURL)

if (success):
    print("\nDone.")
else:
    print("\nFAILED: please check the messages above and correct before proceeding")

## Workbench Progress

The repository workbench runs a lot of tests (several thousand) and can take several hours to complete. There is no 'completion event' because, once the conformance suite has completed the synchronous workbench tests, it continues to run and performs asynchronous tests in response to events received within the cohort. As a result, it is not easy to know when the CTS has 'finished'. However, if you scan the console log output from the conformance suite, you can look for the following message:

Thu Nov 21 09:11:01 GMT 2019 CTS_Server Information CONFORMANCE-SUITE-0011 The Open Metadata Conformance Workbench repository-workbench has completed its synchronous tests, further test cases may be triggered from incoming events.

After this message you will probably see a number of further events being processed by the CTS Server. There can be up to several hundred of these events, which look like the following:

Thu Nov 21 09:11:03 GMT 2019 CTS_Server Event OMRS-AUDIT-8006 Processing incoming event of type DeletedEntityEvent for instance 2fd6cd97-35dd-41d9-ad2f-4d25af30033e from: OMRSEventOriginator{metadataCollectionId='f076a951-fcd0-483b-a06e-d0c7abb61b84', serverName='SUT_Server', serverType='Metadata Repository Server', organizationName='null'}

Thu Nov 21 09:11:03 GMT 2019 CTS_Server Event OMRS-AUDIT-8006 Processing incoming event of type PurgedEntityEvent for instance 2fd6cd97-35dd-41d9-ad2f-4d25af30033e from: OMRSEventOriginator{metadataCollectionId='f076a951-fcd0-483b-a06e-d0c7abb61b84', serverName='SUT_Server', serverType='Metadata Repository Server', organizationName='null'}

These events are usually DELETE and PURGE events relating to instances that have been cleaned up on the SUT Server. 

Once these events have been logged the console should go quiet. When you see this, it is possible to retrieve the workbench results from the CTS Server.
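
If the console output has been captured to a file or a string, the completion message can be detected by searching for its message id. The sketch below assumes the log is available as plain text with one record per line. The `findCompletionLine` name is hypothetical, and the sample text reuses the completion message shown above together with a shortened copy of one event line.

```python
COMPLETION_MESSAGE_ID = "CONFORMANCE-SUITE-0011"

def findCompletionLine(logText, workbenchId="repository-workbench"):
    # Return the first log line reporting that the workbench finished its synchronous tests
    for line in logText.splitlines():
        if COMPLETION_MESSAGE_ID in line and workbenchId in line:
            return line
    return None

# Sample log text: the completion message plus a shortened event line
sampleLog = (
    "Thu Nov 21 09:11:01 GMT 2019 CTS_Server Information CONFORMANCE-SUITE-0011 "
    "The Open Metadata Conformance Workbench repository-workbench has completed its "
    "synchronous tests, further test cases may be triggered from incoming events.\n"
    "Thu Nov 21 09:11:03 GMT 2019 CTS_Server Event OMRS-AUDIT-8006 "
    "Processing incoming event of type DeletedEntityEvent\n"
)

print(findCompletionLine(sampleLog) is not None)
```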

## Polling for Status

The following cell can be used to find out whether the workbench has completed its synchronous tests.

In [None]:
conformanceSuiteServicesURLcore = "/open-metadata/conformance-suite/users/" + adminUserId

def retrieveStatus(serverName, platformURL):
    print("   ... retrieving completion status from server", serverName, "...")
    url = platformURL + '/servers/' + serverName + conformanceSuiteServicesURLcore + '/status/workbenches/repository-workbench'
    return getResult(url)

print ("\nRetrieve repository-workbench status ...")

status_json = retrieveStatus(ctsServerName, ctsPlatformURL)

if (status_json != None):
    workbenchId = status_json['workbenchStatus']['workbenchId']
    workbenchComplete = status_json['workbenchStatus']['workbenchComplete']
    if (workbenchComplete == True):
        print("\nWorkbench",workbenchId,"is complete.")
    else:
        print("\nWorkbench",workbenchId,"has not yet completed.")
else:
    print("\nFAILED: please check the messages above and correct before proceeding")
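
Because the tests can run for hours, it may be convenient to repeat the status check at intervals rather than re-running the cell by hand. The following is a minimal sketch: `pollUntilComplete` is a hypothetical helper that accepts any zero-argument function returning the status JSON (or None), so in the notebook it could be called with `lambda: retrieveStatus(ctsServerName, ctsPlatformURL)`. The interval and attempt limit are arbitrary assumptions, and the canned responses stand in for real REST calls.

```python
import time

def pollUntilComplete(fetchStatus, intervalSeconds=60, maxAttempts=10, sleep=time.sleep):
    # Call fetchStatus() until it reports workbenchComplete, or give up after maxAttempts
    for attempt in range(1, maxAttempts + 1):
        status = fetchStatus()
        if status is not None and status['workbenchStatus']['workbenchComplete']:
            return True
        if attempt < maxAttempts:
            sleep(intervalSeconds)
    return False

# Illustration with canned responses in place of real REST calls
cannedResponses = iter([
    None,
    {'workbenchStatus': {'workbenchId': 'repository-workbench', 'workbenchComplete': False}},
    {'workbenchStatus': {'workbenchId': 'repository-workbench', 'workbenchComplete': True}},
])
completed = pollUntilComplete(lambda: next(cannedResponses), intervalSeconds=0)
print(completed)
```

A None response (for example, while the server is still starting) is treated as "not yet complete" so that a transient failure does not end the polling early.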

## Retrieving the Workbench Results

The repository workbench keeps the results of the test cases in memory. When the workbench is complete (see above) you 
can request a report of the results from the REST API on the CTS Server.

The REST API has several options that support different styles of report. Here we will request a summary report, followed by the full details of each profile and test case individually. Some of the detailed profile reports can be large (10-20MB), so if you are running the Jupyter notebook server with its default configuration, a report may exceed the notebook server's default maximum data rate. If you are not running the Egeria team's containers (docker/k8s), and you have not done so already, please restart the notebook server with the following configuration option:

jupyter notebook --NotebookApp.iopub_data_rate_limit=1.0e10 

If the following call results in a Java heap error, you may need to increase the memory configured for your container environment, or available locally. A minimum of 2GB, and ideally 4GB, of additional heap space is recommended for the CTS.

Given the amount of detail involved, it may take a minute or two to retrieve all of the details of a completed CTS run. Wait until the cell shows a number (rather than an asterisk): this indicates the cell has completed, and you should also see a final line of output that states "Done -- all details retrieved." While it runs, you should see the output updating with the iterative REST calls that are made to retrieve the details of each profile or test case.

In [None]:
from requests.utils import quote
import os

report_json = None
cwd = os.getcwd()

profileDir = "profile-details"
testCaseDir = "test-case-details"

conformanceSuiteServicesURLcore = "/open-metadata/conformance-suite/users/" + adminUserId

def retrieveSummary(serverName, platformURL):
    print("   ... retrieving test report summary from server", serverName, "...")
    url = platformURL + '/servers/' + serverName + conformanceSuiteServicesURLcore + '/report/summary'
    return getResult(url)

def retrieveProfileNames(serverName, platformURL):
    print("   ... retrieving profile list from server", serverName, "...")
    url = platformURL + '/servers/' + serverName + conformanceSuiteServicesURLcore + '/report/profiles'
    return getResult(url)

def retrieveTestCaseIds(serverName, platformURL):
    print("   ... retrieving test case list from server", serverName, "...")
    url = platformURL + '/servers/' + serverName + conformanceSuiteServicesURLcore + '/report/test-cases'
    return getResult(url)

def retrieveProfileDetails(serverName, platformURL, profileName):
    encodedProfileName = quote(profileName)
    url = platformURL + '/servers/' + serverName + conformanceSuiteServicesURLcore + '/report/profiles/' + encodedProfileName
    return getResult(url)

def retrieveTestCaseDetails(serverName, platformURL, testCaseId):
    url = platformURL + '/servers/' + serverName + conformanceSuiteServicesURLcore + '/report/test-cases/' + testCaseId
    return getResult(url)

print ("\nRetrieve Conformance Suite summary results ...")

summary_json = retrieveSummary(ctsServerName, ctsPlatformURL)

if (summary_json != None):
    profiles = retrieveProfileNames(ctsServerName, ctsPlatformURL)
    profileDetailsDir = cwd + os.path.sep + profileDir
    os.makedirs(profileDetailsDir, exist_ok=True)
    print("Retrieving details for each profile...")
    for profile in profiles['profileNames']:
        profile_details = retrieveProfileDetails(ctsServerName, ctsPlatformURL, profile)
        with open(profileDetailsDir + os.path.sep + profile.replace(" ", "_") + ".json", 'w') as outfile:
            json.dump(profile_details, outfile)
    test_cases = retrieveTestCaseIds(ctsServerName, ctsPlatformURL)
    testCaseDetailsDir = cwd + os.path.sep + testCaseDir
    os.makedirs(testCaseDetailsDir, exist_ok=True)
    print("Retrieving details for each test case...")
    for test_case in test_cases['testCaseIds']:
        test_case_details = retrieveTestCaseDetails(ctsServerName, ctsPlatformURL, test_case)
        with open(testCaseDetailsDir + os.path.sep + test_case + ".json", 'w') as outfile:
            json.dump(test_case_details, outfile)
    print("\nDone -- all details retrieved.")
else:
    print("\nFAILED: please check the messages above and correct before proceeding")
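
Note that `getResult` returns None when a request fails, in which case the corresponding file contains only the JSON value `null`. As a sketch, the hypothetical `findEmptyReports` helper below lists such files in a directory so that those details can be retrieved again. It is demonstrated on a temporary directory with made-up file names and contents; in the notebook it would be pointed at `profileDetailsDir` or `testCaseDetailsDir`.

```python
import json
import os
import tempfile

def findEmptyReports(directory):
    # Return the sorted names of .json files whose content is the JSON value null
    emptyFiles = []
    for fileName in sorted(os.listdir(directory)):
        if not fileName.endswith(".json"):
            continue
        with open(os.path.join(directory, fileName)) as infile:
            if json.load(infile) is None:
                emptyFiles.append(fileName)
    return emptyFiles

# Demonstration on a temporary directory with made-up file names and contents
with tempfile.TemporaryDirectory() as demoDir:
    with open(os.path.join(demoDir, "retrieved.json"), 'w') as outfile:
        json.dump({"relatedHTTPCode": 200}, outfile)
    with open(os.path.join(demoDir, "failed.json"), 'w') as outfile:
        json.dump(None, outfile)
    emptyReports = findEmptyReports(demoDir)

print(emptyReports)
```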

## Conformance Profile Results

The following is a summary of the status of each conformance profile. To ensure that you get a complete summary, make sure you retrieve the results (as above) once the workbench has completed.

(Note that this uses pandas to summarize the results table: if you have not already done so, use pip3 to install pandas and its dependencies.)

In [None]:
import pandas
from pandas import json_normalize

if (summary_json != None):
    repositoryWorkbenchResults = json_normalize(data = summary_json['testLabSummary'],
                                                record_path =['testSummariesFromWorkbenches','profileSummaries'])
    repositoryWorkbenchResultsSummary = repositoryWorkbenchResults[['name','description','profilePriority','conformanceStatus']]

    display(repositoryWorkbenchResultsSummary.head(15))
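
As described at the start of this lab, the mandatory profile must be fully supported for a repository connector to be conformant. A rough sketch of that check over the summary rows follows. The value strings (`MANDATORY_PROFILE`, `OPTIONAL_PROFILE`, `CONFORMANT_FULL_SUPPORT`, `CONFORMANT_NO_SUPPORT`, `UNKNOWN_STATUS`) and the sample rows are assumptions for illustration, and `unsupportedMandatoryProfiles` is a hypothetical helper; compare the strings with the values displayed in your own results table before relying on it.

```python
def unsupportedMandatoryProfiles(profileSummaries,
                                 mandatoryValue="MANDATORY_PROFILE",
                                 fullSupportValue="CONFORMANT_FULL_SUPPORT"):
    # Return the names of mandatory profiles whose status is not the "full support" value
    return [profile['name'] for profile in profileSummaries
            if profile['profilePriority'] == mandatoryValue
            and profile['conformanceStatus'] != fullSupportValue]

# Made-up rows using the same column names as the summary table above
sampleRows = [
    {'name': 'Example mandatory profile', 'profilePriority': 'MANDATORY_PROFILE',
     'conformanceStatus': 'UNKNOWN_STATUS'},
    {'name': 'Example optional profile', 'profilePriority': 'OPTIONAL_PROFILE',
     'conformanceStatus': 'CONFORMANT_NO_SUPPORT'},
]

print(unsupportedMandatoryProfiles(sampleRows))
```

In the notebook, the rows could be obtained with `repositoryWorkbenchResultsSummary.to_dict('records')`. An empty list would then suggest that every mandatory profile is fully supported.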