In [1]:
#--------------------#
# Importing Packages #
#--------------------#
import yfinance as yf
import pandas as pd
import warnings
from datetime import datetime
from datetime import date, timedelta

#---------------------#
# Importing Variables #
#---------------------#
from variables.variables import DB_CONFIG, STOCKS, ETFS

#---------------------#
# Importing Constants #
#---------------------#
from include.constants import COMPANY_INFO_COLUMNS, STOCK_FUNDAMENTALS_COLUMNS, PRICE_HISTORY_COLUMNS, INCOMES_STATEMENT_COLUMNS, INCOMES_STATEMENT_COLUMNS
#from include.utils import get_price_history, store_dataframe_in_mysql, get_company_info

HISTORY_START_DATE = date.today() - timedelta(days=7) #Adjust days based on refresh rate
HISTORY_END_DATE = date.today()

#stock = yf.Ticker("KPN.AS")
#stock.get_income_stmt(freq="quarterly")
#store_dataframe_in_mysql(get_price_history(HISTORY_START_DATE, HISTORY_END_DATE, ["KPN.AS"], PRICE_HISTORY_COLUMNS), "price_history", PRICE_HISTORY_COLUMNS, DB_CONFIG)

In [2]:
def get_income_statement(required_stocks, income_stmt_freq="quarterly"):
    import yfinance as yf
    import pandas as pd

    for stock in required_stocks:
        symbol = yf.Ticker(stock)
        income_stmt = symbol.get_income_stmt(freq=income_stmt_freq)

        # Check if the result is a DataFrame and if it's not empty
        if isinstance(income_stmt, pd.DataFrame) and not income_stmt.empty:
            income_stmt = income_stmt.reset_index()
            income_stmt_pivoted = income_stmt.set_index("index").T
            income_stmt_pivoted.index.name = "Datetime"
            income_stmt_pivoted = income_stmt_pivoted.reset_index()
            income_stmt_pivoted['Datetime'] = pd.to_datetime(income_stmt_pivoted['Datetime']).dt.strftime('%Y-%m-%d %H:%M:%S')
            income_stmt_pivoted["Symbol"] = stock
            return income_stmt_pivoted
        else:
            print(f"No '{income_stmt_freq}' income statement found for '{stock}'")
            #warnings.warn(f"No '{income_stmt_freq}' income statement found for '{stock}'") #return None if this causes error in airflow

#store_dataframe_in_mysql(get_income_statement(STOCKS, "quarterly"), "income_statement_quarterly", INCOMES_STATEMENT_COLUMNS, DB_CONFIG) #HEIJM, PHIA

In [2]:
STOCK_FUNDAMENTALS_COLUMNS = {
    "revenueGrowth": "FLOAT",
    "profitMargins": "FLOAT", #Same as Net Margin?
    "grossMargins": "FLOAT",
    "operatingMargins": "FLOAT",
    "returnOnEquity": "FLOAT",
    "freeCashflow": "INT", #<- history needed for % increase/decrease
    "debtToEquity": "FLOAT",
    "currentRatio": "FLOAT",
    "trailingEps": "FLOAT",
    "trailingPE": "FLOAT",
    "returnOnAssets": "FLOAT",
    "operatingCashflow": "INT", #<- history needed for % increase/decrease
    #"quickRatio": "FLOAT",
    #"priceToBook": "FLOAT",
    #"priceToSalesTrailing12Months": "FLOAT"
}

def get_company_info(required_stocks, required_columns_dict):
    import yfinance as yf
    import pandas as pd
    import warnings
    from datetime import datetime

    # Extract column names from the dictionary keys
    column_names = list(required_columns_dict.keys())
    Company_Info_df = pd.DataFrame(columns=column_names)

    for stock in required_stocks:
        temp_list = []
        company_info = yf.Ticker(stock).info

        for column in column_names:  # Iterate over the extracted column names
            if column in company_info:
                temp_list.append(company_info[column])
            else:
                temp_list.append(None)
                warnings.warn(f"Column '{column}' not found for stock '{stock}'")

        index_label = f"{stock} {datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')}"
        Company_Info_df.loc[index_label] = temp_list

    return Company_Info_df

def store_dataframe_in_mysql(df, table_name, column_definitions, db_config):
    import mysql.connector

    # Connect to MySQL
    conn = mysql.connector.connect(**db_config)
    cursor = conn.cursor()

    # Create table query
    columns_with_types = ", ".join([f"{col} {dtype}" for col, dtype in column_definitions.items()])
    create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_with_types})" #Create table if it does not exist
    cursor.execute(create_table_query)

    # Check existing columns in the table
    cursor.execute(f"DESCRIBE {table_name}")
    existing_columns = [column[0] for column in cursor.fetchall()]

    # Add missing/new columns if necessary
    for column, dtype in column_definitions.items():
        if column not in existing_columns:
            alter_query = f"ALTER TABLE {table_name} ADD COLUMN {column} {dtype}"
            print(f"Adding missing column: {column} with type {dtype}")
            cursor.execute(alter_query)

    # Insert data into the table
    placeholders = ", ".join(["%s"] * len(column_definitions))  # Placeholder for values
    insert_query = f"INSERT INTO {table_name} ({', '.join(column_definitions.keys())}) VALUES ({placeholders})"

    for _, row in df.iterrows():
        print(insert_query, tuple(row))
        cursor.execute(insert_query, tuple(row))

    # Commit changes and close the connection
    conn.commit()
    cursor.close()
    conn.close()

In [3]:
store_dataframe_in_mysql(get_company_info(STOCKS, STOCK_FUNDAMENTALS_COLUMNS), "stock_fundamentals", STOCK_FUNDAMENTALS_COLUMNS, DB_CONFIG)



INSERT INTO stock_fundamentals (revenueGrowth, profitMargins, grossMargins, operatingMargins, returnOnEquity, freeCashflow, debtToEquity, currentRatio, trailingEps, trailingPE, returnOnAssets, operatingCashflow) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) (0.038, 0.15008001, 0.53725, 0.26197, 0.24058, 967750016.0, 181.252, 0.598, 0.21, 16.709524, 0.072399996, 2233999872.0)


DataError: 1264 (22003): Out of range value for column 'operatingCashflow' at row 1