# AWS Redshift Schema Pipeline & ETL Pipeline

In [1]:
# ETL libs
from ..etl.connection import Connection
from ..etl.create_tables import SchemaPipeline
from ..etl import ETLPipeline

## 1. Initialize Connection and Pipeline Objects

- Create the AWS Redshift connection instance shared by both pipelines.
- Create the schema pipeline executor.
- Create the ETL pipeline executor.

In [2]:
# A single Redshift connection is shared by both pipeline executors below.
connection = Connection(redshift=True)
schema_pipeline, etl_pipeline = SchemaPipeline(connection), ETLPipeline(connection)

## 2. Run the Schema Pipeline

- Drop all tables.
- Create staging tables.
- Create Data Warehouse tables.

In [3]:
# Drop all tables, then create the staging and Data Warehouse tables.
# Progress messages and timings are printed by the pipeline itself.
schema_pipeline.run()

-----------------------------------------------------
AWS Redshift Schema Pipeline
-----------------------------------------------------
INFO: Droping the database tables...
INFO: Database tables droped.
INFO: Creating the database schema...
INFO: Database schema created.
-----------------------------------------------------
Time Statistics
-----------------------------------------------------
Drop tables time: 8.56 seconds
Create tables time: 23.55 seconds


## 2. Run the ETL Pipeline

- Copy JSON data from S3 buckets to staging tables.
- Select and transform data from staging tables.
- Insert transformed data into Data Warehouse tables.

In [4]:
# Copy the S3 JSON data into the staging tables, then transform it into the
# Data Warehouse tables. In the recorded run the staging load dominated the
# runtime (~1122 s vs ~11 s for the inserts).
# NOTE(review): the recorded output prints "Staging tables loaded." after the
# DW insert step; the completion message in ETLPipeline looks wrong — confirm.
etl_pipeline.run()

-----------------------------------------------------
AWS Redshift ETL Pipeline
-----------------------------------------------------
INFO: Loading S3 data into staging tables...
INFO: Staging tables loaded.
INFO: Inserting data into DW tables...
INFO: Staging tables loaded.
-----------------------------------------------------
Time Statistics
-----------------------------------------------------
Staging tables time: 1122.09 seconds
Insert tables time: 11.06 seconds


## 3. Fetch Data from All Tables to Validate the Process

Define the ```fetch_dataframe``` function, which queries a table, exports its rows to a CSV file, and returns them as a pandas DataFrame.

In [None]:
def fetch_dataframe(table, folder='../../data', conn=None):
    """Read every row of ``table``, export it to ``{folder}/{table}.csv`` and return it.

    Parameters
    ----------
    table : str
        Table to read. It is interpolated directly into the SQL text, so it
        must be a trusted identifier (the notebook's own table names), never
        external input.
    folder : str, default '../../data'
        Destination directory for the CSV export; created if it is missing.
    conn : optional
        Connection wrapper exposing a ``cursor`` attribute and ``commit()``.
        Defaults to the notebook-level ``connection`` so existing calls keep
        working; pass one explicitly to avoid relying on notebook globals.

    Returns
    -------
    pandas.DataFrame
        The table contents; an empty frame (with the result's column names
        when available) if the query returned no rows.
    """
    from pathlib import Path

    import pandas as pd

    conn = connection if conn is None else conn
    cursor = conn.cursor

    # query table
    cursor.execute(f'SELECT * FROM {table}')
    # fetch data frame
    data = cursor.fetch_dataframe()
    if data is None:
        # NOTE(review): redshift_connector's fetch_dataframe appears to return
        # None for an empty result set; without this guard an empty table
        # would crash on `to_csv` below. Confirm against the installed version.
        columns = [col[0] for col in (cursor.description or [])]
        data = pd.DataFrame(columns=columns)
    # presumably closes the transaction opened by the SELECT on a
    # non-autocommit connection — confirm against Connection.commit
    conn.commit()

    # export CSV data (index included, same layout as before)
    out_dir = Path(folder)
    out_dir.mkdir(parents=True, exist_ok=True)
    data.to_csv(out_dir / f'{table}.csv')
    # return data frame
    return data

### 3.1 Fetch ```staging_events``` table

In [5]:
staging_events = fetch_dataframe('staging_events')
# Validation: the ETL run must have loaded this staging table.
assert not staging_events.empty, 'staging_events is empty'
staging_events.head(2)

Unnamed: 0,userid,firstname,lastname,gender,level,artist,song,length,sessionid,auth,iteminsession,location,registration,ts,page,useragent,status,method
0,39,Walter,Frye,M,free,,,,38,Logged In,0,"San Francisco-Oakland-Hayward, CA",1540919166796,1541105830796,Home,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",200,GET
1,8,Kaylee,Summers,F,free,,,,139,Logged In,0,"Phoenix-Mesa-Scottsdale, AZ",1540344794796,1541106106796,Home,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",200,GET


### 3.2 Fetch ```staging_songs``` table

In [6]:
staging_songs = fetch_dataframe('staging_songs')
# Validation: the ETL run must have loaded this staging table.
assert not staging_songs.empty, 'staging_songs is empty'
staging_songs.head(2)

Unnamed: 0,song_id,title,duration,year,num_songs,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,SOFRDWL12A58A7CEF7,Hit Da Scene,252.94322,0,1,AR9Q9YC1187FB5609B,Quest_ Pup_ Kevo,New Jersey,,
1,SODZYPO12A8C13A91E,Burn My Body (Album Version),177.99791,0,1,AR1C2IX1187B99BF74,Broken Spindles,,,


### 3.3 Fetch ```users``` table

In [7]:
users = fetch_dataframe('users')
# Validation: the ETL run must have populated this DW table.
assert not users.empty, 'users is empty'
users.head(2)

Unnamed: 0,user_id,first_name,last_name,gender,level
0,8,Kaylee,Summers,F,free
1,26,Ryan,Smith,M,free


### 3.4 Fetch ```songs``` table

In [8]:
songs = fetch_dataframe('songs')
# Validation: the ETL run must have populated this DW table.
assert not songs.empty, 'songs is empty'
songs.head(2)

Unnamed: 0,song_id,title,year,duration,artist_id
0,SONQBUB12A6D4F8ED0,Angie (1993 Digital Remaster),0,271.49016,ARFCUN31187B9AD578
1,SODWXQV12A6310F10D,English Summer Rain,2003,241.52771,AR6892W1187B9AC71B


### 3.5 Fetch ```artists``` table

In [9]:
artists = fetch_dataframe('artists')
# Validation: the ETL run must have populated this DW table.
assert not artists.empty, 'artists is empty'
artists.head(2)

Unnamed: 0,artist_id,name,location,latitude,longitude
0,AR5KOSW1187FB35FF4,Elena,Dubai UAE,49.80388,15.47491
1,AR6XPWV1187B9ADAEB,Foo Fighters,"Seattle, WA",,


### 3.6 Fetch ```time``` table

In [10]:
# NOTE: this variable shadows the stdlib `time` module name; avoid
# `import time` later in this notebook (or rename this variable everywhere).
time = fetch_dataframe('time')
# Validation: the ETL run must have populated this DW table.
assert not time.empty, 'time is empty'
time.head(2)

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-01 21:05:52.796,21,1,44,11,2018,4
1,2018-11-01 21:11:13.796,21,1,44,11,2018,4


### 3.7 Fetch ```songplays``` table

In [11]:
songplays = fetch_dataframe('songplays')
# Validation: the ETL run must have populated this fact table.
assert not songplays.empty, 'songplays is empty'
songplays.head(2)

Unnamed: 0,songplay_id,level,location,user_agent,session_id,user_id,song_id,artist_id,start_time
0,929382,free,"Palestine, TX",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,938,92,SONQBUB12A6D4F8ED0,ARFCUN31187B9AD578,2018-11-26 18:25:34.796
1,8543624,paid,"Red Bluff, CA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_...",436,85,SOLRYQR12A670215BF,ARNLO5S1187B9B80CC,2018-11-16 14:21:12.796


## 4. Close Connection

In [12]:
# Release the shared Redshift connection once every table has been fetched.
connection.close()