# Notebook for Digestive Disease Prediction Using Word Embeddings and a Convolutional Neural Network

## Import Libraries

In [21]:
import pandas as pd
import numpy as np
import ast
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import pad_sequences
import re
import json

## Data Preprocessing Functions

In [22]:
def get_data() -> pd.DataFrame:
    """
    Load the diseases dataset from 'Digestion Diseases Symptoms.csv'.

    The path is relative to the current working directory.

    Params : None

    Return : pandas.DataFrame -> Contents of the CSV file.

    Raises : Any exception raised by pandas.read_csv (for example
    FileNotFoundError when the file is missing). The error is propagated so
    the real cause is visible, instead of being printed and then replaced by
    an unrelated UnboundLocalError on the return statement.
    """
    return pd.read_csv('Digestion Diseases Symptoms.csv')

In [23]:
def str_to_list(dataframe: pd.DataFrame, col: str) -> pd.DataFrame:
    """
    Parse the string values of a column into Python objects with
    ast.literal_eval (e.g. the string "['a', 'b']" becomes the list ['a', 'b']).

    A value that is still a string after the first parse (a doubly quoted
    list such as '"[1, 2]"') is parsed a second time. Values that are not
    strings are left untouched, so calling the function on an already
    converted column is harmless. The column is replaced in place on the
    given dataframe.

    Params :
    dataframe : pandas.DataFrame
    col : str -> string of column name

    Return : pandas.DataFrame -> The same dataframe object, with `col` parsed.

    Raises : KeyError if `col` does not exist; ValueError / SyntaxError if a
    string value is not a valid Python literal.
    """

    def _parse(value):
        # At most two passes: the second one unwraps doubly quoted literals.
        for _ in range(2):
            if not isinstance(value, str):
                break
            value = ast.literal_eval(value)
        return value

    dataframe[col] = dataframe[col].apply(_parse)
    return dataframe

In [24]:
def sample(dataframe: pd.DataFrame, col_of_list: str,
           label_col: str, num_sample: int = 5, n: int = 5,
           random_state: int = 1) -> pd.DataFrame:
    """
    Draw random sub-lists from the list stored in every record.

    For each row, `num_sample` samples are drawn without replacement from the
    row's list and paired with the row's label. The original rows themselves
    are not included in the result, so the output has
    len(dataframe) * num_sample rows.

    Params :
    1. dataframe : pandas.DataFrame

    2. col_of_list (column name) : str -> Column in the dataframe that has list as its values.
    For example : dataframe.loc[0, col_of_list] = [a,b,c]

    3. label_col (column name) : str -> Class column in the dataframe.

    4. num_sample : int -> How many samples to generate per record.
    For example, num_sample = 3 and a record has the list [a,b,c,d]. Then
    3 records with the same class are generated from it.

    5. n : int -> How many values for each sample. When a record's list has
    n elements or fewer, len(list) - 1 values are drawn instead.

    6. random_state : int -> Seed passed to numpy.random.seed for reproducibility.
    Note that this reseeds NumPy's global random generator.

    Return : pandas.DataFrame -> Two columns, [col_of_list, label_col], in that
    order, regardless of the column order or extra columns of the input.
    Each value in col_of_list is a numpy array.

    Raises : KeyError if a column is missing; ValueError if a record's list is
    empty (the sample size would be negative).
    """
    np.random.seed(random_state)
    samples, labels = [], []

    for record_list, record_label in zip(dataframe[col_of_list], dataframe[label_col]):
        # Lists that are not longer than n are sampled one element short.
        size = n if len(record_list) > n else len(record_list) - 1
        for _ in range(num_sample):
            samples.append(np.random.choice(record_list, size, replace=False))
            labels.append(record_label)

    # Name the columns explicitly instead of reusing dataframe.columns, which
    # only worked when the input had exactly these two columns in this order.
    return pd.DataFrame(list(zip(samples, labels)), columns=[col_of_list, label_col])

In [25]:
def make_merged_data(dataframe: pd.DataFrame, col_of_list: str,
                     label_col: str, num_samples: list,
                     n_per_samples: list, random_state: int = 1) -> pd.DataFrame:
    """
    Build one pandas DataFrame by concatenating several sampled DataFrames.

    `sample` is called once per (num_sample, n) pair; the results are
    concatenated, sorted by `label_col` and re-indexed from 0.

    Params :
    1. dataframe : pandas.DataFrame

    2. col_of_list (column name) : str -> Column in the dataframe that has list as its values.
    For example : dataframe.loc[0, col_of_list] = [a,b,c]

    3. label_col (column name) : str -> Class column in the dataframe.

    4. num_samples : list -> Number of samples to generate per record, one entry per pass.
    For example, [5,4,3] produces three sampled DataFrames, upsampled 5, 4, and 3 times
    respectively, which are then concatenated.

    5. n_per_samples : list -> Number of values per sample for the corresponding pass.
    For example, num_samples = [5,4,3] and n_per_samples = [4,3,2] means the first pass draws
    5 samples of 4 values per record, and so on. The two lists are paired with zip, so
    extra entries in the longer list are ignored.

    6. random_state : int -> Seed forwarded unchanged to every `sample` call.

    Return : pandas.DataFrame -> Concatenated dataframe.

    Raises : ValueError if a pass yields no DataFrame or if there is nothing to
    concatenate; any exception raised by `sample` is propagated.
    """
    datasets = []

    for num_sample, n in zip(num_samples, n_per_samples):
        df_sampled = sample(dataframe, col_of_list, label_col, num_sample, n, random_state)
        if df_sampled is None:
            # pd.concat silently drops None entries, which would hide a failed pass.
            raise ValueError(
                "sample() returned no data for num_sample={}, n={}".format(num_sample, n))
        datasets.append(df_sampled)

    return pd.concat(datasets).sort_values(by=label_col).reset_index(drop=True)

In [26]:
def shuffle(dataframe: pd.DataFrame, random_state: int = 1) -> pd.DataFrame:
    """
    Return a row-shuffled copy of the pandas.DataFrame with a fresh 0..n-1 index.

    The input dataframe is not modified.

    Params :

    1. dataframe : pandas.DataFrame
    2. random_state : int -> Random seed for reproducibility.

    Return : pandas.DataFrame -> Shuffled pandas.DataFrame.

    Raises : Errors are propagated rather than printed, so the function never
    returns None in place of a DataFrame.
    """
    shuffled = dataframe.sample(len(dataframe), random_state=random_state)
    return shuffled.reset_index(drop=True)

In [27]:
def give_numerical_label(dataframe: pd.DataFrame, labels_col: str) -> tuple[pd.DataFrame, dict]:
    """
    Add an integer "Label" column that encodes the class column.

    Classes are numbered 0, 1, 2, ... in order of first appearance in
    `labels_col`. The "Label" column is added to the given dataframe in place.

    Params :

    1. dataframe : pd.DataFrame

    2. labels_col : str -> name of label column.

    Return :

    1. pandas.DataFrame -> The same dataframe object, now with the "Label" column.

    2. col_dict : dict -> Dictionary with the integer labels as keys and the real labels as values.

    Raises : KeyError if `labels_col` does not exist. Errors are propagated
    instead of being printed, because callers unpack the returned tuple.
    """
    col_dict = dict(enumerate(dataframe[labels_col].unique()))
    reverse_dict = {label: index for index, label in col_dict.items()}
    dataframe["Label"] = dataframe[labels_col].map(reverse_dict)
    return dataframe, col_dict

In [28]:
def split(dataframe: pd.DataFrame, stratify_col: str,
          test_size: float = 0.2, random_state: int = 1) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split the DataFrame into a train set and a test set with
    sklearn's train_test_split, stratified on one column.

    Params :
    1. dataframe : pandas.DataFrame
    2. stratify_col : str -> Column name to be stratified in train_test_split.
    3. test_size : float -> Fraction of the data to put in the test set.
    4. random_state : int -> Random seed for reproducibility.

    Return : (pandas.DataFrame, pandas.DataFrame) -> train set and test set.

    Raises : KeyError if `stratify_col` does not exist; any exception raised by
    train_test_split is propagated. Errors are no longer printed and swallowed,
    because callers unpack the two returned frames.
    """
    return train_test_split(dataframe, test_size=test_size,
                            random_state=random_state,
                            stratify=dataframe[stratify_col])

In [29]:
def list_to_sentences(dataframe: pd.DataFrame, col_of_list: str) -> pd.DataFrame:
    """
    Join every sequence of strings in a column into one ", "-separated string.

    The column is replaced in place on the given dataframe.

    Params :
    1. dataframe : pandas.DataFrame
    2. col_of_list : str -> Column name whose values are sequences of strings.

    Return : pandas.DataFrame -> The same dataframe object, with `col_of_list` joined.

    Raises : KeyError if the column does not exist; TypeError if a value
    contains non-string items. Errors are propagated instead of being printed.
    """
    dataframe[col_of_list] = dataframe[col_of_list].apply(", ".join)
    return dataframe

## Data Gathering Functions

In [30]:
def get_train_test_data(col_of_list: str,
                        label_col: str, num_samples: list,
                        n_per_samples: list,
                        random_state: int = 1) -> tuple[tuple, tuple, dict] :
    """
    Run the whole preprocessing pipeline and return train and test data.

    Steps : load the CSV, parse the list column, build the upsampled dataset,
    shuffle it, add the integer "Label" column, join each list into a
    sentence, then split into train and test sets stratified on "Label".

    Params :

    1. col_of_list : str -> Column name that has list data type for its values.

    2. label_col : str -> Name of label column.

    3. num_samples : list -> Number of samples to generate per record, one entry per sampling pass.
    For example, [5,4,3] produces three sampled DataFrames, upsampled 5, 4, and 3 times
    respectively, which are then concatenated.

    4. n_per_samples : list -> Number of values per sample for the corresponding pass.
    For example, num_samples = [5,4,3] and n_per_samples = [4,3,2] means the first pass draws
    5 samples of 4 values per record, and so on.

    5. random_state : int -> Random seed used for sampling, shuffling and the train/test split.

    Return :

    tuple -> Consists of 3 elements :

    1. train_data : tuple of train sentences and train labels,

    2. test_data : tuple of test sentences and test labels,

    3. col_dict : dict -> Dictionary with the integer labels as keys and the real labels as values.
    """
    df = get_data()
    df = str_to_list(df, col_of_list)
    df = make_merged_data(df, col_of_list, label_col,
                          num_samples, n_per_samples, random_state)
    df = shuffle(df, random_state)
    df, col_dict = give_numerical_label(df, label_col)
    df = list_to_sentences(df, col_of_list)
    # Forward the seed: previously the split always used its own default (1),
    # so a caller-supplied random_state did not control the train/test split.
    train_df, test_df = split(df, "Label", random_state=random_state)

    train_data = (train_df[col_of_list].values, train_df["Label"].values)
    test_data = (test_df[col_of_list].values, test_df["Label"].values)

    return train_data, test_data, col_dict

In [31]:
def get_train_test_sequences(data: tuple, vocab_size: int = 1000,
                             max_length: int = 88, oov_tok: str = "<UNK>",
                             padding_type: str = "post", trunc_type: str = "post") -> tuple[tuple, tuple, dict] :
    """
    Tokenize the train and test sentences and pad them to a fixed length.

    The tokenizer is fitted on the training sentences only; both sets are then
    converted to integer sequences and padded/truncated with pad_sequences.

    Params :

    1. data : tuple -> (train_data, test_data), each a tuple of 2 elements,
    the sentences and the labels.

    2. vocab_size : int -> Passed to the Tokenizer as num_words.

    3. max_length : int -> Passed to pad_sequences as maxlen.

    4. oov_tok : str -> Out-of-vocab token for words unseen in the training data.

    5. padding_type : str -> Type of padding in pad_sequences.

    6. trunc_type : str -> Type of truncating in pad_sequences.

    Return :

    tuple -> Consists of 3 elements :

    1. train_data : tuple of train sequences and train labels,

    2. test_data : tuple of test sequences and test labels

    3. word_index : dict -> The word index built by the tokenizer.
    """
    train_part, test_part = data
    train_sentences, train_labels = train_part
    test_sentences, test_labels = test_part

    tokenizer = Tokenizer(num_words=vocab_size, oov_token=oov_tok)
    tokenizer.fit_on_texts(train_sentences)

    # Both sets go through exactly the same encode-then-pad settings.
    pad_options = dict(maxlen=max_length, padding=padding_type, truncating=trunc_type)
    encoded_train = tokenizer.texts_to_sequences(train_sentences)
    encoded_test = tokenizer.texts_to_sequences(test_sentences)
    padded_train = pad_sequences(encoded_train, **pad_options)
    padded_test = pad_sequences(encoded_test, **pad_options)

    return (padded_train, train_labels), (padded_test, test_labels), tokenizer.word_index

## Model Building Functions

In [32]:
def build_model(vocab_size: int = 1000, embedding_dim: int = 32,
                max_length: int = 88, print_summary: bool = False,
                num_classes: int = 14) -> tf.keras.models.Model :
    """
    Build the Deep Learning model for text classification.

    Architecture : Embedding -> Conv1D -> MaxPooling1D -> Conv1D ->
    GlobalMaxPooling1D -> Dropout -> Dense(relu) -> Dense(softmax).
    The model is returned uncompiled.

    Params :

    1. vocab_size : int -> Input dimension of the Embedding layer. Should match the
    vocab_size used for the tokenizer.

    2. embedding_dim : int -> Dimension of word embedding used in Embedding Layer.

    3. max_length : int -> Length of the input sequences (input_length of the Embedding layer).

    4. print_summary : bool -> Print model summary if True.

    5. num_classes : int -> Number of units in the softmax output layer, i.e. the number of
    classes to predict. Defaults to 14, the value that used to be hard-coded.

    Return : tf.keras.models.Model
    """
    model = tf.keras.models.Sequential([
        tf.keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        tf.keras.layers.Conv1D(filters=8, kernel_size=8, padding='same', activation='relu'),
        tf.keras.layers.MaxPooling1D(pool_size=2),
        tf.keras.layers.Conv1D(filters=16, kernel_size=16, activation='relu'),
        tf.keras.layers.GlobalMaxPooling1D(),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(units=32, activation='relu'),
        tf.keras.layers.Dense(units=num_classes, activation='softmax')
    ])

    if print_summary :
        model.summary()
    return model

In [33]:
def compile_model(model: tf.keras.models.Model,
                  optimizer: tf.keras.optimizers.Optimizer,
                  loss: tf.keras.losses.Loss,
                  metrics: list) :
    """
    Configure the model for training by calling model.compile.

    Params :

    1. model : tf.keras.models.Model -> The model to compile (modified in place).

    2. optimizer : tf.keras.optimizers.Optimizer

    3. loss : tf.keras.losses.Loss

    4. metrics : list -> Metrics passed to model.compile, e.g. ['accuracy'].

    Return : None
    """
    compile_options = {
        "optimizer": optimizer,
        "loss": loss,
        "metrics": metrics,
    }
    model.compile(**compile_options)

In [34]:
def make_callback(threshold: float = 0.98) -> tf.keras.callbacks.Callback :
    """
    Make a custom callback that stops training once both the training accuracy
    and the validation accuracy have reached a threshold.

    Params :

    1. threshold : float -> Accuracy threshold as a fraction (0.98 means 98 %).

    Return : tf.keras.callbacks.Callback
    """
    class myCallback(tf.keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            logs = logs or {}
            accuracy = logs.get('accuracy')
            val_accuracy = logs.get('val_accuracy')
            # Either metric can be absent (no 'accuracy' metric compiled, or no
            # validation data); comparing None with a float would raise.
            if accuracy is None or val_accuracy is None:
                return
            if accuracy >= threshold and val_accuracy >= threshold:
                self.model.stop_training = True

    return myCallback()

In [35]:
def training(model: tf.keras.models.Model, data: tuple, epochs: int = 1000,
             use_callback: bool = True) -> tf.keras.callbacks.History :
    """
    Fit the model on the training data, validating on the test data.

    Params :

    1. model : tf.keras.models.Model -> A compiled model.

    2. data : tuple -> (train_data, validation_data). train_data is a tuple of
    inputs and labels; validation_data is passed to model.fit unchanged.

    3. epochs : int -> Maximum number of epochs for training.

    4. use_callback : bool -> If True, attach the callback from make_callback().

    Return : tf.keras.callbacks.History
    """
    train_part, validation_data = data
    train_inputs, train_targets = train_part

    fit_options = {"epochs": epochs, "validation_data": validation_data}
    if use_callback :
        # Only pass `callbacks` when requested, exactly like the two-branch original.
        fit_options["callbacks"] = [make_callback()]

    return model.fit(train_inputs, train_targets, **fit_options)

In [36]:
def save_model(model: tf.keras.models.Model, path: str = 'model.h5') :
    """
    Save the model to disk with model.save.

    Params :

    1. model : tf.keras.models.Model -> The model to save.

    2. path : str -> Destination file. Defaults to 'model.h5' in the current
    working directory, the location that used to be hard-coded.

    Return : None
    """
    model.save(path)

## Train the Model

In [37]:
def run_training():
    """
    Run the full training pipeline and persist its artifacts.

    Prepares the data from the "Gejala" (list) and "Penyakit" (label) columns,
    tokenizes it, builds and compiles the model, trains it, prints the accuracy
    of the last epoch, then saves the model, the word index ('word_index.json')
    and the label dictionary ('label_dict.json').

    Return : None
    """
    sentence_data = get_train_test_data("Gejala", "Penyakit",
                                        [20]*7, [8,7,6,5,4,3,2])
    train_data, test_data, col_dict = sentence_data

    sequence_data = get_train_test_sequences((train_data, test_data))
    train_sequenced, test_sequenced, word_index = sequence_data

    model = build_model()
    optimizer = tf.keras.optimizers.Adam()
    loss = tf.keras.losses.SparseCategoricalCrossentropy()
    compile_model(model, optimizer, loss, ['accuracy'])

    history = training(model, (train_sequenced, test_sequenced))

    # Report the metrics of the final epoch as percentages.
    final_train_acc = history.history['accuracy'][-1]*100
    final_val_acc = history.history['val_accuracy'][-1]*100
    print("\nTraining accuracy = {:.2f} %\nTesting accuracy = {:.2f} %".format(final_train_acc,
                                                                               final_val_acc))

    save_model(model)

    with open('word_index.json', 'w') as words:
        json.dump(word_index, words)

    with open('label_dict.json', 'w') as labels_dict:
        json.dump(col_dict, labels_dict)

In [38]:
# Run the whole pipeline: train the model, then write the saved model,
# word_index.json and label_dict.json to the current working directory.
run_training()

Epoch 1/1000
Epoch 2/1000
Epoch 3/1000
Epoch 4/1000
Epoch 5/1000
Epoch 6/1000
Epoch 7/1000
Epoch 8/1000
Epoch 9/1000
Epoch 10/1000
Epoch 11/1000
Epoch 12/1000
Epoch 13/1000
Epoch 14/1000
Epoch 15/1000
Epoch 16/1000
Epoch 17/1000
Epoch 18/1000
Epoch 19/1000
Epoch 20/1000
Epoch 21/1000
Epoch 22/1000
Epoch 23/1000
Epoch 24/1000
Epoch 25/1000
Epoch 26/1000
Epoch 27/1000
Epoch 28/1000
Epoch 29/1000
Epoch 30/1000
Epoch 31/1000
Epoch 32/1000
Epoch 33/1000
Epoch 34/1000
Epoch 35/1000
Epoch 36/1000
Epoch 37/1000

Training accuracy = 98.21 %
Testing accuracy = 98.21 %


  saving_api.save_model(


## Inference Helper Functions

In [47]:
def remove_punc(string: str) -> str:
    """
    Replace every punctuation character in the string with a single space.

    A character counts as punctuation when it is neither a word character
    (letters, digits, underscore) nor whitespace. The length of the string is
    unchanged.

    Param :

    1. string : str

    Return : str
    """
    return re.sub(r'[^\w\s]', ' ', string)

In [48]:
def get_model(path: str = 'model.h5') -> tf.keras.models.Model :
    """
    Load a saved model from disk.

    Params :

    1. path : str -> File to load. Defaults to 'model.h5' in the current
    working directory, the location that used to be hard-coded.

    Return : tf.keras.models.Model
    """
    return tf.keras.models.load_model(path)

In [49]:
def get_word_index(path: str = 'word_index.json') -> dict:
    """
    Load the word index from a JSON file.

    Params :

    1. path : str -> JSON file to read. Defaults to 'word_index.json' in the
    current working directory, the location that used to be hard-coded.

    Return : dict -> The word index, exactly as stored in the file.
    """
    with open(path, 'r', encoding='utf-8') as word_index_file:
        return json.load(word_index_file)

In [50]:
def get_label_dict(path: str = 'label_dict.json') -> dict:
    """
    Load the label dictionary used to translate a predicted integer into its class name.

    JSON object keys are always strings, so the keys are converted back to int.

    Params :

    1. path : str -> JSON file to read. Defaults to 'label_dict.json' in the
    current working directory, the location that used to be hard-coded.

    Return : dict -> The label dictionary, with int keys.

    Raises : ValueError if a key in the file cannot be converted to int.
    """
    with open(path, 'r', encoding='utf-8') as labels_file:
        raw_labels = json.load(labels_file)
    return {int(key): value for key, value in raw_labels.items()}

In [51]:
def to_sequence(string: str, word_index: dict,
                max_length: int = 88, vocab_size: int = 1000,
                oov_index: int = 1) -> np.ndarray :
    """
    Convert a sentence into a padded sequence of integers using the word index.

    The sentence is stripped of punctuation, lower-cased and split on
    whitespace. The result is zero-padded at the end, or truncated at the end,
    to exactly max_length entries.

    Params :

    1. string : str -> The sentence to be converted.

    2. word_index : dict -> The word index dictionary (word -> integer).

    3. max_length : int -> The length of the sequence. Must match the max_length of the model.

    4. vocab_size : int -> Words whose index is >= vocab_size are mapped to oov_index,
    the same way the training Tokenizer treats words beyond its num_words limit.
    Without this cap such words would produce ids outside the Embedding layer's range.
    Must match the vocab_size used for training; pass 0 to disable the cap.

    5. oov_index : int -> Index used for unknown words. Defaults to 1, the value
    that used to be hard-coded.

    Return : np.ndarray -> The sequence as a numpy array of shape (1, max_length).
    """
    tokens = remove_punc(string).lower().split()

    sequence = []
    for token in tokens[:max_length] :
        index = word_index.get(token)
        if index is None or (vocab_size and index >= vocab_size) :
            index = oov_index
        sequence.append(index)

    # Post-padding with zeros; a non-positive count adds nothing.
    sequence += [0] * (max_length - len(sequence))

    return np.array(sequence).reshape((1, -1))

In [52]:
def predict(model: tf.keras.models.Model, sequence: np.ndarray,
            label_dict: dict) -> str :
    """
    Predict the class of one encoded sentence.

    The index of the largest value in the model output is looked up in
    label_dict. The argmax is taken over the flattened output, so this is
    meant for a single sequence at a time.

    Params :

    1. model : tf.keras.models.Model

    2. sequence : np.ndarray -> The sequence of integers built from the word index.

    3. label_dict : dict -> Dictionary with integers as keys and class names as values.

    Return : str -> The predicted class.
    """
    scores = model.predict(sequence)
    best_index = np.argmax(scores)
    return label_dict[best_index]

In [53]:
def main() :
    """
    Ask the user for their symptoms, predict the disease and print it.

    The model, the word index and the label dictionary are loaded from disk
    on every call.

    Return : None
    """
    user_text = input("\nMasukkan gejala Anda : ").strip()

    # Load the saved artifacts in the same order as before.
    artifacts = (get_model(), get_word_index(), get_label_dict())
    model, word_index, label_dict = artifacts

    encoded = to_sequence(user_text, word_index)
    pred = predict(model, encoded, label_dict)

    print("\nPrediction : {}".format(pred))

## Run the Inference

In [55]:
# Keep running predictions until the user answers anything other than Y/y.
run = True
while run :
    main()
    run_ans = input("Ulang Program? (Y/N) ")
    if run_ans.lower() == 'y' :
        continue
    break


Masukkan gejala Anda : saya merasa mual, heartburn, jantung berdebar, dan nyeri di ulu hati

Prediction : GERD
Ulang Program? (Y/N) Y

Masukkan gejala Anda : kulit kuning, nyeri di perut kanan atas, demam, dan urine gelap

Prediction : Batu Empedu
Ulang Program? (Y/N) N


## Convert to TFjs

In [None]:
!pip install tensorflowjs

In [None]:
!tensorflowjs_converter \
--input_format=keras \
/content/model.h5 \
/content/model

In [3]:
!zip -r digestion_diseases_prediction_json.zip model/

  adding: model/ (stored 0%)
  adding: model/model.json (deflated 78%)
  adding: model/group1-shard1of1.bin (deflated 8%)
