In [1]:
import pandas as pd 
import requests
import json
from datetime import datetime
import psycopg2
from sqlalchemy import create_engine

In [2]:
def log(message, logfile="logfile.txt"):
    """Append a timestamped line ``"<timestamp>, <message>"`` to a log file.

    Parameters
    ----------
    message : object
        Text to record. Non-string values (e.g. exceptions, numbers) are
        converted with ``str()`` instead of raising ``TypeError``.
    logfile : str, optional
        Path of the log file, opened in append mode. Defaults to
        ``"logfile.txt"`` in the current working directory.
    """
    # %b (abbreviated month name) is the documented, portable spelling of the
    # %h directive used previously; output such as "2024-Jan-05-13:45:10" is
    # unchanged, so existing log lines stay consistent.
    # NOTE(review): if a sortable numeric month was intended, switch to %m.
    timestamp_format = "%Y-%b-%d-%H:%M:%S"
    timestamp = datetime.now().strftime(timestamp_format)
    # Explicit encoding so non-ASCII messages (e.g. from exceptions) are
    # written the same way on every platform.
    with open(logfile, "a", encoding="utf-8") as f:
        f.write(f"{timestamp}, {message}\n")

In [3]:
# Load credentials (API key and database settings) from a local JSON file.
# The file holds secrets (API_KEY, DB_PASSWORD), so keep it out of version control.
# JSON is UTF-8 by specification; don't rely on the platform default encoding.
with open("config.json", encoding="utf-8") as f:
    config = json.load(f)

In [4]:
def extract_from_api(api, timeout=30):
    """Send an HTTP GET to ``api`` and return the decoded JSON body.

    Parameters
    ----------
    api : str
        Full request URL. In this notebook the URL embeds the API key, so it
        is deliberately never written to the log.
    timeout : float, optional
        Seconds to wait for the server. Without a timeout ``requests`` can
        block indefinitely and stall the whole pipeline.

    Returns
    -------
    object or None
        The parsed JSON payload. Returns ``None`` when the request cannot be
        made, the status code is not 200, or the body is not valid JSON.
        Failures are recorded with ``log`` so they appear in the log file
        together with the rest of the run.
    """
    try:
        response = requests.get(api, timeout=timeout)
    except requests.RequestException as e:
        # Log only the exception type: requests' messages usually repeat the
        # request URL, which would leak the API key into the log file.
        log(f"Request failed: {type(e).__name__}")
        return None

    if response.status_code != 200:
        log(f"Request failed with status code {response.status_code}")
        return None

    try:
        return response.json()
    except ValueError as e:
        # json.JSONDecodeError (and requests' own JSONDecodeError in newer
        # versions) are ValueError subclasses, so this catches both.
        log(f"JSONDecodeError: {e}")
        return None

In [5]:
def _parse_daily_quote(symbol, data):
    """Build one row dict for ``symbol`` from a TIME_SERIES_DAILY payload, or None if it lacks the expected fields."""
    try:
        # The most recent trading day is the key to look up in the series.
        last_refresh_date = data["Meta Data"]["3. Last Refreshed"]
        quote = data["Time Series (Daily)"][last_refresh_date]
        return {
            "symbol": symbol,
            "date": last_refresh_date,
            "open_value": round(float(quote["1. open"]), 2),
            "high_value": round(float(quote["2. high"]), 2),
            "low_value": round(float(quote["3. low"]), 2),
            "close_value": round(float(quote["4. close"]), 2),
            # Volume is a share count; store it as an integer, not text.
            "volume": int(quote["5. volume"]),
        }
    except (KeyError, TypeError, ValueError):
        return None


def extract_and_transform(symbols=("AMZN", "IBM", "GOOGL", "KO", "MSFT", "NFLX", "TSLA")):
    """Fetch the latest daily quote for each symbol and collect them into a DataFrame.

    Parameters
    ----------
    symbols : iterable of str, optional
        Ticker symbols to query. Defaults to the notebook's original list.

    Returns
    -------
    pandas.DataFrame or None
        One row per symbol that returned usable data, with columns
        ``symbol, date, open_value, high_value, low_value, close_value,
        volume``. Returns ``None`` if no symbol produced data.
    """
    key = config["API_KEY"]
    rows = []
    for symbol in symbols:
        api = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&apikey={key}"
        log(f"Calling api for: {symbol}")
        data = extract_from_api(api)

        if data is None:
            log(f"No information found for {symbol}")
            continue

        row = _parse_daily_quote(symbol, data)
        if row is None:
            # A 200 response may still lack the time series, e.g. an error or
            # rate-limit message in the body (presumably how Alpha Vantage
            # reports these; verify). Skip the symbol instead of crashing.
            log(f"Unexpected response format for {symbol}")
            continue
        rows.append(row)

    if not rows:
        log("No information found for any of the symbols")
        return None

    # Build the frame once from the collected records instead of
    # concatenating one single-row frame per symbol.
    stocks_df = pd.DataFrame(rows)
    log("Successfully created the DataFrame")
    return stocks_df

In [6]:
def load_data(stocks_df):
    """Append ``stocks_df`` to the ``stocks`` table of the configured database.

    Connection settings come from ``config`` (DB_HOST, DB_PORT, DB_DATABASE,
    DB_USERNAME, DB_PASSWORD). A psycopg2 connection is opened first to verify
    connectivity. The rows are then written through a SQLAlchemy engine with
    ``DataFrame.to_sql(if_exists="append")``.

    Parameters
    ----------
    stocks_df : pandas.DataFrame or None
        Rows to insert. ``None`` or an empty frame is logged and skipped.

    Database errors are logged, not raised. Both the connection and the
    engine are always released.
    """
    # Local imports keep this cell self-contained.
    from urllib.parse import quote_plus
    from sqlalchemy.exc import SQLAlchemyError

    if stocks_df is None or stocks_df.empty:
        log("No data to load; skipping database load")
        return

    host = config["DB_HOST"]
    port = config["DB_PORT"]
    database = config["DB_DATABASE"]
    user = config["DB_USERNAME"]
    password = config["DB_PASSWORD"]
    table = "stocks"

    # Pre-initialise so the finally block is safe even if connecting fails.
    connection = None
    engine = None
    try:
        log("Connecting to the database...")
        # Keyword arguments avoid DSN quoting problems (spaces or quotes in the
        # password would break a hand-built "key=value" string).
        connection = psycopg2.connect(
            host=host, port=port, dbname=database, user=user, password=password
        )
        log("Successfully connected to the database")

        # Credentials must be URL-escaped: '@', ':' or '/' in a password would
        # otherwise corrupt the SQLAlchemy URL.
        engine = create_engine(
            f"postgresql+psycopg2://{quote_plus(str(user))}:{quote_plus(str(password))}"
            f"@{host}:{port}/{database}"
        )

        log("Loading DataFrame into the database...")
        # NOTE(review): "append" inserts duplicate rows if the job is re-run
        # for the same date; consider a unique key on (symbol, date).
        stocks_df.to_sql(table, engine, if_exists="append", index=False)
        log(f"Loaded {len(stocks_df)} rows into '{table}'")

    except (psycopg2.Error, SQLAlchemyError) as e:
        # to_sql raises SQLAlchemy exceptions, not psycopg2.Error, so both are caught.
        log(f"Error while connecting to or loading into the database: {e}")

    finally:
        if engine is not None:
            engine.dispose()
        if connection is not None:
            connection.close()
            log("Connection closed")

In [7]:
# Generate the dataframe with the information extracted from the API
log("Starting process...")
stocks_df = extract_and_transform()
# Load the dataframe into the database. extract_and_transform returns None
# when no symbol produced data, so skip the load in that case.
if stocks_df is not None:
    load_data(stocks_df)
else:
    log("Skipping load: no data extracted")
log("Process finished")