In [180]:
import os
import re
import unicodedata
import logging

# DEBUG level also surfaces the "words discarded" messages logged by ft_filter.
logging.basicConfig(level=logging.DEBUG)

# Amount of processed text the format_* functions keep in memory before writing it
# out. Despite the name, it is compared against a sum of len(str) values, i.e. a
# character count rather than an encoded byte count.
MEM_BYTES = 8000

In [181]:
# Per-corpus formatters: each one converts its corpus into my standard format.

def format_paisa(in_path: str, out_path: str):
    """
    Flatten every <text>...</text> document of the PAISA corpus onto one line.

    The file is scanned for lines containing "<text"; that opening line itself is
    discarded. All following lines up to (and excluding) the first line containing
    "</text>" form one document. The document's non-empty lines are stripped and
    joined with a single space, so words at line boundaries stay separated, and the
    result is appended to out_path as one newline-terminated line.

    A document that is still open when the input ends is written out as well.

    in_path: path of the raw corpus file (read as UTF-8)
    out_path: path of the output file (UTF-8, opened in append mode)
    """
    # Append mode creates the file when it is missing, so no pre-creation is needed.
    with open(in_path, 'r', encoding='utf-8') as in_f, open(out_path, 'a', encoding="utf-8") as out_f:
        for line in in_f:
            if "<text" not in line:
                continue

            pieces = []
            # Consume the same iterator: the outer loop resumes right after </text>.
            for body_line in in_f:
                if "</text>" in body_line:  # end of the current document
                    break
                piece = body_line.strip()
                if piece:
                    pieces.append(piece)

            out_f.write(" ".join(pieces) + '\n')

In [182]:
# Short words (three characters or fewer) that clean() keeps despite its length filter.
safe_words = [
    "re", "cd", "tv", "ia",
    "afa", "ago", "ala", "ali", "alt", "ama", "ami", "amo", "ano",
    "ape", "api", "app", "avi", "avo", "bar", "blu", "boa", "boe",
    "bot", "bra", "bue", "bus", "cai", "dea", "dei", "deo", "dio",
    "dna", "don", "due", "dvd", "eco", "ego", "emo", "gel", "gin",
    "gru", "ira", "ire", "iri", "iva", "jet", "lsd", "nei", "neo",
    "oca", "odi", "odo", "ora", "ore", "ori", "oro", "pin", "pro",
    "rum", "sci", "sim", "sms", "sis", "tre", "ufo", "uni", "uri", 
    "url", "usa", "uva", "uve", "web", "yin", "zen", "zia", "zie", 
    "zii", "zio"
]


def clean(chunk: str):
    """
    Normalize one chunk of text into lowercase, accent-free words.

    Steps, in order:
    - lowercase the text
    - strip diacritics (NFD decomposition, then drop combining marks)
    - replace every character outside a-z with a space
    - drop words of three characters or fewer unless they appear in safe_words

    This function is pure: it performs no file I/O.

    returns: the surviving words joined by single spaces, terminated by a newline
    """
    lowered = chunk.lower()

    # Decompose accented letters so the accent becomes a separate combining mark
    # (category "Mn"), then drop those marks: "è" -> "e".
    decomposed = unicodedata.normalize("NFD", lowered)
    unaccented = "".join(ch for ch in decomposed if unicodedata.category(ch) != "Mn")

    # Everything that is not a plain a-z letter (digits, punctuation, newlines, ...)
    # becomes a word separator.
    letters_only = re.sub(r"[^a-z]", " ", unaccented)

    # split() already collapses runs of whitespace, so no separate collapsing step.
    kept_words = [
        w for w in letters_only.split()
        if len(w) > 3 or w in safe_words
    ]
    return " ".join(kept_words) + '\n'

def format_(in_path: str, out_path: str):
    """
    Run clean() over every line of in_path and append the results to out_path.

    Cleaned lines are buffered in memory and written out whenever the buffered
    text exceeds MEM_BYTES characters; whatever is left is written at the end.

    in_path: path of the input file (read as UTF-8)
    out_path: path of the output file (UTF-8, opened in append mode, which also
        creates the file when it does not exist yet)
    """
    with open(in_path, 'r', encoding='utf-8') as in_f, open(out_path, 'a', encoding="utf-8") as out_f:
        pending = []
        pending_chars = 0  # running size of `pending`, avoids re-summing on every line
        for line in in_f:
            cleaned = clean(line)
            pending.append(cleaned)
            pending_chars += len(cleaned)
            if pending_chars > MEM_BYTES:  # threshold exceeded: flush to the file
                out_f.writelines(pending)
                pending.clear()
                pending_chars = 0
        out_f.writelines(pending)
                

In [183]:
# format_paisa("./corpus/sample.txt", "./corpus/refining_steps/sample_out.txt")

In [184]:
import spacy 
# spaCy pipeline "it_core_news_lg", loaded once here and reused by spacy_normalizer().
nlp = spacy.load("it_core_news_lg")

def spacy_normalizer(chunk: str):
    """
    Lemmatize a chunk of text with the module-level spaCy pipeline `nlp`.

    Only tokens that are alphabetic (token.is_alpha) and are not stop words
    (token.is_stop) survive; each surviving token is replaced by its lemma.

    TODO (original author): maybe repeat the paragraph with only the nouns.

    returns: the lemmas joined by single spaces, terminated by a newline
    """
    lemmas = []
    for tok in nlp(chunk):
        if not tok.is_alpha or tok.is_stop:
            continue
        lemmas.append(tok.lemma_)
    return " ".join(lemmas) + '\n'


def format_spacy(in_path: str, out_path: str):
    """
    Run spacy_normalizer() over every line of in_path and append the results to out_path.

    Normalized lines are buffered in memory and written out whenever the buffered
    text exceeds MEM_BYTES characters; whatever is left is written at the end.

    in_path: path of the input file (read as UTF-8)
    out_path: path of the output file (UTF-8, opened in append mode)
    """
    with open(in_path, 'r', encoding='utf-8') as in_f, open(out_path, 'a', encoding="utf-8") as out_f:
        pending = []
        pending_chars = 0  # running size of `pending`, avoids re-summing on every line
        for line in in_f:
            normalized = spacy_normalizer(line)
            pending.append(normalized)
            pending_chars += len(normalized)
            if pending_chars > MEM_BYTES:
                out_f.writelines(pending)
                pending.clear()
                pending_chars = 0
        out_f.writelines(pending)


In [185]:
# format_spacy("./corpus/refining_steps/sample_out_ft.txt", "./corpus/refining_steps/sample_out_ft_to_spacy.txt")

In [186]:
import fasttext
# Location of the fastText model file. Set the FT_MODEL_PATH environment variable to
# run the notebook on another machine; the fallback is the original hard-coded path.
FT_MODEL_PATH = os.environ.get("FT_MODEL_PATH", "/home/alessio/models/lid.176.bin")
ft_model = fasttext.load_model(FT_MODEL_PATH)

def format_ft(in_path: str, out_path: str):
    """
    Run ft_filter() over every line of in_path and append the results to out_path.

    Filtered lines are buffered in memory and written out whenever the buffered
    text exceeds MEM_BYTES characters; whatever is left is written at the end.

    in_path: path of the input file (read as UTF-8)
    out_path: path of the output file (UTF-8, opened in append mode)
    """
    with open(in_path, 'r', encoding='utf-8') as in_f, open(out_path, 'a', encoding="utf-8") as out_f:
        pending = []
        pending_chars = 0  # running size of `pending`, avoids re-summing on every line
        for line in in_f:
            filtered = ft_filter(line)
            pending.append(filtered)
            pending_chars += len(filtered)
            if pending_chars > MEM_BYTES:  # buffer until MEM_BYTES is exceeded, then flush
                out_f.writelines(pending)
                pending.clear()
                pending_chars = 0
        out_f.writelines(pending)

def ft_filter(chunk: str):
    """
    Remove from one line the words that is_ok() rejects.

    The line has its newlines removed and is split on single spaces. Every word
    is scored by ft_model (top 5 labels per word) and passed, together with its
    labels and probabilities, to is_ok(). Rejected words are dropped from the
    output and reported in a DEBUG log record.

    returns: the kept words joined by single spaces, terminated by a newline
    """
    words = chunk.replace('\n', "").split(" ")
    all_labels, all_probas = ft_model.predict(words, k=5)

    kept = []
    discarded = []  # (word, labels, probabilities) triples, for debugging only

    # The per-word label list is never indexed directly, so a word with an empty
    # prediction list cannot cause an IndexError here.
    for word, labels, probas in zip(words, all_labels, all_probas):
        if is_ok(word, labels, probas):
            kept.append(word)
        else:
            discarded.append((word, labels, probas))

    # Only build and emit the debug record when there is something to report.
    if discarded:
        logging.debug("words discarded:\n%s", '\n'.join(str(d) for d in discarded))

    return " ".join(kept) + '\n'
    
def is_ok(word: str, labels, probas):
    """
    Decide whether a word survives the language filter.

    word: the word as it appears in the text
    labels: labels predicted for the word (e.g. "__label__it", "__label__en")
    probas: probabilities aligned index-by-index with labels

    A word is rejected (False) when:
    - the word, or its lowercase form, is listed in ft_unsafe_words; or
    - its English probability is above 0.5, its Italian probability is below
      0.05, and its lowercase form is not listed in ft_safe_words.

    A label missing from `labels` counts as probability 0.

    returns: True to keep the word, False to discard it
    """
    lowered = word.lower()

    # The blacklist always wins. Checking the lowercase form too makes this lookup
    # case-insensitive like the whitelist one, while entries that previously
    # matched the exact spelling keep matching.
    if word in ft_unsafe_words or lowered in ft_unsafe_words:
        return False

    # Whitelisted words are kept no matter what the model predicts.
    if lowered in ft_safe_words:
        return True

    it_pr = probas[labels.index("__label__it")] if "__label__it" in labels else 0
    en_pr = probas[labels.index("__label__en")] if "__label__en" in labels else 0

    # Discard only words suspected to be English that are very unlikely to be Italian.
    return not (en_pr > 0.5 and it_pr < 0.05)

# Whitelist and blacklist consulted by is_ok(); whitelist entries must be lowercase
# to match, because is_ok() looks up the lowercase form of the word.
ft_safe_words = []
ft_unsafe_words = []

In [187]:
# format_ft("./corpus/refining_steps/sample_out_spacy", "./corpus/refining_steps/sample_out_ft.txt")

In [188]:
# Spot check of the raw model output for an English-looking word. With the scores
# recorded in this cell's output (en ~ 0.67, it ~ 0.11), is_ok() keeps "store":
# its Italian probability is not below the 0.05 cut-off.
ft_model.predict(["store"], k=5)

([['__label__en', '__label__it', '__label__no', '__label__ko', '__label__de']],
 [array([0.6734811 , 0.10802358, 0.0829899 , 0.03402194, 0.02290507],
        dtype=float32)])

In [189]:
# Pipeline: PAISA formatting -> spaCy lemmatization -> fastText word filter -> final cleaning.
# Each stage reads the file written by the stage before it. Every stage opens its
# output in append mode, so re-running this cell adds to the existing files
# instead of replacing them.

format_paisa(
    in_path="./corpus/refining_steps/sample_out.txt",
    out_path="./corpus/refining_steps/sample_out_f.txt",
)
format_spacy(
    in_path="./corpus/refining_steps/sample_out_f.txt",
    out_path="./corpus/refining_steps/sample_out_sp.txt",
)
format_ft(
    in_path="./corpus/refining_steps/sample_out_sp.txt",
    out_path="./corpus/refining_steps/sample_out_ft.txt",
)
format_(
    in_path="./corpus/refining_steps/sample_out_ft.txt",
    out_path="./corpus/refining_steps/sample_out_final.txt",
)