# API Binance


https://api.binance.com/api/v1/ticker/24hr

## Installing Libraries

In [1]:
!pip install psycopg2-binary pandas requests

Defaulting to user installation because normal site-packages is not writeable


### Set Environment variables.

In [2]:
import os

with open("password_redshift.txt",'r') as f:
    password_redshift = f.read()
    
os.environ['DBNAME_REDSHIFT'] = "data-engineer-database"
os.environ['SCHEMA_NAME_REDSHIFT'] = "mateobelossi_coderhouse"
os.environ['USER_REDSHIFT'] = "mateobelossi_coderhouse"
os.environ['PASS_REDSHIFT'] = password_redshift
os.environ['HOST_REDSHIFT'] = "data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com"
os.environ['PORT_REDSHIFT'] = "5439"
os.environ['TABLE_NAME_REDSHIFT'] = "binance_coins"

#### 1) requests to api binance https://api.binance.com/api/v1/ticker/24hr
#### 2) Include a column "created_at" to df in the obtained results. (symbol and created_at is the composite key)
#### 3) Clean data and drop duplicates from the response received from the Binance API.
#### 4) Save the results in a CSV file.
#### 5) Delete from Redshift any rows where created_at equals the current day of execution to prevent duplicates.
#### 6) Insert data from API to into Redshift.

In [3]:
import psycopg2
from psycopg2 import DatabaseError
from psycopg2 import OperationalError
from psycopg2.extras import execute_values
import pandas as pd
from pandas.io.json import json_normalize
import logging, os, requests, json
from datetime import datetime, timedelta

DBNAME_REDSHIFT=os.getenv('DBNAME_REDSHIFT')
SCHEMA_NAME_REDSHIFT=os.getenv('SCHEMA_NAME_REDSHIFT')
USER_REDSHIFT=os.getenv('USER_REDSHIFT')
PASS_REDSHIFT=os.getenv('PASS_REDSHIFT')
HOST_REDSHIFT=os.getenv('HOST_REDSHIFT')
PORT_REDSHIFT=os.getenv('PORT_REDSHIFT')
TABLE_NAME_REDSHIFT=os.getenv('TABLE_NAME_REDSHIFT')

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Libraries calls ok")
CUR_TIME = datetime.now().strftime("%Y%m%d_%H%M%S")
logging.info(f"Start Time : {CUR_TIME} ")

INSERT_QUERY = f"""
        INSERT INTO {SCHEMA_NAME_REDSHIFT}.{TABLE_NAME_REDSHIFT} (
            symbol, priceChange, priceChangePercent, weightedAvgPrice, prevClosePrice, lastPrice, lastQty,
            bidPrice, bidQty, askPrice, askQty, openPrice, highPrice, lowPrice, volume, quoteVolume,
            openTime, closeTime, firstId, lastId, count, created_at
        ) VALUES %s;
"""

CREATE_TABLE =f"""
CREATE TABLE {USER_REDSHIFT}.{TABLE_NAME_REDSHIFT} (
    symbol VARCHAR(256),
    priceChange FLOAT,
    priceChangePercent FLOAT,
    weightedAvgPrice FLOAT,
    prevClosePrice FLOAT,
    lastPrice FLOAT,
    lastQty FLOAT,
    bidPrice FLOAT,
    bidQty FLOAT,
    askPrice FLOAT,
    askQty FLOAT,
    openPrice FLOAT,
    highPrice FLOAT,
    lowPrice FLOAT,
    volume FLOAT,
    quoteVolume FLOAT,
    openTime TIMESTAMP,
    closeTime TIMESTAMP,
    firstId BIGINT,
    lastId BIGINT,
    count BIGINT,
    created_at TIMESTAMP
);
"""

def connect_to_redshift(host_redshift, dbname_redshift, user_redshift, pass_redshift, port_redshift=5439):
    """
    Establishes a connection to a Redshift database using the provided connection parameters.

    Parameters:
    host_redshift (str): The hostname or IP address of the Redshift server.
    dbname_redshift (str): The name of the database to connect to.
    user_redshift (str): The username used to authenticate with Redshift.
    pass_redshift (str): The password used to authenticate with Redshift.
    port_redshift (str or int): The port number on which the Redshift server is listening. Default is 5439.

    Returns:
    Connection: A psycopg2 connection object that can be used to interact with the database.

    Raises:
    OperationalError: An error from the database if the connection fails, which is caught and logged.
    """
    logging.info("Connecting to Redshift...")
    try:
        conn = psycopg2.connect(
            dbname=dbname_redshift,
            user=user_redshift,
            password=pass_redshift,
            host=host_redshift,
            port=port_redshift
        )
        logging.info("Connection established")
        return conn
    except OperationalError as e:
        logging.error(f"Failed to connect to Redshift: {e}")
        raise
        
def execute_query(conn, query):
    """
    Executes a SQL query using the provided database connection and commits the changes.

    Parameters:
    conn (Connection): A psycopg2 connection object representing the connection to the database.
    query (str): The SQL query to be executed.

    Returns:
    None: The function returns None, indicating the query was executed and committed.

    Raises:
    DatabaseError: If an error occurs during the query execution. The error is caught, logged, and re-raised to the caller.
    """
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        conn.commit()
        logging.info("Query executed and committed successfully.")
    except DatabaseError as error:
        conn.rollback()
        logging.error(f"Failed to execute query: {error}")
        raise
    finally:
        if cursor:
            cursor.close()

def check_table_if_exists(conn, schema_name, table_name):
    """
    Checks if a specified table exists within a given schema in the database using a database connection.

    Parameters:
    conn (Connection): The database connection from which to create a cursor.
    schema_name (str): The name of the schema in the database where the table is located.
    table_name (str): The name of the table to check for existence.

    Returns:
    bool: True if the table exists, False otherwise.

    Raises:
    DatabaseError: An error from the database if the query execution fails, should be caught and handled by the caller.
    """
    logging.info("Checking if table exists...")
    try:
        cur = conn.cursor()
        cur.execute(f"""
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables WHERE
                table_schema = '{schema_name}' AND table_name = '{table_name}'
            )
        """)
        table_exists = cur.fetchone()[0]
        return table_exists
    except DatabaseError as e:
        logging.error(f"Error checking if table exists: {e}")
        raise
    finally:
        cur.close()

def truncate_table_if_exists(schema_name_redshift, table_name_redshift):
    """
    Checks if a specified table exists within the Redshift database and truncates it if it does.

    Parameters:
    schema_name_redshift (str): The schema name in the Redshift database where the table resides.
    table_name_redshift (str): The table name to check and potentially truncate.

    Returns:
    None

    Raises:
    Exception: Any exception raised during database operations is caught, logged, and re-raised.
    """
    conn = None
    try:
        conn = connect_to_redshift(HOST_REDSHIFT, DBNAME_REDSHIFT, USER_REDSHIFT, PASS_REDSHIFT, PORT_REDSHIFT)
        table_exists = check_table_if_exists(conn, schema_name_redshift, table_name_redshift)
        if table_exists:
            logging.info(f"Truncating table {schema_name_redshift}.{table_name_redshift}")
            query = f"truncate table {schema_name_redshift}.{table_name_redshift}"
            execute_query(conn, query)
        else:
            logging.info(f"Table {schema_name_redshift}.{table_name_redshift} does not exist.")
    except Exception as e:
        logging.error(f"An error occurred while attempting to truncate table: {e}")
        raise
    finally:
        if conn:
            conn.close()
    
def create_table_if_not_exists(schema_name_redshift,table_name_redshift):
    """
    Connects to a Redshift database and creates a table if it does not already exist in the specified schema.

    Parameters:
    schema_name_redshift (str): The schema name in the Redshift database where the table should be located.
    table_name_redshift (str): The name of the table to be created if it does not exist.

    Globals:
    - HOST_REDSHIFT, DBNAME_REDSHIFT, USER_REDSHIFT, PASS_REDSHIFT, PORT_REDSHIFT: Database connection parameters.
    - CREATE_TABLE (str): SQL command string to create the table, assumed to be globally defined.

    Returns:
    None: This function does not return any value.

    Raises:
    Exception: Captures any exceptions related to database connections or SQL execution, logs them, and re-raises.
    """
    conn = None
    cur = None
    try:
        conn = connect_to_redshift(HOST_REDSHIFT, DBNAME_REDSHIFT, USER_REDSHIFT, PASS_REDSHIFT, PORT_REDSHIFT)
        cur = conn.cursor()
        table_exists = check_table_if_exists(conn, schema_name_redshift, table_name_redshift)
        logging.info(f"Table exists: {table_exists}")
        if not table_exists:
            logging.info(f"Creating table {schema_name_redshift}.{table_name_redshift}")
            cur.execute(CREATE_TABLE)
            conn.commit()
        else:
            logging.info(f"Table {schema_name_redshift}.{table_name_redshift} already exists.")
    except Exception as e:
        logging.error(f"An error occurred while creating table: {e}")
        if conn:
            conn.rollback()
        raise
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()

def insert_data_into_redshift(conn, schema_name, table_name, df, insert_query):
    """
    Inserts data into a Redshift table.

    Parameters:
        conn (psycopg2.connection): The connection object for the Redshift database.
        schema_name (str): The name of the schema where the table resides.
        table_name (str): The name of the table to insert data into.
        data (list of tuples): The data to be inserted into the table. Each tuple represents a row.

    Returns:
        None
    """
    cur = None
    try:
        data_to_insert = [tuple(row) for row in df.values]
        cur = conn.cursor()
        execute_values(
            cur,
            insert_query,
            data_to_insert,
            page_size=len(data_to_insert)
        )
        conn.commit()
        logging.info("Data inserted into Redshift successfully.")
    except OperationalError as e:
        logging.error(f"Operational Error: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
    finally:
        if cur is not None:
            cur.close()
            logging.info("Cursor closed.")
        if conn is not None:
            conn.close()
            logging.info("Database connection closed.")
            
def clean_data_and_drop_duplicates_if_exits(df):
    """
    Cleans the DataFrame containing data received from an API and removes duplicates based on the 'symbol' column,
    keeping only the last entry. This function also standardizes the format of date columns and converts all data
    types according to expected schema before insertion into a Redshift database.

    Parameters:
        df (pandas.DataFrame): DataFrame containing the data received from the API.

    Returns:
        pandas.DataFrame: Cleaned DataFrame ready for insertion into Redshift table.
    """
    df['openTime'] = pd.to_datetime(df['openTime'], unit='ms')
    df['closeTime'] = pd.to_datetime(df['closeTime'], unit='ms')
    df['created_at'] = pd.to_datetime(df['created_at'])
    
    expected_data_types = {
        'symbol': str, 'priceChange': float, 'priceChangePercent': float, 'weightedAvgPrice': float,
        'prevClosePrice': float, 'lastPrice': float, 'lastQty': float, 'bidPrice': float, 'bidQty': float,
        'askPrice': float, 'askQty': float, 'openPrice': float, 'highPrice': float, 'lowPrice': float,
        'volume': float, 'quoteVolume': float, 'openTime': str, 'closeTime': str,
        'firstId': int, 'lastId': int, 'count': int, 'created_at': str
    }
    
    for column, dtype in expected_data_types.items():
        if column in df.columns:
            if dtype == str:
                df[column] = df[column].astype(str)
            else:
                df[column] = df[column].astype(dtype)
                
    important_columns = ['symbol', 'priceChange', 'priceChangePercent', 'weightedAvgPrice', 'prevClosePrice', 
                         'lastPrice', 'lastQty', 'bidPrice', 'bidQty', 'askPrice', 'askQty', 'openPrice', 
                         'highPrice', 'lowPrice', 'volume', 'quoteVolume', 'openTime', 'closeTime', 
                         'firstId', 'lastId', 'count', 'created_at']
    df.dropna(subset=important_columns, inplace=True)
    df.drop_duplicates(subset='symbol', keep='last', inplace=True)
    
    return df

def main():
    try:
        logging.info("Making requests to the Binance API.")
        r = requests.get('https://api.binance.com/api/v1/ticker/24hr')

        if r.status_code == 200:
            logging.info(f"Connection successful; Requests Status: {r.status_code}")
            result = r.json()
            result_df = pd.json_normalize(result)

            logging.info("Adding a column named 'created_at' with the current time.")
            current_date = datetime.now()
            created_at = current_date.strftime("%Y-%m-%d")
            created_at_tomorrow = (current_date + timedelta(days=1)).strftime("%Y-%m-%d")
            result_df['created_at'] = created_at

            logging.info("Cleaning data form dataframe...")
            result_df = clean_data_and_drop_duplicates_if_exits(result_df)
            
            logging.info("Saving data from Binance to a CSV file.")
            name_csv = 'mercado_binance.csv'
            file_path = f"./binance/{datetime.now().strftime('%Y%m%d_%H%M%S')}_{name_csv}"
            result_df.to_csv(file_path, index=False)
            
            conn = connect_to_redshift(HOST_REDSHIFT, DBNAME_REDSHIFT, USER_REDSHIFT, PASS_REDSHIFT, PORT_REDSHIFT)
            
            query = f"""
                DELETE FROM {SCHEMA_NAME_REDSHIFT}.{TABLE_NAME_REDSHIFT}
                WHERE created_at >= '{created_at}'::timestamp and
                created_at < '{created_at_tomorrow}'::timestamp
            """
            logging.info(f"{query}")
            execute_query(conn, query)

            logging.info("Loading data from Binance to Redshift.")
            
            create_table_if_not_exists(SCHEMA_NAME_REDSHIFT,TABLE_NAME_REDSHIFT)
            #truncate_table_if_exists(SCHEMA_NAME_REDSHIFT, TABLE_NAME_REDSHIFT)            
            insert_data_into_redshift(conn, SCHEMA_NAME_REDSHIFT, TABLE_NAME_REDSHIFT, result_df, INSERT_QUERY)

        else:
            logging.info(f"Unable to connect to Binance; Requests Status: {r.status_code}")

    except OperationalError as e:
        logging.error(f"Operational Error: {e}")

    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
    
if __name__ == '__main__':
    main()
    CUR_TIME = datetime.now().strftime("%Y%m%d_%H%M%S")
    logging.info(f"End Time : {CUR_TIME} ")
    logging.info('PROCESS_EXECUTED_SUCCESSFULLY')

2024-04-25 02:35:09,862 - INFO - Libraries calls ok
2024-04-25 02:35:09,864 - INFO - Start Time : 20240425_023509 
2024-04-25 02:35:09,867 - INFO - Making requests to the Binance API.
2024-04-25 02:35:10,590 - INFO - Connection successful; Requests Status: 200
2024-04-25 02:35:10,633 - INFO - Adding a column named 'created_at' with the current time.
2024-04-25 02:35:10,636 - INFO - Cleaning data form dataframe...
2024-04-25 02:35:10,678 - INFO - Saving data from Binance to a CSV file.
2024-04-25 02:35:10,827 - INFO - Connecting to Redshift...
2024-04-25 02:35:12,571 - INFO - Connection established
2024-04-25 02:35:12,573 - INFO - 
                DELETE FROM mateobelossi_coderhouse.binance_coins
                WHERE created_at >= '2024-04-25'::timestamp and
                created_at < '2024-04-26'::timestamp
            
2024-04-25 02:35:13,380 - INFO - Query executed and committed successfully.
2024-04-25 02:35:13,382 - INFO - Loading data from Binance to Redshift.
2024-04-25 02:35:

## Reading data from Redshift.

In [4]:
conn = connect_to_redshift(HOST_REDSHIFT, DBNAME_REDSHIFT, USER_REDSHIFT, PASS_REDSHIFT, PORT_REDSHIFT)
select_query = f"""
SELECT * FROM {SCHEMA_NAME_REDSHIFT}.{TABLE_NAME_REDSHIFT}
"""
df = pd.read_sql(select_query, conn)
conn.close()
df.head()

2024-04-25 02:35:19,090 - INFO - Connecting to Redshift...
2024-04-25 02:35:20,824 - INFO - Connection established
  df = pd.read_sql(select_query, conn)


Unnamed: 0,symbol,pricechange,pricechangepercent,weightedavgprice,prevcloseprice,lastprice,lastqty,bidprice,bidqty,askprice,...,highprice,lowprice,volume,quotevolume,opentime,closetime,firstid,lastid,count,created_at
0,LTCBTC,0.0,0.0,0.001306,0.001292,0.001292,3.26,0.001291,82.431,0.001292,...,0.001331,0.001282,100752.454,131.579471,2024-04-24 02:35:10.251,2024-04-25 02:35:10.251,96975620,96997269,21650,2024-04-25
1,NEOBTC,-2.4e-06,-0.858,0.000279,0.00028,0.000277,19.03,0.000277,4.34,0.000278,...,0.000286,0.000274,41997.06,11.728278,2024-04-24 02:35:09.960,2024-04-25 02:35:09.960,46432656,46435294,2639,2024-04-25
2,EOSETH,3.27e-05,12.534,0.00027,0.00026,0.000294,3.6,0.000293,562.9,0.000294,...,0.000305,0.00026,1268941.2,343.019914,2024-04-24 02:35:10.032,2024-04-25 02:35:10.032,23622000,23626772,4773,2024-04-25
3,BNTETH,3e-07,0.131,0.000233,0.000229,0.000229,10.7,0.000229,152.5,0.000229,...,0.000237,0.000228,41973.5,9.788538,2024-04-24 02:34:56.199,2024-04-25 02:34:56.199,2324762,2325123,362,2024-04-25
4,GASBTC,-1.3e-06,-1.548,8.5e-05,8.4e-05,8.3e-05,16.6,8.3e-05,295.4,8.3e-05,...,8.7e-05,8.2e-05,20342.8,1.724424,2024-04-24 02:35:06.252,2024-04-25 02:35:06.252,22297333,22298369,1037,2024-04-25


In [5]:
df.count()

symbol                2611
pricechange           2611
pricechangepercent    2611
weightedavgprice      2611
prevcloseprice        2611
lastprice             2611
lastqty               2611
bidprice              2611
bidqty                2611
askprice              2611
askqty                2611
openprice             2611
highprice             2611
lowprice              2611
volume                2611
quotevolume           2611
opentime              2611
closetime             2611
firstid               2611
lastid                2611
count                 2611
created_at            2611
dtype: int64