In [None]:
import random
from pathlib import Path
from typing import List, Union

import dask.dataframe as dd
import pandas as pd
from tqdm import trange

from src.Preprocessor.LanguageDetector import LanguageDetector
from src.Preprocessor.NgramProcessor import replace_ngrams, suggest_ngrams
from src.Preprocessor.TextProcessor import TextPreprocessor
from src.Preprocessor.utils import merge_data
from src.Utils.utils import load_stopwords, load_vocabulary

random.seed(42)


In [None]:
def load_df(dir_df: Path, lang: Union[str, List[str]] = "all"):
    """Read a parquet file and optionally keep only the requested languages.

    Parameters
    ----------
    dir_df : Path
        Location of the parquet file; handed to ``pd.read_parquet`` unchanged.
    lang : str or list of str, default "all"
        Language code(s) to keep. ``"all"`` or any falsy value disables the
        filter. A single code may be passed as a plain string.

    Returns
    -------
    pd.DataFrame
        The loaded frame. It is filtered on its ``lang`` column only when a
        filter was requested and that column exists.
    """
    df = pd.read_parquet(dir_df)

    # No filter requested, or no column to filter on: return the frame as read.
    if not lang or lang == "all" or "lang" not in df.columns:
        return df

    # A bare string must be wrapped, not exploded: list("es") == ["e", "s"].
    wanted = [lang] if isinstance(lang, str) else list(lang)
    return df[df["lang"].isin(wanted)]


def save_df(df, dir_df: Path):
    """Write ``df`` to ``dir_df`` as parquet, using the pyarrow engine."""
    parquet_engine = "pyarrow"
    df.to_parquet(dir_df, engine=parquet_engine)


In [None]:
# Run options
use_dask = False  # True: the processing cells take their dask branch
# subsample = 200_000
subsample = 0  # falsy: the loading cell keeps every row
# pipe = ["merge_data", "lang_id", "normalization", "preprocess"]
# pipe = ["preprocess"]
pipe = ["lang_id", "normalization"]  # stage names inspected by the loading cell
merge_dfs = ["minors", "insiders", "outsiders"]
lang = ["es"]

# Directory and file layout, all relative to data/
dir_data = Path("data")
dir_metadata = dir_data / "metadata"
dir_stopwords = dir_data / "stopwords"
dir_ngrams = dir_data / "ngrams/ngrams.txt"
dir_vocabulary = dir_data / "RAE/vocabulary_extended.json"

dir_text_metadata = dir_metadata / "df_text.parquet"
dir_text_processed = dir_metadata / "df_processed_full.parquet"

# Resources shared by the processing cells
stop_words = load_stopwords(dir_stopwords, use_stopwords="all")
vocabulary = load_vocabulary(dir_vocabulary)
# One n-gram per line; surrounding whitespace is stripped.
with dir_ngrams.open("r", encoding="utf-8") as f:
    ngrams = [line.strip() for line in f]


In [None]:
# Read df_processed_pd.parquet from the metadata directory for inspection.
df = pd.read_parquet(dir_metadata / "df_processed_pd.parquet")

In [None]:
# Preview only the first non-null entries; rendering the whole column as a
# list would flood the cell output and bloat the saved notebook.
df["preprocessed_text"].dropna().head(10).tolist()

In [None]:
# Load data
# Case: df_text is not (re)created by this run, so an existing parquet file
# has to be reused. Defines `df_processed` (the frame the processing cells
# write into) and `df_processed_ids` (the index labels they operate on).
if "merge_data" not in pipe:
    if any(stage in ("lang_id", "normalization", "preprocess") for stage in pipe):
        if dir_text_processed.exists():
            # dir_text_processed already exists and can be used
            df_processed = load_df(dir_text_processed)
            df_processed_ids = df_processed.index

            if subsample and dir_text_metadata.exists():
                df_text = load_df(dir_text_metadata)
                # Later cells address rows with df_processed.loc[ids], so only
                # ids that actually exist in df_processed may be sampled.
                candidate_ids = df_text.index[df_text.index.isin(df_processed.index)]
                # Clamp: random.sample raises ValueError when asked for more
                # items than the population holds.
                df_processed_ids = random.sample(
                    list(candidate_ids), min(subsample, len(candidate_ids))
                )

        else:
            # No processed frame yet: start from df_text, which must exist.
            if not dir_text_metadata.exists():
                # Raise instead of print + exit() so the failure shows up as an
                # ordinary traceback and stops the run at this cell.
                raise FileNotFoundError(
                    f"'{dir_text_metadata}' does not exist. Create it first."
                )

            df_text = load_df(dir_text_metadata)
            if subsample:
                df_processed_ids = random.sample(
                    list(df_text.index), min(subsample, len(df_text.index))
                )
                # Explicit copy: later cells assign new columns into this
                # frame, which must not be a slice of df_text.
                df_processed = df_text.loc[
                    df_text.index.isin(df_processed_ids)
                ].copy()
            else:
                df_processed_ids = df_text.index
                df_processed = df_text

In [None]:
# subsample = 500_000
# if subsample:
#     df_processed_ids = random.sample(list(df_text.index), subsample)
#     df_processed = df_text.loc[df_text.index.isin(df_processed_ids)]
# else:
#     df_processed_ids = df_text.index
#     df_processed = df_text

## Process text

In [None]:
# # Merge multiple dataframes
# merge_data(dir_metadata, dir_text_metadata, merge_dfs=merge_dfs)    


### Language identification

In [None]:
# # Language Identification
# lang_detector = LanguageDetector(
#     library="fasttext", ft_model=str(Path("models/lid.176.bin").absolute())
# )

# if use_dask:
#     ids = df_processed.index.isin(df_processed_ids)
#     aux = dd.from_pandas(df_processed.loc[ids][["text"]], npartitions=100)
#     aux["lang"] = aux["text"].apply(
#         lang_detector.identify_language, meta=(None, "object")
#     )
#     aux = aux.compute()["lang"]
#     df_processed.loc[aux.index, "lang"] = aux

# else:
#     df_processed.loc[df_processed_ids, "lang"] = df_processed.loc[
#         df_processed_ids, "text"
#     ].apply(lang_detector.identify_language)

# # Save
# save_df(df_processed, dir_text_processed)


### Text normalization

In [None]:
# Text normalization
# The method names below are handed to TextPreprocessor as-is, in this order.
normalization_steps = [
    "lowercase",
    "remove_urls",
    ("clean_text", {"min_len": 1}),
    "convert_ngrams",
]
preprocessor_normalizer = TextPreprocessor(
    methods=normalization_steps,
    ngrams=ngrams,
)

if not use_dask:
    # Plain pandas: normalize the selected rows and write them back in place.
    raw_text = df_processed.loc[df_processed_ids, "text"]
    df_processed.loc[df_processed_ids, "normalized_text"] = raw_text.apply(
        preprocessor_normalizer.preprocess
    )

else:
    # Dask: run the same per-row function over 100 partitions, then copy the
    # computed column back by index label.
    ids = df_processed.index.isin(df_processed_ids)
    text_ddf = dd.from_pandas(df_processed.loc[ids][["text"]], npartitions=100)
    text_ddf["normalized_text"] = text_ddf["text"].apply(
        preprocessor_normalizer.preprocess, meta=(None, "object")
    )
    aux = text_ddf.compute()["normalized_text"]
    df_processed.loc[aux.index, "normalized_text"] = aux

# Save
save_df(df_processed, dir_text_processed)

### Full text processing

In [None]:
# Full text processing
# The method names below are handed to TextPreprocessor as-is, in this order.
preprocessor_full = TextPreprocessor(
    methods=[
        "lowercase",
        "remove_urls",
        "lemmatize_text",
        ("clean_text", {"min_len": 1}),
        "convert_ngrams",
        ("clean_text", {"min_len": 2}),
        "remove_stopwords",
        # "tokenize_text",
    ],
    stopwords=stop_words,
    vocabulary=vocabulary,
    ngrams=ngrams,
)

# Compute and save iteratively, `step` rows per checkpoint
step = 1_000
indices = range(len(df_processed_ids))

# Skip rows already processed.
# Resume by id rather than by position: counting the non-null rows of the
# whole frame and using that count as an offset into df_processed_ids skips
# the wrong rows whenever the two do not line up (e.g. an earlier run used a
# different subsample, or some outputs were null).
target_ids = pd.Index(df_processed_ids)
if "preprocessed_text" in df_processed.columns:
    done_ids = df_processed.index[df_processed["preprocessed_text"].notna()]
    pending_ids = target_ids[~target_ids.isin(done_ids)]
else:
    pending_ids = target_ids
# Number of requested ids that already have a result.
skip = len(target_ids) - len(pending_ids)

t = trange(0, len(pending_ids), step, desc="", leave=True)

for i in t:
    ids = pending_ids[i : i + step]

    if use_dask:
        # Boolean mask over the frame for this chunk; results are copied back
        # by index label once computed.
        chunk_mask = df_processed.index.isin(ids)
        aux = dd.from_pandas(
            df_processed.loc[chunk_mask][["text"]], npartitions=100
        )
        aux["preprocessed_text"] = aux["text"].apply(
            preprocessor_full.preprocess, meta=(None, "object")
        )
        aux = aux.compute()["preprocessed_text"]
        df_processed.loc[aux.index, "preprocessed_text"] = aux
    else:
        df_processed.loc[ids, "preprocessed_text"] = df_processed.loc[
            ids, "text"
        ].apply(preprocessor_full.preprocess)

    # Checkpoint after every chunk so an interrupted run can resume. The frame
    # in memory is the one just written, so it is not read back from disk.
    save_df(df_processed, dir_text_processed)

# Save
save_df(df_processed, dir_text_processed)


In [None]:
# df_partition["lang"].value_counts()


#### ngrams

In [None]:
from src.Preprocessor.NgramProcessor import suggest_ngrams, replace_ngrams

# Reload the processed frame from disk and keep the normalized text of the
# rows whose `lang` is "es".
df_processed = load_df(dir_text_processed)
corpus = df_processed[df_processed["lang"] == "es"]["normalized_text"]

# Stopwords with spaces, lower-cased and split into token tuples.
stw = {s.lower() for s in stop_words if " " in s}
# Longest first; ties are broken by the tokens themselves so the order (and
# therefore the dict built below) does not depend on set iteration order.
stw = sorted((tuple(s.split()) for s in stw), key=lambda toks: (-len(toks), toks))
# 0 when there are no multi-word stopwords (stw[0] would raise IndexError).
max_length_ngrams = len(stw[0]) if stw else 0
# Token tuple -> hyphen-joined form
stw = {s: "-".join(s) for s in stw}


In [None]:
# Ask for n-gram suggestions over the whole corpus (ngram_size=4, multi-word
# stopwords passed as `stop_words`).
sug_ngrams_ng = suggest_ngrams(corpus, ngram_size=4, stop_words=stw)


def _with_suggested_ngrams(text):
    """Run ``replace_ngrams`` on one document with the suggested n-grams."""
    return replace_ngrams(text, sug_ngrams_ng)


# Only the first 400_000 entries of the corpus are rewritten.
corpus_ng = corpus[:400_000].apply(_with_suggested_ngrams)

In [None]:
# Write the suggested n-grams, one per line, with their parts joined by a
# single space.
# NOTE(review): this path differs from the "ngrams/ngrams.txt" file read by
# the configuration cell; confirm which location is intended.
with open("data/stopwords/ngrams.txt", "w", encoding="utf-8") as f:
    ngram_lines = [" ".join(ng) + "\n" for ng in sug_ngrams_ng]
    f.write("".join(ngram_lines))

In [None]:
# Read the n-grams back and map each "a b c" entry to its hyphenated form
# "a-b-c". This rebinds `ngrams` to a dict.
with open("data/stopwords/ngrams.txt", "r", encoding="utf-8") as f:
    ngram_entries = [line.strip() for line in f]
ngrams = {entry: entry.replace(" ", "-") for entry in ngram_entries}
ngrams