In [None]:
# Run this cell only when using Google Colaboratory; skip it when running locally.
# It authenticates the user, mounts Google Drive and creates the folder
# "sinhala_racism_detection" in which the results are saved.

from google.colab import drive
import os

# Mount Google Drive and work inside a dedicated project folder on it.
drive.mount('/content/gdrive')
parent_dir = '/content/gdrive/My Drive/sinhala_racism_detection'
# exist_ok=True keeps the cell re-runnable without a separate existence check.
os.makedirs(parent_dir, exist_ok=True)

# Relative paths used later (e.g. the "results_N" folders) resolve inside this folder.
os.chdir(parent_dir)

In [None]:
# Location of the data set CSV file.
# When running locally, use "../data-set/final-data-set.csv" instead.
DATA_SET_PATH = "https://github.com/renuka-fernando/sinhalese_language_racism_detection/raw/master/data-set/final-data-set.csv"

In [None]:
# Column positions used to index the rows of the loaded data set (data_set[:, <index>]).
DATA_SET_TWEET_ID = 1  # presumably the tweet id column - not referenced elsewhere in this notebook, verify
DATA_SET_USER_ID = 2
DATA_SET_TEXT = 4
DATA_SET_CLASS = 5

# Number of token positions kept per tweet; three more positions are added later
# for the user-profile features.
MAX_WORD_COUNT = 60

# One-hot vector of every class label; vector positions are [Sexism, Racist, Neutral].
DATA_SET_CLASSES = {
    'Neutral': [0, 0, 1],
    'Racist': [0, 1, 0],
    'Sexism': [1, 0, 0]
}

In [None]:
# Stand-alone Sinhalese characters accepted by is_sinhalese_letter().
# The list also contains the zero-width joiner "\u200d" (see the note below the list).
sinhalese_chars = [
    "අ", "ආ", "ඇ", "ඈ", "ඉ", "ඊ",
    "උ", "ඌ", "ඍ", "ඎ", "ඏ", "ඐ",
    "එ", "ඒ", "ඓ", "ඔ", "ඕ", "ඖ",
    "ං", "ඃ",
    "ක", "ඛ", "ග", "ඝ", "ඞ", "ඟ",
    "ච", "ඡ", "ජ", "ඣ", "ඤ", "ඥ", "ඦ",
    "ට", "ඨ", "ඩ", "ඪ", "ණ", "ඬ",
    "ත", "ථ", "ද", "ධ", "න", "ඳ",
    "ප", "ඵ", "බ", "භ", "ම", "ඹ",
    "ය", "ර", "ල", "ව",
    "ශ", "ෂ", "ස", "හ", "ළ", "ෆ",
    "෴", "\u200d"
]
# "\u200d" (zero-width joiner) is used with "යංශය" - කාව්‍ය, "රේඵය" - වර්‍තමාන, "Both" - මහාචාර්‍ය්‍ය, "රකාරාංශය" - මුද්‍රණය

# Dependent signs accepted by is_sinhalese_vowel().
# NOTE(review): the last entry is a multi-character sequence, so it can never equal a
# single character passed to is_sinhalese_vowel() - confirm callers pass it as a whole.
sinhalese_vowel_signs = ["්", "ා", "ැ", "ෑ", "ි", "ී", "ු", "ූ", "ෘ", "ෙ", "ේ", "ෛ", "ො", "ෝ",
                         "ෞ", "ෟ", "ෲ", "ෳ", "ර්‍"]

# Dictionary that maps a wrongly typed sequence of vowel signs to the single correct sign.
# Keys are concatenations of the signs as typed; get_fixed_vowel() looks sequences up here.
vowel_sign_fix_dict = {
    "ෙ" + "්": "ේ",
    "්" + "ෙ": "ේ",

    "ෙ" + "ා": "ො",
    "ා" + "ෙ": "ො",

    "ේ" + "ා": "ෝ",
    "ො" + "්": "ෝ",

    "ෙෙ": "ෛ",
    "ෘෘ": "ෲ",

    "ෙ" + "ෟ": "ෞ",
    "ෟ" + "ෙ": "ෞ",

    "ි" + "ී": "ී",
    "ී" + "ි": "ී",

    # duplicating same symbol: a sign followed by one of its own components collapses to the sign
    "ේ" + "්": "ේ",
    "ේ" + "ෙ": "ේ",

    "ො" + "ා": "ො",
    "ො" + "ෙ": "ො",

    "ෝ" + "ා": "ෝ",
    "ෝ" + "්": "ෝ",
    "ෝ" + "ෙ": "ෝ",
    "ෝ" + "ේ": "ෝ",
    "ෝ" + "ො": "ෝ",

    "ෞ" + "ෟ": "ෞ",
    "ෞ" + "ෙ": "ෞ",

    # special cases - may be typing mistakes
    "ො" + "ෟ": "ෞ",
    "ෟ" + "ො": "ෞ",
}

# Maps a character to its simplified replacement; get_simplified_character() returns
# characters that are not listed here unchanged.
simplify_characters_dict = {
    # Consonant
    "ඛ": "ක",
    "ඝ": "ග",
    "ඟ": "ග",
    "ඡ": "ච",
    "ඣ": "ජ",
    "ඦ": "ජ",
    "ඤ": "ඥ",
    "ඨ": "ට",
    "ඪ": "ඩ",
    "ණ": "න",
    "ඳ": "ද",
    "ඵ": "ප",
    "භ": "බ",
    "ඹ": "බ",
    "ශ": "ෂ",
    "ළ": "ල",

    # Vowels
    "ආ": "අ",
    "ඈ": "ඇ",
    "ඊ": "ඉ",
    "ඌ": "උ",
    "ඒ": "එ",
    "ඕ": "ඔ",

    # Vowel signs ("ා" maps to the empty string, i.e. it is dropped)
    "ා": "",
    "ෑ": "ැ",
    "ී": "ි",
    "ූ": "ු",
    "ේ": "ෙ",
    "ෝ": "ො",
    "ෲ": "ෘ"
}


def is_sinhalese_letter(char: str) -> bool:
    """
    check whether a character is listed in sinhalese_chars
    :param char: character to check
    :return: True when the character is in sinhalese_chars, otherwise False
    """
    listed = char in sinhalese_chars
    return listed


def is_sinhalese_vowel(char: str) -> bool:
    """
    check whether a character is listed in sinhalese_vowel_signs
    :param char: character to check
    :return: True when the character is in sinhalese_vowel_signs, otherwise False
    """
    listed = char in sinhalese_vowel_signs
    return listed


def get_fixed_vowel(vowel: str) -> str:
    """
    look up the corrected vowel sign of a wrongly typed sequence
    :param vowel: sequence of vowel signs; must be a key of vowel_sign_fix_dict
    :return: the corrected vowel sign
    :raises KeyError: when the sequence is not a key of vowel_sign_fix_dict
    """
    corrected = vowel_sign_fix_dict[vowel]
    return corrected


def get_simplified_character(character: str) -> str:
    """
    simplify a single character using simplify_characters_dict
    :param character: string with length 1
    :return: the mapped replacement, or the character itself when it has no mapping
    :raises TypeError: when the given string does not have length 1
    """
    if len(character) == 1:
        # fall back to the character itself when there is no simplification for it
        return simplify_characters_dict.get(character, character)
    raise TypeError("character should be a string with length 1")


In [None]:
!pip install -U -q emoji

import re
import emoji

def replace_url(text: str) -> str:
    """
    remove the URLs of a text
    :param text: text that may contain http/https URLs
    :return: text in which every matched URL is replaced by an empty string
    """
    url_pattern = re.compile(
        r'(http://www\.|https://www\.|http://|https://)[a-z0-9]+([\-.]{1}[a-z0-9A-Z/]+)*')
    return url_pattern.sub('', text)


def remove_retweet_state(text: str) -> str:
    """
    drop a leading retweet marker such as "RT @sam92ky: "
    :param text: text
    :return: text without the retweet marker at its beginning
    """
    retweet_prefix = re.compile(r'^RT @\w*: ')
    return retweet_prefix.sub('', text)


def replace_mention(text: str) -> str:
    """
    replace user mentions with the placeholder "PERSON"
    :param text: text
    :return: text in which every "@" together with the word characters following it is "PERSON"
    """
    mention_pattern = re.compile(r'@\w*')
    return mention_pattern.sub('PERSON', text)


def _emoji_table() -> dict:
    """
    get the mapping whose keys are emoji strings, independent of the installed emoji version
    :return: dictionary keyed by emoji
    """
    # emoji >= 2.0 removed UNICODE_EMOJI and provides EMOJI_DATA instead
    table = getattr(emoji, "EMOJI_DATA", None)
    if table is not None:
        return table

    table = emoji.UNICODE_EMOJI
    # emoji 1.x nests the table per language: {"en": {...}, "es": {...}, ...}
    if isinstance(table.get("en"), dict):
        return table["en"]
    return table


_TOKEN_SPLIT_PATTERN = None  # compiled on first use by _token_split_pattern()


def _token_split_pattern():
    """
    build (once) the compiled character class on which texts are split
    :return: compiled regular expression
    """
    global _TOKEN_SPLIT_PATTERN
    if _TOKEN_SPLIT_PATTERN is None:
        # escaped so that no character of an emoji key can be read as regex syntax
        # NOTE(review): multi-code-point emoji keys (ZWJ sequences) put "\u200d" into the class, so
        # texts are split at zero-width joiners as well - confirm this is intended for Sinhala conjuncts
        emojis = re.escape(''.join(_emoji_table().keys()))
        _TOKEN_SPLIT_PATTERN = re.compile(
            r'[.…,‌ ¸‚\"/|—¦”‘\'“’´!@#$%^&*+\-£?˜()\[\]{\}:;–Ê  �‪‬‏0123456789' + emojis + ']')
    return _TOKEN_SPLIT_PATTERN


def split_tokens(text: str) -> list:
    """
    tokenize text
    :param text: text
    :return: token list (empty tokens are dropped)
    """
    # text characters to split is from: https://github.com/madurangasiriwardena/corpus.sinhala.tools
    return [token for token in _token_split_pattern().split(text) if token != ""]


def set_spaces_among_emojis(text: str) -> str:
    """
    make spaces among emojis to tokenize them
    :param text: text to be modified
    :return: modified text, with a space after every character that is an emoji
    """
    emoji_table = _emoji_table()
    return ''.join(c + " " if c in emoji_table else c for c in text)


def simplify_sinhalese_text(text: str) -> str:
    """
    simplify a text character by character
    :param text: text to simplify
    :return: text built from the simplified form of each character
    """
    return "".join(get_simplified_character(character) for character in text)


def stem_word(word: str) -> str:
    """
    Stemming words: strip the first matching suffix from a word
    :param word: word (expected to be simplified already, see the note on 'ගෙ')
    :return: stemmed word; words shorter than 4 characters are returned unchanged
    """
    if len(word) < 4:
        return word

    # A longer suffix must be tried before any shorter suffix it ends with, otherwise it can
    # never match ('ෙක්' before 'ක්', and 'ගෙ' before 'ෙ').
    suffixes = (
        'ටත්',
        'ෙක්',
        'ගෙ',  # instead of ගේ because this step comes after simplifying text
        'ක්',
        'ට',
        'ද',
        'ෙ',
    )
    for suffix in suffixes:
        if word.endswith(suffix):
            return word[:-len(suffix)]

    # no known suffix
    return word


def tokenize(text: str) -> list:
    """
    clean a tweet and turn it into a list of stemmed tokens
    :param text: raw tweet text
    :return: list of stemmed tokens
    """
    # surrounding quotes and the retweet marker go first, then everything is lower-cased
    cleaned = remove_retweet_state(text.strip('"')).lower()
    cleaned = simplify_sinhalese_text(cleaned)
    cleaned = replace_mention(cleaned)
    cleaned = replace_url(cleaned)
    return [stem_word(token) for token in split_tokens(cleaned)]

In [None]:
import os
import numpy as np

def tokenize_corpus(corpus: list) -> list:
    """
    tokenize every text of a corpus
    :param corpus: iterable of texts
    :return: list with one token list per text, in the order of the corpus
    """
    return list(map(tokenize, corpus))


def transform_class_to_one_hot_representation(classes: list):
    """
    convert class labels to their one-hot vectors
    :param classes: class labels; each one must be a key of DATA_SET_CLASSES
    :return: numpy array with one one-hot row per label
    """
    one_hot_rows = []
    for label in classes:
        one_hot_rows.append(DATA_SET_CLASSES[label])
    return np.array(one_hot_rows)


def build_dictionary(corpus_token: list) -> dict:
    """
    build the word -> integer code dictionary of a tokenized corpus
    :param corpus_token: list of token lists
    :return: dictionary in which more frequent words get smaller codes, starting from 2
    """
    # count the occurrences of every token
    counts = {}
    for tokens in corpus_token:
        for word in tokens:
            counts[word] = counts.get(word, 0) + 1

    words = list(counts.keys())
    # positions of the words ordered by descending frequency
    descending_positions = np.argsort(list(counts.values()))[::-1]

    # 0 is not used and 1 is for UNKNOWN, hence the codes start from 2
    return {words[position]: rank + 2 for rank, position in enumerate(descending_positions)}


def transform_to_dictionary_values(corpus_token: list, dictionary: dict) -> list:
    """
    encode the tokens of a corpus with their dictionary codes
    :param corpus_token: list of token lists
    :param dictionary: word -> integer code
    :return: list of code lists; tokens missing from the dictionary are coded as 1
    """
    unknown_code = 1  # 1 is for unknown (not in dictionary)
    return [[dictionary.get(token, unknown_code) for token in tokens] for tokens in corpus_token]


def get_calculated_user_profile(user_ids: list, classes: list) -> dict:
    """
    calculate, for every user, the share of the user's tweets that belongs to each class
    :param user_ids: user id of each tweet
    :param classes: class of each tweet, in the order of user_ids
    :return: dictionary keyed by (user id, class) holding the share of that class among the user's tweets
    """
    class_counts = {}
    tweet_counts = {}

    for index, uid in enumerate(user_ids):
        key = (uid, classes[index])
        # count tweets with class
        class_counts[key] = class_counts.get(key, 0) + 1
        # count tweets
        tweet_counts[uid] = tweet_counts.get(uid, 0) + 1

    # calculate mean
    return {key: count / tweet_counts[key[0]] for key, count in class_counts.items()}


def append_user_profile_features(x_corpus: list, user_ids: list, user_profile: dict) -> list:
    """
    append the user's neutral, racism and sexism profile values to the end of each sentence
    :param x_corpus: corpus coded to integers; it is modified in place
    :param user_ids: list of user ids in the order of x_corpus
    :param user_profile: dictionary keyed by (user id, class) with the user's share of that class
    :return: the same x_corpus with three integers (share * 1000) appended to every sentence
    """
    class_order = ("Neutral", "Racist", "Sexism")
    for index, sentence in enumerate(x_corpus):
        uid = user_ids[index]
        # a class missing from the user's profile counts as 0
        sentence.extend(int(user_profile.get((uid, cls), 0) * 1000) for cls in class_order)

    return x_corpus


def create_next_results_folder():
    """
    Create the results folder with the lowest unused number ("results_0", "results_1", ...)
    in the current working directory
    :return: name of the created directory
    """
    number = 0
    while True:
        candidate = "results_%d" % number
        if not os.path.exists(candidate):
            break
        number += 1

    os.makedirs(candidate)
    return candidate


def get_last_results_folder():
    """
    Find the results directory with the highest number of the unbroken "results_0", "results_1", ...
    sequence in the current working directory
    :return: name of that directory; "results_-1" when "results_0" does not exist
    """
    next_free = 0
    while os.path.exists("results_%d" % next_free):
        next_free += 1

    return "results_%d" % (next_free - 1)

In [None]:
import logging
import sys

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from keras import regularizers
from keras.layers import Dense, LSTM
from keras.layers.embeddings import Embedding
from keras.models import Sequential
from keras.models import load_model
from keras.optimizers import Adam
from keras.preprocessing import sequence
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split

In [None]:
# Data preparation: load the data set, tokenize the tweets, encode the tokens as integers,
# append the user-profile features and pad every tweet to a fixed length.
logging.basicConfig(format='%(levelname)s %(asctime)s: %(message)s', level=logging.INFO)

data_frame = pd.read_csv(DATA_SET_PATH)
data_set = data_frame.values  # rows of the data set, indexed with the DATA_SET_* column constants

# debugging aids, kept disabled:
# for id, tweet in enumerate(data_set[:, :]):
#     print(id, tweet[-1])

# for index, tweet in enumerate(data_frame.text):
#     print(index, tweet)

logging.info("Tokenizing the corpus")
corpus_token = tokenize_corpus(data_set[:, DATA_SET_TEXT])

logging.info("Building the dictionary")
dictionary = build_dictionary(corpus_token)
dictionary_length = len(dictionary) + 2  # 0 is not used and 1 is for UNKNOWN

# to get sentence back
# ' '.join([list(dictionary.keys())[i-2] for i in x_test[0] if i > 1])

logging.info("Transforming the corpus to dictionary values")
x_corpus = transform_to_dictionary_values(corpus_token, dictionary)

y_corpus = transform_class_to_one_hot_representation(data_set[:, DATA_SET_CLASS])
# NOTE(review): the profile is calculated from the classes of ALL rows, before the data is split
# into folds, so the labels of the later test rows contribute to their own input features -
# confirm this is acceptable for the reported accuracy.
user_profile = get_calculated_user_profile(data_set[:, DATA_SET_USER_ID], data_set[:, DATA_SET_CLASS])

# add user profile feature to end of the sentence
# from: Detecting Offensive Language in Tweets using Deep Learning
# by: Georgios K. Pitsilis, Heri Ramampiaro and Helge Langseth
# NOTE(review): the three appended values (0..1000) go through the same Embedding layer as the word
# codes, so they share the index space of the dictionary and need dictionary_length > 1000 - verify.
max_word_count = MAX_WORD_COUNT + 3  # three extra positions for the user-profile features
x_corpus = append_user_profile_features(x_corpus=x_corpus, user_ids=data_set[:, DATA_SET_USER_ID],
                                        user_profile=user_profile)

# padding with zeros if not enough and else drop left-side words
x_corpus = sequence.pad_sequences(x_corpus, maxlen=max_word_count)

In [None]:
# ################## Deep Neural Network ###################### #
# Training configuration
FOLDS_COUNT = 5  # number of cross-validation folds
MAX_EPOCHS = 15  # epochs trained in every fold
VALIDATION_TEST_SIZE = 0.12  # share of a fold's training portion that is held out for validation

# splitting data for 5-fold cross validation
k_fold = StratifiedKFold(n_splits=FOLDS_COUNT, shuffle=True, random_state=18)
# to split, raw format (integer) is required
# one-hot rows are mapped as: Neutral [0, 0, 1] -> 0, Racist [0, 1, 0] -> 1, Sexism [1, 0, 0] -> 2
y_corpus_raw = [0 if cls[2] == 1 else (1 if cls[1] == 1 else 2) for cls in y_corpus]

directory = create_next_results_folder()  # directory for saving results
logging.info("created the directory: %s" % directory)

def _plot_history(train_values: list, validation_values: list, title: str, y_label: str, path: str) -> None:
    """
    Plot a training and a validation metric per epoch, save the figure and display it
    :param train_values: metric of the training data, one value per epoch
    :param validation_values: metric of the validation data, one value per epoch
    :param title: title of the figure
    :param y_label: label of the y axis
    :param path: file path to save the figure
    """
    plt.figure()  # fresh figure, so curves of an earlier plot are never drawn into this one
    plt.plot(train_values)
    plt.plot(validation_values)
    plt.title(title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    plt.savefig(path)
    plt.show()
    plt.close()  # release the figure when the backend did not close it on show()


def _first_metric(history_values: dict, *names: str) -> float:
    """
    Get the first value of a metric that Keras reports under a version dependent name
    :param history_values: History.history dictionary returned by model.fit
    :param names: candidate names of the metric, e.g. 'acc' (older Keras) and 'accuracy' (Keras >= 2.3)
    :return: first recorded value of the first candidate that exists
    :raises KeyError: when none of the candidates is in the history
    """
    for name in names:
        if name in history_values:
            return history_values[name][0]
    raise KeyError("none of the metrics %s is in the training history" % (names,))


# Train, select and evaluate one model per cross-validation fold.
fold = 0
for train_n_validation_indexes, test_indexes in k_fold.split(x_corpus, y_corpus_raw):
    x_train_n_validation = x_corpus[train_n_validation_indexes]
    y_train_n_validation = y_corpus[train_n_validation_indexes]
    x_test = x_corpus[test_indexes]
    y_test = y_corpus[test_indexes]

    # train and validation data sets
    x_train, x_valid, y_train, y_valid = train_test_split(x_train_n_validation, y_train_n_validation,
                                                          test_size=VALIDATION_TEST_SIZE, random_state=94)

    # ################## Deep Neural Network Model ###################### #
    # embedding -> LSTM -> two regularized dense layers -> softmax over the 3 classes
    model = Sequential()
    model.add(Embedding(input_dim=dictionary_length, output_dim=60, input_length=max_word_count))
    model.add(LSTM(600))
    model.add(Dense(units=max_word_count, activation='tanh', kernel_regularizer=regularizers.l2(0.04),
                    activity_regularizer=regularizers.l2(0.015)))
    model.add(Dense(units=max_word_count, activation='relu', kernel_regularizer=regularizers.l2(0.01),
                    bias_regularizer=regularizers.l2(0.01)))
    model.add(Dense(3, activation='softmax', kernel_regularizer=regularizers.l2(0.001)))
    adam_optimizer = Adam(lr=0.001, decay=0.0001)
    model.compile(loss='categorical_crossentropy', optimizer=adam_optimizer, metrics=['accuracy'])

    model.summary()  # prints the summary itself (wrapping it in print() adds a stray "None")
    # ################## Deep Neural Network Model ###################### #

    best_accuracy = 0
    best_loss = 100000
    best_epoch = 0

    epoch_history = {
        'acc': [],
        'val_acc': [],
        'loss': [],
        'val_loss': [],
    }

    # for each epoch (trained one by one, so the best epoch can be saved to disk)
    for epoch in range(MAX_EPOCHS):
        logging.info("Fold: %d/%d" % (fold, FOLDS_COUNT))
        logging.info("Epoch: %d/%d" % (epoch, MAX_EPOCHS))
        history = model.fit(x=x_train, y=y_train, epochs=1, batch_size=1, validation_data=(x_valid, y_valid),
                            verbose=1, shuffle=False)

        # get validation accuracy and loss
        # the accuracy metric is named 'acc' or 'accuracy' depending on the Keras version
        accuracy = _first_metric(history.history, 'val_acc', 'val_accuracy')
        loss = history.history['val_loss'][0]

        # set epochs' history
        epoch_history['acc'].append(_first_metric(history.history, 'acc', 'accuracy'))
        epoch_history['val_acc'].append(accuracy)
        epoch_history['loss'].append(history.history['loss'][0])
        epoch_history['val_loss'].append(loss)

        # select best epoch and save to disk: accuracy must not drop and the loss
        # may be at most 0.01 worse than the loss of the best epoch so far
        if accuracy >= best_accuracy and loss < best_loss + 0.01:
            logging.info("Saving model")
            model.save("%s/model_fold_%d.h5" % (directory, fold))

            best_accuracy = accuracy
            best_loss = loss
            best_epoch = epoch
        # end of epoch

    # Plot training & validation accuracy values
    _plot_history(epoch_history['acc'], epoch_history['val_acc'], 'Model Accuracy', 'Accuracy',
                  "%s/plot_model_accuracy_%d" % (directory, fold))

    # Plot training & validation loss values
    _plot_history(epoch_history['loss'], epoch_history['val_loss'], 'Model loss', 'Loss',
                  "%s/plot_model_loss_%d" % (directory, fold))

    # Saving evolution history of epochs in this fold
    # (context manager closes the file even when a write fails)
    with open("%s/history_fold_%d.txt" % (directory, fold), 'w') as f:
        f.write("best_epoch: %d\n" % best_epoch)
        f.write("epoch,training_accuracy,training_loss,validation_accuracy,validation_loss\n")
        for i in range(MAX_EPOCHS):
            f.write("%d,%f,%f,%f,%f\n" % (i, epoch_history['acc'][i], epoch_history['loss'][i],
                                          epoch_history['val_acc'][i], epoch_history['val_loss'][i]))

    # load the best model saved on disk
    del model
    model = load_model("%s/model_fold_%d.h5" % (directory, fold))

    evaluation = model.evaluate(x=x_test, y=y_test)
    logging.info("Accuracy: %f" % evaluation[1])

    prediction = model.predict(x_test)

    # save predictions to disk
    # columns: tweet id, true label, then the predicted probability of label 0, 1 and 2
    test_indexes = test_indexes.reshape(test_indexes.shape[0], 1)
    # the tweet id column identifies the rows (the user id column was read here before)
    tweet_ids = data_set[:, DATA_SET_TWEET_ID][test_indexes]
    true_labels = np.asarray(y_corpus_raw, dtype=int)[test_indexes]
    # prediction columns follow the one-hot order [Sexism, Racist, Neutral],
    # the raw labels are 0 = Neutral, 1 = Racist, 2 = Sexism
    class_1 = prediction[:, 2]
    class_2 = prediction[:, 1]
    class_3 = prediction[:, 0]
    output = np.append(tweet_ids, true_labels, axis=1)
    output = np.append(output, class_1.reshape(test_indexes.shape[0], 1), axis=1)
    output = np.append(output, class_2.reshape(test_indexes.shape[0], 1), axis=1)
    output = np.append(output, class_3.reshape(test_indexes.shape[0], 1), axis=1)

    np.savetxt("%s/test_set_predicted_output_%d.txt" % (directory, fold), X=output, fmt="%s", delimiter=",")
    logging.info("Fold: %d - Completed" % fold)
    fold += 1
    # end of fold