In [13]:
search_urls = {
    "search_id_1": "https://example.com/search_1",
    "search_id_2": "https://example.com/search_2",
    "search_id_3": "https://example.com/search_3",
}

search_details = {
    "search_id_1": {"search_url": "https://example.com/search_1", "interval_mins": 5},
    "search_id_2": {"search_url": "https://example.com/search_2", "interval_mins": 10},
    "search_id_3": {"search_url": "https://example.com/search_3", "interval_mins": 5},
}


user_searches = {
    "user_id_1": set(["search_id_1", "search_id_2"]),
    "user_id_2": set(["search_id_1", "search_id_3"]),
}

jobs_notified = {
    "user_id_1": set(["job_id_1", "job_id_2"]),
    "user_id_2": set(["job_id_4"])
}

job_details = {
    "job_id_1": {"is_right_for_user": True},
    "job_id_2": {"is_right_for_user": True},
    "job_id_3": {"is_right_for_user": False},
    "job_id_4": {"is_right_for_user": False},
}


In [12]:
async def scrape_search(search_id):
    print(f"Scraping jobs for search {search_id}", search_urls[search_id])
    return ["job_listing_html"]

def parse_job_listing_html(job_listing_html):
    job_listing_html
    import uuid
    return [f"job_data_{uuid.uuid4()}", f"job_data_{uuid.uuid4()}"]

def get_user_searches(user_id):
    return user_searches[user_id]


def insert_job(job_id, job_data):
    job_details[job_id] = job_data
    return job_id


async def scrape_job_details(job_id):
    job_details[job_id] = await scrape_job_details(job_id)
    return job_details[job_id]

async def process_parsed_job(parsed_job, search_id):
    job_details = await scrape_job_details(parsed_job["job_id"])
    insert_job(parsed_job["job_id"], job_details)

async def execute_search(search_id: str) -> list[str]:
    job_listing_html = await scrape_search(search_id)
    parsed_jobs = parse_job_listing_html(job_listing_html)
    job_ids = []
    for parsed_job in parsed_jobs:
        job_id = await process_parsed_job(parsed_job, search_id)
        job_ids.append(job_id)
    return job_ids

def get_job_details(job_id):
    return job_details[job_id]

def is_job_suitable_for_user(user_id, job_id):
    job_details = get_job_details(job_id)
    return job_details["is_right_for_user"]

def filter_jobs_for_user(user_id, job_ids):
    filtered_job_ids = set()
    jobs_ids = job_ids - jobs_notified[user_id]
    for job_id in jobs_ids:
        if is_job_suitable_for_user(user_id, job_id):
            filtered_job_ids.add(job_id)
    return filtered_job_ids


def send_job_alert_email(user_id, job_ids):
    jobs_notified[user_id].update(job_ids)
    send_email(user_id, f"New job alert: {', '.join(job_ids)}")


def send_email(user_id, text):
    print(f"Sending email to user {user_id}: {text}")

async def process_user_searches(user_id, search_ids):
    jobs_found = set()
    for search_id in search_ids:
        job_ids = await execute_search(search_id)
        jobs_found.update(job_ids)
    jobs_to_alert = filter_jobs_for_user(user_id, jobs_found)
    send_job_alert_email(user_id, jobs_to_alert)

for user_id, search_ids in user_searches.items():
    process_user_searches(user_id, search_ids)

KeyError: 'job_id_3'

In [14]:
import asyncio
from typing import Dict, Set, List
import uuid

# Type aliases
UserId = str
SearchId = str
JobId = str

search_details: Dict[SearchId, Dict] = {
    "search_id_1": {"search_url": "https://example.com/search_1", "interval_mins": 5},
    "search_id_2": {"search_url": "https://example.com/search_2", "interval_mins": 10},
    "search_id_3": {"search_url": "https://example.com/search_3", "interval_mins": 5},
}

user_searches: Dict[UserId, Set[SearchId]] = {
    "user_id_1": {"search_id_1", "search_id_2"},
    "user_id_2": {"search_id_1", "search_id_3"},
}

jobs_notified: Dict[UserId, Set[JobId]] = {
    "user_id_1": {"job_id_1", "job_id_2"},
    "user_id_2": {"job_id_4"}
}

job_details: Dict[JobId, Dict] = {
    "job_id_1": {"is_right_for_user": True},
    "job_id_2": {"is_right_for_user": True},
    "job_id_3": {"is_right_for_user": False},
    "job_id_4": {"is_right_for_user": False},
}

# Cache to store search results
search_cache: Dict[SearchId, List[JobId]] = {}

async def scrape_search(search_id: SearchId) -> List[str]:
    print(f"Scraping jobs for search {search_id}", search_details[search_id]["search_url"])
    return ["job_listing_html"]

def parse_job_listing_html(job_listing_html: str) -> List[Dict]:
    return [{"job_id": f"job_id_{uuid.uuid4()}"} for _ in range(2)] 

async def scrape_job_details(job_id: JobId) -> Dict:
    # Simulating job detail scraping
    return {"is_right_for_user": bool(uuid.uuid4().int % 2)}

async def process_parsed_job(parsed_job: Dict) -> JobId:
    job_id = parsed_job["job_id"]
    if job_id not in job_details:
        job_details[job_id] = await scrape_job_details(job_id)
    return job_id

async def execute_search(search_id: SearchId) -> List[JobId]:
    if search_id in search_cache:
        print(f"Using cached results for search {search_id}")
        return search_cache[search_id]
    
    job_listing_html = await scrape_search(search_id)
    parsed_jobs = parse_job_listing_html(job_listing_html)
    job_ids = []
    for parsed_job in parsed_jobs:
        job_id = await process_parsed_job(parsed_job)
        job_ids.append(job_id)
    
    search_cache[search_id] = job_ids
    return job_ids

def is_job_suitable_for_user(user_id: UserId, job_id: JobId) -> bool:
    return job_details[job_id]["is_right_for_user"]

def filter_jobs_for_user(user_id: UserId, job_ids: Set[JobId]) -> Set[JobId]:
    new_jobs = job_ids - jobs_notified.get(user_id, set())
    return {job_id for job_id in new_jobs if is_job_suitable_for_user(user_id, job_id)}

def send_job_alert_email(user_id: UserId, job_ids: Set[JobId]):
    if user_id not in jobs_notified:
        jobs_notified[user_id] = set()
    jobs_notified[user_id].update(job_ids)
    print(f"Sending email to user {user_id}: New job alert: {', '.join(job_ids)}")

async def process_user_searches(user_id: UserId, search_ids: Set[SearchId]):
    jobs_found = set()
    for search_id in search_ids:
        job_ids = await execute_search(search_id)
        jobs_found.update(job_ids)
    jobs_to_alert = filter_jobs_for_user(user_id, jobs_found)
    if jobs_to_alert:
        send_job_alert_email(user_id, jobs_to_alert)

async def main():
    # Clear the search cache before each run
    search_cache.clear()
    
    tasks = [
        process_user_searches(user_id, search_ids)
        for user_id, search_ids in user_searches.items()
    ]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

RuntimeError: asyncio.run() cannot be called from a running event loop