# Migrate data from S3 to Redshift with Python

When interacting with AWS from a Jupyter notebook or python code, it is a good practise to store relevant data that allow to communicate with the cloud in a separate config file. In this tutorial, that file is called "dl.cfg" and is store in the same location as the current jupyter notebook.
The file contains three sections:
- AWS credentials (access key ID and secret access key) needed to programmatically access AWS
- names that will be used to create IAM role and IAM policy
- settings that will be used to create the Redshift cluster
- S3 location of the dataset to migrate into Redshift

As a first step, let's extract some of the above mentioned parameters from "dl.cfg" file.

In [37]:
import configparser

# Read AWS credentials from the config file
cfg_data = configparser.ConfigParser()
cfg_data.read('dl.cfg')    

# Save AWS credentials
access_key_id     = cfg_data["AWS"]["access_key_id"]
secret_access_key = cfg_data["AWS"]["secret_access_key"]

# Save IAM role and IAM policy data
role_name          = cfg_data["IAM"]["role_name"]
policy_name        = cfg_data["IAM"]["policy_name"]


In order to allow Redshift reading data from S3, an Identity Access Management (IAM) role should be created. This role will allow AWS services to be called on behalf of the user. AWS IAM services can be accessed by python SDK Boto3 using a specific client. In the code below, the following operations will be executed:
- define client to control IAM
- check if any role with the name defined in the config file already exists and (if it does) delete it
- create a new role destined to Redshift.

In [34]:
import boto3

# Create IAM client for region us-east-1 feeding AWS credentials extracted from the config.json file
iam = boto3.client(
    "iam",
    region_name = "us-east-1",
    aws_access_key_id = access_key_id,
    aws_secret_access_key = secret_access_key
)

# Try to delete the existing role with the same name, if exists
try:
    role = iam.get_role(
        RoleName = role_name
    )
    print("Role named '{}' already exists".format(role_name))

    # Extract all the attached policies to the existing role
    attached_policies = iam.list_attached_role_policies(
        RoleName = role_name
    )[
        "AttachedPolicies"
    ]


    # Iterate over all attached policies
    for attached_policy in attached_policies:

        # Extract attached policy ARN
        attached_policy_arn = attached_policy[
            "PolicyArn"
        ]

        # Detach policy from role
        iam.detach_role_policy(
            RoleName = role_name,
            PolicyArn = attached_policy_arn
        )

    # Delete role
    try:
        delete_role = iam.delete_role(
            RoleName = role_name
        )
        print("Role named '{}' has been deleted".format(role_name))

    except Exception as e:
        print(str(e))
        
except Exception as e:
    print(str(e))

# Create IAM role
try:
    role = iam.create_role(
        RoleName = role_name,
        Description = "Allows Redshift cluster to call AWS services on behalf of the user",
        AssumeRolePolicyDocument = json.dumps(
            {
                "Statement": [
                    {
                        "Action": "sts:AssumeRole",
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "redshift.amazonaws.com"
                        }
                     }
                ],
                "Version": "2012-10-17"
            }
        )
    )
    print("Role named '{}' has been created".format(role_name))

except Exception as e:
    print(str(e))
 
# Extract role ARN
role_arn = iam.get_role(
    RoleName = role_name
)["Role"]["Arn"]
print("Role '{}'s ARN is: '{}'".format(role_name, role_arn))
    

Role named 'Redshift_access_S3bucket' already exists
Role named 'Redshift_access_S3bucket' has been deleted
Role named 'Redshift_access_S3bucket' has been created
Role 'Redshift_access_S3bucket's ARN is: 'arn:aws:iam::341370630698:role/Redshift_access_S3bucket'


An IAM role does not grant itself permission to access specific AWS services. What determines which specific services are accessible is defined by an IAM policy. IAM policies are written in JSON and basically consist of a list of statements; each statement defines one or more actions, an effect (Allow or Deny), and a resource which the statement is applied to.

In the code below, the following operations will be executed:
- check if a policy with the name defined in the config file already exists
- if a policy already exists, detach the policy from all the role it is attached to
- delete all versions of the policy (including the default version)
- create a new policy
- attach the policy to the role created above.

In [36]:
# Check if policy with the wanted name already exists
try:
    policies = iam.list_policies()["Policies"]
    policy_exists = False
    for policy in policies:
        if policy["PolicyName"] == policy_name:
            existing_policy_arn = policy["Arn"]
            policy_exists = True
            break          
except:
    None

# If a policy with the same name already exists, delete it
if policy_exists:
    print("Policy named '{}' already exists".format(policy_name))
    
    # Extract all roles
    roles = iam.list_roles()["Roles"]
    
    # Iterate over all the roles
    for role in roles:
        
        # Extract role name
        existing_role_name = role["RoleName"]
        
        # Extract all the attached policy to the role
        attached_policies = iam.list_attached_role_policies(
            RoleName = existing_role_name
        )["AttachedPolicies"]
        
        # Iterate over all the attached policies
        for attached_policy in attached_policies:

            # Extract attached policy ARN
            attached_policy_arn = attached_policy["PolicyArn"]

            # Checking if the policy correspond to the wanted one
            if attached_policy_arn == existing_policy_arn:
                
                # Detach policy from role
                iam.detach_role_policy(
                    RoleName = existing_role_name,
                    PolicyArn = attached_policy_arn
                )
                
                print("Policy with ARN '{}' detached from role '{}'".format(policy_arn, existing_role_name))
    
    # Extract all the policy versions
    policy_versions = iam.list_policy_versions(
        PolicyArn = existing_policy_arn
    )["Versions"]
    
    # Iterate over all the policy versions
    for policy_version in policy_versions:
        
        # Skip the version if it is a default version
        if policy_version["IsDefaultVersion"]:
            continue
          
        # Extract policy ID
        version_id = policy_version["VersionId"]
        
        # Delete policy version
        iam.delete_policy_version(
            PolicyArn = existing_policy_arn,
            VersionId = version_id
        )
        print("Policy with ARN '{}', version_ID '{}' deleted".format(existing_policy_arn, version_id))
    
    # Delete default version of the policy
    iam.delete_policy(
        PolicyArn = existing_policy_arn
    )
    print("Policy with ARN '{}' deleted".format(existing_policy_arn))
    
else:
    print("Policy named '{}' does not exists".format(policy_name))
 
# Create policy 
try:
    policy = iam.create_policy(
        PolicyName = policy_name,
        Description = "Allow to list and access content of the target bucket 'data-to-migrate'",
        PolicyDocument = json.dumps(
            {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": [
                            "s3:ListBucket"
                        ],
                        "Resource": [
                            "arn:aws:s3:::data-to-migrate"
                        ]
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "s3:PutObject",
                            "s3:GetObject",
                            "s3:DeleteObject"
                        ],
                        "Resource": [
                            "arn:aws:s3:::data-to-migrate/*"
                        ]
                    }
                ]
            }
        )
    )
    print("Policy named '{}' created".format(policy_name))
    policy_arn = policy["Policy"]["Arn"]
    print("Policy named '{}' has ARN '{}'".format(policy_name, policy_arn))
except Exception as e:
    print(str(e))

# Attach policy to IAM role
try:
    attachment = iam.attach_role_policy(
        RoleName = role_name,
        PolicyArn = policy_arn
    )
    print("Policy named '{}' attached to role '{}'".format(policy_name, role_name))
except Exception as e:
    print(str(e))



Policy named 'read_list_target_bucket' already exists
Policy with ARN 'arn:aws:iam::341370630698:policy/read_list_target_bucket' detached from role 'Redshift_access_S3bucket'
Policy with ARN 'arn:aws:iam::341370630698:policy/read_list_target_bucket' deleted
Policy named 'read_list_target_bucket' created
Policy named 'read_list_target_bucket' has ARN 'arn:aws:iam::341370630698:policy/read_list_target_bucket'
Policy named 'read_list_target_bucket' attached to role 'Redshift_access_S3bucket'


Similarly to what we have done for IAM, a Redshift client needs to be defined in order to control Redshift using python.

Define Redshift client

In [44]:
# Create Redshift client
redshift = boto3.client(
    "redshift",
    region_name = "us-east-1",
    aws_access_key_id = access_key_id,
    aws_secret_access_key = secret_access_key
)


Extract Redshift related parameters from the config file.

In [30]:
import configparser

# Read AWS credentials from the config file
cfg_data = configparser.ConfigParser()
cfg_data.read('dl.cfg') 

# Save Redshift cluster
cluster_identifier = cfg_data["Redshift"]["cluster_identifier"]
cluster_type       = cfg_data["Redshift"]["cluster_type"]
node_type          = cfg_data["Redshift"]["node_type"]
username           = cfg_data["Redshift"]["username"]
password           = cfg_data["Redshift"]["password"]
database_name      = cfg_data["Redshift"]["database_name"]
port               = cfg_data["Redshift"]["port"]


Since we cannot have more than one Redshift cluster with the same name, the following operations need to be performed to create a Redshift cluster:
- check if a Redshif cluster with the wanted name already exists
- if it does, delete it
- create a new cluster
- extract relevant cluster information.

In [50]:
# Delete Redshift cluster with the same name if it exists
   
try:
    # Delete Cluster
    redshift.delete_cluster(
        ClusterIdentifier = cluster_identifier,
        SkipFinalClusterSnapshot = True,
    )

    print("A cluster named '{}' already exists".format(cluster_identifier))
    print("Deleting existing cluster named '{}'...".format(cluster_identifier))


    # Wait for the cluster status change to deleted
    delete_waiter = redshift.get_waiter("cluster_deleted")
    delete_waiter.wait(
        ClusterIdentifier = cluster_identifier,
        WaiterConfig = {
            "Delay": 30,
            "MaxAttempts": 20
        }
    )

    print("Existing cluster named '{}' deleted".format(cluster_identifier))

except:
    print("A cluster named '{}' does not exist".format(cluster_identifier))  


# Create Redshift cluster
try:
    cluster = redshift.create_cluster(
        DBName = database_name, # (OPT) name of the first database to create when the cluster is created
        ClusterIdentifier = cluster_identifier, # (REQ) name of the cluster
        ClusterType = cluster_type, # (OPT) singlenode vs multinode
        NodeType = node_type, # (REQ) type of node
        MasterUsername = username, # (REQ) username
        MasterUserPassword = password, # (REQ) password
        Port = port, # port number on which the cluster accepts inbound connections
        IamRoles = [role_arn] # list of role Arns defining how Redshift can access other AWS services
    )

except Exception as e:
    print(e)

print("Creating new cluster named '{}'...".format(cluster_identifier))

# Wait for the new cluster status change to available
create_waiter = redshift.get_waiter("cluster_available")
create_waiter.wait(
        ClusterIdentifier = cluster_identifier,
        WaiterConfig = {
            "Delay": 30,
            "MaxAttempts": 20
        }
    )

print("New cluster named '{}' created and available").format(cluster_identifier)

# Extract cluster info
cluster_info = redshift.describe_clusters(
        ClusterIdentifier = cluster_identifier
    )["Clusters"][0]

cluster_endpoint = cluster_info["endpoint"]
vpc_security_group_id = cluster_info["vpc_security_group_id"]
print("Cluster '{}' endpoint is '{}'").format(cluster_identifier, cluster_endpoint)
print("Cluster '{}''s VPC security group ID is '{}'").format(cluster_identifier, vpc_security_group_id)


A cluster named my-redshift-cluster does not exist


In order to communicate with the database hosted on the Redshift cluster through Boto3, it is necessary to authorize an ingress through port 5439 into the cluster VPC security group.

In [47]:
# Set a VPC security group rule to allow a connection through port 5439

try:
    # Define EC2 resource
    ec2 = boto3.resource(
        "ec2",
        region_name = "us-east-1",
        aws_access_key_id = access_key_id,
        aws_secret_access_key = secret_access_key
    )

    # Extract security group for the VPC
    vpc_sg = ec2.SecurityGroup(id = vpc_security_group_id)
    
    # Authorize connection to the VPC
    vpc_sg.authorize_ingress(
        GroupName = vpc_sg.group_name,
        CidrIp = "0.0.0.0/0",
        IpProtocol = "TCP",
        FromPort = 5439,
        ToPort = 5439
    )
    print("Ingress to the VPC authorized")
    
except Exception as e:
    
    # Check if the error is a duplication error
    if "InvalidPermission.Duplicate" in str(e):
        print("Rule requested already exists")
    else:
        print(e)




Rule requested already exists


The library psycopg2 allows executing Postgres SQL queries on a database. In order to connect to the database, a connection string of the type "postgresql://MasterUsername:MasterUserPassword@ClusterEndpoint:DatabasePort,DatabaseName" is needed.

In [23]:
import psycopg2

%load_ext sql
conn_string = "postgresql://{}:{}@{}:{}/{}".format(
    "ldefra-user",
    "MyPassword2020",
    cluster_endpoint,
    5439,
    "dev"
)
%sql $conn_string


The sql extension is already loaded. To reload it, use:
  %reload_ext sql


'Connected: ldefra-user@dev'

In order to facilitate query execution on the Postgres database, let's create a function that establishes a connection to the database, execute the provided query and close the connection.

In [24]:
def execute_sql(sql_query, conn_string, print_results = False):
    """Execute a SQL query on the database associated with
       a connection string
    
    Parameters:
    - sql_query : str
        SQL query to execute
    - conn_string : str
        connection string of the format 'postgresql://MasterUsername:MasterUserPassword@ClusterEndpoint:DatabasePort,DatabaseName'
    - print_results : bool
        select if to print query results or not
    """
    
    # Connect to the database
    conn = psycopg2.connect(conn_string)
    
    # Define cursor
    cur = conn.cursor()
    
    # Execute query
    cur.execute(sql_query)
    conn.commit()
    if print_results:
        print(cur.fetchall()

    # Close cursor
    cur.close()
    
    # Close connection
    conn.close()
    

Before creating a table that can contain data coming from S3, these data should be explored to assess their structure.

From the exploration run below, the dataset is structured into 5 columns:
- petal length
- petal width
- sepal length
- sepal width
- species

The columns listed above can be transferred to a Postgres table using the following datatypes formats:
- petal length (NUMERIC)
- petal width (NUMERIC)
- sepal length (NUMERIC)
- sepal width (NUMERIC)
- species (VARCHAR)

In [38]:
# Define S3 client
s3 = boto3.client(
    "s3"
)

# Get object containing file to be staged
obj = s3.get_object(
    Bucket = "data-to-migrate",
    Key = "iris_dataset.csv"
)

import io
import pandas as pd

# Print colummns info for the dataset
pd.read_csv(io.BytesIO(obj["Body"].read())).info()


ClientError: An error occurred (InvalidAccessKeyId) when calling the GetObject operation: The AWS Access Key Id you provided does not exist in our records.

Based on the information of the source dataset, a new table can be created in database running on the Redshift cluster. The code below deletes a table if already exists, creates a new table and finally copy the data into the new table.

In [40]:
# Delete existing table named "iris"
sql_query = """DROP TABLE IF EXISTS iris"""
execute_sql(sql_query, conn_string)

print("Table deleted if existing")

# Create a new table named "iris"
sql_query = """CREATE TABLE IF NOT EXISTS iris
               (
               sepal_length NUMERIC,
               sepal_width NUMERIC,
               petal_length NUMERIC,
               petal_width NUMERIC,
               species VARCHAR
               )
            """
execute_sql(sql_query, conn_string)

print("Table created")

# Define S3 source file path
file_path = "s3://data-to-migrate/iris_dataset.csv"

# # Copy data
sql_query = """
    COPY iris
    FROM '{}'
    IAM_ROLE '{}' 
    csv
    IGNOREHEADER 1
    ;
""".format(file_path, role_arn)
execute_sql(sql_query, conn_string)

print("Data copied")



In [53]:
sql_query = """SELECT *
               FROM iris
               LIMIT 5
            """
execute_sql(
    sql_query,
    conn_string,
    True
)


[]


To conclude, we want to propose the code needed to delete the Redshift cluster. It is adviced to delete the Redshift cluster if not being used in order to minimize AWS cost.

In [53]:
try:
    # Delete Cluster
    redshift.delete_cluster(
        ClusterIdentifier = cluster_identifier,
        SkipFinalClusterSnapshot = True,
    )

    print("A cluster named {} exists".format(cluster_identifier))
    print("Deleting existing cluster named {}...".format(cluster_identifier))


    # Wait for the cluster status change to deleted
    delete_waiter = redshift.get_waiter("cluster_deleted")
    delete_waiter.wait(
        ClusterIdentifier = cluster_identifier,
        WaiterConfig = {
            "Delay": 30,
            "MaxAttempts": 20
        }
    )

    print("Cluster named {} deleted".format(cluster_identifier))

except:
    print("A cluster named {} does not exist".format(cluster_identifier))  
