In [None]:
import boto3
import subprocess

# AWS Configuration
# Placeholder values: replace with the real bucket and pipeline name before running.
s3_bucket = 'your-s3-bucket'
pipeline_name = 'your-pipeline-name'

# Boto3 clients
# Created once at import time and shared by the functions below.
s3_client = boto3.client('s3')
dp_client = boto3.client('datapipeline')
# NOTE: cloudwatch_client is not referenced by the functions in this cell.
cloudwatch_client = boto3.client('cloudwatch')


# SQL Server Configuration
tables = ['table1', 'table2']  # List all table names
# NOTE: sql_server_connection is not referenced by the functions in this cell.
sql_server_connection = 'your-sql-connection-string'

def create_sql_scripts_for_large_table(table_name, chunk_size):
    """Write one bcp export script per id-range chunk of ``table_name``.

    Chunks cover ``id > last_id AND id <= last_id + chunk_size``, starting at
    ``last_id = 0`` and advancing by ``chunk_size`` until
    ``has_more_records(last_id)`` returns a falsy value. Each script is
    written to ``/tmp/<table_name>_<last_id>.sql``.

    Args:
        table_name: Name of the table to export; interpolated verbatim into
            the query text and into the script file name.
        chunk_size: Width of each id range. Must be positive.

    Returns:
        List of local script paths, in chunk order.

    Raises:
        ValueError: If ``chunk_size`` is not positive. A zero or negative
            size would never advance ``last_id`` past new rows, so the loop
            would keep rewriting the same script.
    """
    if chunk_size <= 0:
        raise ValueError(
            f"chunk_size must be a positive integer, got {chunk_size!r}"
        )

    scripts = []
    # Example: Assuming you have an ID column for chunking
    last_id = 0

    while True:
        # NOTE(review): the command embeds placeholder credentials
        # (-U username -P password) and targets an s3:// URI directly;
        # confirm that the tool executing this script supports both.
        script = f"""
        bcp "SELECT * FROM {table_name} WHERE id > {last_id} AND id <= {last_id + chunk_size}" queryout s3://{s3_bucket}/{table_name}_{last_id}.csv -c -t, -r\\n -U username -P password
        """
        script_path = f"/tmp/{table_name}_{last_id}.sql"

        with open(script_path, 'w') as file:
            file.write(script)
        scripts.append(script_path)
        last_id += chunk_size
        # has_more_records is not defined in the visible part of this file;
        # it must be provided (taking the next lower id bound) before this
        # function can run.
        if not has_more_records(last_id):
            break
    return scripts

def upload_scripts_to_s3(script_paths, table_name):
    """Upload local script files to ``scripts/<table_name>/`` in the bucket.

    Args:
        script_paths: Iterable of local file paths using ``/`` separators.
        table_name: Table whose key prefix the scripts are stored under.

    The object key keeps only the final path component of each local path.
    One confirmation line is printed per uploaded file.
    """
    for local_path in script_paths:
        # Everything after the last '/' is the file name.
        file_name = local_path.rsplit('/', 1)[-1]
        s3_key = f'scripts/{table_name}/{file_name}'
        s3_client.upload_file(local_path, s3_bucket, s3_key)
        print(f"Uploaded {local_path} to s3://{s3_bucket}/{s3_key}")


def list_sql_files_in_s3(table_name):
    """Return the keys of all ``.sql`` objects under ``scripts/<table_name>/``.

    The listing is paginated so that prefixes holding more objects than a
    single ``list_objects_v2`` response can carry are returned in full.

    Args:
        table_name: Table whose script prefix is listed.

    Returns:
        List of object keys, for example
        ``['scripts/table1/table1_0.sql', 'scripts/table1/table1_1000.sql']``.
        Empty when nothing matches.
    """
    prefix = f'scripts/{table_name}/'
    paginator = s3_client.get_paginator('list_objects_v2')

    files = []
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix):
        # 'Contents' is absent from a page that holds no objects.
        files.extend(
            obj['Key']
            for obj in page.get('Contents', [])
            if obj['Key'].endswith('.sql')
        )
    return files

def create_pipeline_definition(table_name):
    """Build the pipeline definition for copying one table.

    One activity object is generated per SQL script found under
    ``scripts/<table_name>/`` in the bucket; scripts are expected to be named
    ``<table_name>_<chunkId>.sql`` (e.g. ``table1_0.sql``, ``table1_1000.sql``).

    Args:
        table_name: Table whose scripts are turned into activities.

    Returns:
        Dict with two keys: ``"objects"`` (the Default object, the
        Ec2Resource object, then one activity per script) and
        ``"parameters"``.
    """
    sql_files = list_sql_files_in_s3(table_name)
    activities = []

    for s3_key in sql_files:
        # For 'scripts/table1/table1_0.sql' the text after the last '_' is
        # '0.sql'; the part before the first '.' of that is the chunk id '0'.
        chunk_id = s3_key.split('_')[-1].split('.')[0]
        activity_id = f"SqlTableCopyActivity_{chunk_id}"
        activities.append({
            "id": activity_id,
            "name": activity_id,
            "type": "SqlActivity",
            "fields": [
                {"key": "runsOn", "refValue": "Ec2Resource"},
                {"key": "type", "stringValue": "Copy"},
                {"key": "scriptUri", "stringValue": f"s3://{s3_bucket}/{s3_key}"},
                {"key": "mySqlDatabase", "stringValue": "jdbc:mysql://your-db-endpoint"},
                {"key": "username", "stringValue": "your-db-username"},
                {"key": "password", "stringValue": "your-db-password"},
                {"key": "table", "stringValue": table_name}
            ]
        })

    # NOTE(review): the boto3 put_pipeline_definition reference describes
    # pipelineObjects entries as {id, name, fields} and parameterObjects
    # entries as {id, attributes}. The top-level "type" keys and the
    # parameter shape below may be rejected by request validation; confirm
    # against the API reference before relying on this definition.
    pipeline_definition = {
        "objects": [
            {
                "id": "Default",
                "name": "Default",
                "fields": [
                    {"key": "failureAndRerunMode", "stringValue": "CASCADE"},
                    {"key": "role", "stringValue": "DataPipelineDefaultRole"},
                    {"key": "resourceRole", "stringValue": "DataPipelineDefaultResourceRole"},
                    {"key": "pipelineLogUri", "stringValue": "s3://your-log-bucket/"},
                    {"key": "scheduleType", "stringValue": "cron"}
                ]
            },
            {
                "id": "Ec2Resource",
                "name": "Ec2Resource",
                "type": "Ec2Resource",
                "fields": [
                    {"key": "instanceType", "stringValue": "m5.large"},
                    {"key": "imageId", "stringValue": "ami-0c55b159cbfafe1f0"}
                ]
            }
        ] + activities,
        "parameters": [
            {
                "id": "myTable",
                "type": "String",
                "description": "Table to copy"
            }
        ]
    }

    return pipeline_definition

def create_and_activate_pipeline(table_name):
    """Create, define and activate the Data Pipeline for one table.

    Args:
        table_name: Table to build the pipeline for; also used in the
            pipeline's ``uniqueId`` so each table gets its own pipeline.

    Raises:
        RuntimeError: If the service reports the uploaded definition as
            errored. Activation is skipped in that case, and the reported
            validation errors are included in the message.
    """
    pipeline_definition = create_pipeline_definition(table_name)

    create_response = dp_client.create_pipeline(
        name=pipeline_name,
        uniqueId=f"{pipeline_name}-{table_name}"
    )
    pipeline_id = create_response['pipelineId']

    # Put the pipeline definition with the activities and parameters
    put_response = dp_client.put_pipeline_definition(
        pipelineId=pipeline_id,
        pipelineObjects=pipeline_definition['objects'],
        parameterObjects=pipeline_definition['parameters']
    )
    # Stop here when the definition was rejected, so the caller sees the
    # validation errors rather than a later activation failure.
    if put_response.get('errored'):
        raise RuntimeError(
            f"Pipeline definition for table {table_name} was rejected: "
            f"{put_response.get('validationErrors', [])}"
        )

    dp_client.activate_pipeline(pipelineId=pipeline_id)
    print(f"Activated pipeline for table: {table_name}")


def _migrate_table(table_name):
    """Run the script-generation, upload and pipeline steps for one table."""
    print(f"Starting migration for table: {table_name}")

    generated = create_sql_scripts_for_large_table(table_name, chunk_size=1000)
    print(f"Created {len(generated)} SQL scripts for table: {table_name}")

    upload_scripts_to_s3(generated, table_name)
    print(f"Uploaded SQL scripts to S3 for table: {table_name}")

    create_and_activate_pipeline(table_name)
    print(f"Activated pipeline for table: {table_name}")

    print(f"Completed migration for table: {table_name}")


def main():
    """Migrate every configured table, continuing past per-table failures.

    Any exception raised while handling a table is printed and the loop
    moves on to the next table.
    """
    for table_name in tables:
        try:
            _migrate_table(table_name)
        except Exception as exc:
            # Report the failure and keep going with the remaining tables.
            print(f"Error occurred while migrating table {table_name}: {exc}")

# Run the migration only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

Modify the create_sql_scripts_for_large_table function for incremental loads
To migrate only new or updated records, add logic that tracks what has already been exported and fetches only what has changed since then. This requires persisting the last processed timestamp or primary key between runs.

Changes Needed:

Track Last Processed Record: Store the last processed primary key or timestamp after each successful run.
Fetch New Records: Adjust the SQL script to fetch only records with a primary key greater than the stored key, or a timestamp newer than the stored timestamp.

In [None]:
# Compare record counts between SQL Server and the CSV files in S3

In [None]:
import boto3
import pyodbc
import pandas as pd

# AWS Configuration
# Placeholder value: replace with the bucket that holds the exported CSV files.
s3_bucket = 'your-s3-bucket'

# SQL Server Configuration
# Connection string passed to pyodbc.connect, and the tables whose counts are compared.
sql_server_connection = 'your-sql-connection-string'
tables = ['table1', 'table2']

# Boto3 client
# Created once at import time and shared by the functions below.
s3_client = boto3.client('s3')

def get_record_count_from_sql_server(table_name):
    """Return the number of rows in ``table_name`` on SQL Server.

    Args:
        table_name: Table to count. It is interpolated directly into the
            query text, so it must come from trusted configuration.

    Returns:
        The value of ``SELECT COUNT(*)`` for the table.

    The connection is closed even when the query fails, so a failing table
    does not leak a connection.
    """
    conn = pyodbc.connect(sql_server_connection)
    try:
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        return cursor.fetchone()[0]
    finally:
        conn.close()

def get_record_count_from_s3(table_name):
    """Return the total row count of all CSV files under ``data/<table_name>/``.

    The listing is paginated so that prefixes holding more objects than a
    single ``list_objects_v2`` response can carry are counted in full.

    Args:
        table_name: Table whose exported CSV files are counted.

    Returns:
        Sum of the DataFrame lengths of every ``.csv`` object under the
        prefix; 0 when there are none.
    """
    s3_key_prefix = f'data/{table_name}/'
    paginator = s3_client.get_paginator('list_objects_v2')
    total_count = 0

    for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_key_prefix):
        # 'Contents' is absent from a page that holds no objects.
        for obj in page.get('Contents', []):
            s3_key = obj['Key']
            if not s3_key.endswith('.csv'):
                continue
            csv_object = s3_client.get_object(Bucket=s3_bucket, Key=s3_key)
            # NOTE(review): read_csv treats the first line as a header by
            # default; if the exported files have no header row this
            # undercounts by one per file. Confirm the export format.
            df = pd.read_csv(csv_object['Body'])
            total_count += len(df)

    return total_count

def compare_counts():
    """Compare record counts between SQL Server and S3 for every table.

    Returns:
        List of dicts, one per table that could not be confirmed as
        matching. Each dict has ``'table'``, ``'sql_count'`` and
        ``'s3_count'``. When a count lookup failed, the missing count(s) are
        ``None`` and an ``'error'`` key holds the exception message, so a
        failed table is never reported as matching.
    """
    discrepancies = []

    for table in tables:
        sql_count = None
        s3_count = None
        try:
            # Get record counts
            sql_count = get_record_count_from_sql_server(table)
            s3_count = get_record_count_from_s3(table)
        except Exception as e:
            print(f"Error processing table {table}: {e}")
            # A table that could not be checked is recorded, not dropped.
            discrepancies.append({
                'table': table,
                'sql_count': sql_count,
                's3_count': s3_count,
                'error': str(e),
            })
            continue

        # Compare counts
        if sql_count != s3_count:
            discrepancies.append({
                'table': table,
                'sql_count': sql_count,
                's3_count': s3_count
            })

    return discrepancies

def main():
    """Print a report of tables whose SQL Server and S3 counts differ."""
    mismatches = compare_counts()

    # Nothing to report: say so and stop.
    if not mismatches:
        print("No discrepancies found. All counts match.")
        return

    print("Discrepancies found:")
    for entry in mismatches:
        print(f"Table: {entry['table']}, SQL Server Count: {entry['sql_count']}, S3 Count: {entry['s3_count']}")

# Run the count comparison only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
