In [1]:
# function 1 - get the first/last row and column that contain data in the active worksheet (the whole sheet is treated as a single table)
## [Used by function 3]
import openpyxl
import pandas as pd

def find_excel_table(excel_file_path):
    """Locate the range of rows and columns that hold data in the active sheet.

    Every row of the active worksheet is scanned once; the first and last row
    and the first and last column containing at least one non-empty cell are
    recorded.

    Args:
        excel_file_path: Path of the workbook, handed to ``openpyxl.load_workbook``.

    Returns:
        A tuple ``(start_row, end_row, start_col, end_col)``. Row numbers are
        1-based, column numbers are 0-based (position inside the row tuple).
        ``None`` is returned when the workbook cannot be read or when the
        sheet contains no value at all, so callers can test the result with a
        plain ``if not result``.
    """
    try:
        workbook = openpyxl.load_workbook(excel_file_path)
        sheet = workbook.active

        start_row = end_row = None
        start_col = end_col = None

        # iter_rows(values_only=True) yields one tuple of cell values per row;
        # enumerate(..., start=1) pairs it with the 1-based worksheet row number.
        for row_idx, row in enumerate(sheet.iter_rows(values_only=True), start=1):
            filled_positions = [
                col_idx for col_idx, cell_value in enumerate(row)
                if cell_value is not None
            ]
            if not filled_positions:
                continue

            # The first non-empty row fixes start_row; every later one moves end_row.
            if start_row is None:
                start_row = row_idx
            end_row = row_idx

            # Widen the column range to cover this row's outermost filled cells.
            if start_col is None or filled_positions[0] < start_col:
                start_col = filled_positions[0]
            if end_col is None or filled_positions[-1] > end_col:
                end_col = filled_positions[-1]

        # Nothing found: report "no table" instead of a tuple of None values,
        # which would be truthy and slip through the caller's check.
        if start_row is None:
            return None

        return start_row, end_row, start_col, end_col

    # Any failure (missing file, unreadable workbook, ...) is reported and
    # signalled to the caller with None.
    except Exception as e:
        print(f"Error: {e}")
        return None

In [2]:
# function 3 - extract only the specified columns from the Excel data
import openpyxl
import os
import pandas as pd

def extract_and_slice_data(excel_file_path, column_names):
    """Read the workbook into a DataFrame and keep only the requested columns.

    Args:
        excel_file_path: Path of the Excel file.
        column_names: Iterable of column labels to keep, in the order wanted.

    Returns:
        A DataFrame holding exactly ``column_names``, or ``None`` when no data
        is found, the file cannot be read or a requested column is missing
        (the reason is printed).
    """
    try:
        # Only used as a "does the sheet contain anything" check.
        table_range = find_excel_table(excel_file_path)

        # A tuple made only of None values is truthy, so test it explicitly.
        if not table_range or all(bound is None for bound in table_range):
            raise ValueError("Table not found in the Excel file.")

        # The first row read by pandas supplies the column labels.
        df = pd.read_excel(excel_file_path, engine='openpyxl', header=0)

        # Name the missing columns instead of surfacing a bare KeyError.
        missing = [col for col in column_names if col not in df.columns]
        if missing:
            raise ValueError(f"Columns not found in the Excel file: {missing}")

        return df[list(column_names)]

    except Exception as e:
        print(f"Error: {e}")
        return None

In [3]:
# function 4 - create the main folders and their sub folders (one per company) from the data extracted from Excel
def create_folders_from_excel(excel_file_path, base_folder, main_column_name, sub_column_name):
    """Create ``base_folder/<main>/<sub>`` for every row of the Excel data.

    Args:
        excel_file_path: Path of the Excel file read via ``extract_and_slice_data``.
        base_folder: Directory under which the folder tree is created.
        main_column_name: Column whose values name the first-level folders.
        sub_column_name: Column whose values name the second-level folders.

    Returns:
        None. Progress and errors are printed; no exception is propagated.
    """
    try:
        # Extract and slice data
        df = extract_and_slice_data(excel_file_path, [main_column_name, sub_column_name])

        if df is None:
            raise ValueError("Error extracting data from Excel.")

        for main, sub in zip(df[main_column_name], df[sub_column_name]):
            # An empty cell would otherwise become a folder literally named "nan".
            if pd.isna(main):
                continue

            # os.path.join builds the path with the separator of the current OS.
            main_folder = os.path.join(base_folder, str(main))
            # exist_ok=True: repeated values and re-runs are not errors, and
            # there is no gap between an existence test and the creation.
            os.makedirs(main_folder, exist_ok=True)

            if pd.isna(sub):
                continue
            os.makedirs(os.path.join(main_folder, str(sub)), exist_ok=True)

        print("Folders created successfully.")

    except Exception as e:
        print(f"Error: {e}")

In [4]:
# !pip install selenium webdriver_manager

In [5]:
# function 6 - enhancement of function 5 - split into sub-functions
# function 6.1 - adds new "date range" parameters (start and end year) to get the list of annual report links
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.action_chains import ActionChains
import requests
import os
def get_annual_report_links(core_url, company_code, cat, start_year, end_year):
    """Collect the annual report links of one company for a range of years.

    The page ``{core_url}?company={company_code}&cat={cat}`` is opened in a
    headless Firefox, the 'table-announcements_length' select is set to '50',
    and for every year from ``start_year`` to ``end_year`` (inclusive) the
    first link whose text contains 'Annual Report & CG Report - <year>' is
    looked up.

    Args:
        core_url: Base URL of the announcement page.
        company_code: Value of the ``company`` query parameter.
        cat: Value of the ``cat`` query parameter.
        start_year: First year to look for (converted with ``int``).
        end_year: Last year to look for (converted with ``int``).

    Returns:
        List of ``href`` values, one per year for which a link was found.
        The list is empty (never ``None``) when the browser cannot be started
        or another error occurs; errors are printed.
    """
    annual_report_links = []
    # Bound before the try so the finally clause can test it even when the
    # browser never started.
    driver = None

    try:
        # Options() configures the browser instance; headless runs it without a window.
        options = Options()
        options.add_argument("--headless")
        driver = webdriver.Firefox(options=options)

        url = f"{core_url}?company={company_code}&cat={cat}"

        try:
            driver.get(url)

            # Wait until the element named 'table-announcements_length' is present.
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.NAME, 'table-announcements_length'))
            )

            select_element = driver.execute_script("return document.getElementsByName('table-announcements_length')[0];")
            driver.execute_script("arguments[0].scrollIntoView();", select_element)

            # Set the value to '50' and fire a change event through JavaScript.
            driver.execute_script("arguments[0].value='50'; arguments[0].dispatchEvent(new Event('change'));", select_element)

            # Wait until at least one 'Annual Report' link is present.
            WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.XPATH, "//a[contains(text(), 'Annual Report')]"))
            )

        except Exception as e:
            # Best effort: the year lookup below still runs on whatever loaded.
            print(f"Error: {e}")

        for year in range(int(start_year), int(end_year) + 1):
            try:
                annual_report_link_xpath = f"//a[contains(text(), 'Annual Report & CG Report - {year}')]"
                annual_report_link = driver.find_element(By.XPATH, annual_report_link_xpath)
                annual_report_links.append(annual_report_link.get_attribute('href'))

            except NoSuchElementException:
                print(f"Annual report link for the year {year} not found. Proceeding with the next year.")

    except Exception as e:
        print(f"Error: {e}")

    finally:
        if driver is not None:
            try:
                driver.quit()
            except Exception as e:
                print(f"Error while quitting the driver: {e}")

    return annual_report_links

In [6]:
# function 6 - enhancement of function 5 - split into sub-functions
# function 6.2 - take the list of annual report links from function 6.1 and collect the PDF links whose text matches one of the pdf_texts
def process_links_and_store_pdf_links(annual_report_links, pdf_texts):
    """Open each annual report page and collect matching links from its iframes.

    For every page the iframes are visited in order. Inside an iframe the
    entries of ``pdf_texts`` are tried in order; for the first entry contained
    in the text of at least one link (compared case-insensitively), the
    ``href`` of every such link is collected and the rest of that page is
    skipped.

    Args:
        annual_report_links: Iterable of page URLs; ``None`` is treated as empty.
        pdf_texts: Texts to look for in the link texts, in priority order.

    Returns:
        List of the collected ``href`` values (possibly empty). Errors are
        printed, not raised.
    """
    iframe_link_urls = []
    # Link texts are lower-cased before comparison, so do the same here.
    wanted_texts = [str(text).lower() for text in (pdf_texts or [])]
    # Bound before the try so the finally clause can test it even when the
    # browser never started.
    driver = None

    try:
        options = Options()
        options.add_argument("--headless")
        driver = webdriver.Firefox(options=options)

        for link_url in (annual_report_links or []):
            try:
                driver.get(link_url)

                pdf_found = False

                for iframe in driver.find_elements(By.TAG_NAME, 'iframe'):
                    # Elements inside an iframe are only reachable after
                    # switching the WebDriver focus to it.
                    driver.switch_to.frame(iframe)
                    try:
                        # Read text and href of every anchor once instead of
                        # once per pdf_text.
                        anchors = []
                        for anchor in driver.find_elements(By.XPATH, "//a"):
                            href = anchor.get_attribute('href')
                            if href:
                                anchors.append((anchor.text.lower(), href))

                        for pdf_text in wanted_texts:
                            matches = [href for text, href in anchors if pdf_text in text]
                            if matches:
                                # Keep every link matching this text; later
                                # (lower priority) texts are not tried.
                                iframe_link_urls.extend(matches)
                                pdf_found = True
                                break
                    finally:
                        # Always return to the top-level document, including
                        # when a match ends the iframe loop early.
                        driver.switch_to.default_content()

                    if pdf_found:
                        break

                # Reported once per page, also when the page has no iframe.
                if not pdf_found:
                    print(f"No matching PDF texts found for link: {link_url}")

            except TimeoutException as e:
                print(f"Timed out waiting for page stabilization after opening the annual report link: {link_url}. "
                      f"Proceeding with the next link.")

    except Exception as e:
        print(f"Error: {e}")

    finally:
        if driver is not None:
            try:
                driver.quit()
            except Exception as e:
                print(f"Error while quitting the driver: {e}")

    return iframe_link_urls

In [7]:
# !pip install pyautogui

In [8]:
# function 6 - enhancement of function 5 - split into sub-functions
# function 6.3 - take the list of PDF links from function 6.2 and download them to the browser's default path 'Downloads'
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import pyautogui
import time

def download_pdf_to_defaulted_path(pdf_links):
    """Download every PDF link through Firefox into its default download folder.

    Each link is opened, the element with id 'download' is clicked once it is
    clickable, and an Enter key press is simulated with pyautogui to confirm
    the save.

    Args:
        pdf_links: Iterable of PDF URLs; empty or ``None`` means nothing to do.

    Returns:
        Number of links for which the download steps completed.

    Raises:
        Whatever Selenium raises for a failing link (for instance when the
        download button does not become clickable within 10 seconds). The
        browser is closed before the exception propagates.
    """
    pdf_count = 0

    # Do not start a browser when there is nothing to download.
    if not pdf_links:
        return pdf_count

    options = Options()
    driver = webdriver.Firefox(options=options)

    try:
        wait = WebDriverWait(driver, 10)

        for pdf_link in pdf_links:
            driver.get(pdf_link)

            download_button = wait.until(EC.element_to_be_clickable((By.ID, 'download')))
            download_button.click()

            # Give the save dialog time to appear before confirming it.
            time.sleep(3)

            # Simulate pressing Enter to save the file to the default location
            pyautogui.press('enter')

            # Leave time for the download before moving on.
            time.sleep(3)

            pdf_count += 1
    finally:
        # Close the browser even when one of the links fails.
        driver.quit()

    print("PDF downloaded successfully to the defaulted path - 'Downloads'.")
    return pdf_count

In [9]:
# function 6 - enhancement of function 5 - split into sub-functions
# function 6.4 - move the files downloaded by function 6.3 to the specified folder
import os
import re
import shutil

def move_files_to_specified_path(default_download_path, save_path, filename_regex):
    """Move every file whose name matches ``filename_regex`` into ``save_path``.

    Args:
        default_download_path: Directory that is searched (not recursively).
        save_path: Destination directory; created when it does not exist.
        filename_regex: Pattern (string or compiled) applied with ``re.match``
            to each file name.

    Returns:
        None. Each move and any error is printed; no exception is propagated.
    """
    try:
        # Keep regular files only, so a matching sub-directory is never moved.
        matched_files = [
            file for file in os.listdir(default_download_path)
            if re.match(filename_regex, file)
            and os.path.isfile(os.path.join(default_download_path, file))
        ]

        if not matched_files:
            print("No files matching the pattern found in the default download path.")
            return

        # shutil.move treats a non-existing destination as the new file name,
        # so the target must exist as a directory before anything is moved.
        os.makedirs(save_path, exist_ok=True)

        for matched_file in matched_files:
            shutil.move(os.path.join(default_download_path, matched_file), save_path)
            print(f"Moved '{matched_file}' to '{save_path}' successfully.")

    except Exception as e:
        print(f"Error: {e}")

In [13]:
# function 7 - automation combining functions 3, 4, 6.1, 6.2, 6.3 and 6.4
## 3 - extract data from Excel (includes function 1 - find Excel table)
## 4 - create main folders for categories and sub folders for companies (uses the data from function 3)
## 6.1 - get the list of annual report links
## 6.2 - open each link from function 6.1 and collect the PDF links inside
## 6.3 - take the list of PDF links and download them to the default path 'Downloads'
## 6.4 - move the downloaded files that match the company's file name pattern to the specified folder
def excel_download_pdf(core_url = 'https://www.bursamalaysia.com/market_information/announcements/company_announcement',
                       excel_file_path = None, industry = None,
                       column_names = ['Start Date', 'End Date', 'Type', 'Name', 'Code', 'Category'],
                       main_column_name = "Category", sub_column_name = "Name",
                       pdf_texts = ['financial', 'annual', 'ar'],
                       base_folder = r"base\folder",
                       default_download_path = r"download\default\path"):
    """Run the whole pipeline: Excel -> folders -> report links -> PDFs -> move.

    For every row of the Excel data the annual report links are collected,
    the PDF links inside them are resolved and downloaded, and the downloaded
    files are moved to ``base_folder/<industry>/<Category>/<Name>``.

    Args:
        core_url: Base URL of the announcement page.
        excel_file_path: Path of the Excel file (required).
        industry: Name of the folder created under ``base_folder`` (required).
        column_names: Columns read from the Excel file. The columns
            'Start Date', 'End Date', 'Type', 'Name', 'Code' and 'Category'
            are read from every row. The default list is never modified.
        main_column_name: Column naming the first-level folders.
        sub_column_name: Column naming the second-level folders.
        pdf_texts: Link texts that identify a PDF, in priority order.
        base_folder: Root directory of the folder tree.
        default_download_path: Directory where the browser stores downloads.

    Returns:
        None. Errors are printed. A failure while handling one company does
        not stop the remaining companies.
    """
    # Companies whose downloaded file names do not contain the name used in
    # the sheet; the value is the regex fragment to look for instead.
    filename_keywords = {
        "HLBANK": r"Hong.*Leong",
        "PBBANK": r"PBB",
        "AMBANK": r"AMMB",
        "RHBBANK": r"RHBB",
        "ABMB": r"Alliance",
        "BIMB [s]": r"BHB",
    }

    try:
        if excel_file_path is None or industry is None:
            raise ValueError("Please provide valid values for 'excel_file_path' or 'industry'")

        industry_folder = os.path.join(base_folder, str(industry))

        # function 3 - to extract data that the columns specified from Excel
        df = extract_and_slice_data(excel_file_path, column_names)
        if df is None:
            raise ValueError("Error extracting data from Excel.")

        # function 4 - create main folders for categories and sub folders for companies
        create_folders_from_excel(excel_file_path, industry_folder, main_column_name, sub_column_name)

        for _, row in df.iterrows():
            name = row['Name']

            try:
                # function 6.1 - to get list of annual report links within the date range
                annual_report_links = get_annual_report_links(
                    core_url, row['Code'], row['Type'], row['Start Date'], row['End Date']
                )

                # function 6.2 - open the links from function 6.1 and collect the PDF links that match pdf_texts
                pdf_links = process_links_and_store_pdf_links(annual_report_links, pdf_texts)

                # function 6.3 - download those pdf links to the defaulted path 'Downloads'
                download_pdf_to_defaulted_path(pdf_links)

                save_path = os.path.join(industry_folder, str(row['Category']), str(name))

                # Names without a special case are matched literally, so
                # characters such as '[', '(' or '+' cannot alter the pattern.
                keyword = filename_keywords.get(name, re.escape(str(name)))
                filename_regex = re.compile(rf".*{keyword}.*\.pdf$", re.IGNORECASE)

                # function 6.4 - move the file to specified folder
                move_files_to_specified_path(default_download_path, save_path, filename_regex)

            except Exception as e:
                # Keep going with the next company.
                print(f"Error while processing '{name}': {e}")

    except Exception as e:
        print(f"Error: {e}")

In [14]:
# Capture the output printed while the function runs so that the last cell can save it to a file
# (used to keep the output when the notebook is run through the .bat file)
from IPython.utils import io

# Everything printed inside this block is collected in `captured`
# rather than being shown under the cell.
with io.capture_output() as captured:
    # Run the complete pipeline for one industry
    excel_download_pdf(
        industry = 'Financial Services',
        excel_file_path = r"excel\path",
    )

In [12]:
import os

# Save the captured output to a file.
# The encoding is stated explicitly so the write does not depend on the
# platform's default encoding, which may be unable to encode some characters.
with open(r'output\path', 'w', encoding='utf-8') as f:
    f.write(captured.stdout)