In [151]:
import os
import json
import lxml
from lxml import etree as ET


# Parent of the current working directory; the "laws-lois-xml" input folder and
# the "laws-lois-json" output folder used below are resolved relative to it.
base_dir = os.path.dirname(os.getcwd())

# Using a subsample of the data for testing purposes.
# Each entry is an English law ID, i.e. the XML filename without the ".xml"
# extension; get_file_paths() derives the French filename variants from it.
law_ids = [
    "A-0.6",  # Accessible Canada Act
    "SOR-2021-241",  # Accessible Canada Regulations
    "A-2",  # Aeronautics Act
    "B-9.01",  # Broadcasting Act
    "SOR-97-555",  # Broadcasting Distribution Regulations
    "SOR-96-433",  # Canadian Aviation Regulations
    "SOR-2011-318",  # Canadian Aviation Security Regulations, 2012
    "C-15.1",  # Canadian Energy Regulator Act
    "C-15.31",  # Canadian Environmental Protection Act, 1999
    "C-24.5",  # Cannabis Act
    "SOR-2018-144",  # Cannabis Regulations
    "C-46",  # Criminal Code
    "SOR-2021-25",  # Cross-border Movement of Hazardous Waste and Hazardous Recyclable Material Regulations
    "F-14",  # Fisheries Act
    "SOR-93-53",  # Fishery (General) Regulations
    "C.R.C.,_c._870",  # Food and Drug Regulations
    "F-27",  # Food and Drugs Act
    "I-2.5",  # Immigration and Refugee Protection Act
    "SOR-2002-227",  # Immigration and Refugee Protection Regulations
    "I-21",  # Interpretation Act
    "SOR-2016-151",  # Multi-Sector Air Pollutants Regulations
    "SOR-2010-189",  # Renewable Fuels Regulations
    "S-22",  # Statutory Instruments Act
    "C.R.C.,_c._1509",  # Statutory Instruments Regulations
]


def get_file_paths(eng_law_ids, base_dir):
    """
    Locate the English and French XML files that belong to the given law IDs.

    Args:
        eng_law_ids: English law IDs (filenames without the ".xml" extension).
        base_dir: Directory that contains the "laws-lois-xml" folder.

    Returns:
        A list of paths to every matching file found under
        laws-lois-xml/{eng,fra}/<category>/, in directory-listing order.
    """
    laws_dir = os.path.join(base_dir, "laws-lois-xml")

    # French regulations have different names, unfortunately:
    # "SOR-" becomes "DORS-", "SI-" becomes "TR-" and "_c." becomes "_ch."
    english_to_french = (("_c.", "_ch."), ("SOR-", "DORS-"), ("SI-", "TR-"))
    wanted = set()
    for law_id in eng_law_ids:
        wanted.add(f"{law_id}.xml")
        for eng_token, fra_token in english_to_french:
            wanted.add(f"{law_id.replace(eng_token, fra_token)}.xml")

    categories_by_lang = {
        "eng": ["acts", "regulations"],
        "fra": ["lois", "reglements"],
    }
    file_paths = []
    for lang, categories in categories_by_lang.items():
        for category in categories:
            category_dir = os.path.join(laws_dir, lang, category)
            file_paths.extend(
                os.path.join(category_dir, name)
                for name in os.listdir(category_dir)
                if name in wanted
            )

    print(len(eng_law_ids), "law IDs provided")
    print(len(file_paths), "files found (should be 2x the number of law IDs)")
    return file_paths

In [152]:
# Resolve the English and French XML paths for the sample law IDs; the function
# prints how many IDs were given and how many files were found.
file_paths = get_file_paths(law_ids, base_dir)

24 law IDs provided
48 files found (should be 2x the number of law IDs)


In [237]:
def _get_text(element):
    return element.text if element is not None else None


def _get_link(element):
    return (
        element.attrib["link"]
        if element is not None and "link" in element.attrib.keys()
        else None
    )


def _get_joined_text(
    element,
    exclude_tags=["MarginalNote", "Label", "OriginatingRef"],
    break_tags=[
        "Provision",
        "Subsection",
        "Paragraph",
        "Definition",
        "row",
        "TableGroup",
        "HistoricalNote",
        "MarginalNote",
    ],
    pipe_tags=["entry"],
    em_tags=["DefinedTermEn", "DefinedTermFr", "XRefExternal", "XRefInternal"],
    strong_tags=["MarginalNote", "TitleText"],
    underline_tags=[],
):
    def stylized_text(text, tag):
        if tag in em_tags:
            return f"*{text}*"
        if tag in strong_tags:
            return f"**{text}**"
        if tag in underline_tags:
            return f"__{text}__"
        # if tag in strike_tags:
        #     return f"~~{text}~~"
        return text

    all_text = []
    exclude_tags = exclude_tags.copy()
    for e in element.iter():
        if e.tag in exclude_tags:
            exclude_tags.remove(e.tag)
            continue
        if e.text and e.text.strip():
            all_text.append(stylized_text(e.text.strip(), e.tag))
        if e.tail and e.tail.strip():
            all_text.append(e.tail.strip())
        if e.tag in break_tags:
            all_text.append("\n")
        if e.tag in pipe_tags:
            all_text.append("|")
    return (
        " ".join(all_text)
        .replace(" \n ", "\n")
        .strip()
        .replace("\u2002", " ")
        .replace("( ", "(")
        .replace(" )", ")")
        .replace(" .", ".")
        .replace("* ;", "*;")
        .replace("|\n", "\n")
        .strip()
    )


def stringify_children(node):
    """Join every text fragment found in the node's subtree with single spaces."""
    fragments = list(node.itertext())
    separator = " "
    return separator.join(fragments)


def get_dict_from_xml(xml_filename):
    """
    Extract a JSON-serializable dictionary from an act/regulation XML file.

    Args:
        xml_filename: Path to the XML file. The name of the directory two
            levels above the file is stored as the "lang" field.

    Returns:
        A dict with document metadata, the preamble, the sections (each with
        its subsections), the schedules, aggregated reference counts
        ("internal_refs" / "external_refs") and pretty-printed "*_str" fields.
    """
    dom = ET.parse(xml_filename)
    root = dom.getroot()
    # French regulations have slightly different filenames, but we want a unique ID
    # to link the English and French versions
    filename = os.path.basename(xml_filename).replace(".xml", "")
    # Replace "DORS-" with "SOR-", "TR-" with "SI-" and "_ch." with "_c."
    eng_id = (
        filename.replace("DORS-", "SOR-").replace("TR-", "SI-").replace("_ch.", "_c.")
    )
    enabling_authority = root.find(".//EnablingAuthority/XRefExternal")
    parsed_sections = [get_section(node) for node in root.findall(".//Section")]
    parsed_schedules = [get_schedule(node) for node in root.findall(".//Schedule")]
    d = {
        "id": eng_id,
        "lang": os.path.basename(os.path.dirname(os.path.dirname(xml_filename))),
        "filename": filename,
        "type": "act" if root.tag == "Statute" else "regulation",
        "short_title": _get_text(root.find(".//ShortTitle")),
        "long_title": _get_text(root.find(".//LongTitle")),
        "bill_number": _get_text(root.find(".//BillNumber")),
        "instrument_number": _get_text(root.find(".//InstrumentNumber")),
        "consolidated_number": _get_text(root.find(".//ConsolidatedNumber")),
        "last_amended_date": root.attrib.get(
            "{http://justice.gc.ca/lims}lastAmendedDate", None
        ),
        "current_date": root.attrib.get(
            "{http://justice.gc.ca/lims}current-date", None
        ),
        "in_force_start_date": root.attrib.get(
            "{http://justice.gc.ca/lims}inforce-start-date", None
        ),
        "enabling_authority": {
            "link": _get_link(enabling_authority),
            "text": _get_text(enabling_authority),
        },
        "preamble": get_preamble(root),
        # get_section / get_schedule return None for elements that must be skipped
        "sections": [section for section in parsed_sections if section is not None],
        "schedules": [
            schedule for schedule in parsed_schedules if schedule is not None
        ],
    }
    # Aggregate all internal and external references and count instances of each.
    # A single pass with a dict is linear and keeps first-seen order, so the
    # resulting JSON is stable from run to run (iterating a set of strings is not).
    for ref_name in ["internal_refs", "external_refs"]:
        link_counts = {}
        for section in d["sections"]:
            for ref in section[ref_name]:
                link = ref["link"]
                if link is not None:
                    link_counts[link] = link_counts.get(link, 0) + 1
        d[ref_name] = [
            {"link": link, "count": count} for link, count in link_counts.items()
        ]
    # Some pretty-print versions of the fields
    d["title_str"] = d["short_title"] if d["short_title"] else d["long_title"]
    # A document may lack both titles; fall back to "" so str.join does not
    # raise a TypeError on None.
    title = d["title_str"] or ""

    def compose(label, heading, text):
        # Title line, indented label line, heading block, then the body text
        return "\n".join([title, " " + label, heading, text])

    for section in d["sections"]:
        section["heading_str"] = get_heading_str(section)
        section["section_str"] = f"Section {section['id']}"
        section["all_str"] = compose(
            section["section_str"], section["heading_str"], section["text"]
        )
        for subsection in section["subsections"]:
            subsection["heading_str"] = get_heading_str(subsection)
            # e.g. "Section 2" + "(1)" -> "Subsection 2(1)"
            subsection[
                "section_str"
            ] = f"Sub{section['section_str'].lower()}{subsection['id']}"
            subsection["all_str"] = compose(
                subsection["section_str"],
                subsection["heading_str"],
                subsection["text"],
            )
    for schedule in d["schedules"]:
        # A schedule without a <Label> has id None; use "" instead of crashing
        schedule["all_str"] = compose(schedule["id"] or "", "", schedule["text"])
    # Finally, the preamble also needs a "all_str" field
    if d["preamble"]:
        d["preamble"]["all_str"] = compose("Preamble", "", d["preamble"]["text"])
    return d


def get_heading_str(section):
    """
    Render a section's headings and marginal note as display text.

    Each heading goes on its own line, indented one extra space per level
    (starting at two spaces). A non-empty marginal note is appended in bold
    after a blank line.
    """
    heading_lines = [
        f"{' ' * (depth + 2)}{heading}\n"
        for depth, heading in enumerate(section["headings"])
    ]
    heading_str = "".join(heading_lines)
    note = section["marginal_note"]
    if note:
        heading_str += f"\n**{note}**"
    return heading_str


def get_section(section):
    """
    Convert a <Section> or <Subsection> element into a JSON-serializable dict.

    Args:
        section: The lxml element to convert.

    Returns:
        A dict with the label ("id"), headings, marginal note, joined text,
        lims date attributes, nested subsections and cross-references, or
        None when the element is located inside a <Schedule>.
    """
    # If the section has an ancestor <Schedule> tag, skip it.
    # The ancestor axis is required: a relative ".//Schedule" only looks at
    # DESCENDANTS, so it does not detect sections nested inside a schedule.
    if section.xpath("ancestor::Schedule"):
        return None
    return {
        "id": _get_text(section.find("Label")),
        "headings": get_headings(section),
        "marginal_note": _get_text(section.find("MarginalNote")),
        "text": _get_joined_text(section),
        "in_force_start_date": section.attrib.get(
            "{http://justice.gc.ca/lims}inforce-start-date", None
        ),
        "last_amended_date": section.attrib.get(
            "{http://justice.gc.ca/lims}lastAmendedDate", None
        ),
        # Subsections are converted with the same function as sections
        "subsections": [
            get_section(subsection) for subsection in section.findall(".//Subsection")
        ],
        "external_refs": get_external_xrefs(section),
        "internal_refs": get_internal_xrefs(section),
        "lims_id": section.attrib.get("{http://justice.gc.ca/lims}id", None),
    }


def get_external_xrefs(section):
    """
    Collect every <XRefExternal> found anywhere below the given element.

    External references carry an explicit "link" attribute; a missing "link"
    or "reference-type" attribute is reported as None.
    """
    external_refs = []
    for xref in section.findall(".//XRefExternal"):
        attributes = xref.attrib
        external_refs.append(
            {
                "link": attributes.get("link"),
                "reference_type": attributes.get("reference-type"),
                "text": xref.text,
            }
        )
    return external_refs


def get_internal_xrefs(section):
    """
    Collect every <XRefInternal> found anywhere below the given element.

    Internal references have no link attribute: the element text (a section
    number) is used as the link.
    """
    internal_refs = []
    for xref in section.findall(".//XRefInternal"):
        internal_refs.append({"link": xref.text})
    return internal_refs


def get_preamble(root):
    """
    Extract the document's <Preamble>, if it has one.

    Args:
        root: Root element of the parsed act/regulation.

    Returns:
        A section-like dict describing the first <Preamble> found (each
        <Provision> becomes one entry of "subsections"), or an empty list
        when the document has no preamble. Callers should only rely on the
        result being falsy in that case.
    """
    preamble = root.find(".//Preamble")
    if preamble is None:
        # Falsy placeholder; kept as [] so the serialized "preamble" field of
        # documents without a preamble stays what it has always been.
        return []
    provisions = preamble.findall(".//Provision")
    return {
        "id": "preamble",
        "headings": get_headings(preamble),
        "marginal_note": None,
        "text": _get_joined_text(preamble),
        "in_force_start_date": preamble.attrib.get(
            "{http://justice.gc.ca/lims}inforce-start-date", None
        ),
        "last_amended_date": preamble.attrib.get(
            "{http://justice.gc.ca/lims}lastAmendedDate", None
        ),
        # Provisions carry no label, so their position is used as the id
        "subsections": [
            {
                "id": i,
                "text": _get_joined_text(provision),
            }
            for i, provision in enumerate(provisions)
        ],
        "internal_refs": get_internal_xrefs(preamble),
        "external_refs": get_external_xrefs(preamble),
        "lims_id": preamble.attrib.get("{http://justice.gc.ca/lims}id", None),
    }


def get_schedule(schedule):
    """
    Convert a <Schedule> element into a JSON-serializable dict.

    Schedules whose "id" attribute is "RelatedProvs" or "NifProvs" are
    skipped (None is returned). The label, marginal note and originating
    reference are each taken from the first matching descendant.
    """
    skipped_ids = ("RelatedProvs", "NifProvs")
    attributes = schedule.attrib
    if attributes.get("id") in skipped_ids:
        return None

    label = schedule.find(".//Label")
    marginal_note = schedule.find(".//MarginalNote")
    originating_ref = schedule.find(".//OriginatingRef")
    return {
        "id": _get_text(label),
        "marginal_note": _get_text(marginal_note),
        "text": _get_joined_text(schedule),
        "in_force_start_date": attributes.get(
            "{http://justice.gc.ca/lims}inforce-start-date"
        ),
        "last_amended_date": attributes.get(
            "{http://justice.gc.ca/lims}lastAmendedDate"
        ),
        "subsections": [],
        "internal_refs": get_internal_xrefs(schedule),
        "external_refs": get_external_xrefs(schedule),
        "originating_ref": _get_text(originating_ref),
        "lims_id": attributes.get("{http://justice.gc.ca/lims}id"),
    }


def get_headings(element):
    """
    Return the headings in effect at the position of the given element.

    Headings live in <Heading> tags and are NOT reliably nested around the
    content they introduce (they may be siblings of the element), so xpath
    cannot be used. Instead the whole document is walked from the top until
    the element is reached, remembering the most recent heading of each level.

    Returns a list such as ["HeadingLevel1", "HeadingLevel2"], with levels
    that are not set left out.
    """
    level_count = 6  # 6 levels of headings
    current = [None] * level_count
    document_root = element.getroottree().getroot()
    for node in document_root.iter():
        if node.tag == "Heading":
            level = int(node.attrib.get("level", 1))
            heading_text = _get_joined_text(node)
            # Remove formatting (e.g. bold) from headings
            for marker in ("**", "__"):
                heading_text = heading_text.replace(marker, "")
            current[level - 1] = heading_text
            # A new heading invalidates every deeper level seen so far
            for deeper in range(level, 6):
                current[deeper] = None
        if node == element:
            break
    return [heading for heading in current if heading is not None]

In [241]:
# Create output directory if it doesn't exist; otherwise, clear it
output_dir = os.path.join(base_dir, "laws-lois-json")
os.makedirs(output_dir, exist_ok=True)
for file in os.listdir(output_dir):
    stale_path = os.path.join(output_dir, file)
    # Only regular files are removed: os.remove raises on a directory, which
    # would abort the whole cell before anything is converted.
    if os.path.isfile(stale_path):
        os.remove(stale_path)

# Convert every XML file and write it as <english id>_<lang>.json
for file_path in file_paths:
    print(file_path)
    d = get_dict_from_xml(file_path)
    with open(
        os.path.join(output_dir, f"{d['id']}_{d['lang']}.json"),
        "w",
        encoding="utf8",
    ) as f:
        # ensure_ascii=False keeps accented characters readable in the output
        json.dump(d, f, indent=2, ensure_ascii=False)

c:\Users\jkuehn\git_repos\laws-qna\laws-lois-xml\eng\acts\A-0.6.xml


TypeError: list indices must be integers or slices, not str

In [242]:
import textwrap as tr

# Preview the pretty-printed text of the document held in `d` (the last file
# converted by the cell above); subsections are indented under their section.
for section in d["sections"]:
    print(section["all_str"], end="\n\n\n")
    for subsection in section["subsections"]:
        indented = tr.indent(subsection["all_str"], "    ")
        print(indented, end="\n\n\n")
    print(end="\n\n")

Règlement sur les textes réglementaires
 Section 1
  Titre abrégé

Le présent règlement peut être cité sous le titre : *Règlement sur les textes réglementaires*.




Règlement sur les textes réglementaires
 Section 2
  Interprétation

Dans le présent règlement,
*Loi* désigne la *Loi sur les textes réglementaires*; (*Act*)
*soustrait à la publication* signifie soustrait à l’application du paragraphe 11(1) de la Loi; (*exempt from publication*)
*soustrait à l’enregistrement* signifie soustrait à l’application du paragraphe 5(1) de la Loi; (*exempt from registration*)
*soustrait à l’examen* signifie soustrait à l’application de l’article 3(1) de la Loi. (*exempt from examination*)




Règlement sur les textes réglementaires
 Section 3
  Règlements soustraits à l’examen

Sont soustraits à l’examen les projets de règlements et les catégories de projets de règlements ci-après :
a) tout projet de règlement qui, s’il était établi, serait un règlement soustrait à l’enregistrement; et
b) tout pr

In [240]:
SIR = "../laws-lois-json/C.R.C.,_c._1509_eng.json"
with open(SIR, "r", encoding="utf8") as f:
    sir = json.load(f)

# Gather the preamble (if any), then every section, then every schedule;
# each piece is followed by a horizontal-rule separator in the output text.
separator = "\n\n---\n\n"
chunks = []
if sir["preamble"]:
    chunks.append(sir["preamble"]["all_str"])
for section in sir["sections"]:
    chunks.append(section["all_str"])
for schedule in sir["schedules"]:
    chunks.append(schedule["all_str"])
all_text = "".join(chunk + separator for chunk in chunks)

with open("SIR.txt", "w", encoding="utf8") as f:
    f.write(all_text)
