# Introduction

Follow full tutorials here: https://github.com/aws/aws-sdk-pandas/blob/main/tutorials/001%20-%20Introduction.ipynb

## Pre-requisites
This part assumes that the terraform backend has been provisioned properly (and has not been destroyed).
To follow this guide, a Redshift cluster has to be provisioned.
Simply run `terraform apply` in the current folder to provision a cluster; that is all you need to run this notebook and practice with AWS Wrangler (`awswrangler`).

In [1]:
import awswrangler as wr
# Display the installed awswrangler version (this notebook was last run with 3.4.2).
wr.__version__

'3.4.2'

In [2]:
import os
import pandas as pd

# Sessions
Create a customized session using `boto3.Session()`.
Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

In [3]:
import boto3
from dotenv import load_dotenv

# Pull development credentials from a local dotenv file into the process
# environment so they never have to be written into the notebook itself.
load_dotenv('.env.aws_credentials')

AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION = (
    os.getenv(env_var) for env_var in ('AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'AWS_REGION')
)

# Custom boto3 session built from those credentials; it is passed explicitly
# (boto3_session=...) to the awswrangler calls later in this notebook.
my_session = boto3.Session(
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)

In [4]:
# Sanity check against a public NOAA bucket: a key that does not exist returns False.
fake_object_path = "s3://noaa-ghcn-pds/fake"
wr.s3.does_object_exist(fake_object_path, boto3_session=my_session)

False

In [5]:
# Amazon S3
bucket_name = "zoomcamp-extracted-data"

# Confirm the parquet file uploaded during the Prefect exercise is in the bucket.
parquet_object_path = f"s3://{bucket_name}/yellow_tripdata_2023-09.parquet"
wr.s3.does_object_exist(parquet_object_path, boto3_session=my_session)

True

In [7]:
# Resolve where the S3 parquet object lives and where it will be saved locally.
s3_file_name = "yellow_tripdata_2023-09.parquet"
s3_file_path = f"s3://{bucket_name}/{s3_file_name}"

local_file_dir = "./download/"
local_file = os.path.join(local_file_dir, s3_file_name)




In [None]:
# Download the parquet file only when it is not already cached locally, so a
# full re-run of the notebook does not fetch it again. The target folder is
# created first because writing a file into a missing directory fails.
if not os.path.exists(local_file):
    os.makedirs(local_file_dir, exist_ok=True)
    wr.s3.download(path=s3_file_path, local_file=local_file)

In [8]:
# Load the downloaded trips and keep only the first 100 rows so the Redshift
# copy demo below stays small.
taxi_df = pd.read_parquet(local_file).head(100)

taxi_df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,1,2023-09-01 00:15:37,2023-09-01 00:20:21,1.0,0.8,1.0,N,163,230,2,6.5,3.5,0.5,0.0,0.0,1.0,11.5,2.5,0.0
1,2,2023-09-01 00:18:40,2023-09-01 00:30:28,2.0,2.34,1.0,N,236,233,1,14.2,1.0,0.5,2.0,0.0,1.0,21.2,2.5,0.0
2,2,2023-09-01 00:35:01,2023-09-01 00:39:04,1.0,1.62,1.0,N,162,236,1,8.6,1.0,0.5,2.0,0.0,1.0,15.6,2.5,0.0
3,2,2023-09-01 00:45:45,2023-09-01 00:47:37,1.0,0.74,1.0,N,141,229,1,5.1,1.0,0.5,1.0,0.0,1.0,11.1,2.5,0.0
4,2,2023-09-01 00:01:23,2023-09-01 00:38:05,1.0,9.85,1.0,N,138,230,1,45.0,6.0,0.5,17.02,0.0,1.0,73.77,2.5,1.75


In [9]:

# Redshift connection settings read from the environment.
REDSHIFT_DATABASE, REDSHIFT_USER, REDSHIFT_PASSWORD = (
    os.getenv(setting)
    for setting in ('REDSHIFT_DATABASE', 'REDSHIFT_USER', 'REDSHIFT_PASSWORD')
)
# Echo the database and user only; the password is not printed.
print(REDSHIFT_DATABASE, REDSHIFT_USER)


trips_data_all masteruser


In [9]:
# Connect to the provisioned Redshift cluster through its Glue connection.
# NOTE: the cluster's VPC security group must allow inbound traffic on port 5439.
glue_connection_name = "zoomcamp_redshift_glue_connection"
con = wr.redshift.connect(
    connection=glue_connection_name,
    dbname=REDSHIFT_DATABASE,
    boto3_session=my_session,
)

# Smoke-test the connection with a trivial query. The connection is left open
# because the copy cell below reuses `con`.
with con.cursor() as cur:
    cur.execute("SELECT 1;")
    print(cur.fetchall())

([1],)


In [10]:
s3_staging_folder = "copy_to_redshift_staging"
table_name = "demo_taxi_data"

# Stage the DataFrame as files under the S3 folder, then COPY them into Redshift.
# mode="overwrite" replaces the table on every run; the default ("append") would
# add duplicate rows each time this cell is re-executed.
# The trailing slash keeps the staging prefix from matching sibling folders
# whose names merely start with the same text.
wr.redshift.copy(
    df=taxi_df,
    path=f"s3://{bucket_name}/{s3_staging_folder}/",
    con=con,
    table=table_name,
    schema="public",
    mode="overwrite",
    boto3_session=my_session,
)

# Verify the load: the count should equal len(taxi_df).
with con.cursor() as cursor:
    cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
    print(cursor.fetchall())

([100],)


# After Session
Please ensure you destroy provisioned resources to avoid any charges.
Simply run `terraform destroy` in the `aws_wrangler_tutorial` folder.

# Redshift Serverless

This section runs the `COPY` command to load data from an S3 bucket into Redshift Serverless.

## Provision Redshift Serverless
Go into week 1's `Terraform/` folder and run one of the following commands.

If connecting to Redshift Serverless with awswrangler's `wr.redshift.connect` (which goes through a Glue connection):
```bash
terraform apply -target=aws_redshiftserverless_namespace.zoomcamp_dataset -target=aws_redshiftserverless_workgroup.zoomcamp_dataset -target=aws_glue_connection.redshift_serverless_glue
```

If connecting with `redshift_connector` (as demonstrated in this notebook):
```bash
terraform apply -target=aws_redshiftserverless_namespace.zoomcamp_dataset -target=aws_redshiftserverless_workgroup.zoomcamp_dataset
```


In [44]:
# Open a direct connection to Redshift Serverless with redshift_connector using
# IAM authentication (no Glue connection is involved in this cell).
# code ref: https://github.com/aws/amazon-redshift-python-driver/blob/master/tutorials/001%20-%20Connecting%20to%20Amazon%20Redshift.ipynb

import redshift_connector

# Endpoint and workgroup can be overridden from the environment; the defaults
# are the zoomcamp workgroup's values, so behavior is unchanged when unset.
REDSHIFT_SERVERLESS_HOST = os.getenv(
    "REDSHIFT_SERVERLESS_HOST",
    "zoomcamp-redshift-workgroup.571772404385.us-east-2.redshift-serverless.amazonaws.com",
)
REDSHIFT_SERVERLESS_WORKGROUP = os.getenv(
    "REDSHIFT_SERVERLESS_WORKGROUP", "zoomcamp-redshift-workgroup"
)

# With iam=True no user/password is passed.
# NOTE(review): no AWS credentials are passed either, so redshift_connector
# presumably resolves them from the default AWS credential chain rather than
# `my_session` — confirm.
con = redshift_connector.connect(
    iam=True,
    host=REDSHIFT_SERVERLESS_HOST,
    port=5439,
    database=REDSHIFT_DATABASE,
    is_serverless=True,
    serverless_work_group=REDSHIFT_SERVERLESS_WORKGROUP,
)

# Smoke-test the connection.
with con.cursor() as cursor:
    cursor.execute("SELECT 1;")
    print(cursor.fetchall())


([1],)


In [53]:
# Target table for the Redshift Serverless COPY demo.
table_name = "trips_data"

In [46]:
# Start from a clean slate. IF EXISTS keeps this cell from failing on a fresh
# database where the table has never been created.
with con.cursor() as cursor:
    cursor.execute(f"DROP TABLE IF EXISTS {table_name};")
    con.commit()

In [47]:
# DDL for the target table; the column list matches the yellow-taxi parquet
# columns shown earlier. The table name comes from `table_name` so it stays in
# sync with the DROP / COUNT / COPY statements in the other cells.
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
"VendorID" BIGINT,
tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE,
tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE,
passenger_count FLOAT(53),
trip_distance FLOAT(53),
"RatecodeID" FLOAT(53),
store_and_fwd_flag TEXT,
"PULocationID" BIGINT,
"DOLocationID" BIGINT,
payment_type BIGINT,
fare_amount FLOAT(53),
extra FLOAT(53),
mta_tax FLOAT(53),
tip_amount FLOAT(53),
tolls_amount FLOAT(53),
improvement_surcharge FLOAT(53),
total_amount FLOAT(53),
congestion_surcharge FLOAT(53),
airport_fee FLOAT(53)
)
"""

In [48]:
with con.cursor() as cursor:
    cursor.execute(create_table_query)
    # Commit the DDL immediately so the table persists even if later cells
    # (which also commit) are never run.
    con.commit()
    print(f"Table {table_name} has been created in redshift.")

    cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
    # fetchone() returns a row (printed as e.g. [0]); unwrap the single COUNT(*) value.
    row_count = cursor.fetchone()[0]
    print(f"Table {table_name} currently contains {row_count} records.")

Table trips_data has been created in redshift.
Table trips_data currently contains [0] records.


In [49]:
AWS_ROLE_ARN_REDSHIFT = os.getenv('AWS_ROLE_ARN_REDSHIFT')

# Redshift COPY statement that loads the parquet object directly from S3,
# authorising the read with the IAM role ARN taken from the environment.
sql_copy_command = (
    f"\nCOPY {table_name}\n"
    f"FROM '{s3_file_path}'\n"
    "FORMAT PARQUET\n"
    f"IAM_ROLE '{AWS_ROLE_ARN_REDSHIFT}'\n"
    f"REGION '{AWS_REGION}';\n"
)
print(sql_copy_command)



COPY trips_data
FROM 's3://zoomcamp-extracted-data/yellow_tripdata_2023-09.parquet'
FORMAT PARQUET
IAM_ROLE 'arn:aws:iam::571772404385:role/redshift-service-role'
REGION 'us-east-2';



In [50]:
# Run the COPY, commit so the loaded rows are visible to other sessions, then
# verify the row count. Note: COPY appends, so re-running this cell without the
# DROP/CREATE cells above loads the file a second time.
with con.cursor() as cursor:
    cursor.execute(sql_copy_command)
    con.commit()
    print(f"Data from S3 has been copied over to Redshift database: {REDSHIFT_DATABASE}")

    cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
    # Unwrap the single COUNT(*) value from the returned row.
    row_count = cursor.fetchone()[0]
    print(f"Number of rows in '{table_name}' table: {row_count}")

Data from S3 has been copied over to Redshift database: trips_data_all
Number of rows in 'trips_data' table: [100000]


In [51]:
# Close the serverless connection; the next cell opens a fresh one to verify the load.
con.close()

In [52]:
## Checking records have been copied over to redshift serverless
# Use a brand-new connection (the previous one was closed), so the count only
# reflects data that was actually committed.
con = redshift_connector.connect(
    iam=True,
    host=os.getenv(
        "REDSHIFT_SERVERLESS_HOST",
        "zoomcamp-redshift-workgroup.571772404385.us-east-2.redshift-serverless.amazonaws.com",
    ),
    port=5439,
    database=REDSHIFT_DATABASE,
    is_serverless=True,
    serverless_work_group=os.getenv(
        "REDSHIFT_SERVERLESS_WORKGROUP", "zoomcamp-redshift-workgroup"
    ),
)

try:
    with con.cursor() as cursor:
        cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
        print(cursor.fetchall())
finally:
    # Release the connection once the check is done instead of leaving it open.
    con.close()


([100000],)
