In [21]:
import re
import os
import math
import random

from tqdm import tqdm
from collections import Counter
from itertools import islice

In [22]:
directory = 'corpus2mw'

# Part I

In [23]:
def preprocess_text(text):
    # normalize punctuation
    text = text.replace("’", "'").replace("‘", "'").replace("−", "-").replace("–", "-").replace("—", "-")

    # agregate numbers with commas like 4,000–7,000 → 4000_7000
    text = re.sub(r'(\d{1,3}(?:,\d{3})*)\s*[-–—]\s*(\d{1,3}(?:,\d{3})*)',
                  lambda m: m.group(1).replace(',', '') + '_' + m.group(2).replace(',', ''),
                  text)

    # remove commas in numbers like 9,500 → 9500
    text = re.sub(r'(?<=\d),(?=\d)', '', text)

    # remove URLs
    text = re.sub(r'https?://\S+|www\.\S+', '', text)

    # substitute hyphens between words with underscores like "word-word" → "word_word"
    text = re.sub(r"(?<=\w)-(?=\w)", "_", text)

    # remove characters that are not letters, numbers, spaces, or underscores
    text = re.sub(r"[^a-zA-Z0-9\s_áéíóúâêîôûàèìòùãõçÁÉÍÓÚÂÊÎÔÛÀÈÌÒÙÃÕÇ']", ' ', text)

    # normalize spaces
    text = re.sub(r'\s+', ' ', text)

    return text.lower().strip()

In [24]:
def tokenize(text):
    return re.findall(r'\b\w+\b', text)

In [25]:
def extract_ngrams(tokens, max_n):
    result = Counter()

    for n in range(1, max_n + 1):
        ngrams = zip(*(islice(tokens, i, None) for i in range(n)))
        result.update(ngrams)

    return result

In [26]:
def dice_score(ngram, freq, ngrams):
    if len(ngram) < 2:
        return 0

    total = 0
    for k in range(1, len(ngram)):
        part1 = ngram[:k]
        part2 = ngram[k:]
        freq1 = ngrams.get(part1, 0)
        freq2 = ngrams.get(part2, 0)
        total += freq1 + freq2

    if total == 0:
        return 0

    return (freq * 2) / (total / (len(ngram) - 1))

In [27]:
def scp_score(ngram, freq, ngrams):
    if len(ngram) < 2:
        return 0

    total = 0
    for k in range(1, len(ngram)):
        part1 = ngram[:k]
        part2 = ngram[k:]
        freq1 = ngrams.get(part1, 0)
        freq2 = ngrams.get(part2, 0)
        total += freq1 * freq2

    if total == 0:
        return 0

    return (freq ** 2) / (total / (len(ngram) - 1))

In [28]:
def mi_score(ngram, freq, ngrams, corpus_size):
    if len(ngram) < 2:
        return 0

    total = 0
    for k in range(1, len(ngram)):
        part1 = ngram[:k]
        part2 = ngram[k:]
        freq1 = ngrams.get(part1, 0)
        freq2 = ngrams.get(part2, 0)
        total += (freq1 / corpus_size) * (freq2 / corpus_size)

    if total == 0:
        return 0

    return math.log((freq / corpus_size) / (total / (len(ngram) - 1)))

In [29]:
"""
    detect relevant ngrams that are local maxima
    (those that have a score higher than the scores of their parts)
"""
def is_local_max(ngram, score, all_scores):
    if len(ngram) < 2:
        return False

    for k in range(1, len(ngram)):
        left = ngram[:k]
        right = ngram[k:]
        left_score = all_scores.get(left, 0)
        right_score = all_scores.get(right, 0)

        if score <= max(left_score, right_score):
            return False

    return True

In [30]:
def extract_res(directory, num_files=None, specific_file=None, metric="scp"):
    files = sorted([f for f in os.listdir(directory) if f.startswith('fil_')],
                  key=lambda x: int(x.split('_')[1]))
    
    if specific_file:
        files = [specific_file] if specific_file in files else exit(f"File {specific_file} not found in directory {directory}.")
    elif num_files:
        files = files[:num_files]
    
    results = []
    all_tokens = []
    all_ngrams = Counter()
    
    for filename in tqdm(files, desc="Extracting REs"):
        with open(os.path.join(directory, filename), 'r', encoding='utf-8') as f:
            original = f.read()
        
        preprocessed = preprocess_text(original)
        tokens = tokenize(preprocessed)
        ngrams = extract_ngrams(tokens, max_n=7)  # up to 7 words per n-gram
        
        all_tokens.extend(tokens)
        all_ngrams.update(ngrams)

    dice_scores = {}
    scp_scores = {}
    mi_scores = {}

    for ngram, freq in all_ngrams.items():
        dice_scores[ngram] = dice_score(ngram, freq, all_ngrams)
        scp_scores[ngram] = scp_score(ngram, freq, all_ngrams)
        mi_scores[ngram] = mi_score(ngram, freq, all_ngrams, len(all_tokens))

    local_max_dice_scores = [
        (ngram, score) for ngram, score in dice_scores.items()
        if is_local_max(ngram, score, dice_scores)
    ]

    local_max_scp_scores = [
        (ngram, score) for ngram, score in scp_scores.items()
        if is_local_max(ngram, score, scp_scores)
    ]

    local_max_mi_scores = [
        (ngram, score) for ngram, score in mi_scores.items()
        if is_local_max(ngram, score, mi_scores)
    ]

    results.append({
        'dice_scores': sorted(dice_scores.items(), key=lambda x: x[1], reverse=True),
        'local_max_dice_scores': sorted(local_max_dice_scores, key=lambda x: x[1], reverse=True),
        'scp_scores': sorted(scp_scores.items(), key=lambda x: x[1], reverse=True),
        'local_max_scp_scores': sorted(local_max_scp_scores, key=lambda x: x[1], reverse=True),
        'mi_scores': sorted(mi_scores.items(), key=lambda x: x[1], reverse=True),
        'local_max_mi_scores': sorted(local_max_mi_scores, key=lambda x: x[1], reverse=True)
    })

    return results

In [31]:
# process a specific file
#results = process_scores_localmax(directory, specific_file='fil_2')[0]
# or process a limited number of files
results = extract_res(directory, num_files=100)[0]
# or process all files
#results = process_scores_localmax(directory)[0]

print("Dice Scores:")
for ngram, score in results['dice_scores'][:10]:
    print(f"{' '.join(ngram)}: {score:.4f}")

print("\nLocal Max Dice Scores:")
for ngram, score in results['local_max_dice_scores'][:10]:
    print(f"{' '.join(ngram)}: {score:.4f}")

print("\nSCP Scores:")
for ngram, score in results['scp_scores'][:10]:
    print(f"{' '.join(ngram)}: {score:.4f}")

print("\nLocal Max SCP Scores:")
for ngram, score in results['local_max_scp_scores'][:10]:
    print(f"{' '.join(ngram)}: {score:.4f}")

print("\nMI Scores:")
for ngram, score in results['mi_scores'][:10]:
    print(f"{' '.join(ngram)}: {score:.4f}")

print("\nLocal Max MI Scores:")
for ngram, score in results['local_max_mi_scores'][:10]:
    print(f"{' '.join(ngram)}: {score:.4f}")

Extracting REs: 100%|██████████| 100/100 [00:00<00:00, 129.90it/s]


Dice Scores:
unaided eyesight: 1.0000
mwene mbandu: 1.0000
lyondthzi kapova: 1.0000
dianhenga aspirante: 1.0000
aspirante mjinji: 1.0000
sultans murad: 1.0000
brahmi alphabet: 1.0000
urdu kashmiri: 1.0000
laos cambodia: 1.0000
ldc conferences: 1.0000

Local Max Dice Scores:
unaided eyesight: 1.0000
mwene mbandu: 1.0000
lyondthzi kapova: 1.0000
dianhenga aspirante: 1.0000
aspirante mjinji: 1.0000
sultans murad: 1.0000
brahmi alphabet: 1.0000
urdu kashmiri: 1.0000
laos cambodia: 1.0000
ldc conferences: 1.0000

SCP Scores:
unaided eyesight: 1.0000
mwene mbandu: 1.0000
lyondthzi kapova: 1.0000
dianhenga aspirante: 1.0000
aspirante mjinji: 1.0000
sultans murad: 1.0000
brahmi alphabet: 1.0000
urdu kashmiri: 1.0000
laos cambodia: 1.0000
ldc conferences: 1.0000

Local Max SCP Scores:
unaided eyesight: 1.0000
mwene mbandu: 1.0000
lyondthzi kapova: 1.0000
dianhenga aspirante: 1.0000
aspirante mjinji: 1.0000
sultans murad: 1.0000
brahmi alphabet: 1.0000
urdu kashmiri: 1.0000
laos cambodia: 1.0000

## Precision

In [36]:
def precision_evaluation(candidates, sample_size):
    return random.sample(candidates, min(sample_size, len(candidates)))

In [37]:
precision_sample_dice = precision_evaluation(results['local_max_dice_scores'], 50)
precision_sample_scp = precision_evaluation(results['local_max_scp_scores'], 50)
precision_sample_mi = precision_evaluation(results['local_max_mi_scores'], 50)

with open('precision_sample_dice.txt', 'w', encoding='utf-8') as f:
    for i, (ngram, score) in enumerate(precision_sample_dice, 1):
        phrase = ' '.join(ngram)
        f.write(f"{phrase} | Score: {score:.4f} | Annotation: TP / FP\n")

with open('precision_sample_scp.txt', 'w', encoding='utf-8') as f:
    for i, (ngram, score) in enumerate(precision_sample_scp, 1):
        phrase = ' '.join(ngram)
        f.write(f"{phrase} | Score: {score:.4f} | Annotation: TP / FP\n")

with open('precision_sample_mi.txt', 'w', encoding='utf-8') as f:
    for i, (ngram, score) in enumerate(precision_sample_mi, 1):
        phrase = ' '.join(ngram)
        f.write(f"{phrase} | Score: {score:.4f} | Annotation: TP / FP\n")

# after the creation of the precision sample file we need to edit it manually
# writting TP (valid) or FP (not valid) for each relevant expression

In [40]:
def calculate_precision(file):
    valid = 0
    total = 0
    
    with open(file, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip().endswith('TP'):
                valid += 1
            total += 1
    
    return valid / total if total > 0 else 0

In [41]:
print("Dice Precision:", calculate_precision("precision_sample_dice.txt"))
print("SCP Precision:", calculate_precision("precision_sample_scp.txt"))
print("MI Precision:", calculate_precision("precision_sample_mi.txt"))

Dice Precision: 0.36
SCP Precision: 0.32
MI Precision: 0.46


## Recall

In [15]:
def extract_paragraphs(directory, num_paragraphs, max_files=None):
    files = sorted([f for f in os.listdir(directory) if f.startswith('fil_')],
                  key=lambda x: int(x.split('_')[1]))
    
    if max_files:
        files = files[:max_files]
    
    all_paragraphs = []
    
    for filename in files:
        with open(os.path.join(directory, filename), 'r', encoding='utf-8') as f:
            text = f.read()
        
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        all_paragraphs.extend(paragraphs)
    
    return random.sample(all_paragraphs, min(num_paragraphs, len(all_paragraphs)))

In [81]:
paragraphs_sample = extract_paragraphs(directory, 4, max_files=5)

for i, paragraph in enumerate(paragraphs_sample, 1):
    print(f"Paragraph {i}:\n{paragraph}\n{'='*50}\n")

# after getting the paragraphs sample, we need to create the file recall_sample.txt
# and write the relevant expressions found in each paragraph manually

Paragraph 1:
ENSO may be linked to civil conflicts. Scientists at The Earth Institute of Columbia University, having analyzed data from 1950 to 2004, suggest ENSO may have had a role in 21% of all civil conflicts since 1950, with the risk of annual civil conflict doubling from 3% to 6% in countries affected by ENSO during El Niño years relative to La Niña years.
Thus it is even smaller than the next correction term formula_31 of Stirling's formula.

Paragraph 2:
The blue crane ("Anthropoides paradiseus"), also known as the Stanley crane and the paradise crane, is the national bird of South Africa.
Note that phenolic resin products are apt to swell slightly if they are used in areas that are perpetually damp. Varnishing the product helps to prevent this.
In the mid-2000s (decade) Juan Francisco Casas generated Internet attention for a series of large-scale, photo-realistic ballpoint duplications of his own snapshots of friends, utilising only blue pens.
That said, overlap of sexual pref

In [42]:
def calculate_recall(file, candidates):
    with open(file, 'r', encoding='utf-8') as f:
        manual_expressions = set(line.strip() for line in f if line.strip())
    
    candidates_expressions = set(' '.join(ngram) for ngram, _ in candidates)
    
    matches = 0
    for expression in manual_expressions:
        if expression.lower() in candidates_expressions:
            matches += 1
    
    return matches / len(manual_expressions) if manual_expressions else 0

In [43]:
print("Recall (Dice):", calculate_recall("recall_sample.txt", results["local_max_dice_scores"]))
print("Recall (SCP):", calculate_recall("recall_sample.txt", results["local_max_scp_scores"]))
print("Recall (MI):", calculate_recall("recall_sample.txt", results["local_max_mi_scores"]))

Recall (Dice): 0.33613445378151263
Recall (SCP): 0.35294117647058826
Recall (MI): 0.35294117647058826


# Part II

In [21]:
def extract_explicit_keywords_per_doc(directory, num_files=None, top_n=15):
    files = sorted([f for f in os.listdir(directory) if f.startswith('fil_')],
                   key=lambda x: int(x.split('_')[1]))
    
    if num_files:
        files = files[:num_files]

    explicit_keywords = []

    for filename in files:
        results = extract_res(directory, specific_file=filename)[0]
        local_max_scores = results['local_max_scp_scores']
        top_terms = [ngram for ngram, score in local_max_scores[:top_n]]
        explicit_keywords.append(top_terms)

    return explicit_keywords

In [22]:
def get_term_frequencies(directory, num_files=None):
    files = sorted([f for f in os.listdir(directory) if f.startswith('fil_')],
                   key=lambda x: int(x.split('_')[1]))
    
    if num_files:
        files = files[:num_files]

    freqs = []
    total_tokens = []

    for filename in files:
        with open(os.path.join(directory, filename), 'r', encoding='utf-8') as f:
            text = f.read()

        preprocessed = preprocess_text(text)
        tokens = tokenize(preprocessed)
        total_tokens.append(len(tokens))

        ngrams = extract_ngrams(tokens, max_n=7)
        freqs.append(ngrams)

    return freqs, total_tokens

In [23]:
# calculate probabilities P(T,d) and P(T,D)
def calculate_probabilities(freqs, total_tokens):
    num_docs = len(freqs)
    all_terms = set()
    for doc_freq in freqs:
        all_terms.update(doc_freq.keys())

    P_T_d = [{} for _ in range(num_docs)]
    P_T_D = {}

    for d, doc_freq in enumerate(freqs):
        total = total_tokens[d]
        for term in all_terms:
            freq = doc_freq.get(term, 0)
            P_T_d[d][term] = freq / total if total > 0 else 0

    for term in all_terms:
        sum_p = sum(P_T_d[d][term] for d in range(num_docs))
        P_T_D[term] = sum_p / num_docs

    return P_T_d, P_T_D

In [24]:
# statistical correlation between terms
def covariance(term1, term2, P_T_d, P_T_D):
    num_docs = len(P_T_d)
    s = 0
    for d in range(num_docs):
        s += (P_T_d[d].get(term1, 0) - P_T_D.get(term1, 0)) * (P_T_d[d].get(term2, 0) - P_T_D.get(term2, 0))
    return s / num_docs

def correlation(term1, term2, P_T_d, P_T_D):
    cov_12 = covariance(term1, term2, P_T_d, P_T_D)
    cov_11 = covariance(term1, term1, P_T_d, P_T_D)
    cov_22 = covariance(term2, term2, P_T_d, P_T_D)

    denom = math.sqrt(cov_11) * math.sqrt(cov_22)
    if denom == 0:
        return 0
    return cov_12 / denom

In [29]:
# score and rank implicit terms by correlation to explicit ones
def calculate_implicit_scores(explicit_keywords, P_T_d, P_T_D, min_doc_occurrences=2):
    num_docs = len(explicit_keywords)
    implicit_scores_per_doc = []

    all_terms = list(P_T_D.keys())
    candidate_terms = [
        term for term in all_terms
        if sum(1 for d in range(num_docs) if P_T_d[d].get(term, 0) > 0) >= min_doc_occurrences
    ]

    for d in tqdm(range(num_docs), desc="Calculating Implicits"):
        explicit = set(explicit_keywords[d])
        scores = {}

        for T in candidate_terms:
            if T in explicit:
                continue

            corr_sum = 0
            count = 0

            for K in explicit:
                c = correlation(T, K, P_T_d, P_T_D)
                corr_sum += c
                count += 1

            scores[T] = corr_sum / count if count > 0 else 0

        implicit_scores_per_doc.append(scores)

    return implicit_scores_per_doc

In [30]:
explicit_keywords = extract_explicit_keywords_per_doc(directory, num_files=10, top_n=10)

freqs, total_tokens = get_term_frequencies(directory, num_files=10)

P_T_d, P_T_D = calculate_probabilities(freqs, total_tokens)

implicit_scores = calculate_implicit_scores(explicit_keywords, P_T_d, P_T_D)

Extracting REs:   0%|          | 0/1 [00:00<?, ?it/s]

Extracting REs: 100%|██████████| 1/1 [00:00<00:00,  8.44it/s]
Extracting REs: 100%|██████████| 1/1 [00:00<00:00, 486.58it/s]
Extracting REs: 100%|██████████| 1/1 [00:00<00:00, 58.36it/s]
Extracting REs: 100%|██████████| 1/1 [00:00<?, ?it/s]
Extracting REs: 100%|██████████| 1/1 [00:00<00:00, 58.10it/s]
Extracting REs: 100%|██████████| 1/1 [00:00<00:00, 100.93it/s]
Extracting REs: 100%|██████████| 1/1 [00:00<00:00, 37.52it/s]
Extracting REs: 100%|██████████| 1/1 [00:00<00:00, 56.55it/s]
Extracting REs: 100%|██████████| 1/1 [00:00<00:00, 102.22it/s]
Extracting REs: 100%|██████████| 1/1 [00:00<00:00, 110.16it/s]
Calculating Implicits: 100%|██████████| 10/10 [00:05<00:00,  1.67it/s]


In [None]:
document = 1

In [35]:
print(f"Top Explicit Expressions (Doc {document}):")

for term in explicit_keywords[document-1]:
    print(' '.join(term) if isinstance(term, tuple) else term)

Top Explicit Expressions (Doc 1):
christian scribes
crucial role
language manuscripts
comment extensively
stands out
fundamental critique
ephesus reappears
apparently sponsored
anna comnena
codes represent


In [36]:
top_implicit = sorted(implicit_scores[document-1].items(), key=lambda x: x[1], reverse=True)[:10]

print(f"Top Implicit Expressions (Doc {document}):")

for term, score in top_implicit:
    expression = " ".join(term)
    print(f'"{expression}" (score={score:.3f})')

Top Implicit Expressions (Doc 1):
"ii" (score=0.985)
"instruments" (score=0.980)
"users" (score=0.945)
"rate" (score=0.945)
"the title" (score=0.943)
"the beginning" (score=0.919)
"the beginning of" (score=0.919)
"beginning of" (score=0.919)
"hydrogen" (score=0.916)
"removal" (score=0.916)
