## Collecte des contenus du site Cegid pour RAG et SEO

Ce notebook permet de :
- R√©cup√©rer les articles, pages, landing pages et custom posts via l'API WordPress REST
- Stocker les contenus dans la table `Cegid_website` avec gestion des offsets
- Supporter l'incr√©mental via le tracking des IDs d√©j√† import√©s


## 1. Configuration

In [0]:
# Configuration WordPress
# Credentials (utiliser Databricks Secrets en production)
WP_BASE_URL = "https://www.cegid.com"  # Domaine racine
WP_LOGIN = "semji"
WP_PASSWORD = " 2VUV BySM SSrp wzJW ZFul nLaf"  # Set your password here

# Configuration des sites par langue
WP_SITES = {
    "fr": {
        "prefix": "fr",
        "language": "fr",
        "label": "France"
    },
    "es": {
        "prefix": "es", 
        "language": "es",
        "label": "Espa√±a"
    },
    "uk": {
        "prefix": "uk",
        "language": "en-GB",
        "label": "United Kingdom"
    },
    "us": {
        "prefix": "us",
        "language": "en-US", 
        "label": "United States"
    },
    "de": {
        "prefix": "de",
        "language": "de",
        "label": "Deutschland"
    },
    "it": {
        "prefix": "it",
        "language": "it",
        "label": "Italia"
    },
    "pt": {
        "prefix": "pt",
        "language": "pt",
        "label": "Portugal"
    },
}

# Site(s) √† importer (liste ou "all" pour tous)
WP_SITES_TO_IMPORT = ["fr"]  # Ex: ["fr", "es"] ou list(WP_SITES.keys()) pour tous

# Configuration Databricks
DATABRICKS_CONFIG = {
    "catalog": "gdp_cdt_dev_04_gld",  # Adapter selon votre catalogue Unity Catalog
    "schema": "sandbox_mkt",
    "table_name": "cegid_website"
}

# Configuration technique WordPress
WORDPRESS_CONFIG = {
    "base_url": WP_BASE_URL,
    "api_endpoint": "/wp-json/wp/v2",
    "per_page": 100,  # Maximum autoris√© par l'API WordPress
    "timeout": 30,
    "auth": (WP_LOGIN, WP_PASSWORD)  # Basic Auth ou Application Password
}

# Configuration des sites par langue
CONTENT_TYPES = {
    "post": {
        "endpoint": "/posts",
        "label": "blog"
    },
    "page": {
        "endpoint": "/pages",
        "label": "pages"
    }
}

## 2. Imports et initialisation


In [0]:
import requests
import json
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, 
    TimestampType, ArrayType, MapType, LongType
)
from pyspark.sql.functions import (
    col, lit, current_timestamp, when, max as spark_max,
    from_json, explode, regexp_replace, trim
)
from bs4 import BeautifulSoup
import html
from pyspark.sql.types import BooleanType

# Initialisation Spark (d√©j√† disponible dans Databricks)
spark = SparkSession.builder.getOrCreate()

## 3. Sch√©ma de la table Cegid_website


In [0]:
FIELD_MAPPING = {

    # --- IDENTIFIANTS ---
    "id": None,                          # Calcul√© via calculate_composite_id()
    "wp_id": "id",
    "content_type": None,                # Pass√© en param√®tre
    "site_id": None,                     # Config du site
    
    # --- M√âTADONN√âES ---
    "slug": "slug",
    "url": "link",
    "title": "title.rendered",
    
    # --- SEO ---
    "meta_description": "yoast_head_json.description",
    "meta_title": "yoast_head_json.title",
    "meta_keyword": "yoast_head_json.focuskw",        # Focus keyword Yoast
    "noindex": "yoast_head_json.robots.index",       # Robot noindex
    
    # --- CONTENU ---
    "content_raw": "content.rendered",
    "content_text": None,                # Calcul√© (HTML nettoy√©)
    "excerpt": "excerpt.rendered",
    
    # --- TAXONOMIES ---
    "categories": "categories",
    "tags": "tags",
    "custom_taxonomies": None,           # Extraction custom
    
    # --- DATES ---
    "date_published": "date",
    "date_modified": "modified",
    "date_imported": None,               # datetime.now()
    
    # --- AUTEUR & STATUT ---
    "status": "status",
    "author_id": "author",
    
    # --- MEDIA ---
    "featured_image_url": "_embedded.wp:featuredmedia.0.source_url",
    
    # --- LANGUE ---
    "language": "lang",
    
    # --- DONN√âES BRUTES ---
    "raw_json": None,
}

CONTENT_SCHEMA = StructType([
    # --- IDENTIFIANTS ---
    StructField("id", LongType(), False),
    StructField("wp_id", IntegerType(), False),
    StructField("content_type", StringType(), False),
    StructField("site_id", StringType(), False),
    
    # --- M√âTADONN√âES ---
    StructField("slug", StringType(), True),
    StructField("url", StringType(), True),
    StructField("title", StringType(), True),
    
    # --- SEO ---
    StructField("meta_description", StringType(), True),
    StructField("meta_title", StringType(), True),
    StructField("meta_keyword", StringType(), True),
    StructField("noindex", BooleanType(), True),
  
    # --- CONTENU ---
    StructField("content_raw", StringType(), True),
    StructField("content_text", StringType(), True),
    StructField("excerpt", StringType(), True),
    
    # --- TAXONOMIES ---
    StructField("categories", ArrayType(IntegerType()), True),
    StructField("tags", ArrayType(IntegerType()), True),
    StructField("custom_taxonomies", MapType(StringType(), ArrayType(IntegerType())), True),
    
    # --- DATES ---
    StructField("date_published", TimestampType(), True),
    StructField("date_modified", TimestampType(), True),
    StructField("date_imported", TimestampType(), False),
    
    # --- AUTEUR & STATUT ---
    StructField("status", StringType(), True),
    StructField("author_id", IntegerType(), True),
    
    # --- MEDIA ---
    StructField("featured_image_url", StringType(), True),
    
    # --- LANGUE ---
    StructField("language", StringType(), True),
    
    # --- DONN√âES BRUTES ---
    StructField("raw_json", StringType(), True),
])

## 4. Fonctions utilitaires

In [0]:
def clean_html_content(html_content: str) -> str:
    """
    Nettoie le contenu HTML pour extraire le texte brut.
    Utile pour le RAG et l'indexation.
    """
    if not html_content:
        return ""
    
    # D√©code les entit√©s HTML
    decoded = html.unescape(html_content)
    
    # Parse avec BeautifulSoup
    soup = BeautifulSoup(decoded, 'html.parser')
    
    # Supprime les scripts et styles
    for element in soup(['script', 'style', 'nav', 'footer', 'header']):
        element.decompose()
    
    # Extrait le texte
    text = soup.get_text(separator=' ', strip=True)
    
    # Nettoie les espaces multiples
    text = ' '.join(text.split())
    
    return text
def get_nested_value(data: dict, path: str, default=None):
    """
    Extrait une valeur imbriqu√©e depuis un dictionnaire via une notation point√©e.
    
    Exemples:
        get_nested_value(item, "title.rendered")  
        ‚Üí item["title"]["rendered"]
        
        get_nested_value(item, "_embedded.author.0.name")  
        ‚Üí item["_embedded"]["author"][0]["name"]
        
        get_nested_value(item, "yoast_head_json.description")
        ‚Üí item["yoast_head_json"]["description"]
    
    Args:
        data: Dictionnaire source
        path: Chemin en notation point√©e (ex: "a.b.0.c")
        default: Valeur par d√©faut si le chemin n'existe pas
    
    Returns:
        La valeur trouv√©e ou default
    """
    if not path or not data:
        return default
    
    keys = path.split('.')
    value = data
    
    try:
        for key in keys:
            if isinstance(value, dict):
                value = value.get(key)
            elif isinstance(value, list):
                # Support des index num√©riques (ex: "author.0.name")
                index = int(key)
                value = value[index] if len(value) > index else None
            else:
                return default
            
            if value is None:
                return default
        return value
    except (KeyError, IndexError, TypeError, ValueError):
        return default

def get_rendered_field(item: dict, field: str) -> str:
    """Extrait un champ rendu de l'API WordPress."""
    if field in item and isinstance(item[field], dict):
        return item[field].get('rendered', '')
    return item.get(field, '')

def parse_wp_date(date_str: str) -> Optional[datetime]:
    """
    Parse une date ISO WordPress en objet datetime.
    Formats support√©s: 
    - 2024-01-15T10:30:00
    - 2024-01-15T10:30:00Z
    - 2024-01-15
    """
    if not date_str:
        return None
    
    try:
        # Format avec T et possible Z
        if 'T' in date_str:
            # Supprime le Z final si pr√©sent
            clean_date = date_str.replace('Z', '')
            return datetime.fromisoformat(clean_date)
        else:
            # Format date seule
            return datetime.strptime(date_str, '%Y-%m-%d')
    except (ValueError, TypeError):
        return None

def calculate_composite_id(wp_id: int, content_type: str, site_id: str) -> int:
    """
    Calcule un ID composite unique bas√© sur le site, le type de contenu et l'ID WordPress.
    Permet d'√©viter les collisions entre sites et types de contenus.
    
    Structure: SITE_OFFSET + TYPE_OFFSET + wp_id
    
    Offset par site (milliards):
    - fr: 1_000_000_000
    - es: 2_000_000_000
    - uk: 3_000_000_000
    - etc.
    
    Offset par type (millions):
    - post: 0
    - page: 10_000_000
    - landing_page: 20_000_000
    - product: 30_000_000
    """
    SITE_OFFSETS = {
        "fr": 1_000_000_000,
        "es": 2_000_000_000,
        "uk": 3_000_000_000,
        "us": 4_000_000_000,
        "de": 5_000_000_000,
        "it": 6_000_000_000,
        "pt": 7_000_000_000,
        "root": 0,  # Site racine sans pr√©fixe
    }
    
    TYPE_OFFSETS = {
        "post": 0,
        "page": 10_000_000,
        "landing_page": 20_000_000,
        "product": 30_000_000,
        "custom_post_1": 40_000_000,
        "custom_post_2": 50_000_000,
    }
    
    site_offset = SITE_OFFSETS.get(site_id, 9_000_000_000)
    type_offset = TYPE_OFFSETS.get(content_type, 100_000_000)
    
    return site_offset + type_offset + wp_id

# 5. Classe principale du connecteur

In [0]:
class WordPressConnector:
    """
    Connecteur WordPress multi-sites vers Databricks.
    G√®re la r√©cup√©ration pagin√©e avec authentification et l'√©criture incr√©mentale.
    """
    
    def __init__(self, site_id: str, site_config: Dict, config: Dict = WORDPRESS_CONFIG):
        self.site_id = site_id
        self.site_config = site_config
        self.base_url = config["base_url"].rstrip('/')
        self.api_endpoint = config["api_endpoint"]
        self.per_page = config["per_page"]
        self.timeout = config["timeout"]
        self.auth = config.get("auth")
        self.session = requests.Session()
        
        # Configure l'authentification
        if self.auth:
            self.session.auth = self.auth
        
    def _get_site_url(self) -> str:
        """Construit l'URL du site avec le pr√©fixe de langue."""
        prefix = self.site_config.get("prefix", "")
        if prefix:
            return f"{self.base_url}/{prefix}"
        return self.base_url
        
    def _get_api_url(self, endpoint: str) -> str:
        """Construit l'URL compl√®te de l'API pour ce site."""
        return f"{self._get_site_url()}{self.api_endpoint}{endpoint}"
    
    def _fetch_page(self, endpoint: str, page: int, params: Dict = None) -> Tuple[List[Dict], int]:
        """
        R√©cup√®re une page de r√©sultats de l'API WordPress.
        Retourne les items et le nombre total de pages.
        """
        url = self._get_api_url(endpoint)
        
        request_params = {
            "page": page,
            "per_page": self.per_page,
            "_embed": 1,  # Inclut les donn√©es li√©es (auteur, featured image, etc.)
            "status": "publish"  # Seulement les contenus publi√©s
        }
        
        if params:
            request_params.update(params)
        
        try:
            response = self.session.get(
                url, 
                params=request_params, 
                timeout=self.timeout
            )
            response.raise_for_status()
            
            total_pages = int(response.headers.get('X-WP-TotalPages', 1))
            total_items = int(response.headers.get('X-WP-Total', 0))
            items = response.json()
            
            return items, total_pages
            
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 400:
                # Page au-del√† du maximum, fin de la pagination
                return [], 0
            print(f"‚ùå Erreur HTTP {e.response.status_code}: {e}")
            return [], 0
        except requests.exceptions.RequestException as e:
            print(f"‚ùå Erreur API WordPress: {e}")
            return [], 0
    
    def fetch_all_content(self, content_type: str, endpoint: str, 
                          since_id: Optional[int] = None) -> List[Dict]:
        """
        R√©cup√®re tous les contenus d'un type donn√© pour ce site.
        Support optionnel de l'import incr√©mental via since_id.
        """
        all_items = []
        page = 1
        total_pages = 1
        
        params = {}
        if since_id:
            # R√©cup√®re seulement les IDs sup√©rieurs (nouveaux contenus)
            params["after"] = since_id
        
        site_label = self.site_config.get("label", self.site_id)
        print(f"üì• [{site_label}] R√©cup√©ration des {content_type}s...")
        print(f"   URL: {self._get_api_url(endpoint)}")
        
        while page <= total_pages:
            items, total_pages = self._fetch_page(endpoint, page, params)
            
            if not items:
                break
                
            all_items.extend(items)
            print(f"   Page {page}/{total_pages} - {len(items)} items r√©cup√©r√©s")
            page += 1
        
        print(f"‚úÖ [{site_label}] Total {content_type}s r√©cup√©r√©s: {len(all_items)}")
        return all_items
    
    def transform_item(self, item: Dict, content_type: str) -> Dict:
        """
        Transforme un item WordPress en format standardis√© pour Databricks.
        
        Sch√©ma cible: gdp_cdt_dev_04_gld.sandbox_mkt.cegid_website
        22 colonnes (hors partitions)
        """
        wp_id = item.get('id')
        
        # === CONTENU ===
        content_raw = get_nested_value(item, 'content.rendered', '')
        content_text = clean_html_content(content_raw)
        excerpt_raw = get_nested_value(item, 'excerpt.rendered', '')
        
        # === MEDIA ===
        featured_image_url = get_nested_value(item, '_embedded.wp:featuredmedia.0.source_url')
        
        # === LANGUE ===
        language = item.get('lang') or self.site_config.get("language", "fr")
        
        # === SEO: noindex ===
        # Yoast stocke "index" ou "noindex" dans robots.index
        robots_index = get_nested_value(item, 'yoast_head_json.robots.index')
        noindex = robots_index == 'noindex' if robots_index else None
        
        # === TAXONOMIES CUSTOM ===
        custom_taxonomies = {}
        # Collecte toutes les taxonomies custom (occupation, etc.)
        for key in ['occupation', 'solution', 'secteur', 'product_type']:
            if key in item and isinstance(item[key], list) and item[key]:
                custom_taxonomies[key] = item[key]
        
        return {
            # --- IDENTIFIANTS ---
            "id": calculate_composite_id(wp_id, content_type, self.site_id),
            "wp_id": wp_id,
            "content_type": content_type,
            "site_id": self.site_id,
            
            # --- M√âTADONN√âES ---
            "slug": item.get('slug'),
            "url": item.get('link'),
            "title": get_nested_value(item, 'title.rendered', ''),
            
            # --- SEO ---
            "meta_description": get_nested_value(item, 'yoast_head_json.description'),
            "meta_title": get_nested_value(item, 'yoast_head_json.title'),
            "meta_keyword": get_nested_value(item, 'yoast_head_json.focuskw'),
            "noindex": noindex,
            
            # --- CONTENU ---
            "content_raw": content_raw,
            "content_text": content_text,
            "excerpt": clean_html_content(excerpt_raw),
            
            # --- TAXONOMIES ---
            "categories": item.get('categories', []),
            "tags": item.get('tags', []),
            "custom_taxonomies": custom_taxonomies if custom_taxonomies else None,
            
            # --- DATES ---
            "date_published": parse_wp_date(item.get('date')),
            "date_modified": parse_wp_date(item.get('modified')),
            "date_imported": datetime.now(),
            
            # --- AUTEUR & STATUT ---
            "status": item.get('status'),
            "author_id": item.get('author'),
            
            # --- MEDIA ---
            "featured_image_url": featured_image_url,
            
            # --- LANGUE ---
            "language": language,
            
            # --- DONN√âES BRUTES ---
            "raw_json": json.dumps(item, ensure_ascii=False),
        }

## 6. Fonctions de gestion de la table Databricks

In [0]:
def create_table_if_not_exists(catalog: str, schema: str, table_name: str):
    """Cr√©e la table si elle n'existe pas."""
    
    full_table_name = f"{catalog}.{schema}.{table_name}"
    
    # Cr√©e le sch√©ma si n√©cessaire
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")
    
    # V√©rifie si la table existe
    if not spark.catalog.tableExists(full_table_name):
        print(f"üìù Cr√©ation de la table {full_table_name}...")
        
        # Cr√©e une DataFrame vide avec le sch√©ma
        empty_df = spark.createDataFrame([], CONTENT_SCHEMA)
        
        # √âcrit en Delta avec partitionnement par site et type
        empty_df.write \
            .format("delta") \
            .partitionBy("site_id", "content_type") \
            .option("delta.enableChangeDataFeed", "true") \
            .saveAsTable(full_table_name)
        
        print(f"‚úÖ Table {full_table_name} cr√©√©e avec succ√®s")
    else:
        print(f"‚ÑπÔ∏è Table {full_table_name} existe d√©j√†")


def get_last_imported_id(catalog: str, schema: str, table_name: str, 
                         content_type: str, site_id: str) -> Optional[int]:
    """R√©cup√®re le dernier ID WordPress import√© pour un type de contenu et un site."""
    
    full_table_name = f"{catalog}.{schema}.{table_name}"
    
    try:
        result = spark.sql(f"""
            SELECT MAX(wp_id) as max_id 
            FROM {full_table_name} 
            WHERE content_type = '{content_type}'
              AND site_id = '{site_id}'
        """).collect()
        
        if result and result[0]['max_id']:
            return result[0]['max_id']
    except Exception as e:
        print(f"Note: {e}")
    
    return None


def upsert_content(df: DataFrame, catalog: str, schema: str, table_name: str):
    """
    Upsert (MERGE) des contenus dans la table Delta.
    Met √† jour les contenus existants, ins√®re les nouveaux.
    """
    
    full_table_name = f"{catalog}.{schema}.{table_name}"
    
    # Cr√©e une vue temporaire
    df.createOrReplaceTempView("new_content")
    
    # MERGE pour upsert
    spark.sql(f"""
        MERGE INTO {full_table_name} AS target
        USING new_content AS source
        ON target.id = source.id
        WHEN MATCHED THEN
            UPDATE SET
                title = source.title,
                content_raw = source.content_raw,
                content_text = source.content_text,
                excerpt = source.excerpt,
                meta_description = source.meta_description,
                categories = source.categories,
                tags = source.tags,
                custom_taxonomies = source.custom_taxonomies,
                date_modified = source.date_modified,
                date_imported = source.date_imported,
                status = source.status,
                featured_image_url = source.featured_image_url,
                raw_json = source.raw_json
        WHEN NOT MATCHED THEN
            INSERT *
    """)
    
    print(f"‚úÖ Upsert termin√© dans {full_table_name}")

## 7. Pipeline principal

In [0]:
def run_import_pipeline(content_types: Dict = CONTENT_TYPES, 
                        sites_to_import: List[str] = WP_SITES_TO_IMPORT,
                        incremental: bool = True,
                        specific_type: Optional[str] = None):
    """
    Ex√©cute le pipeline d'import complet pour un ou plusieurs sites.
    
    Args:
        content_types: Dictionnaire des types de contenu √† importer
        sites_to_import: Liste des site_id √† importer (ex: ["fr", "es"])
        incremental: Si True, importe seulement les nouveaux contenus
        specific_type: Si sp√©cifi√©, importe seulement ce type de contenu
    """
    
    catalog = DATABRICKS_CONFIG["catalog"]
    schema = DATABRICKS_CONFIG["schema"]
    table_name = DATABRICKS_CONFIG["table_name"]
    
    # Cr√©e la table si n√©cessaire
    create_table_if_not_exists(catalog, schema, table_name)
    
    # Filtre les types si sp√©cifi√©
    types_to_import = {specific_type: content_types[specific_type]} if specific_type else content_types
    
    total_imported = 0
    
    # Boucle sur les sites
    for site_id in sites_to_import:
        if site_id not in WP_SITES:
            print(f"‚ö†Ô∏è Site '{site_id}' non configur√©, ignor√©")
            continue
            
        site_config = WP_SITES[site_id]
        site_label = site_config.get("label", site_id)
        
        print(f"\n{'#'*60}")
        print(f"üåê SITE: {site_label} ({site_id})")
        print(f"   URL: {WORDPRESS_CONFIG['base_url']}/{site_config.get('prefix', '')}")
        print(f"{'#'*60}")
        
        # Initialise le connecteur pour ce site
        connector = WordPressConnector(site_id, site_config)
        
        for content_type, config in types_to_import.items():
            print(f"\n{'='*50}")
            print(f"üì¶ [{site_label}] Import: {config['label']} ({content_type})")
            print(f"{'='*50}")
            
            # R√©cup√®re le dernier ID pour import incr√©mental
            since_id = None
            if incremental:
                since_id = get_last_imported_id(catalog, schema, table_name, content_type, site_id)
                if since_id:
                    print(f"‚ÑπÔ∏è Mode incr√©mental - depuis ID: {since_id}")
            
            # R√©cup√®re les contenus WordPress
            items = connector.fetch_all_content(
                content_type=content_type,
                endpoint=config["endpoint"],
                since_id=since_id
            )
            
            if not items:
                print(f"‚ÑπÔ∏è Aucun nouveau contenu √† importer pour {content_type}")
                continue
            
            # Transforme les items
            transformed_items = [
                connector.transform_item(item, content_type) 
                for item in items
            ]
            
            # Cr√©e le DataFrame
            df = spark.createDataFrame(transformed_items, CONTENT_SCHEMA)
            
            # Upsert dans la table
            upsert_content(df, catalog, schema, table_name)
            
            total_imported += len(transformed_items)
            print(f"üìä [{site_label}] {len(transformed_items)} {content_type}(s) import√©(s)")
    
    print(f"\n{'#'*60}")
    print(f"üéâ Import termin√©! Total: {total_imported} contenus import√©s")
    print(f"   Sites trait√©s: {', '.join(sites_to_import)}")
    print(f"{'#'*60}")
    
    return total_imported

## 8. Ex√©cution

In [0]:

# =============================================================================
# EXEMPLES D'EX√âCUTION
# =============================================================================

# Import d'un seul site (FR), tous les types de contenu
# run_import_pipeline(sites_to_import=["fr"], incremental=False)

# Import incr√©mental d'un seul site
# run_import_pipeline(sites_to_import=["fr"], incremental=True)

# Import de plusieurs sites
# run_import_pipeline(sites_to_import=["fr", "es", "uk"], incremental=False)

# Import de TOUS les sites configur√©s
# run_import_pipeline(sites_to_import=list(WP_SITES.keys()), incremental=False)

# Import d'un type sp√©cifique sur un site
run_import_pipeline(sites_to_import=["fr"], specific_type="post", incremental=False)


‚ÑπÔ∏è Table gdp_cdt_dev_04_gld.sandbox_mkt.cegid_website existe d√©j√†

############################################################
üåê SITE: France (fr)
   URL: https://www.cegid.com/fr
############################################################

üì¶ [France] Import: blog (post)
üì• [France] R√©cup√©ration des posts...
   URL: https://www.cegid.com/fr/wp-json/wp/v2/posts
   Page 1/7 - 100 items r√©cup√©r√©s
   Page 2/7 - 100 items r√©cup√©r√©s
   Page 3/7 - 100 items r√©cup√©r√©s
   Page 4/7 - 100 items r√©cup√©r√©s
   Page 5/7 - 100 items r√©cup√©r√©s
   Page 6/7 - 100 items r√©cup√©r√©s
   Page 7/7 - 77 items r√©cup√©r√©s
‚úÖ [France] Total posts r√©cup√©r√©s: 677
‚úÖ Upsert termin√© dans gdp_cdt_dev_04_gld.sandbox_mkt.cegid_website
üìä [France] 677 post(s) import√©(s)

############################################################
üéâ Import termin√©! Total: 677 contenus import√©s
   Sites trait√©s: fr
############################################################


677

## 9. V√©rification des donn√©es

In [0]:
# Affiche un aper√ßu des donn√©es import√©es par site et type
full_table = f"{DATABRICKS_CONFIG['catalog']}.{DATABRICKS_CONFIG['schema']}.{DATABRICKS_CONFIG['table_name']}"

display(spark.sql(f"""
    SELECT 
        site_id,
        content_type,
        language,
        COUNT(*) as nb_items,
        MIN(date_published) as oldest,
        MAX(date_published) as newest,
        MAX(date_imported) as last_import
    FROM {full_table}
    GROUP BY site_id, content_type, language
    ORDER BY site_id, content_type
"""))

# COMMAND ----------

# Exemple: Aper√ßu des derniers articles par site
display(spark.sql(f"""
    SELECT 
        site_id,
        id,
        wp_id,
        title,
        
        url,
        language,
        LEFT(content_text, 200) as content_preview,
        date_published
    FROM {full_table}
    WHERE content_type = 'post'
    ORDER BY site_id, date_published DESC
    LIMIT 20
"""))

site_id,content_type,language,nb_items,oldest,newest,last_import
fr,post,fr,677,2011-11-24T15:17:11.000Z,2026-01-28T09:36:20.000Z,2026-02-04T13:38:39.714Z


site_id,id,wp_id,title,url,language,content_preview,date_published
fr,1000075982,75982,Comment un responsable formation peut devenir sponsor d‚Äôun projet learning tech,https://www.cegid.com/fr/blog/responsable-formation/,fr,,2026-01-28T09:36:20.000Z
fr,1000075877,75877,Harc√®lement moral institutionnel : comprendre la notion et s√©curiser vos pratiques RH,https://www.cegid.com/fr/blog/harcelement-moral-institutionnel/,fr,,2026-01-27T10:28:39.000Z
fr,1000075834,75834,Les tendances de la paie en 2026 : ce que les professionnels RH doivent anticiper,https://www.cegid.com/fr/blog/tendances-paie-2026-fait-generateur-transparence-salariale-competences-metier/,fr,,2026-01-26T09:56:50.000Z
fr,1000075826,75826,Calendrier fiscal 2026 : bien pr√©parer la nouvelle ann√©e !,https://www.cegid.com/fr/blog/calendrier-fiscal-2026-echeances-et-conseils/,fr,,2026-01-26T09:42:34.000Z
fr,1000075754,75754,Excellence Omnicanal : Le guide d‚Äôun CEO pour r√©pondre aux attentes des consommateurs en 2026,https://www.cegid.com/fr/blog/excellence-omnicanal-le-guide-ceo/,fr,,2026-01-22T09:52:04.000Z
fr,1000075750,75750,Int√©gration solution collaborative : r√©ussir sans perturber vos process¬†,https://www.cegid.com/fr/blog/integrer-solution-collaborative/,fr,,2026-01-22T09:28:45.000Z
fr,1000074809,74809,5 signaux critiques : votre tr√©sorerie est-elle brid√©e par Excel ?,https://www.cegid.com/fr/blog/5-signaux-critiques-tresorerie-bridee-par-excel/,fr,,2026-01-20T11:28:08.000Z
fr,1000075374,75374,D√©claration sociale nominative : pourquoi 2026 marque un tournant pour les entreprises ?,https://www.cegid.com/fr/blog/paie-declaration-sociale-nominative-2026/,fr,,2026-01-20T09:00:44.000Z
fr,1000074852,74852,Fiscalit√© internationale : les enjeux de conformit√© pour les grandes entreprises,https://www.cegid.com/fr/blog/fiscalite-internationale-entreprises/,fr,,2026-01-15T09:53:01.000Z
fr,1000074785,74785,Gestion des cong√©s : les nouvelles r√®gles √† appliquer pendant un arr√™t maladie,https://www.cegid.com/fr/blog/gestion-conges-arret-maladie/,fr,,2026-01-14T09:30:13.000Z


## 10. AI Functions - Formatage automatique du contenu

Cette section utilise `AI_QUERY` avec Claude Haiku pour convertir le contenu brut en Markdown structur√©.

**Logique de traitement :**
- Traite les items avec `date_modified` dans les 7 derniers jours
- Traite les nouveaux items non encore pr√©sents dans la table format√©e
- Met √† jour uniquement si la date de modification a chang√© (√©vite les appels AI inutiles)

In [None]:
# Configuration de la table destination
FORMATTED_TABLE = f"{DATABRICKS_CONFIG['catalog']}.{DATABRICKS_CONFIG['schema']}.cegid_website_formatted"
SOURCE_TABLE = f"{DATABRICKS_CONFIG['catalog']}.{DATABRICKS_CONFIG['schema']}.{DATABRICKS_CONFIG['table_name']}"

# Cr√©ation de la table format√©e si elle n'existe pas
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {FORMATTED_TABLE} (
        id BIGINT NOT NULL,
        wp_id INT,
        site_id STRING,
        content_type STRING,
        title STRING,
        url STRING,
        content_markdown STRING,
        date_modified TIMESTAMP,
        date_processed TIMESTAMP,
        PRIMARY KEY (id)
    )
    USING DELTA
""")

### 10.2 V√©rification des r√©sultats AI

In [None]:
# V√©rification des items trait√©s
display(spark.sql(f"""
    SELECT 
        site_id,
        content_type,
        COUNT(*) as nb_formatted,
        MIN(date_processed) as first_processed,
        MAX(date_processed) as last_processed
    FROM {FORMATTED_TABLE}
    GROUP BY site_id, content_type
    ORDER BY site_id, content_type
"""))

# Aper√ßu des derniers items format√©s
display(spark.sql(f"""
    SELECT 
        id,
        title,
        LEFT(content_markdown, 500) as markdown_preview,
        date_modified,
        date_processed
    FROM {FORMATTED_TABLE}
    ORDER BY date_processed DESC
    LIMIT 5
"""))