In [1]:
import io
import boto3
import sagemaker
import json
from sagemaker import get_execution_role
import os
from sklearn.datasets import *
import pandas as pd
from botocore.exceptions import ClientError
import awswrangler as wr
from datetime import date

In [2]:
# Get region
session = boto3.session.Session()
region_name = session.region_name

# Get SageMaker session & default S3 bucket
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()  # replace with your own bucket name if you have one
role = 'sagemaker_developer'
prefix = "data/tabular/california_housing"
filename = "california_housing.csv"

In [3]:
iam = boto3.client("iam")
sts = boto3.client("sts")
redshift = boto3.client("redshift")
sm = boto3.client("sagemaker")
s3 = sagemaker_session.boto_session.resource("s3")

In [4]:
role_name = role.split("/")[-1]
print("Your Role name used to create this notebook is: {}".format(role_name))

Your Role name used to create this notebook is: sagemaker_developer


In [6]:
# helper functions to upload data to s3
def write_to_s3(filename, bucket, prefix):
    # put one file in a separate folder. This is helpful if you read and prepare data with Athena
    filename_key = filename.split(".")[0]
    key = "{}/{}/{}".format(prefix, filename_key, filename)
    return s3.Bucket(bucket).upload_file(filename, key)


def upload_to_s3(bucket, prefix, filename):
    url = "s3://{}/{}/{}".format(bucket, prefix, filename)
    print("Writing to {}".format(url))
    write_to_s3(filename, bucket, prefix)

In [7]:
tabular_data = fetch_california_housing()
tabular_data_full = pd.DataFrame(tabular_data.data, columns=tabular_data.feature_names)
tabular_data_full["target"] = pd.DataFrame(tabular_data.target)
tabular_data_full.to_csv("california_housing.csv", index=False)

upload_to_s3(bucket, "data/tabular", filename)

Writing to s3://sagemaker-us-east-1-083839308414/data/tabular/california_housing.csv


In [9]:
assume_role_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "redshift.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

In [10]:
# Create Role
iam_redshift_role_name = "Tabular_Redshift"
try:
    iam_role_redshift = iam.create_role(
        RoleName=iam_redshift_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),
        Description="Tabular data Redshift Role",
    )
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        print("Role already exists")
    else:
        print("Unexpected error: %s" % e)

Role already exists


In [11]:
# get role arn
role_rs = iam.get_role(RoleName="Tabular_Redshift")
iam_role_redshift_arn = role_rs["Role"]["Arn"]
print("Your Role arn used to create a Redshift Cluster is: {}".format(iam_role_redshift_arn))

Your Role arn used to create a Redshift Cluster is: arn:aws:iam::083839308414:role/Tabular_Redshift


In [12]:
# s3FullAccess
my_redshift_to_s3 = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": "s3:*", "Resource": "*"}],
}

In [14]:
# Athena Full Access
my_redshift_to_athena = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": ["athena:*"], "Resource": ["*"]},
        {
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase",
                "glue:DeleteDatabase",
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:UpdateDatabase",
                "glue:CreateTable",
                "glue:DeleteTable",
                "glue:BatchDeleteTable",
                "glue:UpdateTable",
                "glue:GetTable",
                "glue:GetTables",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition",
            ],
            "Resource": ["*"],
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:PutObject",
            ],
            "Resource": ["arn:aws:s3:::aws-athena-query-results-*"],
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": ["arn:aws:s3:::athena-examples*"],
        },
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetBucketLocation", "s3:ListAllMyBuckets"],
            "Resource": ["*"],
        },
        {
            "Effect": "Allow",
            "Action": ["sns:ListTopics", "sns:GetTopicAttributes"],
            "Resource": ["*"],
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricAlarm",
                "cloudwatch:DescribeAlarms",
                "cloudwatch:DeleteAlarms",
            ],
            "Resource": ["*"],
        },
        {"Effect": "Allow", "Action": ["lakeformation:GetDataAccess"], "Resource": ["*"]},
    ],
}

In [15]:
try:
    policy_redshift_s3 = iam.create_policy(
        PolicyName="Tabular_RedshiftPolicyToS3", PolicyDocument=json.dumps(my_redshift_to_s3)
    )
    print("Policy created.")
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        print("Policy already exists")
    else:
        print("Unexpected error: %s" % e)

account_id = sts.get_caller_identity()["Account"]
policy_redshift_s3_arn = f"arn:aws:iam::{account_id}:policy/Tabular_RedshiftPolicyToS3"

Policy already exists


In [16]:
try:
    policy_redshift_athena = iam.create_policy(
        PolicyName="Tabular_RedshiftPolicyToAthena",
        PolicyDocument=json.dumps(my_redshift_to_athena),
    )
    print("Policy created.")
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        print("Policy already exists")
    else:
        print("Unexpected error: %s" % e)

account_id = sts.get_caller_identity()["Account"]
policy_redshift_athena_arn = f"arn:aws:iam::{account_id}:policy/Tabular_RedshiftPolicyToAthena"

Policy already exists


In [17]:
# Attach RedshiftPolicyToAthena policy
try:
    response = iam.attach_role_policy(
        PolicyArn=policy_redshift_athena_arn, RoleName=iam_redshift_role_name
    )
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        print("Policy is already attached. This is ok.")
    else:
        print("Unexpected error: %s" % e)

In [18]:
# Attach RedshiftPolicyToS3 policy
try:
    response = iam.attach_role_policy(
        PolicyArn=policy_redshift_s3_arn, RoleName=iam_redshift_role_name
    )
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        print("Policy is already attached. This is ok.")
    else:
        print("Unexpected error: %s" % e)

In [19]:
# making sure you have secret manager policy attached to role
try:
    policy = "SecretsManagerReadWrite"
    response = iam.attach_role_policy(
        PolicyArn="arn:aws:iam::aws:policy/{}".format(policy), RoleName=role_name
    )
    print("Policy %s has been succesfully attached to role: %s" % (policy, role_name))
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        print("Policy is already attached.")
    else:
        print("Unexpected error: %s " % e)

Policy SecretsManagerReadWrite has been succesfully attached to role: sagemaker_developer


In [20]:
# making sure you have RedshiftFullAccess policy attached to role
from botocore.exceptions import ClientError

try:
    policy = "AmazonRedshiftFullAccess"
    response = iam.attach_role_policy(
        PolicyArn="arn:aws:iam::aws:policy/{}".format(policy), RoleName=role_name
    )
    print("Policy %s has been succesfully attached to role: %s" % (policy, role_name))
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        print("Policy is already attached. ")
    else:
        print("Unexpected error: %s " % e)

Policy AmazonRedshiftFullAccess has been succesfully attached to role: sagemaker_developer


In [21]:
secretsmanager = boto3.client("secretsmanager")

try:
    response = secretsmanager.create_secret(
        Name="tabular_redshift_login",
        Description="California Housing data New Cluster Redshift Login",
        SecretString='[{"username":"awsuser"},{"password":"Californiahousing1"}]',
        Tags=[
            {"Key": "name", "Value": "tabular_redshift_login"},
        ],
    )
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceExistsException":
        print("Secret already exists. This is ok.")
    else:
        print("Unexpected error: %s" % e)

Secret already exists. This is ok.


In [22]:
# And retrieving the secret again
secretsmanager = boto3.client("secretsmanager")
import json

secret = secretsmanager.get_secret_value(SecretId="tabular_redshift_login")
cred = json.loads(secret["SecretString"])

master_user_name = cred[0]["username"]
master_user_pw = cred[1]["password"]

In [23]:
# Set up parameters
# Redshift configuration parameters
redshift_cluster_identifier = "redshiftdemo"
database_name = "california_housing"
cluster_type = "multi-node"
node_type = "dc2.large"
number_nodes = "2"

In [26]:
def create_redshift_security_group():
    ec2 = boto3.client('ec2', region_name='us-east-1',aws_access_key_id='AKIARHBJWKJ7PKABODEC'
                                   ,aws_secret_access_key='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')

    # Each region has a unique VPC.
    response = ec2.describe_vpcs()
    vpc_id = response.get('Vpcs', [{}])[0].get('VpcId', '')
    if not vpc_id:
        raise RuntimeError("You must create a VPC first!")

    port = int(5439)
    group_name = 'redshift-security-group'
    try:
        response = ec2.create_security_group(
            GroupName=group_name,
            Description='redshift security group',
            VpcId=vpc_id)
        security_group_id = response['GroupId']
        print(f"Security Group {security_group_id} Created in vpc {vpc_id}.")

        data = ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            IpPermissions=[
                {'IpProtocol': 'tcp',
                 'FromPort': port,
                 'ToPort': port,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
            ])
        print(f"Ingress Successfully Set {data}")
        return security_group_id
    except ClientError as e:
        if e.response['Error']['Code'] == 'InvalidGroup.Duplicate':
            response = ec2.describe_security_groups(
                Filters=[
                    dict(Name='group-name', Values=[group_name])
                ]
            )
            return response['SecurityGroups'][0]['GroupId']
        raise e


In [27]:
security_group_id=create_redshift_security_group()

In [28]:
response = redshift.create_cluster(
    DBName=database_name,
    ClusterIdentifier=redshift_cluster_identifier,
    ClusterType=cluster_type,
    NodeType=node_type,
    NumberOfNodes=int(number_nodes),
    MasterUsername=master_user_name,
    MasterUserPassword=master_user_pw,
    # ClusterSubnetGroupName="<cluster-subnet-group-1>",  # you can either specify an existing subnet group (change this to your Subnet Group name), or use the security group ID that was retrieved above
    IamRoles=[iam_role_redshift_arn],
    VpcSecurityGroupIds=[security_group_id],
    Port=5439,
    PubliclyAccessible=False,
)

print(response)

{'Cluster': {'ClusterIdentifier': 'redshiftdemo', 'NodeType': 'dc2.large', 'ClusterStatus': 'creating', 'ClusterAvailabilityStatus': 'Modifying', 'MasterUsername': 'awsuser', 'DBName': 'california_housing', 'AutomatedSnapshotRetentionPeriod': 1, 'ManualSnapshotRetentionPeriod': -1, 'ClusterSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-06acad5291c25b090', 'Status': 'active'}], 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0', 'ParameterApplyStatus': 'in-sync'}], 'ClusterSubnetGroupName': 'default', 'VpcId': 'vpc-0f7f95fdb82475396', 'PreferredMaintenanceWindow': 'fri:04:30-fri:05:00', 'PendingModifiedValues': {'MasterUserPassword': '****'}, 'ClusterVersion': '1.0', 'AllowVersionUpgrade': True, 'NumberOfNodes': 2, 'PubliclyAccessible': False, 'Encrypted': False, 'Tags': [], 'EnhancedVpcRouting': False, 'IamRoles': [{'IamRoleArn': 'arn:aws:iam::083839308414:role/Tabular_Redshift', 'ApplyStatus': 'adding'}], 'MaintenanceTrackName': 'current', 

In [None]:
# check cluster status
response = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)
cluster_status = response["Clusters"][0]["ClusterStatus"]
print("Your Redshift Cluster Status is: " + cluster_status)

Your Redshift Cluster Status is: available


In [49]:
redshift_endpoint_address = response["Clusters"][0]["Endpoint"]["Address"]
iam_role = response["Clusters"][0]["IamRoles"][0]["IamRoleArn"]

print("Redshift endpoint: {}".format(redshift_endpoint_address))
print("IAM Role: {}".format(iam_role))

Redshift endpoint: redshiftdemo.cl3ngyue18f8.us-east-1.redshift.amazonaws.com
IAM Role: arn:aws:iam::083839308414:role/Tabular_Redshift


In [56]:
def get_redshift_cluster_endpoint():

    redshift_client = boto3.client('redshift', region_name='us-east-1',aws_access_key_id='AKIARHBJWKJ7PKABODEC'
                                   ,aws_secret_access_key='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')
    endpoint = redshift_client.describe_clusters(
        ClusterIdentifier='redshiftdemo')[
        'Clusters'][0]['Endpoint']
    return endpoint['Address'], endpoint['Port']


In [57]:
address, port =get_redshift_cluster_endpoint()
print(f"Connecting to Redshift cluster at {address}:{port} ...")


redshiftdemo.cl3ngyue18f8.us-east-1.redshift.amazonaws.com
5439


In [39]:
secretsmanager = boto3.client("secretsmanager")
secret = secretsmanager.get_secret_value(SecretId="tabular_redshift_login")
cred = json.loads(secret["SecretString"])

master_user_name = cred[0]["username"]
master_user_pw = cred[1]["password"]

In [44]:
redshift_cluster_identifier = "redshiftdemo"

database_name_redshift = "california_housing"
database_name_athena = "tabular_california_housing"

redshift_port = int(5439)

schema_redshift = "redshift"
schema_spectrum = "spectrum"

table_name_csv = "california_housing_athena"

In [64]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import psycopg2

In [65]:
def connect_db():
    """Connects to the database."""
    address, port = get_redshift_cluster_endpoint()
    # connect to default database
    print(f"Connecting to Redshift cluster at {address}:{port} ...")
    conn = psycopg2.connect(
        f"host={address} "
        f"dbname={database_name_redshift} "
        f"user={master_user_name} "
        f"password={master_user_pw} "
        f"port={port}")
    print(conn)
    conn.set_session(autocommit=True)
    return conn.cursor(), conn

In [68]:
cur, conn = connect_db()


Connecting to Redshift cluster at redshiftdemo.cl3ngyue18f8.us-east-1.redshift.amazonaws.com:5439 ...
<connection object at 0x7f4d8c403c28; dsn: 'user=awsuser password=xxx dbname=california_housing host=redshiftdemo.cl3ngyue18f8.us-east-1.redshift.amazonaws.com port=5439', closed: 0>


In [58]:
# Connect to Redshift Database Engine
engine = create_engine(
    "postgresql://{}:{}@{}:{}/{}".format(
        master_user_name,
        master_user_pw,
        address,
        port,
        database_name_redshift,
    )
)

In [59]:
# config session
session = sessionmaker()
session.configure(bind=engine)
s = session()
print(iam_role)

arn:aws:iam::083839308414:role/Tabular_Redshift


Method 1: Access Data without Moving it to Redshift: Amazon Redshift Spectrum
Redshift Spectrum is used to query data directly from files on Amazon S3.You will need to create external tables in an external schema. The external schema references a database in the external data catalog and provides the IAM role ARN that authorizes your cluster to access Amazon S3 on your behalf.

Get table and schema information from the Glue Catalog: getting meta data from data catalog and connecting to the Athena database

In [69]:
statement = """
rollback;
create external schema if not exists {} from data catalog 
    database '{}' 
    iam_role '{}'
    create external database if not exists
""".format(
    schema_spectrum, database_name_athena, iam_role
)

s.execute(statement)
s.commit()

In [72]:
statement = """
select *
    from {}.{} limit 10
""".format(
    schema_spectrum, table_name_csv
)

df = pd.read_sql_query(statement, engine)
df.head(5)

Unnamed: 0,medinc,houseage,averooms,avebedrms,population,aveoccup,latitude,longitude,medvalue
0,8.3252,41.0,6.984127,1.02381,322.0,2.555556,37.88,-122.23,4.526
1,8.3014,21.0,6.238137,0.97188,2401.0,2.109842,37.86,-122.22,3.585
2,7.2574,52.0,8.288136,1.073446,496.0,2.80226,37.85,-122.24,3.521
3,5.6431,52.0,5.817352,1.073059,558.0,2.547945,37.85,-122.25,3.413
4,3.8462,52.0,6.281853,1.081081,565.0,2.181467,37.85,-122.25,3.422


Method 2: Loading Data into Redshift from Athena
To load data into Redshift, you need to either use COPY command or INSERT INTO command to move data into a table from data files. Copied files may reside in an S3 bucket, an EMR cluster, or on a remote host accessed.

Create and Upload Data into Athena Database

In [71]:
database_name = "tabular_california_housing"
table_name_csv = "california_housing_athena"

# SQL statement to execute
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
        MedInc double,
        HouseAge double,
        AveRooms double,
        AveBedrms double,
        Population double,
        AveOccup double,
        Latitude double,
        Longitude double, 
        MedValue double

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' LOCATION '{}'
TBLPROPERTIES ('skip.header.line.count'='1')""".format(
    database_name, table_name_csv, data_s3_path
)

# Execute statement using connection cursor
cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()
cursor.execute(statement)

NameError: name 'data_s3_path' is not defined

In [None]:
# verify the table has been created
statement = "SHOW TABLES in {}".format(database_name)
cursor.execute(statement)

df_show = as_pandas(cursor)
df_show.head(5)

In [74]:
# create schema
statement = """create schema if not exists {}""".format(schema_redshift)

s = session()
s.execute(statement)
s.commit()

In [None]:
table_name_redshift = table_name_csv + "_" + "redshift_insert"
statement = """
rollback;
create table if not exists redshift.{}(
        MedInc float,
        HouseAge float,
        AveRooms float,
        AveBedrms float,
        Population float,
        AveOccup float,
        Latitude float,
        Longitude float, 
        MedValue float)""".format(
    table_name_redshift
)

s.execute(statement)
s.commit()

In [None]:
table_name_redshift = table_name_csv + "_" + "redshift_insert"

statement = """
    insert into redshift.{}
        select * from {}.{}             
    """.format(
    table_name_redshift, schema_spectrum, table_name_csv
)
s.execute(statement)
s.commit()

In [None]:
statement = """
     select * from redshift.{} limit 10
""".format(
    table_name_redshift
)
df = pd.read_sql_query(statement, engine)
df.head(5)

Method 3: Copy data directly from S3
You can also Copy Data into a new table. https://docs.aws.amazon.com/redshift/latest/dg/tutorial-loading-run-copy.html

Create a new Schema in Redshift

In [75]:
# create a new sample table
table_name_redshift = table_name_csv + "_" + "redshift_copy"
statement = """
rollback;
create table if not exists redshift.{}(
        MedInc float,
        HouseAge float,
        AveRooms float,
        AveBedrms float,
        Population float,
        AveOccup float,
        Latitude float,
        Longitude float, 
        MedValue float)""".format(
    table_name_redshift
)

s.execute(statement)
s.commit()

In [76]:
table_name_redshift = table_name_csv + "_" + "redshift_copy"
data_s3_path = "s3://{}/data/tabular/california_housing/california_housing.csv".format(bucket)
statement = """
rollback;
copy redshift.{}  
  from '{}'
  iam_role '{}'
  csv
  ignoreheader 1
    """.format(
    table_name_redshift, data_s3_path, iam_role
)
s.execute(statement)
s.commit()

In [77]:
statement = """
     select * from redshift.{} limit 10
""".format(
    table_name_redshift
)
df_copy = pd.read_sql_query(statement, engine)
df_copy.head(5)

Unnamed: 0,medinc,houseage,averooms,avebedrms,population,aveoccup,latitude,longitude,medvalue
0,8.3252,41.0,6.984127,1.02381,322.0,2.555556,37.88,-122.23,4.526
1,8.3014,21.0,6.238137,0.97188,2401.0,2.109842,37.86,-122.22,3.585
2,7.2574,52.0,8.288136,1.073446,496.0,2.80226,37.85,-122.24,3.521
3,5.6431,52.0,5.817352,1.073059,558.0,2.547945,37.85,-122.25,3.413
4,3.8462,52.0,6.281853,1.081081,565.0,2.181467,37.85,-122.25,3.422


AWS Data Wrangler Get Engine Function
Run this command within a private subnet. You can find your host address by going to the Redshift Console, then choose Clusters -> Property -> Connection details -> View all connection details -> Node IP address -> Private IP address. https://aws-data-wrangler.readthedocs.io/en/latest/stubs/awswrangler.db.get_engine.html#awswrangler.db.get_engine

In [78]:
private_ip = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)["Clusters"][
    0
]["ClusterNodes"][0]["PrivateIPAddress"]
print("Private IP address is: ", private_ip)

Private IP address is:  172.31.65.127


In [83]:
engine = wr.db.get_engine(
    db_type="postgresql",
    host=private_ip,  # Private IP address of your Redshift Cluster
    port=redshift_port,
    database=database_name_redshift,
    user=master_user_name,
    password=master_user_pw,
)

TypeError: connect() got an unexpected keyword argument 'db_type'

In [None]:
df = wr.db.read_sql_query("SELECT * FROM redshift.{}".format(table_name_redshift), con=engine)


In [None]:
df.head()
