In [1]:
import time
from datetime import datetime
import random
import logging

import json
import sqlite3
from pathlib import Path
from typing import List, Dict
from dotenv import dotenv_values
import pandas as pd

import requests
from requests.adapters import HTTPAdapter, Retry
from requests_cache import CachedSession
from requests.exceptions import RetryError, RequestException

In [2]:
# Load key/value settings from "../.env" (a relative path, so presumably resolved
# against the notebook's working directory — confirm when running from elsewhere).
env = dotenv_values("../.env")
# Endpoint the fetcher will page through; None when the key is not present in the file.
OFFERS_URL = env.get("OFFERS_URL")

In [3]:
# Logger shared by every cell in this notebook.
logger = logging.getLogger(__name__)

# Root logging configuration: INFO and above, each record prefixed with
# its timestamp and severity.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

In [4]:
class JobOffersFetcher:
    """Download paginated job-offer JSON from a single HTTP endpoint.

    Requests go through one shared session that retries GET requests on
    429/500/502/503/504 responses and logs every response it receives.
    When several pages are fetched, they are requested in random order with
    a random pause between consecutive requests.

    Args:
        base_url: Endpoint URL; the page number is sent as the ``page``
            query parameter.
        cache_name: Cache name handed to ``CachedSession`` when caching is on.
        cache_expire: Value passed to ``CachedSession`` as ``expire_after``.
        use_cache: Use a ``requests_cache.CachedSession`` instead of a plain
            ``requests.Session``.
        min_delay: Lower bound, in seconds, of the pause between page requests.
        max_delay: Upper bound, in seconds, of the pause between page requests.
        retries: Total number of retries allowed per request.
        backoff_range: ``(low, high)`` bounds from which the retry backoff
            factor is drawn once, when the session is created.
        timeout: Timeout passed to every request (seconds, or a
            ``(connect, read)`` tuple); ``None`` disables it.

    Raises:
        ValueError: If ``base_url`` is empty.
    """

    def __init__(
        self,
        base_url: str,
        cache_name="fetcher_cache",
        cache_expire=60,
        use_cache=False,
        min_delay=1,
        max_delay=4,
        retries=5,
        backoff_range=(1, 4),
        timeout=30,
    ):
        if not base_url:
            raise ValueError("URL is required and cannot be empty")
        self.base_url = base_url
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.retries = retries
        self.backoff_range = backoff_range
        self.timeout = timeout
        self.session = self._init_session(cache_name, cache_expire, use_cache)

    def _init_session(self, cache_name, cache_expire, use_cache):
        """Build the HTTP session with retry handling and response logging.

        Returns:
            A ``CachedSession`` when ``use_cache`` is true, otherwise a plain
            ``requests.Session``; both have the retrying adapter mounted for
            ``http://`` and ``https://``.
        """
        retry_strategy = Retry(
            total=self.retries,
            # Randomised once per session so separate fetchers do not back
            # off in lock-step.
            backoff_factor=random.uniform(*self.backoff_range),
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)

        if use_cache:
            # Choosing CachedSession is what enables caching; it takes no
            # separate ``use_cache`` option, so none is forwarded.
            session = CachedSession(cache_name, expire_after=cache_expire)
        else:
            session = requests.Session()

        session.mount("http://", adapter)
        session.mount("https://", adapter)

        def log_response_hook(response, *args, **kwargs):
            logger.info("Hook: %s %s", response.status_code, response.url)

        session.hooks["response"] = [log_response_hook]

        return session

    def fetch_page(self, page: int = 1) -> dict:
        """Fetch one page and return its decoded JSON body.

        Args:
            page: Page number sent as the ``page`` query parameter.

        Returns:
            The parsed JSON payload of the response.

        Raises:
            RuntimeError: If the request is abandoned with a ``RetryError``.
            RequestException: For any other request failure (logged, then
                re-raised unchanged).
        """
        try:
            # Without a timeout a stalled connection would block forever.
            response = self.session.get(
                self.base_url, params={"page": page}, timeout=self.timeout
            )
            response.raise_for_status()

            return response.json()
        except RetryError as e:
            raise RuntimeError(f"All retries failed: {e}") from e
        except RequestException as e:
            logger.error("Request failed: %s", e)
            raise
        except Exception as e:
            logger.error("Error: %s", e)
            raise

    def fetch_pages(self, max_pages: int = 5):
        """Fetch pages ``1..max_pages`` in random order and merge their items.

        A random pause between ``min_delay`` and ``max_delay`` seconds is
        taken between consecutive requests; there is no pause after the last
        one.

        Args:
            max_pages: Highest page number to request; values below 1 fetch
                nothing.

        Returns:
            ``{"data": [...]}`` holding the concatenated ``"data"`` lists of
            all fetched pages, in the order the pages were requested.
        """
        pages = list(range(1, max_pages + 1))
        random.shuffle(pages)

        all_data = []

        for position, page in enumerate(pages):
            if position:
                # Pause only between requests; uniform() also accepts
                # fractional delay bounds, which randint() rejects.
                time.sleep(random.uniform(self.min_delay, self.max_delay))
            page_data = self.fetch_page(page)
            # Treat a missing or null "data" entry as an empty page.
            all_data.extend(page_data.get("data") or [])
            logger.info("Saved page: %s", page)

        return {"data": all_data}

In [5]:
# job_offers_fetcher = JobOffersFetcher(OFFERS_URL)
# data = job_offers_fetcher.fetch_pages(3)

In [6]:
# data["data"][199]

In [7]:
# with open("offers.json", "w") as f:
#     f.write(json.dumps(data))

In [8]:
class JobOfferDatabase:
    """Store job offers in the ``job_offers`` table of a SQLite database.

    Creating an instance makes sure the parent directory of the database
    file and the table both exist. Every operation opens its own connection
    and closes it again when done.

    Args:
        db_path: Location of the SQLite file (``Path`` or ``str``).
    """

    _CREATE_TABLE_SQL = """
            CREATE TABLE IF NOT EXISTS job_offers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                slug TEXT,
                title TEXT,
                requiredSkills TEXT,
                niceToHaveSkills TEXT,
                workplaceType TEXT,
                workingTime TEXT,
                experienceLevel TEXT,
                employmentTypes TEXT,
                categoryId INTEGER,
                multilocation TEXT,
                city TEXT,
                street TEXT,
                latitude TEXT,
                longitude TEXT,
                remoteInterview INTEGER,
                companyName TEXT,
                companyLogoThumbUrl TEXT,
                publishedAt TEXT,
                openToHireUkrainians INTEGER,
                languages TEXT,
                date_fetched TEXT
            )
            """

    _INSERT_SQL = """
            INSERT INTO job_offers (
                slug, title, requiredSkills, niceToHaveSkills, workplaceType,
                workingTime, experienceLevel, employmentTypes, categoryId, multilocation,
                city, street, latitude, longitude, remoteInterview,
                companyName, companyLogoThumbUrl, publishedAt, openToHireUkrainians,
                languages, date_fetched
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """

    def __init__(self, db_path: Path = Path("../data/db/job_offers.db")):
        # Accept plain strings as well as Path objects.
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._create_table()

    @staticmethod
    def to_int_or_none(val):
        """Map a truthy/falsy value to 1/0, keeping ``None`` as ``None``."""
        if val is None:
            return None
        return int(bool(val))

    @staticmethod
    def clean_str(val):
        """Strip surrounding whitespace from strings; return other values as is."""
        if isinstance(val, str):
            return val.strip()
        return val

    @staticmethod
    def _str_or_none(val):
        """Stringify a value, keeping ``None`` so it is stored as SQL NULL."""
        return None if val is None else str(val)

    def _run(self, operation):
        """Run ``operation(conn)`` in one transaction, then close the connection.

        ``with conn`` commits on success and rolls back on error but does not
        close the connection, hence the explicit ``close()``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                operation(conn)
        finally:
            conn.close()

    def _create_table(self):
        """Create the ``job_offers`` table if it does not exist yet."""
        self._run(lambda conn: conn.execute(self._CREATE_TABLE_SQL))

    def _offer_to_row(self, offer: Dict, date_fetched: str) -> tuple:
        """Turn one offer dict into the parameter tuple for ``_INSERT_SQL``.

        Text fields are stripped, list/dict fields are serialised as JSON,
        boolean flags become 0/1, and coordinates are stored as text. Keys
        missing from ``offer`` are read as ``None``.
        """
        return (
            self.clean_str(offer.get("slug")),
            self.clean_str(offer.get("title")),
            json.dumps(offer.get("requiredSkills")),
            json.dumps(offer.get("niceToHaveSkills")),
            self.clean_str(offer.get("workplaceType")),
            self.clean_str(offer.get("workingTime")),
            self.clean_str(offer.get("experienceLevel")),
            json.dumps(offer.get("employmentTypes")),
            offer.get("categoryId"),
            json.dumps(offer.get("multilocation")),
            self.clean_str(offer.get("city")),
            self.clean_str(offer.get("street")),
            # A missing coordinate must become NULL, not the text "None".
            self._str_or_none(offer.get("latitude")),
            self._str_or_none(offer.get("longitude")),
            self.to_int_or_none(offer.get("remoteInterview")),
            self.clean_str(offer.get("companyName")),
            self.clean_str(offer.get("companyLogoThumbUrl")),
            self.clean_str(offer.get("publishedAt")),
            self.to_int_or_none(offer.get("openToHireUkrainians")),
            json.dumps(offer.get("languages")),
            date_fetched,
        )

    def insert_offer(self, offer: Dict, date_fetched: str):
        """Insert a single offer.

        Args:
            offer: Offer data; keys that are absent are stored as NULL (or as
                JSON ``null`` for the JSON-encoded columns).
            date_fetched: Value written to the ``date_fetched`` column.
        """
        row = self._offer_to_row(offer, date_fetched)
        self._run(lambda conn: conn.execute(self._INSERT_SQL, row))

    def insert_offers(self, offers: List[Dict]):
        """Insert many offers, stamped with today's date, in one transaction.

        All rows are written over a single connection; if any row fails,
        none of the batch is committed.

        Args:
            offers: Offer dicts to insert; an empty collection is a no-op.
        """
        date_fetched = datetime.now().strftime("%Y-%m-%d")
        rows = [self._offer_to_row(offer, date_fetched) for offer in offers]
        if not rows:
            return
        self._run(lambda conn: conn.executemany(self._INSERT_SQL, rows))

In [None]:
# Fetch offers with the fetcher's default settings (fetch_pages() defaults to
# max_pages=5) and keep only the merged "data" list.
fetcher = JobOffersFetcher(OFFERS_URL)
offers = fetcher.fetch_pages()["data"]

# Store the offers in the database at JobOfferDatabase's default path.
# NOTE(review): insert_offers only issues INSERTs, so re-running this cell
# adds the same offers again.
db = JobOfferDatabase()
db.insert_offers(offers)

In [12]:
# Load the whole job_offers table into a DataFrame.
# `with sqlite3.connect(...)` only scopes a transaction and leaves the
# connection open, so the connection is closed explicitly once the read is done.
conn = sqlite3.connect("../data/db/job_offers.db")
try:
    query = "SELECT * FROM job_offers"
    df = pd.read_sql_query(query, conn)
finally:
    conn.close()

In [None]:
# Display the first rows of the DataFrame as this cell's output.
# df.shape
df.head()