# Cross Cluster Replication APIs
In this notebook, we go over the xCluster APIs in YBA, with examples and the steps required to make each API call. We assume that you already have two universes deployed in your YBA and that their universe UUIDs are known.
## Get the API Token 
All xCluster replication YBA APIs are restricted to authenticated users, so to call them you first need to get an API token using the following code:

In [1]:
import requests
import os
from pprint import pprint

yba_url = os.getenv("YBA_URL", "http://localhost:9000")
yba_user = {
    "email": os.getenv("YBA_USER_EMAIL", "admin"),
    "password": os.getenv("YBA_USER_PASSWORD", "admin"),
}

route = f"{yba_url}/api/v1/api_login"
payload = {
    "email": yba_user["email"],
    "password": yba_user["password"],
}
response = requests.post(url=route, json=payload).json()
pprint(response)

customer_uuid = response["customerUUID"]
yba_api_token = response["apiToken"]
headers = {"X-AUTH-YW-API-TOKEN": yba_api_token}

{'apiToken': '3.a9190fe6-1067-409f-95cf-dbd96893c9c9.bfbedb28-41c6-48ea-9905-185ee554d84c',
 'apiTokenVersion': 12,
 'customerUUID': 'f33e3c9b-75ab-4c30-80ad-cba85646ea39',
 'userUUID': 'a9190fe6-1067-409f-95cf-dbd96893c9c9'}


Then you can use `customer_uuid` as a URL parameter and pass `yba_api_token` in the `X-AUTH-YW-API-TOKEN` request header to show that the user is authenticated.
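
Every request in this notebook repeats the same URL prefix and the same header. As a minimal sketch, the two helpers below build both in one place. The names `auth_headers` and `customer_route` are hypothetical and are not part of YBA or of `requests`; the header name and the `/api/v1/customers/<uuid>` prefix are taken from the cells in this notebook.

```python
def auth_headers(api_token: str) -> dict:
    """Return the request header that carries the YBA API token."""
    return {"X-AUTH-YW-API-TOKEN": api_token}


def customer_route(base_url: str, customer_uuid: str, *segments: str) -> str:
    """Join path segments under <base_url>/api/v1/customers/<customer_uuid>."""
    # rstrip avoids a double slash when the base URL ends with "/".
    parts = [base_url.rstrip("/"), "api/v1/customers", customer_uuid, *segments]
    return "/".join(parts)
```

For example, `customer_route(yba_url, customer_uuid, "configs")` would produce the same route string as the storage config cell, and `auth_headers(yba_api_token)` the same `headers` dictionary as above.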

## Get Storage Config UUID
xCluster uses backup/restore to replicate the data that already exists on the source universe, so a storage config is required to store the backup and restore from it. To get the storage config UUID, use the following code:

In [2]:
route = f"{yba_url}/api/v1/customers/{customer_uuid}/configs"
response = requests.get(url=route, headers=headers).json()
storage_configs = list(filter(lambda config: config["type"] == "STORAGE", response))
if len(storage_configs) < 1:
    print("No storage config found")
    exit(-1)

storage_config_uuid = storage_configs[0]["configUUID"]
print(storage_config_uuid)

4c48883a-d310-4704-a656-db8104afc173


## List Tables on Source Universe
xCluster APIs operate at table granularity: you pass the list of tables you would like to replicate in the xCluster API request bodies.
Please note that although the xCluster APIs are at table granularity, YBA only supports bootstrapping of YSQL tables at database granularity. If you would like to do an xCluster operation that requires bootstrapping of YSQL tables, you need to pass the UUIDs of all the tables in that database.
The following code shows how to get the list of tables for a universe:

In [4]:
source_universe_uuid = os.getenv(
    "YBA_SOURCE_UNIVERSE_UUID", "33f4166a-685a-489d-9137-c9042cfb8616"
)
route = (f"{yba_url}/api/v1/customers/{customer_uuid}/universes/{source_universe_uuid}/tables"
         f"?includeParentTableInfo={str(False).lower()}&onlySupportedForXCluster={str(True).lower()}")
response = requests.get(url=route, headers=headers).json()
all_ysql_tables_uuid_list = [
    table["tableUUID"]
    for table in list(
        filter(lambda table: table["tableType"] == "PGSQL_TABLE_TYPE", response)
    )
]
pprint(all_ysql_tables_uuid_list)

['00004000-0000-3000-8000-000000004022',
 '00004000-0000-3000-8000-00000000401a',
 '00004000-0000-3000-8000-000000004012',
 '00004000-0000-3000-8000-00000000400a',
 '00004000-0000-3000-8000-000000004002']
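
Because bootstrapping of YSQL tables works at database granularity, it can help to group the table UUIDs by database before choosing what to replicate. The following is a sketch only: it assumes that each entry in the tables response carries the database name in a `keySpace` field, which the output above does not show, so verify the field name against your own response. The function name and the sample data are hypothetical.

```python
from collections import defaultdict


def ysql_table_uuids_by_database(tables):
    """Group YSQL table UUIDs by database name.

    Assumption: each table entry has a "keySpace" field with the database name.
    """
    grouped = defaultdict(list)
    for table in tables:
        if table.get("tableType") == "PGSQL_TABLE_TYPE":
            grouped[table["keySpace"]].append(table["tableUUID"])
    return dict(grouped)


# Hypothetical sample entries, shaped like the fields used in this notebook.
sample_tables = [
    {"tableUUID": "uuid-1", "tableType": "PGSQL_TABLE_TYPE", "keySpace": "db1"},
    {"tableUUID": "uuid-2", "tableType": "PGSQL_TABLE_TYPE", "keySpace": "db1"},
    {"tableUUID": "uuid-3", "tableType": "PGSQL_TABLE_TYPE", "keySpace": "db2"},
    {"tableUUID": "uuid-4", "tableType": "YQL_TABLE_TYPE", "keySpace": "ks"},
]
tables_by_db = ysql_table_uuids_by_database(sample_tables)
```

Passing `tables_by_db["db1"]` as the table list would then cover every YSQL table in that database, which is what a bootstrap-requiring operation needs.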


## Waiting For XCluster Tasks
Each xCluster API call creates a task in the backend and returns a task UUID, which you can use to follow the progress and status of the task. You can use the following function to wait for a task:

In [5]:
import time

def waitForTask(task_uuid):
    route = f"{yba_url}/api/v1/customers/{customer_uuid}/tasks/{task_uuid}"
    while True:
        response = requests.get(url=route, headers=headers).json()
        status = response["status"]
        if status == "Failure":
            route = f"{yba_url}/api/customers/{customer_uuid}/tasks/{task_uuid}/failed"
            response = requests.get(url=route, headers=headers)
            if response.ok:
                response = response.json()
                if "failedSubTasks" in response:
                    errors = [
                        subtask["errorString"] for subtask in response["failedSubTasks"]
                    ]
                    print(f"Task {task_uuid} failed with the following errors:")
                    print("\n".join(errors))
                else:
                    pprint(response)
            else:
                print(
                    f"Task {task_uuid} failed, but could not get the failure messages"
                )
            exit(-1)
        elif status == "Success":
            print(f"Task {task_uuid} finished successfully")
            break
        print(f"Waiting for task {task_uuid}...")
        time.sleep(20)
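
The function above polls until the task succeeds or fails, with no upper bound on the wait. As a hedged variant, the sketch below adds a timeout and takes the status lookup as a callable so it can be exercised without a running YBA. The name `wait_for_task_status` and its parameters are hypothetical; only the `Success` and `Failure` status values come from the function above, and any other status is treated as still in progress.

```python
import time


def wait_for_task_status(
    fetch_status,
    timeout_seconds=3600,
    poll_seconds=20,
    sleep=time.sleep,
    clock=time.monotonic,
):
    """Poll fetch_status() until it returns a terminal status or time runs out.

    fetch_status is any zero-argument callable that returns the task status
    string, for example a small wrapper around the tasks GET request.
    """
    deadline = clock() + timeout_seconds
    while True:
        status = fetch_status()
        if status in ("Success", "Failure"):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Task still in status {status!r} after timeout")
        sleep(poll_seconds)
```

With the notebook's variables, `fetch_status` could be `lambda: requests.get(url=route, headers=headers).json()["status"]`, where `route` is the task route used in `waitForTask`.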

## Creating XCluster Configs
Now we have all the required information to create a cross-cluster replication from universe `source_universe_uuid` to `target_universe_uuid`. `configType` can be either `Basic` or `Txn`; `Txn` provides transactional guarantees while replicating the data.
Please note that unless you have good reasons to skip bootstrapping, the list of tables in `.tables` and `.bootstrapParams.tables` should be the same.

In [6]:
target_universe_uuid = os.getenv(
    "YBA_TARGET_UNIVERSE_UUID", "21c5edb3-6676-4f15-8a52-94f987280823"
)
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs"
payload = {
    "sourceUniverseUUID": source_universe_uuid,
    "targetUniverseUUID": target_universe_uuid,
    "name": "my-xcluster-config",
    "tables": all_ysql_tables_uuid_list,
    "configType": "Basic",  # It could be Basic or Txn.
    "bootstrapParams": {  # You can omit this field to forcefully avoid bootstrapping.
        "tables": all_ysql_tables_uuid_list,
        "allowBootstrap": True,
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

xcluster_config_uuid = response["resourceUUID"]

{'resourceUUID': '8107542f-d17a-43bb-bd38-3ed0603d921e',
 'taskUUID': '5d09470f-dece-4acc-8a61-be5c4d4847f0'}
Waiting for task 5d09470f-dece-4acc-8a61-be5c4d4847f0...
Task 5d09470f-dece-4acc-8a61-be5c4d4847f0 finished successfully
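
Since `.tables` and `.bootstrapParams.tables` should normally be the same list, one way to keep them from drifting apart is to build the payload from a single list. The sketch below does that; `build_create_payload` is a hypothetical helper, and its field names are copied from the create request above. Leaving `storage_config_uuid` unset omits `bootstrapParams`, which corresponds to skipping bootstrapping.

```python
def build_create_payload(
    name,
    source_universe_uuid,
    target_universe_uuid,
    table_uuids,
    storage_config_uuid=None,
    config_type="Basic",
):
    """Build the create-xCluster-config request body from one table list."""
    if config_type not in ("Basic", "Txn"):
        raise ValueError(f"configType must be Basic or Txn, got {config_type!r}")
    payload = {
        "sourceUniverseUUID": source_universe_uuid,
        "targetUniverseUUID": target_universe_uuid,
        "name": name,
        "tables": list(table_uuids),
        "configType": config_type,
    }
    if storage_config_uuid is not None:
        # Same table list in both places, so the two fields cannot diverge.
        payload["bootstrapParams"] = {
            "tables": list(table_uuids),
            "allowBootstrap": True,
            "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
        }
    return payload
```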


## Getting XCluster Configs without DB Sync
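You can get the xCluster config by its UUID. With `syncWithDB` set to `false`, YBA returns the config without syncing with the DB first, which gives a faster response time. See the following example.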

In [7]:
# Get the xCluster config without syncing with the DB for faster response time
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}?syncWithDB={str(False).lower()}"
response = requests.get(url=route, headers=headers).json()
pprint(response)

{'createTime': '2024-11-13T15:38:12Z',
 'dbs': [],
 'imported': False,
 'lag': {'tserver_async_replication_lag_micros': {'data': [],
                                                  'directURLs': ['http://localhost:9090/graph?g0.expr=avg%28max+by+%28exported_instance%2C+saved_name%29%28%7Bnode_prefix%3D%22yb-admin-vbansal-1%22%2C+saved_name%3D%7E%22async_replication_sent_lag_micros%7Casync_replication_committed_lag_micros%22%2C+stream_id%3D%7E%22a6fe572813d440896d457eb0e8196594%7Cc3b52695987127b10e47b4f9f772c9c6%7Cdb6d3ccdee5d44b0664f8554a8117248%7Ca34442d4125df5a9b04e54633c14a055%7C1084476a8fcb2287a648c7abe714c9fa%22%7D%29%29+by+%28exported_instance%2C+saved_name%29+%2F+1000&g0.tab=0&g0.range_input=3600s&g0.end_input='],
                                                  'layout': {'title': 'Async '
                                                                      'Replication '
                                                                      'Lag',
                          

## Getting XCluster Configs with DB Sync
With `syncWithDB` set to `true`, YBA syncs the xCluster config with the source and target universes before returning it, so the response contains the latest data. See the following example.

In [8]:
# Get the xCluster config with syncing with the DB for the latest data
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}?syncWithDB={str(True).lower()}"
response = requests.get(url=route, headers=headers).json()
pprint(response)

{'createTime': '2024-11-13T15:38:12Z',
 'dbs': [],
 'imported': False,
 'lag': {'tserver_async_replication_lag_micros': {'data': [],
                                                  'directURLs': ['http://localhost:9090/graph?g0.expr=avg%28max+by+%28exported_instance%2C+saved_name%29%28%7Bnode_prefix%3D%22yb-admin-vbansal-1%22%2C+saved_name%3D%7E%22async_replication_sent_lag_micros%7Casync_replication_committed_lag_micros%22%2C+stream_id%3D%7E%22a6fe572813d440896d457eb0e8196594%7Cc3b52695987127b10e47b4f9f772c9c6%7Cdb6d3ccdee5d44b0664f8554a8117248%7Ca34442d4125df5a9b04e54633c14a055%7C1084476a8fcb2287a648c7abe714c9fa%22%7D%29%29+by+%28exported_instance%2C+saved_name%29+%2F+1000&g0.tab=0&g0.range_input=3600s&g0.end_input='],
                                                  'layout': {'title': 'Async '
                                                                      'Replication '
                                                                      'Lag',
                          

## Modifying Tables in XCluster Configs
You can add/remove tables to/from an existing xCluster config. This is useful when you would like to add new tables to your database after the replication is set up, or you would like to drop a table from your database.
Please note that to drop a table from your database, *first you need to remove that table from the xCluster config.*
To modify the tables in replication in an xCluster config, you need to pass the full list of tables that you would like to be in replication. In other words, you remove the table UUIDs that you do not want to be replicated, and add the new table UUIDs that you want to replicate. See the following example.

In [9]:
# Remove tables.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {
    "tables": all_ysql_tables_uuid_list[:-2],
}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

# Add tables.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {
    "tables": all_ysql_tables_uuid_list,
    "bootstrapParams": {  # You can omit this field to forcefully avoid bootstrapping.
        "tables": all_ysql_tables_uuid_list,
        "allowBootstrap": True,
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

{'resourceUUID': '8107542f-d17a-43bb-bd38-3ed0603d921e',
 'taskUUID': 'a430bed3-c406-45f0-a157-ba76bcf33d07'}
Waiting for task a430bed3-c406-45f0-a157-ba76bcf33d07...
Task a430bed3-c406-45f0-a157-ba76bcf33d07 finished successfully
{'resourceUUID': '8107542f-d17a-43bb-bd38-3ed0603d921e',
 'taskUUID': '017a5dc5-7c71-4a44-9eb2-0583164df331'}
Waiting for task 017a5dc5-7c71-4a44-9eb2-0583164df331...
Task 017a5dc5-7c71-4a44-9eb2-0583164df331 finished successfully
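
Because the request body carries the complete list of tables that should be in replication, rather than a delta, it can be convenient to derive that list from the current one. The helper below is a minimal sketch with a hypothetical name; it only manipulates Python lists and makes no API calls.

```python
def desired_table_list(current, to_remove=(), to_add=()):
    """Return the full table list to send: current minus removals plus additions."""
    removed = set(to_remove)
    result = [uuid for uuid in current if uuid not in removed]
    for uuid in to_add:
        # Skip UUIDs that are already present so the list has no duplicates.
        if uuid not in result:
            result.append(uuid)
    return result


new_tables = desired_table_list(["a", "b", "c"], to_remove=["b"], to_add=["d", "a"])
```

The result would go in the `tables` field of the PUT payload, and in `bootstrapParams.tables` as well when the added tables need bootstrapping.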


## Reconciling XCluster Configs with YBDB State
Sometimes you need to make changes to the replication group using yb-admin. In these cases, the corresponding xCluster config in YBA is not updated automatically to reflect the yb-admin changes, and a manual synchronization call is required, as follows:

In [10]:
route = (
    f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}/sync"
)
response = requests.post(url=route, headers=headers).json()
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

Waiting for task a7bcc007-0c00-4cdd-9e96-cd243641d78c...
Task a7bcc007-0c00-4cdd-9e96-cd243641d78c finished successfully


## Restarting XCluster Configs
The replication between two universes can break for various reasons, including temporary network partitions. In these cases, after the issue is resolved, you can restart the replication. You can also add index tables to the replication by restarting the replication for their main tables.

In [11]:
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {
    "tables": all_ysql_tables_uuid_list,
    "bootstrapParams": {
        "allowBootstrap": True,
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

{'resourceUUID': '8107542f-d17a-43bb-bd38-3ed0603d921e',
 'taskUUID': '6ebb77ce-b641-401c-bfec-cb5303bd4d0c'}
Waiting for task 6ebb77ce-b641-401c-bfec-cb5303bd4d0c...
Waiting for task 6ebb77ce-b641-401c-bfec-cb5303bd4d0c...
Task 6ebb77ce-b641-401c-bfec-cb5303bd4d0c finished successfully


## Pausing/Resuming XCluster Configs
You can pause the replication for some time and then resume it. Please note that if the replication is paused for an extended period of time, a replication restart is required.

In [12]:
# Pause the replication.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {"status": "Paused"}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

# Resume the replication.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {"status": "Running"}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

{'resourceUUID': '8107542f-d17a-43bb-bd38-3ed0603d921e',
 'taskUUID': 'fb2df865-ab72-40e1-8b08-5b5fa36f1fef'}
Waiting for task fb2df865-ab72-40e1-8b08-5b5fa36f1fef...
Task fb2df865-ab72-40e1-8b08-5b5fa36f1fef finished successfully
{'resourceUUID': '8107542f-d17a-43bb-bd38-3ed0603d921e',
 'taskUUID': '4edf297f-80e1-4eeb-b58c-bad4e834af81'}
Waiting for task 4edf297f-80e1-4eeb-b58c-bad4e834af81...
Task 4edf297f-80e1-4eeb-b58c-bad4e834af81 finished successfully


## Deleting XCluster Configs
You can delete the xCluster config so that there is no longer a replication relationship between the two universes. Please note that `isForceDelete` is useful when one of the universes is not available or there is an issue with the config. In those cases, pass `isForceDelete=true` and YBA will ignore errors and delete the config.

In [13]:
route = (f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
         f"?isForceDelete={str(True).lower()}")
response = requests.delete(url=route, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

{'resourceUUID': '8107542f-d17a-43bb-bd38-3ed0603d921e',
 'taskUUID': 'a91d307d-1973-4247-885c-2966b121c764'}
Waiting for task a91d307d-1973-4247-885c-2966b121c764...
Task a91d307d-1973-4247-885c-2966b121c764 finished successfully
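
The cell above always passes `isForceDelete=true`. Since force deletion ignores errors, you may prefer to make it opt-in. The sketch below builds the delete URL with the flag off by default; `delete_xcluster_config_url` is a hypothetical helper, and it assumes that the endpoint also accepts `isForceDelete=false`, which this notebook does not demonstrate.

```python
from urllib.parse import urlencode


def delete_xcluster_config_url(base_url, customer_uuid, config_uuid, force=False):
    """Build the DELETE route for an xCluster config; force is off by default."""
    query = urlencode({"isForceDelete": "true" if force else "false"})
    return (
        f"{base_url}/api/v1/customers/{customer_uuid}"
        f"/xcluster_configs/{config_uuid}?{query}"
    )
```

A caller could first try the default and retry with `force=True` only when one of the universes is unavailable or the config is in a bad state.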
