In [2]:
import pickle

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from keras.layers import LSTM, Dense, Dropout
from keras.models import Sequential
from keras.utils import to_categorical
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, precision_score, recall_score
from sklearn.model_selection import GridSearchCV, train_test_split

In [4]:
def plot_confusion_matrix(cm, classes, title, dataset_name):
    """Draw a confusion matrix as a row-normalised heat map.

    Every cell is annotated with its raw count and with its share of the row
    total, so each row of percentages sums to 100 (or to 0 for an empty row).

    Args:
        cm: 2-D array of counts; rows are plotted on the "True label" axis and
            columns on the "Predicted label" axis.
        classes: Tick labels for the rows/columns of ``cm``.
        title: Model name inserted into the figure title.
        dataset_name: Dataset name inserted into the figure title.
    """
    cm = np.asarray(cm)
    row_totals = cm.sum(axis=1, keepdims=True)
    # A class without any true sample has a zero row total; report 0% for that
    # row instead of dividing by zero and annotating its cells with "nan%".
    percentages = np.divide(
        cm, row_totals, out=np.zeros(cm.shape, dtype=float), where=row_totals != 0
    ) * 100

    plt.figure(figsize=(8, 8))
    plt.imshow(percentages, interpolation='nearest', cmap=plt.cm.Blues, vmin=0, vmax=100)
    plt.title(f'Confusion Matrix - {title} - {dataset_name}')
    plt.colorbar(label='Percentage')
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    # Iterate over the matrix itself so every cell is annotated and a
    # mismatching `classes` length can never index outside `cm`.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, f"{cm[i, j]}\n{percentages[i, j]:.1f}%", horizontalalignment='center',
                     color='white' if percentages[i, j] > 50 else 'black')
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Compute the layout only after the axis labels exist so they are not clipped.
    plt.tight_layout()
    plt.show()
    plt.close()

In [6]:
def prepare_data(data, sequence_length):
    """Cut a sequence into overlapping windows paired with the element that follows.

    Args:
        data: Indexable, sliceable sequence (list or array).
        sequence_length: Number of consecutive elements per window.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays: ``X[k]`` is ``data[k:k + sequence_length]``
        and ``y[k]`` is ``data[k + sequence_length]``. Both are empty when
        ``data`` is not longer than ``sequence_length``.
    """
    window_starts = range(len(data) - sequence_length)
    windows = [data[start:start + sequence_length] for start in window_starts]
    next_values = [data[start + sequence_length] for start in window_starts]
    return np.array(windows), np.array(next_values)

def create_lstm_model(units=50, dropout_rate=0.2, input_shape=(5, 10), num_classes=2):
    """Build and compile a two-layer stacked LSTM softmax classifier.

    Args:
        units: Number of units in each LSTM layer.
        dropout_rate: Rate of the Dropout layer placed after each LSTM layer.
        input_shape: Input shape handed to the first LSTM layer.
        num_classes: Width of the final softmax layer.

    Returns:
        The compiled ``Sequential`` model (categorical cross-entropy loss,
        Adam optimizer, accuracy metric).
    """
    stack = [
        # The first LSTM emits the whole sequence so the second LSTM can consume it.
        LSTM(units, input_shape=input_shape, return_sequences=True),
        Dropout(dropout_rate),
        LSTM(units),
        Dropout(dropout_rate),
        Dense(num_classes, activation='softmax'),
    ]
    network = Sequential()
    for layer in stack:
        network.add(layer)
    network.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return network

from sklearn.preprocessing import LabelEncoder

def modeling(Y, X, nama_model, image=False):
    """Tune, evaluate and save an LSTM classifier.

    The data is split 80/10/10 (train/validation/test, stratified by class).
    A grid search with 3-fold cross-validation runs on the training part, the
    best estimator is scored on the validation part, a confusion matrix is
    plotted, and the estimator is dumped to ``best_model_{nama_model}.pkl``.
    The test part is held out and not used here.

    Args:
        Y: Labels, either a 1-D array of class labels (any values, e.g. 1..7)
            or a one-hot matrix with one column per class.
        X: 3-D feature array; axes 1 and 2 define the LSTM input shape.
        nama_model: Model name used in the plot title and the output file name.
        image: Unused; kept so existing callers keep working.

    Returns:
        The best estimator found by the grid search.

    Raises:
        ValueError: If ``X`` is not 3-dimensional.
        Exception: Any error raised by the grid search is printed and re-raised.
    """
    # Imported lazily: the wrapper lives in a Keras submodule that not every
    # Keras release ships, so a missing module should surface here instead of
    # breaking the notebook's import cell.
    from keras.wrappers.scikit_learn import KerasClassifier

    X = np.asarray(X)
    Y = np.asarray(Y)
    if X.ndim != 3:
        raise ValueError(f"X must be 3-dimensional, got shape {X.shape}")

    if Y.ndim == 2 and Y.shape[1] > 1:
        # One-hot input: recover the class indices instead of encoding twice.
        class_labels = np.arange(Y.shape[1])
        y_idx = np.argmax(Y, axis=1)
    else:
        # Map arbitrary label values onto 0..K-1; to_categorical indexes by
        # label value and fails (or mis-encodes) for anything else.
        class_labels, y_idx = np.unique(Y.ravel(), return_inverse=True)
    num_classes = len(class_labels)

    # Stratified split: 80% train, then the remaining 20% halved into val/test
    X_train, X_temp, y_train_idx, y_temp_idx = train_test_split(
        X, y_idx, test_size=0.2, random_state=42, stratify=y_idx)
    X_val, _X_test, y_val_idx, _y_test_idx = train_test_split(
        X_temp, y_temp_idx, test_size=0.5, random_state=42, stratify=y_temp_idx)

    # Encode labels
    Y_train = to_categorical(y_train_idx, num_classes=num_classes)
    Y_val = to_categorical(y_val_idx, num_classes=num_classes)

    # Check shapes
    print(f"Shape of X_train: {X_train.shape}")
    print(f"Shape of Y_train: {Y_train.shape}")
    print(f"Shape of X_val: {X_val.shape}")
    print(f"Shape of Y_val: {Y_val.shape}")

    # Grid search for hyperparameter tuning
    param_grid = {
        'units': [50, 100],
        'dropout_rate': [0.2, 0.3],
        'batch_size': [16, 32],
        'epochs': [10, 20]
    }

    # Capture only the shape so the closure does not hold the training array.
    input_shape = (X_train.shape[1], X_train.shape[2])

    def build_model(units, dropout_rate):
        return create_lstm_model(units=units, dropout_rate=dropout_rate, input_shape=input_shape, num_classes=num_classes)

    model = KerasClassifier(build_fn=build_model, verbose=0)
    # Run the search in-process (n_jobs=1), as the later revision of this
    # function does; NOTE(review): Keras estimators are presumably unreliable
    # across joblib worker processes - confirm before raising n_jobs again.
    grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=1, cv=3)

    # Print the failure for context, then let it propagate to the caller
    try:
        grid_result = grid.fit(X_train, Y_train)
    except Exception as e:
        print(f"GridSearchCV failed: {e}")
        raise

    # Best model
    best_model = grid_result.best_estimator_
    print("Best parameters:", grid_result.best_params_)

    # Evaluate on the validation part
    Y_val_pred_prob = best_model.predict_proba(X_val)
    Y_val_pred = np.argmax(Y_val_pred_prob, axis=1)
    Y_true = y_val_idx

    accuracy_val_pred = accuracy_score(Y_true, Y_val_pred)
    precision_val_pred = precision_score(Y_true, Y_val_pred, average='weighted')
    recall_val_pred = recall_score(Y_true, Y_val_pred, average='weighted')
    f1_micro = f1_score(Y_true, Y_val_pred, average='micro')
    f1_macro = f1_score(Y_true, Y_val_pred, average='macro')
    # Use every class seen in the truth OR the predictions so the tick labels
    # always match the matrix size.
    present = np.union1d(Y_true, Y_val_pred)
    cm = confusion_matrix(Y_true, Y_val_pred, labels=present)

    print("Accuracy:", accuracy_val_pred)
    print("Precision:", precision_val_pred)
    print("Recall:", recall_val_pred)
    print("F1-Micro:", f1_micro)
    print("F1-Macro:", f1_macro)

    plot_confusion_matrix(cm, classes=class_labels[present], title=nama_model, dataset_name='Validation')

    # Save the best model
    joblib.dump(best_model, f'best_model_{nama_model}.pkl')

    return best_model


In [8]:
import joblib

def model_utilization(data, sequence_length, nama_model):
    """Train a classifier on a 3-D array and score it on a held-out split.

    ``data`` is indexed as ``data[sample, step, column]``. Every step except
    the last is used as features; the argmax over the columns of the last step
    is used as the class index.
    NOTE(review): this layout is taken from the slicing below - confirm it
    matches how the array is produced.

    The samples are split 60/20/20 (train/validation/test). ``modeling`` is
    fitted on the training part, the validation part is scored here, the
    metrics are written to ``evaluation_{nama_model}.txt`` and the model to
    ``best_model_{nama_model}.pkl``. The test part is not used.

    Args:
        data: 3-D array as described above.
        sequence_length: Unused; kept so existing callers keep working.
        nama_model: Model name used for the output file names.

    Returns:
        The estimator returned by ``modeling``.
    """
    data = np.asarray(data)

    # Separate features from labels
    X = data[:, :-1, :]
    # Class index per sample; kept as plain indices because modeling() performs
    # the one-hot encoding itself and must not receive already-encoded labels.
    y_idx = np.argmax(data[:, -1, :], axis=1)

    # Split into train, validation and test
    X_train, X_temp, y_train, y_temp = train_test_split(X, y_idx, test_size=0.4, random_state=42)
    X_val, _X_test, Y_val_true, _y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

    # Fit and tune the model
    # NOTE(review): assumes every class index occurs in the training part.
    best_model = modeling(y_train, X_train, nama_model)

    # Evaluate on the validation data
    Y_val_pred_prob = best_model.predict_proba(X_val)
    Y_val_pred = np.argmax(Y_val_pred_prob, axis=1)

    accuracy_val_pred = accuracy_score(Y_val_true, Y_val_pred)
    precision_val_pred = precision_score(Y_val_true, Y_val_pred, average='weighted')
    recall_val_pred = recall_score(Y_val_true, Y_val_pred, average='weighted')
    f1_micro = f1_score(Y_val_true, Y_val_pred, average='micro')
    f1_macro = f1_score(Y_val_true, Y_val_pred, average='macro')
    cm = confusion_matrix(Y_val_true, Y_val_pred)

    print("Validation Accuracy:", accuracy_val_pred)
    print("Validation Precision:", precision_val_pred)
    print("Validation Recall:", recall_val_pred)
    print("Validation F1-Micro:", f1_micro)
    print("Validation F1-Macro:", f1_macro)
    print("Confusion Matrix:\n", cm)

    # Save evaluation results to a file
    with open(f'evaluation_{nama_model}.txt', 'w') as f:
        f.write(f"Validation Accuracy: {accuracy_val_pred}\n")
        f.write(f"Validation Precision: {precision_val_pred}\n")
        f.write(f"Validation Recall: {recall_val_pred}\n")
        f.write(f"Validation F1-Micro: {f1_micro}\n")
        f.write(f"Validation F1-Macro: {f1_macro}\n")
        f.write(f"Confusion Matrix:\n{cm}\n")

    # Save the best model
    with open(f'best_model_{nama_model}.pkl', 'wb') as file:
        joblib.dump(best_model, file)

    return best_model

In [26]:
# import numpy as np
# import pandas as pd
# from sklearn.model_selection import train_test_split, GridSearchCV
# from sklearn.preprocessing import StandardScaler
# from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
# from keras.utils import to_categorical
# from keras.models import Sequential
# from keras.layers import LSTM, Dense, Dropout
# import joblib
# import matplotlib.pyplot as plt
# import seaborn as sns

# def create_lstm_model(units=50, dropout_rate=0.2, input_shape=(None, 17), num_classes=2):
#     model = Sequential()
#     model.add(LSTM(units, input_shape=input_shape, return_sequences=True))
#     model.add(Dropout(dropout_rate))
#     model.add(LSTM(units))
#     model.add(Dropout(dropout_rate))
#     model.add(Dense(num_classes, activation='softmax'))
#     model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
#     return model

# def prepare_data(df, sequence_length, feature_columns, target_column):
#     X, y = [], []
#     for i in range(len(df) - sequence_length):
#         X.append(df[feature_columns].values[i:i + sequence_length])
#         y.append(df[target_column].values[i + sequence_length])
#     return np.array(X), np.array(y)

# def plot_confusion_matrix(cm, classes, title, dataset_name):
#     plt.figure(figsize=(8, 8))
#     percentages = (cm.T / cm.sum(axis=1) * 100).T
#     plt.imshow(percentages, interpolation='nearest', cmap=plt.cm.Blues, vmin=0, vmax=100)
#     plt.title(f'Confusion Matrix - {title} - {dataset_name}')
#     plt.colorbar(label='Percentage')
#     tick_marks = np.arange(len(classes))
#     plt.xticks(tick_marks, classes, rotation=45)
#     plt.yticks(tick_marks, classes)
#     for i in range(len(classes)):
#         for j in range(len(classes)):
#             plt.text(j, i, f"{cm[i, j]}\n{percentages[i, j]:.1f}%", horizontalalignment='center',
#                      color='white' if percentages[i, j] > 50 else 'black')
#     plt.tight_layout()
#     plt.ylabel('True label')
#     plt.xlabel('Predicted label')
#     plt.show()
#     plt.close()

# def build_model(units, dropout_rate, input_shape, num_classes):
#     return create_lstm_model(units=units, dropout_rate=dropout_rate, input_shape=input_shape, num_classes=num_classes)

# def modeling(Y, X, nama_model):
#     # Stratified split
#     X_train, X_temp, Y_train, Y_temp = train_test_split(X, Y, test_size=0.2, random_state=42, stratify=Y)
#     X_val, X_test, Y_val, Y_test = train_test_split(X_temp, Y_temp, test_size=0.5, random_state=42, stratify=Y_temp)

#     # One-hot encoding for labels
#     num_classes = len(np.unique(Y))
#     Y_train = to_categorical(Y_train, num_classes=num_classes)
#     Y_test = to_categorical(Y_test, num_classes=num_classes)
#     Y_val = to_categorical(Y_val, num_classes=num_classes)

#     # Define the Keras model
#     model = KerasClassifier(build_fn=build_model, input_shape=(X_train.shape[1], X_train.shape[2]), num_classes=num_classes, verbose=0)

#     # Grid search for hyperparameter tuning
#     param_grid = {
#         'units': [20, 50],  # Range of units to test
#         'dropout_rate': [0.2, 0.3],  # Range of dropout rates to test
#         'batch_size': [16, 32],  # Range of batch sizes to test
#         'epochs': [5, 10]  # Range of epochs to test
#     }

#     grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=1, cv=2)  # Use fewer jobs and folds
#     grid_result = grid.fit(X_train, Y_train)

#     # Best model
#     best_model = grid_result.best_estimator_
#     print("Best parameters:", grid_result.best_params_)

#     # Evaluation
#     Y_val_pred_prob = best_model.predict_proba(X_val)
#     Y_val_pred = np.argmax(Y_val_pred_prob, axis=1)
#     Y_true = np.argmax(Y_val, axis=1)

#     accuracy_val_pred = accuracy_score(Y_true, Y_val_pred)
#     precision_val_pred = precision_score(Y_true, Y_val_pred, average='weighted')
#     recall_val_pred = recall_score(Y_true, Y_val_pred, average='weighted')
#     f1_micro_val_pred = f1_score(Y_true, Y_val_pred, average='micro')
#     f1_macro_val_pred = f1_score(Y_true, Y_val_pred, average='macro')
#     cm = confusion_matrix(Y_true, Y_val_pred)

#     print("Accuracy:", accuracy_val_pred)
#     print("Precision:", precision_val_pred)
#     print("Recall:", recall_val_pred)
#     print("F1 Micro:", f1_micro_val_pred)
#     print("F1 Macro:", f1_macro_val_pred)
    
#     # Save the evaluation results
#     with open(f'evaluation_{nama_model}.txt', 'w') as f:
#         f.write(f"Accuracy: {accuracy_val_pred}\n")
#         f.write(f"Precision: {precision_val_pred}\n")
#         f.write(f"Recall: {recall_val_pred}\n")
#         f.write(f"F1 Micro: {f1_micro_val_pred}\n")
#         f.write(f"F1 Macro: {f1_macro_val_pred}\n")

#     plot_confusion_matrix(cm, classes=np.unique(Y_true), title=nama_model, dataset_name='Validation')

#     # Save the best model
#     joblib.dump(best_model, f'best_model_{nama_model}.pkl')

#     return best_model

# def model_utilization(file_path, sequence_length, nama_model):
#     try:
#         df = pd.read_csv(file_path, sep=';')
#     except pd.errors.ParserError as e:
#         print(f"Error parsing the CSV file: {e}")
#         return None

#     # Clean data: remove rows with inconsistent columns
#     df = df.dropna()  # Drop rows with any NaN values
#     df = df.loc[:, ~df.columns.str.contains('^Unnamed')]  # Drop columns that start with 'Unnamed'

#     feature_columns = [f'F{i}' for i in range(1, 18)]
#     target_column = 'observation'

#     # Prepare data
#     X, y = prepare_data(df, sequence_length, feature_columns, target_column)

#     # Modeling and evaluation
#     best_model = modeling(y, X, nama_model)

#     return best_model

# # Menjalankan fungsi model_utilization jika dijalankan sebagai script utama
# if __name__ == "__main__":
#     # Contoh path file
#     file_path = '../../00_Data_Input/sampel_to_explore.csv'  # Update path here
#     sequence_length = 5
#     nama_model = 'LSTM_Model'
#     best_model = model_utilization(file_path, sequence_length, nama_model)


IndexError: index 7 is out of bounds for axis 1 with size 7

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout
import joblib
import matplotlib.pyplot as plt
import seaborn as sns

def create_lstm_model(units=50, dropout_rate=0.2, input_shape=(None, 17), num_classes=2):
    """Build and compile a two-layer stacked LSTM softmax classifier.

    Args:
        units: Number of units in each LSTM layer.
        dropout_rate: Rate of the Dropout layer placed after each LSTM layer.
        input_shape: Input shape handed to the first LSTM layer.
        num_classes: Width of the final softmax layer.

    Returns:
        The compiled ``Sequential`` model (categorical cross-entropy loss,
        Adam optimizer, accuracy metric).
    """
    stack = [
        # The first LSTM emits the whole sequence so the second LSTM can consume it.
        LSTM(units, input_shape=input_shape, return_sequences=True),
        Dropout(dropout_rate),
        LSTM(units),
        Dropout(dropout_rate),
        Dense(num_classes, activation='softmax'),
    ]
    network = Sequential()
    for layer in stack:
        network.add(layer)
    network.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return network

def prepare_data(df, sequence_length, feature_columns, target_column):
    """Turn a DataFrame into sliding feature windows and next-row targets.

    Rows are addressed by position, so the DataFrame index is irrelevant.

    Args:
        df: Source DataFrame.
        sequence_length: Number of consecutive rows per window.
        feature_columns: Column label(s) used as features.
        target_column: Column label holding the target.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays: ``X[k]`` holds the feature values of
        rows ``k .. k + sequence_length - 1`` and ``y[k]`` is the target of row
        ``k + sequence_length``. Both are empty when ``df`` has no more than
        ``sequence_length`` rows.
    """
    # Extract the columns once; selecting them inside the loop copied the
    # feature frame on every iteration.
    feature_values = df[feature_columns].values
    target_values = df[target_column].values

    window_starts = range(len(df) - sequence_length)
    X = [feature_values[start:start + sequence_length] for start in window_starts]
    y = [target_values[start + sequence_length] for start in window_starts]
    return np.array(X), np.array(y)

def plot_confusion_matrix(cm, classes, title, dataset_name):
    """Draw a confusion matrix as a row-normalised heat map.

    Every cell is annotated with its raw count and with its share of the row
    total, so each row of percentages sums to 100 (or to 0 for an empty row).

    Args:
        cm: 2-D array of counts; rows are plotted on the "True label" axis and
            columns on the "Predicted label" axis.
        classes: Tick labels for the rows/columns of ``cm``.
        title: Model name inserted into the figure title.
        dataset_name: Dataset name inserted into the figure title.
    """
    cm = np.asarray(cm)
    row_totals = cm.sum(axis=1, keepdims=True)
    # A class without any true sample has a zero row total; report 0% for that
    # row instead of dividing by zero and annotating its cells with "nan%".
    percentages = np.divide(
        cm, row_totals, out=np.zeros(cm.shape, dtype=float), where=row_totals != 0
    ) * 100

    plt.figure(figsize=(8, 8))
    plt.imshow(percentages, interpolation='nearest', cmap=plt.cm.Blues, vmin=0, vmax=100)
    plt.title(f'Confusion Matrix - {title} - {dataset_name}')
    plt.colorbar(label='Percentage')
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    # Iterate over the matrix itself so every cell is annotated and a
    # mismatching `classes` length can never index outside `cm`.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, f"{cm[i, j]}\n{percentages[i, j]:.1f}%", horizontalalignment='center',
                     color='white' if percentages[i, j] > 50 else 'black')
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Compute the layout only after the axis labels exist so they are not clipped.
    plt.tight_layout()
    plt.show()
    plt.close()

def build_model(units, dropout_rate, input_shape, num_classes):
    """Model factory that forwards its four settings to ``create_lstm_model``.

    Args:
        units: Forwarded as ``units``.
        dropout_rate: Forwarded as ``dropout_rate``.
        input_shape: Forwarded as ``input_shape``.
        num_classes: Forwarded as ``num_classes``.

    Returns:
        Whatever ``create_lstm_model`` returns for these settings.
    """
    settings = {
        'units': units,
        'dropout_rate': dropout_rate,
        'input_shape': input_shape,
        'num_classes': num_classes,
    }
    return create_lstm_model(**settings)

def modeling(Y, X, nama_model):
    """Tune, evaluate and save an LSTM classifier.

    The data is split 80/10/10 (train/validation/test, stratified by class).
    A grid search with 2-fold cross-validation runs on the training part and
    the best estimator is scored on the validation part. The metrics are
    written to ``evaluation_{nama_model}.txt``, a confusion matrix is plotted,
    and the estimator is dumped to ``best_model_{nama_model}.pkl``. The test
    part is held out and not used here.

    Args:
        Y: 1-D array of class labels; any label values are accepted.
        X: 3-D feature array; axes 1 and 2 define the LSTM input shape.
        nama_model: Model name used in the plot title and output file names.

    Returns:
        The best estimator found by the grid search.

    Raises:
        ValueError: If ``X`` is not 3-dimensional.
    """
    # Imported lazily: the wrapper lives in a Keras submodule that not every
    # Keras release ships, so a missing module should surface here instead of
    # breaking the notebook's import cell.
    from keras.wrappers.scikit_learn import KerasClassifier

    # Features as float32 (np.float32 is a real dtype, unlike the removed np.int alias)
    X = np.asarray(X, dtype=np.float32)
    if X.ndim != 3:
        raise ValueError(f"X must be 3-dimensional, got shape {X.shape}")

    # Map the label values onto 0..K-1. Subtracting 1 only worked for
    # contiguous 1-based labels; 0-based labels became -1 and were silently
    # encoded as the last class.
    class_labels, y_idx = np.unique(np.asarray(Y).ravel(), return_inverse=True)
    num_classes = len(class_labels)

    # Stratified split
    X_train, X_temp, y_train_idx, y_temp_idx = train_test_split(
        X, y_idx, test_size=0.2, random_state=42, stratify=y_idx)
    X_val, _X_test, y_val_idx, _y_test_idx = train_test_split(
        X_temp, y_temp_idx, test_size=0.5, random_state=42, stratify=y_temp_idx)

    # One-hot encoding for the training labels
    Y_train = to_categorical(y_train_idx, num_classes=num_classes)

    # Define the Keras model
    model = KerasClassifier(build_fn=build_model, input_shape=(X_train.shape[1], X_train.shape[2]), num_classes=num_classes, verbose=0)

    # Grid search for hyperparameter tuning
    param_grid = {
        'units': [20, 50],  # Range of units to test
        'dropout_rate': [0.2, 0.3],  # Range of dropout rates to test
        'batch_size': [16, 32],  # Range of batch sizes to test
        'epochs': [5, 10]  # Range of epochs to test
    }

    grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=1, cv=2)  # Use fewer jobs and folds
    grid_result = grid.fit(X_train, Y_train)

    # Best model
    best_model = grid_result.best_estimator_
    print("Best parameters:", grid_result.best_params_)

    # Evaluation
    Y_val_pred_prob = best_model.predict_proba(X_val)
    Y_val_pred = np.argmax(Y_val_pred_prob, axis=1)
    Y_true = y_val_idx

    accuracy_val_pred = accuracy_score(Y_true, Y_val_pred)
    precision_val_pred = precision_score(Y_true, Y_val_pred, average='weighted')
    recall_val_pred = recall_score(Y_true, Y_val_pred, average='weighted')
    f1_micro_val_pred = f1_score(Y_true, Y_val_pred, average='micro')
    f1_macro_val_pred = f1_score(Y_true, Y_val_pred, average='macro')
    # Use every class seen in the truth OR the predictions so the tick labels
    # always match the matrix size.
    present = np.union1d(Y_true, Y_val_pred)
    cm = confusion_matrix(Y_true, Y_val_pred, labels=present)

    print("Accuracy:", accuracy_val_pred)
    print("Precision:", precision_val_pred)
    print("Recall:", recall_val_pred)
    print("F1 Micro:", f1_micro_val_pred)
    print("F1 Macro:", f1_macro_val_pred)

    # Save the evaluation results
    with open(f'evaluation_{nama_model}.txt', 'w') as f:
        f.write(f"Accuracy: {accuracy_val_pred}\n")
        f.write(f"Precision: {precision_val_pred}\n")
        f.write(f"Recall: {recall_val_pred}\n")
        f.write(f"F1 Micro: {f1_micro_val_pred}\n")
        f.write(f"F1 Macro: {f1_macro_val_pred}\n")

    # Label the plot with the original label values rather than 0..K-1
    plot_confusion_matrix(cm, classes=class_labels[present], title=nama_model, dataset_name='Validation')

    # Save the best model
    joblib.dump(best_model, f'best_model_{nama_model}.pkl')

    return best_model

def model_utilization(file_path, sequence_length, nama_model,
                      feature_columns=None, target_column='observation'):
    """Load a ';'-separated CSV, build sliding windows and train the model.

    Args:
        file_path: Path of the CSV file to read.
        sequence_length: Number of consecutive rows per input window.
        nama_model: Model name forwarded to ``modeling``.
        feature_columns: Feature column names; defaults to ``F1`` .. ``F17``.
        target_column: Name of the label column.

    Returns:
        The estimator returned by ``modeling``, or ``None`` when the CSV
        cannot be parsed.

    Raises:
        KeyError: If a required column is missing from the file.
        ValueError: If too few complete rows remain to build a single window.
    """
    try:
        df_raw = pd.read_csv(file_path, sep=";")
    except pd.errors.ParserError as e:
        print(f"Error parsing the CSV file: {e}")
        return None

    if feature_columns is None:
        feature_columns = [f'F{i}' for i in range(1, 18)]
    else:
        feature_columns = list(feature_columns)

    required_columns = feature_columns + [target_column]
    missing_columns = [col for col in required_columns if col not in df_raw.columns]
    if missing_columns:
        raise KeyError(f"Missing required column(s) in {file_path}: {missing_columns}")

    # Drop incomplete rows, looking only at the columns that are actually
    # used: an unrelated all-empty column (e.g. one created by a trailing ';')
    # would otherwise discard every row.
    df_clean = df_raw.dropna(subset=required_columns)

    if len(df_clean) <= sequence_length:
        raise ValueError(
            f"Need more than {sequence_length} complete rows to build a window, "
            f"got {len(df_clean)}"
        )

    # Prepare data
    X, y = prepare_data(df_clean, sequence_length, feature_columns, target_column)

    # Debugging: Check unique values in y
    print(f"Unique values in y: {np.unique(y)}")

    # Modeling and evaluation
    best_model = modeling(y, X, nama_model)

    return best_model

# Run model_utilization only when this code is executed as the main script
if __name__ == "__main__":
    # Example input file path
    file_path = '../../00_Data_Input/sampel_to_explore.csv'  # Update path here
    sequence_length = 5
    nama_model = 'LSTM_Model'
    best_model = model_utilization(file_path, sequence_length, nama_model)
