# Connexion_France_Travail_ADZUNA.ipynb

Ce script a pour objectif :
- d'extraire les offres d'emploi mises à disposition par :
  <br> _ l'API de **France Travail**
  <br>_ de l'API **ADZUNA**
- les stocker dans :
  <br> _ un fichier (**CSV**) en local
  <br> _ un fichier (**Parquet**) en local
  <br> _ dans une **BDD PostgreSQL** en local.

**/!\ Ajouts !** : 
- Modification du schéma de la table cible de la base de données : ajout du champ 'embedding'
- Enregistrement des embeddings de chaque offre sur la base du champ 'description' directement dans la base de données.
- Calcul de la similarité (en SQL) entre l'embedding d'un texte de référence et l'embedding de chaque offre présente dans la bdd.
- Amélioration du logging.

Comment ?
1. Sur la base de critères spécifiques (mots clés, localisation, etc...), 
    - lancement d'une requête pour obtenir les offres d'emploi correspondantes via l'API France Travail
    - lancement d'une requête pour obtenir les offres d'emploi correspondantes via l'API Adzuna
2. Une fois les offres trouvées, vérification et suppression des doublons.
3. Une sauvegarde en local des offres sont stockées dans un fichier (**CSV**).
4. Une sauvegarde en local des offres sont stockées dans un fichier (**Parquet**).
5. Une sauvegarde dans une base de données **PostgreSQL** est également effectuée en local.

Les URL (FRANCE TRAVAIL) utiles sont :
- https://francetravail.io/data/api/offres-emploi
- https://francetravail.io/data/api/offres-emploi/documentation#/

Les URL (ADZUNA) utiles sont :
- https://developer.adzuna.com/overview
- https://developer.adzuna.com/docs/search
- https://developer.adzuna.com/activedocs#!/adzuna/search
- https://developer.adzuna.com/overview
- https://www.adzuna.fr/details/5376850320?utm_medium=api&utm_source=6d1ef246

## Imports

In [335]:
import os
import requests
import pandas as pd
from datetime import datetime
from sqlalchemy import create_engine, Table, MetaData, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import SQLAlchemyError
import spacy
from sentence_transformers import SentenceTransformer, util
import logging
import time
import numpy as np
import http.client
import json
import hashlib

## Procédure

### Configuration

In [336]:
##################  VARIABLES  ##################
# France Travail
FT_CLIENT_ID = os.environ.get("FT_CLIENT_ID")
FT_CLIENT_SECRET = os.environ.get("FT_CLIENT_SECRET")
FT_SCOPE = os.environ.get("FT_SCOPE")

# Adzuna
ADZUNA_CLIENT_ID = os.environ.get("ADZUNA_CLIENT_ID")
ADZUNA_CLIENT_SECRET = os.environ.get("ADZUNA_CLIENT_SECRET")

# Param Database PostgreSQL
DB_NAME = os.environ.get("DB_NAME", "jobsdb")
DB_USER = os.environ.get("DB_USER","jobsuser")
DB_PASS = os.environ.get("DB_PASS", "jobspass")
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT","5432")

# Nom table
DB_TABLE_NAME = "offres"

### Configuration Logging

In [337]:
# Logging
LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)
logging.basicConfig(
    filename=os.path.join(LOG_DIR, f"pipeline_{datetime.now().strftime('%Y-%m-%d')}.log"),
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

### NLP ET embeddings

In [338]:
# NLP & embeddings
nlp = spacy.load("fr_core_news_sm")
model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')

### Texte de référence (CV)

In [339]:
# Offre de référence
reference_text = """
Data Analyst avec expertise Python, SQL, Power BI et analyse de données industrielles.
"""
reference_text_clean = " ".join([token.lemma_ for token in nlp(reference_text.lower()) if not token.is_stop])

### Paramètres de recherche

In [340]:
# ---------------------------
# PARAMETRES DE RECHERCHE
# ---------------------------

# Paramètres de recherche
JOB_QUERY = "data analyst"
COMMUNE = "78300"
DISTANCE = 100000

# Nombre d'annonces par page requise
BLOC_PAGINATION = 50

# Nombre de pages max
MAX_PAGES = 20   # Limiter le nombre de pages récupérées

### Paramètres de sauvegarde

In [341]:
# ---------------------------
# PARAMETRES DE SAUVEGARDE
# ---------------------------
# Répertoires
CSV_DIR = "../data/csv"
PARQUET_DIR = "../data/parquet"

### Authentification France Travail

In [342]:
# ---------------------------
# AUTH FRANCE TRAVAIL
# ---------------------------
def get_token(retries=3, wait=5):
    url = "https://entreprise.pole-emploi.fr/connexion/oauth2/access_token?realm=/partenaire"
    data = {
        "grant_type": "client_credentials",
        "client_id": FT_CLIENT_ID,
        "client_secret": FT_CLIENT_SECRET,
        "scope": FT_SCOPE,
    }
    for attempt in range(retries):
        try:
            r = requests.post(url, data=data)
            r.raise_for_status()
            return r.json()["access_token"]
        except requests.RequestException as e:
            logging.warning(f"Erreur OAuth attempt {attempt+1}: {e}")
            time.sleep(wait)
    raise RuntimeError("Impossible d'obtenir un token OAuth après plusieurs essais.")

### Lancement requête API France Travail

In [343]:
# ---------------------------
# API CALL FRANCE TRAVAIL
# ---------------------------
def fetch_france_travail_jobs(token, max_pages=MAX_PAGES):
    headers = {"Authorization": f"Bearer {token}"}
    try:        
        all_jobs = []
        b_stop_criteria = False
        
        for page in range(1, max_pages + 1):
            if b_stop_criteria == False:    
                url = f"https://api.francetravail.io/partenaire/offresdemploi/v2/offres/search"
                params = {
                    "motsCles": JOB_QUERY,
                    "commune": COMMUNE,
                    "distance" : DISTANCE,
                    "range": f"{(page-1)*BLOC_PAGINATION}-{page*BLOC_PAGINATION-1}"  # pagination par blocs de 50
                }
                r = requests.get(url, headers=headers, params=params)
                r.raise_for_status()
                data = r.json()
                offres = data.get("resultats", [])
                    
                for o in offres:
                    all_jobs.append({
                        "source": "France Travail",
                        "id":o.get("id") if o.get("id") is not None else "None",    
                        "titre": o.get("intitule") if o.get("intitule") is not None else "None",                     
                        "description": o.get("description") if o.get("description") is not None else "None", 
                        "entreprise": o.get("entreprise", {}).get("nom") if o.get("entreprise", {}).get("nom") is not None else "None", 
                        "lieu": o.get("lieuTravail", {}).get("libelle") if o.get("lieuTravail", {}).get("libelle") is not None else "None", 
                        "latitude": o.get("lieuTravail", {}).get("latitude") if o.get("lieuTravail", {}) is not None else "None", 
                        "longitude": o.get("lieuTravail", {}).get("longitude") if o.get("lieuTravail", {}) is not None else "None", 
                        "type_contrat_libelle": o.get("typeContratLibelle") if o.get("typeContratLibelle") is not None else "None", 
                        "date_publication": o.get("dateCreation") if o.get("dateCreation") is not None else "None",    
                        "url": o.get("origineOffre").get("urlOrigine") if o.get("origineOffre") is not None else "None",
                        "secteur_activites": o.get("secteurActiviteLibelle") if o.get("dateCreation") is not None else "None"
                    })
    
                # Si le nombre d'offres est inférieur au nombre max d'offre par pages, c'est un signe qu'il n'y a plus d'offres à extraire après la page actuelle.
                if len(offres) < BLOC_PAGINATION:
                    b_stop_criteria = True
                
        return all_jobs
    except requests.RequestException as e:
        logging.error(f"Erreur API France Travail: {e}")
        return []

### Lancement de requête Adzuna

In [344]:
# ---------------------------
# API CALL ADZUNA
# ---------------------------
def fetch_adzuna_jobs(max_pages=MAX_PAGES):
    headers = {"Accept": "application/json"}
    try:        
        all_jobs = []
        b_stop_criteria = False
        
        for page in range(1, max_pages + 1):
            if b_stop_criteria == False:    
                url = f"https://api.adzuna.com/v1/api/jobs/fr/search/{page}"
                params = {
                    "app_id" : ADZUNA_CLIENT_ID,
                    "app_key" : ADZUNA_CLIENT_SECRET,
                    "title_only": JOB_QUERY,
                    "where": COMMUNE,
                    "results_per_page" : BLOC_PAGINATION,
                    "distance" : DISTANCE
                }
                r = requests.get(url,params=params)
                r.raise_for_status()
                data = r.json()
                offres = data.get("results")
    
                for o in offres:
                    all_jobs.append({
                        "source": "Adzuna",
                        "id" : o.get("id") if o.get("id") is not None else "None", 
                        "titre" : o.get("title") if o.get("title") is not None else "None", 
                        "description" : o.get("description") if o.get("description") is not None else "None", 
                        "entreprise": o.get("company").get("display_name") if o.get("company") is not None else "None",
                        "lieu" : o.get("location").get("display_name") if o.get("location") is not None else "None",    
                        "latitude" : o.get("latitude") if o.get("latitude") is not None else "None", 
                        "longitude" : o.get("longitude") if o.get("longitude") is not None else "None",
                        "type_contrat_libelle" : o.get("contract_type") if o.get("contract_type") is not None else "None",                
                        "date_publication" : o.get("created") if o.get("created") is not None else "None",  
                        "url" : o.get("redirect_url") if o.get("redirect_url") is not None else "None",  
                        "secteur_activites" : o.get("category").get("label") if o.get("category") is not None else "None",
                    })
                    
                # Si le nombre d'offres est inférieur au nombre max d'offre par pages, c'est un signe qu'il n'y a plus d'offres à extraire après la page actuelle.
                if len(offres) < BLOC_PAGINATION:
                    b_stop_criteria = True
                
        return all_jobs
    except requests.RequestException as e:
        logging.error(f"Erreur API Adzuna: {e}")
        return []

### Déduplication

In [345]:
# ---------------------------
# DÉDUPLICATION
# ---------------------------
def deduplicate(jobs):
    seen = set()
    deduped = []
    for job in jobs:
        key_str = f"{job['titre']}_{job['entreprise']}_{job['latitude']}_{job['longitude']}_{job['date_publication']}"
        key = hashlib.md5(key_str.encode()).hexdigest()
        if key not in seen:
            seen.add(key)
            deduped.append(job)
    return deduped

### Nettoyage de texte

In [346]:
def clean_text(text):
    if not text:
        return ""
    doc = nlp(text.lower())
    return " ".join([token.lemma_ for token in doc if not token.is_stop])

### Calcul embeddings

In [347]:
def compute_embedding(text):
    return model.encode([text], convert_to_numpy=True,show_progress_bar=False)[0].tolist()

### Initialisation database

In [348]:
# --- Initialisation DB ---
# SUPPRESSION DE Date_creation TIMESTAMP
def init_db(engine):
    with engine.begin() as conn:
        
        # Activer l'extension PGVector
        conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))

        # Créer la table
        conn.execute(text("""
        CREATE TABLE IF NOT EXISTS offres (
            id TEXT PRIMARY KEY,
            source TEXT,
            titre TEXT,
            description TEXT,
            entreprise TEXT,
            lieu TEXT,
            latitude FLOAT(4),
            longitude FLOAT(4),
            type_contrat_libelle TEXT,
            date_publication DATE,
            url TEXT,
            secteur_activites TEXT,
            last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            embedding vector(384)  -- dimension MiniLM
        )
        """))


### Sauvegarde en base PostgreSQL

In [349]:
def save_to_postgres_upsert(df, engine, table_name="offres"):
    """
    Sauvegarde un DataFrame pandas dans PostgreSQL avec UPSERT.
    Met à jour last_updated pour chaque ligne insérée ou modifiée.
    
    Args:
        df (pd.DataFrame): données à insérer
        engine (sqlalchemy.Engine): moteur SQLAlchemy connecté à PostgreSQL
        table_name (str): nom de la table cible
    
    Returns:
        int: nombre de lignes insérées ou mises à jour
    """
    
    if df.empty:
        logging.info("📭 DataFrame vide, rien à insérer.")
        return 0

    # Nettoyage des NaN
    df = df.where(pd.notnull(df), None)
    
    metadata = MetaData()
    table = Table(table_name, metadata, autoload_with=engine)
    now = datetime.utcnow()
    count = 0

    try:        
        with engine.begin() as conn:
            for row in df.to_dict(orient="records"):
                row["last_updated"] = now
                
                # Calcul embedding uniquement si nouvelle offre
                if not row.get("embedding"):
                    row["embedding"] = compute_embedding(row["description"])
                    
                stmt = pg_insert(table).values(row)
                stmt = stmt.on_conflict_do_update(
                    index_elements=['id'],
                    set_={
                        'source': stmt.excluded.source,
                        'titre': stmt.excluded.titre,
                        'description': stmt.excluded.description,
                        'entreprise': stmt.excluded.entreprise,
                        'lieu': stmt.excluded.lieu,
                        'latitude': stmt.excluded.latitude,
                        'longitude': stmt.excluded.longitude,    
                        'type_contrat_libelle': stmt.excluded.type_contrat_libelle,
                        'date_publication': stmt.excluded.date_publication,
                        'url': stmt.excluded.url,
                        'secteur_activites': stmt.excluded.secteur_activites,
                        # forcé à chaque exécution, même sans changement d'autres colonnes
                        'last_updated': now,
                        'embedding': stmt.excluded.embedding
                    }
                )
                conn.execute(stmt)
                count += 1
        logging.info(f"{count} offres insérées/mises à jour dans PostgreSQL.")
        return count

    except SQLAlchemyError as e:
        logging.error(f"❌ Erreur lors de l'UPSERT : {str(e)}")
        return 0

### Calcul similarité

In [350]:
def search_similar_offres(reference_text, engine, top_k=10):
    ref_emb = compute_embedding(reference_text)
    ref_emb_str = "[" + ",".join(map(str, ref_emb)) + "]"  # convertir en string pour PGVector

    query = text("""
        SELECT id, titre, description, entreprise, simil
        FROM (
            SELECT id, titre, description, entreprise,
            1 - (embedding <#> (:ref)::vector) AS simil
            FROM offres
        ) AS s
        ORDER BY simil DESC
        LIMIT :k
    """)
    
    with engine.connect() as conn:
        result = conn.execute(query, {"ref": ref_emb_str, "k": top_k})       
        return pd.DataFrame(result.fetchall(),columns=["id","titre","description","entreprise","similarity"])


### Sauvegarde CSV

In [351]:
# --- Sauvegarde Parquet ---
def save_to_csv(df):
    if df.empty:
        return
    os.makedirs(CSV_DIR, exist_ok=True)
    today = datetime.now().strftime("%Y-%m-%d")
    path_csv = os.path.join(CSV_DIR, f"{today}_offres.csv")
    df.to_csv(path_csv, index=False,encoding="utf-8")
    print(f"✅ Sauvegardé dans {path_csv}")

### Sauvegarde Parquet

In [352]:
# --- Sauvegarde Parquet ---
def save_to_parquet(df):
    if df.empty:
        return
    os.makedirs(PARQUET_DIR, exist_ok=True)
    today = datetime.now().strftime("%Y-%m-%d")
    path_parquet = os.path.join(PARQUET_DIR, f"{today}_offres.parquet")
    df.to_parquet(path_parquet, index=False)
    print(f"✅ Sauvegardé dans {path_parquet}")

### Pipeline principal

In [353]:
# ---------------- PIPELINE ----------------
def run_pipeline():
    logging.info("Début du pipeline.")

    try:        
        print("Authentification France Travail...")
        token = get_ft_token()
    
        print("Récupération des offres France Travail...")
        ft_jobs = fetch_france_travail_jobs(token)
    
        print("Récupération des offres Adzuna...")
        adzuna_jobs = fetch_adzuna_jobs()
    
        print("Fusion et déduplication...")
        all_jobs = ft_jobs + adzuna_jobs
    
        if not all_jobs:
            print("⚠️ Aucune offre trouvée.")
            return
    
        print(f"Nombre d'offres d'emploi avant déduplication : {len(all_jobs)}")
        jobs_clean = deduplicate(all_jobs)
        print(f"Nombre d'offres d'emploi après déduplication : {len(jobs_clean)}")
    
        print("Affichage Extract offres France Travail...")
        df = pd.DataFrame(jobs_clean)
    
        # Export vers CSV
        print("💾 Sauvegarde en CSV...")
        save_to_csv(df)
    
        # Export vers Parquet
        print("💾 Sauvegarde en Parquet...")
        save_to_parquet(df)
        
        print(f"{len(jobs_clean)} offres uniques exportées dans {CSV_DIR} et {PARQUET_DIR} ✅")
    
        # Connexion DB
        print("Connexion à la base PostgreSQL...")
        engine = create_engine(f"postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
    
        # Initier la bdd
        init_db(engine)
    
        print("💾 Sauvegarde en base PostgreSQL...")
        save_to_postgres_upsert(df, engine, table_name=DB_TABLE_NAME)
        print("💾 Sauvegarde en base PostgreSQL TERMINEE !!!...")

        # Recherche top offres similaires
        print("Lancement requête en similarité ...")
        top_offres = search_similar_offres(reference_text_clean,engine, top_k=10)
        logging.info(f"Top 10 offres les plus proches:\n{top_offres}")
        logging.info("Pipeline terminé avec succès.")
    
        # Affichage extract offres
        display(top_offres.shape)
        display(top_offres)

        # Affichage extract offres
        display(df.shape)
        display(df.head(3))

        print("FIN DU SCRIPT !!!...")
        
    except Exception as e:
        logging.exception(f"Pipeline échoué: {e}")

In [354]:
# # Chargement depuis CSV
# today = datetime.now().strftime("%Y-%m-%d")
# path_parquet = os.path.join(PARQUET_DIR, f"{today}_offres.parquet")
# df = pd.read_parquet(path_parquet)
# display(df.shape)
# display(df.head(3))

### Procédure principale

In [355]:
# ---------------------------
# MAIN
# ---------------------------
if __name__ == "__main__":
    run_pipeline()

Authentification France Travail...
Récupération des offres France Travail...
Récupération des offres Adzuna...
Fusion et déduplication...
Nombre d'offres d'emploi avant déduplication : 1192
Nombre d'offres d'emploi après déduplication : 1190
Affichage Extract offres France Travail...
💾 Sauvegarde en CSV...
✅ Sauvegardé dans ../data/csv/2025-09-18_offres.csv
💾 Sauvegarde en Parquet...
✅ Sauvegardé dans ../data/parquet/2025-09-18_offres.parquet
1190 offres uniques exportées dans ../data/csv et ../data/parquet ✅
Connexion à la base PostgreSQL...
💾 Sauvegarde en base PostgreSQL...


  table = Table(table_name, metadata, autoload_with=engine)


💾 Sauvegarde en base PostgreSQL TERMINEE !!!...
Lancement requête en similarité ...


(10, 5)

Unnamed: 0,id,titre,description,entreprise,similarity
0,1071294,Data analyst (H/F),"Analyse des données (économiques, statistiques...",ENGAGETECH SOCIAL,9.274402
1,6717979,Data analyst (H/F),"Analyse des données (économiques, statistiques...",L'ECOLE NUMERIQUE,9.274402
2,5400064907,Data analyst (H/F) (Intérim),Poste : Votre mission au sein de la data facto...,Leader,8.678255
3,5399136679,Data analyst (H/F) (Intérim),Poste : Votre mission au sein de la data facto...,Leader,8.678255
4,5377455904,BUSINESS DATA analyste informatiqueE (IT),Je recherche pour un de mes clients un consult...,UCASE CONSULTING,8.422405
5,5380688338,Data Analyst Senior H/F,Nous recherchons un Data Analyst Senior pour a...,Ekkiden,8.389194
6,5331594969,Data Analyst Power Bi H/F,L'agence Providence recrute pour l'un de ses c...,PROvidence,8.341934
7,5339226722,consultant informatique Business analyste info...,Dans le cadre du renforcement de notre équipe ...,CRYSTAL PLACEMENT,8.313993
8,5386276133,Data Analyst,Offre d’emploi : Data Analyst Power BI – H/F L...,Worldgrid,8.254158
9,5144986652,Assistant Data Analyst - Alternance H/F,Au sein de notre entreprise partenaire Vos mis...,Digital School of Paris,8.217449


(1190, 12)

Unnamed: 0,source,id,titre,description,entreprise,lieu,latitude,longitude,type_contrat_libelle,date_publication,url,secteur_activites
0,France Travail,197XKGK,Data analyst (H/F),Pourquoi a-t-on besoin de vous?\nDans le cadre...,SAINT-GOBAIN DISTRIBUTION BATIMENT FRANC,92 - Courbevoie,48.901819,2.267542,CDD - 12 Mois,2025-09-17T07:27:40.068Z,https://candidat.francetravail.fr/offres/reche...,Activités des sièges sociaux
1,France Travail,197XHCS,DATA ANALYST/SCIENTIST(H/F),"En tant que Data Analyst - Data Scientist, vou...",INFOBAM,972 - LE LAMENTIN,14.615411,-61.003361,CDI,2025-09-16T18:29:39.856Z,https://candidat.francetravail.fr/offres/reche...,Gestion d'installations informatiques
2,France Travail,197TNXM,Data Analyst ETL / SI Junior - H/F (H/F),"Empreinte est une société Française qui crée, ...",EMPREINTE SA,29 - BREST,48.414003,-4.491985,CDI,2025-09-12T17:46:57.887Z,https://candidat.francetravail.fr/offres/reche...,Fabrication de vêtements de dessous


FIN DU SCRIPT !!!...
