## Imports

In [65]:
import dotenv
from pydantic import BaseModel
import datetime
from google.genai import types, Client
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
import os

# Load settings from a .env file (when one is present) into the environment
# before the lookups below run.
dotenv.load_dotenv()
# os.getenv returns None for any variable that is not set, so each of these
# constants may be None if the environment is incomplete.
SERPAPI_KEY = os.getenv('SERPAPI_KEY')  # passed as "api_key" in get_jobs
GEMINI_API_KEY= os.getenv('GEMINI_API_KEY')  # passed to the Gemini Client
MONGO_URI = os.getenv('MONGO_URI')  # connection string given to MongoClient
DATABASE_NAME = os.getenv('DATABASE_NAME')  # database selected on the client
COLLECTION_NAME = os.getenv('COLLECTION_NAME')  # collection that receives the jobs

## Models

In [7]:
class ApplyOption(BaseModel):
    """Apply option attached to a job: a title and a link, both strings.

    job_factory builds it from the first entry of the response's
    "apply_options" list.
    """
    title: str
    link: str

class JobHilight(BaseModel):
    """One titled group of highlight strings belonging to a job.

    NOTE(review): the name is spelled "Hilight". It is referenced by Job,
    job_factory and job_highlights_to_text, so a rename has to touch all of
    them together.
    """
    title: str
    items: list[str]

class Job(BaseModel):
    """A job posting as stored by this notebook.

    Instances are created by job_factory from one search-result dict and are
    serialised with model_dump(mode="json") before being inserted in MongoDB.
    """
    # Used as the unique key of the MongoDB collection (unique index on job_id).
    job_id: str
    title: str
    company: str
    location: str
    # Computed in job_factory as "today minus the parsed posting age".
    posted_date: datetime.date
    description: str
    job_highlights: list[JobHilight]
    schedule_type: str
    # May be None, but has no default: it must always be passed explicitly.
    thumbnail: str | None
    apply: ApplyOption
    via: str
    # Left as None at construction; filled in later by add_embedding_to_jobs.
    job_vector: list[float] | None = None

## Factory and Embedding

In [None]:
# Module-level Gemini client, authenticated with GEMINI_API_KEY;
# add_embedding_to_jobs uses it to request embeddings.
gemini_client = Client(api_key=GEMINI_API_KEY)

In [74]:
def _posted_days_ago(posted_at) -> int:
    """Convert a relative age such as "3 days ago" into a whole number of days.

    Returns 0 when *posted_at* is empty or contains no number. A trailing "+"
    on the number ("30+ days ago") is ignored. Minutes and hours count as
    0 days, weeks as 7 days and months as 30 days (an approximation). A number
    followed by any other word is taken to be a number of days.
    """
    if not posted_at:
        return 0

    tokens = str(posted_at).lower().split()
    for position, token in enumerate(tokens):
        number = token.rstrip("+")
        if not number.isdecimal():
            continue

        amount = int(number)
        unit = tokens[position + 1] if position + 1 < len(tokens) else ""
        if unit.startswith(("minute", "hour")):
            # Posted earlier today: must not be read as "N days ago".
            return 0
        if unit.startswith("week"):
            return amount * 7
        if unit.startswith("month"):
            return amount * 30
        # "day(s)" or an unrecognised unit: keep the plain-days interpretation.
        return amount

    # No numeric token at all (e.g. "Just posted").
    return 0


def job_factory(response: dict) -> Job:
    """Build a Job from one job dict of a search response.

    Args:
        response: A single job result. The keys 'job_id', 'title',
            'company_name', 'location', 'description', 'via' and a non-empty
            'apply_options' list are required. 'detected_extensions',
            'job_highlights' and 'thumbnail' are optional.

    Returns:
        The populated Job. posted_date is today's date minus the age parsed
        from detected_extensions['posted_at'] (today when it is absent or
        cannot be parsed). job_vector is left unset.

    Raises:
        KeyError: If a required key is missing.
        IndexError: If 'apply_options' is empty.
    """
    # Tolerate a missing or null 'detected_extensions' instead of failing.
    extensions = response.get('detected_extensions') or {}
    delta_days = _posted_days_ago(extensions.get('posted_at'))

    return Job(
        job_id=response['job_id'],
        title=response['title'],
        company=response['company_name'],
        location=response['location'],
        posted_date=datetime.date.today() - datetime.timedelta(days=delta_days),
        description=response['description'],
        job_highlights=[
            JobHilight(**jobhilight)
            for jobhilight in response.get('job_highlights', [])
        ],
        schedule_type=extensions.get('schedule_type', ''),
        thumbnail=response.get('thumbnail', None),
        # Only the first apply option is kept.
        apply=ApplyOption(**response["apply_options"][0]),
        via=response['via']
    )

In [71]:
def job_highlights_to_text(job_highlights: list[JobHilight]) -> str:
    """Flatten job highlights into a single newline-separated string.

    Each highlight contributes its title followed by its items, one per line;
    the highlights are then joined with newlines.

    Args:
        job_highlights: Highlights to render; may be empty.

    Returns:
        The combined text, or an empty string when there are no highlights.
    """
    return "\n".join(
        highlight.title + "\n" + "\n".join(highlight.items)
        for highlight in job_highlights
    )

def add_embedding_to_jobs(jobs: list[Job]) -> None:
    """Set job_vector on every job from an embedding of its highlights.

    The highlight text of all jobs is sent to the Gemini embedding model in a
    single request and each returned vector is stored on the matching job.
    The jobs are modified in place; nothing is returned.

    Args:
        jobs: Jobs to enrich. An empty list is a no-op (no request is made).

    Raises:
        ValueError: If the number of embeddings returned differs from the
            number of jobs, so vectors cannot be paired with jobs reliably.
    """
    if not jobs:
        return

    text_list = [job_highlights_to_text(job.job_highlights) for job in jobs]
    # NOTE(review): a job without highlights yields an empty string here;
    # confirm the embedding endpoint accepts empty content.

    result = gemini_client.models.embed_content(
        model="text-embedding-004",
        contents=text_list,
        config=types.EmbedContentConfig(task_type="SEMANTIC_SIMILARITY")
    )

    embeddings = result.embeddings or []
    if len(embeddings) != len(jobs):
        raise ValueError(
            f"Expected {len(jobs)} embeddings but received {len(embeddings)}"
        )

    # Pair by position without consuming the response object.
    for job, embed in zip(jobs, embeddings):
        job.job_vector = embed.values
## Querying Jobs

In [76]:
from serpapi import GoogleSearch


def get_jobs(
        query: str, location: str="Brazil", next_page_token: str | None = None
        ) -> tuple[list[dict], str | None]:
    """Fetch one page of Google Jobs results through SerpApi.

    Args:
        query: Search text.
        location: Location to search in.
        next_page_token: Token returned by a previous call, or None to fetch
            the first page.

    Returns:
        A pair (jobs, token). jobs is the response's "jobs_results" list.
        token is the token for the following page, or None when the response
        advertises no further page.

    Raises:
        KeyError: If the response has no "jobs_results" entry.
    """
    params = {
        "engine": "google_jobs",
        "q": query,
        "location": location,
        "api_key": SERPAPI_KEY,
    }
    # Only send the token when there is one.
    if next_page_token:
        params["next_page_token"] = next_page_token

    results = GoogleSearch(params).get_dict()

    # Derive the token from this response only. Falling back to the token we
    # were called with would make the caller request the same page again.
    pagination = results.get("serpapi_pagination") or {}
    following_token = pagination.get("next_page_token")

    return results["jobs_results"], following_token


## Parsing results and adding embeddings to jobs

In [12]:
# Convert every raw result dict into a Job.
# NOTE(review): relies on `jobs_results` already existing in the notebook
# session (it is only assigned further down, in the batch loop cell).
parsed_objects = [job_factory(job) for job in jobs_results]

In [13]:
# Fill in job_vector on each parsed job (modifies the objects in place).
add_embedding_to_jobs(parsed_objects)

# Sending to MongoDB

In [69]:
# Connect to MongoDB and select the target database and collection, all taken
# from the environment-derived constants.
client = MongoClient(MONGO_URI)
db = client[DATABASE_NAME]
collection = db[COLLECTION_NAME]

# Create a unique index on the job_id field so the same job cannot be stored
# twice; the batch loop relies on the resulting duplicate-key errors (code 11000).
collection.create_index("job_id", unique=True)

'job_id_1'

# Batch Transactions

In [77]:
# Fetch up to 10 result pages for "python developer"; embed and store each page.
next_page_token = None
for batch_num in range(10):
    print(f"Processing batch number {batch_num}")
    jobs_results, next_page_token = get_jobs(
        "python developer", next_page_token=next_page_token
    )
    if not jobs_results:
        # Nothing to embed or insert; insert_many rejects an empty list.
        break

    parsed_objects = [job_factory(job) for job in jobs_results]
    add_embedding_to_jobs(parsed_objects)

    # Keep the try narrow: only the insert can raise BulkWriteError.
    try:
        collection.insert_many(
            [job.model_dump(mode="json") for job in parsed_objects],
            ordered=False
        )
    except BulkWriteError as bulkexcep:
        write_errors = bulkexcep.details.get("writeErrors", [])
        # Duplicate job_ids (code 11000) are expected on re-runs. Anything
        # else is a real write failure and must not be hidden.
        if not write_errors or any(
            error.get("code") != 11000 for error in write_errors
        ):
            raise
        print("Job already exists")

    # Checked outside the try so it also runs after a duplicate-key error.
    if not next_page_token:
        break

Processing batch number 0
Job already exists
Processing batch number 1
Job already exists
Processing batch number 2
Job already exists
Processing batch number 3
Job already exists
Processing batch number 4
Processing batch number 5
Job already exists
Processing batch number 6
Job already exists
Processing batch number 7
Job already exists
Processing batch number 8
Job already exists
Processing batch number 9
Job already exists
