In [1]:
from pymongo import MongoClient
from kubernetes import client, config
from pprint import pprint
from kubernetes.client.rest import ApiException

In [2]:
# --- Connection settings ---------------------------------------------------
# NOTE(review): MONGO_HOST / MONGO_PORT are not referenced by the connection
# code below (the MongoDB cell connects to a literal URI) — TODO wire them in.
MONGO_HOST, MONGO_PORT = '192.168.178.168', 32017
NAMESPACE = 'default'   # Kubernetes namespace whose pods are listed
K8S_SHARDS = 0          # shard pods seen in Kubernetes; recomputed by connect_to_kubernetes()

# --- Kubernetes API clients ------------------------------------------------
# Credentials and cluster address come from the local kubeconfig (e.g. ~/.kube/config).
config.load_kube_config()

# AppsV1Api is used for StatefulSets (listing/scaling); CoreV1Api for listing pods.
apps_V1, core_V1 = client.AppsV1Api(), client.CoreV1Api()

In [3]:
def connect_to_kubernetes():
    """
    Lists every pod in ``NAMESPACE`` with its phase and stores the number of
    MongoDB shard pods (names containing ``"mongodb-shard"``) in the global
    ``K8S_SHARDS``.

    Pods that are already being deleted (``metadata.deletion_timestamp`` set,
    e.g. right after their StatefulSet was scaled to 0) are listed but not
    counted, because they can still report phase ``Running`` while terminating.

    The global is only overwritten once the whole pod list has been processed,
    so an API failure leaves the previous count untouched instead of a partial one.

    :return: The number of shard pods counted, or None if listing failed.
    """
    global K8S_SHARDS
    try:
        print("üìã Listing all pods and their statuses:\n")

        pods = core_V1.list_namespaced_pod(namespace=NAMESPACE, watch=False)
        shard_pods = 0
        for pod in pods.items:
            name = pod.metadata.name
            print(f" - {name}: {pod.status.phase}")
            if "mongodb-shard" in name and pod.metadata.deletion_timestamp is None:
                shard_pods += 1

        K8S_SHARDS = shard_pods
        print(f"\n Currently there are {K8S_SHARDS} shards in k8s")
        return K8S_SHARDS

    except ApiException as e:
        print(f"‚ùå Kubernetes API error: {e}")
    except Exception as e:
        print(f"‚ùå Failed to connect to Kubernetes: {e}")
    return None

# Print the pods in NAMESPACE and initialise K8S_SHARDS with the number of shard pods.
connect_to_kubernetes()

üìã Listing all pods and their statuses:

 - mongodb-configsvr-0: Running
 - mongodb-mongos-85d8b6979b-4r8xb: Running
 - mongodb-mongos-85d8b6979b-dvtx9: Running
 - mongodb-shard1-0: Running
 - mongodb-shard2-0: Running
 - mongodb-shard3-0: Running
 - mongodb-shard4-0: Running
 - mongodb-shard5-0: Running
 - mongodb-shard6-0: Running

 Currently there are 6 shards in k8s


In [4]:
# Address of the mongos router (add credentials here if the cluster requires auth).
# Alternative, via the host/port from the configuration cell:
#   mongos_uri = f"mongodb://{MONGO_HOST}:{MONGO_PORT}"
mongos_uri = "mongodb://localhost:27017"

# Number of shards registered in MongoDB; (re)initialised by list_shards().
MONGO_SHARDS = 0

# Creating a MongoClient does not fail on an unreachable server; errors only
# surface on the first operation, i.e. list_database_names() below.
mongoClient = MongoClient(mongos_uri)

# Only report success once the server has actually answered.
try:
    databases = mongoClient.list_database_names()
except Exception as e:
    print("Connection failed:", e)
else:
    print("Connected to MongoDB. Databases:")
    pprint(databases)


Connected to MongoDB. Databases:
['admin', 'config', 'ycsb_sharded']


In [5]:
def list_shards(mongos_client: MongoClient):
    """
    Lists all current shards in the MongoDB sharded cluster and stores their
    count in the global ``MONGO_SHARDS``.

    :param mongos_client: The MongoClient connected to mongos
    :return: The number of shards, or None if ``listShards`` failed (in that
             case ``MONGO_SHARDS`` is left unchanged).
    """
    # `global` must come after the docstring: a string placed after it is a
    # plain expression and never becomes the function's __doc__.
    global MONGO_SHARDS
    try:
        result = mongos_client.admin.command("listShards")
        MONGO_SHARDS = len(result["shards"])
        print(f"üì¶ Current shards in the cluster: {MONGO_SHARDS}")
        for shard in result["shards"]:
            print(f" - {shard['_id']}: {shard['host']}")
        return MONGO_SHARDS
    except Exception as e:
        print(f"‚ùå Failed to list shards: {e}")
        return None

# List current shards; this also (re)initialises MONGO_SHARDS from listShards.
list_shards(mongoClient)

üì¶ Current shards in the cluster: 6
 - shard1: shard1/mongodb-shard1-0.mongodb-shard1.default.svc.cluster.local:27018
 - shard2: shard2/mongodb-shard2-0.mongodb-shard2.default.svc.cluster.local:27018
 - shard3: shard3/mongodb-shard3-0.mongodb-shard3.default.svc.cluster.local:27018
 - shard4: shard4/mongodb-shard4-0.mongodb-shard4.default.svc.cluster.local:27018
 - shard5: shard5/mongodb-shard5-0.mongodb-shard5.default.svc.cluster.local:27018
 - shard6: shard6/mongodb-shard6-0.mongodb-shard6.default.svc.cluster.local:27018


In [7]:
def remove_shard(mongos_client: MongoClient, shard_id: int):
    """
    Initiates or continues removal of a shard from the MongoDB cluster.

    Issues ``removeShard`` exactly once. Draining happens in the background,
    so the caller must re-issue the command (call this function again) until
    the returned state is ``'completed'``. On completion the global
    ``MONGO_SHARDS`` is decremented.

    :param mongos_client: The MongoClient connected to mongos
    :param shard_id: Shard number, e.g. 3 for ``shard3`` (int or numeric string)
    :return: The ``state`` field of the response (e.g. ``'started'``,
             ``'ongoing'``, ``'completed'``), or None if the command failed.
    """
    global MONGO_SHARDS
    shard_name = f"shard{shard_id}"

    try:
        res = mongos_client.admin.command("removeShard", shard_name)
        state = res["state"]
        print(f"üì¶ removeShard called on: {shard_name} (state: {state})")
        pprint(res)
        if state == "completed":
            MONGO_SHARDS -= 1
        return state
    except Exception as e:
        print(f"‚ùå Failed to remove shard {shard_name}: {e}")
        return None


def add_shard(mongos_client: MongoClient, shard_id: int, namespace: str = "default", port: int = 27018):
    """
    Adds a shard to the MongoDB cluster, building its URI from the shard ID.

    The shard is addressed by the cluster DNS name of pod 0 of the
    ``mongodb-shard<id>`` StatefulSet:
    ``shard<id>/mongodb-shard<id>-0.mongodb-shard<id>.<namespace>.svc.cluster.local:<port>``.
    On success the global ``MONGO_SHARDS`` is incremented.

    :param mongos_client: The MongoClient connected to mongos
    :param shard_id: Shard number, e.g. 3 for ``shard3`` (int or numeric string)
    :param namespace: Kubernetes namespace used in the shard's DNS name (default: "default")
    :param port: Port the shard's mongod listens on (default: 27018)
    :return: True if ``addShard`` succeeded, False otherwise.
    """
    global MONGO_SHARDS
    shard_name = f"shard{shard_id}"
    host = f"mongodb-shard{shard_id}-0.mongodb-shard{shard_id}.{namespace}.svc.cluster.local:{port}"
    shard_uri = f"{shard_name}/{host}"

    try:
        result = mongos_client.admin.command("addShard", shard_uri)
    except Exception as e:
        print(f"‚ùå Failed to add shard {shard_uri}: {e}")
        return False

    print(f"‚úÖ Shard added: {shard_uri}")
    pprint(result)
    MONGO_SHARDS += 1
    return True


def reshard_collection(mongos_client: MongoClient, db_name="ycsb_sharded", coll_name="sharded", chunks=40, shard_key_field="first_name"):
    """
    Reshards ``db_name.coll_name`` on a hashed shard key using
    ``reshardCollection`` with ``forceRedistribution: true``.

    The command is first tried with ``numInitialChunks=chunks``. Every failed
    attempt is retried with one chunk fewer, as long as the chunk count stays
    above ``2 * MONGO_SHARDS``. If no attempt succeeds, the last error is printed.

    :param mongos_client: MongoClient connected to mongos
    :param db_name: Name of the database (e.g., "ycsb_sharded")
    :param coll_name: Name of the collection (e.g., "usertable")
    :param chunks: Starting value for ``numInitialChunks`` (default: 40)
    :param shard_key_field: Field used as the hashed shard key (default: "first_name")
    :return: True if a ``reshardCollection`` command succeeded, False otherwise.
    """
    namespace = f"{db_name}.{coll_name}"
    num_initial_chunks = chunks
    last_error = None

    # NOTE(review): any error (not only a rejected chunk count) triggers a
    # retry with one chunk fewer.
    while num_initial_chunks > 2 * MONGO_SHARDS:
        cmd = {
            "reshardCollection": namespace,
            "key": {shard_key_field: "hashed"},
            "numInitialChunks": num_initial_chunks,
            "forceRedistribution": True,
        }
        try:
            result = mongos_client.admin.command(cmd)
        except Exception as e:
            last_error = e
            num_initial_chunks -= 1
            continue

        print(f"Resharding initiated for {namespace} with {num_initial_chunks} chunks...‚úÖ")
        pprint(result)
        return True

    if last_error is None:
        print(f"Failed to reshard {namespace}:‚ùå  \nchunks={chunks} must be greater than 2 * {MONGO_SHARDS} shards")
    else:
        print(f"Failed to reshard {namespace}:‚ùå  \n{last_error}")
    return False

from bson.int64 import Int64

def split_hashed_chunks(mongos_client: MongoClient, db_name="ycsb_sharded", coll_name="sharded", num_chunks=40, sk_field="country"):
    """
    Splits a collection sharded on a hashed key into `num_chunks` chunks,
    using evenly spaced split points over the signed 64-bit hashed key
    space [-2**63, 2**63).

    One ``split`` command with ``middle`` is issued per split point. The first
    failure aborts the remaining splits.

    :param mongos_client: MongoClient connected to mongos
    :param db_name: Name of the database
    :param coll_name: Name of the collection
    :param num_chunks: Number of chunks to split into (default: 40); must be >= 1
    :param sk_field: Name of the hashed shard-key field (default: "country")
    :return: Number of split commands that succeeded.
    """
    if num_chunks < 1:
        print(f"‚ùå Failed during split: num_chunks must be >= 1, got {num_chunks}")
        return 0

    ns = f"{db_name}.{coll_name}"
    print(f"üîß Splitting {db_name}.{coll_name} into {num_chunks} chunks on hashed {sk_field}...")

    min_hash, max_hash = -2**63, 2**63
    step = (max_hash - min_hash) // num_chunks
    # num_chunks - 1 interior boundaries; all lie strictly inside the Int64 range.
    split_points = [Int64(min_hash + i * step) for i in range(1, num_chunks)]

    done = 0
    try:
        for i, split_point in enumerate(split_points, start=1):
            mongos_client.admin.command({"split": ns, "middle": {sk_field: split_point}})
            done = i
            print(f"‚úÖ Split {i}/{num_chunks - 1} at hashed value {split_point}")

        print(f"üéâ Done splitting into {num_chunks} chunks.")

    except Exception as e:
        print(f"‚ùå Failed during split: {e}")
    return done


In [8]:
def _set_shard_replicas(apps_v1, shard_name: str, replicas: int, namespace: str) -> int:
    """Scale StatefulSet `shard_name` to `replicas` (skipped if already there); return its previous replica count."""
    scale = apps_v1.read_namespaced_stateful_set_scale(name=shard_name, namespace=namespace)
    previous = scale.spec.replicas or 0
    if previous != replicas:
        apps_v1.patch_namespaced_stateful_set_scale(
            name=shard_name,
            namespace=namespace,
            body={"spec": {"replicas": replicas}}
        )
    return previous


def scale_k8s_shard_down(apps_v1, shard_id: int, namespace: str = "default"):
    """
    Scales the StatefulSet for the given shard ID to 0 replicas.

    The global ``K8S_SHARDS`` is decremented only when the StatefulSet
    actually had replicas before, so repeated calls do not drift the counter.

    :param apps_v1: An initialized AppsV1Api client
    :param shard_id: Integer ID of the shard (e.g., 3 for mongodb-shard3)
    :param namespace: Kubernetes namespace (default: "default")
    """
    global K8S_SHARDS
    shard_name = f"mongodb-shard{shard_id}"
    try:
        previous = _set_shard_replicas(apps_v1, shard_name, 0, namespace)
    except client.exceptions.ApiException as e:
        print(f"‚ùå Failed to scale down {shard_name}: {e}")
        return

    if previous == 0:
        print(f"{shard_name} already has 0 replicas; nothing to do.")
        return
    print(f"üîª Scaled {shard_name} to 0 replicas.")
    K8S_SHARDS -= 1


def scale_k8s_shard_up(apps_v1, shard_id: int, namespace: str = "default"):
    """
    Scales the StatefulSet for the given shard ID to 1 replica.

    The global ``K8S_SHARDS`` is incremented only on a 0 -> 1 transition, so
    repeated calls (or a StatefulSet already running) do not drift the counter.

    :param apps_v1: An initialized AppsV1Api client
    :param shard_id: Integer ID of the shard (e.g., 3 for mongodb-shard3)
    :param namespace: Kubernetes namespace (default: "default")
    """
    global K8S_SHARDS
    shard_name = f"mongodb-shard{shard_id}"
    try:
        previous = _set_shard_replicas(apps_v1, shard_name, 1, namespace)
    except client.exceptions.ApiException as e:
        print(f"‚ùå Failed to scale up {shard_name}: {e}")
        return

    if previous == 1:
        print(f"{shard_name} already has 1 replica; nothing to do.")
        return
    print(f"üî∫ Scaled {shard_name} to 1 replica.")
    if previous == 0:
        # Only a 0 -> 1 transition adds a shard pod; shrinking from >1 does not.
        K8S_SHARDS += 1


def print_statefulsets_info(apps_v1: client.AppsV1Api, namespace: str = "default"):
    """
    Prints a short report of every StatefulSet in a Kubernetes namespace:
    its name, ready/desired replica counts and label selector.

    Kubernetes API errors and any other failure are printed, not raised.

    :param apps_v1: Initialized AppsV1Api client
    :param namespace: Kubernetes namespace (default: "default")
    """
    try:
        found = apps_v1.list_namespaced_stateful_set(namespace=namespace).items
        if not found:
            print(f"‚ÑπÔ∏è No StatefulSets found in namespace '{namespace}'.")
            return

        print(f"üì¶ StatefulSets in namespace '{namespace}':\n")
        for sts in found:
            name, desired = sts.metadata.name, sts.spec.replicas
            ready = sts.status.ready_replicas or 0
            labels = sts.spec.selector.match_labels
            print(
                f" - {name}\n"
                f"    üî¢ Replicas: {ready}/{desired}\n"
                f"    üè∑Ô∏è Selector: {labels}\n"
            )

    except client.exceptions.ApiException as e:
        print(f"‚ùå Kubernetes API error: {e}")
    except Exception as e:
        print(f"‚ùå Failed to retrieve StatefulSets: {e}")


In [9]:
# Show ready/desired replicas and selectors of every StatefulSet in the "default" namespace.
print_statefulsets_info(apps_V1)

üì¶ StatefulSets in namespace 'default':

 - mongodb-configsvr
    üî¢ Replicas: 1/1
    üè∑Ô∏è Selector: {'app': 'mongodb-configsvr'}

 - mongodb-shard1
    üî¢ Replicas: 1/1
    üè∑Ô∏è Selector: {'app': 'mongodb-shard1'}

 - mongodb-shard2
    üî¢ Replicas: 1/1
    üè∑Ô∏è Selector: {'app': 'mongodb-shard2'}

 - mongodb-shard3
    üî¢ Replicas: 1/1
    üè∑Ô∏è Selector: {'app': 'mongodb-shard3'}

 - mongodb-shard4
    üî¢ Replicas: 1/1
    üè∑Ô∏è Selector: {'app': 'mongodb-shard4'}

 - mongodb-shard5
    üî¢ Replicas: 1/1
    üè∑Ô∏è Selector: {'app': 'mongodb-shard5'}

 - mongodb-shard6
    üî¢ Replicas: 1/1
    üè∑Ô∏è Selector: {'app': 'mongodb-shard6'}



In [10]:
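# Always scale down MongoDB before Kubernetes, and scale up Kubernetes before MongoDB:
# a shard must finish draining (removeShard state 'completed') before its pod is stopped,
# and its pod must be running before addShard can reach it.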

SyntaxError: invalid syntax (2175669829.py, line 1)

In [13]:
# Scale the mongodb-shard6 StatefulSet to 0 replicas.
# NOTE(review): per the ordering rule in this notebook, removeShard on shard6 must
# already report state 'completed' before this cell is run.
scale_k8s_shard_down(apps_V1, 6)

üîª Scaled mongodb-shard6 to 0 replicas.


In [10]:
# Scale the mongodb-shard4 StatefulSet to 1 replica (do this before adding shard4 to MongoDB).
scale_k8s_shard_up(apps_V1, 4)

üî∫ Scaled mongodb-shard4 to 1 replica.


In [13]:
# Register shard6 with mongos; its StatefulSet must already be scaled up and running.
add_shard(mongoClient, 6)

‚úÖ Shard added: shard6/mongodb-shard6-0.mongodb-shard6.default.svc.cluster.local:27018
{'$clusterTime': {'clusterTime': Timestamp(1748437228, 18),
                  'signature': {'hash': b'\x00\x00\x00\x00\x00\x00\x00\x00'
                                        b'\x00\x00\x00\x00\x00\x00\x00\x00'
                                        b'\x00\x00\x00\x00',
                                'keyId': 0}},
 'ok': 1.0,
 'operationTime': Timestamp(1748437228, 18),
 'shardAdded': 'shard6'}


In [15]:
# Start (or re-check) draining of shard6; re-run until the printed state is 'completed'.
remove_shard(mongoClient, 6)

üì¶ removeShard called on: shard6 (state: started)
{'$clusterTime': {'clusterTime': Timestamp(1748504099, 2),
                  'signature': {'hash': b'\x00\x00\x00\x00\x00\x00\x00\x00'
                                        b'\x00\x00\x00\x00\x00\x00\x00\x00'
                                        b'\x00\x00\x00\x00',
                                'keyId': 0}},
 'dbsToMove': [],
 'msg': 'draining started successfully',
 'note': 'you need to drop or movePrimary these databases',
 'ok': 1.0,
 'operationTime': Timestamp(1748504099, 2),
 'shard': 'shard6',
 'state': 'started'}


In [None]:
# Reshard ycsb_sharded.usertable on hashed first_name, starting at 100 initial chunks.
reshard_collection(mongoClient, coll_name="usertable", chunks=100)

Resharding initiated for ycsb_sharded.usertable with 100 chunks...‚úÖ


In [None]:
# Manually split usertable into 100 equal ranges of the hashed first_name key space.
split_hashed_chunks(mongoClient, coll_name="usertable", num_chunks=100, sk_field="first_name")

In [None]:
def same_replicas_number():
    """
    Checks whether Kubernetes and MongoDB agree on the number of shards.

    Compares the globals ``K8S_SHARDS`` and ``MONGO_SHARDS``, prints the
    outcome, and returns True when they match, False otherwise.
    """
    in_sync = K8S_SHARDS == MONGO_SHARDS
    if not in_sync:
        print("The number of replicas is different:")
        print(f"- {K8S_SHARDS} in Kubernetes")
        print(f"- {MONGO_SHARDS} in MongoDB")
        return in_sync

    print(f"The number of replicas in Kubernetes and MongoDB is the same: {K8S_SHARDS}")
    return in_sync

In [None]:
# Check that Kubernetes and MongoDB agree on the shard count.
same_replicas_number()

The number of replicas in Kubernetes and MongoDB is the same: 6


True

In [None]:
def _wait_until(condition, timeout_s, poll_s):
    """Call ``condition()`` every ``poll_s`` s until it is truthy (return True) or ``timeout_s`` s elapse (return False)."""
    import time
    deadline = time.monotonic() + timeout_s
    while not condition():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)
    return True


def scale_cluster_down(mongoClient, apps_V1, timeout_s=600, poll_s=10):
    """
    Removes the highest-numbered shard from MongoDB, then scales its
    Kubernetes StatefulSet to 0.

    Follows the rule "scale down MongoDB before Kubernetes": the StatefulSet
    is only scaled down after ``removeShard`` reported ``completed``. This is
    observed through ``remove_shard`` decrementing ``MONGO_SHARDS``.
    Shards are assumed to be named shard1..shardN, so shardN is removed.

    :param mongoClient: MongoClient connected to mongos
    :param apps_V1: AppsV1Api client
    :param timeout_s: Max seconds to wait for draining to complete (default 600)
    :param poll_s: Seconds between removeShard re-issues (default 10)
    :return: True on success; False if the shard counts disagree or draining timed out.
    """
    # Call the check; comparing the function object itself to False never fires.
    if not same_replicas_number():
        print("‚ùåCan't scale down because replicas number is different!")
        return False

    shard_id = MONGO_SHARDS
    print(f"Starting scaling down from {shard_id} to {shard_id - 1} replicas...\n")
    remove_shard(mongoClient, shard_id)  # starts draining
    print("Resharding the collection...\n")
    if not reshard_collection(mongoClient, coll_name="usertable", chunks=100):
        print("Resharding did not succeed; still waiting for the shard to drain.\n")

    def _drained():
        # remove_shard decrements MONGO_SHARDS once the state is 'completed'.
        if MONGO_SHARDS >= shard_id:
            remove_shard(mongoClient, shard_id)
        return MONGO_SHARDS < shard_id

    if not _wait_until(_drained, timeout_s, poll_s):
        print(f"shard{shard_id} did not finish draining within {timeout_s}s; Kubernetes was NOT scaled down.")
        return False

    scale_k8s_shard_down(apps_V1, shard_id)
    print("‚úÖScaled down succesfully!")
    same_replicas_number()
    return True


def scale_cluster_up(mongoClient, apps_V1, timeout_s=600, poll_s=10):
    """
    Scales the next shard StatefulSet up, registers it with MongoDB and
    reshards the collection.

    Follows the rule "scale up Kubernetes before MongoDB". ``addShard`` is
    retried until it succeeds (the new pod may need time to start) or
    ``timeout_s`` elapses.

    :param mongoClient: MongoClient connected to mongos
    :param apps_V1: AppsV1Api client
    :param timeout_s: Max seconds to keep retrying addShard (default 600)
    :param poll_s: Seconds between addShard attempts (default 10)
    :return: True on success; False if the counts disagree, the StatefulSet
             could not be scaled, or the shard could not be added in time.
    """
    if not same_replicas_number():
        print("‚ùåCan't scale up because replicas number is different!")
        return False

    new_id = K8S_SHARDS + 1
    print(f"Starting scaling up from {K8S_SHARDS} to {new_id} replicas...\n")
    scale_k8s_shard_up(apps_V1, new_id)
    if K8S_SHARDS != new_id:
        print(f"mongodb-shard{new_id} was not scaled up; MongoDB was NOT changed.")
        return False

    shards_before = MONGO_SHARDS

    def _added():
        # add_shard increments MONGO_SHARDS only when addShard succeeded.
        add_shard(mongoClient, new_id)
        return MONGO_SHARDS > shards_before

    if not _wait_until(_added, timeout_s, poll_s):
        print(f"shard{new_id} could not be added within {timeout_s}s; its StatefulSet is still running.")
        return False

    print("Resharding the collection...\n")
    if not reshard_collection(mongoClient, coll_name="usertable", chunks=100):
        print("Resharding did not succeed.\n")
    print("‚úÖScaled up succesfully!")
    same_replicas_number()
    return True
        

In [None]:
# Remove the highest-numbered shard from MongoDB, then scale its StatefulSet down.
scale_cluster_down(mongoClient, apps_V1)

In [None]:
# Scale the next shard StatefulSet up, add it to MongoDB and reshard the collection.
scale_cluster_up(mongoClient, apps_V1)