# Database Replication Across Snowflake Accounts
## Complete Setup and Configuration Guide

This notebook provides a comprehensive checklist and guidance for configuring and deploying database replication across Snowflake accounts.

### What is Database Replication?
Database replication enables you to:
- Create read-only replicas of databases across Snowflake accounts
- Maintain data synchronization across regions and cloud platforms
- Support disaster recovery and high availability strategies
- Enable data sharing and distribution across your organization

### Replication Methods
Snowflake offers two approaches:
1. **Replication Groups** (Recommended): Modern approach supporting databases and multiple object types
2. **Database Replication**: Legacy approach for databases only

**Note**: This guide uses Replication Groups, which Snowflake recommends for all new implementations.

---
## Prerequisites Checklist

Before starting, verify you have:

### ‚úÖ Account Requirements
- [ ] Source and target accounts are in the same organization
- [ ] Organization administrator has enabled replication for both accounts
- [ ] You have ACCOUNTADMIN role or CREATE REPLICATION GROUP privilege
- [ ] For failover: Business Critical Edition or higher

### ‚úÖ Database Requirements
- [ ] Database is permanent or transient (not temporary)
- [ ] Database is not created from a share
- [ ] Database objects are supported for replication (see documentation)
- [ ] You have OWNERSHIP privilege on the database

### ‚úÖ Network Requirements
- [ ] Cross-region connectivity is available (if replicating across regions)
- [ ] Target accounts can receive replication traffic
- [ ] Tri-Secret Secure or PrivateLink configured in target if required for compliance

---
## Step 1: View Available Accounts

First, identify which accounts in your organization are enabled for replication.

In [None]:
import os
import snowflake.connector
import pandas as pd

conn = snowflake.connector.connect(
    connection_name=os.getenv("SNOWFLAKE_CONNECTION_NAME") or "mwaws"
)
cursor = conn.cursor()

cursor.execute("SHOW REPLICATION ACCOUNTS")
accounts_df = cursor.fetch_pandas_all()
print("\n=== Replication-Enabled Accounts ===")
print(accounts_df[['snowflake_region', 'account_name', 'organization_name']].to_string(index=False))
print(f"\nTotal accounts available: {len(accounts_df)}")

### üìã Checklist: Account Discovery
- [ ] All target accounts appear in the list above
- [ ] Note the organization_name and account_name for target accounts
- [ ] Verify regions support your replication requirements

---
## Step 2: Create a Replication Group

A replication group defines which objects to replicate and which accounts can receive them.

In [None]:
replication_group_name = "MY_REPLICATION_GROUP"
target_accounts = ["MYORG.ACCOUNT2", "MYORG.ACCOUNT3"]
databases_to_replicate = ["MYDB"]

print("Configuration:")
print(f"  Replication Group: {replication_group_name}")
print(f"  Target Accounts: {', '.join(target_accounts)}")
print(f"  Databases: {', '.join(databases_to_replicate)}")
print("\nReview the configuration above before executing the next cell.")

In [None]:
target_list = ", ".join(target_accounts)
create_sql = f"""
CREATE REPLICATION GROUP {replication_group_name}
  OBJECT_TYPES = DATABASES
  ALLOWED_DATABASES = {', '.join(databases_to_replicate)}
  ALLOWED_ACCOUNTS = {target_list}
  REPLICATION_SCHEDULE = '10 MINUTE'
"""

print("Executing SQL:")
print(create_sql)
cursor.execute(create_sql)
print("\n‚úÖ Replication group created successfully")

### üìã Checklist: Replication Group Creation
- [ ] Replication group created without errors
- [ ] Replication schedule configured (10 minutes in example)
- [ ] All target accounts are specified
- [ ] All databases to replicate are included

---
## Step 3: Enable Failover (Optional - Business Critical Only)

**Skip this step if:**
- You don't have Business Critical Edition or higher
- You only need read-only replication (no failover)

Failover allows promoting a secondary to primary in disaster recovery scenarios.

In [None]:
failover_accounts = ["MYORG.ACCOUNT2"]

for account in failover_accounts:
    sql = f"""
    ALTER REPLICATION GROUP {replication_group_name}
      SET OBJECT_TYPES = DATABASES, INTEGRATIONS, ROLES, USERS, WAREHOUSES
    """
    print(f"Converting to failover group for account: {account}")
    print(sql)

print("\nNote: Execute ALTER FAILOVER GROUP commands separately if needed")
print("This is an example - adjust object types based on your requirements")

### üìã Checklist: Failover Configuration
- [ ] Failover enabled for required accounts
- [ ] Object types for failover defined
- [ ] Documented which account can be promoted

---
## Step 4: View Replication Groups

Verify the replication group configuration.

In [None]:
cursor.execute("SHOW REPLICATION GROUPS")
groups_df = cursor.fetch_pandas_all()
print("\n=== Replication Groups ===")
if not groups_df.empty:
    print(groups_df[['name', 'type', 'region', 'allowed_accounts']].to_string(index=False))
else:
    print("No replication groups found")

cursor.execute("SHOW REPLICATION DATABASES")
dbs_df = cursor.fetch_pandas_all()
print("\n=== Replication Databases ===")
if not dbs_df.empty:
    print(dbs_df[['name', 'is_primary', 'primary', 'replication_allowed_to_accounts']].to_string(index=False))
else:
    print("No replication databases found")

### üìã Checklist: Verification
- [ ] Replication group appears in SHOW output
- [ ] Primary databases are listed
- [ ] Allowed accounts match your configuration
- [ ] Replication schedule is correct

---
## Step 5: Create Secondary Database in Target Account

**Execute this section in each TARGET account** to create replicas.

### Important Notes:
- Log into the target account before running
- Use the same database name as the primary (recommended)
- Match DATA_RETENTION_TIME_IN_DAYS if set on primary

In [None]:
print("\n=== TARGET ACCOUNT: Create Secondary Database ===")
print("\n‚ö†Ô∏è IMPORTANT: Run this in the TARGET account, not the source!\n")

primary_db_identifier = "MYORG.ACCOUNT1.MYDB"
secondary_db_name = "MYDB"
data_retention_days = 1

create_secondary_sql = f"""
CREATE DATABASE {secondary_db_name}
  AS REPLICA OF {primary_db_identifier}
  DATA_RETENTION_TIME_IN_DAYS = {data_retention_days}
"""

print("SQL to execute in target account:")
print(create_secondary_sql)
print("\nAfter creating, verify with: SHOW REPLICATION DATABASES;")

### üìã Checklist: Secondary Database Creation
- [ ] Secondary database created in each target account
- [ ] Database name matches primary (or documented difference)
- [ ] DATA_RETENTION_TIME_IN_DAYS matches primary setting
- [ ] Initial replication completed successfully

---
## Step 6: Refresh Secondary Database

**Execute in TARGET account** to manually trigger a refresh.

In [None]:
print("\n=== TARGET ACCOUNT: Refresh Secondary Database ===")
print("\n‚ö†Ô∏è Run this in the TARGET account\n")

secondary_db_name = "MYDB"

refresh_sql = f"ALTER DATABASE {secondary_db_name} REFRESH"

print("SQL to execute:")
print(refresh_sql)
print("\nFor large databases, consider increasing timeout:")
print("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 604800;")

### üìã Checklist: Manual Refresh
- [ ] Refresh command executed successfully
- [ ] No timeout errors (increase timeout if needed)
- [ ] Data synchronized from primary

---
## Step 7: Monitor Replication Progress

Track the status of replication operations.

In [None]:
database_name = "MYDB"

progress_sql = f"""
SELECT 
    phase_name,
    result,
    start_time,
    end_time,
    progress,
    details
FROM TABLE(INFORMATION_SCHEMA.DATABASE_REFRESH_PROGRESS('{database_name}'))
ORDER BY start_time DESC
"""

print("=== Replication Progress ===")
cursor.execute(progress_sql)
progress_df = cursor.fetch_pandas_all()
if not progress_df.empty:
    print(progress_df.to_string(index=False))
else:
    print("No progress data available")

### üìã Checklist: Progress Monitoring
- [ ] All phases completed successfully
- [ ] No errors in result column
- [ ] End_time populated for all phases
- [ ] Progress shows 100% completion

---
## Step 8: View Replication History

Review past replication operations and data transfer volumes.

In [None]:
database_name = "MYDB"

history_sql = f"""
SELECT 
    start_time,
    end_time,
    phase_name,
    result,
    bytes_transferred,
    ROUND(bytes_transferred / 1024 / 1024 / 1024, 2) AS gb_transferred
FROM TABLE(INFORMATION_SCHEMA.DATABASE_REFRESH_HISTORY('{database_name}'))
ORDER BY start_time DESC
LIMIT 20
"""

print("=== Recent Replication History ===")
cursor.execute(history_sql)
history_df = cursor.fetch_pandas_all()
if not history_df.empty:
    print(history_df.to_string(index=False))
else:
    print("No history data available")

### üìã Checklist: History Review
- [ ] Historical refreshes completed successfully
- [ ] Data transfer volumes are as expected
- [ ] No recurring errors
- [ ] Refresh frequency matches requirements

---
## Step 9: Monitor Replication Costs

Track data transfer and compute costs for replication.

In [None]:
cost_sql = """
SELECT 
    TO_DATE(start_time) AS date,
    database_name,
    SUM(credits_used) AS credits_used,
    SUM(bytes_transferred) / 1024 / 1024 / 1024 AS gb_transferred
FROM SNOWFLAKE.ACCOUNT_USAGE.DATABASE_REPLICATION_USAGE_HISTORY
WHERE start_time >= DATEADD(month, -1, CURRENT_TIMESTAMP())
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC
"""

print("=== Replication Costs (Last 30 Days) ===")
try:
    cursor.execute(cost_sql)
    cost_df = cursor.fetch_pandas_all()
    if not cost_df.empty:
        print(cost_df.to_string(index=False))
        print(f"\nTotal Credits: {cost_df['CREDITS_USED'].sum():.2f}")
        print(f"Total Data: {cost_df['GB_TRANSFERRED'].sum():.2f} GB")
    else:
        print("No cost data available")
except Exception as e:
    print(f"Note: ACCOUNT_USAGE data may not be immediately available: {e}")

### üìã Checklist: Cost Monitoring
- [ ] Replication costs are within budget
- [ ] No unexpected spikes in data transfer
- [ ] Credit usage trends are acceptable
- [ ] Cost monitoring alerts configured

---
## Step 10: Validate Data Consistency (Optional)

Use HASH_AGG to verify data matches between primary and secondary.

In [None]:
print("=== Data Consistency Validation ===")
print("""
To validate data consistency:

1. On SECONDARY database:
   - Get snapshot timestamp from DATABASE_REFRESH_PROGRESS
   - Run: SELECT HASH_AGG(*) FROM schema.table;

2. On PRIMARY database:
   - Run: SELECT HASH_AGG(*) FROM schema.table 
          AT(TIMESTAMP => 'snapshot_timestamp'::TIMESTAMP);

3. Compare hash values - they should match

Example:
""")

validation_example = """
-- Get snapshot timestamp (run on secondary)
SELECT PARSE_JSON(details)['snapshot_transaction_timestamp']
FROM TABLE(INFORMATION_SCHEMA.DATABASE_REFRESH_PROGRESS('MYDB'))
WHERE phase_name = 'PRIMARY_UPLOADING_DATA';

-- Compare hashes
-- Secondary: SELECT HASH_AGG(*) FROM mytable;
-- Primary:   SELECT HASH_AGG(*) FROM mytable AT(TIMESTAMP => '<timestamp>');
"""
print(validation_example)

### üìã Checklist: Data Validation
- [ ] Sample tables selected for validation
- [ ] Hash values match between primary and secondary
- [ ] Row counts verified
- [ ] Critical data validated

---
## Step 11: Setup Scheduled Refresh (Using Tasks)

**Execute in TARGET account** to automate secondary database refreshes.

### Important Notes:
- The replication group's REPLICATION_SCHEDULE handles automatic refreshes
- Use tasks only if you need custom scheduling
- Tasks run with the role that creates them

In [None]:
print("\n=== TARGET ACCOUNT: Setup Scheduled Refresh Task ===")
print("\nNote: Only needed if not using replication group's built-in schedule\n")

task_db = "ADMIN_DB"
task_schema = "PUBLIC"
task_name = "REFRESH_MYDB_TASK"
warehouse = "COMPUTE_WH"
db_to_refresh = "MYDB"
schedule = "10 MINUTE"
timeout_ms = 14400000

task_sql = f"""
USE DATABASE {task_db};
USE SCHEMA {task_schema};

CREATE TASK {task_name}
  WAREHOUSE = {warehouse}
  SCHEDULE = '{schedule}'
  USER_TASK_TIMEOUT_MS = {timeout_ms}
AS
  ALTER DATABASE {db_to_refresh} REFRESH;

ALTER TASK {task_name} RESUME;
"""

print("SQL to execute in target account:")
print(task_sql)
print("\nTask will refresh every", schedule)

### üìã Checklist: Task Configuration
- [ ] Task database and schema exist
- [ ] Warehouse specified and exists
- [ ] Schedule matches RPO requirements
- [ ] Timeout sufficient for database size
- [ ] Task created and resumed
- [ ] Task execution role has OWNERSHIP on database

---
## Step 12: Monitor Task Execution

**Execute in TARGET account** to verify task-based refreshes.

In [None]:
print("\n=== TARGET ACCOUNT: Monitor Task History ===")

task_history_sql = """
SELECT 
    name,
    state,
    scheduled_time,
    completed_time,
    DATEDIFF(second, scheduled_time, completed_time) AS duration_seconds,
    error_code,
    error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    task_name => 'REFRESH_MYDB_TASK',
    scheduled_time_range_start => DATEADD(hour, -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC
"""

print("SQL to check task execution:")
print(task_history_sql)

### üìã Checklist: Task Monitoring
- [ ] Task executing on schedule
- [ ] No error codes in task history
- [ ] Duration within acceptable limits
- [ ] All scheduled runs completing successfully

---
## Troubleshooting Guide

### Common Issues and Solutions

#### Issue: "Account not enabled for replication"
**Solution:** Contact your organization administrator to enable replication for the account.

#### Issue: "Database cannot be replicated"
**Solution:** Verify:
- Database is not created from a share
- Database is permanent or transient
- No unsupported objects (check documentation)

#### Issue: "Timeout during initial replication"
**Solution:** Increase statement timeout:
```sql
ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 604800;
ALTER WAREHOUSE <wh_name> SET STATEMENT_TIMEOUT_IN_SECONDS = 604800;
```

#### Issue: "Replication schedule not working"
**Solution:** Check:
- Task is resumed (not suspended)
- Task role has OWNERSHIP on database
- No conflicts with manual refreshes

#### Issue: "High replication costs"
**Solution:**
- Review refresh frequency (increase interval if possible)
- Check for unnecessary data changes in source
- Monitor data transfer volumes in ACCOUNT_USAGE

#### Issue: "Replication group refresh fails"
**Solution:**
- Check DATABASE_REFRESH_PROGRESS for error details
- Verify all objects are supported for replication
- Ensure no dangling references

---
## Best Practices

### 1. Naming Conventions
- Use the same name for secondary databases as primary
- Name replication groups clearly (e.g., `PROD_DR_REPLICATION`)
- Document any naming differences

### 2. Scheduling
- Match refresh frequency to RPO requirements
- Avoid over-frequent refreshes (unnecessary cost)
- Use replication group schedules instead of tasks when possible

### 3. Monitoring
- Set up alerts for failed refreshes
- Monitor replication costs regularly
- Review ACCOUNT_USAGE views weekly
- Validate data consistency periodically

### 4. Security
- Enable Tri-Secret Secure in target accounts if required
- Configure PrivateLink if needed for compliance
- Replicate roles and users for consistent access control
- Document failover procedures

### 5. Performance
- Increase timeout for large initial replications
- Monitor refresh duration trends
- Optimize source database for replication
- Remove unnecessary objects from replication

### 6. Documentation
- Maintain list of replicated databases
- Document target accounts and purposes
- Keep failover procedures updated
- Track configuration changes

---
## Final Checklist

### ‚úÖ Configuration Complete
- [ ] All target accounts identified and enabled
- [ ] Replication group created with correct settings
- [ ] Secondary databases created in all target accounts
- [ ] Initial replication completed successfully
- [ ] Automatic refresh schedule configured
- [ ] Monitoring and alerting set up

### ‚úÖ Documentation Complete
- [ ] Replication architecture documented
- [ ] Failover procedures written
- [ ] Contact list for each account maintained
- [ ] Cost monitoring dashboard created

### ‚úÖ Testing Complete
- [ ] Manual refresh tested
- [ ] Automatic refresh verified
- [ ] Data consistency validated
- [ ] Failover tested (if applicable)

### ‚úÖ Operations Ready
- [ ] Team trained on replication monitoring
- [ ] Runbooks created for common issues
- [ ] Escalation procedures defined
- [ ] Regular review schedule established

---
## Additional Resources

### Snowflake Documentation
- [Database Replication Overview](https://docs.snowflake.com/en/user-guide/account-replication-intro)
- [Replication Groups](https://docs.snowflake.com/en/user-guide/account-replication-config)
- [Failover and Failback](https://docs.snowflake.com/en/user-guide/account-replication-failover-failback)
- [Replication Considerations](https://docs.snowflake.com/en/user-guide/account-replication-considerations)

### SQL Commands
- `CREATE REPLICATION GROUP`
- `ALTER REPLICATION GROUP`
- `CREATE DATABASE ... AS REPLICA OF`
- `ALTER DATABASE ... REFRESH`
- `SHOW REPLICATION ACCOUNTS`
- `SHOW REPLICATION DATABASES`

### Monitoring Views
- `INFORMATION_SCHEMA.DATABASE_REFRESH_PROGRESS()`
- `INFORMATION_SCHEMA.DATABASE_REFRESH_HISTORY()`
- `ACCOUNT_USAGE.DATABASE_REPLICATION_USAGE_HISTORY`

---
## Cleanup (Optional)

Use this section to remove replication configuration if needed.

In [None]:
print("\n=== Cleanup Commands ===")
print("\n‚ö†Ô∏è WARNING: These commands will remove replication configuration\n")

cleanup_sql = """
-- Stop task (if using tasks)
-- ALTER TASK REFRESH_MYDB_TASK SUSPEND;
-- DROP TASK REFRESH_MYDB_TASK;

-- Drop secondary database (run in target account)
-- DROP DATABASE MYDB;

-- Drop replication group (run in source account)
-- DROP REPLICATION GROUP MY_REPLICATION_GROUP;
"""

print(cleanup_sql)
print("\nUncomment and execute carefully!")

In [None]:
cursor.close()
conn.close()
print("Connection closed")