In [None]:
import os
import subprocess
import sys
import time

import google.auth.transport.requests
import urllib3
import yaml
from google.cloud.container_v1 import ClusterManagerClient
from google.oauth2 import service_account
from kubernetes import client, config, utils
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# --- Cluster connection setup -------------------------------------------
# Identifiers of the GKE cluster that every cell below operates on.
project_id = "etcd-clusters"
zone = "us-central1-c"
cluster_id = "etcd-scaling-testing-cluster"
# OAuth scope requested for the service-account credentials.
SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
# The key file path is relative to the notebook's working directory.
credentials = service_account.Credentials.from_service_account_file(
    "../keys/compsci-512-svc-account.json", scopes=SCOPES)
# Refresh up front: credentials.token is read below to build the auth header.
request = google.auth.transport.requests.Request()
credentials.refresh(request)
# Look up the cluster so its API endpoint can be used as the client host.
cluster_manager_client = ClusterManagerClient(credentials=credentials)
cluster = cluster_manager_client.get_cluster(
    project_id=project_id, zone=zone, cluster_id=cluster_id)
# Build the kubernetes client configuration and install it as the default,
# so the API objects created below (and in later cells) pick it up.
configuration = client.Configuration()
configuration.host = "https://"+cluster.endpoint+":443"
# NOTE(review): certificate verification is switched off for the API server
# connection; consider configuring the cluster CA certificate instead.
configuration.verify_ssl = False
# NOTE(review): the bearer token is captured once here; presumably it expires
# and this cell must be re-run in long sessions - confirm.
configuration.api_key = {"authorization": "Bearer " + credentials.token}
client.Configuration.set_default(configuration)
# Shared API handles used by the helper functions and cells below.
v1 = client.CoreV1Api()
k8sClient = client.ApiClient()
api = client.AppsV1Api(k8sClient)

In [None]:
def delete_shard(instance_num, num_replicas_per_set):
    """Delete every Kubernetes resource belonging to one etcd shard.

    Works in the "default" namespace and removes, in this order: the pods
    ``etcd-<n>-<i>``, the StatefulSet ``etcd-<n>``, the volume claims
    ``data-<n>-etcd-<n>-<i>`` and the services ``etcd-<n>`` and
    ``etcd-client-<n>``.

    Args:
        instance_num: Shard number used in the resource names.
        num_replicas_per_set: Highest replica index to delete; indices
            ``0..num_replicas_per_set`` (inclusive) are covered.

    Raises:
        client.ApiException: For any API failure other than 404 Not Found.
    """
    namespace = "default"
    shard = f"etcd-{instance_num}"

    def _delete(delete_fn, name):
        # Return True when the delete call succeeded. A resource that is
        # already gone (404) is reported and skipped so that it cannot abort
        # the rest of the teardown; every other API error still propagates.
        try:
            delete_fn(name=name, namespace=namespace)
        except client.ApiException as exc:
            if exc.status != 404:
                raise
            print(f"{name} not found, skipping")
            return False
        return True

    for i in range(num_replicas_per_set + 1):
        if _delete(v1.delete_namespaced_pod, f"{shard}-{i}"):
            print(f"Deleted {shard}-{i}")
    _delete(api.delete_namespaced_stateful_set, shard)
    # Claims follow the replica indices instead of a fixed 0-2, but never
    # fewer than the three claims that were previously always removed.
    for i in range(max(num_replicas_per_set + 1, 3)):
        _delete(v1.delete_namespaced_persistent_volume_claim,
                f"data-{instance_num}-{shard}-{i}")
    _delete(v1.delete_namespaced_service, shard)
    _delete(v1.delete_namespaced_service, f"etcd-client-{instance_num}")
    print(f"Successfully deleted shard {instance_num}")

In [None]:
def delete_sharder():
    """Best-effort removal of the sharder Deployment, its pods and its Service.

    Looks for pods in the "default" namespace whose name contains "sharder",
    then deletes the ``etcd-sharder`` Deployment, those pods and the
    ``sharder-endpoint`` Service. Each deletion is attempted independently,
    so one failure does not prevent the remaining resources from being
    removed. API errors are printed rather than raised.
    """
    namespace = "default"
    pods = v1.list_pod_for_all_namespaces(watch=False)
    # Only pods in the namespace we delete from are relevant; a match in
    # another namespace could never be deleted by the calls below.
    sharder_names = [
        pod.metadata.name
        for pod in pods.items
        if pod.metadata.namespace == namespace and "sharder" in pod.metadata.name
    ]
    print(f"Sharder name: {', '.join(sharder_names)}")

    # The Deployment goes first so its controller does not recreate the pods.
    # Pods are only targeted when some were found: a delete request with an
    # empty name must never be issued.
    targets = [(api.delete_namespaced_deployment, "etcd-sharder")]
    targets += [(v1.delete_namespaced_pod, name) for name in sharder_names]
    targets.append((v1.delete_namespaced_service, "sharder-endpoint"))

    failed = False
    for delete_fn, name in targets:
        try:
            delete_fn(name=name, namespace=namespace)
        except client.ApiException as exc:
            if exc.status == 404:
                # Already gone (e.g. removed together with the Deployment).
                print(f"{name} not found, nothing to delete")
            else:
                failed = True
                print(f"Could not delete {name}: {exc.reason}")

    if failed:
        print(f"Skipping deletion of sharder")
    else:
        print("Successfully deleted sharder")

In [None]:
def generate_deployments(num_shards, num_replicas):
    """Regenerate the deployment files via ``../generate_deployment_files.py``.

    The script is run from the parent directory with the arguments
    ``<num_shards> 3 1 <num_replicas>``.

    Args:
        num_shards: First argument passed to the generator script.
        num_replicas: Fourth argument passed to the generator script.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero
            status, so that stale deployment files are not deployed silently.
    """
    # always 3 replicas per statefulset ("highly available")
    # instance num always 1
    command = [
        sys.executable,  # same interpreter as the kernel, not whatever "python" is on PATH
        "generate_deployment_files.py",
        str(num_shards),
        "3",
        "1",
        str(num_replicas),
    ]
    # An argument list (no shell) avoids quoting issues; check=True surfaces failures.
    subprocess.run(command, cwd="..", check=True)

In [None]:
def deploy(filename):
    """Create the Kubernetes objects described by a YAML file.

    Args:
        filename: Name of a file inside ``../deployment_files/``.

    A ``ValueError`` raised while creating the objects is printed instead of
    propagated; any other exception propagates to the caller.
    """
    deployment_path = "../deployment_files/" + filename
    # create_from_yaml is given the path and reads the file itself, so no
    # separate file handle is opened (and left unclosed) here.
    try:
        utils.create_from_yaml(client.ApiClient(), deployment_path)
        print(f"Deployed {deployment_path}")
    except ValueError as e:
        print(e)
    
def deploy_etcd_shard(instance_num):
    """Deploy the manifest ``etcd-deployment-<instance_num>.yaml``."""
    manifest_name = f"etcd-deployment-{instance_num}.yaml"
    deploy(manifest_name)

In [None]:
# Discover which etcd shards currently exist by inspecting pod names.
# Results used by later cells:
#   instances            - set of shard numbers found
#   num_replicas_per_set - highest replica index seen (never below 1)
print("Listing pods with their IPs:")
pods = v1.list_pod_for_all_namespaces(watch=False)
instances = set()
num_replicas_per_set = 1
for pod in pods.items:
    if pod.metadata.namespace != "default":
        continue
    print("%s\t%s\t%s" %
          (pod.status.pod_ip, pod.metadata.namespace, pod.metadata.name))
    # Only names of the exact form "etcd-<shard>-<replica>" are counted.
    # Any other pod in the namespace is listed above but otherwise ignored,
    # rather than crashing the cell or being mistaken for a shard.
    name_parts = pod.metadata.name.split("-")
    if len(name_parts) != 3 or name_parts[0] != "etcd":
        continue
    inst_num, replica_num = name_parts[1], name_parts[2]
    if inst_num.isdigit() and replica_num.isdigit():
        instances.add(int(inst_num))
        num_replicas_per_set = max(num_replicas_per_set, int(replica_num))
print(instances)
print(num_replicas_per_set)

In [None]:
# Tear down every shard discovered by the listing cell, then the sharder.
for shard_num in list(instances):
    try:
        delete_shard(shard_num, num_replicas_per_set)
    except client.ApiException:
        # Best effort: report the shard and carry on with the next one.
        print(f"Skipping deletion of shard {shard_num}")
delete_sharder()

In [None]:
# Regenerate the deployment files for the configuration under test.
NUM_SHARDS = 1  # also drives how many shard manifests the deploy cell applies
# NOTE(review): meaning of this value is defined by generate_deployment_files.py - confirm.
NUM_REPLICAS = 50
generate_deployments(num_shards=NUM_SHARDS, num_replicas=NUM_REPLICAS)

In [None]:
# Apply the sharder manifest first, then one manifest per shard (numbered from 1).
deploy("sharder-deployment.yaml")
for shard_num in range(1, NUM_SHARDS + 1):
    deploy_etcd_shard(shard_num)
    print(f"Deployed etcd shard {shard_num}")

In [None]:
# Find sharder endpoint ip
# Poll until the "sharder-endpoint" service reports a load-balancer IP.
# Every unsuccessful attempt sleeps before retrying (including the case where
# the service is not listed at all), and the wait is bounded by a deadline.
SHARDER_IP_TIMEOUT_S = 600
SHARDER_IP_POLL_INTERVAL_S = 2

sharder_ip = ""
found = False
deadline = time.monotonic() + SHARDER_IP_TIMEOUT_S

while not found:
    svcs = v1.list_service_for_all_namespaces()
    for item in svcs.items:
        if item.metadata.name != "sharder-endpoint":
            continue
        # Either attribute may still be unset while the balancer is provisioning.
        load_balancer = item.status.load_balancer
        ingress = load_balancer.ingress if load_balancer else None
        if ingress and ingress[0].ip:
            sharder_ip = ingress[0].ip
            print(sharder_ip)
            found = True
            break
    if not found:
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"sharder-endpoint has no load balancer IP after {SHARDER_IP_TIMEOUT_S}s")
        print("Load balancer not found. Waiting...")
        time.sleep(SHARDER_IP_POLL_INTERVAL_S)

In [None]:
# Run the locust benchmark against the sharder; results go to a directory
# named after the benchmark.
BENCHMARK_NAME = "3d"
os.makedirs(BENCHMARK_NAME, exist_ok=True)
locust_cmd = (
    f"locust --host http://{sharder_ip}:5000  --headless "
    f"--csv={BENCHMARK_NAME}/results -u 500 -r 50 --run-time 5m "
    f"--html={BENCHMARK_NAME}/results_{BENCHMARK_NAME}.html"
)
os.system(locust_cmd)

In [None]:
# Close everything and redo