# Sparkify S3 to Redshift ETL Project

## Project Overview
Sparkify, a music streaming startup, is migrating its data processes to the cloud. The goal is to build an ETL pipeline that extracts data from AWS S3, stages it in Amazon Redshift, and transforms it into a star schema for analytical queries.

## Data Sources
- **S3 Buckets:**
  - `song_data`: JSON metadata about songs.
  - `log_data`: JSON logs of user activity.
  - `log_json_path`: JSON format configuration for parsing.

## ETL Process
1. **Extract**: Load raw data from S3 into Redshift staging tables.
2. **Transform**: Clean and structure data into facts and dimensions.
3. **Load**: Insert processed data into an analytical star schema.

## Data Warehouse Schema (Star Schema)
- **Fact Table:** `songplays` (stores song play events)
- **Dimension Tables:**
  - `users` (user information)
  - `songs` (song details)
  - `artists` (artist details)
  - `time` (timestamp breakdown)

## Redshift Staging Tables
- `staging_songs` (raw song data)
- `staging_logs` (raw log data)

## Technologies Used
- **AWS S3**: Data storage
- **AWS Redshift**: Cloud-based data warehouse
- **Python**: ETL scripting
- **SQL**: Data transformation and querying

## Key Considerations
- Bulk data loading using `COPY` for efficiency.
- Role-based access control for security.
- Optimized schema design for analytical performance.

## Goal
Enable Sparkify's analytics team to gain insights on user behavior and song preferences efficiently using a scalable cloud-based data warehouse.


# Project Summary

## Project Datasets

This project utilizes two datasets stored in Amazon S3:
- **Song Data**: `s3://udacity-dend/song_data`
- **Log Data**: `s3://udacity-dend/log_data`
- **Metadata** (for parsing log data): `s3://udacity-dend/log_json_path.json`

The datasets are structured in JSON format:
- **Song Dataset**: Extracted from the Million Song Dataset, containing song metadata.
- **Log Dataset**: Simulated event logs from a music streaming app.

## Schema for Song Play Analysis

A star schema is used for optimizing queries:

### **Fact Table**
- **songplays** - records of song plays
  - `songplay_id`, `start_time`, `user_id`, `level`, `song_id`, `artist_id`, `session_id`, `location`, `user_agent`

### **Dimension Tables**
- **users** - user details
  - `user_id`, `first_name`, `last_name`, `gender`, `level`
- **songs** - song details
  - `song_id`, `title`, `artist_id`, `year`, `duration`
- **artists** - artist details
  - `artist_id`, `name`, `location`, `latitude`, `longitude`
- **time** - timestamp details
  - `start_time`, `hour`, `day`, `week`, `month`, `year`, `weekday`

## Project Steps

### **1. Create Table Schemas**
- Define table schemas in `sql_queries.py`
- Implement table creation logic in `create_table.py`
- Configure Redshift cluster and IAM roles

### **2. Build ETL Pipeline**
- Extract data from S3 and load into Redshift staging tables (`etl.py`)
- Transform and load data into analytics tables
- Validate data with test queries

### **3. Document Process**
- Explain database design and ETL process in `README.md`
- Provide example queries for analytical insights

## Project Files
- `create_table.py` - Defines and creates database tables
- `etl.py` - Extracts, transforms, and loads data into Redshift
- `sql_queries.py` - SQL statements for table creation and data loading
- `README.md` - Documentation of project details

## Notes
- Redshift does not support `SERIAL`; use `IDENTITY(0,1)` instead.
- Do not include AWS credentials in the code.

## Next Steps
- Run ETL pipeline and validate results
- Optimize query performance
- Expand dataset integration




# Step 0. Creating Redshift Cluster

Tihs part we can look at code from homework "L3 exercise 2 - IaC"

# Step 1. Create Table Schemas

In [1]:
import configparser

# CONFIG
config = configparser.ConfigParser()
config.read('dwh-3.cfg')


['dwh-3.cfg']

In [2]:
# DROP TABLES

staging_events_table_drop = "DROP TABle IF EXISTS staging_events;"
staging_songs_table_drop = "DROP TABLE IF EXISTS staging_songs;"
songplay_table_drop = "DROP TABLE IF EXISTS songplays;"
user_table_drop = "DROP TABLE IF EXISTS users;"
song_table_drop = "DROP TABLE IF EXISTS songs;"
artist_table_drop = "DROP TABLE IF EXISTS artists;"
time_table_drop = "DROP TABLE  IF EXISTS time;"

# CREATE TABLES

staging_events_table_create= ("""
CREATE TABLE IF NOT EXISTS staging_events
(
    artist VARCHAR,
    auth VARCHAR,
    firstName VARCHAR(50),
    gender CHAR,
    itemInSession INTEGER,
    lastName VARCHAR(50),
    length FLOAT,
    level VARCHAR,
    location VARCHAR,
    method VARCHAR,
    page VARCHAR,
    registration FLOAT,
    sessionId INTEGER,
    song VARCHAR,
    status INTEGER,
    ts BIGINT,
    userAgent VARCHAR,
    userId INTEGER
);
""")

staging_songs_table_create = ("""
CREATE TABLE IF NOT EXISTS staging_songs
(
    num_songs INTEGER,
    artist_id VARCHAR,
    artist_latitude FLOAT,
    artist_longitude FLOAT,
    artist_location VARCHAR,
    artist_name VARCHAR,
    song_id VARCHAR,
    title VARCHAR,
    duration FLOAT,
    year FLOAT
);
""")

songplay_table_create = ("""
CREATE TABLE IF NOT EXISTS songplays
(
    songplay_id INTEGER IDENTITY (1, 1) PRIMARY KEY ,
    start_time TIMESTAMP,
    user_id INTEGER,
    level VARCHAR,
    song_id VARCHAR,
    artist_id VARCHAR,
    session_id INTEGER,
    location VARCHAR,
    user_agent VARCHAR
)
DISTSTYLE KEY
DISTKEY ( start_time )
SORTKEY ( start_time );
""")

user_table_create = ("""
CREATE TABLE IF NOT EXISTS users
(
    userId INTEGER PRIMARY KEY,
    firsname VARCHAR(50),
    lastname VARCHAR(50),
    gender CHAR(1) ENCODE BYTEDICT,
    level VARCHAR ENCODE BYTEDICT
)
SORTKEY (userId);
""")

song_table_create = ("""
CREATE TABLE IF NOT EXISTS songs
(
    song_id VARCHAR PRIMARY KEY,
    title VARCHAR,
    artist_id VARCHAR,
    year INTEGER ENCODE BYTEDICT,
    duration FLOAT
)
SORTKEY (song_id);
""")

artist_table_create = ("""
CREATE TABLE IF NOT EXISTS artists
(
    artist_id VARCHAR PRIMARY KEY ,
    name VARCHAR,
    location VARCHAR,
    latitude FLOAT,
    longitude FLOAT
)
SORTKEY (artist_id);
""")

time_table_create = ("""
CREATE TABLE IF NOT EXISTS time
(
    start_time  TIMESTAMP PRIMARY KEY ,
    hour INTEGER,
    day INTEGER,
    week INTEGER,
    month INTEGER,
    year INTEGER ENCODE BYTEDICT ,
    weekday VARCHAR(9) ENCODE BYTEDICT
)
DISTSTYLE KEY
DISTKEY ( start_time )
SORTKEY (start_time);
""")

# STAGING TABLES

staging_events_copy = ("""
COPY staging_events
FROM {}
iam_role {}
FORMAT AS json {};
""").format(config['S3']['LOG_DATA'], config['IAM_ROLE']['ARN'], config['S3']['LOG_JSONPATH'])

staging_songs_copy = ("""
COPY staging_songs
FROM {}
iam_role {}
FORMAT AS json 'auto';
""").format(config['S3']['SONG_DATA'], config['IAM_ROLE']['ARN'])

# FINAL TABLES

songplay_table_insert = ("""
INSERT INTO songplays (START_TIME, USER_ID, LEVEL, SONG_ID, ARTIST_ID, SESSION_ID, LOCATION, USER_AGENT)
SELECT DISTINCT
       TIMESTAMP 'epoch' + (se.ts / 1000) * INTERVAL '1 second' as start_time,
                se.userId,
                se.level,
                ss.song_id,
                ss.artist_id,
                se.sessionId,
                se.location,
                se.userAgent
FROM staging_songs ss
INNER JOIN staging_events se
ON (ss.title = se.song AND se.artist = ss.artist_name)
AND se.page = 'NextSong';
""")

user_table_insert = ("""
INSERT INTO users
SELECT DISTINCT userId, firstName, lastName, gender, level
FROM staging_events
WHERE userId IS NOT NULL
AND page = 'NextSong';
""")

song_table_insert = ("""
INSERT INTO songs
SELECT
    DISTINCT song_id, title, artist_id, year, duration
FROM staging_songs
WHERE song_id IS NOT NULL;
""")

artist_table_insert = ("""
INSERT INTO artists
SELECT
    DISTINCT artist_id, artist_name, artist_location, artist_latitude, artist_longitude
FROM staging_songs;
""")

time_table_insert = ("""
insert into time
SELECT DISTINCT
       TIMESTAMP 'epoch' + (ts/1000) * INTERVAL '1 second' as start_time,
       EXTRACT(HOUR FROM start_time) AS hour,
       EXTRACT(DAY FROM start_time) AS day,
       EXTRACT(WEEKS FROM start_time) AS week,
       EXTRACT(MONTH FROM start_time) AS month,
       EXTRACT(YEAR FROM start_time) AS year,
       to_char(start_time, 'Day') AS weekday
FROM staging_events;
""")

# QUERY LISTS

create_table_queries = [staging_events_table_create, staging_songs_table_create, songplay_table_create, user_table_create, song_table_create, artist_table_create, time_table_create]
drop_table_queries = [staging_events_table_drop, staging_songs_table_drop, songplay_table_drop, user_table_drop, song_table_drop, artist_table_drop, time_table_drop]
copy_table_queries = [staging_events_copy, staging_songs_copy]
insert_table_queries = [songplay_table_insert, user_table_insert, song_table_insert, artist_table_insert, time_table_insert]

In [3]:
import psycopg2
from psycopg2 import Error

def drop_tables(cur, conn):
    try:
        for query in drop_table_queries:
            cur.execute(query)
            conn.commit()
        print("Tables dropped successfully")
    except Error as e:
        print(f"Error dropping tables: {e}")
        conn.rollback()

def create_tables(cur, conn):
    try:
        for query in create_table_queries:
            cur.execute(query)
            conn.commit()
        print("Tables created successfully")
    except Error as e:
        print(f"Error creating tables: {e}")
        conn.rollback()

conn = None
cur = None
try:
    # Connect to Redshift cluster
    conn = psycopg2.connect(
        host=config['CLUSTER']['HOST'],
        dbname=config['CLUSTER']['DB_NAME'],
        user=config['CLUSTER']['DB_USER'],
        password=config['CLUSTER']['DB_PASSWORD'],
        port=int(config['CLUSTER']['DB_PORT'])  # Convert port to integer
    )
    cur = conn.cursor()
    
    print("Connected to Redshift successfully")
    
    # Drop and create tables
    drop_tables(cur, conn)
    create_tables(cur, conn)

except Error as e:
    print(f"Error connecting to Redshift: {e}")

finally:
    if cur is not None:
        cur.close()
    if conn is not None:
        conn.close()
        print("Database connection closed")


Connected to Redshift successfully
Tables dropped successfully
Tables created successfully
Database connection closed


## Sanity check
Show the tables in the database

In [4]:
import pandas as pd

try:
    # Connect to Redshift cluster
    conn = psycopg2.connect(
        host=config['CLUSTER']['HOST'],
        dbname=config['CLUSTER']['DB_NAME'],
        user=config['CLUSTER']['DB_USER'],
        password=config['CLUSTER']['DB_PASSWORD'],
        port=int(config['CLUSTER']['DB_PORT'])
    )
    cur = conn.cursor()
    
    # Query to show table schemas
    cur.execute("""
        SELECT 
            table_schema,
            table_name,
            column_name,
            data_type,
            character_maximum_length,
            numeric_precision
        FROM information_schema.columns
        WHERE table_schema = 'public'
        ORDER BY table_name, ordinal_position;
    """)
    
    schemas = cur.fetchall()
    schemas_df = pd.DataFrame(schemas, columns=['Schema', 'Table', 'Column', 'Data Type', 'Max Length', 'Precision'])
    print("Table schemas in the database:")
    print(schemas_df)

except Exception as e:
    print(f"Error: {e}")
    
finally:
    if cur is not None:
        cur.close()
    if conn is not None:
        conn.close()


Table schemas in the database:
    Schema           Table            Column                    Data Type  \
0   public         artists         artist_id            character varying   
1   public         artists              name            character varying   
2   public         artists          location            character varying   
3   public         artists          latitude             double precision   
4   public         artists         longitude             double precision   
5   public       songplays       songplay_id                      integer   
6   public       songplays        start_time  timestamp without time zone   
7   public       songplays           user_id                      integer   
8   public       songplays             level            character varying   
9   public       songplays           song_id            character varying   
10  public       songplays         artist_id            character varying   
11  public       songplays        session_id 

In [5]:
import boto3

# Create a Redshift client
redshift = boto3.client('redshift',
                       region_name='us-west-2',
                       aws_access_key_id=config['AWS']['KEY'],
                       aws_secret_access_key=config['AWS']['SECRET']
                      )

DWH_CLUSTER_IDENTIFIER = config['DWH']['DWH_CLUSTER_IDENTIFIER']

def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster2
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster2.czgl7wspitsl.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-08df3248a3b917e72
7,NumberOfNodes,8


In [6]:
# Step 1 (Create Table Schemas) is complete:
# - All table schemas are defined
# - Tables have been created in Redshift
# - Schema verification was successful
# Moving on to Step 2: ETL Pipeline

# Step 2 (ETL Pipeline)

Example Output From An ETL Run

```
roleArn is arn:aws:iam::982052677744:role/dwhRole
cluster created
1 tries to get Endpoint
2 tries to get Endpoint
Endpoint is dwhcluster.colkbiahppv4.us-west-2.redshift.amazonaws.com
postgresql://dwhAdmin:[REDACTED]@dwhcluster.colkbiahppv4.us-west-2.redshift.amazonaws.com:5439/dev

    SELECT COUNT(*) FROM staging_events
[8056]

    SELECT COUNT(*) FROM staging_songs
[14896]

    SELECT COUNT(*) FROM songplays
[6820]

    SELECT COUNT(*) FROM users
[104]

    SELECT COUNT(*) FROM songs
[14896]

    SELECT COUNT(*) FROM artists
[10025]

    SELECT COUNT(*) FROM time
[6813]
```


Note from the instruction
- The SERIAL command in Postgres is not supported in Redshift. The equivalent in redshift is IDENTITY(0,1), which you can read more on in the Redshift Create Table Docs(opens in a new tab).


In [7]:
import configparser
import psycopg2

In [8]:
# STAGING TABLES

## recall from dwh-3.cfg
## first question: these are non-partitioned tables or partitioned tables?
## these are non-partitioned tables or partitioned tables? Song data and Log data are partitioned tables, as instruction said
""" 
LOG_DATA='s3://udacity-dend/log-data'
LOG_JSONPATH='s3://udacity-dend/log_json_path.json'
SONG_DATA='s3://udacity-dend/song-data'
"""

## can we write to check whehther these are non-partitioned tables or partitioned tables?

import boto3

# Create S3 client
s3 = boto3.client('s3',  # Changed from 'aws-s3' to 's3'
                  aws_access_key_id=config['AWS']['KEY'],
                  aws_secret_access_key=config['AWS']['SECRET'],
                  region_name=config['AWS']['REGION'])

# List objects in log-data bucket
log_response = s3.list_objects_v2(Bucket='udacity-dend', Prefix='log-data/')
song_response = s3.list_objects_v2(Bucket='udacity-dend', Prefix='song-data/')

print("Log data structure:")
for obj in log_response.get('Contents', [])[:5]:
    print(obj['Key'])

print("\nSong data structure:") 
for obj in song_response.get('Contents', [])[:5]:
    print(obj['Key'])

# If we see prefixes like year/month/day or artist/year etc,
# it indicates partitioned data
# If files are flat in the bucket, likely non-partitioned

## The log files in the dataset you'll be working with are partitioned by year and month
## The song files are partitioned by the first three letters of each song's track ID.


Log data structure:
log-data/
log-data/2018/11/2018-11-01-events.json
log-data/2018/11/2018-11-02-events.json
log-data/2018/11/2018-11-03-events.json
log-data/2018/11/2018-11-04-events.json

Song data structure:
song-data/
song-data/A/A/A/TRAAAAK128F9318786.json
song-data/A/A/A/TRAAAAV128F421A322.json
song-data/A/A/A/TRAAABD128F429CF47.json
song-data/A/A/A/TRAAACN128F9355673.json


In [9]:
# Log JSON Metadata
"""
The log_json_path.json file is used when loading JSON data into Redshift. It specifies the structure of the JSON data so that Redshift can properly parse and load it into the staging tables.

In the context of this project, you will need the log_json_path.json file in the COPY command, which is responsible for loading the log data from S3 into the staging tables in Redshift. The log_json_path.json file tells Redshift how to interpret the JSON data and extract the relevant fields. This is essential for further processing and transforming the data into the desired analytics tables.

Below is what data is in log_json_path.json.

"""
import json
# Get the content of the log_json_path.json file
response = s3.get_object(Bucket='udacity-dend', Key='log_json_path.json')
jsonpath_content = json.loads(response['Body'].read().decode('utf-8'))

print("JSONPath file content:")
print(json.dumps(jsonpath_content, indent=2))

JSONPath file content:
{
  "jsonpaths": [
    "$['artist']",
    "$['auth']",
    "$['firstName']",
    "$['gender']",
    "$['itemInSession']",
    "$['lastName']",
    "$['length']",
    "$['level']",
    "$['location']",
    "$['method']",
    "$['page']",
    "$['registration']",
    "$['sessionId']",
    "$['song']",
    "$['status']",
    "$['ts']",
    "$['userAgent']",
    "$['userId']"
  ]
}


In [10]:
# Get sample data from both datasets
## Song and Log datasets

import boto3
import json

s3 = boto3.client('s3',
                  aws_access_key_id=config['AWS']['KEY'],
                  aws_secret_access_key=config['AWS']['SECRET'],
                  region_name=config['AWS']['REGION'])

# 获取一个示例歌曲文件
try:
    song_response = s3.get_object(Bucket='udacity-dend', 
                                 Key='song-data/A/A/A/TRAAAAK128F9318786.json')
    song_content = json.loads(song_response['Body'].read().decode('utf-8'))
    print("Example song file content:")
    print(json.dumps(song_content, indent=2))
    print("\n" + "="*50 + "\n")
except Exception as e:
    print(f"Error getting song file: {e}")

# 获取一个示例日志文件
try:
    log_response = s3.get_object(Bucket='udacity-dend', 
                                Key='log-data/2018/11/2018-11-04-events.json')
    # 读取原始内容并按行分割
    log_content = log_response['Body'].read().decode('utf-8')
    # 每行都是一个独立的 JSON 对象
    log_records = [json.loads(line) for line in log_content.strip().split('\n')]
    
    print("Example log file content (first record):")
    print(json.dumps(log_records[0], indent=2))  # 只显示第一条记录
except Exception as e:
    print(f"Error getting log file: {e}")

Example song file content:
{
  "song_id": "SOBLFFE12AF72AA5BA",
  "num_songs": 1,
  "title": "Scream",
  "artist_name": "Adelitas Way",
  "artist_latitude": null,
  "year": 2009,
  "duration": 213.9424,
  "artist_id": "ARJNIUY12298900C91",
  "artist_longitude": null,
  "artist_location": ""
}


Example log file content (first record):
{
  "artist": null,
  "auth": "Logged In",
  "firstName": "Theodore",
  "gender": "M",
  "itemInSession": 0,
  "lastName": "Smith",
  "length": null,
  "level": "free",
  "location": "Houston-The Woodlands-Sugar Land, TX",
  "method": "GET",
  "page": "Home",
  "registration": 1540306145796.0,
  "sessionId": 154,
  "song": null,
  "status": 200,
  "ts": 1541290555796,
  "userAgent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0",
  "userId": "52"
}


## Step 2.1: Write ETL queries for staging tables

## Step 2.2: for Final tables


In [11]:
def load_staging_tables(cur, conn):
    try:
        for query in copy_table_queries:
            print(f"Executing query: {query[:100]}...")  # Print first 100 chars for debugging
            cur.execute(query)
            conn.commit()
        
        # Validate staging data
        cur.execute("SELECT COUNT(*) FROM staging_events")
        events_count = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM staging_songs")
        songs_count = cur.fetchone()[0]
        print(f"Staging tables loaded successfully:")
        print(f"staging_events: {events_count} rows")
        print(f"staging_songs: {songs_count} rows")
        
    except Exception as e:
        print(f"Error loading staging tables: {e}")
        conn.rollback()

def insert_tables(cur, conn):
    try:
        for query in insert_table_queries:
            print(f"Executing query: {query[:100]}...")  # Print first 100 chars for debugging
            cur.execute(query)
            conn.commit()
        
        # Validate analytics tables
        validation_queries = [
            ("songplays", "SELECT COUNT(*) FROM songplays"),
            ("users", "SELECT COUNT(*) FROM users"),
            ("songs", "SELECT COUNT(*) FROM songs"),
            ("artists", "SELECT COUNT(*) FROM artists"),
            ("time", "SELECT COUNT(*) FROM time")
        ]
        
        print("Analytics tables loaded successfully:")
        for table, query in validation_queries:
            cur.execute(query)
            count = cur.fetchone()[0]
            print(f"{table}: {count} rows")
            
    except Exception as e:
        print(f"Error inserting into tables: {e}")
        conn.rollback()

# Main ETL execution
conn = None
cur = None
try:
    # Connect to Redshift cluster
    conn = psycopg2.connect(
        host=config['CLUSTER']['HOST'],
        dbname=config['CLUSTER']['DB_NAME'],
        user=config['CLUSTER']['DB_USER'],
        password=config['CLUSTER']['DB_PASSWORD'],
        port=int(config['CLUSTER']['DB_PORT'])
    )
    cur = conn.cursor()
    
    print("Connected to Redshift successfully")
    
    # Execute ETL process
    load_staging_tables(cur, conn)
    insert_tables(cur, conn)

except Exception as e:
    print(f"Error during ETL process: {e}")

finally:
    if cur is not None:
        cur.close()
    if conn is not None:
        conn.close()
        print("Database connection closed")

Connected to Redshift successfully
Executing query: 
COPY staging_events
FROM 's3://udacity-dend/log-data'
iam_role 'arn:aws:iam::339713039693:role/dwhR...
Executing query: 
COPY staging_songs
FROM 's3://udacity-dend/song-data'
iam_role 'arn:aws:iam::339713039693:role/dwhR...
Error loading staging tables: Load into table 'staging_songs' failed.  Check 'stl_load_errors' system table for details.

Executing query: 
INSERT INTO songplays (START_TIME, USER_ID, LEVEL, SONG_ID, ARTIST_ID, SESSION_ID, LOCATION, USER_A...
Executing query: 
INSERT INTO users
SELECT DISTINCT userId, firstName, lastName, gender, level
FROM staging_events
WH...
Executing query: 
INSERT INTO songs
SELECT
    DISTINCT song_id, title, artist_id, year, duration
FROM staging_songs
...
Executing query: 
INSERT INTO artists
SELECT
    DISTINCT artist_id, artist_name, artist_location, artist_latitude, a...
Executing query: 
insert into time
SELECT DISTINCT
       TIMESTAMP 'epoch' + (ts/1000) * INTERVAL '1 second' as star

In [12]:
# Check row counts for all tables
table_queries = [
    "SELECT COUNT(*) FROM staging_events",
    "SELECT COUNT(*) FROM staging_songs",
    "SELECT COUNT(*) FROM songplays",
    "SELECT COUNT(*) FROM users",
    "SELECT COUNT(*) FROM songs",
    "SELECT COUNT(*) FROM artists",
    "SELECT COUNT(*) FROM time"
]

try:
    # Connect to Redshift cluster
    conn = psycopg2.connect(
        host=config['CLUSTER']['HOST'],
        dbname=config['CLUSTER']['DB_NAME'],
        user=config['CLUSTER']['DB_USER'],
        password=config['CLUSTER']['DB_PASSWORD'],
        port=int(config['CLUSTER']['DB_PORT'])
    )
    cur = conn.cursor()
    
    # Execute each count query and print results
    for query in table_queries:
        print(f"\n    {query}")
        cur.execute(query)
        result = cur.fetchone()
        print(f"[{result[0]}]")

except Exception as e:
    print(f"Error checking row counts: {e}")

finally:
    if cur is not None:
        cur.close()
    if conn is not None:
        conn.close()


    SELECT COUNT(*) FROM staging_events
[8056]

    SELECT COUNT(*) FROM staging_songs
[0]

    SELECT COUNT(*) FROM songplays
[0]

    SELECT COUNT(*) FROM users
[104]

    SELECT COUNT(*) FROM songs
[0]

    SELECT COUNT(*) FROM artists
[0]

    SELECT COUNT(*) FROM time
[8023]


In [13]:
# simplified the code by removing the non-null checks and focusing only on primary key validation since that's the most critical aspect

def check_table_counts():
    # Dictionary of tables and their corresponding queries
    table_counts = {
        'Staging Events': "SELECT COUNT(*) FROM staging_events",
        'Staging Songs': "SELECT COUNT(*) FROM staging_songs",
        'Songplays (Fact)': "SELECT COUNT(*) FROM songplays",
        'Users (Dim)': "SELECT COUNT(*) FROM users",
        'Songs (Dim)': "SELECT COUNT(*) FROM songs",
        'Artists (Dim)': "SELECT COUNT(*) FROM artists",
        'Time (Dim)': "SELECT COUNT(*) FROM time"
    }

    try:
        # Connect to Redshift cluster
        conn = psycopg2.connect(
            host=config['CLUSTER']['HOST'],
            dbname=config['CLUSTER']['DB_NAME'],
            user=config['CLUSTER']['DB_USER'],
            password=config['CLUSTER']['DB_PASSWORD'],
            port=int(config['CLUSTER']['DB_PORT'])
        )
        cur = conn.cursor()
        
        print("\nTable Row Counts:")
        print("-" * 50)
        
        # Execute each count query and print results
        for table_name, query in table_counts.items():
            cur.execute(query)
            count = cur.fetchone()[0]
            print(f"{table_name:<20} | {count:>10,} rows")
        
        print("-" * 50)

        # Additional validation queries
        print("\nData Quality Checks:")
        print("-" * 50)
        
        # Check for null values in primary keys
        null_checks = {
            'songplays': 'songplay_id',
            'users': 'user_id',
            'songs': 'song_id',
            'artists': 'artist_id',
            'time': 'start_time'
        }
        
        for table, pk in null_checks.items():
            cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {pk} IS NULL")
            null_count = cur.fetchone()[0]
            print(f"Null {pk} in {table:<10} | {null_count:>10} rows")

    except Exception as e:
        print(f"Error during validation: {e}")

    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

# Run the validation
check_table_counts()


Table Row Counts:
--------------------------------------------------
Staging Events       |      8,056 rows
Staging Songs        |          0 rows
Songplays (Fact)     |          0 rows
Users (Dim)          |        104 rows
Songs (Dim)          |          0 rows
Artists (Dim)        |          0 rows
Time (Dim)           |      8,023 rows
--------------------------------------------------

Data Quality Checks:
--------------------------------------------------
Null songplay_id in songplays  |          0 rows
Error during validation: column "user_id" does not exist in users



# not match

Instruction expected
```
    SELECT COUNT(*) FROM staging_events
[8056]

    SELECT COUNT(*) FROM staging_songs
[14896]

    SELECT COUNT(*) FROM songplays
[6820]

    SELECT COUNT(*) FROM users
[104]

    SELECT COUNT(*) FROM songs
[14896]

    SELECT COUNT(*) FROM artists
[10025]

    SELECT COUNT(*) FROM time
[6813]
```

but we got

```
--------------------------------------------------
Staging Events       |      8,056 rows
Staging Songs        |    385,212 rows
Songplays (Fact)     |      7,267 rows
Users (Dim)          |        105 rows
Songs (Dim)          |    384,955 rows
Artists (Dim)        |     45,262 rows
Time (Dim)           |      8,023 rows
--------------------------------------------------
```

In [14]:
def investigate_data_discrepancies():
    diagnostic_queries = {
        "Staging Songs Data Range": """
            SELECT 
                MIN(song_id) as first_song,
                MAX(song_id) as last_song,
                COUNT(DISTINCT song_id) as unique_songs,
                COUNT(*) as total_rows
            FROM staging_songs;
        """,
        
        "Duplicate Songs Check": """
            SELECT 
                song_id, 
                COUNT(*) as occurrence_count
            FROM staging_songs
            GROUP BY song_id
            HAVING COUNT(*) > 1
            ORDER BY occurrence_count DESC
            LIMIT 5;
        """,
        
        "Songplays Join Analysis": """
            WITH matching_songs AS (
                SELECT se.song, se.artist, ss.song_id, ss.artist_id
                FROM staging_events se
                JOIN staging_songs ss
                ON se.song = ss.title AND se.artist = ss.artist_name
                WHERE se.page = 'NextSong'
            )
            SELECT 
                COUNT(*) as total_matches,
                COUNT(DISTINCT song_id) as unique_songs_matched,
                COUNT(DISTINCT artist_id) as unique_artists_matched
            FROM matching_songs;
        """,
        
        "Artist Duplicates": """
            SELECT 
                artist_id,
                artist_name,
                COUNT(*) as occurrence_count
            FROM staging_songs
            GROUP BY artist_id, artist_name
            HAVING COUNT(*) > 1
            ORDER BY occurrence_count DESC
            LIMIT 5;
        """,
        
        "Time Table Verification": """
            SELECT 
                COUNT(DISTINCT TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second') as unique_timestamps,
                COUNT(*) as total_timestamps
            FROM staging_events
            WHERE page = 'NextSong';
        """
    }

    try:
        # Connect to Redshift
        conn = psycopg2.connect(
            host=config['CLUSTER']['HOST'],
            dbname=config['CLUSTER']['DB_NAME'],
            user=config['CLUSTER']['DB_USER'],
            password=config['CLUSTER']['DB_PASSWORD'],
            port=int(config['CLUSTER']['DB_PORT'])
        )
        cur = conn.cursor()

        print("\nData Discrepancy Investigation")
        print("=" * 50)

        for title, query in diagnostic_queries.items():
            print(f"\n{title}")
            print("-" * len(title))
            
            cur.execute(query)
            results = cur.fetchall()
            
            # Get column names
            columns = [desc[0] for desc in cur.description]
            
            # Create DataFrame for prettier display
            df = pd.DataFrame(results, columns=columns)
            print(df.to_string(index=False))
            print("\n")

    except Exception as e:
        print(f"Error running diagnostic queries: {e}")

    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

# Run the diagnostic queries
investigate_data_discrepancies()


Data Discrepancy Investigation

Staging Songs Data Range
------------------------
first_song last_song  unique_songs  total_rows
      None      None             0           0



Duplicate Songs Check
---------------------
Empty DataFrame
Columns: [song_id, occurrence_count]
Index: []



Songplays Join Analysis
-----------------------
 total_matches  unique_songs_matched  unique_artists_matched
             0                     0                       0



Artist Duplicates
-----------------
Empty DataFrame
Columns: [artist_id, artist_name, occurrence_count]
Index: []



Time Table Verification
-----------------------
 unique_timestamps  total_timestamps
              6813              6820




[Optional] Provide example queries and results for song play analysis. We do not provide you any of these. You, as part of the Data Engineering team were tasked to build this ETL. Thorough study has gone into the star schema, tables, and columns required. The ETL will be effective and provide the data and in the format required. However, as an exercise, it seems almost silly to NOT show SOME examples of potential queries that could be ran by the users. PLEASE use your imagination here. For example, what is the most played song? When is the highest usage time of day by hour for songs? It would not take much to imagine what types of questions that corporate users of the system would find interesting. Including those queries and the answers makes your project far more compelling when using it as an example of your work to people / companies that would be interested. You could simply have a section of sql_queries.py that is executed after the load is done that prints a question and then the answer.

In [15]:
def run_analytical_queries():
    # Dictionary of analytical queries with their descriptions
    analytical_queries = {
        "Most Popular Songs": """
            SELECT s.title, a.name as artist, COUNT(*) as play_count
            FROM songplays sp
            JOIN songs s ON sp.song_id = s.song_id
            JOIN artists a ON sp.artist_id = a.artist_id
            GROUP BY s.title, a.name
            ORDER BY play_count DESC
            LIMIT 5;
        """,
        
        "Peak Usage Hours": """
            SELECT t.hour, COUNT(*) as plays
            FROM songplays sp
            JOIN time t ON sp.start_time = t.start_time
            GROUP BY t.hour
            ORDER BY plays DESC
            LIMIT 24;
        """,
        
        "User Activity by Level": """
            SELECT level, COUNT(DISTINCT user_id) as user_count,
                   COUNT(*) as total_plays,
                   ROUND(COUNT(*) / COUNT(DISTINCT user_id)::float, 2) as avg_plays_per_user
            FROM songplays
            GROUP BY level;
        """,
        
        "Most Active Users": """
            SELECT u.first_name, u.last_name, u.level,
                   COUNT(*) as play_count
            FROM songplays sp
            JOIN users u ON sp.user_id = u.user_id
            GROUP BY u.user_id, u.first_name, u.last_name, u.level
            ORDER BY play_count DESC
            LIMIT 10;
        """,
        
        "Popular Music by Day of Week": """
            SELECT t.weekday,
                   COUNT(*) as total_plays,
                   COUNT(DISTINCT sp.user_id) as unique_users
            FROM songplays sp
            JOIN time t ON sp.start_time = t.start_time
            GROUP BY t.weekday
            ORDER BY t.weekday;
        """
    }

    try:
        # Connect to Redshift
        conn = psycopg2.connect(
            host=config['CLUSTER']['HOST'],
            dbname=config['CLUSTER']['DB_NAME'],
            user=config['CLUSTER']['DB_USER'],
            password=config['CLUSTER']['DB_PASSWORD'],
            port=int(config['CLUSTER']['DB_PORT'])
        )
        cur = conn.cursor()

        print("\nSparkify Music Streaming Analytics")
        print("=" * 50)

        # Execute each analytical query and display results
        for title, query in analytical_queries.items():
            print(f"\n{title}")
            print("-" * len(title))
            
            cur.execute(query)
            results = cur.fetchall()
            
            # Get column names from cursor description
            columns = [desc[0] for desc in cur.description]
            
            # Create DataFrame for prettier display
            df = pd.DataFrame(results, columns=columns)
            print(df.to_string(index=False))
            print("\n")

    except Exception as e:
        print(f"Error running analytical queries: {e}")

    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

# Run the analytical queries
run_analytical_queries()


Sparkify Music Streaming Analytics

Most Popular Songs
------------------
Empty DataFrame
Columns: [title, artist, play_count]
Index: []



Peak Usage Hours
----------------
Empty DataFrame
Columns: [hour, plays]
Index: []



User Activity by Level
----------------------
Empty DataFrame
Columns: [level, user_count, total_plays, avg_plays_per_user]
Index: []



Most Active Users
-----------------
Error running analytical queries: column u.user_id does not exist



# Last Step: Project Rubric requirements

## Readme.md

The project shows proper use of documentation.

The README file includes a summary of the project, how to run the Python scripts, and an explanation of the files in the repository. Comments are used effectively and each function has a docstring.


## Clean modular Code

Scripts have an intuitive, easy-to-follow structure with code separated into logical functions. Naming for variables and functions follows the PEP8 style guidelines.

# Other Good practice

## Create and Delete Cluster

## IAM roles

## Access Key: Create and Delete
