## Schema translation

When we replicate topics from a source cluster using the DefaultReplicationPolicty in MM2, topic names have the `source.` prefix. 

We can only deserialize these topics if the schemas are registered under a subject name that follows the topic name because we use the `topic.name.strategy.topic` topic name strategy.

This notebook iterates over schemas in the target cluster (replicated from the source cluster) and register them under a new subject name that includes the `source.` prefix. 

### Check schema registry mode
Schema registry mode must be READWRITE to run this notebook.

In [1]:
import httpx

schema_registry_url = "http://sasquatch-source-schema-registry.sasquatch:8081"

async with httpx.AsyncClient() as client:

    response = await client.get(f"{schema_registry_url}/mode")

    if response.status_code == 200:  # Successful response
        print(response.text)
    else:
        print(f"Request failed with status code: {response.status_code}")

{"mode":"READWRITE"}


### Fetch subjects
Make a list of subjects

In [2]:
import httpx

schema_registry_url = "http://sasquatch-source-schema-registry.sasquatch:8081"

async with httpx.AsyncClient() as client:

    response = await client.get(f"{schema_registry_url}/subjects")

    if response.status_code == 200:  # Successful response
        subjects = [subject for subject in response.json() if subject.startswith("lsst.sal")]
    else:
        print(f"Request failed with status code: {response.status_code}")

print(f"Found {len(subjects)} subjects.")

Found 2396 subjects.


### Register schema under new subject name

In [4]:
import httpx

schema_registry_url = "http://sasquatch-source-schema-registry.sasquatch:8081"
headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}

async with httpx.AsyncClient() as client:
    for subject in subjects:
        # Get the latest version of the schema
        url = f"{schema_registry_url}/subjects/{subject}/versions/latest"

        response = await client.get(url)

        if response.status_code == 200:
            data = response.json()
        else:
            print(f"Failed to retrieve schema for {subject}. Status code: {response.status_code}")
            break

        schema = {'schema': data['schema']}

        # Check if a schema has already been registered under the new subject.
        new_subject = f"source.{subject}"

        url = f"{schema_registry_url}/subjects/{new_subject}"
        response = await client.post(url, headers=headers, json=schema)

        if response.status_code == 200:
            print(f"Schema already registered under {new_subject}.")
        else:
            # Register schema under new subject name
            url = f"{schema_registry_url}/subjects/{new_subject}/versions"
            response = await client.post(url, headers=headers, json=schema)

            if response.status_code == 200:
                print(f"Registered schema under new subject {new_subject} with {response.text}")
            else:
                print(f"Failed to register schema under subject {subject}. Status code: {response.status_code}")
                break


Registered schema under new subject source.lsst.sal.ATAOS.ackcmd-value with {"id":6480}
Registered schema under new subject source.lsst.sal.ATAOS.command_applyCorrection-value with {"id":6484}
Registered schema under new subject source.lsst.sal.ATAOS.command_applyFocusOffset-value with {"id":6487}
Registered schema under new subject source.lsst.sal.ATAOS.command_disable-value with {"id":6482}
Registered schema under new subject source.lsst.sal.ATAOS.command_disableCorrection-value with {"id":6481}
Registered schema under new subject source.lsst.sal.ATAOS.command_enable-value with {"id":6483}
Registered schema under new subject source.lsst.sal.ATAOS.command_enableCorrection-value with {"id":6491}
Registered schema under new subject source.lsst.sal.ATAOS.command_exitControl-value with {"id":6485}
Registered schema under new subject source.lsst.sal.ATAOS.command_offset-value with {"id":6490}
Registered schema under new subject source.lsst.sal.ATAOS.command_resetOffset-value with {"id":648