In [14]:
import redshift_connector

import pandas as pd
from io import StringIO
import boto3 # For S3 operations

# --- Redshift Serverless and S3 Configuration (replace with your actual details) ---
AWS_REGION = 'ap-south-1' # Your AWS region
REDSHIFT_SERVERLESS_WORKGROUP_NAME = 'default-workgroup'
REDSHIFT_SERVERLESS_DB_NAME = 'dev' # Or your specific database name
# If using IAM Role for the executing environment (e.g., EC2, Lambda),
# you typically don't need access_key_id/secret_access_key here.
# redshift_connector will pick up credentials from env vars, ~/.aws/credentials, or IAM role.
# For local testing without an IAM role:
# AWS_ACCESS_KEY_ID = 'YOUR_AWS_ACCESS_KEY_ID'
# AWS_SECRET_ACCESS_KEY = 'YOUR_AWS_SECRET_ACCESS_KEY'

S3_KEY_PREFIX = 'serverless-data-uploads/'
# S3 bucket for COPY/UNLOAD (must exist and have Redshift role access)
S3_BUCKET_NAME = 'kkm2-unique-test-bucket-2025-06-26-py-1'
S3_FILE_TO_UNLOAD = 'redshift_sample.csv'
S3_UNLOAD_PATH = f's3://{S3_BUCKET_NAME}/{S3_FILE_TO_UNLOAD}'
S3_COPY_PATH = f's3://{S3_BUCKET_NAME}/flights.csv' # Path to your existing sample data

# IAM Role ARN for Redshift to access S3 (must have s3:GetObject on your bucket)
# This is attached to your Redshift Serverless namespace.
IAM_ROLE_ARN_FOR_COPY = 'arn:aws:iam::123456789012:role/YourRedshiftServerlessCopyRole'

def get_redshift_serverless_connection():
    """Establishes a connection to Amazon Redshift Serverless using IAM authentication."""
    try:
        conn = redshift_connector.connect(
            workgroup_name=REDSHIFT_SERVERLESS_WORKGROUP_NAME,
            database=REDSHIFT_SERVERLESS_DB_NAME,
            iam=True,
            region=AWS_REGION
            # ... other parameters
        )
        print("Successfully connected to Redshift Serverless!")
        return conn
    except Exception as e:
        print(f"Error connecting to Redshift Serverless: {e}")
        print("Ensure your AWS credentials are configured (e.g., via aws configure),")
        print("your security group allows access, and IAM permissions are correct.")
        return None

def list_redshift_databases(conn):
    """Lists all databases in the Redshift Serverless workgroup."""
    if not conn:
        print("No active connection to Redshift.")
        return []

    try:
        with conn.cursor() as cursor:
            # For Redshift Serverless, querying pg_database will show available databases within the namespace
            cursor.execute("SELECT datname FROM pg_database;")
            databases = [row[0] for row in cursor.fetchall()]
            print("\nRedshift Serverless Databases:")
            for db in databases:
                print(f"- {db}")
            return databases
    except Exception as e:
        print(f"Error listing databases: {e}")
        return []
get_redshift_serverless_connection()

Error connecting to Redshift Serverless: connect() got an unexpected keyword argument 'workgroup_name'
Ensure your AWS credentials are configured (e.g., via aws configure),
your security group allows access, and IAM permissions are correct.


In [13]:
!pip install --upgrade "redshift_connector[full]"

Collecting redshift_connector[full]
  Downloading redshift_connector-2.1.8-py3-none-any.whl.metadata (69 kB)
Downloading redshift_connector-2.1.8-py3-none-any.whl (139 kB)
Installing collected packages: redshift_connector
  Attempting uninstall: redshift_connector
    Found existing installation: redshift-connector 2.1.7
    Uninstalling redshift-connector-2.1.7:
      Successfully uninstalled redshift-connector-2.1.7
Successfully installed redshift_connector-2.1.8


In [8]:
import psycopg2
import boto3
import pandas as pd
from io import StringIO
import time
import uuid

# --- Redshift Serverless and S3 Configuration (replace with your actual details) ---
AWS_REGION = 'ap-south-1' # Your AWS region
REDSHIFT_SERVERLESS_WORKGROUP_NAME = 'demo-workgroup'
REDSHIFT_SERVERLESS_DB_NAME = 'dev' # Or your specific database name
# This user needs to exist in your Redshift Serverless database and be mapped to an IAM role/user
# that has GetCredentials permission.
REDSHIFT_DB_USER = 'admin'

S3_BUCKET_NAME = 'your-unique-serverless-redshift-s3-bucket'
S3_KEY_PREFIX = 'serverless-data-uploads/'
# IAM Role ARN for Redshift Serverless to access S3 for COPY commands.
# This role must be associated with your Redshift Serverless namespace.
IAM_ROLE_ARN_FOR_COPY = 'arn:aws:iam::310879042055:role/aws-service-role/redshift.amazonaws.com/AWSServiceRoleForRedshift'


def get_redshift_serverless_endpoint_and_credentials():
    """
    Obtains the Redshift Serverless workgroup endpoint and temporary database credentials
    using boto3.
    """
    redshift_serverless_client = boto3.client('redshift-serverless', region_name=AWS_REGION)
    redshift_data_client = boto3.client('redshift-data', region_name=AWS_REGION)

    try:
        # 1. Get Workgroup details to find the endpoint
        workgroup_response = redshift_serverless_client.get_workgroup(
            workgroupName=REDSHIFT_SERVERLESS_WORKGROUP_NAME
        )
        endpoint_address = workgroup_response['workgroup']['endpoint']['address']
        endpoint_port = workgroup_response['workgroup']['endpoint']['port']
        print(f"Redshift Serverless Endpoint: {endpoint_address}:{endpoint_port}")

        # 2. Get temporary database credentials
        # Note: GetClusterCredentials for provisioned, GetCredentials for serverless
        credentials_response = redshift_data_client.get_credentials(
            workgroupName=REDSHIFT_SERVERLESS_WORKGROUP_NAME,
            dbName=REDSHIFT_SERVERLESS_DB_NAME,
            dbUser=REDSHIFT_DB_USER,
            # autoCreate is useful if the user doesn't exist, but needs permission for it.
            # autoCreate=False
        )

        db_user = credentials_response['DbUser']
        db_password = credentials_response['DbPassword']
        temp_token = credentials_response['TempUserPassphrase'] # This is the password

        return {
            'host': endpoint_address,
            'port': endpoint_port,
            'user': db_user,
            'password': temp_token,
            'dbname': REDSHIFT_SERVERLESS_DB_NAME
        }

    except Exception as e:
        print(f"Error getting Redshift Serverless endpoint or credentials: {e}")
        print("Ensure your AWS credentials are configured and have `redshift-serverless:GetWorkgroup`")
        print("and `redshift-data:GetCredentials` permissions.")
        return None

def get_psycopg2_redshift_serverless_connection():
    """Establishes a connection to Redshift Serverless using psycopg2 with temporary credentials."""
    conn_params = get_redshift_serverless_endpoint_and_credentials()
    if not conn_params:
        return None

    try:
        conn = psycopg2.connect(**conn_params)
        print("Successfully connected to Redshift Serverless using psycopg2!")
        return conn
    except Exception as e:
        print(f"Error connecting to Redshift Serverless with psycopg2: {e}")
        print("Ensure security group allows access and network path is open.")
        return None

def list_redshift_databases_psycopg2(conn):
    """Lists all databases in the Redshift Serverless workgroup using psycopg2."""
    if not conn:
        print("No active connection to Redshift.")
        return []

    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT datname FROM pg_database;")
            databases = [row[0] for row in cursor.fetchall()]
            print("\nRedshift Serverless Databases (via psycopg2):")
            for db in databases:
                print(f"- {db}")
            return databases
    except Exception as e:
        print(f"Error listing databases with psycopg2: {e}")
        return []

def upload_data_to_redshift_serverless_via_s3_copy_psycopg2(
    conn,
    df,
    table_name,
    s3_bucket,
    s3_key_prefix,
    iam_role_arn_for_copy,
    create_table_sql=None
):
    """
    Uploads a pandas DataFrame to Redshift Serverless using S3 and the COPY command via psycopg2.
    """
    if not conn:
        print("No active connection to Redshift Serverless.")
        return

    s3_client = boto3.client('s3', region_name=AWS_REGION)

    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False, header=False, sep=',') # Ensure no header or index

    s3_file_name = f"{s3_key_prefix}{table_name}_{uuid.uuid4()}.csv" # Use uuid for uniqueness
    s3_path = f"s3://{s3_bucket}/{s3_file_name}"

    try:
        # 1. Upload DataFrame to S3
        s3_client.put_object(Bucket=s3_bucket, Key=s3_file_name, Body=csv_buffer.getvalue())
        print(f"Data uploaded to S3: {s3_path}")

        with conn.cursor() as cursor:
            # Optional: Create table if it doesn't exist
            if create_table_sql:
                try:
                    cursor.execute(create_table_sql)
                    conn.commit()
                    print(f"Table '{table_name}' created (if it didn't exist).")
                except psycopg2.ProgrammingError as e:
                    if "already exists" in str(e):
                        print(f"Table '{table_name}' already exists.")
                    else:
                        raise e # Re-raise other errors

            # 2. Use Redshift COPY command to load data from S3
            copy_command = f"""
            COPY {table_name}
            FROM '{s3_path}'
            IAM_ROLE '{iam_role_arn_for_copy}'
            CSV
            IGNOREHEADER 0;
            """
            print(f"Executing COPY command for table '{table_name}'...")
            cursor.execute(copy_command)
            conn.commit()
            print(f"Data successfully loaded into Redshift Serverless table '{table_name}' from S3.")

            # Optional: Clean up S3 file after successful load
            s3_client.delete_object(Bucket=s3_bucket, Key=s3_file_name)
            print(f"Temporary S3 file deleted: {s3_path}")

    except Exception as e:
        conn.rollback() # Rollback on error
        print(f"Error during data upload with psycopg2: {e}")
    finally:
        csv_buffer.close()


# Example Usage with psycopg2
if __name__ == "__main__":
    conn = get_psycopg2_redshift_serverless_connection()

    if conn:
        # List databases
        list_redshift_databases_psycopg2(conn)

        # --- Example Data for Upload ---
        data = {
            'item_id': [201, 202, 203],
            'item_name': ['Book', 'Pen', 'Notebook'],
            'category': ['Stationery', 'Stationery', 'Stationery'],
            'price_usd': [25.00, 2.50, 8.00]
        }
        df_to_upload = pd.DataFrame(data)
        target_table = 'serverless_inventory'

        # SQL to create the target table (adjust columns and types as needed)
        create_table_sql_statement = f"""
        CREATE TABLE IF NOT EXISTS {target_table} (
            item_id INT,
            item_name VARCHAR(255),
            category VARCHAR(255),
            price_usd DECIMAL(10, 2)
        );
        """

        upload_data_to_redshift_serverless_via_s3_copy_psycopg2(
            conn,
            df_to_upload,
            target_table,
            S3_BUCKET_NAME,
            S3_KEY_PREFIX,
            IAM_ROLE_ARN_FOR_COPY,
            create_table_sql=create_table_sql_statement
        )

        # Close the connection
        conn.close()
        print("Redshift Serverless (psycopg2) connection closed.")

Error getting Redshift Serverless endpoint or credentials: An error occurred (ResourceNotFoundException) when calling the GetWorkgroup operation: Serverless workgroup demo-workgroup not found.
Ensure your AWS credentials are configured and have `redshift-serverless:GetWorkgroup`
and `redshift-data:GetCredentials` permissions.
