## Financial Data Aggregator and Analyzer ETL

### Introduction
In today’s data-driven world, financial analysts and investors require timely access to accurate and comprehensive financial data to make informed decisions. The finance industry generates vast amounts of data from various sources, including stock markets, financial news, and company financial statements. By aggregating, analyzing, and visualizing this data, analysts and investors can identify trends, evaluate risks, and discover investment opportunities.

### Objective
The objective of the Financial Data Aggregator and Analyzer project is to create an end-to-end Extract, Transform, Load (ETL) pipeline that consolidates and processes financial data from multiple sources. Building this pipeline lets you practice essential data engineering skills, such as data collection, data transformation, and data loading, while working with real-world financial data. By implementing this project, you will gain hands-on experience using Python, APIs, web scraping, and SQL to address challenges in the finance industry.

### Extraction

In [21]:
# import libraries
import pandas as pd
import requests
from bs4 import BeautifulSoup
import json
from textblob import TextBlob
import nltk
from nltk.tokenize import word_tokenize
nltk.download('punkt')
#from googletrans import Translator
pd.options.display.max_colwidth = 50
import os
import sqlite3
import csv

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/nichollastidow/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


#### 1) Financial News: 
Scrape financial news from CNBC using a web scraping library such as BeautifulSoup, collecting every headline present on the page at the time of the request. Save the timestamp of the moment you made the request in a variable.

In [22]:
def scrape_financial_news_from_cnbc():
    """
    Scrapes financial news from CNBC.

    Returns:
        DataFrame: A DataFrame with headlines, the scrape timestamp and the URL.
    """
    # Get the HTML of the CNBC markets page.
    response = requests.get("https://www.cnbc.com/markets/", timeout=30)
    response.raise_for_status()  # fail loudly on HTTP errors instead of parsing an error page
    soup = BeautifulSoup(response.content, "html.parser")

    # Get the headline links on the page.
    headlines = soup.find_all("a", class_="Card-title")

    # Create lists to store the headlines and their URLs.
    headlines_list = [headline.text.strip() for headline in headlines]
    url_list = [headline['href'] for headline in headlines]

    # Make sure to prepend the main domain to relative URLs.
    base_url = 'https://www.cnbc.com'
    url_list = [base_url + url if url.startswith('/') else url for url in url_list]

    # Save the timestamp of the moment the request was made.
    timestamp = pd.Timestamp.now()

    # Create a DataFrame with the headlines, the timestamp and the URLs.
    df = pd.DataFrame({
        'headline': headlines_list,
        'timestamp': [timestamp] * len(headlines_list),
        'url': url_list
    })

    return df

if __name__ == "__main__":
    # Call the function to scrape the data.
    df_news = scrape_financial_news_from_cnbc()
    # Print the DataFrame.
    display(df_news)


Unnamed: 0,headline,timestamp,url
0,A rebalance into fixed income away from equiti...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/20/a-rebala...
1,Tax-free bond trade: Finding long-term opportu...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/20/there-is...
2,Average credit card interest rate is a record ...,2023-06-20 18:05:57.330004,https://www.cnbc.com/2023/06/20/credit-card-ra...
3,"Shell boosts dividend by 15%, maintains oil ou...",2023-06-20 18:05:57.330004,https://www.cnbc.com/2023/06/14/shell-boosts-d...
4,Chocolate is set to get more expensive as coco...,2023-06-20 18:05:57.330004,https://www.cnbc.com/2023/06/13/chocolate-is-s...
...,...,...,...
58,Apple's financial fundamental performance is n...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/20/apples-f...
59,Fundstrat's Tom Lee: We're not falling into a ...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/16/fundstra...
60,Market sentiment about interest rates is drivi...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/16/market-s...
61,"The A.I. flood has gone too far too fast, says...",2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/15/short-th...
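The scraper above builds absolute URLs by prepending the base domain to any `href` that starts with `/`. A minimal alternative sketch, assuming the standard library's `urllib.parse.urljoin` is acceptable here, is shown below. It also handles protocol-relative links (starting with `//`), which plain concatenation would turn into a malformed URL. The sample paths are made up for illustration and are not real CNBC articles.

```python
from urllib.parse import urljoin

BASE_URL = "https://www.cnbc.com"  # same base domain the scraper uses


def absolutize(hrefs, base_url=BASE_URL):
    """Resolve each href against base_url; absolute URLs pass through unchanged."""
    return [urljoin(base_url, href) for href in hrefs]


# Hypothetical hrefs covering the three shapes a link can take.
sample_hrefs = [
    "/2023/06/20/example-story.html",           # root-relative
    "https://www.cnbc.com/video/example.html",  # already absolute
    "//www.cnbc.com/markets/",                  # protocol-relative
]

resolved = absolutize(sample_hrefs)
for url in resolved:
    print(url)
```

Inside the scraper, `url_list = absolutize(url_list)` could replace the list comprehension that prepends `base_url`.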


#### 2) Stock Market Data: 
Use the Alpha Vantage API to extract stock market data. Get historical stock prices for the following companies:
- Apple
- Microsoft
- Google
- Amazon
- Meta

In [23]:
# get alpha vantage api key
from dotenv import load_dotenv
import os

load_dotenv()  # take environment variables from .env.
api_key_av = os.getenv("ALPHA_VANTAGE_API_KEY")
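`os.getenv` returns `None` when the variable is missing, so a misconfigured `.env` file would only surface later as a failed API call. The sketch below shows one way to fail early with a clear message. The helper name `require_env` and the variable `DEMO_API_KEY` are hypothetical and used only for illustration.

```python
import os


def require_env(name):
    """Return the value of environment variable `name`, or raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set; check your .env file.")
    return value


# Demonstration with a hypothetical variable so the real key is not touched.
os.environ["DEMO_API_KEY"] = "demo"
demo_key = require_env("DEMO_API_KEY")
print(demo_key)
```

In the notebook, `api_key_av = require_env("ALPHA_VANTAGE_API_KEY")` would play the same role as the `os.getenv` call.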


In [24]:
def get_stock_prices_alphaVantage_api(symbols, start_date, end_date, api_key):
    """
    Gets stock prices from Alpha Vantage API for the specified dates and symbols.

    Args:
        symbols (list): A list of stock symbols to get data for.
        start_date (str): The start date in YYYY-MM-DD format.
        end_date (str): The end date in YYYY-MM-DD format.
        api_key (str): The Alpha Vantage API key.

    Returns:
        A DataFrame of stock prices.
    """

    # Create a DataFrame to store the stock prices.
    df = pd.DataFrame()

    # Iterate over the symbols.
    for symbol in symbols:

        # Construct the API endpoint URL.
        url = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol={symbol}&apikey={api_key}&outputsize=full"

        # Make a GET request to the API.
        response = requests.get(url)

        # Check the response status code.
        if response.status_code == 200:

            # Get the JSON response data.
            data = json.loads(response.content.decode("utf-8"))

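            # Skip symbols whose response has no price series (for example an error or rate-limit message).
            if "Time Series (Daily)" not in data:
                print(f"No daily time series returned for {symbol}")
                continue

            # Iterate over the dates.
            for date in data["Time Series (Daily)"]: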
                # Check if the date is in the desired date range.
                if start_date <= date <= end_date:

                    # Get the stock price and other information.
                    price_data = data["Time Series (Daily)"][date]
                    open_price = float(price_data['1. open'])
                    highest_price = float(price_data['2. high'])
                    lowest_price = float(price_data['3. low'])
                    close_price = float(price_data['4. close'])
                    adjusted_close = float(price_data['5. adjusted close'])
                    volume = int(price_data['6. volume'])

                    # Create a temp DataFrame and append it to main DataFrame
                    temp_df = pd.DataFrame({
                        'date': [pd.to_datetime(date)],
                        'open_price': [open_price],
                        'highest_price': [highest_price],
                        'lowest_price': [lowest_price],
                        'close_price': [close_price],
                        'adjusted_close': [adjusted_close],
                        'volume': [volume],
                        'symbol': [symbol]})
                    
                    df = pd.concat([df, temp_df], ignore_index=True)

        else:

            # Print an error message.
            print(f"Error getting stock price for {symbol}")

    # Sort the DataFrame by date.
    df = df.sort_values(by='date')

    # Return the DataFrame.
    return df

if __name__ == "__main__":
    # Set the stock symbols.
    symbols = ["AAPL", "MSFT", "GOOG", "AMZN", "META"]

    # api key
    api_key = api_key_av

    # Set the start and end dates.
    start_date = "2023-01-01"
    end_date = "2023-06-20"

    # Get the stock prices.
    df_stocks = get_stock_prices_alphaVantage_api(symbols, start_date, end_date,api_key)

    # Print the DataFrame.
    display(df_stocks)

Unnamed: 0,date,open_price,highest_price,lowest_price,close_price,adjusted_close,volume,symbol
579,2023-01-03,122.82,126.370,122.280,124.74,124.740000,35528531,META
115,2023-01-03,130.28,130.900,124.170,125.07,124.706364,112117471,AAPL
463,2023-01-03,85.46,86.960,84.205,85.82,85.820000,76706040,AMZN
347,2023-01-03,89.83,91.550,89.020,89.70,89.700000,20738457,GOOG
231,2023-01-03,243.08,245.750,237.400,239.58,238.460203,25740036,MSFT
...,...,...,...,...,...,...,...,...
348,2023-06-20,124.86,127.250,124.500,125.78,125.780000,56855478,AMZN
116,2023-06-20,339.27,342.070,335.860,338.05,338.050000,26350198,MSFT
232,2023-06-20,123.50,125.175,122.830,123.85,123.850000,22666024,GOOG
464,2023-06-20,278.73,284.800,276.220,284.33,284.330000,20676920,META
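The extraction function filters dates by comparing strings and indexes the `"Time Series (Daily)"` key directly. The sketch below isolates that parsing step as a pure function so it can be exercised without network access. It rests on the assumption that a response without the series carries its explanation under a key such as `"Error Message"`, `"Note"`, or `"Information"`; those key names may not cover every case. The in-range sample row reuses the META values from the output above, and the out-of-range row is made up.

```python
def rows_in_range(payload, symbol, start_date, end_date):
    """Turn one parsed JSON payload into a date-sorted list of row dicts within the range.

    Raises ValueError when the payload has no "Time Series (Daily)" key.
    """
    series = payload.get("Time Series (Daily)")
    if series is None:
        # Assumption: error payloads carry a message under one of these keys.
        message = next(
            (payload[k] for k in ("Error Message", "Note", "Information") if k in payload),
            "unexpected response shape",
        )
        raise ValueError(f"No daily series for {symbol}: {message}")

    rows = []
    for date, values in series.items():
        # ISO dates (YYYY-MM-DD) compare correctly as plain strings.
        if start_date <= date <= end_date:
            rows.append({
                "date": date,
                "close_price": float(values["4. close"]),
                "volume": int(values["6. volume"]),
                "symbol": symbol,
            })
    return sorted(rows, key=lambda r: r["date"])


ok_payload = {
    "Time Series (Daily)": {
        "2023-01-03": {"4. close": "124.74", "6. volume": "35528531"},
        "2022-12-30": {"4. close": "1.00", "6. volume": "1"},  # made-up, out of range
    }
}
rows = rows_in_range(ok_payload, "META", "2023-01-01", "2023-06-20")
print(rows)
```

Collecting plain dicts and building the DataFrame once at the end, for example with `pd.DataFrame(rows)`, also avoids calling `pd.concat` once per row.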


### Transformation

#### 1) Sentiment Analysis
The business would like to know the sentiment of the financial news headlines that you extracted. The headlines are short and concise, and they reflect the latest developments and trends in the stock market. The business wants to know whether each headline is positive, negative, or neutral in its emotional tone and likely impact on investors.

For this you can use the TextBlob library and its sentiment polarity score. Save the polarity score in a new column called sentiment_score. If the score is below -0.2, classify the sentiment as negative; if it is between -0.2 and 0.2 (inclusive), classify it as neutral; otherwise classify it as positive. Save this classification in a new column called sentiment.

In [25]:
def categorize_polarity(polarity):
    """
    Classifies the sentiment as positive, neutral or negative based on polarity.

    Args:
        polarity (float): The polarity score from TextBlob.

    Returns:
        str: The sentiment classification.
    """

    if polarity < -0.2:
        return "Negative"
    elif polarity <= 0.2:
        return "Neutral"
    else:
        return "Positive"

def analyze_sentiment(df_news):
    """
    Analyzes the sentiment of news headlines.

    Args:
        df_news (DataFrame): The news headlines.

    Returns:
        DataFrame: The DataFrame with the sentiment analysis.
    """

    # Copy the DataFrame.
    df = df_news.copy()

    # Get the sentiment polarity of the headlines.
    df['sentiment_score'] = df['headline'].apply(lambda headline: TextBlob(headline).sentiment.polarity)

    # Categorize the sentiment.
    df['sentiment'] = df['sentiment_score'].apply(categorize_polarity)

    return df

if __name__ == "__main__":
    # Analyze the sentiment of the news headlines.
    df_news = analyze_sentiment(df_news)

    # Print the DataFrame with sentiment analysis.
    display(df_news)


Unnamed: 0,headline,timestamp,url,sentiment_score,sentiment
0,A rebalance into fixed income away from equiti...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/20/a-rebala...,0.100000,Neutral
1,Tax-free bond trade: Finding long-term opportu...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/20/there-is...,0.000000,Neutral
2,Average credit card interest rate is a record ...,2023-06-20 18:05:57.330004,https://www.cnbc.com/2023/06/20/credit-card-ra...,0.116667,Neutral
3,"Shell boosts dividend by 15%, maintains oil ou...",2023-06-20 18:05:57.330004,https://www.cnbc.com/2023/06/14/shell-boosts-d...,0.000000,Neutral
4,Chocolate is set to get more expensive as coco...,2023-06-20 18:05:57.330004,https://www.cnbc.com/2023/06/13/chocolate-is-s...,0.000000,Neutral
...,...,...,...,...,...
58,Apple's financial fundamental performance is n...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/20/apples-f...,-0.050000,Neutral
59,Fundstrat's Tom Lee: We're not falling into a ...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/16/fundstra...,-0.100000,Neutral
60,Market sentiment about interest rates is drivi...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/16/market-s...,0.000000,Neutral
61,"The A.I. flood has gone too far too fast, says...",2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/15/short-th...,0.150000,Neutral
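Every visible row in the output above is labelled Neutral, so it can help to confirm how the thresholds treat scores near the cut-offs. The sketch below restates the same rule as `categorize_polarity` in a standalone function named `classify` (a name chosen here for illustration) and runs it on a few hand-picked scores, including both boundary values.

```python
def classify(polarity):
    """Below -0.2 is negative, above 0.2 is positive, everything else is neutral."""
    if polarity < -0.2:
        return "Negative"
    if polarity <= 0.2:
        return "Neutral"
    return "Positive"


# Hand-picked scores; -0.2 and 0.2 are the two boundary values.
samples = [-1.0, -0.2, 0.0, 0.2, 0.21, 1.0]
labels = {score: classify(score) for score in samples}

for score, label in labels.items():
    print(f"{score:+.2f} -> {label}")
```

With this rule both boundary scores fall into the neutral band, so a headline needs a polarity strictly beyond 0.2 in either direction to leave it.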


#### 2) Relevant Words
The business wants to know the most relevant words in each headline. For this you can use the word_tokenize function from the nltk.tokenize module. Create a column called relevant_words to store the result.

In [26]:
def extract_relevant_words(df):
    """
    Extracts relevant words from the headlines.

    Args:
        df (DataFrame): DataFrame containing the headlines.

    Returns:
        DataFrame: The original DataFrame with a new column containing the tokenized headlines.
    """
    # Tokenize the headlines and store the result in a new column.
    df['relevant_words'] = df['headline'].apply(word_tokenize)
    
    return df

if __name__ == "__main__":
    # Extract the relevant words.
    df_news = extract_relevant_words(df_news)
    # Print the DataFrame.
    display(df_news)


Unnamed: 0,headline,timestamp,url,sentiment_score,sentiment,relevant_words
0,A rebalance into fixed income away from equiti...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/20/a-rebala...,0.100000,Neutral,"[A, rebalance, into, fixed, income, away, from..."
1,Tax-free bond trade: Finding long-term opportu...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/20/there-is...,0.000000,Neutral,"[Tax-free, bond, trade, :, Finding, long-term,..."
2,Average credit card interest rate is a record ...,2023-06-20 18:05:57.330004,https://www.cnbc.com/2023/06/20/credit-card-ra...,0.116667,Neutral,"[Average, credit, card, interest, rate, is, a,..."
3,"Shell boosts dividend by 15%, maintains oil ou...",2023-06-20 18:05:57.330004,https://www.cnbc.com/2023/06/14/shell-boosts-d...,0.000000,Neutral,"[Shell, boosts, dividend, by, 15, %, ,, mainta..."
4,Chocolate is set to get more expensive as coco...,2023-06-20 18:05:57.330004,https://www.cnbc.com/2023/06/13/chocolate-is-s...,0.000000,Neutral,"[Chocolate, is, set, to, get, more, expensive,..."
...,...,...,...,...,...,...
58,Apple's financial fundamental performance is n...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/20/apples-f...,-0.050000,Neutral,"[Apple, 's, financial, fundamental, performanc..."
59,Fundstrat's Tom Lee: We're not falling into a ...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/16/fundstra...,-0.100000,Neutral,"[Fundstrat, 's, Tom, Lee, :, We, 're, not, fal..."
60,Market sentiment about interest rates is drivi...,2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/16/market-s...,0.000000,Neutral,"[Market, sentiment, about, interest, rates, is..."
61,"The A.I. flood has gone too far too fast, says...",2023-06-20 18:05:57.330004,https://www.cnbc.com/video/2023/06/15/short-th...,0.150000,Neutral,"[The, A.I, ., flood, has, gone, too, far, too,..."
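As the output shows, `word_tokenize` keeps every token, including punctuation such as `%` and `,` and function words such as `by`. One possible refinement, sketched below under the assumption that "relevant" means "not punctuation and not a common function word", filters the token list after tokenizing. The stop-word set is a deliberately tiny, hypothetical list; a fuller list such as NLTK's stop-word corpus could be swapped in.

```python
import string

# Hypothetical, deliberately small stop-word list for illustration only.
STOP_WORDS = {"a", "an", "the", "is", "to", "into", "from", "as", "by", "of", "in", "on", "and", "'s"}


def keep_relevant(tokens):
    """Drop stop words and tokens made up only of punctuation characters."""
    kept = []
    for token in tokens:
        if token.lower() in STOP_WORDS:
            continue
        if all(ch in string.punctuation for ch in token):
            continue
        kept.append(token)
    return kept


# The visible (untruncated) tokens of row 3 in the output above.
tokens = ["Shell", "boosts", "dividend", "by", "15", "%", ","]
filtered = keep_relevant(tokens)
print(filtered)
```

Applied to the DataFrame, this would be `df['relevant_words'].apply(keep_relevant)` after the tokenizing step.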


#### 3) Headline Translation
The business wants the headlines translated into Spanish and Italian for projects targeting new markets. You can use the googletrans library for this. Save the translations in two new columns, headline_spanish and headline_it, for Spanish and Italian respectively.

In [27]:
# def translate_headlines(df):
#   """
#   Translates the headlines in a DataFrame to Spanish and Italian.

#   Args:
#     df (DataFrame): DataFrame containing the headlines.

#   Returns:
#     A DataFrame with the translated headlines in two new columns: `headline_spanish` and `headline_it`.
#   """
#   # Create a translator object.
#   translator = Translator()

#   # Translate the headlines to Spanish and store the result in a new column.
#   df['headline_spanish'] = df['headline'].apply(lambda x: translator.translate(x, dest='es').text)

#   # Translate the headlines to Italian and store the result in a new column.
#   df['headline_it'] = df['headline'].apply(lambda x: translator.translate(x, dest='it').text)

#   # Return the DataFrame.
#   return df
# if __name__ == "__main__":
#     # Translate the headlines.
#     df_news = translate_headlines(df_news)
#     # Print the DataFrame.
#     display(df_news)

#### 4) Save the Data
Save the scraped headlines data into a CSV file called headlines_data.csv under the path data/headlines.

In [28]:
def save_headlines_to_csv(df, path='data/headlines/headlines_data.csv'):
    """
    Saves headlines data into a CSV file.

    Args:
        df (DataFrame): DataFrame with the headlines data.
        path (str): The path to save the CSV file to. By default, it's 'data/headlines/headlines_data.csv'.
    """
    # Check if the directory exists, if not create it.
    os.makedirs(os.path.dirname(path), exist_ok=True)

    # Save the DataFrame to a CSV file.
    df.to_csv(path, index=False)

if __name__ == "__main__":
    # Save the data to a CSV file.
    save_headlines_to_csv(df_news)
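The `relevant_words` column holds Python lists, and `to_csv` writes each list as its string representation, so reading the file back yields text rather than lists. The sketch below shows one way to keep the column round-trippable by encoding it as JSON before writing. It uses the standard `csv` module and an in-memory buffer as a stand-in for the file on disk, and the sample row is made up.

```python
import csv
import io
import json

# Made-up sample row; the headline contains a comma to exercise CSV quoting.
rows = [
    {
        "headline": "Example headline, with a comma",
        "relevant_words": ["Example", "headline", ",", "with", "a", "comma"],
    },
]

buffer = io.StringIO()  # stands in for data/headlines/headlines_data.csv
writer = csv.DictWriter(buffer, fieldnames=["headline", "relevant_words"])
writer.writeheader()
for row in rows:
    # Encode the token list as a JSON string so it survives the round trip.
    writer.writerow({**row, "relevant_words": json.dumps(row["relevant_words"])})

buffer.seek(0)
restored = [
    {**row, "relevant_words": json.loads(row["relevant_words"])}
    for row in csv.DictReader(buffer)
]
print(restored[0]["relevant_words"])
```

With pandas the same idea would be `df['relevant_words'].apply(json.dumps)` before `to_csv` and `json.loads` after `read_csv`.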


Save the stock data into a CSV file called stocks_data.csv under the path data/stocks.

In [29]:
def save_stock_info_to_csv(df, path='data/stocks/stocks_data.csv'):
    """
    Saves stock info data into a CSV file.

    Args:
        df (DataFrame): DataFrame with the stocks data.
        path (str): The path to save the CSV file to. By default, it's 'data/stocks/stocks_data.csv'.
    """
    # Check if the directory exists, if not create it.
    os.makedirs(os.path.dirname(path), exist_ok=True)

    # Save the DataFrame to a CSV file.
    df.to_csv(path, index=False)

if __name__ == "__main__":
    # Save the data to a CSV file.
    save_stock_info_to_csv(df_stocks)

### Load

1) Create SQLite Database: Install SQLite on your machine and create a database called etl_extended_case.
2) Create Tables: Create two tables, one for headlines and one for stocks, to load the headlines and stocks data respectively (named headline_news and stock_prices in the code below).

In [30]:
# Connect to SQLite database (it will be created if it doesn't exist)
conn = sqlite3.connect('etl_extended_case.db')

# Create a cursor object
c = conn.cursor()

# Create the 'stock_prices' table
c.execute("""
    CREATE TABLE IF NOT EXISTS stock_prices (
        date TEXT,
        open_price FLOAT,
        highest_price FLOAT,
        lowest_price FLOAT,
        close_price FLOAT,
        adjusted_close FLOAT,
        volume INTEGER,
        symbol TEXT
    )
""")

# Create the 'headline_news' table
c.execute("""
    CREATE TABLE IF NOT EXISTS headline_news (
        headline TEXT,
        timestamp TEXT,
        url TEXT,
        sentiment_score FLOAT,
        sentiment TEXT,
        relevant_words TEXT,
        headline_spanish TEXT,
        headline_it TEXT
    )
""")

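# Open the files and insert data into tables
with open('data/stocks/stocks_data.csv', 'r', newline='') as f:
    stocks_reader = csv.reader(f)
    next(stocks_reader)  # skip the header row
    for row in stocks_reader:
        c.execute(""" INSERT INTO stock_prices VALUES (?,?,?,?,?,?,?,?) """, row)

with open('data/headlines/headlines_data.csv', 'r', newline='') as f:
    headlines_reader = csv.DictReader(f)  # uses the header row as column names
    columns = headlines_reader.fieldnames
    # Name the columns explicitly: the CSV lacks the translation columns
    # while the translation step is commented out, so they are left NULL.
    insert_sql = f"INSERT INTO headline_news ({','.join(columns)}) VALUES ({','.join('?' * len(columns))})"
    for headline in headlines_reader:
        c.execute(insert_sql, [headline[col] for col in columns])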

# Commit the transactions
conn.commit()

# Close the connection
conn.close()
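When loading a CSV with the `csv` module, the header row has to be consumed before inserting, otherwise it ends up in the table as data. A minimal sketch of one way to do this is shown below: `csv.DictReader` consumes the header, and `executemany` with named placeholders inserts the remaining rows. It assumes the CSV header matches the table's column names and comes from a trusted file, since the names are interpolated into the SQL. An in-memory database and CSV string stand in for the real files, with two rows taken from the stock output earlier in this notebook.

```python
import csv
import io
import sqlite3

# In-memory stand-ins for the CSV file and the database.
csv_text = (
    "date,open_price,highest_price,lowest_price,close_price,adjusted_close,volume,symbol\n"
    "2023-01-03,122.82,126.37,122.28,124.74,124.74,35528531,META\n"
    "2023-01-03,85.46,86.96,84.205,85.82,85.82,76706040,AMZN\n"
)

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE stock_prices (
        date TEXT, open_price REAL, highest_price REAL, lowest_price REAL,
        close_price REAL, adjusted_close REAL, volume INTEGER, symbol TEXT
    )
""")

reader = csv.DictReader(io.StringIO(csv_text))  # reads the header row as field names
columns = reader.fieldnames
placeholders = ", ".join(f":{name}" for name in columns)
sql = f"INSERT INTO stock_prices ({', '.join(columns)}) VALUES ({placeholders})"

# Each remaining CSV row is a dict keyed by column name.
conn.executemany(sql, reader)
conn.commit()

loaded = conn.execute(
    "SELECT symbol, close_price, volume FROM stock_prices ORDER BY symbol"
).fetchall()
print(loaded)
conn.close()
```

The CSV values arrive as strings; SQLite's column type affinity converts them to numbers for the `REAL` and `INTEGER` columns on insert.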

In [31]:
# Verify that the data has been loaded correctly
conn = sqlite3.connect('etl_extended_case.db')
c = conn.cursor()

# Select first 5 rows from stock_prices table
c.execute("SELECT * FROM stock_prices LIMIT 5")
stock_prices_data = c.fetchall()

# Select first 5 rows from headline_news table
c.execute("SELECT * FROM headline_news LIMIT 5")
headline_news_data = c.fetchall()

# Close the connection
conn.close()

# Print the data
print("First 5 rows of stock_prices:")
for row in stock_prices_data:
    display(row)

print("\nFirst 5 rows of headline_news:")
for row in headline_news_data:
    display(row)
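Looking at the first five rows confirms that something was loaded, but not that everything was. A complementary check, sketched below, counts rows per symbol and reports the covered date range. The in-memory database and its three sample rows are stand-ins for `etl_extended_case.db`, with a reduced set of columns to keep the example short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for etl_extended_case.db
conn.execute("CREATE TABLE stock_prices (date TEXT, close_price REAL, symbol TEXT)")
conn.executemany(
    "INSERT INTO stock_prices VALUES (?, ?, ?)",
    [
        ("2023-01-03", 124.74, "META"),
        ("2023-06-20", 284.33, "META"),
        ("2023-01-03", 85.82, "AMZN"),
    ],
)

# One summary row per symbol: row count plus first and last date.
summary = conn.execute("""
    SELECT symbol, COUNT(*) AS n_rows, MIN(date) AS first_date, MAX(date) AS last_date
    FROM stock_prices
    GROUP BY symbol
    ORDER BY symbol
""").fetchall()

for row in summary:
    print(row)
conn.close()
```

A symbol with far fewer rows than the others, or a date range that does not match the requested one, would point at a problem in the extraction or load step.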

In [32]:
import google.auth
from google.cloud import sql
from google.cloud.sql.connector import connector
import pandas as pd

def load_csv_to_cloud_sql(instance_name, database_name, csv_file_path, username, password, region):
    # Authenticate with Google Cloud
    credentials, project_id = google.auth.default()

    # Create a Cloud SQL client
    sql_client = sql.Client(project=project_id, credentials=credentials)

    # Create a new Cloud SQL instance
    instance = sql_client.create_instance(instance_name, region, "POSTGRES")

    # Connect to the instance using the Cloud SQL connector
    conn = connector.connect(instance.connection_name, user=username, password=password, database=database_name)

    # Load data from the CSV file into a Pandas DataFrame
    df = pd.read_csv(csv_file_path)

    # Insert the data into the Cloud SQL database
    cursor = conn.cursor()
    for index, row in df.iterrows():
        values = tuple(row)
        placeholders = ",".join(["%s"] * len(values))
        query = f"INSERT INTO table_name VALUES ({placeholders})"
        cursor.execute(query, values)
    conn.commit()

    # Close the connection
    cursor.close()
    conn.close()

    print("Data imported successfully!")
if __name__ == "__main__":
    instance_name = os.getenv("INSTANCE_CONNECTION_NAME")
    db_name = os.getenv("DB_NAME")
    csv_path = 'data/stocks/stocks_data.csv'
    region = os.getenv("REGION_NAME")
    user = os.getenv("DB_USER")
    password = os.getenv("DB_PASS")
    load_csv_to_cloud_sql(instance_name,db_name,csv_path,user,password,region)
    
    

AttributeError: module 'google.cloud.sql' has no attribute 'Client'

In [33]:
import os

from google.cloud.sql.connector import Connector, IPTypes
import pg8000

import sqlalchemy


def connect_with_connector() -> sqlalchemy.engine.base.Engine:
    """
    Initializes a connection pool for a Cloud SQL instance of Postgres.

    Uses the Cloud SQL Python Connector package.
    """
    # Note: Saving credentials in environment variables is convenient, but not
    # secure - consider a more secure solution such as
    # Cloud Secret Manager (https://cloud.google.com/secret-manager) to help
    # keep secrets safe.

    instance_connection_name = os.environ[
        "INSTANCE_CONNECTION_NAME"
    ]  # e.g. 'project:region:instance'
    db_user = os.environ["DB_USER"]  # e.g. 'my-db-user'
    db_pass = os.environ["DB_PASS"]  # e.g. 'my-db-password'
    db_name = os.environ["DB_NAME"]  # e.g. 'my-database'

    ip_type = IPTypes.PRIVATE if os.environ.get("PRIVATE_IP") else IPTypes.PUBLIC

    # initialize Cloud SQL Python Connector object
    connector = Connector()

    def getconn() -> pg8000.dbapi.Connection:
        conn: pg8000.dbapi.Connection = connector.connect(
            instance_connection_name,
            "pg8000",
            user=db_user,
            password=db_pass,
            db=db_name,
            ip_type=ip_type,
        )
        return conn

    # The Cloud SQL Python Connector can be used with SQLAlchemy
    # using the 'creator' argument to 'create_engine'
    pool = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
        
    )
    return pool
connect_with_connector()


Engine(postgresql+pg8000://)

In [34]:
import psycopg2

# Connect to the instance using psycopg2
conn = psycopg2.connect(instance_connection_string)

# Create a cursor object
cursor = conn.cursor()

# Define the SQL query to create the table
create_table_query = """
CREATE TABLE IF NOT EXISTS stocks_data (
    date DATE,
    open FLOAT,
    high FLOAT,
    low FLOAT,
    close FLOAT,
    adjusted_close FLOAT,
    volume BIGINT,
    ticker TEXT
);
"""

# Execute the SQL query to create the table
cursor.execute(create_table_query)

# Commit the transaction
conn.commit()

# Close the cursor and connection
cursor.close()
conn.close()

NameError: name 'instance_connection_string' is not defined

In [35]:
import os

import psycopg2
import pandas as pd
from dotenv import load_dotenv

load_dotenv()

def load_csv_to_cloud_sql(csv_file_path):
    # Get the instance connection string and other parameters from environment variables
    instance_connection_string = os.getenv("DB_CONNECTION_GCP")
    database_name = os.getenv("DB_NAME")

    # Connect to the instance using psycopg2
    conn = psycopg2.connect(instance_connection_string)
    
    # Create a cursor object
    cursor = conn.cursor()

    # Define the SQL query to create the table
    create_table_query = """
    CREATE TABLE IF NOT EXISTS stocks_data (
    date DATE,
    open FLOAT,
    high FLOAT,
    low FLOAT,
    close FLOAT,
    adjusted_close FLOAT,
    volume BIGINT,
    ticker TEXT
    );
    """

    # Execute the SQL query to create the table
    cursor.execute(create_table_query)

    # Commit the transaction
    conn.commit()


    # Load data from the CSV file into a Pandas DataFrame
    df = pd.read_csv(csv_file_path)

    # Insert the data into the Cloud SQL database
    cursor = conn.cursor()
    for index, row in df.iterrows():
        values = tuple(row)
        placeholders = ",".join(["%s"] * len(values))
        query = f"INSERT INTO stocks_data VALUES ({placeholders})"
        cursor.execute(query, values)
    conn.commit()

    # Close the connection
    cursor.close()
    conn.close()

    print("Data imported successfully!")
    
if __name__ == "__main__":
    csv_file_path = 'data/stocks/stocks_data.csv'
    load_csv_to_cloud_sql(csv_file_path)
    

Data imported successfully!


In [36]:
import os
import psycopg2
from dotenv import load_dotenv
import pandas as pd

load_dotenv()

def fetch_and_print_records():
    # Get the instance connection string from environment variables
    instance_connection_string = os.getenv("DB_CONNECTION_GCP")

    # Connect to the instance using psycopg2
    conn = psycopg2.connect(instance_connection_string)

    # Create a cursor object
    cursor = conn.cursor()

    # Define a SQL query to fetch the 10 most recent records from the stocks_data table
    fetch_query = "SELECT * FROM stocks_data ORDER BY date DESC LIMIT 10;"

    # Execute the query
    cursor.execute(fetch_query)

    # Fetch the records
    records = cursor.fetchall()

    # Convert records to DataFrame for better visualization
    df = pd.DataFrame(records, columns=['date', 'open', 'high', 'low', 'close', 'adjusted_close', 'volume', 'ticker'])

    # Print the records
    print(df)

    # Close the connection
    cursor.close()
    conn.close()

if __name__ == "__main__":
    fetch_and_print_records()


         date    open     high      low   close  adjusted_close     volume  \
0  2023-06-20  278.73  284.800  276.220  284.33          284.33   20676920   
1  2023-06-20  124.86  127.250  124.500  125.78          125.78   56855478   
2  2023-06-20  123.50  125.175  122.830  123.85          123.85   22666024   
3  2023-06-20  184.69  186.100  184.445  185.01          185.01   49751021   
4  2023-06-20  339.27  342.070  335.860  338.05          338.05   26350198   
5  2023-06-16  126.70  126.700  123.790  124.06          124.06   56699200   
6  2023-06-16  127.71  127.900  125.300  125.49          125.49   84247104   
7  2023-06-16  284.75  287.850  280.130  281.00          281.00   43127731   
8  2023-06-16  126.70  126.700  123.790  124.06          124.06   56699200   
9  2023-06-16  186.73  186.990  184.270  184.92          184.92  101256225   

  ticker  
0   META  
1   AMZN  
2   GOOG  
3   AAPL  
4   MSFT  
5   GOOG  
6   AMZN  
7   META  
8   GOOG  
9   AAPL  
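The output above lists the GOOG row for 2023-06-16 twice, which would be consistent with the load cell having been run more than once against a table without a key. One way to make the load safe to re-run, sketched below, is a primary key on the date and ticker plus an insert that skips existing rows. The sketch uses SQLite as a local stand-in, where the statement is `INSERT OR IGNORE`; on PostgreSQL the equivalent is `INSERT ... ON CONFLICT DO NOTHING`. The reduced column set is an assumption made to keep the example short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # local stand-in for the Cloud SQL database
conn.execute("""
    CREATE TABLE stocks_data (
        date TEXT,
        close REAL,
        ticker TEXT,
        PRIMARY KEY (date, ticker)
    )
""")

# Two rows taken from the query output above.
rows = [("2023-06-16", 124.06, "GOOG"), ("2023-06-16", 125.49, "AMZN")]


def load(rows):
    """Insert rows, silently skipping any (date, ticker) pair already present."""
    conn.executemany("INSERT OR IGNORE INTO stocks_data VALUES (?, ?, ?)", rows)
    conn.commit()


load(rows)
load(rows)  # running the load again adds nothing

row_count = conn.execute("SELECT COUNT(*) FROM stocks_data").fetchone()[0]
print(row_count)
conn.close()
```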


In [None]:
# import os
# import pandas as pd
# from sqlalchemy import create_engine, MetaData, Table, Column, String, Float, Integer
# from google.cloud.sql.connector import Connector, IPTypes

# try:
#     """Initializes a connection pool for a Google Cloud SQL instance of MySQL """
#     instance_connection_name = os.environ.get("INSTANCE_CONNECTION_NAME")
#     db_user = os.environ.get("DB_USER")
#     db_pass = os.environ.get("DB_PASS")
#     db_name = os.environ.get("DB_NAME")

#     ip_type = IPTypes.PRIVATE if os.getenv("PRIVATE_IP") else IPTypes.PUBLIC

#     connector = Connector(ip_type)

#     engine = create_engine(f"mysql+mysqldb://{db_user}:{db_pass}@/{db_name}?unix_socket=/cloudsql/{instance_connection_name}")

#     # Create metadata instance
#     metadata = MetaData()

#     # Define tables
#     stocks_data = Table( 'table1', metadata,
#     Column('date',String(10), primary_key=True),
#     Column('symbol', String(10), primary_key=True),
#     Column('open_price', Float),
#     Column('highest_price', Float),
#     Column('lowest_price', Float),
#     Column('close_price', Float),
#     Column('adjusted_close', Float),
#     Column('volume', Integer)
#     )

#     headline_news = Table(
#     'table2', metadata,
#     Column('headline', String, primary_key=True),
#     Column('timestamp', String(10), primary_key=True),
#     Column('url', String),
#     Column('sentiment_score', Float),
#     Column('sentiment', String),
#     Column('relevant_words', String),
#     Column('headline_spanish', String),
#     Column('headline_it', String)
#     )

#     # Create tables
#     metadata.create_all(engine)

#     # Load the stocks data into a DataFrame
#     with open('stocks_path', 'r') as f:
#         stocks_df = pd.read_csv(f)
#     # Write stocks data into stocks table in database
#     stocks_df.to_sql('table1', engine, if_exists='append', index=False)

#     # Load the headlines data into a DataFrame
#     with open('headlines_path', 'r') as f:
#         headlines_df = pd.read_csv(f)

#     # Write headlines data into headlines table in database
#     headlines_df.to_sql('table2', engine, if_exists='append', index=False)

# except Exception as e:
#     print(f"An error occurred when loading data into Google Cloud SQL: {e}")