# Preprocessing
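Bring structure to comments: clean and truncate each comment, then assemble comment threads delimited by special marker tokens (e.g. `xx_thread_start`, `xx_comment_start`).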

In [4]:
from multiprocessing import cpu_count
from pathlib import Path

import dask.dataframe as ddf
import numpy as np
import pandas as pd
import spacy
import swifter
from cleantext import clean
from fastai.text import *

pd.options.display.max_colwidth = 1000

import dask

from dask.diagnostics import ProgressBar

n_cores = cpu_count()

## Parameters

In [5]:
dask.__version__  # record which dask version this notebook was executed with

'1.1.0'

In [6]:
# Model parameters
max_sequence_length = 2000  # hard upper bound on the token length of one assembled thread sample
max_tokens_per_comment = 200  # every single comment is truncated after this many tokens

# Preprocessing variant to build; one of 'only_threads', 'threads_headline',
# 'threads_headline_article', 'threads_root_headline'.
kind = 'threads_root_headline'
max_previous = None  # comments (including the current one) kept as context; None keeps all
max_vocab = 30000
cut = 1398  # passed as Tokenizer(cut_n_from_behind=cut) when building the FastAI data; None disables

base = Path('/mnt/data/group07/johannes/ynacc_proc/proper_threads')

# The dataset name encodes the chosen parameters,
# e.g. 'threads_root_headline_unlimited_30000_cut'.
ds_name = f"{kind}_{'unlimited' if max_previous is None else max_previous}_{max_vocab}"
if cut is not None:
    ds_name += '_cut'

ds_name

'threads_root_headline_unlimited_30000_cut'

In [7]:
# A single module-level blank English spaCy pipeline; the helpers in this
# notebook only use its ``tokenizer``.
# NOTE(review): the original note said this "is a problem with
# multiprocessing" — presumably building or passing the pipeline inside the
# dask worker processes was problematic, so it is kept global; confirm.
tok = spacy.blank('en')

# truncate individual comments
def truncate(text, max_tokens_per_comment=max_tokens_per_comment):
    """Return ``text`` cut down to its first ``max_tokens_per_comment`` spaCy tokens.

    The result is the original character span covered by the kept tokens:
    whitespace between them is preserved and trailing whitespace is dropped.
    Empty input yields ``''``.
    """
    # ``Span.text`` yields the covered substring directly. The previous
    # ``str(span.merge())`` retokenized the doc just to read the text back.
    # For an empty span ``merge()`` returns ``None``, so empty comments turned
    # into the literal string 'None'. ``Span.merge`` is also deprecated in
    # spaCy 2.1 and removed in spaCy 3.
    return tok.tokenizer(text)[:max_tokens_per_comment].text


def calc_num_tokens(text):
    """Return how many tokens the shared spaCy tokenizer splits ``text`` into."""
    doc = tok.tokenizer(text)
    return len(doc)

# def truncate_num_tokens(text):
#     tokens = tok.tokenizer(text)
#     return str(tokens[:max_tokens_per_comment].merge()), len(tokens)


def only_threads_process_text(row, n_cores=n_cores):
    """Build the thread-context string for one comment row.

    The row's own comment and the comments before it in the same thread are
    taken from ``row['text_proc_threads']`` up to position
    ``row['commentindex']``. Each is wrapped in ``xx_comment_start`` /
    ``xx_comment_end`` markers, and they are concatenated oldest-first,
    followed by ``xx_thread_end``.

    Comments are added newest-first until the running token count would reach
    ``max_sequence_length``. ``xx_thread_start`` is only present when the
    oldest comment fits. If ``max_previous`` is set, only that many most
    recent comments (the current one included) are considered.

    ``n_cores`` is accepted for signature compatibility but not used.
    """
    position = row['commentindex'] + 1
    # NOTE(review): slicing by commentindex assumes the thread list has one
    # entry per index 0..k without gaps; the endswith check below reports
    # rows where the current comment does not end up last.
    history = list(row['text_proc_threads'])[:position]
    if max_previous is not None:
        history = history[-max_previous:]

    oldest = len(history) - 1
    pieces = [' xx_thread_end']  # collected newest-first, reversed when joined
    used = 1
    for offset, comment in enumerate(reversed(history)):
        wrapped = ' xx_comment_start ' + comment + ' xx_comment_end '
        if offset == oldest:
            wrapped = 'xx_thread_start ' + wrapped
        cost = calc_num_tokens(wrapped)
        if used + cost >= max_sequence_length:
            break
        pieces.append(wrapped)
        used += cost

    res = clean(''.join(reversed(pieces)), lower=False)  # collapse the surplus spaces

    # Presumably skips the placeholder rows dask feeds in while inferring
    # output metadata (object columns filled with 'foo'). TODO confirm.
    if row['text_proc_threads'] != 'foo':
        expected_tail = row['text_proc'] + ' xx_comment_end xx_thread_end'
        # soft check: report mismatches instead of failing the whole run
        if not res.endswith(expected_tail):
            print(res, row['text_proc'])
        assert res.count(' ') < max_sequence_length

    return res


def clean_and_truncate(text, linebreaks_token=False):
    """Normalise one raw comment or headline and cap its token count.

    Missing values (NaN/None) become ``''``. Any other value is passed
    through ``clean`` with case kept and the ``no_urls``, ``no_emails`` and
    ``zero_digits`` options. With ``linebreaks_token=True`` every newline is
    replaced by `` xx_linebreak ``. The result is finally cut by ``truncate``.
    """
    if pd.isna(text):
        return ''
    cleaned = clean(text, lower=False, no_urls=True, no_emails=True, zero_digits=True)
    if linebreaks_token:
        length_before = len(cleaned)
        cleaned = cleaned.replace('\n', ' xx_linebreak ')
        # sanity checks: either nothing was replaced or the marker is present,
        # and no raw newline survives
        assert length_before == len(cleaned) or 'xx_linebreak' in cleaned
        assert '\n' not in cleaned
    return truncate(cleaned)


def keep_longest(rows):
    """Return the row of ``rows`` whose ``text_proc`` is longest.

    Used as a ``groupby(...).apply`` callback, where ``rows`` holds all
    surviving variants of one comment. Ties go to the earliest row. A
    single-row group is returned directly, and an empty one yields ``None``.
    """
    if len(rows) == 1:
        return rows.iloc[0]
    candidates = (rows.iloc[pos] for pos in range(len(rows)))
    # defensive: ignore anything that came back as a plain string
    candidates = (row for row in candidates if type(row) is not str)
    # max() keeps the first maximal element, matching a strict '>' scan
    return max(candidates, key=lambda row: len(row['text_proc']), default=None)


def only_threads_process_df(df, n_cores=n_cores):
    """Turn a flat comment table into one thread-context sample per comment.

    Steps:
    1. Clean and truncate every ``text`` into ``text_proc``, in parallel with
       dask processes.
    2. Drop duplicate comments.
    3. Keep only the longest variant per (commentindex, sdid).
    4. Collect each thread's comments, ordered by ``commentindex``, into
       ``text_proc_threads``.
    5. Rebuild ``text_proc`` with ``only_threads_process_text``.

    Side effect: assigns a ``text_proc`` column on the caller's ``df``.

    Args:
        df: comment rows containing ``text``, ``sdid`` and ``commentindex``.
        n_cores: number of dask partitions.

    Returns:
        A new DataFrame with ``text_proc`` (thread context) and
        ``text_proc_threads`` (the thread's cleaned comments as a list).
    """
    def _clean_texts(part):
        return part['text'].apply(clean_and_truncate)

    def _build_contexts(part):
        return part.apply(only_threads_process_text, axis=1)

    with ProgressBar():
        cleaned = ddf.from_pandas(df, npartitions=n_cores).map_partitions(_clean_texts)
        df['text_proc'] = cleaned.compute(scheduler="processes")

    # deduplicate only after cleaning, so equivalent variants collapse
    deduped = df.drop_duplicates(subset=['text_proc', 'sdid', 'commentindex'])
    # Several variants of one comment can survive (in some, quotation marks
    # were removed), so keep only the longest per position.
    longest = (deduped
               .groupby(['commentindex', 'sdid'], as_index=False)
               .apply(keep_longest)
               .reset_index(drop=True))

    threads = (longest
               .sort_values(by=['commentindex'])
               .groupby('sdid', as_index=False)
               .agg({'text_proc': lambda x: list(x)}))
    df_res = longest.merge(threads, on='sdid', suffixes=['', '_threads'])

    with ProgressBar():
        contexts = ddf.from_pandas(df_res, npartitions=n_cores).map_partitions(_build_contexts)
        df_res['text_proc'] = contexts.compute(scheduler="processes")

    return df_res

In [8]:
def threads_article_headline(df, n_cores=n_cores):
    """Build thread-context samples with the article headline prepended to each.

    Comments are cleaned with newlines kept as ``xx_linebreak`` tokens. They
    are then deduplicated, reduced to the longest variant per
    (commentindex, sdid), and grouped into per-thread lists
    (``text_proc_threads``). Each comment's context is built with
    ``only_threads_process_text`` and prefixed with
    ``xx_headline_start <headline> xx_headline_end``.

    Side effect: assigns a ``text_proc`` column on the caller's ``df``.

    Args:
        df: comment rows with ``text``, ``sdid``, ``commentindex`` and ``headline``.
        n_cores: number of dask partitions.

    Returns:
        A new DataFrame with the final ``text_proc`` and ``text_proc_threads``.
    """
    def _clean_texts(part):
        return part['text'].apply(clean_and_truncate, linebreaks_token=True)

    def _build_contexts(part):
        return part.apply(only_threads_process_text, axis=1)

    def _prepend_headline(row):
        headline = clean_and_truncate(row['headline'], linebreaks_token=True)
        return 'xx_headline_start ' + headline + ' xx_headline_end ' + row['text_proc']

    with ProgressBar():
        cleaned = ddf.from_pandas(df, npartitions=n_cores).map_partitions(_clean_texts)
        df['text_proc'] = cleaned.compute(scheduler="processes")

    # deduplicate only after cleaning, so equivalent variants collapse
    deduped = df.drop_duplicates(subset=['text_proc', 'sdid', 'commentindex'])
    # keep the longest variant per position (some lost their quotation marks)
    longest = (deduped
               .groupby(['commentindex', 'sdid'], as_index=False)
               .apply(keep_longest)
               .reset_index(drop=True))

    threads = (longest
               .sort_values(by=['commentindex'])
               .groupby('sdid', as_index=False)
               .agg({'text_proc': lambda x: list(x)}))
    df_res = longest.merge(threads, on='sdid', suffixes=['', '_threads'])

    with ProgressBar():
        contexts = ddf.from_pandas(df_res, npartitions=n_cores).map_partitions(_build_contexts)
        df_res['text_proc'] = contexts.compute(scheduler="processes")

    df_res['text_proc'] = df_res.swifter.apply(_prepend_headline, axis=1)
    return df_res

In [9]:
def threads_article_headline_article(df, n_cores=n_cores):
    """Build thread-context samples prefixed with the headline and the article body.

    The pipeline matches the headline-only variant. The prefix is
    ``xx_headline_start <headline> xx_sep <article_text> xx_headline_end``,
    with both parts cleaned and truncated with newlines kept as
    ``xx_linebreak`` tokens.

    Side effect: assigns a ``text_proc`` column on the caller's ``df``.

    Args:
        df: comment rows with ``text``, ``sdid``, ``commentindex``,
            ``headline`` and ``article_text``.
        n_cores: number of dask partitions.

    Returns:
        A new DataFrame with the final ``text_proc`` and ``text_proc_threads``.
    """
    def _clean_texts(part):
        return part['text'].apply(clean_and_truncate, linebreaks_token=True)

    def _build_contexts(part):
        return part.apply(only_threads_process_text, axis=1)

    def _prepend_headline_and_article(row):
        headline = clean_and_truncate(row['headline'], linebreaks_token=True)
        article = clean_and_truncate(row['article_text'], linebreaks_token=True)
        return ('xx_headline_start ' + headline + ' xx_sep ' + article
                + ' xx_headline_end ' + row['text_proc'])

    with ProgressBar():
        cleaned = ddf.from_pandas(df, npartitions=n_cores).map_partitions(_clean_texts)
        df['text_proc'] = cleaned.compute(scheduler="processes")

    # deduplicate only after cleaning, so equivalent variants collapse
    deduped = df.drop_duplicates(subset=['text_proc', 'sdid', 'commentindex'])
    # keep the longest variant per position (some lost their quotation marks)
    longest = (deduped
               .groupby(['commentindex', 'sdid'], as_index=False)
               .apply(keep_longest)
               .reset_index(drop=True))

    threads = (longest
               .sort_values(by=['commentindex'])
               .groupby('sdid', as_index=False)
               .agg({'text_proc': lambda x: list(x)}))
    df_res = longest.merge(threads, on='sdid', suffixes=['', '_threads'])

    with ProgressBar():
        contexts = ddf.from_pandas(df_res, npartitions=n_cores).map_partitions(_build_contexts)
        df_res['text_proc'] = contexts.compute(scheduler="processes")

    df_res['text_proc'] = df_res.swifter.apply(_prepend_headline_and_article, axis=1)
    return df_res

In [10]:
def process_threads_root_headline(df, n_cores=n_cores):
    """Build thread-context samples, prefixing only root comments with the headline.

    The pipeline matches the headline variant. The
    ``xx_headline_start <headline> xx_headline_end`` prefix is added only to
    rows with ``commentindex == 0``; every other row keeps its thread context
    unchanged.

    Side effect: assigns a ``text_proc`` column on the caller's ``df``.

    Args:
        df: comment rows with ``text``, ``sdid``, ``commentindex`` and ``headline``.
        n_cores: number of dask partitions.

    Returns:
        A new DataFrame with the final ``text_proc`` and ``text_proc_threads``.
    """
    def _clean_texts(part):
        return part['text'].apply(clean_and_truncate, linebreaks_token=True)

    def _build_contexts(part):
        return part.apply(only_threads_process_text, axis=1)

    def _headline_for_root(row):
        # the condition is evaluated first, exactly like the former ternary
        if row['commentindex'] != 0:
            return row['text_proc']
        headline = clean_and_truncate(row['headline'], linebreaks_token=True)
        return 'xx_headline_start ' + headline + ' xx_headline_end ' + row['text_proc']

    with ProgressBar():
        cleaned = ddf.from_pandas(df, npartitions=n_cores).map_partitions(_clean_texts)
        df['text_proc'] = cleaned.compute(scheduler="processes")

    # deduplicate only after cleaning, so equivalent variants collapse
    deduped = df.drop_duplicates(subset=['text_proc', 'sdid', 'commentindex'])
    # keep the longest variant per position (some lost their quotation marks)
    longest = (deduped
               .groupby(['commentindex', 'sdid'], as_index=False)
               .apply(keep_longest)
               .reset_index(drop=True))

    threads = (longest
               .sort_values(by=['commentindex'])
               .groupby('sdid', as_index=False)
               .agg({'text_proc': lambda x: list(x)}))
    df_res = longest.merge(threads, on='sdid', suffixes=['', '_threads'])

    with ProgressBar():
        contexts = ddf.from_pandas(df_res, npartitions=n_cores).map_partitions(_build_contexts)
        df_res['text_proc'] = contexts.compute(scheduler="processes")

    df_res['text_proc'] = df_res.swifter.apply(_headline_for_root, axis=1)
    return df_res

In [11]:
# Select the preprocessing function for the configured ``kind``.
# An unknown kind fails here with a clear message. Previously it left
# ``process_df = None``, and the error only appeared later as
# "'NoneType' object is not callable".
_PROCESSORS = {
    'only_threads': only_threads_process_df,
    'threads_headline': threads_article_headline,
    'threads_headline_article': threads_article_headline_article,
    'threads_root_headline': process_threads_root_headline,
}
try:
    process_df = _PROCESSORS[kind]
except KeyError:
    raise ValueError(f"unknown kind {kind!r}; expected one of {sorted(_PROCESSORS)}") from None

# Classification Data

In [12]:
# Preprocess the train/val/test classification splits into one output directory.
# The directory is the same for every split, so it is created once, up front,
# with pathlib. This replaces the IPython-only `!mkdir -p` magic.
out_f = base/'data'/'cls'/ds_name
out_f.mkdir(parents=True, exist_ok=True)
for f in ['train.csv', 'val.csv', 'test.csv']:
    df = pd.read_csv('~/data/ynacc_proc/replicate/split/' + f)
    df_out = process_df(df, n_cores=4)
    df_out.to_csv(out_f/f, index=False)
    # free the frames before loading the next split
    del df
    del df_out

[########################################] | 100% Completed | 11.9s
[########################################] | 100% Completed | 18.2s


Pandas Apply: 100%|██████████| 7910/7910 [00:01<00:00, 5197.80it/s]


[########################################] | 100% Completed |  7.6s
[########################################] | 100% Completed |  8.2s


Pandas Apply: 100%|██████████| 583/583 [00:00<00:00, 5294.76it/s]


[########################################] | 100% Completed |  7.6s
[########################################] | 100% Completed |  8.1s


Pandas Apply: 100%|██████████| 553/553 [00:00<00:00, 4850.74it/s]


In [10]:
# Read back the classification train split written for the current ``ds_name``.
# The path is derived from the parameters rather than hard-coded, so it follows
# whatever dataset was just built.
df_check = pd.read_csv(base/'data'/'cls'/ds_name/'train.csv')

In [11]:
df_check.columns  # inspect which columns ended up in the exported CSV

Index(['Unnamed: 0', 'Unnamed: 0.1', 'url', 'article_text', 'title',
       'publish_date', 'commentid', 'clcontroversial', 'clmean',
       'clinformative', 'cldisagreement', 'clagreement', 'clsentiment',
       'clpersuasive', 'claudience', 'index', 'sdid', 'commentindex',
       'headline', 'guid', 'timestamp', 'thumbs-up', 'thumbs-down', 'text',
       'parentid', 'constructiveclass', 'sd_agreement', 'sd_type', 'sentiment',
       'tone', 'commentagreement', 'topic', 'intendedaudience',
       'persuasiveness', 'text_proc', 'cltopic', 'text_proc_threads'],
      dtype='object')

In [12]:
df_check['text_proc']  # eyeball the assembled model inputs (headline/thread markers)

0                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   xx_headline_start Math Teacher Marries 00-Year-Old Former Student, Allegedly To Avoid Her Testifying in Sex Case Against Him xx_headline_end xx_thread_start xx_comment_start She was 00, not 00. And these prosecuting lawyers act like they have never done anything wrong or questionable in their life. If they look happy, and say they are happy, just leave them alone. Why ruin someone else's life, when there really isn't any good reason to do so? xx_comment_end xx_thr

# Language Model Data

## To Threads

In [13]:
# Language-model corpus: the train and val comment files concatenated into one frame.
dfs = [pd.read_csv('~/data/ynacc_proc/replicate/lmdata_complete/' + split_file)
       for split_file in ['train.csv', 'val.csv']]
df = pd.concat(dfs, verify_integrity=True, ignore_index=True)

if kind == 'threads_headline_article':
    # Attach each comment's article body as ``article_text``.
    # NOTE(review): the merge joins on all shared columns (``url`` is
    # expected to be the only one); confirm no other column collides.
    art = (pd.read_csv('/home/group7/masters-thesis/ynacc/00 data/04 all articles/articles.csv')
           [['text', 'url']]
           .rename(columns={'text': 'article_text'}))
    df = df.merge(art, how="left")

In [14]:
# Build the LM corpus, leaving one core free but never asking for 0 partitions.
df_out = process_df(df, n_cores=max(1, n_cores - 1))
out_f = base/'data'/'lm'/ds_name
out_f.mkdir(parents=True, exist_ok=True)  # pure-Python replacement for `!mkdir -p`

# Positional split, no shuffling: the first rows train, the rest validate.
n_lm_train = 190000
df_lm_train = df_out.iloc[:n_lm_train]
df_lm_train.to_csv(out_f/'train.csv', index=False)

df_lm_val = df_out.iloc[n_lm_train:]
df_lm_val.to_csv(out_f/'val.csv', index=False)

del df
del df_out

[########################################] | 100% Completed |  1min 36.8s
[                                        ] | 0% Completed | 14.0sxx_thread_start xx_comment_start How do we know that global warming is caused by humans? xx_linebreak The 'natural' warming cycles of interglacial periods here on Earth are quite well-known and documented. And yes, Co0 did not cause them, but instead were caused by a variety of cosmic and orbital effects associated with the Milankovic Cycle. And in each of these interglacial events in the past, Co0 levels increased after the temperature increased, by as much as 000 years, thus reinstating the necessary physical equilibrium between atmospheric Co0 and temperature which has been well-known and documented since the nineteenth century (Arrhenius, 0000). This sequence has been carefully and precisely documented for the past 000,000 years through the use of ice-core data from Greenland and the Antarctic as well, and all indications from the deeper past vi

Pandas Apply: 100%|██████████| 222411/222411 [00:55<00:00, 4007.76it/s]


## Prepare FastAI data

In [15]:
EX_PA = base/'exp'/'lm'/ds_name

data_lm = TextLMDataBunch.from_df(EX_PA, df_lm_train, df_lm_val, max_vocab=max_vocab, text_cols='text_proc')

# Work on a copy. ``vocab.itos`` is the list backing ``data_lm``'s vocabulary;
# reordering it in place would leave that DataBunch out of step with the ids
# its texts were numericalised with.
itos_new = list(data_lm.train_dl.vocab.itos)

# Move the special thread/comment markers right after the first two entries so
# their ids are fixed and known. Each marker is inserted at index 2, so they
# end up in reverse order of ``specials``.
specials = ['xx_comment_start', 'xx_comment_end', 'xx_thread_start', 'xx_thread_end']
for s in specials:
    itos_new.remove(s)
    itos_new.insert(2, s)

itos_new

['xxunk',
 'xxpad',
 'xx_thread_end',
 'xx_thread_start',
 'xx_comment_end',
 'xx_comment_start',
 'xxbos',
 'xxfld',
 'xxmaj',
 'xxup',
 'xxrep',
 'xxwrep',
 '.',
 'the',
 ',',
 'to',
 'and',
 'a',
 'of',
 'you',
 'is',
 'that',
 'i',
 'it',
 'in',
 'are',
 '"',
 'for',
 'not',
 '?',
 "n't",
 'they',
 "'s",
 'have',
 'do',
 'be',
 'with',
 'this',
 'was',
 'on',
 '!',
 'he',
 'as',
 'your',
 'but',
 'people',
 'if',
 'or',
 'what',
 'all',
 '-',
 'so',
 'just',
 'we',
 'their',
 'who',
 'like',
 'about',
 'no',
 'would',
 'she',
 'will',
 '0',
 'there',
 '...',
 'at',
 'can',
 'one',
 'my',
 'her',
 '00',
 'has',
 'from',
 'them',
 'by',
 'out',
 'an',
 'when',
 'get',
 'his',
 'up',
 '$',
 'how',
 'because',
 'did',
 'more',
 '4',
 ')',
 'know',
 'does',
 'were',
 '(',
 'why',
 'should',
 'me',
 'think',
 "'",
 'had',
 'then',
 'only',
 '%',
 'than',
 'been',
 'some',
 'time',
 'any',
 'other',
 'even',
 'want',
 'being',
 'right',
 'these',
 'go',
 'trump',
 'now',
 'our',
 'make',


In [16]:
# Rebuild the LM data with the reordered vocabulary so the special markers have
# fixed ids. When ``cut`` is set, a Tokenizer configured with
# ``cut_n_from_behind=cut`` replaces the default one.
# NOTE(review): an earlier note said the value "comes from the number of
# repetition"; its exact origin is unclear — confirm.
vocab = Vocab(itos_new)
tokenizer_kwargs = {} if cut is None else {'tokenizer': Tokenizer(cut_n_from_behind=cut)}
data_lm = TextLMDataBunch.from_df(EX_PA, df_lm_train, df_lm_val, vocab=vocab,
                                  text_cols='text_proc', **tokenizer_kwargs)
data_lm.save()