In [1]:
import pandas as pd
import boto3
import json
import os
import psycopg2

In [None]:
DWH_DB= 'dwh'
DWH_DB_USER= 'dwhuser'
DWH_DB_PASSWORD= 'Passw0rd'
DWH_PORT = 5439
DWH_ENDPOINT = 'dwhcluster.ctfn38mfdsex.us-east-1.redshift.amazonaws.com'
DWH_IAM_ROLE_NAME = 'dwhRole'
DWH_CLUSTER_TYPE = 'multi-node'
DWH_NODE_TYPE = 'dc2.large'
DWH_NUM_NODES = '4'
DWH_CLUSTER_IDENTIFIER = 'dwhCluster'
logdata = 's3://udacity-dend/log_data'
jsonpath = 's3://udacity-dend/log_json_path.json'


key = ''
secret = ''

ec2 = boto3.resource('ec2',
                    region_name='us-east-1',
                    aws_access_key_id=key,
                    aws_secret_access_key=secret)

s3 = boto3.resource('s3',
                    region_name='us-east-1',
                    aws_access_key_id=key,
                    aws_secret_access_key=secret)

iam = boto3.client('iam',
                    region_name='us-east-1',
                    aws_access_key_id=key,
                    aws_secret_access_key=secret)

redshift = boto3.client('redshift',
                    region_name='us-east-1',
                    aws_access_key_id=key,
                    aws_secret_access_key=secret)

In [52]:
sampleDbBucket = s3.Bucket('udacity-dend')

for obj in sampleDbBucket.objects.filter(Prefix='log_data'):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-09-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-10-events.json')
s3.ObjectSummary(b

In [8]:


try:
    print('1 creating a new iam role')
    dwhrole = iam.create_role(
                Path='/',
                RoleName=DWH_IAM_ROLE_NAME,
                Description = 'allows redshift clusters to call aws services on your behalf',
                AssumeRolePolicyDocument=json.dumps(
                    {'Statement': [{'Action': 'sts:AssumeRole', 'Effect': 'Allow', 'Principal': {'Service': 'redshift.amazonaws.com'}}],
                    'Version': '2012-10-17'}))
    
except Exception as e:
    print(e)

print('2 attaching policy')

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                      PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess')['ResponseMetadata']['HTTPStatusCode']

print('3 get the iam role arn')

roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1 creating a new iam role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
2 attaching policy
3 get the iam role arn
arn:aws:iam::464175583074:role/dwhRole


In [9]:
try:
    response = redshift.create_cluster(
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),
    
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
    
        IamRoles=[roleArn])

except Exception as e:
    print(e)



In [17]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.ctfn38mfdsex.us-east-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0c9d4232b8cf237d3
7,NumberOfNodes,4


In [9]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.ctfn38mfdsex.us-east-1.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::464175583074:role/dwhRole


In [15]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName= defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-0f9046cab86e8ce42')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


In [10]:
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [12]:
#conn_string="postpresql://{}:{}@{}:{}/{}".format(DWH_DB_USER,DWH_DB_PASSWORD,DWH_ENDPOINT,DWH_PORT,DWH_DB)

conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(DWH_ENDPOINT,DWH_DB,DWH_DB_USER,DWH_DB_PASSWORD,DWH_PORT))

cur = conn.cursor()

#print(conn_string)

conn.commit()

conn.set_session(autocommit=True)


In [104]:
query = 'drop table if exists log_data'
try:
    rows = cur.execute(query)
except Exception as e:
    print(e)

In [105]:
query = 'create table log_data'
query = query + '(artist varchar(150),\
    auth              varchar(10) not null,\
    firstName         varchar(15),\
    gender            varchar(1),\
    itemInSession     int not null,\
    lastName          varchar(15),\
    length            double precision,\
    level             varchar(4) not null,\
    location          varchar(50),\
    method            varchar(3) not null,\
    page              varchar(20) not null,\
    registration      bigint,\
    sessionId         int not null,\
    song              varchar(175),\
    status            int not null,\
    ts                timestamp not null,\
    userAgent         varchar(150),\
    userId            int)'

try:
    cur.execute(query)
except Exception as e:
    print(e)

In [106]:
staging_events_copy = "copy log_data\
                        from 's3://udacity-dend/log_data'\
                        credentials 'aws_iam_role=arn:aws:iam::464175583074:role/dwhRole'\
                        json 's3://udacity-dend/log_json_path.json'\
                        region 'us-west-2'\
                        timeformat as 'epochmillisecs'"

cur.execute(staging_events_copy)

In [126]:
query = 'drop table if exists song_data'
try:
    rows = cur.execute(query)
except Exception as e:
    print(e)
    
query = 'create table song_data'
query = query + "(num_songs            integer not null,\
    artist_id            varchar(20) not null,\
    artist_latitiude     double precision,\
    artist_longitude     double precision,\
    artist_location      varchar(225),\
    artist_name          varchar(200) not null,\
    song_id              varchar(20) not null,\
    title                varchar(175) not null,\
    duration             double precision not null,\
    year                 integer not null)"

try:
    cur.execute(query)
except Exception as e:
    print(e)

In [127]:
staging_songs_copy = "copy song_data\
                        from 's3://udacity-dend/song_data'\
                        credentials 'aws_iam_role=arn:aws:iam::464175583074:role/dwhRole'\
                        json 'auto'\
                        region 'us-west-2'"

try:
    cur.execute(staging_songs_copy)
except Exception as e:
    print(e)


In [3]:
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.ctfn38mfdsex.us-east-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2024, 4, 26, 21, 12, 46, 591000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0f9046cab86e8ce42',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-0c9d4232b8cf237d3',
  'AvailabilityZone': 'us-east-1f',
  'PreferredMaintenanceWindow': 'tue:04:30-tue:05:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRout

In [79]:
error = "SELECT errors.tbl, info.table_id::integer, info.table_id, * FROM stl_load_errors errors INNER JOIN svv_table_info info ON errors.tbl = info.table_id LIMIT 2;"
error2 = "select * from stl_load_errors"
try:
    a= cur.execute(error2)
except Exception as e:
    print(e)
print(a)

None


In [None]:
insert_t = ("""insert into songplay (songplay_id, start_time, user_id, level, song_id, artist_id,                                                     session_id, location, user_agent)
    select e.registration,
           e.ts time(),
           e.userId,
           e.level,
           s.song_id,
           s.artist_id,
           e.sessionId,
           e.location,
           e.userAgent
    from events e
    join songs s on (s.artist_name=e.artist)
    where e.page='NextSong';
""")

try:
    a= cur.execute(error2)
except Exception as e:
    print(e)
print(a)