In [3]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
!!pip install snowflake-connector-python

StatementMeta(, c3063245-6ce1-4245-bd35-5954747402f0, 5, Finished, Available, Finished)



# Setup stream for the Table / View / Dynamic table

This code creates the stream that stores the changes/deltas.

In [78]:
import pandas as pd
import snowflake.connector

# Your Snowflake credentials
SF_CONFIG = {
    "user": "Snowflake_User",
    "password": "Snowflake_password",
    "account": "abcd-xy12345.east-us-2.azure",  # e.g. abcd-xy12345.east-us-2.azure
    "warehouse": "Snowflake_warehouse",
    "database": "Snowflake_Database",
    "schema": "PUBLIC"
}


def get_object_type(conn, database, schema, object_name):
    """Returns the object type: TABLE, VIEW, MATERIALIZED VIEW, or DYNAMIC TABLE"""
    query = f"""
        SELECT  
        case 
        when TABLE_TYPE = 'VIEW' then 'VIEW' 
        when TABLE_TYPE = 'BASE TABLE' and IS_DYNAMIC = 'YES' then 'DYNAMIC TABLE' 
        else 'TABLE' end as TABLE_TYPE  FROM INFORMATION_SCHEMA.TABLES
        WHERE TABLE_SCHEMA = '{schema}'
          AND TABLE_NAME = '{object_name.upper()}'
    """
    with conn.cursor() as cur:
        cur.execute(query)
        result = cur.fetchone()
        return result[0] if result else None


def create_stream_if_supported(conn, database, schema, object_name, stream_name):
    """Create a stream if the object is a table or dynamic table"""
    object_type = get_object_type(conn, database, schema, object_name)
    if object_type in ('VIEW','TABLE', 'DYNAMIC TABLE'):
        fully_qualified_object = f'{schema}.{object_name}'
        fully_qualified_stream = f'{schema}.{stream_name}'
        stream_query = f"""
                CREATE OR REPLACE STREAM {fully_qualified_stream}
                ON {object_type} {fully_qualified_object};
            """
        display(stream_query)
        with conn.cursor() as cur:
            cur.execute(stream_query)
        print(f"✅ Created stream `{stream_name}` on `{object_name}` ({object_type})")
    else:
        print(f"⚠️ Skipped `{object_name}` ({object_type}) — streams not supported.")


def fetch_table_snapshot(conn, object_name, output_path):
    """Fetch full snapshot of a table/dynamic table and write to Parquet."""
    df = pd.read_sql(f'SELECT * FROM {object_name}', conn)
    df.to_parquet(output_path, index=False)
    print(f"📦 Snapshot written to {output_path}")
    return df


def fetch_stream_changes(conn, stream_name, output_path):
    """Fetch changes from stream and write to Parquet."""
    query = f"""
        SELECT *,
            CASE METADATA$ACTION
                WHEN 'INSERT' THEN 1
                WHEN 'DELETE' THEN 2
                WHEN 'UPDATE' THEN 1
                ELSE 1
            END AS __rowMarker__
        FROM {stream_name} order by metadata$row_id, METADATA$action;
    """
    df = pd.read_sql(query, conn)
    df.to_parquet(output_path, index=False)
    print(f"📦 Stream changes written to {output_path}")
    return df

my_client_id="<Service Principal Id>"
my_client_secret="<Secret>"
my_client_tenant="<Azure Tenant Id>"

landing_zone = "https://onelake.dfs.fabric.microsoft.com/<Workspace Id>/<Mirrored Database Id>/Files/LandingZone"

# Setup the OpenMirroring Client
client = OpenMirroringClient(
    client_id=my_client_id,
    client_secret=my_client_secret,
    client_tenant=my_client_tenant,
    host=landing_zone
)




StatementMeta(, c3063245-6ce1-4245-bd35-5954747402f0, 80, Finished, Available, Finished)

In [70]:
# ❗ Main logic — loop over multiple objects
if __name__ == "__main__":
    conn = snowflake.connector.connect(**SF_CONFIG)
    db = SF_CONFIG["database"]
    schema = SF_CONFIG["schema"]
    file_location ="/lakehouse/default/Files/"

    # These are the tables, views and dynamic tables that will be mirrored
    object_names = ['CUSTOMER_SMALL',  'view_customer_small','dynamic_table_customer_demo']  # List of table/view names

    for obj in object_names:
        stream_name = f"{obj}_STREAM"
        snapshot_file = f"{file_location}{obj}_snapshot.parquet"
        client.remove_table(schema_name=schema , table_name=obj)
        client.create_table(schema_name=schema , table_name=obj, key_cols=["C_CUSTKEY"])
        create_stream_if_supported(conn, db, schema, obj, stream_name)
        fetch_table_snapshot(conn, f'{schema}.{obj}', snapshot_file)
        client.upload_data_file(schema_name=schema , table_name=obj, local_file_path=snapshot_file)
    conn.close()


StatementMeta(, c3063245-6ce1-4245-bd35-5954747402f0, 72, Finished, Available, Finished)

Folder 'PUBLIC.schema/CUSTOMER_SMALL' deleted successfully.
Folder and _metadata.json created successfully at: PUBLIC.schema/CUSTOMER_SMALL


'\n                CREATE OR REPLACE STREAM PUBLIC.CUSTOMER_SMALL_STREAM\n                ON TABLE PUBLIC.CUSTOMER_SMALL;\n            '

📦 Snapshot written to /lakehouse/default/Files/CUSTOMER_SMALL_snapshot.parquet
File uploaded successfully as '_00000000000000000001.parquet'.
File renamed from _00000000000000000001.parquet to 00000000000000000001.parquet successfully.
File renamed successfully to '00000000000000000001.parquet'.
Folder 'PUBLIC.schema/view_customer_small' deleted successfully.
Folder and _metadata.json created successfully at: PUBLIC.schema/view_customer_small


'\n                CREATE OR REPLACE STREAM PUBLIC.view_customer_small_STREAM\n                ON VIEW PUBLIC.view_customer_small;\n            '

✅ Created stream `view_customer_small_STREAM` on `view_customer_small` (VIEW)
📦 Snapshot written to /lakehouse/default/Files/view_customer_small_snapshot.parquet
File uploaded successfully as '_00000000000000000001.parquet'.
File renamed from _00000000000000000001.parquet to 00000000000000000001.parquet successfully.
File renamed successfully to '00000000000000000001.parquet'.
Folder 'PUBLIC.schema/dynamic_table_customer_demo' deleted successfully.
Folder and _metadata.json created successfully at: PUBLIC.schema/dynamic_table_customer_demo


'\n                CREATE OR REPLACE STREAM PUBLIC.dynamic_table_customer_demo_STREAM\n                ON DYNAMIC TABLE PUBLIC.dynamic_table_customer_demo;\n            '

✅ Created stream `dynamic_table_customer_demo_STREAM` on `dynamic_table_customer_demo` (DYNAMIC TABLE)
📦 Snapshot written to /lakehouse/default/Files/dynamic_table_customer_demo_snapshot.parquet
File uploaded successfully as '_00000000000000000001.parquet'.
File renamed from _00000000000000000001.parquet to 00000000000000000001.parquet successfully.
File renamed successfully to '00000000000000000001.parquet'.


In [81]:
# ❗ Main logic — loop over multiple objects
# For the CDC, I drop and recreate the stream after I collect the changes.
# this keeps the streams small
if __name__ == "__main__":
    conn = snowflake.connector.connect(**SF_CONFIG)
    db = SF_CONFIG["database"]
    schema = SF_CONFIG["schema"]
    file_location ="/lakehouse/default/Files/"

    for obj in object_names:
        stream_name = f"{obj}_STREAM"
        snapshot_file = f"{file_location}{obj}_changes.parquet"
        fetch_stream_changes(conn, f'{schema}.{stream_name}', snapshot_file)
        create_stream_if_supported(conn, db, schema, obj, stream_name)
        client.upload_data_file(schema_name=schema , table_name=obj, local_file_path=snapshot_file)

    conn.close()

StatementMeta(, c3063245-6ce1-4245-bd35-5954747402f0, 83, Finished, Available, Finished)

  df = pd.read_sql(query, conn)


📦 Stream changes written to /lakehouse/default/Files/CUSTOMER_SMALL_changes.parquet


'\n                CREATE OR REPLACE STREAM PUBLIC.CUSTOMER_SMALL_STREAM\n                ON TABLE PUBLIC.CUSTOMER_SMALL;\n            '

✅ Created stream `CUSTOMER_SMALL_STREAM` on `CUSTOMER_SMALL` (TABLE)
File uploaded successfully as '_00000000000000000007.parquet'.
File renamed from _00000000000000000007.parquet to 00000000000000000007.parquet successfully.
File renamed successfully to '00000000000000000007.parquet'.
📦 Stream changes written to /lakehouse/default/Files/view_customer_small_changes.parquet


'\n                CREATE OR REPLACE STREAM PUBLIC.view_customer_small_STREAM\n                ON VIEW PUBLIC.view_customer_small;\n            '

✅ Created stream `view_customer_small_STREAM` on `view_customer_small` (VIEW)
File uploaded successfully as '_00000000000000000007.parquet'.
File renamed from _00000000000000000007.parquet to 00000000000000000007.parquet successfully.
File renamed successfully to '00000000000000000007.parquet'.
📦 Stream changes written to /lakehouse/default/Files/dynamic_table_customer_demo_changes.parquet


'\n                CREATE OR REPLACE STREAM PUBLIC.dynamic_table_customer_demo_STREAM\n                ON DYNAMIC TABLE PUBLIC.dynamic_table_customer_demo;\n            '

✅ Created stream `dynamic_table_customer_demo_STREAM` on `dynamic_table_customer_demo` (DYNAMIC TABLE)
File uploaded successfully as '_00000000000000000007.parquet'.
File renamed from _00000000000000000007.parquet to 00000000000000000007.parquet successfully.
File renamed successfully to '00000000000000000007.parquet'.


In [25]:
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from azure.storage.filedatalake import DataLakeServiceClient
from azure.identity import ClientSecretCredential
import requests
import json
import os

class OpenMirroringClient:
    def __init__(self, client_id: str, client_secret: str, client_tenant: str, host: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.client_tenant = client_tenant
        self.host = self._normalize_path(host)
        self.service_client = self._create_service_client()

    def _normalize_path(self, path: str) -> str:
        """
        Normalizes the given path by removing the 'LandingZone' segment if it ends with it.

        :param path: The original path.
        :return: The normalized path.
        """
        if path.endswith("LandingZone"):
            # Remove the 'LandingZone' segment
            return path[:path.rfind("/LandingZone")]
        elif path.endswith("LandingZone/"):
            # Remove the 'LandingZone/' segment
            return path[:path.rfind("/LandingZone/")]
        return path

    def _create_service_client(self):
        """Creates and returns a DataLakeServiceClient."""
        try:
            credential = ClientSecretCredential(self.client_tenant, self.client_id, self.client_secret)            
            return DataLakeServiceClient(account_url=self.host, credential=credential)
        except Exception as e:
            raise Exception(f"Failed to create DataLakeServiceClient: {e}")

    def create_table(self, schema_name: str = None, table_name: str = "", key_cols: list = []):
        """
        Creates a folder in OneLake storage and a _metadata.json file inside it.

        :param schema_name: Optional schema name.
        :param table_name: Name of the table.
        :param key_cols: List of key column names.
        """
        if not table_name:
            raise ValueError("table_name cannot be empty.")

        # Construct the folder path
        folder_path = f"{schema_name}.schema/{table_name}" if schema_name else f"{table_name}"

        try:
            # Create the folder
            file_system_client = self.service_client.get_file_system_client(file_system="LandingZone")  # Replace with your file system name
            directory_client = file_system_client.get_directory_client(folder_path)
            directory_client.create_directory()

            # Create the _metadata.json file
            metadata_content = {"keyColumns": [f'{col}' for col in key_cols]}
            metadata_file_path = os.path.join(folder_path, "_metadata.json")
            file_client = directory_client.create_file("_metadata.json")
            file_client.append_data(data=json.dumps(metadata_content), offset=0, length=len(json.dumps(metadata_content)))
            file_client.flush_data(len(json.dumps(metadata_content)))

            print(f"Folder and _metadata.json created successfully at: {folder_path}")
        except Exception as e:
            raise Exception(f"Failed to create table: {e}")

    def remove_table(self, schema_name: str = None, table_name: str = "", remove_schema_folder: bool = False):
        """
        Deletes a folder in the OneLake storage.

        :param schema_name: Optional schema name.
        :param table_name: Name of the table.
        :param remove_schema_folder: If True, removes the schema folder as well.
        """
        if not table_name:
            raise ValueError("table_name cannot be empty.")

        # Construct the folder path
        folder_path = f"{schema_name}.schema/{table_name}" if schema_name else f"{table_name}"

        try:
            # Get the directory client
            file_system_client = self.service_client.get_file_system_client(file_system="LandingZone")  # Replace with your file system name
            directory_client = file_system_client.get_directory_client(folder_path)

            # Check if the folder exists
            if not directory_client.exists():
                print(f"Warning: Folder '{folder_path}' not found.")
                return

            # Delete the folder
            directory_client.delete_directory()
            print(f"Folder '{folder_path}' deleted successfully.")

            # Check if schema folder exists
            if remove_schema_folder and schema_name:
                schema_folder_path = f"{schema_name}.schema"
                schema_directory_client = file_system_client.get_directory_client(schema_folder_path)
                if schema_directory_client.exists():
                    schema_directory_client.delete_directory()
                    print(f"Schema folder '{schema_folder_path}' deleted successfully.")
                else:
                    print(f"Warning: Schema folder '{schema_folder_path}' not found.")
        except Exception as e:
            raise Exception(f"Failed to delete table: {e}")

    def get_next_file_name(self, schema_name: str = None, table_name: str = "") -> str:
        """
        Finds the next file name for a folder in OneLake storage.

        :param schema_name: Optional schema name.
        :param table_name: Name of the table.
        :return: The next file name padded to 20 digits.
        """
        if not table_name:
            raise ValueError("table_name cannot be empty.")

        # Construct the folder path
        folder_path = f"LandingZone/{schema_name}.schema/{table_name}" if schema_name else f"LandingZone/{table_name}"

        try:
            # Get the system client
            file_system_client = self.service_client.get_file_system_client(file_system=folder_path)

            # List all files in the folder
            file_list = file_system_client.get_paths(recursive=False)
            parquet_files = []

            for file in file_list:
                file_name = os.path.basename(file.name)
                if not file.is_directory and file_name.endswith(".parquet") and not file_name.startswith("_"):
                    # Validate the file name pattern
                    if not file_name[:-8].isdigit() or len(file_name[:-8]) != 20:  # Exclude ".parquet"
                        raise ValueError(f"Invalid file name pattern: {file_name}")
                    parquet_files.append(int(file_name[:-8]))

            # Determine the next file name
            if parquet_files:
                next_file_number = max(parquet_files) + 1
            else:
                next_file_number = 1

            # Return the next file name padded to 20 digits
            return f"{next_file_number:020}.parquet"

        except Exception as e:
            raise Exception(f"Failed to get next file name: {e}")

    def upload_data_file(self, schema_name: str = None, table_name: str = "", local_file_path: str = ""):
        """
        Uploads a file to OneLake storage.

        :param schema_name: Optional schema name.
        :param table_name: Name of the table.
        :param local_file_path: Path to the local file to be uploaded.
        """
        if not table_name:
            raise ValueError("table_name cannot be empty.")
        if not local_file_path or not os.path.isfile(local_file_path):
            raise ValueError("Invalid local file path.")

        # Construct the folder path
        folder_path = f"{schema_name}.schema/{table_name}" if schema_name else f"{table_name}"

        try:
            # Get the directory client
            file_system_client = self.service_client.get_file_system_client(file_system="LandingZone")  # Replace with your file system name
            directory_client = file_system_client.get_directory_client(folder_path)

            # Check if the folder exists
            if not directory_client.exists():
                raise FileNotFoundError(f"Folder '{folder_path}' not found.")

            # Get the next file name
            next_file_name = self.get_next_file_name(schema_name, table_name)

            # Add an underscore to the file name for temporary upload
            temp_file_name = f"_{next_file_name}"

            # Upload the file
            file_client = directory_client.create_file(temp_file_name)
            with open(local_file_path, "rb") as file_data:
                file_contents = file_data.read()
                file_client.append_data(data=file_contents, offset=0, length=len(file_contents))
                file_client.flush_data(len(file_contents))

            print(f"File uploaded successfully as '{temp_file_name}'.")
            
            # Python SDK doesn't handle rename properly for onelake, using REST API to rename the file instead
            self.rename_file_via_rest_api(f"LandingZone/{folder_path}", temp_file_name, next_file_name)
            print(f"File renamed successfully to '{next_file_name}'.")

        except Exception as e:
            raise Exception(f"Failed to upload data file: {e}")
        
    def rename_file_via_rest_api(self, folder_path: str, old_file_name: str, new_file_name: str):
        # Create a ClientSecretCredential
        credential = ClientSecretCredential(self.client_tenant, self.client_id, self.client_secret)            
        # Get a token
        token = credential.get_token("https://storage.azure.com/.default").token

        # Construct the rename URL
        rename_url = f"{self.host}/{folder_path}/{new_file_name}"

        # Construct the source path
        source_path = f"{self.host}/{folder_path}/{old_file_name}"

        # Set the headers
        headers = {
            "Authorization": f"Bearer {token}",
            "x-ms-rename-source": source_path,
            "x-ms-version": "2020-06-12"
        }

        # Send the rename request
        response = requests.put(rename_url, headers=headers)

        if response.status_code in [200, 201]:
            print(f"File renamed from {old_file_name} to {new_file_name} successfully.")
        else:
            print(f"Failed to rename file. Status code: {response.status_code}, Error: {response.text}")

    def get_mirrored_database_status(self):
        """
        Retrieves and displays the status of the mirrored database from Monitoring/replicator.json.

        :raises Exception: If the status file or path does not exist.
        """
        file_system_client = self.service_client.get_file_system_client(file_system="Monitoring")
        try:
            file_client = file_system_client.get_file_client("replicator.json")
            if not file_client.exists():
                raise Exception("No status of mirrored database has been found. Please check whether the mirrored database has been started properly.")

            download = file_client.download_file()
            content = download.readall()
            status_json = json.loads(content)
            print(json.dumps(status_json, indent=4))
        except Exception:
            raise Exception("No status of mirrored database has been found. Please check whether the mirrored database has been started properly.")

    def get_table_status(self, schema_name: str = None, table_name: str = None):
        """
        Retrieves and displays the status of tables from Monitoring/table.json.

        :param schema_name: Optional schema name to filter.
        :param table_name: Optional table name to filter.
        :raises Exception: If the status file or path does not exist.
        """
        file_system_client = self.service_client.get_file_system_client(file_system="Monitoring")
        try:
            file_client = file_system_client.get_file_client("tables.json")
            if not file_client.exists():
                raise Exception("No status of mirrored database has been found. Please check whether the mirrored database has been started properly.")

            download = file_client.download_file()
            content = download.readall()
            status_json = json.loads(content)

            # Treat None as empty string for filtering
            schema_name = schema_name or ""
            table_name = table_name or ""

            if not schema_name and not table_name:
                # Show the whole JSON content
                print(json.dumps(status_json, indent=4))
            else:
                # Filter tables array
                filtered_tables = [
                    t for t in status_json.get("tables", [])
                    if t.get("sourceSchemaName", "") == schema_name and t.get("sourceTableName", "") == table_name
                ]
                print(json.dumps({"tables": filtered_tables}, indent=4))
        except Exception:
            raise Exception("No status of mirrored database has been found. Please check whether the mirrored database has been started properly.")

StatementMeta(, c3063245-6ce1-4245-bd35-5954747402f0, 27, Finished, Available, Finished)