# Functions and libraries

In [1]:
import os
import re
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter, defaultdict
from scipy.optimize import curve_fit
from math import log
import string

In [2]:
#loads the corpus in a dictionary corpus[author][book][text] 
#The files have to be stored in a directory organized by: author -> book -> text for the function to work properly
def load_corpus_by_author_tokenized(
    root_dir,
    merge_fragments=True,
    token_length=4,
    max_tokens=None
):
    """Load and tokenize a corpus stored as ``root_dir/<author>/<book>/<text>.txt``.

    Only files whose name matches ``<prefix><number>[<letter>].txt`` are read;
    everything else (and anything that is not a directory at the author/book
    level) is skipped.

    Parameters
    ----------
    root_dir : str
        Directory containing one sub-directory per author.
    merge_fragments : bool
        If True, files that differ only by the trailing letter (``text1a.txt``,
        ``text1b.txt``) are concatenated, in sorted file-name order, into a
        single text identified by ``text1``. If False every file is its own text.
    token_length : int
        0 -> tokens are lowercase words (``\\b\\w+\\b``).
        N > 0 -> tokens are overlapping lowercase character N-grams; an N-gram is
        discarded when one of its *inner* characters (not the first or the last)
        is a space.
    max_tokens : int or None
        If a positive int, every text is split into consecutive chunks of at most
        ``max_tokens`` tokens stored under ``"<text_id>_chunk<k>"`` (k from 1).
        Otherwise the text is stored whole under ``"<text_id>"``.

    Returns
    -------
    defaultdict
        ``corpus[author][book][text_id] -> list of tokens``. Entries are inserted
        in sorted author / book / file-name order so repeated runs are identical.

    Raises
    ------
    ValueError
        If ``token_length`` is negative.
    """
    if token_length < 0:
        raise ValueError(f"token_length must be >= 0, got {token_length}")

    pattern = re.compile(r"^(.*?)(\d+)([a-z]?)\.txt$", re.IGNORECASE)
    corpus = defaultdict(lambda: defaultdict(dict))

    # os.listdir returns entries in arbitrary order; sorting makes the order of
    # authors, books and texts (and everything derived from it) reproducible.
    for author in sorted(os.listdir(root_dir)):
        author_path = os.path.join(root_dir, author)
        if not os.path.isdir(author_path):
            continue

        for book in sorted(os.listdir(author_path)):
            book_path = os.path.join(author_path, book)
            if not os.path.isdir(book_path):
                continue

            # Group the file names by text id (fragments share the same id when merging)
            grouped = defaultdict(list)
            for fname in sorted(os.listdir(book_path)):
                if not fname.lower().endswith(".txt"):
                    continue
                m = pattern.match(fname)
                if not m:
                    continue
                prefix, number, suffix = m.groups()
                file_id = f"{prefix}{number}" if merge_fragments else f"{prefix}{number}{suffix}"
                grouped[file_id].append(fname)

            for file_id, fragment_list in grouped.items():
                # File names were listed in sorted order, so the fragments are already sorted
                text = ""
                for fname in fragment_list:
                    path = os.path.join(book_path, fname)
                    try:
                        with open(path, "r", encoding="utf-8", errors="ignore") as f:
                            text += f.read() + "\n"
                    except Exception as e:
                        # Best effort: report the unreadable fragment and keep going
                        print(f"Error reading {path}: {e}")

                lowered = text.lower()
                if token_length == 0:
                    tokens = re.findall(r"\b\w+\b", lowered)
                else:
                    s = lowered.replace('\n', ' ').replace('\t', ' ').strip()
                    N = token_length
                    tokens = []
                    for i in range(len(s) - N + 1):
                        tok = s[i:i+N]
                        # keep the N-gram only if no inner character is a space
                        if " " not in tok[1:-1]:
                            tokens.append(tok)

                if isinstance(max_tokens, int) and max_tokens > 0:
                    for i in range(0, len(tokens), max_tokens):
                        chunk_id = f"{file_id}_chunk{i//max_tokens+1}"
                        corpus[author][book][chunk_id] = tokens[i:i + max_tokens]
                else:
                    corpus[author][book][file_id] = tokens

    return corpus

In [3]:
#split the dataset in training and test 
def split_corpus_by_books(corpus_by_author, train_books_by_author):
    """Split a nested corpus into a training and a test corpus, book by book.

    A book goes to the training set when its name is listed under its author in
    ``train_books_by_author``; every other book (including all books of authors
    that are missing from that mapping) goes to the test set.

    Returns ``(train_set, test_set)``, both with the same
    ``[author][book][text]`` nesting as the input. Books without texts do not
    create any entry.
    """
    def _new_nested():
        return defaultdict(lambda: defaultdict(dict))

    train_set, test_set = _new_nested(), _new_nested()

    for author in corpus_by_author:
        selected = train_books_by_author.get(author, [])
        for book, files in corpus_by_author[author].items():
            destination = test_set if book not in selected else train_set
            for fname in files:
                destination[author][book][fname] = files[fname]

    return train_set, test_set

In [4]:
#transforms the structure of the dataset dictionary from corpus[author][book][text] to corpus[author][texts]
#this operation is necessary to compute the author parameters
def flatten_corpus(corpus_nested):
    """Collapse ``corpus[author][book][text]`` into ``{author: [tokens, ...]}``.

    The token lists of every book of an author are gathered, in iteration
    order, into one flat list per author (the lists themselves are not copied).
    """
    flat = {}
    for author in corpus_nested:
        collected = []
        for files in corpus_nested[author].values():
            collected.extend(files.values())
        flat[author] = collected
    return flat

In [5]:
#functions used to compute the heaps' law 
def heaps_curve_texts(texts):
    """Compute the vocabulary-growth (Heaps) curve of a sequence of tokenized texts.

    The texts are read one after the other as a single token stream. After each
    token the pair ``(t, D)`` is recorded, where ``t`` is the number of tokens
    read so far and ``D`` the number of distinct tokens seen so far.

    Parameters
    ----------
    texts : iterable of iterables of hashable tokens

    Returns
    -------
    numpy.ndarray
        Integer array of shape ``(total_tokens, 2)`` with columns ``t`` and ``D``.
        The shape is ``(0, 2)`` when there are no tokens, so column slicing
        (``data[:, 0]``) is always valid.
    """
    seen = set()
    t = 0
    out = []
    for txt in texts:
        for w in txt:
            t += 1
            seen.add(w)
            out.append((t, len(seen)))
    # Explicit dtype + reshape keep the result 2-D and integer even for empty input
    return np.array(out, dtype=int).reshape(-1, 2)

def heaps_func(t, alpha, theta):
    """Model curve fitted to the Heaps data: (theta/alpha) * ((1 + t/theta)**alpha - 1)."""
    prefactor = theta/alpha
    growth = (1 + t/theta)**alpha
    return prefactor * (growth - 1)

In [6]:
#returns a dictionary with all the information needed to do the attribution, divided by author
def build_author_model(texts, alpha, theta, P0, delta=1.0, p0_method=None):
    """Build the per-author model used for the attribution.

    Parameters
    ----------
    texts : iterable of token lists
        The author's training texts.
    alpha, theta : float
        Author parameters (stored unchanged in the model).
    P0 : dict
        Base probability of every token, ``{token: probability}``.
    delta : float
        Multiplicative factor applied to the base probabilities.
    p0_method : str or None
        ``"global"`` -> ``P0_A[w] = delta * P0[w]`` for every token of ``P0``.
        Any other value -> only tokens that do not occur in the author's
        texts are kept, renormalized over themselves and multiplied by ``delta``.
        ``None`` (default) falls back to the notebook-level ``METHODP`` setting,
        which is the behavior of the original five-argument call.

    Returns
    -------
    dict
        ``count_A`` (Counter of the author's tokens), ``m`` (total number of
        tokens), ``D_A`` (number of distinct tokens), ``alpha``, ``theta`` and
        ``P0_A`` (base probabilities as described above).
    """
    if p0_method is None:
        # Backward compatibility: callers that do not pass the method keep
        # relying on the global configuration.
        p0_method = METHODP

    # Count text by text instead of concatenating every token into one big list
    count_A = Counter()
    for txt in texts:
        count_A.update(txt)

    m = sum(count_A.values())
    D_A = len(count_A)

    if p0_method == "global":
        P0_A = {w: delta*P0[w] for w in P0}
    else:
        unseen = {w: P0[w] for w in P0 if w not in count_A}
        Z = sum(unseen.values())
        P0_A = {w: delta*unseen[w]/Z for w in unseen}

    return {
        "count_A": count_A,
        "m": m,
        "D_A": D_A,
        "alpha": alpha,
        "theta": theta,
        "P0_A": P0_A
    }

In [7]:
#functions to compute the conditional probability formula as written in the paper
from scipy.special import loggamma, poch

def log_pochhammer(z, n, step=1):
    """Return log(z) + log(z + step) + ... + log(z + (n-1)*step).

    This is the logarithm of a product of ``n`` factors starting at ``z`` and
    spaced by ``step``. An empty product (``n <= 0``) gives ``0.0``.
    """
    if n <= 0:
        return 0.0
    offsets = np.arange(n) * step
    return np.sum(np.log(z + offsets))

def log_cond_prob(tokens_T, model):
    """Log conditional probability of the token list ``tokens_T`` under an author model.

    ``model`` is a dictionary with the keys ``m``, ``D_A``, ``alpha``, ``theta``,
    ``count_A`` and ``P0_A``. The result is the sum of:

    * a normalisation term ``logB`` that depends on how many token types of the
      text are new for the author and on the text length;
    * one term per distinct token of the text: tokens already used by the
      author contribute through their author count, new tokens through their
      base probability ``P0_A`` (``1e-10`` when the token is missing from
      ``P0_A``; a base probability of exactly 0 adds no log term).
    """
    count_T = Counter(tokens_T)
    n = sum(count_T.values())

    m, D_A = model["m"], model["D_A"]
    alpha, theta = model["alpha"], model["theta"]
    count_A, P0_A = model["count_A"], model["P0_A"]

    # K = number of token types in the text that the author never used
    K = len(set(count_A.keys()) | set(count_T.keys())) - D_A
    logB = log_pochhammer(theta + alpha*D_A, K, alpha) - log_pochhammer(theta + m, n, 1)

    S_Qj = 0
    for w, nj in count_T.items():
        if w in count_A:
            log_Qj = log_pochhammer(count_A[w] - alpha, nj, 1)
        else:
            log_Qj = log_pochhammer(1-alpha, nj-1, 1)
            p0 = P0_A.get(w, 1e-10)
            if p0 != 0:
                log_Qj = log_Qj + np.log(p0)
        S_Qj += log_Qj

    return logB + S_Qj

In [8]:
#Attribution function: majority rule or full text.
#With aggregation="majority" every fragment of a text is assigned independently and the author with the most fragments wins;
#otherwise the full text is assigned (its log-probability is the sum of the fragments' log-probabilities).
#In both cases the function log_cond_prob is used to compute the probability.
def assign_corpus_by_chunks(corpus_by_author,models,aggregation="majority"):
    """Attribute every text of ``corpus_by_author`` to one of the authors in ``models``.

    Chunks named ``"<text>_chunk<k>"`` are first regrouped under ``<text>``.

    * ``aggregation == "majority"``: each chunk is scored against every model
      and votes for its best author; the text goes to the author with the most
      votes.
    * any other value: the scores of all the chunks are summed per author and
      the text goes to the author with the largest total.

    Returns ``assigned[true_author][book][text] -> predicted author``.
    """
    assigned = defaultdict(lambda: defaultdict(dict))
    by_majority = aggregation == "majority"

    for true_a, books in corpus_by_author.items():
        for book, files in books.items():
            # regroup the chunks of the same text
            by_text = defaultdict(list)
            for fid, toks in files.items():
                by_text[fid.split("_chunk")[0]].append(toks)

            for base, chunks in by_text.items():
                if by_majority:
                    votes = Counter(
                        max(models, key=lambda a: log_cond_prob(ch, models[a]))
                        for ch in chunks
                    )
                    winner = votes.most_common(1)[0][0]
                else:
                    totals = dict.fromkeys(models, 0.0)
                    for ch in chunks:
                        for a, mdl in models.items():
                            totals[a] = totals[a] + log_cond_prob(ch, mdl)
                    winner = max(totals, key=totals.get)
                assigned[true_a][book][base] = winner

    return assigned

In [9]:
#Saves the result of a run to a text file with: date, token_length, P0 type (author or global), books used in the Training_set, results (counts and accuracy)
# and, for every wrong attribution, the text, its book and the predicted (wrong) author
from datetime import datetime

def save_results_report(
    results,
    train_books_by_author,
    token_length,
    P0_type= "global",
    output_path="risultati.txt"
):
    """Write a plain-text report of an attribution run to ``output_path``.

    ``results`` is a sequence of ``(true_author, book, text, predicted_author)``
    tuples. The report (the file is overwritten) contains the current date and
    time, the token length, the P0 type, the training books of every author,
    the number of correct / wrong attributions with the accuracy and, when
    there are wrong attributions, one line per misattributed text.
    """
    n_total = len(results)
    n_right = len([row for row in results if row[0] == row[3]])
    n_wrong = n_total - n_right
    accuracy = 100 * n_right / n_total if n_total > 0 else 0

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")

    with open(output_path, "w", encoding="utf-8") as report:
        emit = report.write

        # header and run settings
        emit("=== REPORT ===\n")
        emit(f"Date and time: {timestamp}\n\n")
        emit(f" Token length: {token_length}\n")
        emit(f" P0 : {P0_type}\n")
        emit(" Training_set:\n")
        for author, books in train_books_by_author.items():
            emit(f"    - {author}: {', '.join(books)}\n")

        # summary
        emit("\n Results:\n")
        emit(f"    - Total number of texts to be assigned: {n_total}\n")
        emit(f"    - Correct:     {n_right}\n")
        emit(f"    - Wrong:       {n_wrong}\n")
        emit(f"    - Accuracy:     {accuracy:.2f}%\n")

        # details of the wrong attributions
        if n_wrong > 0:
            emit("\n Texts assigned improperly:\n")
            for true_a, book, base, pred in results:
                if true_a != pred:
                    emit(f"    - {base} (book: {book}) → predicted: {pred}\n")

# Attribution Model

In [10]:
#Fit bounds and points 
# Maximum number of points of the Heaps' curve passed to the fit (longer curves are subsampled evenly)
N_SAMPLE = 50000
# Small offset used to keep the fit parameters strictly inside their open intervals
EPS = 1e-5
# Fit bounds as ([lower alpha, lower theta], [upper alpha, upper theta]): 0<alpha<1, theta>0 (capped at 1e6)
BOUNDS = ([EPS, EPS], [1-EPS, 1e6])

In [12]:
#Corpus directory and attribution's parameters
# Root directory of the corpus (expected layout: author -> book -> text files)
CORPUS_DIR = "Corpus"

# Token length passed to the loader: 0 = whole words, N > 0 = character N-grams
TOK_LENG = 2
# Maximum number of tokens per fragment passed to the loader; None = texts are not fragmented
FRAG_N = None 
# Multiplicative factor applied to the base probabilities P0
DELTA = 1

# METHOD sets the rule used by the algorithm (MR = majority rule, FNN = full text).
# MR is only meaningful when FRAG_N is not None (texts split into fragments).
METHOD = "FNN" #"MR" 
# METHODP sets the normalization of P0:
#   author = normalized per author after removing the 'words' which appear in the author's corpus,
#   global = frequency of the token in the entire corpus.
METHODP = "global" # "author"
# MERGE=True merges the texts which have a suffix letter (ex: text1a and text1b become one single text named text1)
MERGE = False

In [13]:
#Saving variables (file_name and directory to be saved it in)
# Base name (without extension) of the report file; ".txt" is appended when the report is saved
PATH = "results"
# Directory meant to collect the reports (only referenced by the commented-out move command of the main cell)
PATHDIR = "EsperimentiLibri"

In [14]:
#Choice of trainingset books
# author (directory name) -> list of that author's books (directory names) used as training set;
# every other book of the corpus ends up in the test set
train_books_by_author = {
    "Ovidio Amores": ["Ovidio Amores I"],
    "Properzio": ["Properzio I"],
    "Tibullo": ["Tibullo I"]}

In [15]:
#Main 
# Fail early on an unknown rule instead of crashing later with an unbound prediction
if METHOD not in ("MR", "FNN"):
    raise ValueError(f"METHOD must be 'MR' or 'FNN', got {METHOD!r}")

# Load and tokenize the whole corpus from the configured directory
corpus_by_author = load_corpus_by_author_tokenized(
    root_dir=CORPUS_DIR,
    merge_fragments=MERGE,
    token_length=TOK_LENG,
    max_tokens=FRAG_N
)

#Both sets are dictionaries with structure: train_set[author][book][text]
train_set, test_set = split_corpus_by_books(corpus_by_author, train_books_by_author)
train_flat = flatten_corpus(train_set)


#Fit and extrapolation of authors' parameters
author_params = {}
for auth, texts in train_flat.items():
    data = heaps_curve_texts(texts)
    t_vals, D_vals = data[:,0], data[:,1]
    if len(t_vals) > N_SAMPLE:
        # subsample the curve evenly so the fit stays fast on large training sets
        idx = np.linspace(0, len(t_vals)-1, N_SAMPLE, dtype=int)
        t_fit, D_fit = t_vals[idx], D_vals[idx]
    else:
        t_fit, D_fit = t_vals, D_vals
    # Initial guess: alpha = 0.3, theta = number of training tokens, clipped into
    # the theta bounds because curve_fit rejects a starting point outside the bounds
    theta_guess = min(max(len(D_vals), BOUNDS[0][1]), BOUNDS[1][1])
    p0 = [0.3, theta_guess]
    try:
        popt, _ = curve_fit(
            heaps_func, t_fit, D_fit,
            p0=p0, bounds=BOUNDS, maxfev=5000
        )
    except RuntimeError:
        # the fit did not converge: fall back on the initial guess
        popt = p0
        print('FIT ERROR')
    author_params[auth] = tuple(popt)

#computation of the global frequencies of tokens (later they will be multiplied by DELTA and normalized by author if METHODP=author )
corpus_flat = flatten_corpus(corpus_by_author)
all_tokens = [w for texts in corpus_flat.values() for tok_list in texts for w in tok_list]
global_counts = Counter(all_tokens)
total = sum(global_counts.values())
P0_global = {w: c/total for w,c in global_counts.items()}


models = {auth: build_author_model(train_flat[auth], *author_params[auth], P0_global, DELTA) for auth in train_flat}

#attribution
if METHOD=="MR":
    assigned = assign_corpus_by_chunks(test_set, models, aggregation="majority")
    assigned_majority = assigned   # name kept for cells that may read it
else:
    assigned = assign_corpus_by_chunks(test_set, models, aggregation="logsum")
    assigned_logsum = assigned     # name kept for cells that may read it

#results compilation
results = []
for true_a,books in test_set.items():
    for book,files in books.items():
        # sorted: iterating a bare set of strings gives a different report order on every run
        base_files = sorted(set(fid.split("_chunk")[0] for fid in files))
        for base in base_files:
            pred = assigned[true_a][book][base]
            results.append((true_a, book, base, pred))

correct = sum(1 for true_a, book, base, pred in results if pred == true_a)
wrong = len(results) - correct

#Save results in a file.txt
save_results_report(
    results,
    train_books_by_author=train_books_by_author,
    token_length=TOK_LENG,
    P0_type=METHODP,
    output_path=PATH + ".txt")
#!move {PATH + ".txt"} {PATHDIR}