In [None]:
import json
from dataclasses import dataclass, InitVar
from enum import StrEnum
from typing import Optional

from dotenv import load_dotenv
from pymongo import MongoClient, IndexModel

load_dotenv()


@dataclass
class Namespace:
    cluster_name: str
    database: Optional[str] = None
    collection: Optional[str] = None
    user_name: InitVar[Optional[str]] = None

    def __post_init__(self, user_name):
        self.user_name = user_name if user_name else f"{self.cluster_name}-autogenerated"

    @property
    def certificate_path(self):
        return f"pem/{self.user_name}.pem"

    @property
    def uri(self):
        uris = !atlas clusters connectionStrings describe {self.cluster_name} -o json
        return json.loads(uris.s)["standardSrv"]


class Role(StrEnum):
    read = "read"
    readWrite = "readWrite"


# Atlas CLI - create clusters
# https://www.mongodb.com/docs/atlas/cli/current/command/atlas-clusters-create/

def provision_cluster(namespace: Namespace, template="template/m10-replica-set.json"):
    !atlas cluster create {namespace.cluster_name} --file {template} -w


# Atlas CLI - create users and certificates
# https://www.mongodb.com/docs/atlas/cli/current/command/atlas-dbusers-create/
# https://www.mongodb.com/docs/atlas/cli/current/command/atlas-dbusers-certs-create/
# https://www.mongodb.com/docs/atlas/cli/current/command/atlas-dbusers-update/#examples

def create_user(role: Role, namespace: Namespace):
    # Create a user with a scope limited to a single cluster
    canonical_role = _get_canonical_role(role, namespace.database, namespace.collection)
    !atlas dbusers create --x509Type MANAGED --scope {namespace.cluster_name}:CLUSTER --role {canonical_role} --username {namespace.user_name}

    # Create and download a certificate
    # TODO Store certificate in a secret storage
    !atlas dbusers certs create --username {namespace.user_name} --monthsUntilExpiration 1 > {namespace.certificate_path}
    print("Certificate added:", namespace.certificate_path)


def update_user(role: Role, namespace: Namespace):
    describe_params = f"{namespace.user_name} --authDB \\$external -o json"
    current_settings = !atlas dbuser describe {describe_params}

    current_settings = json.loads(current_settings.s)

    # current_settings
    current_roles = current_settings["roles"]
    current_scopes = current_settings["scopes"]

    canonical_roles = set()
    for r in current_roles:
        canonical_roles.add(
            _get_canonical_role(Role[r.get("roleName")], r.get("databaseName"), r.get("collectionName")))

    canonical_scopes = {s.get("name") + ":" + s.get("type") for s in current_scopes}

    new_role = _get_canonical_role(role, namespace.database, namespace.collection)
    canonical_roles.add(new_role)

    new_scope = f"{namespace.cluster_name}:CLUSTER"
    canonical_scopes.add(new_scope)

    update_params = f"{namespace.user_name} --role {','.join(canonical_roles)} --scope {','.join(canonical_scopes)} --authDB \\$external --x509Type MANAGED -o json"
    !atlas dbuser update {update_params}


def _get_canonical_role(role: Role, database, collection):
    database = ("@" + database) if database else ""
    collection = ("." + collection) if collection else ""
    return f"{role}{database}{collection}"


# Migrate collections using local mongoexport pipe to mongoimport
# https://www.mongodb.com/docs/database-tools/mongoexport/
# https://www.mongodb.com/docs/database-tools/mongoimport/

def migrate_collection(source: Namespace, destination: Namespace, mode="upsert"):
    export_params = f"--uri '{source.uri}' --authenticationMechanism 'MONGODB-X509' --sslPEMKeyFile '{source.certificate_path}' --ssl -d '{source.database}' -c '{source.collection}' --compressors zstd"

    import_params = f"--uri '{destination.uri}' --authenticationMechanism 'MONGODB-X509' --sslPEMKeyFile '{destination.certificate_path}' --ssl -d '{destination.database}' -c '{destination.collection}' --mode {mode} --compressors zstd"

    !mongoexport {export_params} | mongoimport {import_params}


# Create indexes in a collection with a pymongo client
# https://www.mongodb.com/docs/languages/python/pymongo-driver/current/indexes/
# https://pymongo.readthedocs.io/en/4.16.0/api/pymongo/collection.html#pymongo.collection.Collection.create_indexes

def create_indexes(namespace: Namespace, index_list: list):
    client = MongoClient(
        namespace.uri,
        tls=True,
        authMechanism='MONGODB-X509',
        authSource="$external",
        tlsCertificateKeyFile=namespace.certificate_path
    )

    canonical_index_list = [IndexModel(list(idx.items())) for idx in index_list]

    return client.get_database(namespace.database).get_collection(namespace.collection).create_indexes(
        canonical_index_list)



#### Create an empty cluster based on a sizing template.

In [None]:
# This command will create a new cluster!
# provision_cluster("new-cluster", template="template/m30-sharded.json")

#### Create or update a database user

In [None]:
# Private user
create_user(
    Role.read,
    Namespace(cluster_name="echo", database="sample_mflix")
)

create_user(
    Role.readWrite,
    Namespace(cluster_name="echo-copy", database="sample_mflix", collection="movies")
)

# Global user
create_user(
    Role.readWrite,
    Namespace(user_name="global-user", cluster_name="echo-copy", database="sample_mflix")
)

update_user(
    Role.readWrite,
    Namespace(user_name="global-user", cluster_name="echo-copy", database="sample_airbnb")
)

#### Copy a collection to a different cluster

In [None]:
migrate_collection(
    source=Namespace(cluster_name="echo", database="sample_mflix", collection="movies"),
    destination=Namespace(cluster_name="echo-copy", database="sample_mflix", collection="movies"),
    mode="upsert"
)

#### Create indexes

In [None]:
create_indexes(
    Namespace(cluster_name="echo-copy", database="sample_mflix", collection="movies"),
    index_list=[
        {"year": 1},
        {"title": 1, "description": 1},
    ]
)

#### Scenarios

In [None]:
# Scripts 1:
# Create new DB
ns = Namespace(cluster_name="echo-copy", database="sample", collection="private_wishlist")

# Create a DB User and a secret to rw data only to this db.
# Any other db user must not have rw access to this DB.
create_user(Role.readWrite, ns)

# Create collection and add indices
create_indexes(ns, [{"priority": 1}])

# Return the secret
print(ns.certificate_path)

In [None]:
# Scripts 2:
# Create new DB
ns = Namespace(user_name="global-user", cluster_name="echo-copy", database="sample", collection="feedback")

# Configures the global db user to read write the data
# update_user(Role.readWrite, ns)

# Create collection and add indices
create_indexes(ns, [{"score": 1, "date": -1}])