# Migrating tables from a remote MySQL server to Redshift Serverless using Python

### **1. Prerequisites:**

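* **Python Environment:** A currently supported Python 3 release, ideally in a virtual environment. Recent versions of `boto3` and `mysql-connector-python` no longer support Python 3.6.
* **Required Libraries:** Install the following Python packages:
    * `mysql-connector-python`: For connecting to the MySQL server.
    * `redshift_connector`: For connecting to Redshift Serverless.
    * `psycopg2` (installable as `psycopg2-binary`, which needs no PostgreSQL build tools): Imported by the functions cell and usable as an alternative Redshift driver.
    * `boto3`: For AWS API calls (workgroup endpoint lookup, S3 upload, IAM role lookup).
    * `tqdm` (optional): For progress bars.

    ```bash
    pip install mysql-connector-python redshift_connector psycopg2-binary boto3 tqdm
    ```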
* **AWS Credentials:** Ensure you have the necessary AWS credentials configured (e.g., via environment variables, `~/.aws/credentials`, or an IAM role).
* **Redshift Serverless Setup:** You need a Redshift Serverless workgroup (and its namespace) plus its connection details: workgroup name, database name, user name, and password.
* **MySQL Server Connection Details:** You need the hostname, port, username, password, and database name of the MySQL server.
* **Network Connectivity:** Ensure that your environment can connect to both the MySQL server and Redshift Serverless.
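* **S3 Bucket:** An S3 bucket is required to temporarily store the data extracted from MySQL.
* **IAM Role for `COPY`:** `COPY` runs inside Redshift, so the IAM role it uses must be associated with the Redshift Serverless namespace and be allowed to read the S3 bucket.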
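Before running anything, a quick check can confirm that the libraries this guide imports are available. This is a minimal sketch; note that import names differ from pip package names (for example, `mysql-connector-python` is imported as `mysql.connector`), and `missing_modules` is a hypothetical helper name.

```python
import importlib.util

# Import names (not pip package names) of the libraries used in this guide.
REQUIRED_MODULES = ["mysql.connector", "psycopg2", "boto3", "tqdm", "redshift_connector"]

def missing_modules(names):
    """Return the subset of `names` that cannot be imported."""
    missing = []
    for name in names:
        try:
            if importlib.util.find_spec(name) is None:
                missing.append(name)
        except ModuleNotFoundError:
            # find_spec raises this for a dotted name whose parent package is absent.
            missing.append(name)
    return missing

if __name__ == "__main__":
    gaps = missing_modules(REQUIRED_MODULES)
    print("All required modules found." if not gaps else f"Missing modules: {gaps}")
```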

### **2. Understanding the Process:**

The migration process can be summarized as follows:

1. **Connect to MySQL:** Establish a connection to the remote MySQL server.
2. **Extract Data:** Query the desired tables from MySQL and retrieve the data.
3. **Convert to CSV:**  Transform the data into CSV format (or other suitable format like Parquet).
4. **Upload to S3:** Upload the generated CSV files to an S3 bucket.
5. **Connect to Redshift:** Connect to your Redshift Serverless environment.
6. **Create Tables in Redshift:** Create the corresponding tables in Redshift based on the MySQL schema.
7. **Copy Data from S3:** Use Redshift's `COPY` command to load the data from the S3 bucket into the Redshift tables.
8. **Clean Up (Optional):** Optionally, delete the temporary files from the S3 bucket.
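Steps 4 to 7 hinge on one `COPY` statement per table. A minimal sketch of assembling it, assuming a header-less CSV object in S3 and an IAM role that Redshift can assume; the bucket, key, and role ARN are placeholders, and `build_copy_statement` is a hypothetical helper, not part of any library.

```python
def build_copy_statement(table_name, bucket, key, iam_role_arn, region):
    """Build a Redshift COPY statement that loads one CSV object from S3."""
    # A double quote inside a quoted identifier is escaped by doubling it.
    quoted_table = '"' + table_name.replace('"', '""') + '"'
    return (
        f"COPY {quoted_table}\n"
        f"FROM 's3://{bucket}/{key}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        f"FORMAT AS CSV\n"
        f"REGION AS '{region}';"
    )

# Placeholder values for illustration only.
print(build_copy_statement("user", "my-bucket", "tmp/user.csv",
                           "arn:aws:iam::123456789012:role/RedshiftS3Read", "us-east-1"))
```

`IAM_ROLE '<arn>'` is the current form of the role clause; `CREDENTIALS 'aws_iam_role=<arn>'`, used in the code below, is the older equivalent.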

### **3. Python Implementation:**

#### Configuration

```python
# Configuration: replace the <placeholder> values with your own
MYSQL_HOST = "<ec2-external-ip>"
MYSQL_PORT = 3306  # <mysql-port>; must be a number (3306 is the MySQL default)
MYSQL_USER = "<database-user>"
MYSQL_PASSWORD = "<database-password>"
MYSQL_DATABASE = "<database-name>"
MYSQL_TABLE = "<table-name>"  # Single table inspected by the exploratory cells below

REDSHIFT_SERVERLESS_WORKGROUP = "<workgroup-name>"  # e.g., "my-workgroup"
REDSHIFT_DATABASE = "dev"  # <database-name>; Redshift Serverless creates "dev" by default
REDSHIFT_USER = "<database-user>"
REDSHIFT_PASSWORD = "<database-password>"
AWS_REGION = "<aws-region>"  # e.g., "us-east-1"
S3_BUCKET = "<bucket-name>"
S3_FOLDER = "<folder-name>"  # Key prefix for the temporary CSV files
```

#### Installing dependencies

```python
# Uncomment to install the dependencies from inside the notebook
# !pip install mysql-connector-python redshift_connector psycopg2-binary boto3 tqdm
```

#### Defining functions

```python
import mysql.connector
import psycopg2
import boto3
import csv
import os
from tqdm import tqdm
from io import StringIO
```

```python
def get_redshift_endpoint():
    """Looks up the Redshift Serverless workgroup endpoint and returns it as 'host:port'."""
    redshift_client = boto3.client('redshift-serverless', region_name=AWS_REGION)
    response = redshift_client.get_workgroup(workgroupName=REDSHIFT_SERVERLESS_WORKGROUP)
    endpoint = response['workgroup']['endpoint']['address']
    port = response['workgroup']['endpoint']['port']
    return f"{endpoint}:{port}"
```

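```python
def get_mysql_schema(cursor, table_name):
    """
    Retrieves the schema (column names and types) of a MySQL table.

    SHOW COLUMNS returns one row per column: Field, Type, Null, Key, Default, Extra.
    """
    cursor.execute(f"SHOW COLUMNS FROM `{table_name}`")
    schema = []
    for column in cursor.fetchall():
        column_type = column[1]
        # Depending on the driver version, the Type field may come back as bytes.
        if isinstance(column_type, (bytes, bytearray)):
            column_type = column_type.decode('utf-8')
        schema.append({
            'name': column[0],
            'type': column_type,
            'is_nullable': column[2],
            'is_key': column[3],
        })
    return schema
```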

```python
def create_redshift_table_statement(schema, table_name):
    """
    Generates a CREATE TABLE statement for Redshift based on the MySQL schema.
    """
    columns = []
    for column in schema:
        redshift_type = convert_mysql_type_to_redshift(column['type'])
        if not redshift_type:
            # Keep the column: the SELECT * export still contains it, and COPY
            # fails if the CSV has more fields than the table has columns.
            print(f"Column {column['name']} has an unsupported data type {column['type']}, "
                  "falling back to VARCHAR(65535)")
            redshift_type = 'VARCHAR(65535)'
        nullable_str = "NULL" if column['is_nullable'] == 'YES' else "NOT NULL"
        columns.append(f"    \"{column['name']}\" {redshift_type} {nullable_str}")

    if not columns:
        return None  # Nothing to create
    statement = f"CREATE TABLE IF NOT EXISTS \"{table_name}\" (\n"
    statement += ",\n".join(columns)
    statement += "\n)"
    return statement
```

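```python
def convert_mysql_type_to_redshift(mysql_type):
    """
    Converts a MySQL data type, as reported by SHOW COLUMNS (e.g. "int(11) unsigned"),
    to a corresponding Redshift data type. Returns None for types not mapped here.
    """
    mysql_type = mysql_type.lower().strip()
    # Compare the base type exactly; substring checks misfire ('date' in 'datetime', 'int' in 'point').
    base = mysql_type.split('(')[0].split()[0]
    unsigned = 'unsigned' in mysql_type

    if base in ('tinyint', 'smallint'):
        # tinyint(1), MySQL's BOOLEAN, lands here too because it can store values other than 0 and 1.
        return 'INTEGER' if base == 'smallint' and unsigned else 'SMALLINT'
    if base == 'mediumint':
        return 'INTEGER'
    if base in ('int', 'integer'):
        return 'BIGINT' if unsigned else 'INTEGER'  # INT UNSIGNED exceeds Redshift INTEGER
    if base == 'bigint':
        return 'DECIMAL(20,0)' if unsigned else 'BIGINT'  # BIGINT UNSIGNED exceeds Redshift BIGINT
    if base in ('decimal', 'numeric'):
        # Keep precision and scale (a bare DECIMAL in Redshift is DECIMAL(18,0));
        # Redshift allows a precision of at most 38.
        if '(' in mysql_type:
            return 'DECIMAL' + mysql_type[mysql_type.index('('):mysql_type.index(')') + 1]
        return 'DECIMAL(10,0)'
    if base == 'float':
        return 'FLOAT'
    if base in ('double', 'real'):
        return 'DOUBLE PRECISION'
    if base in ('char', 'varchar', 'tinytext', 'text', 'mediumtext', 'longtext'):
        return 'VARCHAR(65535)'  # Max VARCHAR length in Redshift is 65535 bytes
    if base in ('datetime', 'timestamp'):
        return 'TIMESTAMP'
    if base == 'date':
        return 'DATE'
    # Add more type conversions as needed (enum, json, time, blob, ...)
    return None
```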

```python
def fetch_mysql_data(cursor, table_name):
    """Fetches all data from a MySQL table (every row is loaded into memory)."""
    cursor.execute(f"SELECT * FROM `{table_name}`")
    return cursor.fetchall()
```

```python
def upload_data_to_s3(s3_client, data, table_name):
    """Writes the rows to an in-memory CSV file and uploads it to S3."""
    s3_file_path = f"{S3_FOLDER}/{table_name}.csv"
    csv_buffer = StringIO()
    # QUOTE_NONNUMERIC quotes every non-numeric value, so commas inside strings survive.
    # The csv module defaults to '\r\n' line endings; use plain '\n' instead.
    csv_writer = csv.writer(csv_buffer, quoting=csv.QUOTE_NONNUMERIC, lineterminator='\n')
    csv_writer.writerows(data)
    try:
        s3_client.put_object(
            Bucket=S3_BUCKET,
            Key=s3_file_path,
            Body=csv_buffer.getvalue().encode('utf-8'),
        )
    except Exception as e:
        print(f"Error uploading to S3: {e}")
        return False

    print(f"Data for '{table_name}' uploaded to s3://{S3_BUCKET}/{s3_file_path}")
    return True
```

```python
def copy_data_to_redshift(conn, table_name, s3_file_path, redshift_user, redshift_password):
    """Copies data from S3 to Redshift using the COPY command.

    redshift_user and redshift_password are not used: COPY reads S3 through the
    IAM role returned by get_iam_role().
    """
    cursor = conn.cursor()
    try:
        copy_statement = f"""
          COPY "{table_name}" FROM 's3://{S3_BUCKET}/{s3_file_path}'
          CREDENTIALS 'aws_iam_role={get_iam_role()}'
          FORMAT AS CSV
          IGNOREHEADER 0
          QUOTE AS '"'
          REGION AS '{AWS_REGION}';
        """
        cursor.execute(copy_statement)
        conn.commit()
        print(f"Data copied to Redshift table '{table_name}'")
    except Exception as e:
        # Roll back the failed transaction so the connection stays usable for the next table
        conn.rollback()
        print(f"Error copying data to Redshift: {e}")
```

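```python
def get_iam_role():
    """
    Returns the ARN of the IAM role that COPY should use to read from S3.

    COPY runs inside Redshift, so the role must be associated with the Redshift
    Serverless namespace. Set the REDSHIFT_IAM_ROLE_ARN environment variable (a name
    chosen for this guide) to use such a role explicitly; otherwise fall back to the
    role the script runs under (EC2 instance profile or Lambda execution role).
    """
    explicit_arn = os.environ.get('REDSHIFT_IAM_ROLE_ARN')
    if explicit_arn:
        return explicit_arn

    # For an assumed role the caller ARN looks like
    # arn:aws:sts::<account-id>:assumed-role/<role-name>/<session-name>
    caller_arn = boto3.client('sts').get_caller_identity()['Arn']
    parts = caller_arn.split(':')[-1].split('/')
    if parts[0] != 'assumed-role':
        raise RuntimeError(f"Caller {caller_arn} is not an assumed role; set REDSHIFT_IAM_ROLE_ARN")
    return boto3.client('iam').get_role(RoleName=parts[1])['Role']['Arn']
```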

```python
def migrate_table(mysql_conn, redshift_conn, s3_client, table_name):
    """Migrates a single table from MySQL to Redshift."""
    mysql_cursor = mysql_conn.cursor()

    # Get MySQL table schema
    print(f"Fetching schema for table '{table_name}'")
    schema = get_mysql_schema(mysql_cursor, table_name)
    if not schema:
        print(f"Skipping table {table_name}, error during schema fetch")
        return

    # Create Redshift table
    create_table_statement = create_redshift_table_statement(schema, table_name)
    if not create_table_statement:
        print(f"Skipping table {table_name}, error during table creation statement generation")
        return
    try:
        redshift_cursor = redshift_conn.cursor()
        redshift_cursor.execute(create_table_statement)
        redshift_conn.commit()
        print(f"Redshift table '{table_name}' created (or already exists)")
    except Exception as e:
        # Roll back the failed transaction so later statements on this connection still run
        redshift_conn.rollback()
        print(f"Error creating Redshift table: {e}")
        return

    # Fetch data from MySQL
    print(f"Fetching data for table '{table_name}'")
    data = fetch_mysql_data(mysql_cursor, table_name)
    if not data:
        print(f"Skipping table {table_name}, no data returned")
        return

    # Upload data to S3
    if not upload_data_to_s3(s3_client, data, table_name):
        print(f"Skipping table {table_name}, error during S3 upload")
        return

    # Copy data to Redshift (same S3 key that upload_data_to_s3 writes)
    s3_file_path = f"{S3_FOLDER}/{table_name}.csv"
    copy_data_to_redshift(redshift_conn, table_name, s3_file_path, REDSHIFT_USER, REDSHIFT_PASSWORD)
```

#### MySQL Connection

```python
# Connect to MySQL
print("Connecting to MySQL...")
mysql_conn = mysql.connector.connect(
    host=MYSQL_HOST,
    port=MYSQL_PORT,
    user=MYSQL_USER,
    password=MYSQL_PASSWORD,
    database=MYSQL_DATABASE
)
print("Connected to MySQL successfully.")
```

```text
Connecting to MySQL...
Connected to MySQL successfully.
```


#### Get Table Schema 

```python
# Fetch the schema of the single table named in MYSQL_TABLE
mysql_cursor = mysql_conn.cursor()

print(f"Fetching schema for table '{MYSQL_TABLE}'")
schema = get_mysql_schema(mysql_cursor, MYSQL_TABLE)
if not schema:
    print(f"Skipping table {MYSQL_TABLE}, error during schema fetch")
```

#### Build "Create Table" Statement

```python
# Create Redshift table
create_table_statement = create_redshift_table_statement(schema, MYSQL_TABLE)

print(create_table_statement)
```

```text
CREATE TABLE IF NOT EXISTS "user" (
    "id" INTEGER NOT NULL,
    "mob_id" VARCHAR(65535) NOT NULL,
    "salutation" VARCHAR(65535) NULL,
    "first_name" VARCHAR(65535) NULL,
    "last_name" VARCHAR(65535) NULL,
    "email" VARCHAR(65535) NOT NULL,
    "temp_email" VARCHAR(65535) NULL,
    "password" VARCHAR(65535) NOT NULL,
    "phone" VARCHAR(65535) NULL,
    "phone2" VARCHAR(65535) NULL,
    "mfa_phone" VARCHAR(65535) NULL,
    "mfa_temp_phone" VARCHAR(65535) NULL,
    "mfa_phone_state" SMALLINT NOT NULL,
    "fax" VARCHAR(65535) NULL,
    "title" VARCHAR(65535) NULL,
    "reset_password" SMALLINT NULL,
    "current_project_id" INTEGER NULL,
    "primary_contact" SMALLINT NULL,
    "pic" VARCHAR(65535) NULL,
    "license_exp" DATE NULL,
    "company_id" INTEGER NULL,
    "address_id" INTEGER NULL,
    "reminder_key" VARCHAR(65535) NULL,
    "reminder_exp" DATE NULL,
    "iss_notification" SMALLINT NULL,
    "activation_dt" DATE NULL,
    "new_id" INTEGER NULL,
    "old_id" INTEGER
```

#### Fetch Redshift Endpoint Information

```python
# Get Redshift connection info
print("Fetching Redshift Serverless endpoint...")
redshift_endpoint = get_redshift_endpoint()
print(f"Redshift Serverless endpoint: {redshift_endpoint}")
```

```text
Fetching Redshift Serverless endpoint...
Redshift Serverless endpoint: test-temp.654395624723.us-east-1.redshift-serverless.amazonaws.com:5439
```
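`get_redshift_endpoint()` returns the endpoint as a single `host:port` string, while the drivers take the host and an integer port separately. A small helper that splits it once and validates the port; `split_endpoint` is a hypothetical name, not part of boto3, and the endpoint below is a placeholder.

```python
def split_endpoint(endpoint):
    """Split a 'host:port' string into (host, port) with an integer port."""
    host, sep, port = endpoint.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"Expected 'host:port', got {endpoint!r}")
    return host, int(port)

host, port = split_endpoint(
    "my-workgroup.123456789012.us-east-1.redshift-serverless.amazonaws.com:5439")
```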


#### Redshift Connection

```python
import redshift_connector

# Split "host:port" once; the driver expects the port as an integer
redshift_host, redshift_port = redshift_endpoint.rsplit(':', 1)

# Database user/password authentication; keep AWS access keys out of the notebook
print("Connecting to Redshift Serverless...")
redshift_conn = redshift_connector.connect(
    host=redshift_host,
    port=int(redshift_port),
    database=REDSHIFT_DATABASE,
    user=REDSHIFT_USER,
    password=REDSHIFT_PASSWORD,
)
print("Connected to Redshift Serverless successfully.")

# ALTERNATIVE METHOD (psycopg2):

# import psycopg2
#
# redshift_conn = psycopg2.connect(
#     host=redshift_host,
#     port=int(redshift_port),
#     database=REDSHIFT_DATABASE,
#     user=REDSHIFT_USER,
#     password=REDSHIFT_PASSWORD
# )
```

#### Upload Tables to S3 and Migrate to Redshift

```python
try:
    # Connect to S3
    s3_client = boto3.client('s3', region_name=AWS_REGION)

    # List tables from MySQL
    mysql_cursor = mysql_conn.cursor()
    mysql_cursor.execute("SHOW TABLES")
    tables = [table[0] for table in mysql_cursor.fetchall()]
    print(f"Tables found in MySQL: {tables}")

    # Migrate each table
    for table_name in tqdm(tables, desc="Migrating Tables"):
        print(f"Migrating table: {table_name}")
        migrate_table(mysql_conn, redshift_conn, s3_client, table_name)
        print(f"Finished migrating table: {table_name}")

    print("Migration finished. Check the messages above for any skipped tables.")
    # Clean up the S3 folder (optional)
    # for table_name in tables:
    #     s3_file_path = f"{S3_FOLDER}/{table_name}.csv"
    #     s3_client.delete_object(Bucket=S3_BUCKET, Key=s3_file_path)

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    if 'mysql_conn' in locals() and mysql_conn.is_connected():
        mysql_conn.close()
    if 'redshift_conn' in locals():
        # close() works for both redshift_connector and psycopg2 connections
        try:
            redshift_conn.close()
        except Exception:
            pass  # Already closed
    print("Connections closed.")
```

### **4. Explanation:**

*   **Configuration:** The first cell sets the variables used to connect to MySQL, Redshift Serverless, and S3.
*   **Helper Functions:** The functions cell defines helpers that look up the Redshift Serverless endpoint, read a MySQL table's schema, generate the `CREATE TABLE` statement, convert column types, fetch data, upload it to S3, run `COPY`, and resolve the IAM role. The MySQL and Redshift connections themselves are opened in separate cells.
*   **Table Migration Loop:** The last cell lists the MySQL tables with `SHOW TABLES` and calls `migrate_table()` for each one.
*   **`migrate_table()` Function:** This function orchestrates the migration of a single table by:
    *   Fetching the table schema from MySQL.
    *   Creating the corresponding table in Redshift based on the retrieved schema.
    *   Fetching the data from MySQL.
    *   Uploading the data to S3 as a CSV file.
    *   Copying the data from S3 to Redshift using the `COPY` command.
*   **Type Conversion:** `convert_mysql_type_to_redshift()` maps MySQL data types to Redshift data types. Add more conversions there as needed.
*   **Error Handling:** `try`/`except` blocks cover Redshift table creation, the S3 upload, the Redshift `COPY`, and the migration loop as a whole. Failures in those per-table steps are printed and the table is skipped; any other exception (for example, while fetching MySQL data) ends the loop.
*   **IAM Role:** `get_iam_role()` resolves the IAM role ARN that `COPY` uses to read from S3, by default the role attached to the EC2 instance or Lambda function running the script. Because `COPY` runs inside Redshift, that role must also be associated with the Redshift Serverless namespace and be able to read the bucket.
*   **Quoting:** The CSV writer uses `csv.QUOTE_NONNUMERIC`, which wraps every non-numeric value in double quotes so that commas inside values don't cause parsing errors.
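`csv.QUOTE_NONNUMERIC` quotes every value that is not a number, not literally every value. A small demonstration with a hypothetical `rows_to_csv()` helper that uses the same quoting mode: integers and floats stay bare, strings are double-quoted, and embedded double quotes are doubled. `None` is written as an empty field, which Redshift may load into VARCHAR columns as an empty string rather than NULL; the `EMPTYASNULL` option of `COPY` is worth checking if that matters for your data.

```python
import csv
from io import StringIO

def rows_to_csv(rows):
    """Serialise rows with QUOTE_NONNUMERIC and '\\n' line endings."""
    buffer = StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n")
    writer.writerows(rows)  # None values become empty fields
    return buffer.getvalue()

print(rows_to_csv([(1, "Smith, John", 2.5), (2, 'say "hi"', None)]))
```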

### **5. Important Considerations:**

*   **Large Tables:** `fetch_mysql_data()` loads the whole table into memory with `fetchall()`, and the CSV file is also built in memory before upload. For very large tables, extract the data in batches or partitions, write several files to S3, and load them with a single `COPY` from the common key prefix; Redshift loads multiple files in parallel, and AWS recommends this over running several `COPY` commands against the same table.
*   **Data Types:** Check that the type mappings are complete and correct for your data; you may need to extend the type conversion logic.
*   **Error Handling:** Replace the `print()` calls with proper logging, and handle failures in the schema and data fetches so that one bad table does not stop the loop.
*   **Security:** Keep passwords and AWS credentials out of the notebook, for example in environment variables or AWS Secrets Manager.
*   **Network Configuration:** Ensure security groups and firewalls allow connections between MySQL, the script execution environment, and Redshift Serverless.
*   **Performance Optimization:** Tune extraction batch sizes, S3 file sizes, and connection settings for your data volumes; as written, the script moves each table in a single pass.
*   **Incremental Loads:** `COPY` appends rows and `CREATE TABLE IF NOT EXISTS` leaves existing tables in place, so re-running the script duplicates data. For incremental loads, detect new or updated rows in MySQL (for example with timestamps or change data capture) and load only those.
*   **Schema Changes:** The Redshift table is created once from the MySQL schema, so later MySQL schema changes (added, dropped, or retyped columns) are not propagated. Implement schema evolution logic if you need it.
*   **Key Handling:** Primary keys, foreign keys, and other constraints are not migrated; add them separately if required. Redshift treats primary key, unique, and foreign key constraints as informational and does not enforce them.
*   **Encoding:** If your data contains non-ASCII or control characters, verify that they survive the round trip; Redshift `COPY` expects UTF-8 input by default.
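For the large-table case, one option is to stream rows in fixed-size batches with the DB-API `fetchmany()` method instead of `fetchall()`. A sketch, assuming any DB-API cursor; `iter_csv_chunks` and the default chunk size are illustrative choices. Each chunk could be uploaded as its own object under a per-table prefix such as `{S3_FOLDER}/{table_name}/` (note the trailing slash, so `user/` does not also match `users/`), and one `COPY ... FROM 's3://<bucket>/<prefix>'` then loads every object whose key starts with that prefix. Whether rows are actually streamed from the server depends on the driver's cursor type.

```python
import csv
from io import StringIO

def iter_csv_chunks(cursor, query, rows_per_chunk=50_000):
    """Run `query` and yield CSV text, one chunk per fetchmany() batch."""
    cursor.execute(query)
    while True:
        rows = cursor.fetchmany(rows_per_chunk)
        if not rows:
            break
        buffer = StringIO()
        writer = csv.writer(buffer, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n")
        writer.writerows(rows)
        yield buffer.getvalue()

# Usage sketch (hypothetical key layout):
# for i, chunk in enumerate(iter_csv_chunks(mysql_cursor, f"SELECT * FROM `{table_name}`")):
#     s3_client.put_object(Bucket=S3_BUCKET,
#                          Key=f"{S3_FOLDER}/{table_name}/part-{i:05d}.csv",
#                          Body=chunk.encode("utf-8"))
```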

### **6. How to Run the Code:**

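1.  Replace the placeholder values in the configuration cell with your actual values.
2.  Run the notebook cells in order, or save the configuration, functions, connection, endpoint, and migration cells as a Python file (e.g., `mysql_to_redshift.py`). The "Get Table Schema" and "Build "Create Table" Statement" cells only inspect a single table and are not needed for a full migration.
3.  If you saved a script, run it from your terminal:

    ```bash
    python mysql_to_redshift.py
    ```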
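After a run, comparing row counts on both sides is a cheap sanity check (it also reveals duplicated rows from a repeated run). A sketch that works with any DB-API cursor; `count_rows` and `compare_row_counts` are hypothetical helpers. Pass a cursor from `mysql_conn`, one from `redshift_conn`, and the list of table names.

```python
def count_rows(cursor, table_name, quote='"'):
    """Return SELECT COUNT(*) for one table, quoting its name with `quote`."""
    escaped = table_name.replace(quote, quote * 2)  # Escape the quote char by doubling it
    cursor.execute(f"SELECT COUNT(*) FROM {quote}{escaped}{quote}")
    return cursor.fetchone()[0]

def compare_row_counts(mysql_cursor, redshift_cursor, tables):
    """Return {table: (mysql_count, redshift_count)} for tables whose counts differ."""
    mismatches = {}
    for table in tables:
        source = count_rows(mysql_cursor, table, quote="`")      # MySQL identifier quoting
        target = count_rows(redshift_cursor, table, quote='"')   # Redshift identifier quoting
        if source != target:
            mismatches[table] = (source, target)
    return mismatches

# Usage: print(compare_row_counts(mysql_conn.cursor(), redshift_conn.cursor(), tables))
```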

Adapt the code to your specific environment and requirements before running it.