In [None]:
import os
from datetime import timedelta
# from unittest import TestCase

import inflect
# import pytest
import subprocess

from ska_dlm import CONFIG, data_item, dlm_ingest, dlm_migration, dlm_request, dlm_storage
from ska_dlm.dlm_db.db_access import DB
from ska_dlm.dlm_storage.main import persist_new_data_items
# from ska_dlm.exceptions import InvalidQueryParameters, ValueAlreadyInDB

Set-up services:

In [None]:
# Run the project's `make python-pre-test` target; check=True raises
# CalledProcessError if make exits with a non-zero status.
# NOTE(review): presumably this starts the services used below — confirm in the Makefile.
subprocess.run(['make', 'python-pre-test'], check=True)
# IPython shell escape: print the PID(s) of any running postgrest process.
!pgrep postgrest

In [None]:
def _clear_database():
    """Call ``DB.delete`` on each table named in ``CONFIG.DLM``.

    The tables are processed in a fixed order: ``dlm_table``,
    ``storage_config_table``, ``storage_table``, ``location_table``.
    """
    # NOTE(review): the order presumably respects dependencies between the
    # tables — confirm before reordering.
    for table_attr in (
        "dlm_table",
        "storage_config_table",
        "storage_table",
        "location_table",
    ):
        # Look each table name up lazily so a delete only happens after its
        # own attribute lookup, exactly as with four separate statements.
        DB.delete(getattr(CONFIG.DLM, table_attr))

Define a storage location and configure rclone:

In [None]:
# JSON configuration describing the "MyDisk" storage; it is stored in the DLM
# database and also passed to rclone at the end of this cell.
config = '{"name":"MyDisk","type":"local", "parameters":{}}'
# A storage is registered against a location, so create the location first.
location_id = dlm_storage.init_location("MyOwnStorage", "Server")
uuid = dlm_storage.init_storage(
    storage_name="MyDisk",
    location_id=location_id,
    storage_type="disk",
    storage_interface="posix",
    storage_capacity=100_000_000,
)
# Attach the configuration to the storage that was just created ...
dlm_storage.create_storage_config(uuid, config=config)
# ... and configure rclone with the same settings.
dlm_storage.rclone_config(config)

Initialize a new data_item:

In [None]:
def initialize():
    """Initialise 50 data items named ``this/is/the/<ordinal>/test/item``.

    ``<ordinal>`` is the output of inflect's ``number_to_words(ordinal(n))``
    for n = 1..50. Prints "Success" only when every ``init_data_item`` call
    returned something other than ``None``; prints nothing otherwise.
    """
    inflector = inflect.engine()
    uids = []
    for n in range(1, 51):
        ordinal_words = inflector.number_to_words(inflector.ordinal(n))
        uids.append(dlm_ingest.init_data_item(f"this/is/the/{ordinal_words}/test/item"))
    # All 50 calls are made before the check, so one failure does not stop
    # the remaining items from being initialised.
    if all(item_uid is not None for item_uid in uids):
        print("Success")

initialize()

Ingest a data item:

In [None]:
# Ingest a data item named "/my/ingest/test/item".
# NOTE(review): the 2nd and 3rd arguments look like the URI ("/LICENSE") and the
# storage name ("MyDisk" as registered above) — confirm against the API.
uid = dlm_ingest.ingest_data_item("/my/ingest/test/item", "/LICENSE", "MyDisk")
# The returned uid is expected to be 36 characters long
# (presumably a hyphenated UUID string — TODO confirm).
assert len(uid) == 36

Register a data item:

In [None]:
# Register a data item named "/my/ingest/test/item2".
# NOTE(review): the 2nd and 3rd arguments look like the URI ("/LICENSE") and the
# storage name ("MyDisk" as registered above) — confirm against the API.
uid = dlm_ingest.register_data_item("/my/ingest/test/item2", "/LICENSE", "MyDisk")
# The returned uid is expected to be 36 characters long
# (presumably a hyphenated UUID string — TODO confirm).
assert len(uid) == 36

Query for all expired data_items:

In [None]:
# Expect an empty set: our data item was assigned the default uid_expiration
# of now() + time '24:00', so nothing should be reported as expired yet.
result = dlm_request.query_expired()
success = len(result) == 0
# Print before asserting so the offending records are visible on failure.
print(result)
assert success

In [None]:
# Expect a non-empty result this time.
# Create the 50 test items (again) so there is something to find.
initialize()
offset = timedelta(days=1)
# query for items that are expired now or within the next 24 hours:
result = dlm_request.query_expired(offset)
success = len(result) != 0
# Print before asserting so the result is visible even if the check fails.
print(result)
assert success

Initialize a new location:

In [None]:
"""initialisation on a (new?) location."""
# init_location returns an empty string if unsuccessful, so check the result
# instead of discarding it.
test_location_id = dlm_storage.init_location("TestLocation", "SKAO Data Centre")
assert test_location_id, "init_location('TestLocation') did not return a location id"
# Read the record back; fail with a clear message rather than an IndexError
# if nothing was stored.
test_locations = dlm_storage.query_location(location_name="TestLocation")
assert test_locations, "query_location returned no record for 'TestLocation'"
location = test_locations[0]
# The stored location type must match what was passed to init_location.
assert location["location_type"] == "SKAO Data Centre"
print(location)

Update a data_item record with a file location, a state and a phase:

In [None]:
# Create a small local file to act as the payload of the data item.
fname = "dlm_test_file_1.txt"
with open(fname, "w", encoding="UTF-8") as tfile:
    tfile.write("Welcome to the great DLM world!")
try:
    # Build the URI from the file that was actually written (fname), then
    # strip the "$HOME/" prefix from the absolute path.
    # NOTE(review): presumably the storage root is the home directory — confirm.
    fpath = os.path.abspath(fname)
    fpath = fpath.replace(f"{os.environ['HOME']}/", "")
    uid = dlm_ingest.init_data_item(item_name="this/is/the/first/test/item")
    storage_id = dlm_storage.query_storage(storage_name="MyDisk")[0]["storage_id"]
    data_item.set_uri(uid, fpath, storage_id)
    # check that the uri of the data item (with uid = uid) is equal to fpath
    stored_uri = dlm_request.query_data_item(uid=uid)[0]["uri"]
    assert stored_uri == fpath
    print('Location:', stored_uri)

    data_item.set_state(uid, "READY")
    data_item.set_phase(uid, "PLASMA")
    # query all data items with uid=uid, and check their state & phase is what we set it to be:
    items = dlm_request.query_data_item(uid=uid)
    assert len(items) == 1
    assert items[0]["item_state"] == "READY"
    print('State:', items[0]["item_state"])
    assert items[0]["item_phase"] == "PLASMA"
    print('Phase:', items[0]["item_phase"])
finally:
    # Always remove the temporary file, even when a check above fails.
    os.unlink(fname)

Delete the payload of a data_item:

In [None]:
# Create a small local file; its relative path doubles as the item name below.
fpath = "dlm_test_file_2.txt"
with open(fpath, "w", encoding="UTF-8") as tfile:
    tfile.write("Welcome to the great DLM world!")
# add the file to the database:
storage_id = dlm_storage.query_storage(storage_name="MyDisk")[0]["storage_id"]
uid = dlm_ingest.ingest_data_item(fpath)
# The uid returned by ingest must match the one found by querying on item name.
queried_uid = dlm_request.query_data_item(item_name=fpath)[0]["uid"]
assert uid == queried_uid
dlm_storage.delete_data_item_payload(uid) # delete the payload
# NOTE(review): the local file is never unlinked in this cell; presumably
# delete_data_item_payload removes it — confirm.
# Set the URI and the DELETED state explicitly, then read both back.
data_item.set_uri(uid, fpath, storage_id)
data_item.set_state(uid, "DELETED")
assert dlm_request.query_data_item(item_name=fpath)[0]["uri"] == fpath
assert dlm_request.query_data_item(item_name=fpath)[0]["item_state"] == "DELETED"
print('State:',dlm_request.query_data_item(item_name=fpath)[0]["item_state"])

Add a new location, storage and configuration to the rclone server:

In [None]:
def storage_config():
    """Register the "MyDisk2" storage at location "MyHost" and configure rclone for it.

    Reuses the "MyHost" location if ``query_location`` finds one and creates
    it otherwise. Asserts that the location id, the storage id and the
    storage-config id are each 36 characters long, and that
    ``rclone_config`` returns ``True``.
    """
    known_locations = dlm_storage.query_location("MyHost")
    # Only create the location when the query came back empty.
    location_id = (
        known_locations[0]["location_id"]
        if known_locations
        else dlm_storage.init_location("MyHost", "Server")
    )
    assert len(location_id) == 36

    rclone_cfg = '{"name":"MyDisk2","type":"local", "parameters":{}}'
    new_storage_id = dlm_storage.init_storage(
        storage_name="MyDisk2",
        location_id=location_id,
        storage_type="disk",
        storage_interface="posix",
        storage_capacity=100_000_000,
    )
    assert len(new_storage_id) == 36

    cfg_id = dlm_storage.create_storage_config(new_storage_id, config=rclone_cfg)
    assert len(cfg_id) == 36

    # Finally push the same configuration to rclone.
    assert dlm_storage.rclone_config(rclone_cfg) is True

storage_config()

Copy a test file from one storage to another:

In [None]:
"""Copy a test file from one storage to another."""
# Requires the "MyDisk2" storage to exist already (it is created by
# storage_config()).
dest_id = dlm_storage.query_storage("MyDisk2")[0]["storage_id"]
# Register the source item under a name not used earlier in this notebook.
# "/my/ingest/test/item2" is already registered, and registering it again
# failed with: '/my/ingest/test/item2' is already registered on storage.
uid = dlm_ingest.register_data_item("/my/ingest/test/item3", "/LICENSE", "MyDisk")
assert len(uid) == 36
# Copy the item to the destination storage under the path "LICENSE_copy" ...
dlm_migration.copy_data_item(uid=uid, destination_id=dest_id, path="LICENSE_copy")
# ... and remove the copied file again so the working directory stays clean.
os.unlink("LICENSE_copy")

Tear down services:

In [None]:
# Call DB.delete on the DLM tables so the records created by this notebook
# are cleared before the services are shut down.
_clear_database()

In [None]:
# IPython shell escape: run the project's `make python-post-test` target.
# NOTE(review): presumably this stops the services started by
# `make python-pre-test` — confirm in the Makefile.
!make python-post-test