In [3]:
import requests, json, time, random, pandas as pd
from requests.adapters import HTTPAdapter
try:
    from urllib3.util.retry import Retry
except Exception:
    from requests.packages.urllib3.util.retry import Retry

# GraphQL endpoint; graphql_query() POSTs every query to this URL.
GRAPHQL_URL = "https://leetcode.com/graphql/"
# Problem-set page; make_leetcode_session() GETs it once and then reads the
# "csrftoken" cookie from the session's cookie jar.
HOMEPAGE = "https://leetcode.com/problemset/"

In [2]:
def make_leetcode_session():
    """Create a ``requests`` session prepared for LeetCode GraphQL calls.

    The session gets one shared ``HTTPAdapter`` (mounted for both https and
    http) whose retry policy allows up to 5 retries, with backoff, on
    429/500/502/503/504 responses for GET and POST. ``HOMEPAGE`` is then
    fetched once, and whatever ``csrftoken`` cookie the session holds
    afterwards (empty string if none) is installed as the default
    ``x-csrftoken`` header together with browser-like headers.

    Returns:
        requests.Session: the configured session.
    """
    # Headers sent with the warm-up GET and reused as session defaults.
    browser_headers = {
        "User-Agent": "Mozilla/5.0",
        "Referer": "https://leetcode.com/",
        "Origin": "https://leetcode.com",
        "Accept": "application/json, text/plain, */*",
    }

    session = requests.Session()
    retry_policy = Retry(
        total=5,
        connect=5,
        read=5,
        backoff_factor=1.2,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=frozenset(["GET", "POST"]),
        raise_on_status=False,
    )
    transport = HTTPAdapter(
        max_retries=retry_policy,
        pool_connections=20,
        pool_maxsize=20,
    )
    for scheme in ("https://", "http://"):
        session.mount(scheme, transport)

    # The response itself is not needed; the request is made so the session's
    # cookie jar can pick up the csrftoken cookie.
    session.get(HOMEPAGE, headers=browser_headers, timeout=(10, 30))

    token = session.cookies.get("csrftoken", "")
    session.headers.update({
        **browser_headers,
        "Content-Type": "application/json",
        "x-csrftoken": token,
    })
    return session

In [3]:
def graphql_query(session, query, variables=None, max_retries=4):
    """POST a GraphQL query to ``GRAPHQL_URL`` and return its ``data`` payload.

    Up to ``max_retries`` attempts are made. Before every attempt after the
    first, the function sleeps 1, 2, 4, ... seconds plus up to 0.6 s of random
    jitter. There is no sleep after the final attempt, so a permanent failure
    is reported without an extra delay.

    Args:
        session: object with a ``post(url, json=..., timeout=...)`` method
            whose result exposes ``json()``, ``status_code`` and ``text``
            (e.g. a ``requests.Session``).
        query: GraphQL query text.
        variables: optional mapping of query variables; omitted from the
            payload when falsy.
        max_retries: maximum number of attempts.

    Returns:
        The value stored under ``"data"`` in the JSON response.

    Raises:
        Exception: the last transport/JSON-decoding error, if that was the
            final failure.
        RuntimeError: if the final response carried GraphQL errors, had an
            unexpected shape, or no attempt was made at all.
    """
    payload = {"query": query}
    if variables:
        payload["variables"] = variables

    last_err = None
    for attempt in range(1, max_retries + 1):
        if attempt > 1:
            # Back off only *between* attempts; sleeping after the last one
            # would just postpone the raise below.
            time.sleep((2 ** (attempt - 2)) + random.uniform(0, 0.6))

        try:
            r = session.post(GRAPHQL_URL, json=payload, timeout=(10, 60))
            js = r.json()
        except Exception as e:
            last_err = e
            continue

        # A JSON body that is not an object (null, list, ...) has neither
        # "errors" nor "data"; it falls through to "unexpected response".
        if isinstance(js, dict):
            errors = js.get("errors")
            if errors:
                # Extract the message defensively so a malformed error entry
                # cannot escape the retry loop as IndexError/AttributeError.
                first = errors[0] if isinstance(errors, (list, tuple)) else errors
                if isinstance(first, dict):
                    message = first.get("message", "GraphQL error")
                else:
                    message = str(first)
                last_err = RuntimeError(message)
                continue
            if "data" in js:
                return js["data"]

        last_err = RuntimeError(f"Unexpected response (status {r.status_code}): {r.text[:500]}")

    raise last_err or RuntimeError("GraphQL request failed")

In [4]:
# Paginated listing query. Variables: categorySlug, limit, skip, filters.
# The aliases in the query mean the response is read as
# data["problemsetQuestionList"] -> {"total": ..., "questions": [...]},
# where each question carries its ids, title, titleSlug, difficulty, acRate,
# isPaidOnly flag and topicTags.
PROBLEMSET_QUERY = """
query problemsetQuestionList($categorySlug: String, $limit: Int, $skip: Int, $filters: QuestionListFilterInput) {
  problemsetQuestionList: questionList(categorySlug: $categorySlug, limit: $limit, skip: $skip, filters: $filters) {
    total: totalNum
    questions: data {
      questionId
      questionFrontendId
      title
      titleSlug
      difficulty
      acRate
      isPaidOnly
      topicTags { name id slug }
    }
  }
}
"""

# Query for detailed info (per problem), looked up by the required $titleSlug
# variable. fetch_all_problems_df() passes the `stats` and `similarQuestions`
# fields through json.loads, i.e. it expects them to be JSON-encoded strings.
QUESTION_DETAIL_QUERY = """
query questionData($titleSlug: String!) {
  question(titleSlug: $titleSlug) {
    questionId
    title
    difficulty
    isPaidOnly
    acRate
    stats
    likes
    dislikes
    topicTags { name slug }
    similarQuestions
    discussionCount
  }
}
"""

In [5]:
def _build_problem_row(summary, detail):
    """Merge one listing entry with its detail payload into a flat output row."""
    if not detail:
        raise ValueError("no question detail returned")

    # `.get(key, default)` does not help when the key is present with a JSON
    # null, so fall back with `or` before (and after) decoding.
    stats = json.loads(detail.get("stats") or "{}") or {}
    similar = json.loads(detail.get("similarQuestions") or "[]") or []
    similar_titles = [s.get("title") for s in similar]

    row = dict(summary)  # keep the listing fields; do not mutate the caller's dict
    row.update({
        "id": detail.get("questionId"),
        "is_premium": detail.get("isPaidOnly"),
        "title": detail.get("title"),
        "topic_tags": [t["name"] for t in (detail.get("topicTags") or [])],
        "difficulty": detail.get("difficulty"),
        "similar_questions": similar_titles,
        "no_similar_questions": len(similar_titles),
        "acceptance": detail.get("acRate"),
        "accepted": stats.get("totalAcceptedRaw"),
        "submission": stats.get("totalSubmissionRaw"),
        "discussion_count": detail.get("discussionCount"),
        "likes": detail.get("likes"),
        "dislikes": detail.get("dislikes"),
        "problem_URL": f"https://leetcode.com/problems/{summary['titleSlug']}/",
    })
    return row


def fetch_all_problems_df(page_size=50, checkpoint_path=None):
    """Download every problem (listing + per-problem detail) into a DataFrame.

    The problem list is paged through with ``PROBLEMSET_QUERY``; each listed
    problem is then enriched with ``QUESTION_DETAIL_QUERY``. A problem whose
    detail request or parsing fails is reported with ``print`` and skipped
    (best effort), without aborting the run.

    Args:
        page_size: number of problems requested per listing page.
        checkpoint_path: optional CSV path; when given, all rows collected so
            far (including the raw listing fields) are rewritten to it after
            every page.

    Returns:
        pandas.DataFrame with the columns ``id, is_premium, title,
        topic_tags, difficulty, similar_questions, no_similar_questions,
        acceptance, accepted, submission, discussion_count, likes, dislikes,
        problem_URL``. The frame is empty, but has these columns, when no
        problem could be collected.

    Raises:
        Exception: whatever ``graphql_query`` raises for a failed *listing*
            request, or ``KeyError``/``TypeError`` if the listing response
            lacks the expected structure.
    """
    output_columns = [
        "id", "is_premium", "title", "topic_tags", "difficulty",
        "similar_questions", "no_similar_questions", "acceptance",
        "accepted", "submission", "discussion_count",
        "likes", "dislikes", "problem_URL",
    ]

    session = make_leetcode_session()
    all_rows = []
    skip = 0
    total = None

    while True:
        variables = {"categorySlug": "", "limit": page_size, "skip": skip, "filters": {}}
        data = graphql_query(session, PROBLEMSET_QUERY, variables)
        root = data["problemsetQuestionList"]
        if total is None:
            total = root["total"] or 0
        batch = root["questions"] or []
        if not batch:
            break

        # Enrich each problem with full info
        for q in batch:
            try:
                detail_data = graphql_query(
                    session, QUESTION_DETAIL_QUERY, {"titleSlug": q["titleSlug"]}
                )
                all_rows.append(_build_problem_row(q, detail_data["question"]))
            except Exception as e:
                print(f"Error fetching {q.get('titleSlug')}: {e}")

        if checkpoint_path:
            pd.DataFrame(all_rows).to_csv(checkpoint_path, index=False)

        # Advance by what was actually returned (a server-side cap on `limit`
        # must not make us skip items), and stop on the number of problems
        # *seen*, not the number successfully enriched, so that failed detail
        # fetches do not force an extra listing request.
        skip += len(batch)
        if skip >= total:
            break

        time.sleep(random.uniform(0.5, 1.2))  # polite delay

    # reindex (rather than df[[...]]) also works when no row was collected.
    return pd.DataFrame(all_rows).reindex(columns=output_columns)


In [1]:
def scrape(file_name="leetcode_full.csv"):
    """Fetch all problems in pages of 30 and save them as CSV.

    ``file_name`` is handed to ``fetch_all_problems_df`` as its checkpoint
    path and is then overwritten with the final DataFrame (no index column).
    A one-line summary is printed when the file has been written.
    """
    problems = fetch_all_problems_df(checkpoint_path=file_name, page_size=30)
    problems.to_csv(file_name, index=False)
    saved_count = len(problems)
    print(f"Scraping complete! {saved_count} problems saved to {file_name}")

In [7]:
# Run the full scrape. scrape() uses this one path both as the per-page
# checkpoint file and as the final CSV output.
scrape("x1_full.csv")

✅ Scraping complete! 3706 problems saved to x1_full.csv
