In [24]:

import pandas as pd
import boto3
from dotenv import load_dotenv
import os
from botocore.exceptions import ClientError
import json
import psycopg2
import time
%load_ext sql


def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])


load_dotenv()

KEY                    = os.getenv('AWS_ACCESS_KEY_ID')
SECRET                 = os.getenv('AWS_SECRET_ACCESS_KEY')

DWH_CLUSTER_TYPE       = os.getenv("DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = os.getenv("DWH_NUM_NODES")
DWH_NODE_TYPE          = os.getenv("DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = os.getenv("DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = os.getenv("DWH_DB")
DWH_DB_USER            = os.getenv("DWH_DB_USER")
DWH_DB_PASSWORD        = os.getenv("DWH_DB_PASSWORD")
DWH_PORT               = os.getenv("DWH_PORT")

DWH_IAM_ROLE_NAME      = os.getenv("DWH_IAM_ROLE_NAME")

# (DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)
print("\n\n")
print(">>> Starting Redshift Cluster!\n>>> This is the configuration of redshift cluster!")
print("\n")

print(pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             }))

print("\n\n")


ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(f"\n\n>>> {roleArn}\n\n")

# try:
#     response = redshift.create_cluster(        
#         #HW
#         ClusterType=DWH_CLUSTER_TYPE,
#         NodeType=DWH_NODE_TYPE,
#         NumberOfNodes=int(DWH_NUM_NODES),

#         #Identifiers & Credentials
#         DBName=DWH_DB,
#         ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
#         MasterUsername=DWH_DB_USER,
#         MasterUserPassword=DWH_DB_PASSWORD,
        
#         #Roles (for s3 access)
#         IamRoles=[roleArn]  )
# except Exception as e:
#     print(f"\n >>> {e} \n\n")
    
    
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

cluster_df = prettyRedshiftProps(myClusterProps)

check_status = cluster_df['Value'][2]
status = 'available'



while True:
    time.sleep(5)
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    cluster_df = prettyRedshiftProps(myClusterProps)
    check_status = cluster_df['Value'][2]
    if check_status == status:
        DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
        DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
        print("\n\n>>> Cluster successfully created!\n")
        print("\n>>> DWH_ENDPOINT :: \n", DWH_ENDPOINT)
        print("\n>>> DWH_ROLE_ARN :: \n", DWH_ROLE_ARN)
        break
    print('Cluster not up yet')



conn_string="\n\npostgresql://{}:{}@{}:{}/{}\n\n".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(f"\n\n{conn_string}\n\n")
%sql $conn_string

import boto3

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )


sampleDbBucket =  s3.Bucket("tien-duong1151")
for obj in sampleDbBucket.objects.filter(Prefix="data/"):
     print(f"\n\n{obj}!\n\n")
    

The sql extension is already loaded. To reload it, use:
  %reload_ext sql



>>> Starting Redshift Cluster!
>>> This is the configuration of redshift cluster!


                    Param       Value
0  DWH_CLUSTER_TYPE        multi-node
1  DWH_NUM_NODES           4         
2  DWH_NODE_TYPE           dc2.large 
3  DWH_CLUSTER_IDENTIFIER  dwhCluster
4  DWH_DB                  dwh       
5  DWH_DB_USER             dwhuser   
6  DWH_DB_PASSWORD         Passw0rd  
7  DWH_PORT                5439      
8  DWH_IAM_ROLE_NAME       dwhRole   



1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN


>>> arn:aws:iam::539761204517:role/dwhRole




  pd.set_option('display.max_colwidth', -1)




>>> Cluster successfully created!


>>> DWH_ENDPOINT :: 
 dwhcluster.cmjnfq1cdb24.us-west-2.redshift.amazonaws.com

>>> DWH_ROLE_ARN :: 
 arn:aws:iam::539761204517:role/dwhRole




postgresql://dwhuser:Passw0rd@dwhcluster.cmjnfq1cdb24.us-west-2.redshift.amazonaws.com:5439/dwh






s3.ObjectSummary(bucket_name='tien-duong1151', key='data/')!




s3.ObjectSummary(bucket_name='tien-duong1151', key='data/binance_btc.csv')!




s3.ObjectSummary(bucket_name='tien-duong1151', key='data/coin_exchange_info.csv')!




s3.ObjectSummary(bucket_name='tien-duong1151', key='data/coin_market_info.csv')!




s3.ObjectSummary(bucket_name='tien-duong1151', key='data/coins_data.csv')!




s3.ObjectSummary(bucket_name='tien-duong1151', key='data/demo_file.csv')!




In [28]:
demo_stage_table_drop = 'DROP TABLE IF EXISTS staging_demo'
demo_stage_table_create = ("""
CREATE TABLE IF NOT EXISTS staging_demo (
name VARCHAR
,base VARCHAR 
,quote VARCHAR
,price VARCHAR);
""")
my_query = [demo_stage_table_drop, demo_stage_table_create]

conn = psycopg2.connect(f"host={DWH_ENDPOINT} dbname={DWH_DB} user={DWH_DB_USER} password={DWH_DB_PASSWORD} port={DWH_PORT}")
cur = conn.cursor()
for i in my_query:
    cur.execute(i)
    conn.commit()
conn.close()

        

In [29]:
my_bucket = 's3://tien-duong1151/data/demo_file.csv'

staging_demo = f"""
COPY staging_demo 
FROM '{my_bucket}'
CREDENTIALS 'aws_iam_role={DWH_ROLE_ARN}'
COMPUPDATE OFF region 'us-west-2'
FORMAT AS csv 'auto'
"""



%sql $staging_demo

 * postgresql://dwhuser:***@dwhcluster.cmjnfq1cdb24.us-west-2.redshift.amazonaws.com:5439/dwh
(psycopg2.errors.SyntaxError) syntax error at or near "'auto'"
LINE 1: ...Role' COMPUPDATE OFF region 'us-west-2' FORMAT AS csv 'auto'
                                                                 ^

[SQL: COPY staging_demo FROM 's3://tien-duong1151/data/demo_file.csv' CREDENTIALS 'aws_iam_role=arn:aws:iam::539761204517:role/dwhRole' COMPUPDATE OFF region 'us-west-2' FORMAT AS csv 'auto']
(Background on this error at: https://sqlalche.me/e/14/f405)


In [36]:
my_bucket = 's3://tien-duong1151/data/demo_file.csv'
DWH_ROLE_ARN = "arn:aws:iam::539761204517:role/dwhRole"
staging_demo = """
COPY staging_demo FROM 's3://tien-duong1151/data/demo_file.csv' 
CREDENTIALS 'aws_iam_role=arn:aws:iam::539761204517:role/dwhRole'
IGNOREHEADER 1
delimiter ',' 
blanksasnull
;
"""



%sql $staging_demo

 * postgresql://dwhuser:***@dwhcluster.cmjnfq1cdb24.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

In [120]:
dft = pd.read_csv('data/coin_exchange_info.csv')
dft.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 290 entries, 0 to 289
Data columns (total 13 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   id              290 non-null    int64  
 1   name            290 non-null    object 
 2   name_id         290 non-null    object 
 3   url             290 non-null    object 
 4   country         290 non-null    object 
 5   date_live       220 non-null    object 
 6   date_added      290 non-null    object 
 7   usdt            290 non-null    int64  
 8   fiat            290 non-null    int64  
 9   auto            290 non-null    object 
 10  volume_usd      290 non-null    float64
 11  udate           251 non-null    object 
 12  volume_usd_adj  290 non-null    float64
dtypes: float64(2), int64(3), object(8)
memory usage: 29.6+ KB


In [119]:
dft = pd.read_csv('data/coin_data.csv')
dft.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7000 entries, 0 to 6999
Data columns (total 16 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   id                  7000 non-null   int64  
 1   symbol              7000 non-null   object 
 2   name                7000 non-null   object 
 3   nameid              7000 non-null   object 
 4   rank                7000 non-null   float64
 5   price_usd           7000 non-null   float64
 6   percent_change_24h  7000 non-null   float64
 7   percent_change_1h   6984 non-null   float64
 8   percent_change_7d   6994 non-null   float64
 9   price_btc           7000 non-null   float64
 10  market_cap_usd      7000 non-null   object 
 11  volume24            7000 non-null   float64
 12  volume24a           4496 non-null   float64
 13  csupply             7000 non-null   object 
 14  tsupply             5867 non-null   float64
 15  msupply             4129 non-null   float64
dtypes: flo

In [117]:
dft = pd.read_csv('data/coin_market_info.csv')
dft.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 992 entries, 0 to 991
Data columns (total 8 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   name        974 non-null    object 
 1   base        974 non-null    object 
 2   quote       974 non-null    object 
 3   price       974 non-null    float64
 4   price_usd   974 non-null    float64
 5   volume      974 non-null    float64
 6   volume_usd  974 non-null    float64
 7   time        974 non-null    object 
dtypes: float64(4), object(4)
memory usage: 62.1+ KB


In [104]:
dft.to_csv('data/.csv', index=False)

In [56]:
my_credentials = '123456'

In [62]:
a = f'my_credential = {my_credentials}'
b = 'moreoutput'
c = 'something more then this'

In [68]:
from contextlib import redirect_stdout

with open('.env_db', 'w') as file:
    with redirect_stdout(file):
        # print(f'DWH_ENDPOINT={DWH_ENDPOINT}')
        # print(f'DWH_ROLE_ARN={DWH_ROLE_ARN}')
        print(a)


In [70]:
from dotenv import load_dotenv
import os
import boto3
import pandas as pd
from pyspark.sql import SparkSession

load_dotenv()


client = boto3.client('s3')
resource = boto3.resource('s3')
bucket='tien-duong1151'
s3_file = 'data/binance_btc.csv'

key = os.getenv('AWS_ACCESS_KEY_ID')
secret = os.getenv('AWS_SECRET_ACCESS_KEY')

obj = client.get_object(Bucket=bucket, Key=s3_file)
my_obj = pd.read_csv(obj['Body'])
my_obj.head()
# my_obj.columns = map(lambda x: str(x).capitalize(), my_obj.columns)
# my_obj.rename(columns={'Open':'Open_market'},inplace=True)
# # my_obj.columns = my_obj.columns.str.replace(' ', '_')
# my_obj.to_csv('data/binance_btc.csv', index=False)

# def upload_to_s3(bucketname, local_file_path, s3_file_path):

#     try:
#         s3 = boto3.resource('s3')
#         s3.meta.client.upload_file(local_file_path, bucketname, s3_file_path)
#         print('Success!')
#     except Exception as e:
#         print(f'{e}\n Fail!')
# upload_to_s3(bucket, s3_file,s3_file)


Unnamed: 0,Unix,Date,Symbol,Open_market,High,Low,Close,Volume_btc,Volume_usdt,Tradecount
0,1640138760000,2021-12-22 02:06:00,BTC/USDT,48740.22,48745.96,48727.47,48727.47,2.27206,110730.913517,136
1,1640138700000,2021-12-22 02:05:00,BTC/USDT,48763.11,48763.12,48736.7,48736.73,5.33108,259880.120493,427
2,1640138640000,2021-12-22 02:04:00,BTC/USDT,48778.58,48778.58,48750.37,48763.12,6.87389,335219.036805,389
3,1640138580000,2021-12-22 02:03:00,BTC/USDT,48760.37,48778.58,48746.39,48778.58,10.58951,516291.289638,425
4,1640138520000,2021-12-22 02:02:00,BTC/USDT,48799.99,48800.0,48756.93,48760.37,12.24525,597357.839002,535


In [71]:
my_obj.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1197481 entries, 0 to 1197480
Data columns (total 10 columns):
 #   Column       Non-Null Count    Dtype  
---  ------       --------------    -----  
 0   Unix         1197481 non-null  int64  
 1   Date         1197481 non-null  object 
 2   Symbol       1197481 non-null  object 
 3   Open_market  1197481 non-null  float64
 4   High         1197481 non-null  float64
 5   Low          1197481 non-null  float64
 6   Close        1197481 non-null  float64
 7   Volume_btc   1197481 non-null  float64
 8   Volume_usdt  1197481 non-null  float64
 9   Tradecount   1197481 non-null  int64  
dtypes: float64(6), int64(2), object(2)
memory usage: 91.4+ MB


In [68]:
my_obj

Unnamed: 0,Unix,Date,Symbol,Open_market,High,Low,Close,Volume_btc,Volume_usdt,Tradecount
0,1640138760000,2021-12-22 02:06:00,BTC/USDT,48740.22,48745.96,48727.47,48727.47,2.27206,110730.913517,136
1,1640138700000,2021-12-22 02:05:00,BTC/USDT,48763.11,48763.12,48736.70,48736.73,5.33108,259880.120493,427
2,1640138640000,2021-12-22 02:04:00,BTC/USDT,48778.58,48778.58,48750.37,48763.12,6.87389,335219.036805,389
3,1640138580000,2021-12-22 02:03:00,BTC/USDT,48760.37,48778.58,48746.39,48778.58,10.58951,516291.289638,425
4,1640138520000,2021-12-22 02:02:00,BTC/USDT,48799.99,48800.00,48756.93,48760.37,12.24525,597357.839002,535
...,...,...,...,...,...,...,...,...,...,...
1197476,1567965660000,2019-09-08 18:01:00,BTC/USDT,10000.00,10000.00,10000.00,10000.00,0.00000,0.000000,0
1197477,1567965600000,2019-09-08 18:00:00,BTC/USDT,10000.00,10000.00,10000.00,10000.00,0.00000,0.000000,0
1197478,1567965540000,2019-09-08 17:59:00,BTC/USDT,10000.00,10000.00,10000.00,10000.00,0.00100,10.000000,1
1197479,1567965480000,2019-09-08 17:58:00,BTC/USDT,10000.00,10000.00,10000.00,10000.00,0.00000,0.000000,0


In [25]:
import pandas as pd
# from config import spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
   config("spark.jars.repositories", "https://repos.spark-packages.org/").\
   config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
   enableHiveSupport().getOrCreate()

data = 'data/binance_btc.csv'
df =  spark.read.option('header', True).csv(data)
# df.printSchema()
df = df.withColumnRenamed('Open', 'Open_market')

# dft = pd.read_csv('data/binance_btc.csv')
# dft.info()

In [26]:

df.printSchema()

root
 |-- unix: string (nullable = true)
 |-- date: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- Open_market: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- Volume_BTC: string (nullable = true)
 |-- Volume_USDT: string (nullable = true)
 |-- tradecount: string (nullable = true)

