In [None]:
# Stable distance weighting

import numpy as np
import pandas as pd
import pickle
import concurrent.futures
from os import cpu_count
from tqdm import tqdm
import time

class KNN:
    """k-nearest-neighbours classifier with exponentially decaying vote weights.

    Distances are Minkowski norms of order ``p``. ``metric`` only selects the
    order: 1 for 'manhattan', 2 for 'euclidean', the caller's ``p`` for
    'minkowski'. With ``weights='distance'`` a neighbour at distance ``d``
    votes with a weight proportional to ``exp(-d)``.
    """

    def __init__(self, k=5, n_jobs=1, metric='minkowski', p=2, weights='uniform'):
        """Validate and store the hyper-parameters.

        Args:
            k: Number of neighbours that vote (int >= 1).
            n_jobs: Number of workers used by ``predict``; -1 means
                ``os.cpu_count()``.
            metric: 'manhattan', 'euclidean' or 'minkowski'.
            p: Minkowski order, only honoured when ``metric='minkowski'``.
            weights: 'uniform' or 'distance'.

        Raises:
            ValueError: If any argument has the wrong type or range.
        """
        # Type checks run before range checks so that non-numeric arguments
        # raise ValueError instead of a TypeError from the comparison.
        if not isinstance(k, int) or k < 1:
            raise ValueError("Invalid k. k must be an integer greater than 0.")
        if metric not in ['manhattan', 'euclidean', 'minkowski']:
            raise ValueError("Invalid metric. Valid metrics are 'manhattan', 'euclidean', and 'minkowski'.")
        if not isinstance(p, (int, float)) or p < 1:
            raise ValueError("Invalid p. p must be a number greater than 0.")
        if weights not in ['uniform', 'distance']:
            raise ValueError("Invalid weights. Choose 'uniform' or 'distance'.")
        if not isinstance(n_jobs, int) or (n_jobs < 1 and n_jobs != -1):
            raise ValueError("Invalid n_jobs. Must be an integer greater than 0, or -1 to use all available cores.")

        self.k = k
        self.metric = metric
        self.p = p if metric == 'minkowski' else (1 if metric == 'manhattan' else 2)
        self.weights = weights
        self.n_jobs = cpu_count() if n_jobs == -1 else n_jobs

    def get_params(self, deep=True):
        """Return the model parameters as a dictionary."""
        return {
            "k": self.k,
            "metric": self.metric,
            "p": self.p,
            "weights": self.weights,
            "n_jobs": self.n_jobs,
        }

    def set_params(self, **params):
        """Set model parameters from keyword arguments and return ``self``."""
        for key, value in params.items():
            setattr(self, key, value)
        # Re-derive dependent state the same way __init__ does; otherwise a
        # metric change would leave the old ``p`` in effect and n_jobs=-1
        # would reach the executor unresolved.
        metric = getattr(self, 'metric', None)
        if metric == 'manhattan':
            self.p = 1
        elif metric == 'euclidean':
            self.p = 2
        if getattr(self, 'n_jobs', None) == -1:
            self.n_jobs = cpu_count()
        return self

    def _compute_distances(self, test):
        """Return the order-``p`` distance from ``test`` to every training row.

        Raises:
            ValueError: If ``test`` does not have one value per training column.
        """
        if test.shape[0] != self.X_train.shape[1]:
            raise ValueError("Test instance dimensions must match training instance dimensions.")

        distances = np.linalg.norm(self.X_train - test, ord=self.p, axis=1)
        return distances

    def fit(self, X_train, y_train):
        """Store the training data.

        Args:
            X_train: Array-like or DataFrame of features, converted to float.
            y_train: Array-like or Series of labels, one per training row.

        Returns:
            The fitted estimator, so calls can be chained.

        Raises:
            ValueError: If the two inputs differ in length.
        """
        if len(X_train) != len(y_train):
            raise ValueError("Length of X_train and y_train must be the same.")

        if isinstance(X_train, pd.DataFrame):
            self.X_train = X_train.values.astype(float)
        else:
            self.X_train = np.array(X_train).astype(float)
        self.y_train = pd.Series(y_train) if not isinstance(y_train, pd.Series) else y_train
        return self

    def _predict_instance(self, row):
        """Predict the label of a single test row."""
        distances = self._compute_distances(row)
        nearest_indices = np.argsort(distances)[:self.k]
        nearest_labels = self.y_train.iloc[nearest_indices]

        if self.weights != 'distance':
            return nearest_labels.value_counts().idxmax()  # Uniform voting

        nearest_distances = distances[nearest_indices]
        # Exponential decay. Subtracting the smallest distance rescales every
        # weight by the same factor, so the winner is unchanged, but it keeps
        # exp() from underflowing to 0 for all neighbours when they are far.
        vote_weights = np.exp(-(nearest_distances - nearest_distances.min()))
        weighted_votes = {}
        for label, weight in zip(nearest_labels, vote_weights):
            weighted_votes[label] = weighted_votes.get(label, 0) + weight
        return max(weighted_votes, key=weighted_votes.get)

    def predict(self, X_test):
        """Predict labels for a test set.

        Args:
            X_test (array-like): Test rows, one sample per row.

        Returns:
            np.ndarray: One predicted label per test row.
        """
        if isinstance(X_test, pd.DataFrame):
            X_test = X_test.values.astype(float)
        else:
            X_test = np.array(X_test, dtype=float)

        # Parallel or sequential execution depending on n_jobs.
        start_time = time.time()
        if self.n_jobs != 1:
            # Threads rather than processes: work sent to a process pool must
            # be picklable, which fails for locally defined functions and for
            # classes defined interactively; threads need no pickling.
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.n_jobs) as executor:
                predictions = list(tqdm(executor.map(self._predict_instance, X_test), total=len(X_test)))
        else:
            predictions = [self._predict_instance(row) for row in tqdm(X_test)]
        elapsed_time = time.time() - start_time
        print(f"Prediction completed in {elapsed_time:.2f} seconds.")

        return np.array(predictions)

    def save(self, path):
        """Pickle the model to ``path``."""
        with open(path, 'wb') as file:
            pickle.dump(self, file)

    @staticmethod
    def load(path):
        """Load a pickled model from ``path``.

        NOTE: unpickling can execute arbitrary code; only load trusted files.
        """
        with open(path, 'rb') as file:
            return pickle.load(file)

In [None]:
# Tie handling

import numpy as np
import pandas as pd
import pickle
import concurrent.futures
from os import cpu_count
from tqdm import tqdm
import time

class KNN:
    """k-nearest-neighbours classifier with deterministic tie handling.

    Distances are Minkowski norms of order ``p``. ``metric`` only selects the
    order: 1 for 'manhattan', 2 for 'euclidean', the caller's ``p`` for
    'minkowski'. Uniform votes that end in a tie are resolved in favour of
    the smallest tied label.
    """

    def __init__(self, k=5, n_jobs=1, metric='minkowski', p=2, weights='uniform'):
        """Validate and store the hyper-parameters.

        Args:
            k: Number of neighbours that vote (int >= 1).
            n_jobs: Number of workers used by ``predict``; -1 means
                ``os.cpu_count()``.
            metric: 'manhattan', 'euclidean' or 'minkowski'.
            p: Minkowski order, only honoured when ``metric='minkowski'``.
            weights: 'uniform' or 'distance'.

        Raises:
            ValueError: If any argument has the wrong type or range.
        """
        # Type checks run before range checks so that non-numeric arguments
        # raise ValueError instead of a TypeError from the comparison.
        if not isinstance(k, int) or k < 1:
            raise ValueError("Invalid k. k must be an integer greater than 0.")
        if metric not in ['manhattan', 'euclidean', 'minkowski']:
            raise ValueError("Invalid metric. Valid metrics are 'manhattan', 'euclidean', and 'minkowski'.")
        if not isinstance(p, (int, float)) or p < 1:
            raise ValueError("Invalid p. p must be a number greater than 0.")
        if weights not in ['uniform', 'distance']:
            raise ValueError("Invalid weights. Choose 'uniform' or 'distance'.")
        if not isinstance(n_jobs, int) or (n_jobs < 1 and n_jobs != -1):
            raise ValueError("Invalid n_jobs. Must be an integer greater than 0, or -1 to use all available cores.")

        self.k = k
        self.metric = metric
        self.p = p if metric == 'minkowski' else (1 if metric == 'manhattan' else 2)
        self.weights = weights
        self.n_jobs = cpu_count() if n_jobs == -1 else n_jobs

    def get_params(self, deep=True):
        """Return the model parameters as a dictionary."""
        return {
            "k": self.k,
            "metric": self.metric,
            "p": self.p,
            "weights": self.weights,
            "n_jobs": self.n_jobs,
        }

    def set_params(self, **params):
        """Set model parameters from keyword arguments and return ``self``."""
        for key, value in params.items():
            setattr(self, key, value)
        # Re-derive dependent state the same way __init__ does; otherwise a
        # metric change would leave the old ``p`` in effect and n_jobs=-1
        # would reach the executor unresolved.
        metric = getattr(self, 'metric', None)
        if metric == 'manhattan':
            self.p = 1
        elif metric == 'euclidean':
            self.p = 2
        if getattr(self, 'n_jobs', None) == -1:
            self.n_jobs = cpu_count()
        return self

    def _compute_distances(self, test):
        """Return the order-``p`` distance from ``test`` to every training row.

        Raises:
            ValueError: If ``test`` does not have one value per training column.
        """
        if test.shape[0] != self.X_train.shape[1]:
            raise ValueError("Test instance dimensions must match training instance dimensions.")

        distances = np.linalg.norm(self.X_train - test, ord=self.p, axis=1)
        return distances

    def fit(self, X_train, y_train):
        """Store the training data.

        Args:
            X_train: Array-like or DataFrame of features, converted to float.
            y_train: Array-like or Series of labels, one per training row.

        Returns:
            The fitted estimator, so calls can be chained.

        Raises:
            ValueError: If the two inputs differ in length.
        """
        if len(X_train) != len(y_train):
            raise ValueError("Length of X_train and y_train must be the same.")

        if isinstance(X_train, pd.DataFrame):
            self.X_train = X_train.values.astype(float)
        else:
            self.X_train = np.array(X_train).astype(float)
        self.y_train = pd.Series(y_train) if not isinstance(y_train, pd.Series) else y_train
        return self

    def _predict_instance(self, row):
        """Predict the label of a single test row."""
        distances = self._compute_distances(row)
        nearest_indices = np.argsort(distances)[:self.k]
        nearest_labels = self.y_train.iloc[nearest_indices]

        if self.weights == 'distance':
            nearest_distances = distances[nearest_indices]
            vote_weights = 1 / (nearest_distances + 1e-10)  # Avoid division by zero
            vote_weights /= np.sum(vote_weights)
            weighted_votes = {}
            for label, weight in zip(nearest_labels, vote_weights):
                weighted_votes[label] = weighted_votes.get(label, 0) + weight
            return max(weighted_votes, key=weighted_votes.get)

        value_counts = nearest_labels.value_counts()
        # Keep only the labels that share the top count and pick the smallest
        # one. With a single winner this is simply that winner. (idxmin() on
        # the counts would return the *least* frequent label instead.)
        tied = value_counts[value_counts == value_counts.max()]
        return tied.index.min()

    def predict(self, X_test):
        """Predict labels for a test set.

        Args:
            X_test (array-like): Test rows, one sample per row.

        Returns:
            np.ndarray: One predicted label per test row.
        """
        if isinstance(X_test, pd.DataFrame):
            X_test = X_test.values.astype(float)
        else:
            X_test = np.array(X_test, dtype=float)

        # Parallel or sequential execution depending on n_jobs.
        start_time = time.time()
        if self.n_jobs != 1:
            # Threads rather than processes: work sent to a process pool must
            # be picklable, which fails for locally defined functions and for
            # classes defined interactively; threads need no pickling.
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.n_jobs) as executor:
                predictions = list(tqdm(executor.map(self._predict_instance, X_test), total=len(X_test)))
        else:
            predictions = [self._predict_instance(row) for row in tqdm(X_test)]
        elapsed_time = time.time() - start_time
        print(f"Prediction completed in {elapsed_time:.2f} seconds.")

        return np.array(predictions)

    def save(self, path):
        """Pickle the model to ``path``."""
        with open(path, 'wb') as file:
            pickle.dump(self, file)

    @staticmethod
    def load(path):
        """Load a pickled model from ``path``.

        NOTE: unpickling can execute arbitrary code; only load trusted files.
        """
        with open(path, 'rb') as file:
            return pickle.load(file)

In [None]:
# Using joblib in place of concurrent.futures

import numpy as np
import pandas as pd
import pickle
import concurrent.futures
from os import cpu_count
from tqdm import tqdm
import time
from joblib import Parallel, delayed

class KNN:
    """k-nearest-neighbours classifier whose predictions are dispatched via joblib.

    Distances are Minkowski norms of order ``p``. ``metric`` only selects the
    order: 1 for 'manhattan', 2 for 'euclidean', the caller's ``p`` for
    'minkowski'. With ``weights='distance'`` neighbours vote with normalised
    inverse-distance weights.
    """

    def __init__(self, k=5, n_jobs=1, metric='minkowski', p=2, weights='uniform'):
        """Validate and store the hyper-parameters.

        Args:
            k: Number of neighbours that vote (int >= 1).
            n_jobs: Number of joblib workers used by ``predict``; -1 means
                ``os.cpu_count()``.
            metric: 'manhattan', 'euclidean' or 'minkowski'.
            p: Minkowski order, only honoured when ``metric='minkowski'``.
            weights: 'uniform' or 'distance'.

        Raises:
            ValueError: If any argument has the wrong type or range.
        """
        # Type checks run before range checks so that non-numeric arguments
        # raise ValueError instead of a TypeError from the comparison.
        if not isinstance(k, int) or k < 1:
            raise ValueError("Invalid k. k must be an integer greater than 0.")
        if metric not in ['manhattan', 'euclidean', 'minkowski']:
            raise ValueError("Invalid metric. Valid metrics are 'manhattan', 'euclidean', and 'minkowski'.")
        if not isinstance(p, (int, float)) or p < 1:
            raise ValueError("Invalid p. p must be a number greater than 0.")
        if weights not in ['uniform', 'distance']:
            raise ValueError("Invalid weights. Choose 'uniform' or 'distance'.")
        if not isinstance(n_jobs, int) or (n_jobs < 1 and n_jobs != -1):
            raise ValueError("Invalid n_jobs. Must be an integer greater than 0, or -1 to use all available cores.")

        self.k = k
        self.metric = metric
        self.p = p if metric == 'minkowski' else (1 if metric == 'manhattan' else 2)
        self.weights = weights
        self.n_jobs = cpu_count() if n_jobs == -1 else n_jobs

    def get_params(self, deep=True):
        """Return the model parameters as a dictionary."""
        return {
            "k": self.k,
            "metric": self.metric,
            "p": self.p,
            "weights": self.weights,
            "n_jobs": self.n_jobs,
        }

    def set_params(self, **params):
        """Set model parameters from keyword arguments and return ``self``."""
        for key, value in params.items():
            setattr(self, key, value)
        # Re-derive dependent state the same way __init__ does; otherwise a
        # metric change would leave the old ``p`` in effect.
        metric = getattr(self, 'metric', None)
        if metric == 'manhattan':
            self.p = 1
        elif metric == 'euclidean':
            self.p = 2
        if getattr(self, 'n_jobs', None) == -1:
            self.n_jobs = cpu_count()
        return self

    def _compute_distances(self, test):
        """Return the order-``p`` distance from ``test`` to every training row.

        Raises:
            ValueError: If ``test`` does not have one value per training column.
        """
        if test.shape[0] != self.X_train.shape[1]:
            raise ValueError("Test instance dimensions must match training instance dimensions.")

        distances = np.linalg.norm(self.X_train - test, ord=self.p, axis=1)
        return distances

    def fit(self, X_train, y_train):
        """Store the training data.

        Args:
            X_train: Array-like or DataFrame of features, converted to float.
            y_train: Array-like or Series of labels, one per training row.

        Returns:
            The fitted estimator, so calls can be chained.

        Raises:
            ValueError: If the two inputs differ in length.
        """
        if len(X_train) != len(y_train):
            raise ValueError("Length of X_train and y_train must be the same.")

        if isinstance(X_train, pd.DataFrame):
            self.X_train = X_train.values.astype(float)
        else:
            self.X_train = np.array(X_train).astype(float)
        self.y_train = pd.Series(y_train) if not isinstance(y_train, pd.Series) else y_train
        return self

    def _predict_instance(self, row):
        """Predict the label of a single test row."""
        distances = self._compute_distances(row)
        nearest_indices = np.argsort(distances)[:self.k]
        nearest_labels = self.y_train.iloc[nearest_indices]

        if self.weights != 'distance':
            return nearest_labels.value_counts().idxmax()  # Uniform voting

        nearest_distances = distances[nearest_indices]
        vote_weights = 1 / (nearest_distances + 1e-10)  # Avoid division by zero
        vote_weights /= np.sum(vote_weights)
        weighted_votes = {}
        for label, weight in zip(nearest_labels, vote_weights):
            weighted_votes[label] = weighted_votes.get(label, 0) + weight
        return max(weighted_votes, key=weighted_votes.get)

    def predict(self, X_test):
        """Predict labels for a test set.

        Args:
            X_test (array-like): Test rows, one sample per row.

        Returns:
            np.ndarray: One predicted label per test row.
        """
        if isinstance(X_test, pd.DataFrame):
            X_test = X_test.values.astype(float)
        else:
            X_test = np.array(X_test, dtype=float)

        start_time = time.time()
        if self.n_jobs == 1:
            # A single worker gains nothing from joblib dispatch; run inline.
            predictions = [self._predict_instance(row) for row in X_test]
        else:
            predictions = Parallel(n_jobs=self.n_jobs)(
                delayed(self._predict_instance)(row) for row in X_test
            )
        elapsed_time = time.time() - start_time
        print(f"Prediction completed in {elapsed_time:.2f} seconds.")

        return np.array(predictions)

    def save(self, path):
        """Pickle the model to ``path``."""
        with open(path, 'wb') as file:
            pickle.dump(self, file)

    @staticmethod
    def load(path):
        """Load a pickled model from ``path``.

        NOTE: unpickling can execute arbitrary code; only load trusted files.
        """
        with open(path, 'rb') as file:
            return pickle.load(file)

In [None]:
# Distance computation

import numpy as np
import pandas as pd
import pickle
import concurrent.futures
from os import cpu_count
from tqdm import tqdm
import time
from scipy.spatial.distance import cdist

class KNN:
    """k-nearest-neighbours classifier that computes distances with SciPy's cdist.

    ``metric`` selects the Minkowski order ``p``: 1 for 'manhattan', 2 for
    'euclidean', the caller's ``p`` for 'minkowski'. With
    ``weights='distance'`` neighbours vote with normalised inverse-distance
    weights.
    """

    def __init__(self, k=5, n_jobs=1, metric='minkowski', p=2, weights='uniform'):
        """Validate and store the hyper-parameters.

        Args:
            k: Number of neighbours that vote (int >= 1).
            n_jobs: Number of workers used by ``predict``; -1 means
                ``os.cpu_count()``.
            metric: 'manhattan', 'euclidean' or 'minkowski'.
            p: Minkowski order, only honoured when ``metric='minkowski'``.
            weights: 'uniform' or 'distance'.

        Raises:
            ValueError: If any argument has the wrong type or range.
        """
        # Type checks run before range checks so that non-numeric arguments
        # raise ValueError instead of a TypeError from the comparison.
        if not isinstance(k, int) or k < 1:
            raise ValueError("Invalid k. k must be an integer greater than 0.")
        if metric not in ['manhattan', 'euclidean', 'minkowski']:
            raise ValueError("Invalid metric. Valid metrics are 'manhattan', 'euclidean', and 'minkowski'.")
        if not isinstance(p, (int, float)) or p < 1:
            raise ValueError("Invalid p. p must be a number greater than 0.")
        if weights not in ['uniform', 'distance']:
            raise ValueError("Invalid weights. Choose 'uniform' or 'distance'.")
        if not isinstance(n_jobs, int) or (n_jobs < 1 and n_jobs != -1):
            raise ValueError("Invalid n_jobs. Must be an integer greater than 0, or -1 to use all available cores.")

        self.k = k
        self.metric = metric
        self.p = p if metric == 'minkowski' else (1 if metric == 'manhattan' else 2)
        self.weights = weights
        self.n_jobs = cpu_count() if n_jobs == -1 else n_jobs

    def get_params(self, deep=True):
        """Return the model parameters as a dictionary."""
        return {
            "k": self.k,
            "metric": self.metric,
            "p": self.p,
            "weights": self.weights,
            "n_jobs": self.n_jobs,
        }

    def set_params(self, **params):
        """Set model parameters from keyword arguments and return ``self``."""
        for key, value in params.items():
            setattr(self, key, value)
        # Re-derive dependent state the same way __init__ does; otherwise a
        # metric change would leave the old ``p`` in effect and n_jobs=-1
        # would reach the executor unresolved.
        metric = getattr(self, 'metric', None)
        if metric == 'manhattan':
            self.p = 1
        elif metric == 'euclidean':
            self.p = 2
        if getattr(self, 'n_jobs', None) == -1:
            self.n_jobs = cpu_count()
        return self

    def _compute_distances(self, test):
        """Return the order-``p`` distance from ``test`` to every training row.

        Raises:
            ValueError: If ``test`` does not have one value per training column.
        """
        if test.shape[0] != self.X_train.shape[1]:
            raise ValueError("Test instance dimensions must match training instance dimensions.")

        # Always ask cdist for 'minkowski' with the resolved order:
        # 'manhattan' is not a metric name cdist accepts (it calls that
        # metric 'cityblock'), and without an explicit ``p`` cdist's
        # 'minkowski' falls back to order 2.
        pairwise = cdist(self.X_train, test[np.newaxis, :], metric='minkowski', p=self.p)
        return pairwise.ravel()

    def fit(self, X_train, y_train):
        """Store the training data.

        Args:
            X_train: Array-like or DataFrame of features, converted to float.
            y_train: Array-like or Series of labels, one per training row.

        Returns:
            The fitted estimator, so calls can be chained.

        Raises:
            ValueError: If the two inputs differ in length.
        """
        if len(X_train) != len(y_train):
            raise ValueError("Length of X_train and y_train must be the same.")

        if isinstance(X_train, pd.DataFrame):
            self.X_train = X_train.values.astype(float)
        else:
            self.X_train = np.array(X_train).astype(float)
        self.y_train = pd.Series(y_train) if not isinstance(y_train, pd.Series) else y_train
        return self

    def _predict_instance(self, row):
        """Predict the label of a single test row."""
        distances = self._compute_distances(row)
        nearest_indices = np.argsort(distances)[:self.k]
        nearest_labels = self.y_train.iloc[nearest_indices]

        if self.weights != 'distance':
            return nearest_labels.value_counts().idxmax()  # Uniform voting

        nearest_distances = distances[nearest_indices]
        vote_weights = 1 / (nearest_distances + 1e-10)  # Avoid division by zero
        vote_weights /= np.sum(vote_weights)
        weighted_votes = {}
        for label, weight in zip(nearest_labels, vote_weights):
            weighted_votes[label] = weighted_votes.get(label, 0) + weight
        return max(weighted_votes, key=weighted_votes.get)

    def predict(self, X_test):
        """Predict labels for a test set.

        Args:
            X_test (array-like): Test rows, one sample per row.

        Returns:
            np.ndarray: One predicted label per test row.
        """
        if isinstance(X_test, pd.DataFrame):
            X_test = X_test.values.astype(float)
        else:
            X_test = np.array(X_test, dtype=float)

        # Parallel or sequential execution depending on n_jobs.
        start_time = time.time()
        if self.n_jobs != 1:
            # Threads rather than processes: work sent to a process pool must
            # be picklable, which fails for locally defined functions and for
            # classes defined interactively; threads need no pickling.
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.n_jobs) as executor:
                predictions = list(tqdm(executor.map(self._predict_instance, X_test), total=len(X_test)))
        else:
            predictions = [self._predict_instance(row) for row in tqdm(X_test)]
        elapsed_time = time.time() - start_time
        print(f"Prediction completed in {elapsed_time:.2f} seconds.")

        return np.array(predictions)

    def save(self, path):
        """Pickle the model to ``path``."""
        with open(path, 'wb') as file:
            pickle.dump(self, file)

    @staticmethod
    def load(path):
        """Load a pickled model from ``path``.

        NOTE: unpickling can execute arbitrary code; only load trusted files.
        """
        with open(path, 'rb') as file:
            return pickle.load(file)

In [None]:
#KDTree

import numpy as np
import pandas as pd
import pickle
import concurrent.futures
from os import cpu_count
from tqdm import tqdm
import time
from scipy.spatial import KDTree

class KNN:
    """k-nearest-neighbours classifier backed by a SciPy KDTree.

    ``metric`` selects the Minkowski order ``p`` used for neighbour queries:
    1 for 'manhattan', 2 for 'euclidean', the caller's ``p`` for 'minkowski'.
    ``n_jobs`` is validated and stored, but ``predict`` in this variant runs
    in the calling thread and does not use it.
    """

    def __init__(self, k=5, n_jobs=1, metric='minkowski', p=2, weights='uniform'):
        """Validate and store the hyper-parameters.

        Args:
            k: Number of neighbours that vote (int >= 1).
            n_jobs: Positive int, or -1 for ``os.cpu_count()``.
            metric: 'manhattan', 'euclidean' or 'minkowski'.
            p: Minkowski order, only honoured when ``metric='minkowski'``.
            weights: 'uniform' or 'distance'.

        Raises:
            ValueError: If any argument has the wrong type or range.
        """
        # Type checks run before range checks so that non-numeric arguments
        # raise ValueError instead of a TypeError from the comparison.
        if not isinstance(k, int) or k < 1:
            raise ValueError("Invalid k. k must be an integer greater than 0.")
        if metric not in ['manhattan', 'euclidean', 'minkowski']:
            raise ValueError("Invalid metric. Valid metrics are 'manhattan', 'euclidean', and 'minkowski'.")
        if not isinstance(p, (int, float)) or p < 1:
            raise ValueError("Invalid p. p must be a number greater than 0.")
        if weights not in ['uniform', 'distance']:
            raise ValueError("Invalid weights. Choose 'uniform' or 'distance'.")
        if not isinstance(n_jobs, int) or (n_jobs < 1 and n_jobs != -1):
            raise ValueError("Invalid n_jobs. Must be an integer greater than 0, or -1 to use all available cores.")

        self.k = k
        self.metric = metric
        self.p = p if metric == 'minkowski' else (1 if metric == 'manhattan' else 2)
        self.weights = weights
        self.n_jobs = cpu_count() if n_jobs == -1 else n_jobs

    def get_params(self, deep=True):
        """Return the model parameters as a dictionary."""
        return {
            "k": self.k,
            "metric": self.metric,
            "p": self.p,
            "weights": self.weights,
            "n_jobs": self.n_jobs,
        }

    def set_params(self, **params):
        """Set model parameters from keyword arguments and return ``self``."""
        for key, value in params.items():
            setattr(self, key, value)
        # Re-derive dependent state the same way __init__ does; otherwise a
        # metric change would leave the old ``p`` in effect.
        metric = getattr(self, 'metric', None)
        if metric == 'manhattan':
            self.p = 1
        elif metric == 'euclidean':
            self.p = 2
        if getattr(self, 'n_jobs', None) == -1:
            self.n_jobs = cpu_count()
        return self

    def _compute_distances(self, test):
        """Return the order-``p`` distance from ``test`` to every training row.

        Brute-force helper; ``predict`` queries the KDTree instead.

        Raises:
            ValueError: If ``test`` does not have one value per training column.
        """
        if test.shape[0] != self.X_train.shape[1]:
            raise ValueError("Test instance dimensions must match training instance dimensions.")

        distances = np.linalg.norm(self.X_train - test, ord=self.p, axis=1)
        return distances

    def fit(self, X_train, y_train):
        """Store the training data and build the KDTree over it.

        Args:
            X_train: Array-like of features, converted to a float array.
            y_train: Array-like of labels, one per training row.

        Returns:
            The fitted estimator, so calls can be chained.

        Raises:
            ValueError: If the two inputs differ in length.
        """
        if len(X_train) != len(y_train):
            raise ValueError("Length of X_train and y_train must be the same.")

        self.X_train = np.array(X_train, dtype=float)
        self.y_train = np.array(y_train)
        self.tree = KDTree(self.X_train)
        return self

    def _vote(self, dist, idx):
        """Return the winning label among the neighbours at ``idx``."""
        labels = self.y_train[idx]
        if self.weights == 'distance':
            # Exponential decay relative to the farthest neighbour. When all
            # neighbours coincide with the query (max distance 0) the ratio
            # would be 0/0, so they all get the same weight instead.
            scale = dist.max()
            vote_weights = np.exp(-dist / scale) if scale > 0 else np.ones_like(dist)
            weighted_votes = {}
            for label, weight in zip(labels, vote_weights):
                weighted_votes[label] = weighted_votes.get(label, 0) + weight
            return max(weighted_votes, key=weighted_votes.get)

        value_counts = pd.Series(labels).value_counts()
        # Tie handling: among the labels sharing the top count pick the
        # smallest. (idxmin() on the counts would return the *least*
        # frequent label instead.)
        tied = value_counts[value_counts == value_counts.max()]
        return tied.index.min()

    def predict(self, X_test):
        """Predict labels for a test set.

        Args:
            X_test (array-like): Test rows, one sample per row (2-D).

        Returns:
            np.ndarray: One predicted label per test row.
        """
        X_test = np.array(X_test, dtype=float)
        # Asking the tree for more neighbours than it holds yields infinite
        # distances and out-of-range indices, so cap k at the training size.
        n_neighbors = min(self.k, len(self.X_train))
        distances, indices = self.tree.query(X_test, k=n_neighbors, p=self.p)
        # For k == 1 the tree drops the neighbour axis; restore it so every
        # row is a 1-D array of neighbours.
        distances = np.reshape(distances, (len(X_test), n_neighbors))
        indices = np.reshape(indices, (len(X_test), n_neighbors))

        predictions = [self._vote(dist, idx) for dist, idx in zip(distances, indices)]
        return np.array(predictions)

    def save(self, path):
        """Pickle the model to ``path``."""
        with open(path, 'wb') as file:
            pickle.dump(self, file)

    @staticmethod
    def load(path):
        """Load a pickled model from ``path``.

        NOTE: unpickling can execute arbitrary code; only load trusted files.
        """
        with open(path, 'rb') as file:
            return pickle.load(file)

In [None]:
# Dynamic weights

import numpy as np
import pandas as pd
import pickle
import concurrent.futures
from os import cpu_count
from tqdm import tqdm
import time

class KNN:
    """k-nearest-neighbours classifier with normalised inverse-distance weights.

    Distances are Minkowski norms of order ``p``. ``metric`` only selects the
    order: 1 for 'manhattan', 2 for 'euclidean', the caller's ``p`` for
    'minkowski'. With ``weights='distance'`` a neighbour at distance ``d``
    votes with weight ``1 / (d + 1e-10)``, normalised over the k neighbours.
    """

    def __init__(self, k=5, n_jobs=1, metric='minkowski', p=2, weights='uniform'):
        """Validate and store the hyper-parameters.

        Args:
            k: Number of neighbours that vote (int >= 1).
            n_jobs: Number of workers used by ``predict``; -1 means
                ``os.cpu_count()``.
            metric: 'manhattan', 'euclidean' or 'minkowski'.
            p: Minkowski order, only honoured when ``metric='minkowski'``.
            weights: 'uniform' or 'distance'.

        Raises:
            ValueError: If any argument has the wrong type or range.
        """
        # Type checks run before range checks so that non-numeric arguments
        # raise ValueError instead of a TypeError from the comparison.
        if not isinstance(k, int) or k < 1:
            raise ValueError("Invalid k. k must be an integer greater than 0.")
        if metric not in ['manhattan', 'euclidean', 'minkowski']:
            raise ValueError("Invalid metric. Valid metrics are 'manhattan', 'euclidean', and 'minkowski'.")
        if not isinstance(p, (int, float)) or p < 1:
            raise ValueError("Invalid p. p must be a number greater than 0.")
        if weights not in ['uniform', 'distance']:
            raise ValueError("Invalid weights. Choose 'uniform' or 'distance'.")
        if not isinstance(n_jobs, int) or (n_jobs < 1 and n_jobs != -1):
            raise ValueError("Invalid n_jobs. Must be an integer greater than 0, or -1 to use all available cores.")

        self.k = k
        self.metric = metric
        self.p = p if metric == 'minkowski' else (1 if metric == 'manhattan' else 2)
        self.weights = weights
        self.n_jobs = cpu_count() if n_jobs == -1 else n_jobs

    def get_params(self, deep=True):
        """Return the model parameters as a dictionary."""
        return {
            "k": self.k,
            "metric": self.metric,
            "p": self.p,
            "weights": self.weights,
            "n_jobs": self.n_jobs,
        }

    def set_params(self, **params):
        """Set model parameters from keyword arguments and return ``self``."""
        for key, value in params.items():
            setattr(self, key, value)
        # Re-derive dependent state the same way __init__ does; otherwise a
        # metric change would leave the old ``p`` in effect and n_jobs=-1
        # would reach the executor unresolved.
        metric = getattr(self, 'metric', None)
        if metric == 'manhattan':
            self.p = 1
        elif metric == 'euclidean':
            self.p = 2
        if getattr(self, 'n_jobs', None) == -1:
            self.n_jobs = cpu_count()
        return self

    def _compute_distances(self, test):
        """Return the order-``p`` distance from ``test`` to every training row.

        Raises:
            ValueError: If ``test`` does not have one value per training column.
        """
        if test.shape[0] != self.X_train.shape[1]:
            raise ValueError("Test instance dimensions must match training instance dimensions.")

        distances = np.linalg.norm(self.X_train - test, ord=self.p, axis=1)
        return distances

    def fit(self, X_train, y_train):
        """Store the training data.

        Args:
            X_train: Array-like or DataFrame of features, converted to float.
            y_train: Array-like or Series of labels, one per training row.

        Returns:
            The fitted estimator, so calls can be chained.

        Raises:
            ValueError: If the two inputs differ in length.
        """
        if len(X_train) != len(y_train):
            raise ValueError("Length of X_train and y_train must be the same.")

        if isinstance(X_train, pd.DataFrame):
            self.X_train = X_train.values.astype(float)
        else:
            self.X_train = np.array(X_train).astype(float)
        self.y_train = pd.Series(y_train) if not isinstance(y_train, pd.Series) else y_train
        return self

    def _predict_instance(self, row):
        """Predict the label of a single test row."""
        distances = self._compute_distances(row)
        nearest_indices = np.argsort(distances)[:self.k]
        nearest_labels = self.y_train.iloc[nearest_indices]

        if self.weights != 'distance':
            return nearest_labels.value_counts().idxmax()  # Uniform voting

        nearest_distances = distances[nearest_indices]
        vote_weights = 1 / (nearest_distances + 1e-10)  # Avoid division by zero
        vote_weights /= np.sum(vote_weights)
        weighted_votes = {}
        for label, weight in zip(nearest_labels, vote_weights):
            weighted_votes[label] = weighted_votes.get(label, 0) + weight
        return max(weighted_votes, key=weighted_votes.get)

    def predict(self, X_test):
        """Predict labels for a test set.

        Args:
            X_test (array-like): Test rows, one sample per row.

        Returns:
            np.ndarray: One predicted label per test row.
        """
        if isinstance(X_test, pd.DataFrame):
            X_test = X_test.values.astype(float)
        else:
            X_test = np.array(X_test, dtype=float)

        # Parallel or sequential execution depending on n_jobs.
        start_time = time.time()
        if self.n_jobs != 1:
            # Threads rather than processes: work sent to a process pool must
            # be picklable, which fails for locally defined functions and for
            # classes defined interactively; threads need no pickling.
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.n_jobs) as executor:
                predictions = list(tqdm(executor.map(self._predict_instance, X_test), total=len(X_test)))
        else:
            predictions = [self._predict_instance(row) for row in tqdm(X_test)]
        elapsed_time = time.time() - start_time
        print(f"Prediction completed in {elapsed_time:.2f} seconds.")

        return np.array(predictions)

    def save(self, path):
        """Pickle the model to ``path``."""
        with open(path, 'wb') as file:
            pickle.dump(self, file)

    @staticmethod
    def load(path):
        """Load a pickled model from ``path``.

        NOTE: unpickling can execute arbitrary code; only load trusted files.
        """
        with open(path, 'rb') as file:
            return pickle.load(file)

In [None]:
# Combined version

import numpy as np
import pandas as pd
from scipy.spatial import KDTree
from joblib import Parallel, delayed

class KNN:
    """k-nearest-neighbours classifier combining KDTree lookup and joblib voting.

    ``metric`` selects the Minkowski order ``p`` used for neighbour queries:
    1 for 'manhattan', 2 for 'euclidean', the caller's ``p`` for 'minkowski'.
    Labels are stored as integers.
    """

    def __init__(self, k=5, metric='minkowski', p=2, weights='distance', n_jobs=1):
        """
        Initialise the classifier.

        Args:
        - k (int): Number of nearest neighbours.
        - metric (str): Distance metric ('euclidean', 'manhattan', 'minkowski').
        - p (int): Minkowski order (default: 2, i.e. Euclidean).
        - weights (str): Vote weighting ('uniform', 'distance').
        - n_jobs (int): Number of workers for parallel voting; -1 is passed
          through to joblib unchanged.

        Raises:
        - ValueError: If any argument has the wrong type or range.
        """
        # Type checks run before range checks so that non-numeric arguments
        # raise ValueError instead of a TypeError from the comparison.
        if not isinstance(k, int) or k < 1:
            raise ValueError("Invalid k. k must be an integer greater than 0.")
        if metric not in ['manhattan', 'euclidean', 'minkowski']:
            raise ValueError("Invalid metric. Valid metrics are 'manhattan', 'euclidean', 'minkowski'.")
        if not isinstance(p, (int, float)) or p < 1:
            raise ValueError("Invalid p. p must be a number greater than 0.")
        if weights not in ['uniform', 'distance']:
            raise ValueError("Invalid weights. Choose 'uniform' or 'distance'.")
        if not isinstance(n_jobs, int) or (n_jobs < 1 and n_jobs != -1):
            raise ValueError("Invalid n_jobs. Must be an integer greater than 0 or -1 for all cores.")

        self.k = k
        self.metric = metric
        self.p = p if metric == 'minkowski' else (1 if metric == 'manhattan' else 2)
        self.weights = weights
        self.n_jobs = n_jobs

    def fit(self, X_train, y_train):
        """
        Store the training data and build the KDTree over it.

        Args:
        - X_train (array-like): Training features.
        - y_train (array-like): Training labels (converted to int).

        Returns:
        - KNN: The fitted estimator, so calls can be chained.

        Raises:
        - ValueError: If the two inputs differ in length.
        """
        features = np.array(X_train, dtype=float)
        labels = np.array(y_train, dtype=int)
        if len(features) != len(labels):
            raise ValueError("Length of X_train and y_train must be the same.")

        self.X_train = features
        self.y_train = labels
        self.tree = KDTree(self.X_train)
        return self

    def _vote(self, dist, idx):
        """Return the winning label among the neighbours at ``idx``."""
        labels = self.y_train[idx]
        if self.weights == 'distance':
            # Exponential decay relative to the farthest neighbour; the
            # epsilon keeps the ratio finite when every distance is 0.
            vote_weights = np.exp(-dist / (dist.max() + 1e-10))
            weighted_votes = {}
            for label, weight in zip(labels, vote_weights):
                weighted_votes[label] = weighted_votes.get(label, 0) + weight
            return max(weighted_votes, key=weighted_votes.get)

        # Uniform voting. On a tie pick the smallest of the labels sharing
        # the top count. (idxmin() on the counts would return the *least*
        # frequent label instead.)
        value_counts = pd.Series(labels).value_counts()
        tied = value_counts[value_counts == value_counts.max()]
        return tied.index.min()

    def predict(self, X_test):
        """
        Predict labels for the test data.

        Args:
        - X_test (array-like): Test features, one sample per row (2-D).

        Returns:
        - np.ndarray: Predicted labels.
        """
        X_test = np.array(X_test, dtype=float)
        # Asking the tree for more neighbours than it holds yields infinite
        # distances and out-of-range indices, so cap k at the training size.
        n_neighbors = min(self.k, len(self.X_train))
        distances, indices = self.tree.query(X_test, k=n_neighbors, p=self.p)
        # For k == 1 the tree drops the neighbour axis; restore it so every
        # row is a 1-D array of neighbours.
        distances = np.reshape(distances, (len(X_test), n_neighbors))
        indices = np.reshape(indices, (len(X_test), n_neighbors))

        if self.n_jobs == 1:
            # A single worker gains nothing from joblib dispatch; run inline.
            predictions = [self._vote(dist, idx) for dist, idx in zip(distances, indices)]
        else:
            predictions = Parallel(n_jobs=self.n_jobs)(
                delayed(self._vote)(dist, idx) for dist, idx in zip(distances, indices)
            )
        return np.array(predictions)
