In [39]:
# | default_exp api


# Archivematica API client

> Tools to interact with the Archivematica REST API


In [40]:
# | hide
from nbdev.showdoc import *


In [41]:
# | export
import base64
import requests
import time
from requests.auth import HTTPBasicAuth
from typing import Optional, Dict
from tqdm import tqdm
from dotenv import load_dotenv
import os

In [42]:
# | export
class ArchivematicaAPIClient(object):
    """
    A client for interacting with Matica's API for file uploading, transferring, and ingesting processes.
    """

    TIME_SPAN = 2  # Time to wait in loops, in seconds

    def __init__(self, dashboard_url: str, dashboard_username: str, dashboard_api_key: str, storage_service_url: str, storage_service_username: str, storage_service_password: str):
        """
        Initializes the ArchivematicaAPIClient with necessary details for API interactions.

        Args:
            dashboard_url (str): URL of the dashboard.
            dashboard_username (str): Username for the dashboard.
            dashboard_api_key (str): API key for the dashboard.
            storage_service_url (str): URL of the storage service.
            storage_service_username (str): Username for the storage service.
            storage_service_password (str): Password for the storage service.
        """

        self.dashbord_url = dashboard_url
        self.dashboard_username = dashboard_username
        self.dashboard_api_key = dashboard_api_key

        self.dash_board_headers = {
            'Authorization': f'ApiKey {dashboard_username}:{dashboard_api_key}',
            "Content-Type": "application/json"
        }

        self.dash_board_endpoint = f"{dashboard_url}/api"

        self.storage_service_url = storage_service_url
        self.storage_username = storage_service_username
        self.storage_password = storage_service_password
    
    def v2beta_package(self, transfer_type: str, transfer_accession: str, location_uuid: str, path: str, name: str, processing_config: str) -> str:
        """
        Initiates a file transfer process.

        Args:
            transfer_type: The type of the transfer.
            transfer_accession: The accession number for the transfer.

        Returns:
            The directory associated with the transfer.
        """

        path = f"{location_uuid}:{path}"

        path_encoded = base64.b64encode(path.encode()).decode()
        data = {
            "name": name,
            "type": transfer_type,
            "accession": transfer_accession,
            "processing_config": processing_config,
            "path": path_encoded,
        }

        response = requests.post(f'{self.dashbord_url}/api/v2beta/package/', headers=self.dash_board_headers, json=data)

        self.transfer_UUID = response.json()["id"]
        return self.transfer_UUID
    
    def approve_transfer(self, transfer_type: str, directory: str) -> str:
        """
        Approves a file transfer after initiation.

        Args:
            transfer_type (str): The type of the transfer.
            directory (str): The directory associated with the transfer.

        Returns:
            Optional[str]: The UUID of the approved transfer, if available.
        """
        while True:
            data = {"type": transfer_type, "directory": directory}
            response = requests.post(f'{self.dashbord_url}/api/transfer/approve/', headers=self.dash_board_headers, data=data)
            if "uuid" in response.json():
                self.transfer_UUID = response.json()["uuid"]
                break
            time.sleep(self.TIME_SPAN)
        return self.transfer_UUID

    def check_transfer_status(self, transfer_UUID: str) -> str:
        """
        Checks the status of a file transfer until it is no longer processing.

        Args:
            transfer_UUID: The UUID of the transfer to check.

        Returns:
            The SIP UUID of the transfer.
        """
        while True:
            response = requests.get(f'{self.dashbord_url}/api/transfer/status/{transfer_UUID}', headers=self.dash_board_headers)
            r = response.json()
            if "status" in r and r["status"] != "PROCESSING":
                self.sip_uuid = r["sip_uuid"]
                break
            time.sleep(self.TIME_SPAN)
        return self.sip_uuid
    
    def transfer_delete(self, transfer_UUIDs: list) -> list:
        """
        Deletes the specified transfers.
        
        Args:
            transfer_UUIDs: The list of transfer UUIDs to delete.

        Returns:
            The results of the deletion.
        """
        results = []
        for transfer_UUID in transfer_UUIDs:
            response = requests.delete(f'{self.dashbord_url}/api/transfer/{transfer_UUID}/delete', headers=self.dash_board_headers)
            results.append(response.json())

        return results
    
    def transfer_completed(self) -> Dict:
        """
        Checks for completed transfers.

        Returns:
            The list of completed transfers.
        """

        response = requests.get(f'{self.dashbord_url}/api/transfer/completed', headers=self.dash_board_headers)

        return response.json()

    ###########

    def ingest(self, sip_UUID: str) -> str:
        """
        Waits for the ingestion process to complete and returns the UUID of the ingested AIP.

        Args:
            sip_UUID: The SIP UUID to ingest.

        Returns:
            The UUID of the ingested AIP.
        """
        while True:
            response = requests.get(f'{self.dash_board_endpoint}/ingest/status/{sip_UUID}', headers=self.dash_board_headers)
            r = response.json()
            if "status" in r and r["status"] != "PROCESSING":
                return r["uuid"]
            time.sleep(self.TIME_SPAN)

    def ingest_delete(self, ingest_UUIDs: list) -> list:
        """
        Deletes the specified ingestions.

        Args:
            ingest_UUIDs: The list of ingest UUIDs to delete.

        Returns:
            The results of the deletion.
        """
        results = []
        for ingest_UUID in ingest_UUIDs:
            response = requests.delete(f'{self.dash_board_endpoint}/ingest/{ingest_UUID}/delete', headers=self.dash_board_headers)
            results.append(response.json())

        return results

    def ingest_completed(self) -> Dict:
        """
        Checks for completed ingestions.

        Returns:
            The list of completed ingestions.
        """

        response = requests.get(f'{self.dash_board_endpoint}/ingest/completed', headers=self.dash_board_headers)

        return response.json()
    
    #####

    def v2_file(self, size:int = 20) -> list:
        """
        Retrieves a list of files.

        Args:
            size: The number of files to retrieve.

        Returns:
            The list of files.
        """
        response = requests.get(f'{self.storage_service_url}/api/v2/file/', auth=HTTPBasicAuth(self.storage_username, self.storage_password), params={"limit": size})
        return response.json()

    def v2_file_delete_aip(self, file_UUIDs: list, event_reason: str, pipeline: str, user_id: str, user_email: str) -> list:
        """
        Deletes the specified files.

        Args:
            file_UUIDs: The list of file UUIDs to delete.
            event_reason: The reason for the deletion.
            pipeline: The pipeline for the deletion.
            user_id: The user ID for the deletion.
            user_email: The user email for the deletion.

        Returns:
            The results of the deletion.
        """
        results = []
        for file_UUID in tqdm(file_UUIDs):
            response = requests.post(f'{self.storage_service_url}/api/v2/file/{file_UUID}/delete_aip/', auth=HTTPBasicAuth(self.storage_username, self.storage_password), json={
                "event_reason": event_reason,
                "pipeline": pipeline,
                "user_id"   : user_id,
                "user_email": user_email,

            })
            results.append(response.json())

        return results

    def v2_file_delete(self, file_UUIDs: list) -> list:
        """
        Deletes the specified files.

        Args:
            file_UUIDs: The list of file UUIDs to delete.

        Returns:
            The results of the deletion.
        """
        results = []
        for file_UUID in tqdm(file_UUIDs):
            response = requests.delete(f'{self.storage_service_url}/api/v2/file/{file_UUID}/contents', auth=HTTPBasicAuth(self.storage_username, self.storage_password))
            res = {
                "file_UUID": file_UUID,
            }
            if(response.status_code == 204):
                # results.append({"status": "success"})
                res["status"] = "success"
            else:
                # results.append({"status": "failed"})
                res["status"] = "failed"

            results.append(res)

        return results


    def get_aip_url(self, ingest_UUID: str) -> str:
        """
        Constructs and returns the URL for downloading the ingested AIP.

        Args:
            ingest_UUID: The UUID of the ingested AIP.

        Returns:
            The URL for downloading the AIP.
        """
        endpoint = self.storage_service_url + "/api"
        url = f"{endpoint}/v2/file/{ingest_UUID}/download/"
        return url
    
    def get_current_full_path(self, ingest_UUID: str) -> Optional[str]:
        """
        Retrieves the current full path of the ingested AIP by making an API call.

        Args:
            ingest_UUID (str): The UUID of the ingested AIP.

        Returns:
            Optional[str]: The current full path of the AIP if the request is successful, None otherwise.
        """
        endpoint = self.storage_service_url + "/api"
        url = f"{endpoint}/v2/file/{ingest_UUID}/"

        try:
            response = requests.get(url, auth=HTTPBasicAuth(self.storage_username, self.storage_password))
            response.raise_for_status()  # Raises an exception for 4XX or 5XX errors
            return response.json().get("current_full_path")
        except requests.RequestException as e:
            print(f"Error retrieving current full path: {e}")
            return None
        
    def check_status(self, transfer_UUID):
        """
        Checks the status of a file transfer until it is no longer processing.

        Args:
            transfer_UUID: The UUID of the transfer to check.

        Returns:
            The UUID of the ingested AIP.
        """
        print("transfer ...")
        sip_uuid = self.check_transfer_status(transfer_UUID)

        print("ingest ...")
        self.ingest(sip_uuid)

        return sip_uuid
        
    @staticmethod
    def main(transfer_type, transfer_accession, location_uuid, path, name, processing_config, env_path=None):
        """
        Main function to orchestrate the file transfer, ingestion, and deletion processes.

        Args:
            transfer_type: The type of the transfer.
            transfer_accession: The accession number for the transfer.
            location_uuid: The UUID of the location.
            path: The path to the file to transfer.
            name: The name of the file to transfer.
            processing_config: The processing configuration to use.
            env_path: The path to the environment variables file.

        Returns:
            The UUID of the ingested AIP.
        """
        if env_path:
            load_dotenv(override=True, dotenv_path=env_path)
        else:
            load_dotenv(override=True)

        dashboard_url = os.environ.get("DASHBOARD_URL")
        dashboard_username = os.environ.get("DASHBOARD_USERNAME")
        dashboard_api_key = os.environ.get("DASHBOARD_API_KEY")

        storage_service_url = os.environ.get("STORAGE_SERVICE_URL")
        storage_service_username = os.environ.get("STORAGE_SERVICE_USERNAME")
        storage_service_password = os.environ.get("STORAGE_SERVICE_PASSWORD")

        client = ArchivematicaAPIClient(dashboard_url, dashboard_username, dashboard_api_key, storage_service_url, storage_service_username, storage_service_password)

        transfer_UUID = client.v2beta_package(transfer_type, transfer_accession, location_uuid, path, name, processing_config)
        sip_uuid = client.check_status(transfer_UUID)

        return sip_uuid


In [43]:
show_doc(ArchivematicaAPIClient)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L15){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient

>      ArchivematicaAPIClient (dashboard_url:str, dashboard_username:str,
>                              dashboard_api_key:str, storage_service_url:str,
>                              storage_service_username:str,
>                              storage_service_password:str)

A client for interacting with Matica's API for file uploading, transferring, and ingesting processes.

First import the `ArchivematicaAPIClient` class.

```python
from archivematica_tools.api import ArchivematicaAPIClient
```

To initialise the `ArchivematicaAPIClient` you need to provide the options.

`.env` file

```txt
DASHBOARD_URL=http://localhost:62080
DASHBOARD_USERNAME=test
DASHBOARD_API_KEY=test

STORAGE_SERVICE_URL=http://localhost:62081
STORAGE_SERVICE_USERNAME=test
STORAGE_SERVICE_PASSWORD=test
```

```python
# 環境変数を参照
dashboard_url = os.environ.get("DASHBOARD_URL")
dashboard_username = os.environ.get("DASHBOARD_USERNAME")
dashboard_api_key = os.environ.get("DASHBOARD_API_KEY")

storage_service_url = os.environ.get("STORAGE_SERVICE_URL")
storage_service_username = os.environ.get("STORAGE_SERVICE_USERNAME")
storage_service_password = os.environ.get("STORAGE_SERVICE_PASSWORD")

aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
matica = ArchivematicaAPIClient(dashboard_url, dashboard_username, dashboard_api_key, storage_service_url, storage_service_username, storage_service_password)
```

# Transfer

Initiates a file transfer process.

In [44]:
show_doc(ArchivematicaAPIClient.v2beta_package)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L50){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.v2beta_package

>      ArchivematicaAPIClient.v2beta_package (transfer_type:str,
>                                             transfer_accession:str,
>                                             location_uuid:str, path:str,
>                                             name:str, processing_config:str)

Initiates a file transfer process.

Args:
    transfer_type: The type of the transfer.
    transfer_accession: The accession number for the transfer.

Returns:
    The directory associated with the transfer.

In [45]:
show_doc(ArchivematicaAPIClient.approve_transfer)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L78){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.approve_transfer

>      ArchivematicaAPIClient.approve_transfer (transfer_type:str,
>                                               directory:str)

Approves a file transfer after initiation.

Args:
    transfer_type (str): The type of the transfer.
    directory (str): The directory associated with the transfer.

Returns:
    Optional[str]: The UUID of the approved transfer, if available.

In [46]:
show_doc(ArchivematicaAPIClient.check_transfer_status)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L98){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.check_transfer_status

>      ArchivematicaAPIClient.check_transfer_status (transfer_UUID:str)

Checks the status of a file transfer until it is no longer processing.

Args:
    transfer_UUID: The UUID of the transfer to check.

Returns:
    The SIP UUID of the transfer.

In [47]:
show_doc(ArchivematicaAPIClient.transfer_delete)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L117){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.transfer_delete

>      ArchivematicaAPIClient.transfer_delete (transfer_UUIDs:list)

Deletes the specified transfers.

Args:
    transfer_UUIDs: The list of transfer UUIDs to delete.

Returns:
    The results of the deletion.

In [48]:
show_doc(ArchivematicaAPIClient.transfer_completed)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L134){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.transfer_completed

>      ArchivematicaAPIClient.transfer_completed ()

Checks for completed transfers.

Returns:
    The list of completed transfers.

# Ingest

In [49]:
show_doc(ArchivematicaAPIClient.ingest)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L148){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.ingest

>      ArchivematicaAPIClient.ingest (sip_UUID:str)

Waits for the ingestion process to complete and returns the UUID of the ingested AIP.

Args:
    sip_UUID: The SIP UUID to ingest.

Returns:
    The UUID of the ingested AIP.

In [50]:
show_doc(ArchivematicaAPIClient.ingest_delete)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L165){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.ingest_delete

>      ArchivematicaAPIClient.ingest_delete (ingest_UUIDs:list)

Deletes the specified ingestions.

Args:
    ingest_UUIDs: The list of ingest UUIDs to delete.

Returns:
    The results of the deletion.

In [51]:
show_doc(ArchivematicaAPIClient.ingest_completed)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L182){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.ingest_completed

>      ArchivematicaAPIClient.ingest_completed ()

Checks for completed ingestions.

Returns:
    The list of completed ingestions.

# Storage Service

In [52]:
show_doc(ArchivematicaAPIClient.v2_file)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L196){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.v2_file

>      ArchivematicaAPIClient.v2_file (size:int=20)

Retrieves a list of files.

Args:
    size: The number of files to retrieve.

Returns:
    The list of files.

In [53]:
show_doc(ArchivematicaAPIClient.v2_file_delete_aip)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L209){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.v2_file_delete_aip

>      ArchivematicaAPIClient.v2_file_delete_aip (file_UUIDs:list,
>                                                 event_reason:str,
>                                                 pipeline:str, user_id:str,
>                                                 user_email:str)

Deletes the specified files.

Args:
    file_UUIDs: The list of file UUIDs to delete.
    event_reason: The reason for the deletion.
    pipeline: The pipeline for the deletion.
    user_id: The user ID for the deletion.
    user_email: The user email for the deletion.

Returns:
    The results of the deletion.

In [54]:
show_doc(ArchivematicaAPIClient.v2_file_delete)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L236){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.v2_file_delete

>      ArchivematicaAPIClient.v2_file_delete (file_UUIDs:list)

Deletes the specified files.

Args:
    file_UUIDs: The list of file UUIDs to delete.

Returns:
    The results of the deletion.

In [55]:
show_doc(ArchivematicaAPIClient.get_aip_url)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L264){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.get_aip_url

>      ArchivematicaAPIClient.get_aip_url (ingest_UUID:str)

Constructs and returns the URL for downloading the ingested AIP.

Args:
    ingest_UUID: The UUID of the ingested AIP.

Returns:
    The URL for downloading the AIP.

In [56]:
show_doc(ArchivematicaAPIClient.get_current_full_path)

---

[source](https://github.com/nakamura196/archivematica_tools/blob/main/archivematica_tools/api.py#L278){target="_blank" style="float:right; font-size:smaller"}

### ArchivematicaAPIClient.get_current_full_path

>      ArchivematicaAPIClient.get_current_full_path (ingest_UUID:str)

Retrieves the current full path of the ingested AIP by making an API call.

Args:
    ingest_UUID (str): The UUID of the ingested AIP.

Returns:
    Optional[str]: The current full path of the AIP if the request is successful, None otherwise.

In [57]:
#| hide
import nbdev; nbdev.nbdev_export()