In [388]:
#!pip install selenium webdriver-manager

Collecting pip
  Downloading pip-24.3.1-py3-none-any.whl.metadata (3.7 kB)
Downloading pip-24.3.1-py3-none-any.whl (1.8 MB)
   ---------------------------------------- 0.0/1.8 MB ? eta -:--:--
   ---------------------------- ----------- 1.3/1.8 MB 6.7 MB/s eta 0:00:01
   ---------------------------------------- 1.8/1.8 MB 5.6 MB/s eta 0:00:00



[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: C:\Users\bill1\anaconda3\python.exe -m pip install --upgrade pip
ERROR: To modify pip, please run the following command:
C:\Users\bill1\anaconda3\python.exe -m pip install --upgrade pip


In [17]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
from selenium.webdriver.common.action_chains import ActionChains
import sys
from bs4 import BeautifulSoup
from IPython.display import display, HTML
import pandas as pd
import time
import os
import csv
from selenium.webdriver.chrome.options import Options
import re

# Config

## Parameter Value

In [2]:
# Update for Future Use
FORM_TYPE_LS = ['Form 1 Part E-C - College',
               'Form 1 Part E-D - District',
               'Form 1 Part F by 6 Digit TOP Code - College']
COLLEGE_LS = ['San Diego City College                  ',
              'San Diego Mesa College                  ',
              'San Diego Miramar College Reg Cntr      ']
DISTRICT_LS = ['San Diego District                      ']
YEAR_LS = ['2024-2025', '2023-2024', '2022-2023', '2021-2022', '2020-2021']

## Webpage Element

In [3]:
URL = "https://misweb.cccco.edu/perkinsv/Core_Indicator_Reports/Forms_All.aspx"

# Element ID of each parameter input box (Inspect)
SELECT_FORM_TYPE = 'ASPxRoundPanel1_ASPxComboBoxFT_I'
SELECT_DISTRICT_COLLEGE = 'ASPxRoundPanel1_ASPxComboBoxC_I'
FISCAL_YEAR = 'ASPxRoundPanel1_ASPxComboBoxFY_I'
TOP_CODE = 'ASPxRoundPanel1_ASPxComboBoxTCode_I'
ELEMENT_ID = {
    'form_type': SELECT_FORM_TYPE,
    'district_college': SELECT_DISTRICT_COLLEGE,
    'fiscal_year': FISCAL_YEAR,
    'top_code': TOP_CODE
}

# View Report Button
VIEW_REPORT = 'ASPxRoundPanel1_ASPxButtonRS_B'

# Table Content
TABLE_DIV_ID = 'VisibleReportContentASPxRoundPanel2_ReportViewer2_ctl09'

# Table Columns
COLUMNS = ['CI Number', 'CI Info', 'Demographic', 'DESCR', 'Count', 'Total', 
       'Negotiated Level - State', 'Negotiated Level - District', 
       'College Performance', 'Percent Above or Below Negotiated Level', 
       'Percent Above or Below 90% Negotiated Level']

## File Path

In [4]:
# folder to store all data
#data_folder = '/data'
data_folder = r"C:\Users\bill1\OneDrive\Desktop\PerkinsCoreIndicatorReport\Data"

# Using os.path.join for subdirectories
college_fp = os.path.join(data_folder, 'College')
district_fp = os.path.join(data_folder, 'District')
top_code_fp = os.path.join(data_folder, 'Top Code')


folder_dict = {
    'Form 1 Part E-C - College': college_fp,
    'Form 1 Part E-D - District': district_fp,
    'Form 1 Part F by 6 Digit TOP Code - College': top_code_fp
}

# csv file path to log the scraping record.
record_csv_path = os.path.join(data_folder, 'scraping_log.csv')

# Create directories if they don't exist
os.makedirs(college_fp, exist_ok=True)
os.makedirs(district_fp, exist_ok=True)
os.makedirs(top_code_fp, exist_ok=True)

# Page Navigation Helper Functions

### PerkinsWebScraper

In [None]:
class PerkinsWebScraper:
    URL = "https://misweb.cccco.edu/perkinsv/Core_Indicator_Reports/Forms_All.aspx"
    SELECT_FORM_TYPE = 'ASPxRoundPanel1_ASPxComboBoxFT_I'
    SELECT_DISTRICT_COLLEGE = 'ASPxRoundPanel1_ASPxComboBoxC_I'
    FISCAL_YEAR = 'ASPxRoundPanel1_ASPxComboBoxFY_I'
    TOP_CODE = 'ASPxRoundPanel1_ASPxComboBoxTCode_I'
    TABLE_DIV_ID = 'VisibleReportContentASPxRoundPanel2_ReportViewer2_ctl09'
    VIEW_REPORT = 'ASPxRoundPanel1_ASPxButtonRS_B'
    ELEMENT_ID = {
        'form_type': SELECT_FORM_TYPE,
        'district_college': SELECT_DISTRICT_COLLEGE,
        'fiscal_year': FISCAL_YEAR,
        'top_code': TOP_CODE
    }
    def __init__(self, url = URL, implicit_wait=1, explicit_wait=10, record_csv_path='scrape_record.csv', headless: bool = False):
        """
        Initialize the webdriver and open the given URL.

        Parameters:
        - url (str): The URL to navigate to.
        - implicit_wait (int): The implicit wait time in seconds.
        - explicit_wait (int): The explicit wait time in seconds for WebDriverWait.
        - record_csv_path (str): Path to the CSV file for recording scraped parameters.
        """
        if headless:
            chrome_options = Options()
            chrome_options.add_argument('--headless=new')
        else:
            chrome_options = None
        self.driver = webdriver.Chrome(options=chrome_options)
        self.driver.implicitly_wait(implicit_wait)
        self.wait = WebDriverWait(self.driver, explicit_wait)
        self.url = url
        self.soup = None
        self.record_csv_path = record_csv_path
        self.element_info = ELEMENT_ID
        self.report_button = VIEW_REPORT
        self.table_div_id = TABLE_DIV_ID
        self.form_type = 'None'
        self.district_college = 'None'
        self.fiscal_year = 'None'
        self.top_code = 'None'
        if record_csv_path:
            try:
                self.scrape_record = pd.read_csv(record_csv_path)
            except FileNotFoundError:
                # If the file doesn't exist, create an empty DataFrame with the required columns
                self.scrape_record = pd.DataFrame(columns=['form_type', 'district_college', 'fiscal_year', 'top_code', 'headcount', 'enrollment', 'file_path'])
        else:
            self.scrape_record = pd.DataFrame(columns=['form_type', 'district_college', 'fiscal_year', 'top_code', 'headcount', 'enrollment', 'file_path'])

    def get_url(self):
        self.driver.get(self.url)
    
    def is_recorded(self, form_type, district_college, fiscal_year, top_code):
        """
        Check if the given parameters are already recorded in the scrape_record.
        """
        if self.scrape_record.empty:
            print("No records to check against.")
            return False

        # Create a boolean mask for the matching row
        mask = (
            (self.scrape_record['form_type'] == form_type) &
            (self.scrape_record['district_college'] == district_college.strip()) &
            (self.scrape_record['fiscal_year'] == fiscal_year) &
            (self.scrape_record['top_code'] == top_code)
        )
        exists = mask.any()
        return exists
        

    def input_value(self, input_box, value):
        """
        Inputs a value into a specified input box on the webpage.

        Parameters:
        - input_box (str): The key representing the input box in element_info.
        - value (str): The value to input into the box.
        - element_info (dict): A dictionary mapping input_box keys to element IDs.
        """
        if value == 'None':
            return
        if input_box not in self.element_info:
            print(f"Box name should be one of these values: {list(self.element_info.keys())}. Please input a proper box name.")
        else:
            element_id = self.element_info[input_box]
            try:
                input_element = self.wait.until(EC.presence_of_element_located((By.ID, element_id)))
                input_element.click()  # Click on the input box to focus
                input_element.clear()  # Clear any existing text (optional)
                input_element.send_keys(value)
            except Exception as e:
                print(f"An error occurred while inputting value into '{input_box}'.")

    def view_report(self):
        """
        Clicks the report button to view a report.

        Parameters:
        - report_button (str): The identifier of the report button element.
        """
        try:
            report_button_element = self.wait.until(EC.element_to_be_clickable((By.ID, self.report_button)))
            report_button_element.click()
        except Exception as e:
            print(f"An error occurred while clicking the report button: {e}")

    def get_content(self, parser='html.parser'):
        """
        Waits for the table to appear and parses the page source with BeautifulSoup.

        Parameters:
        - table_div_id (str): The ID of the table's div element to wait for.
        - parser (str): The parser to use with BeautifulSoup.

        Returns:
        - soup (BeautifulSoup object): The parsed HTML content.
        """
        try:
            self.wait.until(EC.presence_of_element_located((By.ID, self.table_div_id)))
            page_source = self.driver.page_source
            self.soup = BeautifulSoup(page_source, parser)
            return self.soup
        except Exception as e:
            print(f"An error occurred while getting content: {e}")
            return None

    def get_top_codes(self, form_type, district_college, fiscal_year):
        """
        Get all available top codes given if form type is top code
        """
        self.get_url()
        self.input_value('form_type', form_type)
        self.input_value('fiscal_year', fiscal_year)
        time.sleep(1)
        self.input_value('district_college', district_college)
        self.wait.until(EC.presence_of_element_located((By.ID, self.TOP_CODE)))
        top_code_dropdown_button = WebDriverWait(self.driver, 15).until(
            EC.element_to_be_clickable((By.ID, self.TOP_CODE))
        )
        time.sleep(1)
        top_code_dropdown_button.click()
        time.sleep(3)
        top_code_dropdown_button.click()

        top_code_options = WebDriverWait(self.driver, 15).until(
            EC.visibility_of_all_elements_located((By.XPATH, "//table[@id='ASPxRoundPanel1_ASPxComboBoxTCode_DDD_L_LBT']//td[@class='dxeListBoxItem_Aqua']"))
        )
        top_codes = [top_code.text for top_code in top_code_options]
        self.close()
        return top_codes

    def scrape_report(self, form_type, district_college, fiscal_year, top_code):
        """
        Orchestrates the scraping process.

        Parameters:
        - form_type (str): The form type to input.
        - district_college (str): The college or district to select.
        - fiscal_year (str): The fiscal year to input.
        - top_code (str): The TOP code to input.

        Returns:
        - soup (BeautifulSoup object): The parsed HTML content.
        """
        if self.is_recorded(form_type, district_college, fiscal_year, top_code):
            print(f'{form_type}, {district_college.strip()}, {fiscal_year}, {top_code} is scraped already.')
            return None
        try:
            #get url
            self.get_url()
            # Input values
            self.input_value('form_type', form_type)
            self.input_value('fiscal_year', fiscal_year)
            time.sleep(1)
            self.input_value('district_college', district_college)
            if top_code and top_code != 'None':
                time.sleep(1)
                self.input_value('top_code', top_code)
                time.sleep(1)
                self.input_value('top_code', top_code)

            # Click the view report button
            time.sleep(1)
            self.view_report()

            # Get and parse the content
            time.sleep(3)
            soup = self.get_content()

            if soup:
                self.form_type = form_type
                self.district_college = district_college.strip()
                self.fiscal_year = fiscal_year
                self.top_code = top_code if top_code else 'None'
                return soup
            else:
                print("Failed to retrieve content from the page.")
                return None
        except Exception as e:
            print(f"An error occurred during scraping: {e}")
            return None

    def add_record(self, headcount, enrollment, file_path):
        """
        Add a new record to the scrape_record DataFrame and save to CSV if path is provided.
        """
        new_record = pd.DataFrame([{
            'form_type': self.form_type,
            'district_college': self.district_college,
            'fiscal_year': self.fiscal_year,
            'top_code': self.top_code,
            'headcount': headcount,
            'enrollment': enrollment,
            'file_path': file_path
        }])
        self.scrape_record = pd.concat([self.scrape_record, new_record], ignore_index=True)

        if self.record_csv_path:
            self.scrape_record.to_csv(self.record_csv_path, index=False)

    def close(self):
        """
        Closes the webdriver instance.
        """
        self.driver.quit()

###  TableParser

In [10]:
class TableParser:
    # Table Content
    TABLE_DIV_ID = 'VisibleReportContentASPxRoundPanel2_ReportViewer2_ctl09'
    # Table Columns
    # if there are number index in front of each row of the table, use the commented COLUMNS
    # COLUMNS = [
    #     'CI Number', 'CI Info', 'Demographic', 'DESCR', 'Count', 'Total',
    #     'Negotiated Level - State', 'Negotiated Level - District',
    #     'College Performance', 'Percent Above or Below Negotiated Level',
    #     'Percent Above or Below 90% Negotiated Level'
    # ]
    COLUMNS = [
        'CI Number', 'CI Info', 'DESCR', 'Count', 'Total',
        'Negotiated Level - State', 'Negotiated Level - District',
        'College Performance', 'Percent Above or Below Negotiated Level',
        'Percent Above or Below 90% Negotiated Level', 'Demographic'
    ]
    
    def __init__(self, soup, output_folder, title = 'None'):
        """
        Initialize with the BeautifulSoup object.

        Parameters:
        - soup (BeautifulSoup object): The parsed HTML content.
        - columns (list): Optional custom column names for the DataFrame.
        """
        self.soup = soup
        self.raw_data_ls = None
        self.clean_data_ls = None
        self.columns = self.COLUMNS
        self.table_div_id = TABLE_DIV_ID
        self.df = None
        self.title = title
        self.enrollment = ''
        self.headcount = ''
        self.output_folder = output_folder

    def parse_table(self):
        """
        Parses the table from the soup content and extracts the data.

        Parameters:
        - table_div_id (str): The ID of the div containing the table.

        Returns:
        - data (list): A list of rows, where each row is a list of column values.
        """
        data = []
        if self.soup is None:
            print("No content to parse. Please provide a valid BeautifulSoup object.")
            return data
        try:
            table_div = self.soup.find('div', id=self.table_div_id)
            if not table_div:
                print(f"Table with div id '{table_div_id}' not found.")
                return data
            table = table_div.find('table')
            if not table:
                print("No table found within the specified div.")
                return data
            rows = table.find_all('tr')
            for row in rows:
                columns = row.find_all('td')
                row_data = [col.text.strip() for col in columns]
                data.append(row_data)
            self.raw_data_ls = data
            return self.raw_data_ls
        except Exception as e:
            print(f"An error occurred while parsing the table: {e}")
            return data

    def clean_rows(self):
        """
        Cleans the raw data extracted from the table.
    
        Returns:
        - clean_data (list): The cleaned data.
        """
        data = self.raw_data_ls
        count = 0
        CI_number = ''
        CI_info = ''
        intermediate_data = []
        
        # First pass: initial cleaning and extraction
        while count < len(data):
            row = data[count]
            # Skip rows that are all empty strings
            if all(elem == '' for elem in row):
                count += 1
                continue
            if len(row) == 7:
                CI_number = row[2]
                CI_info = data[count+1][2] if count+1 < len(data) else ''
                count += 3
            
            # if there are number index in front of each row of the table, use next line instead
            # elif len(row) == 11:
            elif len(row) == 10:
                row[0] = CI_number
                row[1] = CI_info
                intermediate_data.append(row)
                count += 1
            else:
                count += 1
    
        # Second pass: further cleaning based on your code
        # if there are number index in front of each row of the table, use the following line instead
        # clean_data = []
        # demo = ''
        # for row in intermediate_data:
        #     # Skip rows that are all empty strings
        #     if all(elem == '' for elem in row):
        #         continue
        #     if 'CTE' in row[2]:
        #         demo = 'CTE'
        #         row[2] = 'CTE'
        #         clean_data.append(row)
        #     elif row[4:] == [''] * 7:
        #         demo = row[2]
        #         continue
        #     else:
        #         row[2] = demo
        #         clean_data.append(row)

        clean_data = []
        demo = ''
        for row in intermediate_data:
            # Skip rows that are all empty strings
            if all(elem == '' for elem in row):
                continue
            if 'CTE' in row[2]:
                demo = 'CTE'
                row.append(demo)
                clean_data.append(row)
            elif row[3:] == [''] * 7:
                demo = row[2]
                continue
            else:
                row.append(demo)
                clean_data.append(row)

        
    
        self.clean_data_ls = clean_data
        return self.clean_data_ls

    def to_df(self):
        """
        Converts the cleaned data into a Pandas DataFrame.

        Returns:
        - df (DataFrame): The resulting DataFrame.
        """
        if self.clean_data_ls is None:
            print("No clean data to convert. Please run 'clean_rows' method first.")
            return None
        self.df = pd.DataFrame(self.clean_data_ls, columns=self.columns)
        return self.df

    def get_CTE_enrollment_and_headcount(self):
        """
        Extracts CTE enrollment and headcount from the soup content.

        Returns:
        - enrollment (str): The enrollment number.
        - headcount (str): The headcount number.
        """
        enrollment = ''
        headcount = ''
        try:
            table_div = self.soup.find('div', id=self.table_div_id)
            if not table_div:
                print(f"Table with div id '{self.table_div_id}' not found.")
                return enrollment, headcount
            table = table_div.find('table')
            if not table:
                print("No table found within the specified div.")
                return enrollment, headcount
            rows = table.find_all('tr')
            for i in range(len(rows)):
                row = rows[i]
                columns = [col.text.strip() for col in row.find_all('td')]
                if columns and columns[0] == 'Cohort Year CTE Enrollments:':
                    enrollment = rows[i+1].find('td').text.strip().replace(',', '')
                    self.enrollment = enrollment
                if columns and columns[0] == 'CTE Headcount:':
                    headcount = rows[i+1].find('td').text.strip().replace(',', '')
                    self.headcount = headcount
                if self.enrollment and self.headcount:
                    return self.enrollment, self.headcount
            return enrollment, headcount
        except Exception as e:
            print("An error occurred while extracting enrollment and headcount.")
            return enrollment, headcount

    def get_table_info(self):
        """
        Orchestrates the extraction and cleaning of table data.

        Parameters:
        - table_div_id (str): The ID of the div containing the table.

        Returns:
        - df (DataFrame): The cleaned data as a DataFrame.
        - enrollment (str): The enrollment number.
        - headcount (str): The headcount number.
        """
        raw_data = self.parse_table()
        if not raw_data:
            print("No data extracted from the table.")
            return None, None, None
        clean_data = self.clean_rows()
        if not clean_data:
            print("No clean data obtained after processing.")
            return None, None, None
        df = self.to_df()
        enrollment, headcount = self.get_CTE_enrollment_and_headcount()
        return df, enrollment, headcount
        
    def save_df(self):
        if self.df is None:
            print("No DataFrame available to save. Please ensure 'to_df' method has been called.")
            return False

        # Construct the filename
        df_name = f'{self.title}.csv'
        df_path = os.path.join(self.output_folder, df_name)
    
        try:
            # Ensure the folder exists
            os.makedirs(self.output_folder, exist_ok=True)
    
            # Save the DataFrame
            self.df.to_csv(df_path, index=False)
            print(f"DataFrame successfully saved to {df_path}")
            return True
        except Exception as e:
            print(f"An error occurred while saving the DataFrame: {e}")
            return False

# Run

In [23]:
def run(form_type, district_college, fiscal_year, top_code, record_csv_path, headless = False):
    # scrape content soup
    print(f'Working on {form_type}, {district_college.strip()}, {fiscal_year}, {top_code}...')
    scraper = PerkinsWebScraper(implicit_wait=5, explicit_wait=10, headless=headless, record_csv_path=record_csv_path)
    soup = scraper.scrape_report(form_type, district_college, fiscal_year, top_code)
    
    # parse table
    if soup:
        parser = TableParser(soup, output_folder = folder_dict[form_type], title = f'{district_college.strip()}_{fiscal_year}_{top_code.replace("/", " or ")}')
        df, enrollment, headcount = parser.get_table_info()
        success = parser.save_df()
        if success:
            scraper.add_record(headcount=parser.headcount, enrollment=parser.enrollment, file_path=os.path.join(parser.output_folder, f'{parser.title}.csv'))

## College

In [None]:
form_type = FORM_TYPE_LS[0] #college
for college in COLLEGE_LS:
    for fiscal_year in YEAR_LS:
        run(form_type, college, fiscal_year, top_code = 'None', record_csv_path=record_csv_path)

## District

In [8]:
chrome_options = Options()
chrome_options.add_argument('--headless=new')
driver = webdriver.Chrome(options=chrome_options)
driver.get(URL)

Need to change clean_rows function.

## Top Code

In [25]:
form_type = FORM_TYPE_LS[2] # top code
check = PerkinsWebScraper(URL, record_csv_path=record_csv_path, headless=True)
for college in COLLEGE_LS:
    for fiscal_year in YEAR_LS:
        get_top_codes = PerkinsWebScraper(URL)
        top_codes = get_top_codes.get_top_codes(form_type, college, fiscal_year)
        for top_code in top_codes:
            if not check.is_recorded(form_type, college, fiscal_year, top_code):
                run(form_type, college, fiscal_year, top_code, record_csv_path=record_csv_path, headless= True)
            else:
                print(f'{college.strip()}, {fiscal_year}, {top_code} has already been scraped.')

San Diego City College, 2024-2025, 010300   Plant Science has already been scraped.
San Diego City College, 2024-2025, 030300   Environmental Technology has already been scraped.
San Diego City College, 2024-2025, 043000   Biotechnology and Biomedical Technology has already been scraped.
San Diego City College, 2024-2025, 050100   Business and Commerce, General has already been scraped.
San Diego City College, 2024-2025, 050200   Accounting has already been scraped.
San Diego City College, 2024-2025, 050500   Business Administration has already been scraped.
San Diego City College, 2024-2025, 050600   Business Management has already been scraped.
San Diego City College, 2024-2025, 050640   Small Business and Entrepreneurship has already been scraped.
San Diego City College, 2024-2025, 050900   Marketing and Distribution has already been scraped.
San Diego City College, 2024-2025, 051100   Real Estate has already been scraped.
San Diego City College, 2024-2025, 051400   Office Technolog

PermissionError: [Errno 13] Permission denied: 'C:\\Users\\bill1\\OneDrive\\Desktop\\PerkinsCoreIndicatorReport\\Data\\scraping_log.csv'