In [2]:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.document_loaders import AsyncChromiumLoader
from selenium_driver import SimplifiedDOMExtractor

import nest_asyncio
import asyncio
from bs4 import BeautifulSoup, Tag

import time
import json
from typing import List, Dict
import traceback
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    NoSuchElementException,
    ElementNotInteractableException,
    TimeoutException,
)
from bs4 import BeautifulSoup


from dotenv import load_dotenv

load_dotenv()

%load_ext autoreload
%autoreload 2

ModuleNotFoundError: No module named 'langchain_openai'

In [27]:
# Chat client used by the cells below: gpt-4o, temperature 0, completions capped at 4096 tokens.
llm = ChatOpenAI(
    model='gpt-4o',
    temperature=0,
    max_tokens=4096,
)

In [28]:
llm.invoke("Hey!")

AIMessage(content='Hello! How can I assist you today?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 9, 'total_tokens': 18, 'completion_tokens_details': {'reasoning_tokens': 0}}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_057232b607', 'finish_reason': 'stop', 'logprobs': None}, id='run-488c7077-309a-427a-8d7b-f24d1abbf705-0', usage_metadata={'input_tokens': 9, 'output_tokens': 9, 'total_tokens': 18})

In [3]:
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

class ApplicationWebsite:
    """Open a job-application page in Chrome and reduce its HTML to the useful parts.

    Constructing an instance starts a Selenium-driven Chrome session, loads the
    page and parses the rendered source into ``self.soup``. The helper methods
    strip noise (scripts, styles, oversized image sources, long option lists)
    and keep only interactive and text elements. Call :meth:`close` when done
    to shut the browser down.
    """

    # Tags considered interactive elements on a webpage (e.g., forms, buttons)
    interactive_tags = ['a', 'button', 'form', 'input', 'select', 'textarea']
    # Tags considered as text elements (e.g., headings, paragraphs, labels)
    text_tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'span', 'label']
    # Combining both interactive and text tags for easy filtering
    important_tags = interactive_tags + text_tags

    # Seconds to wait for the page's root element before giving up.
    page_load_timeout = 10
    # Extra seconds to sleep once the root element exists, to let the page finish loading.
    settle_seconds = 5
    # Image `src` values longer than this many characters are replaced by a placeholder.
    long_src_threshold = 200

    def __init__(self, website_link):
        """
        Initialize the ApplicationWebsite object.

        Args:
            website_link (str): The URL of the website to scrape.

        Raises:
            Exception: Anything raised while loading or parsing the page is
                re-raised after the browser has been shut down.
        """
        self.website_link = website_link

        # Initialize Selenium WebDriver with Chrome
        service = Service(executable_path="./chromedriver")
        self.driver = webdriver.Chrome(service=service)

        try:
            # Navigate to the specified website
            self.driver.get(website_link)
            self._wait_for_page_load()

            # Parse the page source using BeautifulSoup
            self.soup = BeautifulSoup(self.driver.page_source, 'html.parser')
        except BaseException:
            # Do not leave an orphaned Chrome process behind when construction fails.
            self.driver.quit()
            raise

    def _wait_for_page_load(self):
        """Wait for the element with ID "root", then pause to let the page settle.

        A timeout is reported on stdout and otherwise ignored so the caller can
        still work with whatever was rendered. Only the timeout is caught:
        interrupts and genuine errors propagate instead of being mislabelled.
        """
        try:
            WebDriverWait(self.driver, self.page_load_timeout).until(
                EC.presence_of_element_located((By.ID, "root"))
            )
        except TimeoutException:
            print("Timed out waiting for page to load")
            return

        time.sleep(self.settle_seconds)  # Give some time to ensure the page is fully loaded
        print("Page loaded successfully!")

    def close(self):
        """Quit the browser started by this object. Safe to call more than once."""
        driver = getattr(self, "driver", None)
        if driver is not None:
            driver.quit()
            self.driver = None

    def _parse_html(self, html):
        """Parse the given HTML content using BeautifulSoup.

        Args:
            html (str | BeautifulSoup): HTML content to be parsed.

        Returns:
            BeautifulSoup: Parsed HTML object (returned as-is when already parsed).
        """
        if isinstance(html, BeautifulSoup):
            return html
        return BeautifulSoup(html, 'html.parser')

    def _extract_important_html(self, soup):
        """Extract important interactive and text elements from the HTML content.

        The input tree is copied first and left untouched, so this method can be
        called repeatedly on the same soup (e.g. ``self.soup``).

        Args:
            soup (BeautifulSoup): Parsed HTML content.

        Returns:
            tuple: (cleaned-up HTML, dictionary mapping select tags to removed options)
        """
        import copy

        # Everything below removes or relocates nodes; work on a private copy so the
        # caller's tree is not emptied as a side effect.
        soup = copy.copy(soup)

        self._remove_long_image_src_links(soup)
        select_map_to_option_tags = self._remove_option_tags_from_selects(soup)

        # Remove unnecessary script and style tags
        for tag in soup(['script', 'style']):
            tag.decompose()  # Completely remove these tags

        # Extract only the important elements (both interactive and text tags)
        important_elements = soup.find_all(self.important_tags, recursive=True)
        print(important_elements, '\n\n\n\n')

        # Create a new BeautifulSoup object containing only the important elements.
        # NOTE(review): append() appears to move a tag out of its current parent, so an
        # important tag nested inside another important tag (e.g. an <input> inside a
        # <form>) ends up as a top-level sibling here — confirm this flattening is intended.
        new_soup = BeautifulSoup('', 'html.parser')
        for tag in important_elements:
            new_soup.append(tag)

        return new_soup, select_map_to_option_tags

    def _remove_long_image_src_links(self, soup):
        """Replace long `src` attributes on image tags with a placeholder.

        This is done to avoid cluttering the extracted HTML with large, unnecessary data strings
        that sometimes appear in the `src` attribute of images, such as long base64-encoded
        image data or very long URLs. The tags themselves are kept, so the structure of
        the page is retained. The soup is modified in place.

        Args:
            soup (BeautifulSoup): Parsed HTML content.
        """
        for tag in soup.find_all(src=True):
            if tag.name == 'img' and len(tag['src']) > self.long_src_threshold:
                tag['src'] = 'long_src_removed'  # Replaces long src values with a placeholder

    def _remove_option_tags_from_selects(self, soup):
        """Remove all but the first option tag in each select element.

        Many job application forms contain `<select>` elements with a large number of options,
        which can make the HTML cluttered. This method removes all options except the first
        one to clean up the HTML and make it easier to analyze. The removed options are
        stored in a dictionary, in case the full list of options is needed later.
        The soup is modified in place.

        Args:
            soup (BeautifulSoup): Parsed HTML content.

        Returns:
            dict: Mapping of select element IDs to the removed options. A select
                without an ID is stored under 'undefined'; when a key is already
                taken, later selects get a numeric suffix ('undefined_1',
                'undefined_2', ...) so that no select's options are overwritten.
        """
        select_options_dict = {}

        for select in soup.find_all('select'):
            select_id = select.get('id', 'undefined')
            options = select.find_all('option')[1:]  # Exclude the first option

            # Several selects can share the fallback key (or a duplicated id);
            # disambiguate instead of silently dropping the earlier entry.
            key = select_id
            duplicate_index = 1
            while key in select_options_dict:
                key = f"{select_id}_{duplicate_index}"
                duplicate_index += 1
            select_options_dict[key] = options

            # Remove all but the first option tag
            for option in options:
                option.extract()

        return select_options_dict

    def process(self):
        """Process the job description and application sections.

        This method should be overridden in subclasses to handle specific site logic.
        """
        pass


class CareerPunkWebsite(ApplicationWebsite):
    """Extractor for job postings whose application form lives in a Greenhouse iframe.

    The job description is read from the main document and the application form
    from the iframe with ID "grnhse_iframe". (The example in this notebook
    targets app.careerpuck.com; the class name keeps its original spelling.)
    """

    def __init__(self, website_link):
        """
        Initialize the CareerPunkWebsite object, extending the ApplicationWebsite.

        Args:
            website_link (str): The URL of the CareerPunk job posting page.
        """
        super().__init__(website_link)

    def extracting_important_sections(self):
        """Extract important sections from both the main page and the iframe content.

        This method first extracts the job description from the main page. It then switches
        to the iframe containing the application form (e.g., Greenhouse iframe), extracts
        the important interactive and text elements from the iframe, and finally switches
        back to the main content. The switch back happens even when extraction fails,
        so the driver is never left stranded inside the iframe.

        Returns:
            tuple: (job description soup, job description selects, job application soup, job application selects)

        Raises:
            selenium.common.exceptions.NoSuchElementException: If the page has no
                element with ID "grnhse_iframe".
        """
        # Extract the job description from the main content
        print("JOB DESCRIPTION SOUP\n\n")
        job_description_soup, job_description_selects = self._extract_important_html(self.soup)

        # Switch to the iframe containing the job application form
        greenhouse_iframe = self.driver.find_element(By.ID, "grnhse_iframe")
        self.driver.switch_to.frame(greenhouse_iframe)

        try:
            # Extract job application content from the iframe
            job_application_html = self.driver.page_source
            job_application_soup_unformatted = self._parse_html(job_application_html)

            print("JOB APPLICATION SOUP\n\n")
            job_application_soup, job_application_selects = self._extract_important_html(
                job_application_soup_unformatted
            )
        finally:
            # Switch back to the main content, whether or not extraction succeeded
            self.driver.switch_to.default_content()

        return job_description_soup, job_description_selects, job_application_soup, job_application_selects

    def process(self):
        """Override the process method to handle CareerPunk-specific logic.

        Calls `extracting_important_sections()` and stores its four results on the
        instance as `job_description_soup`, `job_description_selects`,
        `job_application_soup` and `job_application_selects`, then prints a
        completion message.
        """
        (self.job_description_soup,
         self.job_description_selects,
         self.job_application_soup,
         self.job_application_selects) = self.extracting_important_sections()
        print("Processing complete!")


In [25]:
# Example usage
# Constructing the object launches Chrome and loads the page; process() then stores the
# extracted soups and select-option maps on `extractor` (job_description_* / job_application_*).
extractor = CareerPunkWebsite("https://app.careerpuck.com/job-board/lyft/job/7629317002")
extractor.process()

Page loaded successfully!
JOB DESCRIPTION SOUP


[<a href="https://www.lyft.com"><img alt="Lyft logo" class="CompanyLogo__NonSvgImage-sc-n0redw-2 Gicuv" src="https://image.careerpuck.com/_TnoAqcr/_TnoAqcr.300.png"/></a>, <h1><div class="JobPagePublic__Role-sc-tfpwjw-3 gyODLR role">Software Engineer Intern, Mobile Android - Toronto (Summer 2025)</div></h1>, <button aria-label="Apply to Software Engineer Intern, Mobile Android - Toronto (Summer 2025)" class="PillButton__StyledButton-sc-ifgzpr-0 dEFzTq">Apply</button>, <p><em>At Lyft, our mission is to improve people’s lives with the world’s best transportation. To do this, we start with our own community by creating an open, inclusive, and diverse organization.</em></p>, <p>You're an enthusiastic and aspiring app developer looking to take your skills to the next level by joining our Android team as an intern. We build apps used by millions of people, and we take great pride in our work. This means excellent development practices and an o

In [26]:
# Enter the application-form iframe, locate the input with ID "auto_complete_input"
# and type a city into it.
# NOTE: this cell does not switch back to the main document, so the driver stays
# inside the iframe for whatever runs next.
extractor.driver.switch_to.frame("grnhse_iframe")
auto_complete_input = extractor.driver.find_element(By.ID, "auto_complete_input")
auto_complete_input.send_keys("Atlanta")

# NOTE: Everything below this point is experimental scratch work and is not currently used.

In [67]:
# Index every form control by its id, falling back to its name; controls with neither are skipped.
# NOTE(review): `application_soup` is not defined in any visible cell — confirm where it comes from.
form_fields = {
  field_key: field
  for field in application_soup.find_all(['button', 'input', 'select', 'textarea'])
  for field_key in [field.get('id') or field.get('name')]
  if field_key
}

In [68]:
form_fields

{'utf8': <input name="utf8" type="hidden" value="✓"/>,
 'fingerprint': <input id="fingerprint" name="fingerprint" type="hidden" value="827f09fcbd3d1393bbe43c4001569bd17ee4715c"/>,
 'render_date': <input id="render_date" name="render_date" type="hidden" value="2024-09-22 23:20:35 +0000"/>,
 'page_load_time': <input id="page_load_time" name="page_load_time" type="hidden" value="Sun, 22 Sep 2024 23:20:35 GMT"/>,
 'from_embed': <input id="from_embed" name="from_embed" type="hidden" value="true"/>,
 'first_name': <input aria-required="true" autocomplete="given-name" id="first_name" maxlength="255" name="job_application[first_name]" type="text"/>,
 'last_name': <input aria-required="true" autocomplete="family-name" id="last_name" maxlength="255" name="job_application[last_name]" type="text"/>,
 'email': <input aria-required="true" autocomplete="email" id="email" maxlength="255" name="job_application[email]" type="text"/>,
 'phone': <input aria-required="true" autocomplete="tel" id="phone" ma

In [9]:
from bs4 import BeautifulSoup

# Define interactive and text-only tags
# These are module-level lists. ApplicationWebsite has class attributes with the same
# names; its interactive_tags additionally contains 'a' and 'form'.
interactive_tags = ['button', 'input', 'select', 'textarea']
text_tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'span', 'label']

def get_surrounding_context(element):
    """Describe what surrounds a form element, for use as LLM prompt context.

    Args:
        element: A parsed HTML element (BeautifulSoup ``Tag``).

    Returns:
        list[str]: Up to three entries, in this order and each only when found:
            ``"Label: ..."`` (the preceding label whose ``for`` matches the
            element's id, else the nearest preceding label),
            ``"Parent tag: ..."`` and ``"Sibling text: ..."`` (the nearest
            preceding sibling text that is not just whitespace).
    """
    context = []

    # Find associated label: prefer an explicit for=<id> match, else the closest earlier label
    element_id = element.get('id')
    label = None
    if element_id:
        label = element.find_previous('label', attrs={'for': element_id})
    if not label:
        label = element.find_previous('label')

    # Add label text if found
    if label:
        context.append(f"Label: {label.get_text(strip=True)}")

    # Add parent context
    parent = element.find_parent()
    if parent:
        context.append(f"Parent tag: {parent.name}")

    # Add sibling context (useful for inline labels or hints). The closest text
    # sibling is very often only indentation/newlines, so skip blank strings
    # rather than reporting an empty "Sibling text: " entry.
    for sibling_text in element.find_previous_siblings(string=True):
        stripped = sibling_text.strip()
        if stripped:
            context.append(f"Sibling text: {stripped}")
            break

    return context

def map_html_to_profile_with_llm_fallback(career_puck_mapping, profile, soup, llm):
    """Match the visible controls of every form in ``soup`` to profile values.

    Each non-hidden ``button``/``input``/``select``/``textarea`` inside a
    ``<form>`` is compared with the mapping entries in order; the first entry
    that matches wins. Controls that match nothing are described to the LLM and
    its answer is printed (it is not added to the result).

    Args:
        career_puck_mapping (dict): Mapping key -> field spec dict. A spec may
            hold ``"profile_id"`` (a profile key, or a dict whose own
            ``"profile_id"`` is a dotted path such as ``"location.full_location"``),
            ``"id"`` and/or ``"name"``. Entries that are not dicts are ignored.
        profile (dict): Applicant profile, possibly nested.
        soup: Parsed HTML (BeautifulSoup) containing the form(s).
        llm: Object used for unmatched controls; its ``ask(prompt)`` method is
            used when present, otherwise ``invoke(prompt)``.

    Returns:
        dict: Mapping key -> profile value (``None`` when the profile lacks it)
            for every mapping entry that matched a control.
    """
    def get_nested_value(profile, keys):
        """
        Recursively extract value from nested profile dictionary
        using a list of keys like ['location', 'full_location'].
        Returns None when a key is missing or an intermediate value is not a dict.
        """
        value = profile
        for key in keys:
            if not isinstance(value, dict):
                return None
            value = value.get(key, None)
            if value is None:
                return None
        return value

    def lookup_profile_value(key, field_info):
        """Resolve the profile value a mapping entry points at."""
        profile_id = field_info.get("profile_id")
        if isinstance(profile_id, dict):
            # Nested spec: the inner "profile_id" is a dotted path into the profile.
            nested_profile_key = profile_id.get("profile_id")
            if not isinstance(nested_profile_key, str):
                return None
            return get_nested_value(profile, nested_profile_key.split("."))
        if profile_id is None:
            # Entries without "profile_id" (e.g. the resume upload) use their own key.
            profile_id = key
        return profile.get(profile_id, None)

    def matches_field(key, field_info, field_id, field_name):
        """True when the element's id or name identifies this mapping entry."""
        # The mapping key is the HTML id for entries such as "phone" or
        # "auto_complete_input"; "id" and a plain-string "profile_id" are accepted too.
        if field_id is not None and field_id in (
            key, field_info.get("id"), field_info.get("profile_id")
        ):
            return True
        # Only compare names when the entry declares one; otherwise a control
        # without a name would "match" every entry without a name (None == None).
        mapped_name = field_info.get("name")
        return mapped_name is not None and field_name == mapped_name

    mapped_values = {}

    # Process all form elements
    for form in soup.find_all('form'):
        # Extract interactive elements within each form
        for element in form.find_all(interactive_tags):
            # Skip hidden inputs (not useful for user interaction)
            if element.get('type') == 'hidden':
                continue

            field_id = element.get('id', None)
            field_name = element.get('name', None)
            field_type = element.get('type', None)
            matched = False

            # Loop through career_puck_mapping and try to match by id or name
            for key, field_info in career_puck_mapping.items():
                if not isinstance(field_info, dict):
                    continue

                if matches_field(key, field_info, field_id, field_name):
                    mapped_values[key] = lookup_profile_value(key, field_info)
                    matched = True
                    break

                # Special handling for file type (like resume upload)
                if field_type == "file" and key == "resume_path":
                    mapped_values[key] = profile.get("resume_path", None)
                    matched = True
                    break

            # If no match, pass context to LLM
            if not matched:
                surrounding_context = get_surrounding_context(element)
                # Chat models such as the ChatOpenAI client built in this notebook expose
                # invoke() rather than ask(); keep ask() working for callers that provide it.
                ask = getattr(llm, "ask", None) or llm.invoke
                llm_response = ask(f"Identify this form field: {surrounding_context}")
                print(f"LLM Response for unmatched element: {llm_response}")

    return mapped_values


In [62]:
import json

# Load the applicant profile. JSON text is UTF-8, so decode it explicitly instead of
# relying on the platform's default encoding (which is not UTF-8 everywhere).
with open("profile.json", "r", encoding="utf-8") as file:
  profile = json.load(file)

In [58]:
websites = ["careerpuck", "greenhouse", "workday"]

In [94]:
# Field specs for the CareerPuck application form. Keys appear to be form-field ids
# (e.g. "phone", "auto_complete_input"); "profile_id" names the profile entry to use.
career_puck_mapping = {
  html_id: {"profile_id": profile_id, "type": "input", "attribute": "value"}
  for html_id, profile_id in (
    ("first_name", "first_name"),
    ("last_name", "last_name"),
    ("email", "email"),
    ("phone", "phone_number"),
  )
}
# The location autocomplete keeps its whole spec nested under "profile_id"; the inner
# "profile_id" is a dotted path into the profile.
career_puck_mapping["auto_complete_input"] = {
  "profile_id": dict(profile_id="location.full_location", type="input", attribute="value"),
}
# The resume upload entry carries a "name" and no "profile_id".
career_puck_mapping["resume_path"] = dict(name="file", type="file", attribute="value")


In [11]:
def is_visible(tag):
    """Return False for elements hidden by inline CSS or by being ``<input type="hidden">``.

    Only the element's own inline ``style`` attribute is inspected; stylesheets
    and hidden ancestors are not taken into account.

    Args:
        tag: A parsed HTML element (BeautifulSoup ``Tag``).

    Returns:
        bool: True unless the inline style contains ``display: none`` or
            ``visibility: hidden`` (any spacing or letter case), or the tag is
            a hidden input.
    """
    if tag.has_attr('style'):
        # Drop all whitespace and lowercase so "display:none", "display: none"
        # and "DISPLAY : NONE" are all recognised.
        style = "".join(tag['style'].split()).lower()
        if 'display:none' in style or 'visibility:hidden' in style:
            return False
    if tag.name == 'input' and tag.get('type') == 'hidden':
        return False
    return True

# Form controls from the extracted application soup that is_visible() accepts.
visible_fields = extractor.job_application_soup.find_all(
    lambda tag: (
        tag.name in ['input', 'select', 'textarea', 'button']
        and is_visible(tag)
    )
)
visible_fields

[<input aria-required="true" autocomplete="given-name" id="first_name" maxlength="255" name="job_application[first_name]" type="text"/>,
 <input aria-required="true" autocomplete="family-name" id="last_name" maxlength="255" name="job_application[last_name]" type="text"/>,
 <input aria-required="true" autocomplete="email" id="email" maxlength="255" name="job_application[email]" type="text"/>,
 <input aria-required="true" autocomplete="tel" id="phone" maxlength="255" name="job_application[phone]" required="required" type="text"/>,
 <input autocomplete="off" id="dev-field-1" maxlength="255" name="dev_field_1" type="text"/>,
 <input aria-autocomplete="list" aria-controls="location_autocomplete-items-popup" aria-expanded="false" aria-haspopup="listbox" aria-labelledby="location_autocomplete_label" aria-required="true" autocomplete="off" id="auto_complete_input" maxlength="255" name="job_application[location]" role="combobox" spellcheck="false" type="text"/>,
 <button aria-label="Remove atta

In [None]:
# Apply nest_asyncio to prevent the event loop issue in Jupyter
nest_asyncio.apply()

# Load and clean HTML (remove CSS and JavaScript)
async def load_html_without_css_js(link):
    """Fetch a page with headless Chromium and return its HTML without CSS or JavaScript.

    Args:
        link (str): URL of the page to load.

    Returns:
        str: Prettified HTML of the page with every <style> and <script> tag removed.
    """
    # Load HTML asynchronously. Awaiting the loader's async entry point keeps this
    # coroutine from making a blocking load() call while the event loop is running.
    loader = AsyncChromiumLoader([link])
    documents = await loader.aload()

    # Use BeautifulSoup to parse and clean the HTML
    soup = BeautifulSoup(documents[0].page_content, "html.parser")  # parse the first document

    # Remove all <style> and <script> tags (CSS and JavaScript)
    for tag in soup(["style", "script"]):
        tag.decompose()  # removes the tags entirely

    # Return the cleaned HTML
    return soup.prettify()

# Call the asynchronous function and get the cleaned HTML
# (asyncio.run is used from inside the notebook here; the nest_asyncio.apply() call at
# the top of this cell is what the notebook relies on to avoid the event-loop issue.)
link = "https://www.pega.com/about/careers/21002/software-engineer-summer-intern-shared-user-services"  # Replace with your actual link
cleaned_html = asyncio.run(load_html_without_css_js(link))

# Output the cleaned HTML
print(cleaned_html)


In [None]:
# NOTE: this rebinds `career_puck_mapping`, replacing the smaller mapping defined in an
# earlier cell, and uses a different schema:
#   - leaf specs are {"id", "type", "attribute"} ("id" instead of "profile_id");
#   - specs can be nested one level down (location, graduation_date, skills) or inside
#     single-element lists (experiences, projects), with list items suffixed "_0".
# Top-level keys look like profile field names rather than HTML ids — TODO confirm.
# Code written for the earlier shape may not work with this one; verify before passing
# this mapping to map_html_to_profile_with_llm_fallback.
career_puck_mapping = {
  "first_name": {
    "id": "first_name",
    "type": "input",
    "attribute": "value"
  },
  "last_name": {
    "id": "last_name",
    "type": "input",
    "attribute": "value"
  },
  "email": {
    "id": "email",
    "type": "input",
    "attribute": "value"
  },
  "phone_number": {
    "id": "phone",
    "type": "input",
    "attribute": "value"
  },
  "location": {
    "city": {
      "id": "auto_complete_input",
      "type": "input",
      "attribute": "value"
    },
    "state": {
      "id": "location_state_long_name",
      "type": "input",
      "attribute": "value"
    },
    "country": {
      "id": "location_country_long_name",
      "type": "input",
      "attribute": "value"
    },
    "postal_code": {
      "id": "location_postal_code",
      "type": "input",
      "attribute": "value"
    }
  },
  "resume_path": {
    "id": "resume",
    "type": "file",
    "attribute": "value"
  },
  "gpa": {
    "id": "gpa",
    "type": "input",
    "attribute": "value"
  },
  "graduation_date": {
    "month": {
      "id": "graduation_month",
      "type": "input",
      "attribute": "value"
    },
    "year": {
      "id": "graduation_year",
      "type": "input",
      "attribute": "value"
    }
  },
  "legal_work_in_states": {
    "id": "legal_work_in_states",
    "type": "select",
    "attribute": "value"
  },
  "require_sponsorship": {
    "id": "require_sponsorship",
    "type": "select",
    "attribute": "value"
  },
  "gender_identity": {
    "id": "gender_identity",
    "type": "input",
    "attribute": "value"
  },
  "racial_background": {
    "id": "racial_background",
    "type": "input",
    "attribute": "value"
  },
  "is_veteran": {
    "id": "is_veteran",
    "type": "select",
    "attribute": "value"
  },
  "has_disability": {
    "id": "has_disability",
    "type": "select",
    "attribute": "value"
  },
  "experiences": [
    {
      "company": {
        "id": "employment_company_name_0",
        "type": "input",
        "attribute": "value"
      },
      "position": {
        "id": "employment_title_0",
        "type": "input",
        "attribute": "value"
      },
      "location": {
        "id": "employment_location_0",
        "type": "input",
        "attribute": "value"
      },
      "start_date": {
        "month": {
          "id": "employment_start_date_month_0",
          "type": "input",
          "attribute": "value"
        },
        "year": {
          "id": "employment_start_date_year_0",
          "type": "input",
          "attribute": "value"
        }
      },
      "end_date": {
        "month": {
          "id": "employment_end_date_month_0",
          "type": "input",
          "attribute": "value"
        },
        "year": {
          "id": "employment_end_date_year_0",
          "type": "input",
          "attribute": "value"
        }
      },
      "current": {
        "id": "employment_current_0",
        "type": "checkbox",
        "attribute": "checked"
      }
    }
  ],
  "projects": [
    {
      "name": {
        "id": "project_name_0",
        "type": "input",
        "attribute": "value"
      },
      "technologies": {
        "id": "project_technologies_0",
        "type": "input",
        "attribute": "value"
      },
      "description": {
        "id": "project_description_0",
        "type": "textarea",
        "attribute": "value"
      }
    }
  ],
  "skills": {
    "programming_languages": {
      "id": "programming_languages",
      "type": "input",
      "attribute": "value"
    },
    "technologies": {
      "id": "technologies",
      "type": "input",
      "attribute": "value"
    },
    "frameworks_tools": {
      "id": "frameworks_tools",
      "type": "input",
      "attribute": "value"
    }
  },
  "location_preferences": {
    "id": "location_preferences",
    "type": "input",
    "attribute": "value"
  }
}

In [8]:
# NOTE: the GitHub access token is currently not working.
import requests
import base64
from dotenv import load_dotenv
import os

load_dotenv()
ACCESS_TOKEN = os.getenv('GITHUB_API_TOKEN')


class GithubFileLoader:
    """Read the files of a GitHub repository through the GitHub REST API.

    Files are listed from a branch's git tree and fetched one by one through
    the contents endpoint. Paths inside ``excluded_dirs`` and files named in
    ``excluded_files`` are skipped by :meth:`load` and :meth:`load_stream`.
    """

    github_api_url = "https://api.github.com"
    rate_limit_url = "https://api.github.com/rate_limit"

    excluded_dirs = {'node_modules', ".git", ".vscode", "__pycache__"}  # Directories to exclude loading
    excluded_files = {'poetry.lock', 'yarn.lock', 'package-lock.json'}  # Files to exclude loading

    # Seconds to wait for GitHub before a request is abandoned (without a timeout a
    # stalled connection would block forever).
    request_timeout = 30

    @property
    def headers(self):
        """HTTP headers for API calls.

        The Authorization header is only added when ACCESS_TOKEN is set; sending
        "Bearer None" would make GitHub reject requests that work anonymously.
        """
        headers = {"Accept": "application/vnd.github+json"}
        if ACCESS_TOKEN:
            headers["Authorization"] = f"Bearer {ACCESS_TOKEN}"
        return headers

    def __init__(self, repo_url):
        """
        Args:
            repo_url (str): Repository URL (``https://github.com/<owner>/<repo>``,
                optionally ending in ``/`` or ``.git``) or a bare ``<owner>/<repo>``.
        """
        repo_path = repo_url.split("https://github.com/")[-1].strip("/")
        if repo_path.endswith(".git"):
            repo_path = repo_path[:-len(".git")]
        # Despite the attribute name, this holds the "<owner>/<repo>" path.
        self.repo_url = repo_path

    def get_file_paths(self, branch = "dev"):
        """List every file (git "blob") in the given branch's tree.

        Args:
            branch (str): Branch to list. When "main" fails with an HTTP error,
                "master" is tried once.

        Returns:
            list[dict]: The tree entries whose ``type`` is ``"blob"``.

        Raises:
            requests.exceptions.HTTPError: If the request fails for a branch
                other than "main", or for "master" after the fallback.
        """
        base_url = (
            f"{self.github_api_url}/repos/{self.repo_url}/git/trees/"
            f"{branch}?recursive=1"
        )

        try:
            response = requests.get(base_url, headers=self.headers, timeout=self.request_timeout)
            response.raise_for_status()
        except requests.exceptions.HTTPError as http_err:
            # Check if the error is because the branch doesn't exist or some other HTTP error
            if branch == "main":
                print(f"Main branch not found or error occurred: {http_err}. Trying 'master' branch.")
                return self.get_file_paths(branch="master")
            # If the branch is not 'main', or retrying with 'master' also failed, re-raise
            # (bare raise keeps the original traceback).
            raise

        return [f for f in response.json()["tree"] if f['type'] == "blob"]

    def get_file_content_by_path(self, path: str) -> str:
        """Fetch one file and return its text.

        Args:
            path (str): File path relative to the repository root.

        Returns:
            str: The UTF-8 decoded content, or the literal ``"file not readable"``
                when the request, the base64 decoding or the UTF-8 decoding fails
                (best effort, so one bad file does not abort a whole load).
        """
        from urllib.parse import quote

        # NOTE(review): no ref is sent, so this presumably reads from the repository's
        # default branch, which may differ from the branch get_file_paths() listed — confirm.
        try:
            # Quote the path so spaces, '#', '?' etc. cannot corrupt the URL ('/' is kept).
            base_url = f"{self.github_api_url}/repos/{self.repo_url}/contents/{quote(path)}"
            response = requests.get(base_url, headers=self.headers, timeout=self.request_timeout)
            response.raise_for_status()  # This will raise an exception for 4XX and 5XX responses
            content_encoded = response.json()["content"]
            return base64.b64decode(content_encoded).decode("utf-8")
        except Exception:
            return "file not readable"

    def is_file_in_excluded_files(self, file):
        """Return True when the tree entry lies in an excluded directory or is an excluded file.

        Args:
            file (dict): Tree entry with a ``"path"`` key.
        """
        # Split the file path to check each part against excluded directories and files
        path_parts = file["path"].split('/')

        # Check if any part of the path is in the excluded directories or if the file name is in the excluded files
        return any(part in self.excluded_dirs for part in path_parts) or path_parts[-1] in self.excluded_files

    def load(self):
        """Return every non-excluded file as a list of ``{"file_path", "contents"}`` dicts."""
        return list(self.load_stream())

    def load_stream(self):
        """Yield ``{"file_path", "contents"}`` dicts one file at a time."""
        files = self.get_file_paths()
        for file in files:

            ## If node_modules (folder), package-lock.json, yarn.lock, etc, then don't process
            if self.is_file_in_excluded_files(file):
                continue

            content = self.get_file_content_by_path(file["path"])
            yield {"file_path": file["path"], "contents": content}

In [9]:
loader = GithubFileLoader("https://github.com/Ouckah/Summer2025-Internships")

In [16]:
readme_file = loader.get_file_content_by_path('README.md')

In [44]:
import re
# The pattern captures three pipe-delimited cells followed by the URL of an
# `<a href="http...">` that opens the next cell.
pattern = re.compile(r'\|\s*([^|]+?)\s*\|\s*([^|]+?)\s*\|\s*([^|]+?)\s*\|\s*<a href="(https?://[^\s]+?)"')

# One 4-tuple per matching row
matches = pattern.findall(readme_file)

# Turn each tuple into a record with whitespace-trimmed values
extracted_data = []
for match in matches:
    company, role, location, url = match
    extracted_data.append(dict(
        zip(
            ('Company', 'Role', 'Location', 'Application URL'),
            (company.strip(), role.strip(), location.strip(), url.strip()),
        )
    ))

# Keep only the first five records for experimentation
subset_extracted_data = extracted_data[:5]

In [51]:
# Fetch the page behind the first extracted application URL.
# NOTE(review): `load_html` is not defined in any visible cell of this notebook (only
# `load_html_without_css_js` is) — presumably it came from a deleted cell; confirm
# before re-running on a fresh kernel.
data = asyncio.run(load_html(subset_extracted_data[0]['Application URL']))

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [None]:
print(data[0].page_content)

In [None]:
from langchain_community.document_loaders import SeleniumURLLoader

# Try the Selenium-based URL loader on two sample pages.
# NOTE: this cell rebinds `loader` (a GithubFileLoader instance in an earlier cell) and
# `data` (the result of the earlier asyncio.run call); cells run afterwards see these values.
urls = [
    "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
    "https://goo.gl/maps/NDSHwePEyaHMFGwh8",
]

loader = SeleniumURLLoader(urls=urls)

data = loader.load()

# Show the document loaded for the second URL
data[1]

In [None]:
data