Getting data from source

In [1]:
# Download the dataset archive from Zenodo (record 7152317) and unpack it.
# `-q` silences wget; `-O` saves the download as ./dataset.zip.
!wget -q "https://zenodo.org/record/7152317/files/dataset.zip?download=1" -O dataset.zip
# Extract quietly into data/ (later cells read from data/dataset).
!unzip -q dataset.zip -d data/

In [2]:
import getpass
import json
import os
import random
import re
import shutil
import unicodedata
from collections import Counter

from tqdm import tqdm

# Seed Python's global `random` generator so random operations are repeatable
# after a kernel restart.
random.seed(42)

Clean the text: normalize Unicode, collapse whitespace, lowercase, and strip boilerplate

In [3]:
def clean_text(text: str) -> str:
    """Normalize a raw document or summary into one clean line of text.

    Steps, in order:
      1. NFKC Unicode normalization.
      2. Lowercasing.
      3. Removal of "page X of Y" pagination markers.
      4. Collapsing every whitespace run to a single space and trimming
         both ends.

    The page markers are removed *before* whitespace is collapsed so that
    deleting a marker can never leave a double space in the middle of the
    text or a dangling space at the end.

    Args:
        text: Raw text as read from disk.

    Returns:
        The cleaned, lower-cased, single-spaced text.
    """
    text = unicodedata.normalize("NFKC", text)
    text = text.lower()
    # Whitespace has not been collapsed yet, so accept any whitespace run
    # (including line breaks) between the tokens of the marker.
    text = re.sub(r"page\s+\d+\s+of\s+\d+", "", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()

Merge the three data sources (IN-Abs, UK-Abs, IN-Ext)

In [9]:
import os

# Input, output and scratch locations
DATA_ROOT = "data/dataset"    # root of the unzipped Zenodo archive
OUT = "merged_dataset"        # folder that receives the final output
TMP_DIR = "/tmp"              # where merged segment summaries are written

# Accumulator that gather() fills with (doc_path, summary_path, filename)
cases = list()

# Create the output folder up front; an already existing folder is fine
os.makedirs(OUT, exist_ok=True)

# Segment folders whose files are concatenated when no whole-document IN-Ext
# summary is available (the order fixes the order of the merged text).
_IN_EXT_SEGMENTS = ("analysis", "argument", "facts", "judgement", "statute")


def _first_existing(paths):
    """Return the first path in `paths` that is an existing file, else None."""
    for path in paths:
        if os.path.isfile(path):
            return path
    return None


def _merge_in_ext_segments(src, sum_sub, full_sub, fn):
    """Join the segment-wise summaries of `fn` into TMP_DIR/<fn>; None if no segment exists."""
    pieces = []
    for seg in _IN_EXT_SEGMENTS:
        seg_file = os.path.join(src, sum_sub, "segment-wise", full_sub, seg, fn)
        if os.path.isfile(seg_file):
            with open(seg_file, "r", encoding="utf-8", errors="ignore") as f:
                pieces.append(f.read())
    if not pieces:
        return None

    tmp_file = os.path.join(TMP_DIR, fn)
    with open(tmp_file, "w", encoding="utf-8") as out:
        out.write("\n\n".join(pieces))
    return tmp_file


def gather(folder, doc_sub, sum_sub, full_sub=None):
    """
    Populates `cases` with (doc_path, summary_path, filename).

    Walks DATA_ROOT/<folder> -- through its "train-data" and "test-data"
    sub-folders, or directly for "IN-Ext" -- and pairs every ``.txt`` file in
    `doc_sub` with a summary file of the same name. Missing document folders
    and documents without a summary are skipped silently.

    Summary lookup order:
      * IN-Ext: <sum_sub>/<full_sub>/<fn>, then <sum_sub>/full/<full_sub>/<fn>;
        if neither exists, the files found under
        <sum_sub>/segment-wise/<full_sub>/<segment>/<fn> are concatenated
        (separated by blank lines) into TMP_DIR/<fn>.
      * any other folder: <sum_sub>/<full_sub>/<fn> (only when `full_sub` is
        given), then <sum_sub>/<fn>.

    Args:
        folder:   Dataset folder name below DATA_ROOT.
        doc_sub:  Sub-folder holding the documents.
        sum_sub:  Sub-folder holding the summaries.
        full_sub: Optional extra path component below `sum_sub`. For IN-Ext
                  it is required; without it no summary can be located and
                  nothing is collected.
    """
    base      = os.path.join(DATA_ROOT, folder)
    is_in_ext = folder == "IN-Ext"
    splits    = ("",) if is_in_ext else ("train-data", "test-data")

    for sd in splits:
        src      = os.path.join(base, sd) if sd else base
        docs_dir = os.path.join(src, doc_sub)
        if not os.path.isdir(docs_dir):
            continue

        # os.listdir() yields entries in arbitrary order; sorting makes the
        # order of `cases` -- and therefore any seeded shuffle of it --
        # reproducible across machines and filesystems.
        for fn in sorted(os.listdir(docs_dir)):
            if not fn.endswith(".txt"):
                continue

            doc_path = os.path.join(docs_dir, fn)

            candidates = []
            if full_sub:
                candidates.append(os.path.join(src, sum_sub, full_sub, fn))
            if is_in_ext:
                if full_sub:
                    # The whole-document summary is expected under
                    # "full/<full_sub>", mirroring "segment-wise/<full_sub>".
                    # NOTE(review): confirm against the unzipped archive.
                    candidates.append(os.path.join(src, sum_sub, "full", full_sub, fn))
            else:
                candidates.append(os.path.join(src, sum_sub, fn))

            summary_path = _first_existing(candidates)
            if summary_path is None and is_in_ext and full_sub:
                summary_path = _merge_in_ext_segments(src, sum_sub, full_sub, fn)

            if summary_path:
                cases.append((doc_path, summary_path, fn))

# Collect document/summary pairs from the three sources, in a fixed order.
# Each entry is (dataset folder, full_sub); IN-Abs has no full_sub.
for source_name, summary_variant in (
    ("IN-Abs", None),
    ("UK-Abs", "full"),
    ("IN-Ext", "A1"),
):
    gather(source_name, "judgement", "summary", full_sub=summary_variant)

Sanity check: drop empty files and verify that no files are missing or duplicated

In [10]:
# Sanity-check: every pair exists, is non-empty, and has a unique filename

# 1) Assert all gathered files exist. This has to come before the size check
#    below: os.path.getsize() raises on a missing path, which would otherwise
#    pre-empt this assertion and its readable message.
missing = [fn for doc, summ, fn in cases
           if not (os.path.isfile(doc) and os.path.isfile(summ))]
assert not missing, f"❌ Missing files: {missing}"

# 2) Drop every pair in which the document or the summary is a 0-byte file
clean_cases = []
for doc, summ, fn in cases:
    if os.path.getsize(doc) == 0 or os.path.getsize(summ) == 0:
        print(f"Skipping empty file: {fn}")
    else:
        clean_cases.append((doc, summ, fn))
cases = clean_cases
print(f"{len(cases)} cases retained after dropping empty files.")

# 3) Assert no duplicate filenames (the filename identifies a case downstream)
dupes = [fn for fn, cnt in Counter(fn for _, _, fn in cases).items() if cnt > 1]
assert not dupes, f"❌ Duplicate filenames: {dupes}"

print("✅ Sanity check passed: all files present, non-empty, and unique.")

Skipping empty file: 4799.txt
Skipping empty file: 299.txt
7971 cases retained after dropping empty files.
✅ Sanity check passed: all files present, non-empty, and unique.


Drop outliers: remove any case whose summary is under 50 words or over 1500 words, or whose summary-to-document word-count ratio falls outside the range [0.01, 0.5]

In [11]:
# Define the helper
def get_stats(txt: str):
    """Count the words and sentences in ``txt``.

    Words are whitespace-delimited tokens. Sentences are approximated by
    splitting wherever '.', '!' or '?' is followed by whitespace; fragments
    that are empty or whitespace-only are not counted.

    Returns:
        A ``(word_count, sentence_count)`` tuple.
    """
    word_count = len(txt.split())
    fragments = re.split(r'[\.!?]\s+', txt)
    sentence_count = sum(1 for fragment in fragments if fragment.strip())
    return word_count, sentence_count

# Apply stats-based filtering independently
stats_filtered = []
for doc_path, sum_path, fn in cases:
    # Read & clean (reusing clean_text). The `with` blocks close every file
    # right away instead of leaving thousands of handles to the garbage
    # collector.
    with open(doc_path, "r", encoding="utf-8", errors="ignore") as doc_f:
        doc_clean = clean_text(doc_f.read())
    with open(sum_path, "r", encoding="utf-8", errors="ignore") as sum_f:
        sum_clean = clean_text(sum_f.read())

    # Only the word counts drive the filter; sentence counts are not needed here
    dw, _ = get_stats(doc_clean)
    sw, _ = get_stats(sum_clean)
    ratio = sw / dw if dw > 0 else 0  # an empty document yields ratio 0 -> dropped

    # Outlier check: summary shorter than 50 or longer than 1500 words, or a
    # summary/document word ratio outside [0.01, 0.5]
    if sw < 50 or sw > 1500 or not (0.01 <= ratio <= 0.5):
        continue

    # keep if it passes
    stats_filtered.append((doc_path, sum_path, fn))

# Replace cases list
cases = stats_filtered
print(f"{len(cases)} cases remain after word/sentence‐count filtering.")


7035 cases remain after word/sentence‐count filtering.


Randomize and split into train, test, production (70-20-10)

In [12]:
# Shuffle & split 70/20/10
#
# Shuffle a *copy* with a dedicated, locally seeded generator. Shuffling
# `cases` in place with the global generator made this cell non-idempotent:
# every re-run reshuffled the already shuffled list with an advanced RNG
# state and silently produced different splits. random.Random(42) yields the
# same permutation the global generator produces right after random.seed(42).
SPLIT_SEED = 42
shuffled_cases = list(cases)
random.Random(SPLIT_SEED).shuffle(shuffled_cases)

N       = len(shuffled_cases)
n_train = int(0.7 * N)
n_test  = int(0.2 * N)

# "production" receives whatever is left after the train and test slices
splits = [
    ("train",      shuffled_cases[:n_train]),
    ("test",       shuffled_cases[n_train:n_train+n_test]),
    ("production", shuffled_cases[n_train+n_test:])
]

# Every case must land in exactly one split
assert sum(len(subset) for _, subset in splits) == N

Convert to JSONL

In [13]:
import json

# Dump one JSONL per split with metadata baked in
for split, subset in splits:
    out_path = os.path.join(OUT, f"{split}.jsonl")
    count = 0  # records actually written to this split's file
    with open(out_path, "w", encoding="utf-8") as out_f:
        for doc_path, summ_path, fn in subset:
            # Read & clean; `with` closes each input file immediately
            with open(doc_path, "r", encoding="utf-8", errors="ignore") as doc_f:
                doc_clean = clean_text(doc_f.read())
            with open(summ_path, "r", encoding="utf-8", errors="ignore") as sum_f:
                sum_clean = clean_text(sum_f.read())

            # Stats + outlier filter (same thresholds as the filtering cell;
            # kept as a guard so no out-of-range record is ever written)
            dw, ds = get_stats(doc_clean)
            sw, ss = get_stats(sum_clean)
            ratio = sw / dw if dw else 0
            if sw < 50 or sw > 1500 or not (0.01 <= ratio <= 0.5):
                continue

            # Build & write record (one JSON object per line)
            record = {
                "filename":  fn,
                "judgement": doc_clean,
                "summary":   sum_clean,
                "meta": {
                    "doc_words": dw, "doc_sents": ds,
                    "sum_words": sw, "sum_sents": ss,
                    "ratio":     ratio
                }
            }
            out_f.write(json.dumps(record, ensure_ascii=False) + "\n")
            count += 1

    # Confirm how many records were written. Counting while writing avoids
    # re-opening the file (the previous read-back handle was never closed).
    print(f"📦 {split:10s} → {count} records to {out_path}")


📦 train      → 4924 records to merged_dataset/train.jsonl
📦 test       → 1407 records to merged_dataset/test.jsonl
📦 production → 704 records to merged_dataset/production.jsonl


Pushing to Git repository

In [None]:
# Export your PAT so it isn’t visible in the notebook
os.environ["GITHUB_PAT"] = ""

# Clone your fork, not the upstream
!git clone https://$GITHUB_PAT@github.com/robo-ro/LLM_LegalDocSummarization.git
%cd LLM_LegalDocSummarization

# Configure Git identity
!git config user.email "you@example.com"
!git config user.name  "Your Name"

# Create & switch to your branch
!git checkout -b add-merged-dataset

# Zip the folder
!zip -r merged_dataset.zip merged_dataset

# Stage & commit the zip (instead of the raw folder)
!git add merged_dataset.zip
!git commit -m "Add merged_dataset.zip containing processed JSONL data"

# Push as before
!git push origin add-merged-dataset
