Skip to content

Commit

Permalink
Merge pull request #487 from microbiomedata/432-migrations-implement-…
Browse files Browse the repository at this point in the history
…bookkeeping-so-database-tells-us-which-schema-it-conforms-to

Migrations: Implement bookkeeping
  • Loading branch information
eecavanna committed Mar 6, 2024
2 parents ede3719 + c6fbc61 commit 89fd282
Show file tree
Hide file tree
Showing 19 changed files with 985 additions and 8 deletions.
125 changes: 125 additions & 0 deletions demo/metadata_migration/notebooks/bookkeeper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from typing import Optional
from enum import Enum
from datetime import datetime

from pymongo import MongoClient
from nmdc_schema.migrators.migrator_base import MigratorBase


class MigrationEvent(str, Enum):
r"""
Enumeration of all migration events that can be recorded.
Reference: https://docs.python.org/3.10/library/enum.html#others
>>> MigrationEvent.MIGRATION_COMPLETED.value
'MIGRATION_COMPLETED'
>>> MigrationEvent.MIGRATION_STARTED.value
'MIGRATION_STARTED'
"""
MIGRATION_STARTED = "MIGRATION_STARTED"
MIGRATION_COMPLETED = "MIGRATION_COMPLETED"


class Bookkeeper:
r"""
A class you can use to record migration-related events in a Mongo database.
"""
def __init__(
self,
mongo_client: MongoClient,
database_name: str = "nmdc",
collection_name: str = "_migration_events",
view_name: str = "_migration_latest_schema_version",
):
self.database_name = database_name
self.collection_name = collection_name
self.view_name = view_name

# Store references to the database and collection.
self.db = mongo_client[self.database_name]
self.collection = self.db.get_collection(self.collection_name)

# Ensure the MongoDB view that indicates the current schema version, exists.
self.ensure_current_schema_version_view_exists()

@staticmethod
def get_current_timestamp() -> str:
r"""Returns an ISO 8601 timestamp (string) representing the current time in UTC."""
utc_now = datetime.utcnow()
iso_utc_now = utc_now.isoformat()
return iso_utc_now # e.g. "2024-02-21T04:31:03.115107"

def record_migration_event(
self,
migrator: MigratorBase,
event: MigrationEvent,
to_schema_version: Optional[str] = None,
) -> None:
r"""
Records a migration event in the collection.
The `to_schema_version` parameter is independent of the `migrator` parameter because, even though the migrator
does have a `.get_destination_version()` method, the string returned by that method is the one defined when the
migrator was _written_, which is sometimes more generic than the one used for data validation when the migrator
is _run_ (e.g. "1.2" as opposed to "1.2.3"). So, this method provides a means by which the calling code can,
optionally, specify a more precise version identifier.
"""

# If a custom schema version identifier was specified, use that; otherwise, use the one built into the migrator.
to_schema_version_str = migrator.get_destination_version()
if to_schema_version is not None:
to_schema_version_str = to_schema_version

document = dict(
created_at=self.get_current_timestamp(),
event=event.value,
from_schema_version=migrator.get_origin_version(),
to_schema_version=to_schema_version_str,
migrator_module=migrator.__module__, # name of the Python module in which the `Migrator` class is defined
)
self.collection.insert_one(document)

def ensure_current_schema_version_view_exists(self) -> None:
r"""
Ensures the MongoDB view that indicates the current schema version, exists.
References:
- https://www.mongodb.com/community/forums/t/is-there-anyway-to-create-view-using-python/161363/2
- https://www.mongodb.com/docs/manual/reference/method/db.createView/#mongodb-method-db.createView
- https://pymongo.readthedocs.io/en/stable/api/pymongo/database.html#pymongo.database.Database.create_collection
"""
if (
self.view_name not in self.db.list_collection_names()
): # returns list of names of both collections and views
agg_pipeline = [
# Sort the documents so the most recent one is first.
{"$sort": {"created_at": -1}},
# Only preserve the first document.
{"$limit": 1},
# Derive a simplified document for the view.
# Examples: `{ "schema_version": "1.2.3" }` or `{ "schema_version": null }`
{
"$project": {
"_id": 0, # omit the `_id` field
"schema_version": { # add this field based upon the migration status
"$cond": {
"if": {
"$eq": [
"$event",
MigrationEvent.MIGRATION_COMPLETED.value,
]
},
"then": "$to_schema_version", # database conforms to this version of the NMDC Schema
"else": None, # database doesn't necessarily conform to any version of the NMDC Schema
}
},
}
},
]
self.db.create_collection(
name=self.view_name,
viewOn=self.collection_name,
pipeline=agg_pipeline,
check_exists=True, # only create the view if it doesn't already exist
comment="The version of the NMDC Schema to which the database conforms.",
)
186 changes: 186 additions & 0 deletions demo/metadata_migration/notebooks/manual_test_bookkeeper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Note: This module's name starts with `manual_test_` instead of `test_` to signify that its author (currently) expects
# people to run it manually as opposed to via automated test infrastructure. Its author chose that route after a
# GitHub Actions workflow that runs tests in `test_` modules kept failing due to a database access issue.

from typing import Optional
import unittest
import re
import os
from datetime import datetime, timedelta

from pymongo import MongoClient, timeout
from pymongo.database import Database
from nmdc_schema.migrators.migrator_base import MigratorBase

from demo.metadata_migration.notebooks.bookkeeper import Bookkeeper, MigrationEvent

# Consume environment variables.
MONGO_HOST: str = os.getenv("MONGO_HOST", "localhost")
MONGO_USER: Optional[str] = os.getenv("MONGO_USERNAME", None)
MONGO_PASS: Optional[str] = os.getenv("MONGO_PASSWORD", None)
MONGO_DATABASE_NAME: str = os.getenv("MONGO_TEST_DBNAME", "test-migration-bookkeeper")

MONGO_TIMEOUT_DURATION: int = 3 # seconds


class FakeMigrator(MigratorBase):
_to_version = "A.B.C"
_from_version = "X.Y.Z"

def upgrade(self):
pass


class TestBookkeeper(unittest.TestCase):
r"""
Tests targeting the `Bookkeeper` class.
You can format this file like this:
$ python -m black demo/metadata_migration/notebooks/manual_test_bookkeeper.py
You can start up a containerized MongoDB server like this:
$ docker run --rm --detach --name mongo-test-migration-bookkeeper -p 27017:27017 mongo
One that's running, other containers will be able to access it via:
- host.docker.internal:27017
You can run these tests like this:
$ python -m unittest -v demo/metadata_migration/notebooks/manual_test_bookkeeper.py
Reference: https://docs.python.org/3/library/unittest.html#basic-example
"""

mongo_client: Optional[MongoClient] = None
db: Optional[Database] = None

def setUp(self) -> None:
r"""
Connects to the MongoDB server and gets a reference to the database.
Note: This function runs before each test starts.
"""

# Connect to the MongoDB server and store a reference to the connection.
self.mongo_client = MongoClient(
host=MONGO_HOST,
username=MONGO_USER,
password=MONGO_PASS,
)
with timeout(MONGO_TIMEOUT_DURATION):
# Try connecting to the database server.
_ = self.mongo_client.server_info()
db = self.mongo_client[MONGO_DATABASE_NAME]

# Ensure the database contains no collections.
if len(db.list_collection_names()):
raise KeyError(f"Database is not empty: {MONGO_DATABASE_NAME}")

# Store a reference to the database.
self.db = db

def tearDown(self) -> None:
r"""
Drops all collections in the database and closes the connection to the MongoDB server.
Note: This function runs after each test finishes.
"""

# Drop all collections in the database.
for collection_name in self.db.list_collection_names():
self.db.drop_collection(collection_name)

# Close the connection to the server.
self.mongo_client.close()

def test_get_current_timestamp(self):
pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}$"
ts_str = Bookkeeper.get_current_timestamp()

# Verify the timestamp is a string having a valid format.
self.assertIsInstance(ts_str, str)
self.assertTrue(re.match(pattern, ts_str))

# Verify the moment represented by the timestamp was within the past minute.
ts = datetime.fromisoformat(ts_str)
now_ts = datetime.now()
time_difference = now_ts - ts
self.assertLess(time_difference, timedelta(minutes=1))

def test_init_method(self):
# Confirm the view does not exist yet.
view_name = "test_view"
self.assertFalse(view_name in self.db.list_collection_names())

# Instantiate the class-under-test.
_ = Bookkeeper(
mongo_client=self.mongo_client,
database_name=MONGO_DATABASE_NAME,
view_name=view_name,
)

# Confirm the view exists now.
self.assertTrue(view_name in self.db.list_collection_names())

def test_record_migration_event(self):
# Instantiate a bookkeeper.
bk = Bookkeeper(
mongo_client=self.mongo_client,
database_name=MONGO_DATABASE_NAME,
)

# Verify the collection is empty.
collection = self.db.get_collection(bk.collection_name)
self.assertTrue(collection.count_documents({}) == 0)

# Record a "migration started" event.
migrator = FakeMigrator()
bk.record_migration_event(
migrator=migrator, event=MigrationEvent.MIGRATION_STARTED
)

# Verify the migration event was recorded.
self.assertTrue(collection.count_documents({}) == 1)
doc = collection.find({})[0]
self.assertIsInstance(doc["created_at"], str)
self.assertIsInstance(doc["event"], str)
self.assertEqual(doc["event"], MigrationEvent.MIGRATION_STARTED)
self.assertIsInstance(doc["from_schema_version"], str)
self.assertEqual(doc["from_schema_version"], migrator.get_origin_version())
self.assertIsInstance(doc["to_schema_version"], str)
self.assertEqual(doc["to_schema_version"], migrator.get_destination_version())
self.assertIsInstance(doc["migrator_module"], str)

# Verify the document in the view says the schema version is `null`.
# Note: That's what I expect, since no "migration complete" events have been recorded yet.
view = self.db.get_collection(bk.view_name)
self.assertTrue(view.count_documents({}) == 1)
view_doc = view.find({})[0]
self.assertIsNone(view_doc["schema_version"])

# Record a "migration completed" event.
bk.record_migration_event(
migrator=migrator, event=MigrationEvent.MIGRATION_COMPLETED
)

# Verify the migration event was recorded.
self.assertTrue(collection.count_documents({}) == 2)

# Verify the document in the view says the schema version matches the one recorded.
view = self.db.get_collection(bk.view_name)
self.assertTrue(view.count_documents({}) == 1)
view_doc = view.find({})[0]
self.assertEqual(view_doc["schema_version"], migrator.get_destination_version())

# Finally, record another "migration started" event.
bk.record_migration_event(
migrator=migrator, event=MigrationEvent.MIGRATION_STARTED
)

# Confirm the document in the view once again says the schema version is `null`.
self.assertTrue(view.count_documents({}) == 1)
view_doc = view.find({})[0]
self.assertIsNone(view_doc["schema_version"])


if __name__ == "__main__":
unittest.main()
Loading

0 comments on commit 89fd282

Please sign in to comment.