# Stock data scraping and analysis with sk-learn

- __version__ = 2.00
- __author__ = Manan Shah

## Load libraries

In [None]:
import math
import numpy as np
import time
from datetime import date, datetime, timedelta

import investpy

# Plotting library
import matplotlib as mpl
from matplotlib import dates as mdates
from matplotlib import pyplot as plt
from matplotlib import style

# Module-level matplotlib date helpers: tick locators for years and months,
# plus a formatter that renders dates as abbreviated month and two-digit year.
years = mdates.YearLocator()
months = mdates.MonthLocator()
date_fmt = mdates.DateFormatter("%b-%y")  # Set mon-year format, e.g. "Jan-21"

from warnings import warn

from IPython.display import Markdown as MD
from IPython.display import display

# lxml is much faster than requests_html, at least for a single call. I still have to check simultaneous multiple calls.
from lxml import etree
from requests import Request, Session  # for http requests

import pandas as pd
from pandas import DataFrame as DF
from pandas.plotting import scatter_matrix

pd.set_option(
    "display.column_space",
    12,
    "display.max_colwidth",
    12,
    "display.max_rows",
    10,
    "display.colheader_justify",
    "center",
    "display.date_dayfirst",
    True,
    "display.max_columns",
    15,
    "float_format",
    "{:.2f}".format,
)

## User defined functions
### Format display text

In [None]:
def printmd(string, color=None):
    """
    Displays the string with markdown effects, optionally in color.

    Parameters:
        string (str): The (markdown) text to be displayed.
        color (str): CSS color of the text (e.g., "green", "red"). When omitted
                     (None), the text is shown in the default color.

    Display:
        display(MD(colorstr)): The string wrapped in an HTML <span> and rendered
                               as markdown.

    Example: printmd(f"**the value: {a}**", color = "green")
    """

    if color is None:
        # No color requested: do not emit an invalid "color:None" CSS rule.
        colorstr = f"<span>{string}</span>"
    else:
        colorstr = f"<span style='color:{color}'>{string}</span>"

    display(MD(colorstr))

### Element tree with requests and lxml

In [None]:
def request_lxml_etree(url: str) -> "element tree bytecode":
    """
    Fetch a URL with requests and parse the response body with lxml.

    On HTTP 200 the parsed element tree is returned for further element selection.
    For the redirect codes 301/307/308 the status code is displayed, a warning is
    issued and None is returned. For every other status code the status code is
    displayed and an Exception carrying an explanatory message is raised.

    Parameters:
        url (str): Target URL from which the desired data is to be scraped.

    Returns:
        element_tree (etree object): HTML element tree on success; None for the
                                     301/307/308 redirect codes.

    Raises:
        Exception: For any status code other than 200, 301, 307 and 308.

    Example:
        tree = request_lxml_etree(url=URL)
    """

    # Standard variables
    headers = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"
    }
    timeout = (5, 10)

    # Use the session as a context manager so it is closed once the response
    # has been received.
    with Session() as session:
        response = session.get(url=url, headers=headers, timeout=timeout, verify=True)

    status_code = response.status_code

    # The request was received, understood, and accepted.
    if status_code == 200:
        # response.content holds the raw bytes of the response body.
        return etree.HTML(response.content)

    printmd(f"Status code: {status_code}")

    # The client must take additional action to complete the request.
    redirect_warnings = {
        301: f"The {url} is moved permanently and might require a different 'GET' method.",
        307: f"The {url} is redirected temporarily. The future requests should still use the original URL and 'GET' method.",
        308: f"The {url} is moved permanently and will not allow a different 'GET' method.",
    }
    if status_code in redirect_warnings:
        warn(redirect_warnings[status_code])
        return None

    # 4xx: the error seems to have been caused by the client.
    # 5xx: the server failed to perform the request.
    error_messages = {
        400: "Bad request! The server cannot or will not process the request due to an apparent client error (e.g., malformed request syntax, size too large, invalid request message framing, or deceptive request routing)",
        401: "Unauthorized request! An authentication is required and has failed or has not yet been provided. The response must include a authentication data in the header. Check the header dict.",
        403: "Request is forbidden! The request contained valid data and was understood by the server, but the server is refusing action (to avoid scraping).",
        404: "The requested resource could not be found.",
        408: "Request Timeout. Check the timeout tupple.",
        423: "The resource that is being accessed is locked.",
        429: "Too many requests in a given amount of time.",
        503: "The server cannot handle the request (because it is overloaded or down for maintenance). Generally, this is a temporary state.",
    }
    raise Exception(error_messages.get(status_code, "Something went wrong!"))

### Table element byte code from a webpage

In [None]:
def etree_table(
    etree: "element tree bytecode", xpath_table: str
) -> "table element bytecode":
    """
    Returns the first element matched by xpath_table in the given element tree.

    Parameters:
        etree (element object): Element tree to search (anything offering an
                                ``xpath(query)`` method).
        xpath_table (str): XPath to the desired table to be scraped.

    Returns:
        table_element (element object): The first element matched by the XPath.

    Raises:
        Exception: If evaluating the XPath fails, or if it matches nothing.

    Example:
        table = etree_table(etree=stock_tree, xpath_table = './/span[contains(@class, "nextpag")]')
    """
    # Only the XPath evaluation is guarded, so the "No table found" error below
    # is not swallowed and re-labelled by the handler.
    try:
        table_element_list = etree.xpath(xpath_table)
    except Exception as err:
        raise Exception(
            f"Desired class in the XPath is not found! Check the XPath/class in {xpath_table}."
        ) from err

    if len(table_element_list) == 0:
        raise Exception(
            f"No table found. Check the XPath query syntex in {xpath_table}."
        )

    # The desired (here single table) element is the first match.
    return table_element_list[0]

### Pagination of webpage (next URL)

In [None]:
def etree_URLnext(
    etree: "element tree bytecode", xpath_URLnext: str
) -> "pagination URL link (str)":
    """
    Returns the link of the next page using xpath_URLnext, otherwise raises an error.

    The "href" attribute is read from the first element matched by the XPath; if
    that element carries no "href", the "href" of its parent element is used.

    Parameters:
        etree (element object): Element tree to search (anything offering an
                                ``xpath(query)`` method).
        xpath_URLnext (str): XPath for "next page" button.

    Returns:
        URLnext (str): The "href" value of the matched element or of its parent.

    Raises:
        Exception: If evaluating the XPath fails, if it matches nothing, or if
                   neither the matched element nor its parent has an "href".

    Example:
        url_next = etree_URLnext(etree=tree, xpath_URLnext=xpath_URLnext)
    """
    # Only the XPath evaluation is guarded, so the more specific errors raised
    # below reach the caller with their own message.
    try:
        matches = etree.xpath(xpath_URLnext)
    except Exception as err:
        raise Exception(
            f"Desired (full/partial) class not found!! Check the XPath query syntex in {xpath_URLnext}."
        ) from err

    if len(matches) == 0:
        raise Exception(
            f"No nextpage found. Check the given XPath query syntex in {xpath_URLnext}."
        )

    element = matches[0]
    URLnext = element.get("href")
    if URLnext is not None:
        return URLnext

    # The matched element has no link itself: fall back to its parent.
    parent_element = element.getparent()
    URLnext_parent = None if parent_element is None else parent_element.get("href")

    if URLnext_parent is None:
        raise Exception(
            f"No nextpage found! Check the XPath query syntex in {xpath_URLnext}."
        )

    return URLnext_parent

### DF to dict

In [None]:
def df_to_dictDF(entity_df: "DF") -> dict:
    """
    Break a big DF into multiple smaller ones at its all-'NaN' columns.

    A column consisting only of 'NaN' values is treated as a section heading.
    Every heading column starts a section that runs from the column right after
    it up to (not including) the next heading column; the last section runs to
    the end of the DF. The heading column names become the keys of the returned
    dictionary and the sliced sub-DFs its values. Sections without any data
    (e.g. two adjacent heading columns) are left out, and columns located before
    the first heading column are not part of any section. If the DF has no
    all-'NaN' column at all, the whole DF is returned under the key
    ``entity_df.columns.name``.

    Parameters:
        entity_df (DF): Input big DataFrame to be broken into small DFs.

    Returns:
        dct (dict): A dict containing DFs as values with their headers as keys.

    Example:
        new_dct = df_to_dictDF(entity_df=data)
    """

    # Positions (column numbers) of the heading columns, i.e. all-'NaN' columns.
    is_heading_col = entity_df.isna().all(axis=0).to_list()
    heading_locs = [loc for loc, is_heading in enumerate(is_heading_col) if is_heading]

    if not heading_locs:
        dct = {entity_df.columns.name: entity_df}

    else:
        dct = {}

        # A section ends where the next heading starts; the last section has no
        # RHS limit and therefore ends at the last column of the DF.
        section_ends = heading_locs[1:] + [entity_df.shape[1]]

        for heading_loc, section_end in zip(heading_locs, section_ends):
            sub_df = entity_df.iloc[:, heading_loc + 1 : section_end]

            if not sub_df.empty:
                dct[entity_df.columns[heading_loc]] = sub_df

    keys_list = [*dct]

    print(f"sub-DF keys: {keys_list}")

    return dct

### Import module

* Investpy retrieves data for financial products such as stocks, funds, ETFs, indices and currency crosses from investing.com.

In [None]:
def import_hist_data(
    product_name: str,
    entities_dct: dict,
    country_name: str,
    strt_date: str,
    end_date: str,
    interval: str = "daily",
    currency_cross: str = "usd/inr",
    as_json=False,
    order: str = "ascending",
):
    """
    A UDF to fetch historic data of stocks, commodities, MFs, ETFs, indices, and currencies using the investpy python
    package (which fetches data from investing.com). The fetched data of each entity is stored in a dictionary as a
    dict value against the respective key of entities_dct.

    Parameters:
        product_name (str): Investment option (case-insensitive), e.g. "stocks", "commodities", "mutual funds", "etfs",
                            "indices" or "currencies".
        entities_dct (dict): Labels used as keys of the returned dict, mapped to the entity identifiers that are
                             handed to investpy (the dict values are what investpy is queried with).
        country_name (str): Country of the entities. Not used when the product is a currency.
        strt_date (str): Starting date of the fetched data.
        end_date (str): Ending date of the fetched data.
        interval (str): Interval between two consecutive values of the historic data, passed on to investpy.
        currency_cross (str): Currently unused; kept for backward compatibility. For currencies, the values of
                              entities_dct are used as the currency crosses (e.g. "usd/inr").
        as_json (bool): Passed on to investpy. If True, the "Currency" column of stock data is not dropped, because
                        the result is then not handled as a DataFrame.
        order (str): The ascending/descending order of the fetched data on date axis.

    Returns:
        hist_dct (dict): A dictionary with the keys of entities_dct and the fetched data as values.

    Raises:
        Exception: If product_name is not one of the supported product names.

    Example:
        stcks_dct = import_hist_data(product_name="stocks", entities_dct={"HDBK": "HDFC Bank"}, country_name="India",
                                     strt_date="01/01/2019", end_date=date.today().strftime("%d/%m/%Y"),
                                     interval="daily")
    """

    product_db_stocks = ["stock", "stocks", "equity", "equities", "share", "shares"]

    product_db_commodities = ["commodity", "commodities"]

    product_db_MFs = [
        "fund",
        "funds",
        "mutual fund",
        "mutual funds",
        "mutual_fund",
        "mutual_funds",
        "mf",
        "mfs",
    ]

    product_db_ETFs = ["etfs", "etf"]

    product_db_indices = ["index", "indices"]

    product_db_currencies = ["currencies", "currency"]

    # concatenate all individual lists into one.
    product_db = (
        product_db_stocks
        + product_db_commodities
        + product_db_MFs
        + product_db_ETFs
        + product_db_indices
        + product_db_currencies
    )

    product = product_name.lower()

    # Validate before touching investpy so that a typo fails fast.
    if product not in product_db:
        raise Exception(
            f"The entered product name ({product}) is not avalable in {product_db}"
        )

    # Pick the investpy function and the name of its entity keyword argument.
    if product in product_db_commodities:
        fetch = investpy.commodities.get_commodity_historical_data
        entity_kwarg = "commodity"

    elif product in product_db_stocks:
        fetch = investpy.stocks.get_stock_historical_data
        entity_kwarg = "stock"

    elif product in product_db_MFs:
        fetch = investpy.funds.get_fund_historical_data
        entity_kwarg = "fund"

    elif product in product_db_ETFs:
        fetch = investpy.etfs.get_etf_historical_data
        entity_kwarg = "etf"

    elif product in product_db_indices:
        fetch = investpy.indices.get_index_historical_data
        entity_kwarg = "index"

    else:
        fetch = investpy.currency_crosses.get_currency_cross_historical_data
        entity_kwarg = "currency_cross"

    common_kwargs = {
        "from_date": strt_date,
        "to_date": end_date,
        "interval": interval,
        "as_json": as_json,
        "order": order,
    }

    # Currency crosses are the only product queried without a country.
    if product not in product_db_currencies:
        common_kwargs["country"] = country_name

    is_stock = product in product_db_stocks

    hist_dct = {}
    for key, entity in entities_dct.items():
        data = fetch(**{entity_kwarg: entity}, **common_kwargs)

        # Stock DataFrames get their "Currency" column removed; with
        # as_json=True there is no DataFrame to drop a column from.
        if is_stock and not as_json:
            data = data.drop(columns=["Currency"])

        hist_dct[key] = data

    printmd(
        f"Historical {product_name.lower()} data for {[*entities_dct]} from {strt_date} to {end_date} is imported."
    )
    return hist_dct

In [None]:
def stocks_financials(
    entities_dct: dict, xpath_URLnext: str, xpath_table: str, order: str = "ascending"
) -> dict:

    """
    Scrap financial data of stocks from the given URLs (as a dict).

    Every URL of an entity is scraped page by page (following the "Next" button found with xpath_URLnext) and the
    tables of all pages are merged into one DF, discarding repeating columns. The merged DF is then segregated into
    smaller DFs as per their headings (see df_to_dictDF) and stored in a dictionary with the respective headings as
    keys. URLs containing "capital-structure" are single-page tables and are stored as one DF.

    The kind of data is recognised from the URL text: "quarterly-results", "yearly", "balance-sheet", "profit-loss",
    "cash-flow" or "ratios"; a URL containing "consolidated" is stored under "Consolidated", any other under
    "Standalone".

    Parameters:
        entities_dct (dict): Name of the entities as keys and list of their financial data URLs as values.
        xpath_URLnext (str): XPath for pagination ("Next" button link)
        xpath_table (str): XPath for table to be scraped.
        order (str): "descending" sorts all DFs descending on the Date index; any other value sorts ascending.

    Returns:
        financial_dct (dict): A dict with entity names as keys and their financial data as values. The financial data
                              are nested dicts having keys "Balance Sheet", "Profit/Loss", "Quarterly Results",
                              "Yearly Results", "Cash Flows", "Ratios" (each a dict with "Standalone"/"Consolidated"
                              keys, empty if not scraped) and "Capital Structure" (a DF, empty if not scraped).

    Raises:
        Exception: If a URL does not match any of the known kinds of data.

    Example:
        financial_data = stocks_financials(entities_dct=stocks_financials_dct, xpath_URLnext=xpath_financials_URLnext,
                                           xpath_table=xpath_financials_table, order="ascending")
    """

    # (keyword looked for in the first URL, label used in the progress message, key in the result dict)
    statement_kinds = (
        ("quarterly-results", "Quarterly Result", "Quarterly Results"),
        ("yearly", "Yearly Result", "Yearly Results"),
        ("balance-sheet", "Balance-Sheet", "Balance Sheet"),
        ("profit-loss", "Profit/Loss", "Profit/Loss"),
        ("cash-flow", "Cash-flow", "Cash Flows"),
    )

    financial_dct = {}

    printmd("**Scraping financial data for...**\n", "yellow")
    for entity, entity_URLs_list in entities_dct.items():

        printmd(entity, "yellow")

        # Fresh containers per entity, so that nothing leaks over from the
        # previously scraped entity when a kind of data is missing.
        entity_data = {
            "Balance Sheet": {},
            "Profit/Loss": {},
            "Quarterly Results": {},
            "Yearly Results": {},
            "Cash Flows": {},
            "Ratios": {},
            "Capital Structure": DF(),
        }

        for URL in entity_URLs_list:
            if "capital-structure" in URL:
                entity_data["Capital Structure"] = _financials_capital_structure(
                    url=URL, xpath_table=xpath_table, entity=entity, order=order
                )
                printmd("Capital structure data is scraped Successfully! \n\n")
                continue

            data, url_list = _financials_paged_statement(
                first_url=URL,
                xpath_URLnext=xpath_URLnext,
                xpath_table=xpath_table,
                entity=entity,
                order=order,
            )

            first_url = url_list[0]
            for keyword, label, section in statement_kinds:
                if keyword in first_url:
                    break
            else:
                # Ratios are accepted when any scraped page URL mentions them
                # (safe for single-page results as well).
                if any("ratios" in page_url for page_url in url_list):
                    label, section = "Ratio", "Ratios"
                else:
                    raise Exception("Extend the code!!!!!")

            scope = "Consolidated" if "consolidated" in first_url else "Standalone"

            printmd(
                f"{scope} {label} data from {len(url_list)} urls is scraped Successfully!"
            )

            # 'Standalone' and 'Consolidated' are two values in the same dict.
            # Break big DF into small ones and save them as a dict.
            entity_data[section].update({scope: df_to_dictDF(entity_df=data)})

        # Update a dict with the product name as a key and a dict of corresponding sub-DFs corresponding to their respective heading.
        financial_dct.update({entity: entity_data})

    printmd(
        "------------------------------ Scraping Successfull! ------------------------------"
    )
    return financial_dct


def _numeric_or_unchanged(column):
    """Convert a column with pd.to_numeric (floats downcast); return it unchanged if it cannot be parsed."""
    try:
        return pd.to_numeric(column, downcast="float")
    except (ValueError, TypeError):
        return column


def _financials_capital_structure(url, xpath_table, entity, order):
    """Scrape the single-page capital-structure table at url into a DF indexed by 'Date'."""
    mc_financial_tree = request_lxml_etree(url=url)
    table_element = etree_table(etree=mc_financial_tree, xpath_table=xpath_table)

    # Find 'tr' tag from the table element; every row becomes the list of the
    # text nodes of its 'th'/'td' cells.
    trow_elements_list = table_element.findall(".//tr")
    table = [
        row.xpath(".//th//text() | .//td//text()") for row in trow_elements_list
    ]

    # Rename the existing columns and select desired entries (rows).
    col_names_list = [
        "From",
        "Year",
        "Instrument",
        "Authorized Capital (Cr.)",
        "Issued Capital (Cr.)",
        "Shares",
        "FV",
        "Paid Capital (Cr.)",
    ]
    df_raw = DF(table[2:], columns=col_names_list)

    # Add the oldest year to "Year" column. Eg. 1994 at the bottom of DF to get rid of "To" and "From" columns in next steps.
    # NOTE(review): the row is built through np.array exactly as before; if the
    # "From" value is text, numpy presumably stores the np.nan fillers as strings - confirm.
    oldest_year_row = DF(
        np.array([[np.nan, df_raw.iloc[-1, 0]] + [np.nan] * 6]),
        columns=df_raw.columns,
    )
    df_row = pd.concat([df_raw, oldest_year_row], axis=0, ignore_index=True)

    # Create a new column named 'Date' with adding 'Day=1' and 'Month = March'.
    df_row["Date"] = pd.to_datetime(
        df_row[["Year"]].assign(Day=1, Month=3), format="%d-%m-%Y"
    )

    columns_drop = ["From", "Year", "Instrument"]
    df = df_row.drop(columns=columns_drop).set_index("Date")
    df = df.rename_axis(entity, axis=1)

    # Choose the indexing order
    return df.sort_index(ascending=(order != "descending"))


def _financials_paged_statement(first_url, xpath_URLnext, xpath_table, entity, order):
    """Scrape a paginated statement starting at first_url; return (date-indexed DF, list of scraped page URLs)."""
    url_list = []  # URLs of all scraped pages.
    df_list = []  # One DF per scraped page.

    URL = first_url

    # Keep fetching the data until the next page link is active.
    while URL != "javascript:void();":
        # --------------------------- Find desired data and save it in a DF ---------------------------#
        mc_financial_tree = request_lxml_etree(url=URL)
        table_element = etree_table(etree=mc_financial_tree, xpath_table=xpath_table)

        # Find 'tr' tag from the table element.
        trow_elements_list = table_element.findall(".//tr")

        # Double list comprehension to get each tr as a list of td (string)
        table_raw = [
            [td.text for td in row.getchildren()] for row in trow_elements_list
        ]

        # Remove rows without cells and rows whose first cell has no text.
        table_noNone = [tdata for tdata in table_raw if tdata and tdata[0]]

        # Replace None and '\xa0' with np.nan within TRs
        table = [
            [np.nan if (tdata is None or tdata == "\xa0") else tdata for tdata in trow]
            for trow in table_noNone
        ]

        # Return DataFrame with duplicate rows removed.
        df_raw = DF(data=table).drop_duplicates()

        # Remove last row named as 'yrc' (date) if present.
        raw_drop = df_raw[df_raw[0] == "yrc"].index
        df_yrc_drop = df_raw.drop(raw_drop)

        # Replace '--' with 'missing' instead of NaN in order to separate the DF in future at NaN values columns.
        df_missing = df_yrc_drop.iloc[:, :].replace("--", "missing")

        # Drop columns which have only Nan values.
        df_na_drop = df_missing.dropna(axis=1, how="all")

        # Convert numerical columns to floats; columns that cannot be parsed remain as they are.
        df_list.append(df_na_drop.apply(_numeric_or_unchanged))

        # ------------------------ Get subsequent URLs for the same data until the end ------------------------ #
        url_list.append(URL)
        URL = etree_URLnext(etree=mc_financial_tree, xpath_URLnext=xpath_URLnext)

    # Concat individual df from all pages into one DF
    df_concated = pd.concat(df_list, axis=1, ignore_index=True)

    # Remove '(a) , (b) , (c) , i) , ii) , - ', etc. elements from each data (string).
    string_to_replace = r"\(a\) |\(b\) |\(c\) |\(d\) |a\) |b\) |c\) |d\) |i\) |ii\) |'|- | :|\.$"
    string_replaced_with = ""
    df_str_fmtd = df_concated.replace(
        string_to_replace, string_replaced_with, regex=True
    )

    # Make 1st row as column header and drop the 1st row.
    df_str_fmtd.columns = df_str_fmtd.iloc[0]
    df_fmtd_hdr = df_str_fmtd.drop(df_str_fmtd.index[0])

    # Remove duplicate columns that are needed once but are present on every page (e.g. 'Quarterly Results of HDFC Bank (in Rs. Cr.)').
    # The check is done on the column header.
    data_raw = df_fmtd_hdr.loc[:, ~df_fmtd_hdr.columns.duplicated()]
    data_raw = data_raw.set_index(data_raw.columns[0])  # set 1st column as index
    data_trsps = data_raw.transpose()  # Transpose the df

    # Format string date to datetime object.
    data_trsps.index = pd.to_datetime(
        data_trsps.index, format="%b %y", dayfirst=True
    )
    data_trsps.index.name = "Date"  # Assign name to the index column.

    # Rename the index title with (a shorter) equity name.
    data_trsps_index = data_trsps.rename_axis(entity, axis=1)

    # Choose the indexing order
    data = data_trsps_index.sort_index(ascending=(order != "descending"))

    return data, url_list

In [None]:
def stocks_company_facts(
    entities_dct: dict, xpath_table: str, order: str = "ascending"
) -> dict:
    """
    Scrape the company facts (bonus, rights, splits, dividends) of every entity.

    For each URL of each entity the table located by xpath_table is read row by row, cleaned, given its first row as
    header, indexed by the column whose title starts with "E" (parsed as day-first dates), sorted on that index and
    stored under the category whose keyword occurs in the URL.

    Parameters:
        entities_dct (dict): Name of the entities as keys and list of their facts data URLs as values.
        xpath_table (str): XPath for table to be scraped.
        order (str): "descending" sorts the Date index newest first; any other value sorts it ascending.

    Returns:
        company_facts_dct (dict): A dict with entity names as keys and, as values, a dict with the keys "Bonus",
                                  "Rights", "Splits" and "Dividends" holding the respective DFs. A category for which
                                  the entity has no URL is None (it is never taken over from another entity).

    Raises:
        Exception: If a URL contains none of "bonus", "rights", "splits" or "dividends".

    Example:
        stocks_company_facts_data = stocks_company_facts(entities_dct=stocks_company_facts_dct,
        xpath_table=companyfacts_xpath_table, order="ascending")
    """

    # (keyword searched in the URL, key in the result dict, success message); checked in this order.
    categories = (
        ("bonus", "Bonus", "Bonus data is scraped Successfully!"),
        ("rights", "Rights", "Rights data is scraped Successfully!"),
        ("splits", "Splits", "Splits data is scraped Successfully!"),
        ("dividends", "Dividends", "Dividends data is scraped Successfully! \n\n"),
    )

    # Remove '(a) , (b) , (c) , i) , ii) , - ', etc. elements from each data (string).
    string_to_replace = r"\t|\(a\) |\(b\) |\(c\) |\(d\) |a\) |b\) |c\) |d\) |i\) |ii\) |'|- | :|\.$"
    string_replaced_with = ""

    def to_numeric_or_keep(column):
        # Same result as the deprecated pd.to_numeric(..., errors="ignore"): a column that cannot be converted
        # completely is returned unchanged.
        try:
            return pd.to_numeric(column, downcast="float")
        except (ValueError, TypeError):
            return column

    company_facts_dct = {}

    printmd("**Scraping Company facts data for...**\n\n", "yellow")

    for entity, URLs in entities_dct.items():
        printmd(entity, "yellow")

        # Start every entity from a clean slate so that a missing category can neither raise a NameError nor
        # silently reuse the table of the previously scraped entity.
        entity_facts = {result_key: None for _, result_key, _ in categories}

        for URL in URLs:
            # Identify the category before any request is made; an unsupported URL fails fast.
            for keyword, result_key, success_msg in categories:
                if keyword in URL:
                    break
            else:
                raise Exception("Extend the code!!!!!")

            # --------------------------- Find interested data and save it in a DF ---------------------------#
            mc_companyfacts_tree = request_lxml_etree(url=URL)  # mc: moneycontrol
            table_element = etree_table(
                etree=mc_companyfacts_tree, xpath_table=xpath_table
            )

            # Every 'tr' below the table becomes a list holding the text of its child cells.
            table_raw = [
                [cell.text for cell in row] for row in table_element.findall(".//tr")
            ]

            # Drop empty rows and replace None and '\xa0' with np.nan within the remaining rows.
            table = [
                [np.nan if tdata is None or tdata == "\xa0" else tdata for tdata in trow]
                for trow in table_raw
                if trow
            ]

            # Convert numerical strings to numbers; non-number strings will remain as they are.
            df = DF(data=table).apply(to_numeric_or_keep)
            df_str_fmtd = df.replace(
                string_to_replace, string_replaced_with, regex=True
            )

            # Make the 1st row the column header and drop that row from the data.
            df_str_fmtd.columns = df_str_fmtd.iloc[0]
            df_fmtd_hdr = df_str_fmtd.drop(df_str_fmtd.index[0])

            # The first column whose title starts with "E" becomes the index.
            index_name = df_fmtd_hdr.columns[df_fmtd_hdr.columns.str.match("^E")][0]
            df_fmtd_hdr = df_fmtd_hdr.set_index(index_name)

            # Format string date to datetime object.
            df_fmtd_hdr.index = pd.to_datetime(df_fmtd_hdr.index, dayfirst=True)
            df_fmtd_hdr.index.name = index_name  # Assign name to the index column.

            # Name the column axis after the entity and apply the requested index order.
            data = df_fmtd_hdr.rename_axis(entity, axis=1).sort_index(
                ascending=(order != "descending")
            )

            printmd(success_msg)
            entity_facts[result_key] = data

        company_facts_dct[entity] = entity_facts

    print(
        "---------------------------------- Scraping Successfull! ----------------------------------"
    )

    return company_facts_dct

### Operations on one or among two/three columns

In [None]:
def operation_col_concat(
    entities_dct: dict,
    oprtn_name: str,
    resltn_col_name: str,
    oprnd_col1: str,
    oprnd_col2: str = None,
    oprnd_col3: str = None,
    constant: float = None,
    forecast_time: int = None,
    window_size: int = None,
    window_type: str = None,  # extend or rolling
) -> dict:
    """
    Perform a "single" operation at a time on one or among two/three columns of all DFs in the input dict. The result
    of the operation is appended as a new column (resltn_col_name) to a copy of every DF; the input DFs are left
    untouched.

    Supported operations (oprtn_name is matched case-insensitively against the alias lists below):
        VWAP               : expanding sum of (col1 * col2) divided by the expanding sum of col2 (col2 is the volume).
        standard deviation : expanding standard deviation of col1 (ddof=1, at least 2 observations).
        variance           : expanding variance of col1 (ddof=1, at least 2 observations).
        return             : percentage change of col1 with respect to the previous row.
        percentage         : (col1 - col2) / col3 * 100.
        add/sub/mul/div    : col1 (operator) col2 when oprnd_col2 is a column, otherwise col1 (operator) constant.
        predict            : col1 shifted up by forecast_time rows (the last rows become NaN).
    Arguments that an operation does not need are ignored. The "mean" aliases are recognised but not implemented.

    Parameters:
        entities_dct (dict): A dictionary of imported historical data (entity name -> DF).
        oprtn_name (str): Name of the opertation that is going to be performed on operand column(s).
        resltn_col_name (str): Title of the resultant column.
        oprnd_col1, oprnd_col2, oprnd_col3 (str): Name of the column on which the operation is going to be performed.
                        The names are validated against the columns of the first DF only.
        constant (numeral): A constant value used by add/sub/mul/div when no second operand column is given.
        forecast_time (int): No. of rows (hours, days, months... depending on the data) to be forecasted.
        window_size (int), window_type (str): Currently unused.

    Returns:
        new_dict (dict): A new dictionary consisting the original input dictinary concated with the resultant column.

    Raises:
        TypeError: If entities_dct is not a dict.
        ValueError: If entities_dct is empty, the operation name or oprnd_col1 is unknown, or an operand needed by
                    the operation is missing.
        ZeroDivisionError: If a division by the constant 0 is requested.
        NotImplementedError: For the "mean" aliases.

    Example:
        stk_frcst_dct = operation_col_concat(entities_dct = stk_dct, oprtn_name = "predict", resltn_col_name =
                        "Close_predict", oprnd_col1 = "Close", oprnd_col2 = None, oprnd_col3 = None, constant = None,
                        forecast_time = 30)
    """
    oprtn_mean = ["average", "avg", "mean"]
    oprtn_VWAP = ["vwap", "volume weighted average price"]
    oprtn_add = ["addition", "add", "sum"]
    oprtn_sub = ["subtraction", "sub", "minus", "difference"]
    oprtn_div = ["division", "divide", "div"]
    oprtn_mul = ["multiplication", "mul", "product"]
    oprtn_prdct = ["predict", "prediction", "forecast", "future"]
    oprtn_pct = ["percentage", "percent"]
    oprtn_rtrn = [
        "percentage change",
        "percentage_change",
        "percent change",
        "percent_change",
        "pct change",
        "pct_change",
        "%",
        "% change",
        "%_change",
        "return",
    ]
    oprtn_sdv = [
        "standard deviation",
        "standard_deviation",
        "sigma",
        "std",
        "sdv",
        "std deviation",
        "std_deviation",
        "std dvtn",
        "std_dvtn",
    ]
    oprtn_var = ["variance"]

    oprtn_names = (
        oprtn_add
        + oprtn_sub
        + oprtn_div
        + oprtn_mul
        + oprtn_prdct
        + oprtn_pct
        + oprtn_rtrn
        + oprtn_var
        + oprtn_sdv
        + oprtn_VWAP
        + oprtn_mean
    )

    # The type must be checked before the first DF is looked up, otherwise a non-dict fails with an AttributeError.
    if not isinstance(entities_dct, dict):
        raise TypeError(
            f"The input entity must be of a dictionary type not {type(entities_dct)}."
        )

    if not entities_dct:
        raise ValueError("The input dictionary is empty; there is no DF to operate on.")

    # Only the columns of the first DF are used to validate the operand names.
    col_names = next(iter(entities_dct.values())).columns
    oprtn = oprtn_name.lower()

    # Operation name and the operand column1 should always be defined.
    if (oprtn not in oprtn_names) or (oprnd_col1 not in col_names):
        raise ValueError(
            f"Either the entered operation name {oprtn_name} does not match with the availebale operations {oprtn_names} or the entered operand column names {oprnd_col1, oprnd_col2, oprnd_col3} do not match with the entity's column names {col_names}."
        )

    has_col2 = oprnd_col2 in col_names
    has_col3 = oprnd_col3 in col_names

    # Alias -> name of the pandas Series method implementing the arithmetic operation.
    arithmetic_methods = {
        **dict.fromkeys(oprtn_add, "add"),
        **dict.fromkeys(oprtn_sub, "sub"),
        **dict.fromkeys(oprtn_mul, "mul"),
        **dict.fromkeys(oprtn_div, "div"),
    }

    # Each branch validates its operands and defines compute(df) -> Series holding the resultant column.
    if oprtn in oprtn_VWAP:
        if not has_col2:
            raise ValueError(
                f"Two operand columns (price and volume) are needed to carry out {oprtn_name} operation."
            )

        def compute(df):
            volume = df[oprnd_col2]
            # Cumulative traded value over cumulative volume; a price, hence no percentage factor.
            return (df[oprnd_col1] * volume).expanding(
                min_periods=2
            ).sum() / volume.expanding(min_periods=2).sum()

    elif oprtn in oprtn_sdv:
        # In the expanding window every row uses all observations up to that row.
        # Delta Degrees of Freedom (ddof): the divisor used in calculations is N-ddof.
        def compute(df):
            return df[oprnd_col1].expanding(min_periods=2).std(ddof=1)

    elif oprtn in oprtn_var:

        def compute(df):
            return df[oprnd_col1].expanding(min_periods=2).var(ddof=1)

    elif oprtn in oprtn_rtrn:

        def compute(df):
            return df[oprnd_col1].pct_change() * 100

    elif oprtn in oprtn_pct:
        if not (has_col2 and has_col3):
            raise ValueError(
                f"Three opearand columns are nedded to carry out {oprtn_name} operation."
            )

        def compute(df):
            return (df[oprnd_col1] - df[oprnd_col2]) / df[oprnd_col3] * 100

    elif oprtn in arithmetic_methods:
        method = arithmetic_methods[oprtn]

        if has_col2:
            # A second operand column takes precedence over the constant.
            def compute(df):
                return getattr(df[oprnd_col1], method)(df[oprnd_col2])

        elif constant is not None:
            if method == "div" and constant == 0:
                raise ZeroDivisionError("Cannot divide by 0.")

            def compute(df):
                return getattr(df[oprnd_col1], method)(constant)

        else:
            raise ValueError(
                f"Something went wrong! The {oprtn_name} operation needs either a second operand column or a constant."
            )

    elif oprtn in oprtn_prdct:
        if forecast_time is None:
            raise ValueError(
                f"Something went wrong! The {oprtn_name} operation needs a forecast_time."
            )

        def compute(df):
            # Up-shift the entries by the forecast time; the vacated rows at the end become NaN.
            return df[oprnd_col1].shift(-forecast_time)

    else:
        raise NotImplementedError(
            f"Something went wrong! The {oprtn_name} operation could not be performed!"
        )

    # to_frame(name=...) labels the result reliably; DF(series, columns=[...]) gives an all-NaN column whenever
    # the Series already carries another name.
    new_dict = {
        entity_name: pd.concat(
            [entity_df, compute(entity_df).to_frame(name=resltn_col_name)], axis=1
        )
        for entity_name, entity_df in entities_dct.items()
    }

    printmd(
        f"The '{oprtn_name}' operation on all the DFs of the input dict has been preformed and the resultant column ({resltn_col_name}) is concated to all the DFs respectively."
    )

    return new_dict

### Transpose imported data

In [None]:
def col_entity_dict(entities_dct: dict) -> dict:
    """
    Transpose a dict of DFs: the column names become the keys and the entity names become the column names.

    The imported data has the form {entity_name: DF([col1 (Open)] [col2 (High)] ... [colN])}. It is converted to
    {col1 ('Open'): DF([entity_name1 (col1)] [entity_name2 (col1)] ... [entity_nameN (col1)]), ...}. The column names
    are taken from the first DF, so every DF of the input dict has to provide them.

    Parameters:
        entities_dct (dict): Input dictionary (entity name -> DF).

    Returns:
        entity_cols_dct (dict): A transposed dictionary with rearranged data where the columns of DFs (values in
        input dict) become the keys and the keys of the input dict become the column names of the DF corresponding to
        the new keys. E.g. {"SBI": DF("Close", "Open"), "HDBK": DF("Close", "Open")} becomes
        {"Close": DF("SBI", "HDBK"), "Open": DF("SBI", "HDBK")}.

    Raises:
        TypeError: If entities_dct is not a dict.
        ValueError: If entities_dct is empty.

    Example:
        stk_col_dct = col_entity_dict(entities_dct = stk_dct)
    """

    if not isinstance(entities_dct, dict):
        raise TypeError(
            f"The {entities_dct} must be of dict type where it has {type(entities_dct)}."
        )

    if not entities_dct:
        raise ValueError("The input dictionary is empty; there is nothing to transpose.")

    entity_names = [*entities_dct]
    # Column names of any DF (here the first) become the keys of the transposed dict.
    columns = next(iter(entities_dct.values())).columns

    entity_cols_dct = {}
    for column in columns:
        entity_col_df = pd.concat(
            [entity_df[column] for entity_df in entities_dct.values()], axis=1
        )
        entity_col_df.columns = entity_names
        entity_cols_dct[column] = entity_col_df

    # The summary reads the first and last index entry, so it is only shown when there is data to describe.
    if entity_cols_dct and not entity_col_df.empty:
        printmd(
            f"A dictionary of historical data from {entity_col_df.index[0].date()} to {entity_col_df.index[-1].date()} is sorted by column names ({[*columns]}) and these new columns of the DFs are renamed as 'entity_name (col)' ({[*entity_col_df.columns]})."
        )

    return entity_cols_dct

### Select column(s) from dict of DFs

In [None]:
def select_cols(entities_dct: dict, col_names: list) -> dict:
    """
    Keep only the requested column(s) of every DF in a dictionary.

    Parameters:
        entities_dct (dict): Input dictionary (entity name -> DF).
        col_names (list): List of column names to be extracted.

    Returns:
        selected_cols_dct (dict): A new dictionary with the keys of the input dict whose DFs hold only the selected
        column(s).

    Raises:
        Exception: If entities_dct is not a dict.

    Example:
        stk_Cls_Opn_dct = select_cols(entities_dct = stk_dct, col_names = ["Close", "Open"])
    """
    # Guard clause: anything but a dict is rejected before any column is touched.
    if not isinstance(entities_dct, dict):
        raise Exception(
            f"The {entities_dct} must be of dict type where it has {type(entities_dct)}."
        )

    picked = {}
    for entity_name, entity_df in entities_dct.items():
        picked[entity_name] = entity_df[col_names]

    printmd(
        f"The {col_names} column(s) are extracted from the DFs of input dict. The extracted columns are returned as DFs in a new dict. with the same keys."
    )
    return picked

### Window filters to DF column(s) in a transpose dict

### Date difference

In [None]:
def days_bw(forecast_date: str, strt_datetime: "date" = None) -> int:
    """
    A UDF to find the number of days between two dates. The start date is today by default but can be changed.

    Parameters:
        forecast_date (str): The future date untill which the forecast will take place, as a day-first string,
                             e.g. "20/12/2020". A date or datetime object is accepted as well.
        strt_datetime (date): Start date (a date, a datetime or a day-first string). None means today, evaluated at
                              call time.

    Returns:
        days (int): An integer defference between the forecast date and the start date. It is negative when the
                    forecast date lies before the start date (a warning is issued in that case).

    Raises:
        ValueError: If both dates are equal, or a string does not match the "dd/mm/yyyy" format.

    Example:
        forecast_days = days_bw(forecast_date="01/01/2019", strt_datetime = date.today())
    """

    def as_date(value):
        # Strings are parsed day-first; datetimes lose their time part so that two plain dates are subtracted.
        if isinstance(value, str):
            return datetime.strptime(value, "%d/%m/%Y").date()
        if isinstance(value, datetime):
            return value.date()
        return value

    # Resolve "today" on every call; a date.today() default would be frozen at definition time.
    start = date.today() if strt_datetime is None else as_date(strt_datetime)
    days = (as_date(forecast_date) - start).days

    if days > 0:
        printmd(f"The input dates will forecst for next {days} days", "green")

    elif days == 0:
        raise ValueError("The difference between dates is 0 days, which makes no sense!")

    else:
        warn(f"WARNING: The input dates will 'retrodict' for last {abs(days)} days.")

    return days

In [None]:
def moneycontrol_urls(entities_dct: dict, product: str, categories_list: list) -> dict:
    """
    Build the moneycontrol URLs of every entity for a product and a list of categories.

    Each URL has the form "https://www.moneycontrol.com/<product>/<entity value>/<category>/".

    Parameters:
        entities_dct (dict): Entity names as keys and the entity's URL segment as values.
        product (str): 'financials', 'company-facts'.
        categories_list (list): Categories (URL segments) for which one URL per entity is built.

    Returns:
        url_dct (dict): Entity names as keys and, as values, the list of URLs in the order of categories_list.

    Example:
        moneycontrol_stk_url_dct = moneycontrol_urls(entities_dct=stocks_url_dct, product=product, categories_list=categories)
    """

    site_root = "https://www.moneycontrol.com/"

    return {
        entity_name: [
            site_root + "/".join((product, entity_slug, category)) + "/"
            for category in categories_list
        ]
        for entity_name, entity_slug in entities_dct.items()
    }

## Stock Analysis
### Import and modify data
#### Input parameters

#### Historical Price Data

In [None]:
# Sampling interval and date window of the historical price data; dates are day-first "dd/mm/yyyy" strings and
# the window always ends today.
interval = "Daily"
start_date = "01/01/2019"
end_date = f"{date.today():%d/%m/%Y}"

# Display name of every stock mapped to its short code.
stocks_dct = dict([("HDFC Bank", "HDBK"), ("SBI Bank", "SBI")])  # product_sector_objectType

In [None]:
# Import the historical data of every stock in stocks_dct for the configured window and interval.
# import_hist_data is defined earlier in this notebook; the result is looked up by the display names of
# stocks_dct (e.g. "HDFC Bank") in the following cells.
s_bk_dt = import_hist_data(
    product_name="stocks",
    entities_dct=stocks_dct,
    country_name="India",
    strt_date=start_date,
    end_date=end_date,
    interval=interval,
)

In [None]:
# Quick visual check of the imported data of one entity.
display(s_bk_dt["HDFC Bank"])

### Historical Financial Data

In [None]:
# XPaths handed to stocks_financials: the "next page" element and the data table of a financials page.
xpath_financials_URLnext = './/span[contains(@class, "nextpag")]'
xpath_financials_table = './/table[contains(@class, "mctable")]'  # "/html/body/section/div[2]/div/div[2]/div[2]/div/div[2]/div/div[1]/table"

# XPath of the data table handed to stocks_company_facts.
companyfacts_xpath_table = './/table[contains(@class, "mctable")]'

# Display name of every stock mapped to the URL segment that moneycontrol_urls inserts after the product.
stocks_url_dct = {"SBI Bank": "statebankindia", "HDFC Bank": "hdfcbank"}

# Product segments of the URLs built by moneycontrol_urls.
finance_prdct = "financials"
companyfacts_prdct = "company-facts"

# Category segments of the financials URLs; moneycontrol_urls builds one URL per entity and category.
# NOTE(review): "capital-structure/SBI" ends with what looks like an SBI-specific code, yet the list is applied
# to every entity of stocks_url_dct - confirm that the resulting HDFC Bank URL resolves to the right page.
finance_ctgrs_list = [
    "balance-sheetVI",
    "consolidated-balance-sheetVI",
    "profit-lossVI",
    "consolidated-profit-lossVI",
    "results/quarterly-results",
    "results/consolidated-quarterly-results",
    "results/yearly",
    "results/consolidated-yearly",
    "cash-flowVI",
    "consolidated-cash-flowVI",
    "ratiosVI",
    "consolidated-ratiosVI",
    "capital-structure/SBI",
]

# Category segments of the company-facts URLs; stocks_company_facts recognises exactly these four keywords.
companyfacts_ctgrs_list = ["bonus", "rights", "splits", "dividends"]

In [None]:
# Build, per entity, the list of financials URLs and the list of company-facts URLs
# (entity display name -> list of URLs, one per category).
url_finance_dct = moneycontrol_urls(
    entities_dct=stocks_url_dct,
    product=finance_prdct,
    categories_list=finance_ctgrs_list,
)
url_companyfacts_dct = moneycontrol_urls(
    entities_dct=stocks_url_dct,
    product=companyfacts_prdct,
    categories_list=companyfacts_ctgrs_list,
)

In [None]:
# Scrape the financial statements of every entity from the financials URLs built above.
# stocks_financials is defined earlier in this notebook; the call sends HTTP requests, so it needs network access.
stocks_financials_data = stocks_financials(
    entities_dct=url_finance_dct,
    xpath_URLnext=xpath_financials_URLnext,
    xpath_table=xpath_financials_table,
    order="ascending",
)

In [None]:
# Walk down the nested result one level at a time to show its layout:
# entity -> statement -> "Standalone"/"Consolidated" -> heading -> innermost keys.
display(stocks_financials_data.keys())
display(stocks_financials_data["HDFC Bank"].keys())
display(stocks_financials_data["HDFC Bank"]["Balance Sheet"].keys())
display(stocks_financials_data["HDFC Bank"]["Balance Sheet"]["Standalone"].keys())
display(
    stocks_financials_data["HDFC Bank"]["Balance Sheet"]["Standalone"][
        "SHAREHOLDERS FUNDS"
    ].keys()
)  # no more inner dicts
# "Capital Structure" is stored directly under the entity, without the Standalone/Consolidated level.
display(
    stocks_financials_data["HDFC Bank"]["Capital Structure"].keys()
)  # no more inner dicts

#### Historical stock details

In [None]:
# Scrape the bonus, rights, splits and dividends tables of every entity from the company-facts URLs built above.
# The result maps each entity to a dict with the keys "Bonus", "Rights", "Splits" and "Dividends".
stocks_company_facts_data = stocks_company_facts(
    entities_dct=url_companyfacts_dct,
    xpath_table=companyfacts_xpath_table,
    order="ascending",
)

In [None]:
# ------------------------------ Forecast time ------------------------------ #
forecast_days = 30  # days

# ------------------------------ Prediciton date index from today------------------------------ #
predict_start = datetime.today()
# The next `forecast_days` business days strictly after today. Starting at the first business day after today
# (instead of starting today and cutting the left end with `closed="right"`) always yields exactly
# `forecast_days` entries - also when the cell is run on a weekend - and does not depend on the `closed` /
# `inclusive` keyword, which differs between pandas versions. The index is labelled "Date"; passing the
# `date` class as its name was a slip.
date_frcstd_index = pd.date_range(
    start=pd.Timestamp(predict_start).normalize() + pd.offsets.BDay(1),
    periods=forecast_days,
    freq="B",  # Business (B) days frequency
    tz=None,
    name="Date",
)


# ------------------------------ List of Columns in DF ------------------------------ #
cls_col = "Close"
hi_col = "High"
lw_col = "Low"
opn_col = "Open"
COO_col = "(C-O)/O"
HLC_col = "(H-L)/C"
cls_rtrn = "ΔC/C"  # (N_2 - N_1)/N_2
cls_predict_col = (
    f"{cls_col} (Φ: {forecast_days} d)"  # Φ denotes futre or prediction (forecast time)
)


# ------------------------------ List of Operations ------------------------------ #
oprtn_predict = "predict"  # Will just up-shift the data (entries) by forecast time and puts Nan in the blank entries.
oprtn_pct = "percentage"  # For (High-Low)/Close and (Close - Open)/Open percentages.
oprtn_std = "standard deviation"  # or "std", "sigma"
oprtn_VWAP = "volume weighted average price"  # or "VWAP"
oprtn_return = "return"  # or "percentage change"

__Investigate the model accuracy by varying the independent variables.__

In [None]:
# Step 1: append the "(H-L)/C" column, i.e. (High - Low) / Close as a percentage, to every DF.
s_bk_HLC_dt = operation_col_concat(
    entities_dct=s_bk_dt,
    oprtn_name=oprtn_pct,
    resltn_col_name=HLC_col,
    oprnd_col1=hi_col,
    oprnd_col2=lw_col,
    oprnd_col3=cls_col,
    constant=None,
    forecast_time=None,
)

# Step 2: append the "(C-O)/O" column, i.e. (Close - Open) / Open as a percentage, to the result of step 1.
s_bk_HLC_COO_dt = operation_col_concat(
    entities_dct=s_bk_HLC_dt,
    oprtn_name=oprtn_pct,
    resltn_col_name=COO_col,
    oprnd_col1=cls_col,
    oprnd_col2=opn_col,
    oprnd_col3=opn_col,
    constant=None,
)

# Step 3: append the Close column shifted up by forecast_days rows under the name held in cls_predict_col.
s_bk_HLC_COO_frct_dt = operation_col_concat(
    entities_dct=s_bk_HLC_COO_dt,
    oprtn_name=oprtn_predict,
    resltn_col_name=cls_predict_col,
    oprnd_col1=cls_col,
    oprnd_col2=None,
    oprnd_col3=None,
    constant=None,
    forecast_time=forecast_days,
)

display(s_bk_HLC_COO_frct_dt["HDFC Bank"])

In [None]:
from sklearn import metrics, preprocessing
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import (
    ElasticNet,
    ElasticNetCV,
    Lars,
    LarsCV,
    Lasso,
    LassoCV,
    LassoLars,
    LassoLarsCV,
    LinearRegression,
    MultiTaskElasticNet,
    MultiTaskElasticNetCV,
    MultiTaskLasso,
    Ridge,
    RidgeCV,
    SGDRegressor,
)
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    r2_score,
)
from sklearn.model_selection import cross_validate, train_test_split
from sklearn.neighbors import KNeighborsRegressor
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.tree import DecisionTreeRegressor

# from sklearn import cross_validation

# from sklearn.preprocessing import MinMaxScaler
# scaler = MinMaxScaler(feature_range=(0, 1))

In [None]:
# ------------------------------ Classical linear regressors Models ------------------------------ #
# NOTE: `normalize=False` is intentionally not passed to the estimators in this section.
# False was the default, and the keyword was deprecated in scikit-learn 1.0 and removed
# in 1.2, so omitting it keeps the behaviour on old releases and avoids a TypeError on new ones.

# Ordinary least-squares linear regression.
model_LR = LinearRegression(fit_intercept=True, copy_X=True, n_jobs=-1)

# Raw string: the LaTeX backslashes ("\ ", "\hat") are not valid Python escape sequences.
display(MD(r"$$LR:\ \hat{y}(w, x) = ω_0 + ω_1 x_1 + ω_2 x_2 + ..... + ω_n x_n$$"))


# Bias/variance tradeoff: the larger the ridge alpha parameter, the higher the bias and
# the lower the variance (more shrinkage, so the coefficients become more robust to collinearity).
display(MD(r"$$Ridge = \min_{ω} || Xω - y||_2^2 + 𝛼 ||ω||_2^2$$"))
model_Ridge = Ridge(
    alpha=5,
    fit_intercept=True,
    copy_X=True,
    solver="auto",
    random_state=41,
)  # Ridge LR

# Ridge regression that selects its penalty strength from `alphas` by cross-validation.
model_RidgeCV = RidgeCV(alphas=(0.01, 1.0, 10.0), fit_intercept=True)  # RidgeCV LR


# TODO: the SGD settings below are not fully understood yet; revisit before relying on this model.
# `loss` is left at its default, which is the ordinary squared loss on every scikit-learn
# release; the explicit name changed from "squared_loss" to "squared_error" in 1.0 and the
# old spelling was removed in 1.2.
model_SGDR = SGDRegressor(
    penalty="elasticnet",
    alpha=0.0001,
    l1_ratio=0.15,
    fit_intercept=True,
    max_iter=1000,
    shuffle=True,
    random_state=41,
    learning_rate="adaptive",
)


# ---------------------------- Regressors with variable selection ----------------------- #
# NOTE: `normalize=False` is intentionally not passed to the estimators in this section.
# False was the default, and the keyword was deprecated in scikit-learn 1.0 and removed
# in 1.2, so omitting it keeps the behaviour on old releases and avoids a TypeError on new ones.

# Linear regression with a combined L1/L2 penalty (mixing set by `l1_ratio`).
model_ElaNet = ElasticNet(
    alpha=1.0,
    l1_ratio=0.5,
    fit_intercept=True,
    precompute=False,
    max_iter=1000,
    copy_X=True,
    tol=0.0001,
    warm_start=False,
    positive=False,
    random_state=None,
    selection="cyclic",
)

# Elastic net whose alpha is picked by cross-validation along a path of `n_alphas` values.
model_ElaNetCV = ElasticNetCV(
    l1_ratio=0.5,
    eps=0.001,
    n_alphas=100,
    alphas=None,
    fit_intercept=True,
    precompute="auto",
    max_iter=1000,
    tol=0.0001,
    cv=None,
    copy_X=True,
    verbose=0,
    n_jobs=None,
    positive=False,
    random_state=None,
    selection="cyclic",
)

# Multi-output variant of the elastic net.
model_MultiTaskElasticNet = MultiTaskElasticNet(
    alpha=1.0,
    l1_ratio=0.5,
    fit_intercept=True,
    copy_X=True,
    max_iter=1000,
    tol=0.0001,
    warm_start=False,
    random_state=None,
    selection="cyclic",
)

# Multi-output elastic net with cross-validated alpha.
model_MultiTaskElasticNetCV = MultiTaskElasticNetCV(
    l1_ratio=0.5,
    eps=0.001,
    n_alphas=100,
    alphas=None,
    fit_intercept=True,
    max_iter=1000,
    tol=0.0001,
    cv=None,
    copy_X=True,
    verbose=0,
    n_jobs=None,
    random_state=None,
    selection="cyclic",
)

# Linear regression with an L1 penalty.
model_Lasso = Lasso(alpha=5, fit_intercept=True, copy_X=True, random_state=41)

# Lasso with cross-validated alpha; note the tolerance (0.1) is far looser than
# the 0.0001 used by the other estimators in this section.
model_LassoCV = LassoCV(
    eps=0.001,
    n_alphas=100,
    alphas=None,
    fit_intercept=True,
    precompute="auto",
    max_iter=1000,
    tol=0.1,
    copy_X=True,
    cv=None,
    verbose=False,
    n_jobs=None,
    positive=False,
    random_state=None,
    selection="cyclic",
)

# Multi-output variant of the Lasso.
model_MultiTaskLasso = MultiTaskLasso(
    alpha=1.0,
    fit_intercept=True,
    copy_X=True,
    max_iter=1000,
    tol=0.0001,
    warm_start=False,
    random_state=None,
    selection="cyclic",
)

# Keyword arguments that all four LARS-based estimators below receive unchanged.
# `eps` is the float64 machine epsilon.
# NOTE(review): `normalize` was deprecated for these estimators in later scikit-learn
# releases (removed in 1.4, as far as I know) -- TODO confirm against the pinned version
# before upgrading; there is no drop-in replacement that keeps the normalisation.
_lars_shared = dict(
    fit_intercept=True,
    verbose=False,
    normalize=True,
    precompute="auto",
    eps=2.220446049250313e-16,
    copy_X=True,
)

# Extra settings common to the two cross-validated variants.
_lars_cv_shared = dict(max_iter=500, cv=None, max_n_alphas=1000, n_jobs=None)

# Least Angle Regression.
model_Lars = Lars(
    n_nonzero_coefs=500,
    fit_path=True,
    jitter=None,
    random_state=None,
    **_lars_shared,
)

# Least Angle Regression with cross-validation.
model_LarsCV = LarsCV(**_lars_shared, **_lars_cv_shared)

# Lasso model fit with Least Angle Regression a.k.a. Lars. It is a Linear Model trained with an L1 prior as regularizer.
model_LassoLARS = LassoLars(
    alpha=1.0,
    max_iter=500,
    fit_path=True,
    positive=False,
    jitter=None,
    random_state=None,
    **_lars_shared,
)

# Cross-validated counterpart of the LARS-based Lasso.
model_LassoLarsCV = LassoLarsCV(positive=False, **_lars_shared, **_lars_cv_shared)


# ------------------------------ Nearest Neighbors Models ------------------------------ #
# Settings both k-nearest-neighbour regressors have in common: 43 neighbours,
# Minkowski metric with p=2, automatic search algorithm, all CPU cores.
_knn_shared = dict(
    n_neighbors=43,
    algorithm="auto",
    p=2,
    metric="minkowski",
    metric_params=None,
    n_jobs=-1,
)

# Every neighbour contributes equally to the prediction.
# Open question from the author: is leaf_size=100 a sensible choice?
model_KNN = KNeighborsRegressor(weights="uniform", leaf_size=100, **_knn_shared)

# Neighbours are weighted by distance; uses a smaller leaf size.
model_dist_KNN = KNeighborsRegressor(weights="distance", leaf_size=5, **_knn_shared)

In [None]:
# ---------------------------- Separate the independent and dependent variables ---------------------------- #
hdfc_frame = s_bk_HLC_COO_frct_dt["HDFC Bank"]

# Independent variables used to predict the close price: every column except the
# forecast target. `columns=` alone is sufficient for drop(); the frame is built once
# and sliced twice below.
hdfc_features = hdfc_frame.drop(columns=[cls_predict_col])

# Number of leading rows kept for training/testing; the trailing `forecast_days` rows
# are set aside as the future window. Computed explicitly because slicing with
# `[:-forecast_days]` / `[-forecast_days:]` silently flips meaning when forecast_days
# is 0 (`[:-0]` is empty and `[-0:]` is the whole frame).
n_known_rows = max(len(hdfc_frame) - forecast_days, 0)

# Independent variables, restricted to the rows before the future window.
X = hdfc_features.iloc[:n_known_rows]
# display(X)

# Dependent variable (forecast close price), kept as a one-column DataFrame.
Y = hdfc_frame[[cls_predict_col]].iloc[:n_known_rows]

# Trailing rows (the last `forecast_days` entries of the time series), features only.
X_future = hdfc_features.iloc[n_known_rows:]

# Test fractions to evaluate, one train/test split per entry.
# Wider candidate grid: [0.1, 0.2, 0.25, 0.3, 0.33, 0.4, 0.5, 0.6, 0.67, 0.7, 0.75, 0.8, 0.9]
test_size = [0.5, 0.6, 0.67, 0.7, 0.75]

In [None]:
# One entry per value in `test_size`: the splits themselves ...
X_train_list = []
X_test_list = []
Y_train_list = []
Y_test_list = []

# ... and the test-set score (in percent) of each model on that split.
# The RidgeCV and SGDR lists stay empty while those models are disabled below.
conf_LR_list = []
conf_Ridge_list = []
conf_RidgeCV_list = []
conf_Lasso_list = []
conf_SGDR_list = []
conf_KNN_list = []
conf_dist_KNN_list = []


def _fit_and_score(model, label, size, X_tr, X_te, Y_tr, Y_te):
    """Fit `model` on the training split, print and return its test score in percent.

    Parameters:
        model: estimator exposing `fit(X, y)` and `score(X, y)`.
        label (str): short model name used in the printed line.
        size (float): test fraction of the current split (printed only).
        X_tr, X_te, Y_tr, Y_te: train/test features and targets.

    Returns:
        `model.score(X_te, Y_te) * 100` (for scikit-learn regressors, R^2 in percent).
    """
    model.fit(X_tr, Y_tr)  # training the algorithm
    score_pct = model.score(X_te, Y_te) * 100
    print(f"Test size: {size} \t\t Confidence_{label}: {score_pct}")
    return score_pct


for size in test_size:
    # Fixed random_state so that every run produces the same shuffled split.
    X_train, X_test, Y_train, Y_test = train_test_split(
        X, Y, test_size=size, shuffle=True, random_state=41
    )

    X_train_list.append(X_train)
    X_test_list.append(X_test)
    Y_train_list.append(Y_train)
    Y_test_list.append(Y_test)

    conf_LR = _fit_and_score(model_LR, "LR", size, X_train, X_test, Y_train, Y_test)
    conf_LR_list.append(conf_LR)

    conf_Ridge = _fit_and_score(
        model_Ridge, "Ridge", size, X_train, X_test, Y_train, Y_test
    )
    conf_Ridge_list.append(conf_Ridge)

    # RidgeCV is disabled: for some cases the accuracy to predict data (Y_test) goes below zero.
    #     conf_RidgeCV = _fit_and_score(
    #         model_RidgeCV, "RidgeCV", size, X_train, X_test, Y_train, Y_test
    #     )
    #     conf_RidgeCV_list.append(conf_RidgeCV)

    conf_Lasso = _fit_and_score(
        model_Lasso, "Lasso", size, X_train, X_test, Y_train, Y_test
    )
    conf_Lasso_list.append(conf_Lasso)

    # SGDR is disabled as well.
    #     conf_SGDR = _fit_and_score(
    #         model_SGDR, "SGDR", size, X_train, X_test, Y_train, Y_test
    #     )
    #     conf_SGDR_list.append(conf_SGDR)

    conf_KNN = _fit_and_score(model_KNN, "KNN", size, X_train, X_test, Y_train, Y_test)
    conf_KNN_list.append(conf_KNN)

    conf_dist_KNN = _fit_and_score(
        model_dist_KNN, "dist_KNN", size, X_train, X_test, Y_train, Y_test
    )
    conf_dist_KNN_list.append(conf_dist_KNN)

    print("\n")

In [None]:
# Close all open matplotlib figures (presumably so the plot in the next cell starts on a fresh figure).
plt.close("all")

In [None]:
# %matplotlib widget
# Score curves to draw, in plotting order: (scores per test size, marker, legend label).
plotted_series = (
    (conf_LR_list, "o", "LR"),
    (conf_Ridge_list, "v", "Ridge"),
    # (conf_RidgeCV_list, "^", "RidgeCV"),
    (conf_Lasso_list, "*", "Lasso"),
    # (conf_SGDR_list, "x", "SGDR"),
    (conf_KNN_list, "s", "KNN"),
    (conf_dist_KNN_list, "p", "dist_KNN"),
)

for series_scores, series_marker, series_label in plotted_series:
    plt.plot(test_size, series_scores, marker=series_marker, label=series_label)

plt.xlabel("Test size")
plt.ylabel("Accuracy score [%]")
plt.legend()

plt.show()