# Extracting all CPC classifications and definitions

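The EPO publishes the CPC scheme as N-Triples at https://data.epo.org/linked-data/download.

From that N-Triples file, extract the titles/definitions, the `[CPC: ...]` references embedded in them, and the tree structure (the `skos:broader` parent links).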

In [22]:
import re, json, os
from urllib.parse import urlparse

# URI prefix a subject (or a broader-object) must start with to be kept.
CPC_NS   = "http://data.epo.org/linked-data/def/cpc/"
# Predicate URI whose object is recorded as a "broader" (parent) key.
SKOS_BR  = "http://www.w3.org/2004/02/skos/core#broader"
# Predicate URI whose literal object is used as the record's title.
CPC_TTL  = "http://data.epo.org/linked-data/def/cpc/fullTitle"

# Matches one whole triple line of the form:  <subject> <predicate> OBJECT .
# OBJECT is either a <URI> or a "quoted literal" (backslash escapes allowed),
# optionally followed by ^^<datatype> or @lang.
# Capture groups:
#   1 subject URI, 2 predicate URI,
#   3 object URI including the angle brackets, 4 object URI without them,
#   5 literal text without the surrounding quotes (escapes still encoded).
SPLIT_RE = re.compile(
    r'^<([^>]+)>\s+<([^>]+)>\s+(?:(<([^>]+)>)|"((?:[^"\\]|\\.)*)"(?:\^\^<[^>]+>|@[a-zA-Z\-]+)?)\s+\.\s*$'
)
# Captures the text inside each "[CPC: ...]" bracket (case-insensitive).
REF_RE = re.compile(r"\[CPC:\s*([^\]]+)\]", re.IGNORECASE)

def last_seg(uri):
    """Return the basename of the path component of ``uri``.

    The query string and fragment are not part of the path, so they are
    ignored; a path ending in "/" yields an empty string.
    """
    path_component = urlparse(uri).path
    return os.path.basename(path_component)

def extract_refs(title):
    """Collect the entries listed inside every ``[CPC: ...]`` block of ``title``.

    Each block's content is split on ";" and ","; entries are stripped of
    surrounding whitespace and empty entries are dropped. Returns a list in
    order of appearance (empty when ``title`` is falsy or has no blocks).
    """
    if not title:
        return []

    found = []
    for block in REF_RE.findall(title):
        pieces = (piece.strip() for piece in re.split(r"[;,]", block))
        found.extend(piece for piece in pieces if piece)
    return found

# N-Triples single-character escapes (ECHAR) and the characters they denote.
_NT_ECHAR = {
    "t": "\t", "b": "\b", "n": "\n", "r": "\r", "f": "\f",
    '"': '"', "'": "'", "\\": "\\",
}
# One escape sequence: \uXXXX, \UXXXXXXXX, or a backslash plus any one character.
_NT_ESCAPE_RE = re.compile(
    r"\\(?:u([0-9A-Fa-f]{4})|U([0-9A-Fa-f]{8})|(.))", re.DOTALL
)


def _unescape_nt_literal(text):
    """Decode N-Triples backslash escapes in ``text``; everything else is kept as is.

    Unknown escapes and out-of-range code points are left untouched.
    """
    def _replace(match):
        hex4, hex8, char = match.groups()
        hex_digits = hex4 or hex8
        if hex_digits:
            code_point = int(hex_digits, 16)
            # chr() rejects values beyond the Unicode range; keep such input verbatim.
            return chr(code_point) if code_point <= 0x10FFFF else match.group(0)
        return _NT_ECHAR.get(char, match.group(0))

    return _NT_ESCAPE_RE.sub(_replace, text)


def _clean_title(title):
    """Strip reference blocks, precedence notes and leftover punctuation from a title."""
    # Remove [CPC: ...]
    title = re.sub(r"\[CPC:[^\]]+\]", "", title)

    # Remove any parentheses containing 'take precedence' or 'takes precedence'
    title = re.sub(r"\([^)]*take(?:s)? precedence[^)]*\)", "", title, flags=re.IGNORECASE)

    # Remove () or {} with no letters inside
    title = re.sub(r"\(\s*[^a-zA-Z]*\)", "", title)
    title = re.sub(r"\{\s*[^a-zA-Z]*\}", "", title)

    # Collapse multiple spaces and commas
    title = re.sub(r"\s{2,}", " ", title)         # multiple spaces → single
    title = re.sub(r",\s*,+", ",", title)         # ", , ," → ","
    title = re.sub(r"\s+,", ",", title)           # " ,word" → ",word"
    title = re.sub(r",\s+", ", ", title)          # ",word" → ", word"

    # Removing a block at either end of the title leaves a stray space behind.
    return title.strip()


def build_cpc_jsonl(nt_path, out_path):
    """Convert a CPC N-Triples file into a JSONL file with one record per CPC key.

    Only lines matching ``SPLIT_RE`` whose subject starts with ``CPC_NS`` are
    used. Records are keyed by the last path segment of the subject URI and
    contain:

    - ``key``: that last path segment
    - ``title``: cleaned ``CPC_TTL`` literal, or ``None`` if none was seen
    - ``references``: entries extracted from ``[CPC: ...]`` blocks in the raw title
    - ``broader``: sorted keys of ``SKOS_BR`` objects that lie inside ``CPC_NS``

    Args:
        nt_path: path of the UTF-8 N-Triples input file.
        out_path: path of the JSONL file to write (overwritten).
    """
    recs = {}
    with open(nt_path, "r", encoding="utf-8") as f:
        for line in f:
            m = SPLIT_RE.match(line)
            if not m:
                continue
            s, p, _o_uri, o_uri_inner, o_lit = m.groups()
            if not s.startswith(CPC_NS):
                continue

            key = last_seg(s)
            r = recs.get(key)
            if r is None:
                r = recs[key] = {"key": key, "title": None, "references": [], "broader": set()}

            if p == CPC_TTL and o_lit is not None:
                # Decode escapes explicitly: round-tripping through the
                # 'unicode_escape' codec treats the UTF-8 bytes as Latin-1 and
                # corrupts every non-ASCII character in the title.
                title = _unescape_nt_literal(o_lit)
                r["references"] = extract_refs(title)
                r["title"] = _clean_title(title)

            elif p == SKOS_BR and o_uri_inner and o_uri_inner.startswith(CPC_NS):
                r["broader"].add(last_seg(o_uri_inner))

    with open(out_path, "w", encoding="utf-8") as out:
        for r in recs.values():
            # Sets are not JSON-serialisable; emit a sorted list in the same slot.
            serialisable = {**r, "broader": sorted(r["broader"])}
            out.write(json.dumps(serialisable, ensure_ascii=False) + "\n")


# --- use it ---
# Input: the N-Triples file; output: the JSONL file with one record per key.
nt_file = "../data/cpc.nt"
out_file = "../data/cpc.jsonl"
build_cpc_jsonl(nt_file, out_file)


## Quick analysis of the results

In [23]:
import json

jsonl_file = "../data/cpc.jsonl"

keys_seen = set()      # every distinct key encountered so far
duplicates = []        # (line number, key) for each repeated key
missing_title = []     # (line number, key) for each record without a usable title

with open(jsonl_file, "r", encoding="utf-8") as f:
    for i, line in enumerate(f, 1):
        rec = json.loads(line)
        key, title = rec.get("key"), rec.get("title")

        # First sighting registers the key; any later sighting is a duplicate.
        if key not in keys_seen:
            keys_seen.add(key)
        else:
            duplicates.append((i, key))

        # A title is missing when it is absent, empty, or whitespace-only.
        if not (title and title.strip()):
            missing_title.append((i, key))

# NOTE: this counts distinct keys, which equals the line count only when
# there are no duplicates.
print(f"Total records: {len(keys_seen)}")

print(f"Duplicate keys: {len(duplicates)}")
if duplicates:
    print(duplicates[:10], "...")

print(f"Missing titles: {len(missing_title)}")
if missing_title:
    print(missing_title, "...")


Total records: 261962
Duplicate keys: 0
Missing titles: 11
[(20383, 'H0'), (38239, 'G9'), (44731, 'G1'), (63581, 'F9'), (88586, 'E9'), (113544, 'D9'), (116385, 'Y0'), (138441, 'C9'), (163627, 'B9'), (189069, 'A9'), (258333, 'scheme')] ...


## Adding a fullTitle key by walking up

In [36]:
import json
from functools import lru_cache

in_file  = "../data/cpc.jsonl"
out_file = "../data/cpc_full.jsonl"

# Index every record by its key, keeping only the fields needed later.
nodes = {}
with open(in_file, "r", encoding="utf-8") as f:
    for rec in map(json.loads, f):
        key = rec["key"]
        nodes[key] = {
            "title":   rec.get("title") or key,     # no title → fall back to the key
            "broader": rec.get("broader") or [],
            "refs":    rec.get("references", []),
        }

# key → list of parent keys
parents = {k: node["broader"] for k, node in nodes.items()}


def title_of(k):
    """Return the stored title for key ``k``, or ``k`` itself if there is none."""
    node = nodes.get(k)
    if node is None:
        return k
    return node.get("title") or k

@lru_cache(maxsize=None)
def paths_to_roots(key):
    """Return every path from a root down to ``key`` as a tuple of key tuples.

    Parents are followed through the module-level ``parents`` mapping. A
    parent that is not present in ``nodes`` ends its path (it becomes the
    first element). A key already visited on the current walk also ends the
    path, so cycles cannot recurse forever.
    """
    def climb(node, visited):
        # Already on this walk: stop here to break the cycle.
        if node in visited:
            return [[node]]

        ancestors = parents.get(node, [])
        if not ancestors:
            return [[node]]

        visited = visited | {node}
        collected = []
        for parent in ancestors:
            if parent in nodes:
                collected.extend(prefix + [node] for prefix in climb(parent, visited))
            else:
                collected.append([parent, node])
        return collected

    return tuple(map(tuple, climb(key, frozenset())))

def pick_single_path(path_keys):
    """Reduce a collection of paths to a single path.

    Returns ``()`` for an empty collection, the only path when there is
    exactly one, and otherwise the longest path (the first one wins a tie).
    """
    if path_keys:
        if len(path_keys) > 1:
            # Several candidates: fall back to the longest path.
            return max(path_keys, key=len)
        return path_keys[0]
    return ()

# Write one enriched record per node
with open(out_file, "w", encoding="utf-8") as out:
    for key in nodes:
        path = pick_single_path(paths_to_roots(key))
        titles_on_path = [title_of(k) for k in path]

        rec = {
            "key": key,
            "title": title_of(key),
            "references": nodes[key]["refs"],
            "broader": parents.get(key, []),
            # single string, titles joined root → leaf
            "fullTitle": " → ".join(titles_on_path),
            # single path (root → leaf)
            "treePath": [{"key": k, "title": t} for k, t in zip(path, titles_on_path)],
        }
        out.write(json.dumps(rec, ensure_ascii=False) + "\n")

print(f"Wrote → {out_file} ({len(nodes)} records)")


Wrote → ../data/cpc_full.jsonl (261962 records)


### View random record

In [37]:
import json, random

def random_record(jsonl_path, n=1):
    """Print ``n`` randomly sampled records of a JSONL file as indented JSON.

    The whole file is read into memory first. ``random.sample`` raises
    ``ValueError`` when ``n`` exceeds the number of lines.
    """
    with open(jsonl_path, "r", encoding="utf-8") as f:
        all_lines = list(f)

    chosen = random.sample(all_lines, n)
    for raw in chosen:
        print(json.dumps(json.loads(raw), ensure_ascii=False, indent=2))

# Example: print one randomly chosen record from the enriched file.
random_record("../data/cpc_full.jsonl", n=1)


{
  "key": "C25D7-065",
  "title": "{Diaphragms}",
  "references": [],
  "broader": [
    "C25D7-0614"
  ],
  "fullTitle": "CHEMISTRY; METALLURGY → METALLURGY → ELECTROLYTIC OR ELECTROPHORETIC PROCESSES; APPARATUS THEREFOR → PROCESSES FOR THE ELECTROLYTIC OR ELECTROPHORETIC PRODUCTION OF COATINGS; ELECTROFORMING; APPARATUS THEREFOR → Electroplating characterised by the article coated → Wires; Strips; Foils → {Strips or foils} → {Diaphragms}",
  "treePath": [
    {
      "key": "C",
      "title": "CHEMISTRY; METALLURGY"
    },
    {
      "key": "C2",
      "title": "METALLURGY"
    },
    {
      "key": "C25",
      "title": "ELECTROLYTIC OR ELECTROPHORETIC PROCESSES; APPARATUS THEREFOR"
    },
    {
      "key": "C25D",
      "title": "PROCESSES FOR THE ELECTROLYTIC OR ELECTROPHORETIC PRODUCTION OF COATINGS; ELECTROFORMING; APPARATUS THEREFOR"
    },
    {
      "key": "C25D7-00",
      "title": "Electroplating characterised by the article coated"
    },
    {
      "key": "C25D7-06"

# Lookup

In [42]:
import json

def get_record_by_key(jsonl_path, key):
    """Return the first record in a JSONL file whose "key" equals ``key``.

    The file is scanned line by line and parsing stops at the first match.
    Returns ``None`` when no record matches.
    """
    with open(jsonl_path, "r", encoding="utf-8") as f:
        parsed = map(json.loads, f)
        return next((rec for rec in parsed if rec.get("key") == key), None)

# Example usage: look up a single key and pretty-print it
record = get_record_by_key("../data/cpc_full.jsonl", "Y02E")
if not record:
    print("Key not found.")
else:
    print(json.dumps(record, ensure_ascii=False, indent=2))


{
  "key": "Y02E",
  "title": "REDUCTION OF GREENHOUSE GAS [GHG] EMISSIONS, RELATED TO ENERGY GENERATION, TRANSMISSION OR DISTRIBUTION",
  "references": [],
  "broader": [
    "Y02"
  ],
  "fullTitle": "GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS → Y0 → TECHNOLOGIES OR APPLICATIONS FOR MITIGATION OR ADAPTATION AGAINST CLIMATE CHANGE → REDUCTION OF GREENHOUSE GAS [GHG] EMISSIONS, RELATED TO ENERGY GENERATION, TRANSMISSION OR DISTRIBUTION",
  "treePath": [
    {
      "key": "Y",
      "title": "GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS"
    },
    {
      "key": "Y0",
      "title": "Y0"
    },
    {
      "

## Upload to HF

In [47]:
from datasets import load_dataset

# JSONL file handed to load_dataset as its data file.
path = "../data/cpc_full.jsonl"


In [46]:
# Load the JSONL file with the "json" loader, requesting the "train" split.
ds = load_dataset("json", data_files=path, split="train")

Generating train split: 0 examples [00:00, ? examples/s]

In [48]:
from huggingface_hub import create_repo
# Target dataset repository on the Hugging Face Hub.
repo_id = "mhurhangee/cpc-classifications"   # pick a name
# NOTE(review): exist_ok=True presumably makes this safe to re-run when the
# repository already exists — confirm against the huggingface_hub docs.
create_repo(repo_id, repo_type="dataset", private=False, exist_ok=True)

RepoUrl('https://huggingface.co/datasets/mhurhangee/cpc-classifications', endpoint='https://huggingface.co', repo_type='dataset', repo_id='mhurhangee/cpc-classifications')

In [49]:
# Upload the loaded dataset to the repository named by repo_id.
ds.push_to_hub(repo_id)

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/262 [00:00<?, ?ba/s]

CommitInfo(commit_url='https://huggingface.co/datasets/mhurhangee/cpc-classifications/commit/c1df8e3bac8a5b8c6363763c29d2ab3bd3d95a1f', commit_message='Upload dataset', commit_description='', oid='c1df8e3bac8a5b8c6363763c29d2ab3bd3d95a1f', pr_url=None, repo_url=RepoUrl('https://huggingface.co/datasets/mhurhangee/cpc-classifications', endpoint='https://huggingface.co', repo_type='dataset', repo_id='mhurhangee/cpc-classifications'), pr_revision=None, pr_num=None)

In [None]:
from huggingface_hub import HfApi
api = HfApi()
# Dataset card text; uploaded below as README.md. The field list mirrors the
# keys written to cpc_full.jsonl.
readme = """\
# CPC Classifications (Full Titles)

Extracted from the [EPO's CPC RDF](https://data.epo.org/linked-data/download) data from 07 Aug 2025.

Titles were cleaned to remove references and the references extracted.

Fields:
- `key`: CPC symbol (e.g., B29C45-00)
- `title`: cleaned CPC title
- `references`: list extracted from `[CPC: ...]`
- `broader`: parent keys
- `fullTitle`: concatenated hierarchy title (root → leaf)
- `treePath`: list of {key, title} from root → leaf
"""
# Upload the card from memory (encoded to bytes) rather than from a file on disk.
api.upload_file(
    path_or_fileobj=readme.encode(),
    path_in_repo="README.md",
    repo_id=repo_id,
    repo_type="dataset",
)


### Misc

#### Check number of titles and tree paths

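Originally, I wasn't sure whether every node had exactly one broader (parent) node, and therefore exactly one fullTitle and one treePath. This check was written for an earlier version of the output, in which `fullTitles` and `treePaths` were saved as arrays; the current `cpc_full.jsonl` stores a single `fullTitle` and a single `treePath` instead.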

In [35]:
import json

def check_single_path(jsonl_path):
    """Report which records hold more than one entry in "fullTitles" or "treePaths".

    Reads the JSONL file line by line and prints the record count, how many
    records have more than one entry in each of the two fields (a missing
    field counts as empty), and up to ten example keys per field.
    """
    multi_full = []
    multi_tree = []
    total = 0

    with open(jsonl_path, "r", encoding="utf-8") as f:
        for raw in f:
            rec = json.loads(raw)
            total += 1
            for field, bucket in (("fullTitles", multi_full), ("treePaths", multi_tree)):
                if len(rec.get(field, [])) > 1:
                    bucket.append(rec["key"])

    print(f"Checked {total} records")
    print(f"fullTitles with >1 entry: {len(multi_full)}")
    print(f"treePaths with >1 entry: {len(multi_tree)}")

    if multi_full:
        print("Example keys with multiple fullTitles:", multi_full[:10])
    if multi_tree:
        print("Example keys with multiple treePaths:", multi_tree[:10])

# Example usage: run the check against the enriched file.
check_single_path("../data/cpc_full.jsonl")


Checked 261962 records
fullTitles with >1 entry: 0
treePaths with >1 entry: 0
