# Analyse des performances des médias français sur YouTube

Ce notebook présente le pipeline de collecte et de traitement des données YouTube
utilisé pour analyser la performance des chaînes de médias français.

Il couvre :
- la récupération des vidéos via l’API YouTube,
- le filtrage des vidéos longues (< 3 minutes) ainsi d'autres fonctions indispensables.
- le calcul des indicateurs finaux exportés dans un CSV.

In [None]:
# ===============================
# STARTER SCRAPING READY
# ===============================

# Modules principaux
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import json
import requests
from bs4 import BeautifulSoup
import re
import sys
from urllib.parse import urljoin
from pprint import pprint
import time
from tqdm import tqdm
import random
import unicodedata
import statistics

# Vérifier que le kernel est bien actif
print(f"Python version: {sys.version}")

# Options Pandas
pd.set_option('display.max_rows', 200)
pd.set_option('display.float_format', '{:,.2f}'.format)
pd.set_option("display.max_colwidth", None)
pd.set_option('display.width', 2000)
pd.set_option('display.max_columns', 200)

# Style Seaborn
sns.set_theme(style="whitegrid")

# Test rapide Pandas
df_test = pd.DataFrame({'A':[1,2], 'B':[3,4]})
display(df_test)

print("✅ Starter ready: tout est actif !")

Python version: 3.14.0 (tags/v3.14.0:ebf955d, Oct  7 2025, 10:15:03) [MSC v.1944 64 bit (AMD64)]


Unnamed: 0,A,B
0,1,3
1,2,4


✅ Starter ready: tout est actif !


In [None]:
api_key = "///////////////////////////////"
youtube_overrides = {
    "L'Express" : "UCp2kK3DyhpgFOdILF3BBC9A"
}

In [3]:
df_medias = pd.read_csv("../data/raw/liste_medias.csv", encoding="utf-8-sig", sep=";")

## 1. Récupération des vidéos via l’API YouTube

In [None]:
def search_youtube_raw(query: str, api_key: str, max_results: int = 5):

    url = "https://www.googleapis.com/youtube/v3/search"

    params = {
        "part": "snippet",
        "q": query,
        "type" : "channel",
        "maxResults": max_results,
        "key" : api_key
    }
    
    page = requests.get(url, params=params)
    
    print("HTTP status code :", page.status_code)
    
    if page.status_code != 200:
        print("Erreur API :", page.text)
        return []
    
    data = page.json()
    
    return data.get("items", [])

In [33]:
def get_channel_subscribers(channel_id, api_key):

    url = "https://www.googleapis.com/youtube/v3/channels"

    params = {
        "part" : "statistics",
        "id" : channel_id,
        "key" : api_key
    }
    
    r = requests.get(url, params=params)

    if r.status_code != 200:
        return None
    
    data = r.json()
    items = data.get("items", [])
    if not items:
        return None

    stats = items[0].get("statistics", {})
    subscribers_string = stats.get("subscriberCount")

    if subscribers_string is None or not subscribers_string.isdigit():
        return None

    return int(subscribers_string)

In [None]:
def resolve_channel(label_name: str, api_key: str):
 
    if label_name in youtube_overrides:
        channel_id = youtube_overrides[label_name]
        
    else:
        results = search_youtube_raw(label_name, api_key, max_results=1)

        if not results:
            return None

        channel_id = results[0]["id"]["channelId"]

    subscribers = get_channel_subscribers(channel_id, api_key)

    return {
        "label_name": label_name,
        "channel_id": channel_id,
        "subscribers": subscribers
    }

In [35]:
channels = []

for _, row in df_medias.iterrows():
    media_name = row["media_name"]
    label_name = row["label_name"]

    res = resolve_channel(label_name, api_key)
    time.sleep(2)

    if res is None:
        print("SKIP :", label_name)
        continue

    channels.append({
        "media_name": media_name,
        "label_name": label_name,
        "channel_id": res["channel_id"],
        "subscribers": res["subscribers"]
    })

df_channels = pd.DataFrame(channels)

HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200
HTTP status code : 200


## 2. Création des différentes fonctions pour nos indicateurs

In [54]:
def get_uploads_playlist_id(channel_id, api_key):

    url = "https://www.googleapis.com/youtube/v3/channels"

    params = {
        "part": "contentDetails",
        "id": channel_id,
        "key": api_key
    }

    r = requests.get(url, params=params)

    if r.status_code != 200:
        return None

    items = r.json().get("items", [])

    if not items:
        return None

    return items[0]["contentDetails"]["relatedPlaylists"]["uploads"]

In [55]:
def get_videos_from_playlist(playlist_id, api_key, max_pages=None):
    
    videos = []
    page_token = None
    pages_fetched = 0

    while True:
        url = "https://www.googleapis.com/youtube/v3/playlistItems"
        params = {
            "part": "snippet",
            "playlistId": playlist_id,
            "maxResults": 50,
            "pageToken": page_token,
            "key": api_key
        }

        r = requests.get(url, params=params)
        if r.status_code != 200:
            break
        
        data = r.json()

        for item in data.get("items", []):
            videos.append({
                "video_id": item["snippet"]["resourceId"]["videoId"],
                "published_at": item["snippet"]["publishedAt"]
            })

        page_token = data.get("nextPageToken")
        pages_fetched += 1

        if not page_token:
            break
        if max_pages and pages_fetched >= max_pages:
            break

    return videos

In [56]:
def parse_iso8601_duration_to_seconds(duration):

    h = m = s = 0

    if not duration:
        return 0

    if "H" in duration:
        h = int(re.search(r"(\d+)H", duration).group(1))
        
    if "M" in duration:
        m = int(re.search(r"(\d+)M", duration).group(1))

    if "S" in duration:
        s = int(re.search(r"(\d+)S", duration).group(1))
        
    return h * 3600 + m * 60 + s

In [57]:
def get_durations_from_videos(video_ids, api_key):

    durations = {}

    for i in range(0, len(video_ids), 50):
        chunk = video_ids[i:i+50]
        ids_str = ",".join(chunk)

        url = "https://www.googleapis.com/youtube/v3/videos"
        
        params = {
            "part": "contentDetails",
            "id": ids_str,
            "key": api_key
        }

        r = requests.get(url, params=params)
        if r.status_code !=200:
            continue

        data = r.json()

        for item in data.get("items", []):
            vid = item["id"]
            iso_dur = item.get("contentDetails", {}).get("duration")
            durations[vid] = parse_iso8601_duration_to_seconds(iso_dur)

    return durations

In [None]:
# les YouTube Shorts étant limités à 3 minutes max, on limite l'extraction des vidéos à celles faisant au moins 3:01

def get_last_50_long_videos(playlist_id, api_key, min_duration=181, max_pages=10):

    collected = []
    page_token = None
    pages = 0

    while len(collected) < 50 and pages < max_pages:

        params = {
            "part": "snippet",
            "playlistId": playlist_id,
            "maxResults": 50,
            "key": api_key
        }

        if page_token:
            params["pageToken"] = page_token

        r = requests.get("https://www.googleapis.com/youtube/v3/playlistItems", params=params)
        if r.status_code !=200:
            break

        data = r.json()

        items = data.get("items", [])
        if not items:
            break

        video_ids = [
            it["snippet"]["resourceId"]["videoId"] for it in items
        ]
    
        durations = get_durations_from_videos(video_ids, api_key)

        for it in items:
            vid = it["snippet"]["resourceId"]["videoId"]
            if durations.get(vid, 0) >= min_duration:
                collected.append(vid)
                if len(collected) == 50:
                    break

        page_token = data.get("nextPageToken")
        if not page_token:
            break

        pages +=1

    return collected, len(collected)

In [59]:
def get_viewcount_from_videos(video_ids, api_key):

    views = {}

    for i in range(0, len(video_ids), 50):
        batch = video_ids[i:i+50]

        url = "https://www.googleapis.com/youtube/v3/videos"

        params = {
            "part": "statistics",
            "id": ",".join(batch),
            "key": api_key
        }

        r = requests.get(url, params=params)
        if r.status_code !=200:
            break

        data = r.json()

        for item in data.get("items", []):
            views[item["id"]] = int(item["statistics"].get("viewCount", 0))
            
    return views

In [60]:
def get_cadence_30j_long_only(playlist_id, api_key, min_duration=181, max_pages=50):
    cutoff = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) - datetime.timedelta(days=30)

    count = 0
    next_token = None
    pages = 0

    while pages < max_pages:
        # 1) Page d'IDs (triée du plus récent au plus ancien)
        params = {
            "part": "snippet,contentDetails",
            "playlistId": playlist_id,
            "maxResults": 50,
            "key": api_key
        }
        if next_token:
            params["pageToken"] = next_token

        r = requests.get("https://www.googleapis.com/youtube/v3/playlistItems", params=params)
        r.raise_for_status()
        data = r.json()
        items = data.get("items", [])
        if not items:
            break

        video_ids = [it["contentDetails"]["videoId"] for it in items]

        # 2) Détails canonique (date + durée) sur ces IDs
        r2 = requests.get(
            "https://www.googleapis.com/youtube/v3/videos",
            params={"part": "snippet,contentDetails", "id": ",".join(video_ids), "key": api_key}
        )
        r2.raise_for_status()
        vdata = r2.json()

        for v in vdata.get("items", []):
            published = datetime.datetime.fromisoformat(v["snippet"]["publishedAt"].replace("Z", "+00:00"))
            if published >= cutoff:
                duration_iso = v.get("contentDetails", {}).get("duration")
                if not duration_iso:
                    continue
                
                dur = parse_iso8601_duration_to_seconds(duration_iso)
                if dur >= min_duration:
                    count += 1

        # arrêt propre : si le PLUS ANCIEN item de la page est déjà < cutoff, tout le reste sera plus ancien
        oldest_in_page = datetime.datetime.fromisoformat(items[-1]["snippet"]["publishedAt"].replace("Z", "+00:00"))
        if oldest_in_page < cutoff:
            break

        next_token = data.get("nextPageToken")
        if not next_token:
            break

        pages += 1

    return count

## 3. Boucle finale avec les indicateurs

In [None]:
results = []

for _, row in df_channels.iterrows():

    channel_id = row["channel_id"]
    subscribers = row["subscribers"]
    label_name = row["label_name"]

    playlist_id = get_uploads_playlist_id(channel_id, api_key)
    if playlist_id is None:
        continue

    video_ids = get_last_50_long_videos(
        playlist_id,
        api_key,
        min_duration=181
    )

    if not video_ids:
        continue

    views_dict = get_viewcount_from_videos(video_ids, api_key)
    views_list = [
        views_dict[v] 
        for v in video_ids 
        if v in views_dict
    ]
    

    mediane_vues = statistics.median(views_list) if views_list else None
    ratio_vues_abonnes = mediane_vues / subscribers if mediane_vues is not None and subscribers > 0 else None
    ecart_type = statistics.stdev(views_list) if len(views_list) >= 2 else None
    coherence_relative = ecart_type / mediane_vues if ecart_type and mediane_vues else None
    cadence_30j = get_cadence_30j_long_only(playlist_id, api_key, min_duration=181)
    

    results.append({
        "Nom_du_media": label_name,
        "mediane_vues": mediane_vues,
        "ratio_vues_abonnes": ratio_vues_abonnes,
        "cadence_30j": cadence_30j,
        "ecart_type_mediane": ecart_type,
        "coherence_relative": coherence_relative,
    })

    time.sleep(1)

df_indicators = pd.DataFrame(results)

  cutoff = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) - datetime.timedelta(days=30)


## 4. Export du dataset final

In [None]:
df_indicators.to_csv("../data/clean/medias_youtube_indicateurs_pointvirgule.csv", sep=";", encoding="utf-8-sig", index=False)

In [None]:
df_indicators.to_csv("./data/clean/medias_youtube_indicateurs_virgule.csv", sep=",", encoding="utf-8-sig", index=False)