In [1]:
# Import selenium and its dependencies
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException, ElementClickInterceptedException, ElementNotInteractableException

# import pandas 
import pandas as pd
import time

import undetected_chromedriver as uc
from multiprocessing import Pool, cpu_count

In [7]:
# Load the pharmacy spreadsheet; the scraping cells fill in its columns and
# write the frame back to this same path.
df = pd.read_csv("./csv/Pharmacy with retiring owners.csv")

In [9]:
def type_in_and_search(driver, row, retries=5):
    """Type the row's accreditation number into the search form and submit it.

    Each attempt waits one second, opens the advanced-search panel if its
    toggle still has the ``arrow-right`` class, clears the ``regNum`` input,
    types ``row["Accreditation_Number"]`` and presses ENTER.

    Args:
        driver: Selenium WebDriver already pointed at the search page.
        row: Mapping/Series providing the ``"Accreditation_Number"`` value.
        retries: Maximum number of attempts.

    Raises:
        Exception: If no attempt succeeds. The last Selenium error, if any,
            is attached as ``__cause__`` so the real failure is visible.
    """

    def advanced_search_toggle():
        return driver.find_element(By.ID, "AdvancedSearch").find_element(
            By.TAG_NAME, "div"
        )

    def registration_input():
        # Re-located on every use, as the original code did, so a re-render
        # between steps does not leave us holding a stale reference.
        return driver.find_element(By.ID, "regNum").find_element(
            By.TAG_NAME, "input"
        )

    # Type the number as text so numeric cell types (e.g. numpy integers)
    # are handled the same way as plain strings.
    accreditation_number = str(row["Accreditation_Number"])
    last_error = None

    for _ in range(retries):
        time.sleep(1)
        try:
            if advanced_search_toggle().get_attribute("class") == 'arrow-right':
                advanced_search_toggle().click()

            registration_input().clear()
            registration_input().send_keys(accreditation_number)
            registration_input().send_keys(Keys.ENTER)
            return
        except NoSuchElementException as error:
            print("No Such Element")
            last_error = error
        except ElementClickInterceptedException as error:
            print("Element Click Intercepted")
            last_error = error
        except ElementNotInteractableException as error:
            print("Element Not Interactable")
            last_error = error
        except StaleElementReferenceException as error:
            print("Stale Element Reference")
            last_error = error

    raise Exception("Failed to type in and search") from last_error

def click_on_pharmacy_link(driver, retries=5):
    """Click the first anchor inside the first ``ResultDiv`` element.

    Waits one second before every attempt and retries when Selenium reports
    that the element is missing, covered, not interactable or stale.

    Args:
        driver: Selenium WebDriver showing the search results.
        retries: Maximum number of attempts.

    Raises:
        Exception: If the link could not be clicked within ``retries`` attempts.
    """
    for _ in range(retries):
        time.sleep(1)
        try:
            result_container = driver.find_element(By.CLASS_NAME, "ResultDiv")
            pharmacy_anchor = result_container.find_element(By.TAG_NAME, "a")
            pharmacy_anchor.click()
        except NoSuchElementException:
            print("No Such Element")
        except ElementClickInterceptedException:
            print("Element Click Intercepted")
        except ElementNotInteractableException:
            print("Element Not Interactable")
        except StaleElementReferenceException:
            print("Stale Element Reference")
        else:
            # The click went through; nothing more to do.
            return

    raise Exception("Failed to click on pharmacy link")
        
def get_corporate_owner(driver, retries=5):
    """Return the ``href`` of the first link inside ``CorporationOwnerContent``.

    Waits one second before every attempt and retries on any ordinary error.

    Args:
        driver: Selenium WebDriver showing a pharmacy page.
        retries: Maximum number of attempts.

    Returns:
        The value of the anchor's ``href`` property.

    Raises:
        Exception: If the link could not be read within ``retries`` attempts.
    """
    for _ in range(retries):
        time.sleep(1)
        try:
            owner_link = driver.find_element(
                By.ID, "CorporationOwnerContent"
            ).find_element(By.TAG_NAME, "a")
            return owner_link.get_property("href")
        # `except Exception` (not a bare except) so Ctrl-C / kernel interrupt
        # can still stop a long retry loop.
        except Exception:
            print("Cant find Corporation Owner")

    raise Exception("Failed to get corporate owner")

def get_years_ownership(driver, retries=5):
    """Return the last four characters of the ``OpeningDate`` text as an int.

    Waits one second before every attempt. An attempt is retried when the
    element is missing or stale, and also when its text does not yet end in
    four digits (for example while it is still empty), instead of letting the
    ``ValueError`` from ``int()`` escape the retry loop.

    Args:
        driver: Selenium WebDriver showing a pharmacy page.
        retries: Maximum number of attempts.

    Returns:
        The parsed four-digit value (presumably the opening year — confirm).

    Raises:
        Exception: If no attempt produced a parsable value.
    """
    for _ in range(retries):
        time.sleep(1)
        try:
            opening_date_text = driver.find_element(By.ID, "OpeningDate").text
            return int(opening_date_text[-4:])
        except (NoSuchElementException, StaleElementReferenceException, ValueError):
            print("Cant find assessment date")

    raise Exception("Failed to get years of ownership")
            
def get_staff_count(driver, retries=5):
    """Count the ``ResultDiv`` elements inside ``PharmacyStaffSRLContent``.

    Waits one second before every attempt and retries while the container
    element cannot be found.

    Args:
        driver: Selenium WebDriver showing a pharmacy page.
        retries: Maximum number of attempts.

    Returns:
        Number of ``ResultDiv`` elements found in the staff container.

    Raises:
        Exception: If the container was not found within ``retries`` attempts.
    """
    for _ in range(retries):
        time.sleep(1)
        try:
            staff_panel = driver.find_element(By.ID, "PharmacyStaffSRLContent")
            staff_entries = staff_panel.find_elements(By.CLASS_NAME, "ResultDiv")
        except NoSuchElementException:
            print("Cant find # of staff")
        else:
            return len(staff_entries)

    raise Exception("Failed to get staff count")

def get_director_count(driver, retries=5):
    """Count the ``ResultDiv`` elements inside ``DirectorsShareholdersContent``.

    Waits one second before every attempt and retries while the container
    element cannot be found.

    Args:
        driver: Selenium WebDriver showing a corporation page.
        retries: Maximum number of attempts.

    Returns:
        Number of ``ResultDiv`` elements found in the directors container.

    Raises:
        Exception: If the container was not found within ``retries`` attempts.
    """
    for _ in range(retries):
        time.sleep(1)
        try:
            directors_panel = driver.find_element(By.ID, "DirectorsShareholdersContent")
            director_entries = directors_panel.find_elements(By.CLASS_NAME, "ResultDiv")
        except NoSuchElementException:
            print("Cant find # of directors")
        else:
            return len(director_entries)

    raise Exception("Failed to get director count")

def get_links_of_director(driver, retries=5):
    """Collect the ``href`` of every link inside ``DirectorsShareholdersContent``.

    Waits one second before every attempt and retries while the container
    element cannot be found. If it never appears, the page is checked for the
    "Corporation directors are only listed ..." notice.

    Args:
        driver: Selenium WebDriver showing a corporation's directors tab.
        retries: Maximum number of attempts to find the container.

    Returns:
        A list of ``href`` values (possibly empty) when the container exists,
        or ``None`` when the container is absent and the notice is displayed.

    Raises:
        Exception: If neither the container nor a displayed notice is found.
    """
    for _ in range(retries):
        time.sleep(1)
        try:
            director_anchors = driver.find_element(
                By.ID, "DirectorsShareholdersContent"
            ).find_elements(By.TAG_NAME, "a")
            return [anchor.get_property("href") for anchor in director_anchors]
        except NoSuchElementException:
            print("Cant find link of directors")

    # find_elements returns an empty list when nothing matches, so a missing
    # notice falls through to the explicit failure below instead of raising
    # NoSuchElementException from this lookup.
    notices = driver.find_elements(
        By.XPATH,
        "//*[contains(text(), 'Corporation directors are only listed if they are current members of the College.')]",
    )
    if any(notice.is_displayed() for notice in notices):
        print("No Directors")
        return None

    raise Exception("Failed to get links of directors")

def click_on_directors_tab(driver, retries=5):
    """Click the element with id ``DirectorsShareholders``.

    Waits one second before every attempt and retries on any ordinary error.

    Args:
        driver: Selenium WebDriver showing a corporation page.
        retries: Maximum number of attempts.

    Raises:
        Exception: If the click did not succeed within ``retries`` attempts.
    """
    for _ in range(retries):
        time.sleep(1)
        try:
            driver.find_element(By.ID, "DirectorsShareholders").click()
            return
        # `except Exception` (not a bare except) so Ctrl-C / kernel interrupt
        # can still stop a long retry loop.
        except Exception:
            print("Cant click directors")

    raise Exception("Failed to click on directors tab")
    
def get_owners_age(driver, retries=5, reference_year=2023, graduation_age=25):
    """Estimate a director's age from the year shown in their education record.

    First tries (best effort) to click the element with id ``pid7``
    (presumably the tab that reveals the education section — confirm). Then
    reads the last four characters of the second ``f16`` element inside
    ``qualiEducationResult`` as a year and returns
    ``reference_year - year + graduation_age``.

    Args:
        driver: Selenium WebDriver showing a director's page.
        retries: Maximum attempts for each of the two steps.
        reference_year: Year the estimate is computed for. Defaults to 2023,
            the value previously hard-coded, so results stay comparable with
            rows already scraped.
        graduation_age: Assumed age at graduation. Defaults to 25, the value
            previously hard-coded.

    Returns:
        The estimated age as an int.

    Raises:
        Exception: If the year could not be read within ``retries`` attempts.
    """
    # Step 1: open the section. Failure here is tolerated; step 2 decides.
    for _ in range(retries):
        time.sleep(1)
        try:
            driver.find_element(By.ID, "pid7").click()
            break
        # `except Exception` rather than a bare except so an interrupt
        # is not swallowed by the retry loop.
        except Exception:
            print("Cant find pid7")

    # Step 2: read the year; lookup, index and parse errors all trigger a retry.
    for _ in range(retries):
        time.sleep(1)
        try:
            graduation_year = int(
                driver.find_element(By.ID, "qualiEducationResult")
                .find_elements(By.CLASS_NAME, "f16")[1]
                .text[-4:]
            )
            return reference_year - graduation_year + graduation_age
        except Exception:
            print("Cant find grad_age")

    raise Exception("Failed to get owners age")

def get_concern_of_owner(driver, retries=5):
    """Report whether the page shows a "Pending or current concerns" marker.

    Waits one second before every attempt and looks for any element whose
    ``alt`` attribute is exactly ``Pending or current concerns``. Returns as
    soon as the marker is found, rather than sleeping through the remaining
    attempts.

    Args:
        driver: Selenium WebDriver showing a director's page.
        retries: Maximum number of lookups before concluding there is none.

    Returns:
        1 if the marker was found on any attempt, otherwise 0.
    """
    for _ in range(retries):
        time.sleep(1)
        try:
            driver.find_element(By.XPATH, '//*[@alt="Pending or current concerns"]')
            return 1
        except NoSuchElementException:
            print("No Concerns")

    return 0


In [52]:
# Split the DataFrame into at most one chunk per available CPU core
num_cores = cpu_count()
# Ceiling division keeps the number of chunks within num_cores; the floor of 1
# keeps range(0, len(df), chunk_size) valid when there are fewer rows than cores
# (a step of 0 would raise ValueError).
chunk_size = max(1, -(-len(df) // num_cores))

# Function to be applied to each chunk of the DataFrame
def process_chunk(chunk):
    """Scrape the missing columns for one slice of the pharmacy DataFrame.

    Opens its own Chrome instance, visits every row whose
    ``"Average Owner Age"`` is missing and fills in ``"Corporation Owner"``,
    ``"Years of Ownership"``, ``"# of staff"``, ``"# of directors"``,
    ``"Concerns"`` and ``"Average Owner Age"``.

    The first failure stops the chunk (the error is printed), but the rows
    completed so far are still returned instead of being discarded.

    Args:
        chunk: DataFrame slice containing at least the ``"Average Owner Age"``
            and ``"Accreditation_Number"`` columns.

    Returns:
        A copy of ``chunk`` with the scraped values filled in for every row
        processed before any failure.
    """
    # The wrapper passes a df.iloc[...] slice; copy it so the .loc writes below
    # go to an independent frame.
    chunk = chunk.copy()
    driver = uc.Chrome()

    try:
        pending_rows = chunk[chunk["Average Owner Age"].isna()]
        # `row_label` rather than `iter`, which would shadow the builtin.
        for row_label, row in pending_rows.iterrows():
            driver.get(
                "https://members.ocpinfo.com/tcpr/public/pr/en/#/forms/new/?table=0x800000000000003C&form=0x800000000000002B&command=0x80000000000007C4"
            )

            type_in_and_search(driver, row, retries=10)
            click_on_pharmacy_link(driver)

            # Get the corporate owner
            owner_url = get_corporate_owner(driver, retries=100)
            chunk.loc[row_label, "Corporation Owner"] = owner_url

            # Get the years of ownership
            chunk.loc[row_label, "Years of Ownership"] = get_years_ownership(driver, retries=5)

            # Get the # of staff
            chunk.loc[row_label, "# of staff"] = get_staff_count(driver, retries=5)

            # Follow the corporate owner link and open its directors tab
            driver.get(owner_url)
            click_on_directors_tab(driver)

            # Get the # of directors
            chunk.loc[row_label, "# of directors"] = get_director_count(driver, retries=100)

            # Go to each director's page and get their age and concerns
            links = get_links_of_director(driver)

            if links is None:
                chunk.loc[row_label, "Concerns"] = 0
                chunk.loc[row_label, "Average Owner Age"] = 0
                continue

            total_age = 0
            concerns = 0
            for link in links:
                driver.get(link)
                total_age += get_owners_age(driver, retries=10)
                concerns += get_concern_of_owner(driver)

            # An empty link list is recorded as age 0 rather than dividing by zero.
            average_owner_age = total_age / len(links) if links else 0

            chunk.loc[row_label, "Concerns"] = concerns
            chunk.loc[row_label, "Average Owner Age"] = average_owner_age
    except Exception as e:
        print(e)
    finally:
        driver.quit()

    # Returned on success and on failure, so partial progress is kept.
    return chunk

# Define a function to process a specific chunk
def process_chunk_wrapper(chunk_start):
    """Process the rows of ``df`` starting at position ``chunk_start``.

    Takes ``chunk_size`` rows (fewer for the final chunk) from the
    module-level ``df`` and hands them to ``process_chunk``.
    """
    total_rows = len(df)
    chunk_end = min(chunk_start + chunk_size, total_rows)
    chunk_rows = df.iloc[chunk_start:chunk_end]
    return process_chunk(chunk_rows)


# Create a list of chunk starting indices
chunk_starts = list(range(0, len(df), chunk_size))

# Create a Pool of workers and apply the function to each chunk in parallel.
# imap is lazy, so the results are collected into a list while the pool is
# still alive; leaving the with-block shuts the workers down.
with Pool(num_cores) as pool:
    result_chunks = list(pool.imap(process_chunk_wrapper, chunk_starts))

# Concatenate the results back into a single DataFrame
result_df = pd.concat(result_chunks, ignore_index=True)


In [10]:
# Sequential scrape: fill in every row of df that has no "Average Owner Age"
# yet, saving the CSV after each row so an interrupted run can be resumed.
driver = uc.Chrome()

try:
    # `row_label` rather than `iter`, which would rebind the builtin for every
    # later cell in the notebook.
    for row_label, row in df[df["Average Owner Age"].isna()].iterrows():
        driver.get(
            "https://members.ocpinfo.com/tcpr/public/pr/en/#/forms/new/?table=0x800000000000003C&form=0x800000000000002B&command=0x80000000000007C4"
        )

        type_in_and_search(driver, row, retries=10)
        click_on_pharmacy_link(driver)

        # Get the corporate owner
        owner_url = get_corporate_owner(driver, retries=100)
        df.loc[row_label, "Corporation Owner"] = owner_url

        # Get the years of ownership
        df.loc[row_label, "Years of Ownership"] = get_years_ownership(driver, retries=5)

        # Get the # of staff
        df.loc[row_label, "# of staff"] = get_staff_count(driver, retries=5)

        # Follow the corporate owner link and open its directors tab
        driver.get(owner_url)
        click_on_directors_tab(driver, retries=100)

        # Get the # of directors
        df.loc[row_label, "# of directors"] = get_director_count(driver, retries=100)

        # Go to each director's page and get their age and concerns
        links = get_links_of_director(driver)

        if links is None:
            df.loc[row_label, "Concerns"] = 0
            df.loc[row_label, "Average Owner Age"] = 0

        else:
            total_age = 0
            concerns = 0
            for link in links:
                driver.get(link)
                total_age += get_owners_age(driver, retries=15)
                concerns += get_concern_of_owner(driver)

            # An empty link list is recorded as age 0 rather than dividing by zero.
            average_owner_age = total_age / len(links) if links else 0

            df.loc[row_label, "Concerns"] = concerns
            df.loc[row_label, "Average Owner Age"] = average_owner_age

        # Checkpoint after every row.
        df.to_csv("./csv/Pharmacy with retiring owners.csv", index=False)
finally:
    # Always release the browser, including when a helper raises mid-run.
    driver.quit()

Element Not Interactable
Cant find Corporation Owner
Cant find Corporation Owner
Cant click directors
Cant click directors
Cant click directors
Cant click directors
Cant click directors
Cant click directors
Cant click directors
Cant click directors
Cant click directors
Cant click directors


Exception: Failed to click on directors tab

In [6]:
# Write the current frame back to the source CSV, without the index column.
df.to_csv("./csv/Pharmacy with retiring owners.csv", index=False)

In [5]:
# Drop every row whose Company_Name contains one of these names
# (presumably chain-operated stores that are out of scope — confirm).
EXCLUDED_CHAINS = [
    "Loblaw",
    "Wal-Mart",
    "Shoppers Drug Mart",
    "Metro Pharmacy",
    "Sobeys Pharmacy",
    "Costco",
    "Rexall",
]

is_chain = pd.Series(False, index=df.index)
for chain_name in EXCLUDED_CHAINS:
    # regex=False: the names are literal substrings.
    # na=False: a missing Company_Name counts as "not a chain", so the row is
    # kept instead of turning the mask into NaN and breaking the `~` below.
    is_chain |= df['Company_Name'].str.contains(chain_name, regex=False, na=False)

df = df[~is_chain]