In [None]:
import json
import orchest
import orchest.parameters as op
import redis
import psycopg2
from collections import defaultdict

In [None]:
step_params = op.get_params()[0]
internal_connectivity = dict()

In [None]:
# Test connectivity of all redis services.
redis_services = step_params.get("redis-services", ["redis"])
internal_connectivity["redis-services"] = defaultdict(bool)
for svc in redis_services:
    try:
        redis_host = orchest.get_service(svc)["internal_hostname"]
        redis_client = redis.Redis(host=redis_host, port=6379, db=0)
        redis_client.set("hello", "there")
        if redis_client.get("hello") == b"there":
            internal_connectivity["redis-services"][svc] = True
    except Exception as e:
        print(e)

In [None]:
# Test connectivity of all postgres services.
postgres_services = step_params.get("postgres-services", ["postgres"])
internal_connectivity["postgres-services"] = defaultdict(bool)
for svc in postgres_services:
    try:
        postgres_host = orchest.get_service("postgres")["internal_hostname"]
        conn = psycopg2.connect(dbname="postgres", user="postgres", host=postgres_host)
        cur = conn.cursor()
        cur.execute("CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);")
        cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (1337, "hello"))
        cur.execute("SELECT * FROM test;")
        if cur.fetchone() == (1, 1337, "hello"):
            internal_connectivity["postgres-services"][svc] = True
    except Exception as e:
        print(e)

In [None]:
external_connectivity = defaultdict(list)
for name, service in orchest.get_services().items():
    for url in service["external_urls"].values():
        external_connectivity[name].append(url)

In [None]:
output_file_path = step_params.get("test-output-file", "test-output.json")
connectivity = {
    "internal-connectivity": internal_connectivity,
    "external-connectivity": external_connectivity,
}
# Internal connectivity maps service type (redis/postgres) to a dict
# mapping service name to the result of the connectivity check (True,
# False). External connectivity maps service name to a list of
# external urls where host and port need to be replaced by the host and
# port on which Orchest is running.
with open(f"/data/{output_file_path}", "w") as output_file:
    json.dump(connectivity, output_file)