In [1]:
import librosa
import numpy as np
from scipy.fftpack import dct
import os 
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import wave
import math
import scipy.io.wavfile as wav
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import VotingClassifier
from sklearn import svm
from sklearn.ensemble import AdaBoostClassifier
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.preprocessing import StandardScaler
import pandas as pd
from sklearn.linear_model import LogisticRegression
import matplotlib.pyplot as plt
import json
import pickle

# For preprocessing experiment and model tuning

In [2]:
def load_data(data_dir):
    """Collect labelled .wav paths from ``data_dir/real`` and ``data_dir/fake``.

    Args:
        data_dir: Directory that contains the ``real`` and ``fake`` sub-folders.

    Returns:
        ``(files, labels)``: every fake path (label 0) followed by every real
        path (label 1). Within each class the paths are sorted by file name.
    """
    def wav_paths(subdir):
        folder = os.path.join(data_dir, subdir)
        # os.listdir() returns entries in arbitrary order; sorting keeps any
        # later seeded split identical between machines and runs.
        return [os.path.join(folder, f) for f in sorted(os.listdir(folder)) if f.endswith(".wav")]

    real_files = wav_paths("real")
    fake_files = wav_paths("fake")

    files = fake_files + real_files
    labels = [0] * len(fake_files) + [1] * len(real_files)

    return files, labels

In [11]:
def save_data(path, processed_data, labels):
    """Write feature vectors and their labels to ``path`` as JSON.

    Args:
        path: Destination file; it is overwritten if it already exists.
        processed_data: Sequence of feature vectors (NumPy arrays or plain
            sequences of numbers).
        labels: Sequence of labels, one per feature vector.

    The file holds ``{"data": [[...], ...], "label": [...]}``.
    """
    data_dict = {
        # np.asarray lets rows that are already plain lists (e.g. features
        # read back from JSON) through; calling .tolist() on them would fail.
        'data': [np.asarray(row).tolist() for row in processed_data],
        # Converts NumPy scalars/arrays to built-in types that json can encode.
        'label': np.asarray(labels).tolist(),
    }
    with open(path, 'w') as file:
        json.dump(data_dict, file)

In [10]:

def preprocessing(audio_file, n_features):
    """Turn one audio file into a fixed-length, standardised MFCC feature vector.

    Pipeline: load at the native sampling rate -> pre-emphasis -> 100 ms frames
    with a 10 ms hop -> Hamming window -> magnitude FFT -> mel filter bank ->
    MFCC -> per-coefficient maximum over all frames -> z-score across the
    coefficients of this one file.

    Args:
        audio_file: Path to an audio file readable by ``librosa.load``.
        n_features: Number of MFCC coefficients to return.

    Returns:
        1-D NumPy array with ``n_features`` values.

    Note:
        Errors from librosa are not caught here. Presumably a clip shorter than
        one frame makes ``librosa.util.frame`` raise ``ParameterError`` -- TODO confirm.
    """
    # Load the audio signal (sr=None keeps the file's own sampling rate)
    y, sr = librosa.load(audio_file, sr=None)

    # Step 1: Pre-emphasis
    pre_emphasis_coeff = 0.97
    y_filt = librosa.effects.preemphasis(y, coef=pre_emphasis_coeff)

    # Step 2: Frame blocking
    frame_length = 0.025*4  # 100 ms (four times the common 25 ms frame)
    hop_length = 0.01  # 10 ms
    frame_length_samples = int(frame_length * sr)
    hop_length_samples = int(hop_length * sr)
    frames = librosa.util.frame(y_filt, frame_length=frame_length_samples, hop_length=hop_length_samples)

    # Step 3: Windowing (axis 0 of `frames` is the within-frame sample axis)
    window = np.hamming(len(frames))
    windowed_frames = frames * window[:, np.newaxis]

    # Step 4: Fast Fourier Transform (FFT)
    # np.fft.rfft crops its input when n is smaller than the frame, so a fixed
    # 2048 would drop the tail of every frame above 20.48 kHz sampling rate.
    # Use the next power of two that holds a full frame; rates up to 20.48 kHz
    # still get 2048. Features of higher-rate audio change, so cached features
    # and fitted models for such data must be regenerated.
    fft_size = max(2048, 1 << (frame_length_samples - 1).bit_length())
    spectrogram = np.abs(np.fft.rfft(windowed_frames, n=fft_size, axis=0))

    # Step 5: Mel frequency wrapping (at least 40 mel bands)
    if n_features >= 40:
        n_mels = n_features
    else:
        n_mels = 40  # Adjusted number of Mel bands
    # NOTE(review): a magnitude (not squared) spectrum is passed here and then
    # sent through power_to_db below -- confirm that this is intended.
    mel_spec = librosa.feature.melspectrogram(S=spectrogram, sr=sr, n_mels=n_mels)

    # Step 6: Discrete Cosine Transform (DCT) to get MFCC
    n_mfcc = n_features  # Use the desired number of MFCC features
    mfcc = librosa.feature.mfcc(S=librosa.power_to_db(mel_spec), sr=sr, n_mfcc=n_mfcc)

    # Aggregate over time: keep the maximum of each coefficient across frames.
    peak_mfcc = np.max(mfcc, axis=1)

    # Standardise the coefficients of this single file. The scaler is fitted on
    # one column holding this file's values, so nothing is shared across files.
    scaler = StandardScaler()
    peak_mfcc_scaled = scaler.fit_transform(peak_mfcc.reshape(-1, 1))

    # Flatten the scaled column back to 1D
    return peak_mfcc_scaled.flatten()
#code 9x accu


In [113]:
def load_data(data_dir):
    """Collect labelled .wav paths from four sub-folders of ``data_dir``.

    Real clips come from ``real`` plus at most 800 files of ``real_an``; fake
    clips come from ``fake`` plus ``fake_an``. The class sizes are printed
    (fake first, then real).

    Args:
        data_dir: Directory that contains the four sub-folders.

    Returns:
        ``(files, labels)``: all fake paths (label 0) followed by all real
        paths (label 1).
    """
    def wav_paths(subdir, limit=None):
        folder = os.path.join(data_dir, subdir)
        # Sort first: os.listdir() order is arbitrary, which would make both
        # the 800-file cut and any seeded split differ between runs.
        names = sorted(f for f in os.listdir(folder) if f.endswith(".wav"))
        return [os.path.join(folder, f) for f in names[:limit]]

    real_files = wav_paths("real") + wav_paths("real_an", limit=800)
    fake_files = wav_paths("fake") + wav_paths("fake_an")

    files = fake_files + real_files
    labels = [0] * len(fake_files) + [1] * len(real_files)
    print(len(fake_files))
    print(len(real_files))
    return files, labels

In [90]:
# Root folder with the class sub-folders read by load_data (machine-specific absolute path).
url = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly"
files, labels = load_data(url)

# Hold out 20% of the file paths for testing; the seed fixes the shuffle.
X_train, X_test, y_train, y_test = train_test_split(files, labels, test_size=0.2, random_state=42)

# Replace every path by its 68-value feature vector (the names are rebound from paths to features).
X_train = [preprocessing(file,68) for file in X_train]
X_test = [preprocessing(file,68) for file in X_test]

2511
2772


In [91]:


from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC

from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
# Create a list of classifiers.
# The tree-based and boosting models draw random numbers while fitting; a
# fixed seed makes the printed accuracies repeatable from run to run.
classifiers = [
    LogisticRegression(),
    DecisionTreeClassifier(random_state=42),
    SVC(C=2,gamma=3),
    RandomForestClassifier(random_state=42),
    AdaBoostClassifier(random_state=42),
    KNeighborsClassifier(),
    GaussianNB()
]

# Train each classifier on the training features and report test accuracy
for classifier in classifiers:
    classifier.fit(X_train, y_train)
    y_pred = classifier.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"{classifier.__class__.__name__}: Accuracy = {accuracy}")

LogisticRegression: Accuracy = 0.8836329233680227
DecisionTreeClassifier: Accuracy = 0.9082308420056765
SVC: Accuracy = 0.9943235572374646
RandomForestClassifier: Accuracy = 0.97918637653737




AdaBoostClassifier: Accuracy = 0.9139072847682119
KNeighborsClassifier: Accuracy = 0.9914853358561968
GaussianNB: Accuracy = 0.8164616840113529


In [6]:
# Number of samples currently held in X_test.
print(len(X_test))

1086


In [92]:
# Single SVC with the same C/gamma as in the benchmark list; prints mean accuracy on the test data.
svmm = SVC(C=2,gamma=3)
svmm.fit(X_train,y_train)
print(svmm.score(X_test,y_test))

0.9943235572374646


In [93]:
# Random forest baseline. The seed pins the forest's internal randomness so
# the displayed accuracy is the same on every run.
rf = RandomForestClassifier(min_samples_split=2, random_state=42)
rf.fit(X_train,y_train)
rf.score(X_test,y_test)

0.9772942289498581

Train on Trà's data (`real` / `fake`), test on An's data (`real_an` / `fake_an`)

In [86]:
def load_data_an(data_dir):
    """Collect labelled .wav paths from ``data_dir/real_an`` and ``data_dir/fake_an``.

    At most 800 real files are kept. The class sizes are printed (real first,
    then fake).

    Args:
        data_dir: Directory that contains the ``real_an`` and ``fake_an`` sub-folders.

    Returns:
        ``(files, labels)``: all fake paths (label 0) followed by the real
        paths (label 1).
    """
    def wav_paths(subdir, limit=None):
        folder = os.path.join(data_dir, subdir)
        # Sorted because os.listdir() order is arbitrary; without it the
        # 800-file cut would pick a different subset on another machine.
        names = sorted(f for f in os.listdir(folder) if f.endswith(".wav"))
        return [os.path.join(folder, f) for f in names[:limit]]

    real_files = wav_paths("real_an", limit=800)
    fake_files = wav_paths("fake_an")

    print(len(real_files))
    print(len(fake_files))
    files = fake_files + real_files
    labels = [0] * len(fake_files) + [1] * len(real_files)
    return files, labels

In [87]:
# Build the test set from the real_an / fake_an folders only.
url = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly"
files, labels = load_data_an(url)


# One 68-value feature vector per file; labels stay in file order.
X_test = [preprocessing(file,68) for file in files]
y_test = labels

800
539


In [76]:
# Cache the current X_test / y_test as JSON so they need not be recomputed.
process_path = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly\processed_data\test_an.json"
save_data(process_path, X_test, y_test)

In [52]:
# Same call as for test_an.json but to a "_dct" file.
# NOTE(review): presumably X_test held a different feature variant when this cell ran -- confirm.
process_path = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly\processed_data\test_an_dct.json"
save_data(process_path, X_test, y_test)

In [68]:
def load_data_tra(data_dir):
    """Collect labelled .wav paths from ``data_dir/real`` and ``data_dir/fake``.

    Args:
        data_dir: Directory that contains the ``real`` and ``fake`` sub-folders.

    Returns:
        ``(files, labels)``: all fake paths (label 0) followed by all real
        paths (label 1), each class sorted by file name.
    """
    files, labels = [], []
    # Fake first, then real -- the order the rest of the notebook expects.
    for subdir, label in (("fake", 0), ("real", 1)):
        folder = os.path.join(data_dir, subdir)
        # os.listdir() gives no ordering guarantee; sort for reproducibility.
        for name in sorted(os.listdir(folder)):
            if name.endswith(".wav"):
                files.append(os.path.join(folder, name))
                labels.append(label)
    return files, labels

In [69]:
# Build the training set from the real / fake folders only.
url = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly"
files, labels = load_data_tra(url)


# One 68-value feature vector per file; labels stay in file order.
X_train = [preprocessing(file,68) for file in files]
y_train = labels

In [75]:
# Cache the current X_train / y_train as JSON.
process_path = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly\processed_data\train_tra.json"
save_data(process_path, X_train, y_train)

In [55]:
# Same call as for train_tra.json but to a "_dct" file.
# NOTE(review): presumably X_train held a different feature variant when this cell ran -- confirm.
process_path = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly\processed_data\train_tra_dct.json"
save_data(process_path, X_train, y_train)

In [88]:
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC

from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
# Create a list of classifiers.
# Seeds are fixed on the estimators that use randomness while fitting, so the
# comparison below gives the same numbers on every run.
classifiers = [
    LogisticRegression(),
    DecisionTreeClassifier(random_state=42),
    SVC(gamma=1),
    RandomForestClassifier(random_state=42),
    AdaBoostClassifier(random_state=42),
    KNeighborsClassifier(n_neighbors=2,weights='uniform'),
    GaussianNB()
]

# Fit on the current X_train and report accuracy on the current X_test
for classifier in classifiers:
    classifier.fit(X_train, y_train)
    y_pred = classifier.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"{classifier.__class__.__name__}: Accuracy = {accuracy}")

LogisticRegression: Accuracy = 0.5235250186706497
DecisionTreeClassifier: Accuracy = 0.5743091859596714
SVC: Accuracy = 0.6706497386109037
RandomForestClassifier: Accuracy = 0.5967139656460044




AdaBoostClassifier: Accuracy = 0.5832710978342046
KNeighborsClassifier: Accuracy = 0.666168782673637
GaussianNB: Accuracy = 0.5242718446601942


In [40]:
# Sizes of the training and test sets, then the test-to-train size ratio.
print(len(X_train))
print(len(X_test))
print(len(X_test)/len(X_train))

3944
1339
0.3395030425963489


In [41]:
# SVC with probability estimates enabled; the cell displays mean accuracy on the test data.
svmm  =SVC(C=13,gamma=4,probability=True)
svmm.fit(X_train,y_train)
svmm.score(X_test,y_test)

# for the DCT features

0.648991784914115

In [110]:
process_an = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly\processed_data\test_an.json" # cached features of the test set
process_tra = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly\processed_data\train_tra.json" # cached features of the training set


def _read_feature_json(path):
    """Return ``(data, label)`` from a JSON file holding 'data' and 'label' keys."""
    with open(path, 'r') as file:
        payload = json.load(file)
    return payload['data'], payload['label']


# Read the JSON directly rather than through `load_data`: that name is bound
# both to a directory scanner and to a JSON reader in this notebook, so which
# one runs depends on the order the cells were executed in.
X_test, y_test = _read_feature_json(process_an)
X_train, y_train = _read_feature_json(process_tra)

In [None]:
# Sweep the SVC regularisation parameter C over 1..299 with gamma fixed at 1.
# NOTE(review): every candidate is scored on X_test, so picking the best C
# from this output tunes on the test data -- consider a validation split.
for i in range(1,300):
    lg = SVC(C=i,gamma=1)
    lg.fit(X_train,y_train)
    print(f"{i} : {lg.score(X_test,y_test)}")

In [None]:
# Sweep the forest size from 100 to 295 trees in steps of 5.
for i in range(100,300,5):
    # Fixed seed: otherwise run-to-run randomness is mixed into the
    # comparison between forest sizes.
    lg = RandomForestClassifier(n_estimators=i, random_state=42)
    lg.fit(X_train,y_train)

    print(f"{i} : {lg.score(X_test,y_test)}")

In [None]:
# Sweep the number of neighbours over 1..299 with uniform weighting.
# NOTE(review): candidates are scored on X_test, so the chosen k is tuned on the test data.
for i in range(1,300):
    lg = KNeighborsClassifier(n_neighbors=i,weights='uniform')
    lg.fit(X_train,y_train)
    
    print(f"{i} : {lg.score(X_test,y_test)}")

In [109]:
# Soft-voting ensemble of an SVC and a 1-nearest-neighbour classifier.
# probability=True is set on the SVC so that it can supply class probabilities.
svmm  =SVC(C=30,gamma=1,probability=True)
knb = KNeighborsClassifier(n_neighbors=1,weights='uniform')
voting = VotingClassifier(
    # NOTE(review): the second estimator is registered under the name 'rf'
    # although it is the KNN model above.
    estimators=[('svm', svmm),('rf',knb)],
    voting='soft'
)
voting.fit(X_train,y_train)
voting.score(X_test,y_test)

0.7326362957430919

In [44]:
# Logistic regression with C=0.09 and the liblinear solver; prints mean accuracy on the test data.
lg = LogisticRegression(C=0.09,solver='liblinear')
lg.fit(X_train,y_train)
print(lg.score(X_test,y_test))

0.8681022880215343


In [28]:

process_path = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly\processed_data\frameblock100ms.json"
def save_data(path, processed_data, labels):
    """Write feature vectors and their labels to ``path`` as JSON.

    Args:
        path: Destination file; it is overwritten if it already exists.
        processed_data: Sequence of feature vectors (NumPy arrays or plain
            sequences of numbers).
        labels: Sequence of labels, one per feature vector.

    The file holds ``{"data": [[...], ...], "label": [...]}``.
    """
    # np.asarray accepts rows that are already plain lists (for instance
    # features read back from JSON), where a bare .tolist() call would fail.
    rows = [np.asarray(row).tolist() for row in processed_data]
    data_dict = {
        'data': rows,
        # Converts NumPy scalars/arrays to built-in types that json can encode.
        'label': np.asarray(labels).tolist(),
    }
    with open(path, 'w') as file:
        json.dump(data_dict, file)

# Concatenate train and test (training rows first) and cache everything in one file.
processed_data = X_train+X_test
labels = y_train+y_test
save_data(process_path, processed_data, labels)



In [108]:
def load_data(path):
    """Read a feature file written as JSON with 'data' and 'label' keys.

    Args:
        path: JSON file to read.

    Returns:
        ``(processed_data, labels)`` -- the values stored under 'data' and 'label'.
    """
    with open(path, 'r') as handle:
        payload = json.load(handle)
    return payload['data'], payload['label']

# Example usage: read the cached features back from process_path and show how many rows were stored.
processed_data, labels = load_data(process_path)
print(len(processed_data))

1339


In [29]:
def save_model(model, path):
    """Serialise ``model`` with pickle into the file at ``path``.

    Args:
        model: Any picklable object (here a fitted estimator).
        path: Destination file, opened in binary write mode and overwritten.
    """
    with open(path, 'wb') as sink:
        pickle.Pickler(sink).dump(model)
# Persist the fitted voting ensemble to disk.
model_path = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly\model\model_frame100ms.pkl"
save_model(voting,model_path)

In [30]:
def load_model(path):
    """Load and return the object pickled in the file at ``path``.

    Args:
        path: File previously written with pickle.

    Returns:
        The unpickled object.
    """
    # Unpickling can execute arbitrary code: only open files you created yourself.
    with open(path, 'rb') as source:
        return pickle.Unpickler(source).load()
# Smoke test: reload the model and predict the first cached row (reshaped to a single-sample 2-D array).
loaded_model = load_model(model_path)
print(loaded_model.predict(np.array(processed_data[0]).reshape(1,-1)))
# NOTE(review): the reference label comes from y_train while the input row
# comes from processed_data; they match only if both start with the same sample -- confirm.
print(y_train[0])

[0]
0


In [121]:
def load_data(data_dir):
    """Collect labelled .wav paths from four sub-folders of ``data_dir``.

    Real clips: ``real`` plus at most 800 files of ``real_an``. Fake clips:
    ``fake`` plus ``fake_an``. The class sizes are printed (fake, then real).

    Args:
        data_dir: Directory that contains the four sub-folders.

    Returns:
        ``(files, labels)``: all fake paths (label 0) followed by all real
        paths (label 1).
    """
    def wav_paths(subdir, limit=None):
        folder = os.path.join(data_dir, subdir)
        # os.listdir() order is arbitrary; sort so the 800-file cut and any
        # seeded split pick the same files on every run.
        names = sorted(f for f in os.listdir(folder) if f.endswith(".wav"))
        return [os.path.join(folder, f) for f in names[:limit]]

    real_files = wav_paths("real") + wav_paths("real_an", limit=800)
    fake_files = wav_paths("fake") + wav_paths("fake_an")
    fake_labels = [0] * len(fake_files)
    real_labels = [1] * len(real_files)

    files = fake_files + real_files
    labels = fake_labels + real_labels
    print(len(fake_files))
    print(len(real_files))
    return files, labels

In [None]:
def save_data(path, processed_data, labels):
    """Write feature vectors and their labels to ``path`` as JSON.

    Args:
        path: Destination file; it is overwritten if it already exists.
        processed_data: Sequence of feature vectors (NumPy arrays or plain
            sequences of numbers).
        labels: Sequence of labels, one per feature vector.

    The file holds ``{"data": [[...], ...], "label": [...]}``.
    """
    data_dict = {
        # np.asarray also accepts rows that are already lists, for which a
        # direct .tolist() call would raise AttributeError.
        'data': [np.asarray(vector).tolist() for vector in processed_data],
        # Converts NumPy scalars/arrays to built-in types that json can encode.
        'label': np.asarray(labels).tolist(),
    }
    with open(path, 'w') as file:
        json.dump(data_dict, file)

In [6]:
from librosa import ParameterError
def detect(audio_file):
    """Check that ``audio_file`` can be framed; delete it from disk if not.

    The file is loaded, pre-emphasised and cut into 100 ms frames with a
    10 ms hop. If any of these steps raises ``ParameterError``, a message is
    printed and the file is removed. WARNING: the removal is permanent.

    Args:
        audio_file: Path of the audio file to check.

    Returns:
        None in every case.
    """
    try:
        # Load at the file's own sampling rate
        signal, rate = librosa.load(audio_file, sr=None)

        # Pre-emphasis with the same coefficient as the feature pipeline
        emphasized = librosa.effects.preemphasis(signal, coef=0.97)

        # Frame blocking; the frames themselves are not needed, only whether
        # the call succeeds.
        samples_per_frame = int(0.025*4 * rate)  # 100 ms
        samples_per_hop = int(0.01 * rate)  # 10 ms
        librosa.util.frame(emphasized, frame_length=samples_per_frame, hop_length=samples_per_hop)
    except ParameterError:
        print(f"ParameterError encountered with file {audio_file}. Deleting file.")
        os.remove(audio_file)

In [None]:
def load_data_ult(data_dir):
    """Collect labelled .wav paths from all six class folders of ``data_dir``.

    Real clips come from ``real``, ``real_an`` and ``predict_real``; fake clips
    from ``fake``, ``fake_an`` and ``predict_fake``. The class sizes are
    printed (fake first, then real).

    Args:
        data_dir: Directory that contains the six sub-folders.

    Returns:
        ``(files, labels)``: all fake paths (label 0) followed by all real
        paths (label 1).
    """
    def wav_paths(subdir):
        folder = os.path.join(data_dir, subdir)
        # os.listdir() returns entries in arbitrary order; sort so that the
        # saved feature files line up identically on every run.
        return [os.path.join(folder, f) for f in sorted(os.listdir(folder)) if f.endswith(".wav")]

    real_files = wav_paths("real") + wav_paths("real_an") + wav_paths("predict_real")
    fake_files = wav_paths("fake") + wav_paths("fake_an") + wav_paths("predict_fake")
    fake_labels = [0] * len(fake_files)
    real_labels = [1] * len(real_files)

    files = fake_files + real_files
    labels = fake_labels + real_labels
    print(len(fake_files))
    print(len(real_files))
    return files, labels

In [12]:
def load_fake(data_dir):
    """Collect the .wav paths of the extra fake folder under ``data_dir``.

    Args:
        data_dir: Directory that contains the ``fake_shit`` sub-folder.

    Returns:
        ``(fake_files, fake_labels)``: paths sorted by file name and a list of
        zeros of the same length.
    """
    folder = os.path.join(data_dir, "fake_shit")
    # os.listdir() order is arbitrary; sort so saved features keep a stable order.
    fake_files = [os.path.join(folder, f) for f in sorted(os.listdir(folder)) if f.endswith(".wav")]

    fake_labels = [0] * len(fake_files)

    return fake_files,fake_labels 
# List the extra fake clips, then replace each path by its 68-value feature vector.
url = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly" 
files, labels = load_fake(url)

files = [preprocessing(file,68) for file in files]

In [16]:
def save_data(path, processed_data, labels):
    """Write feature vectors and their labels to ``path`` as JSON.

    Args:
        path: Destination file; it is overwritten if it already exists.
        processed_data: Sequence of feature vectors (NumPy arrays or plain
            sequences of numbers).
        labels: Sequence of labels, one per feature vector.

    The file holds ``{"data": [[...], ...], "label": [...]}``.
    """
    # Going through np.asarray means list rows work as well as array rows;
    # the previous per-row .tolist() call only worked for arrays.
    serialisable_rows = [np.asarray(row).tolist() for row in processed_data]
    data_dict = {
        'data': serialisable_rows,
        # Converts NumPy scalars/arrays to built-in types that json can encode.
        'label': np.asarray(labels).tolist(),
    }
    with open(path, 'w') as file:
        json.dump(data_dict, file)

In [22]:
def load_data_ult(data_dir):
    """Collect labelled .wav paths from all six class folders of ``data_dir``.

    Real clips come from ``real``, ``real_an`` and ``predict_real``; fake clips
    from ``fake``, ``fake_an`` and ``predict_fake``. The class sizes are
    printed (fake first, then real).

    Args:
        data_dir: Directory that contains the six sub-folders.

    Returns:
        ``(files, labels)``: all fake paths (label 0) followed by all real
        paths (label 1).
    """
    def collect(subdirs):
        found = []
        for subdir in subdirs:
            folder = os.path.join(data_dir, subdir)
            # Sorted: os.listdir() gives no ordering guarantee, and the order
            # of rows in the saved feature files should not vary between runs.
            found.extend(os.path.join(folder, f) for f in sorted(os.listdir(folder)) if f.endswith(".wav"))
        return found

    real_files = collect(("real", "real_an", "predict_real"))
    fake_files = collect(("fake", "fake_an", "predict_fake"))

    files = fake_files + real_files
    labels = [0] * len(fake_files) + [1] * len(real_files)
    print(len(fake_files))
    print(len(real_files))
    return files, labels

# Run and save mfcc features

In [None]:
# Export one feature file per MFCC count, for 20 to 80 coefficients.
# NOTE(review): the folders are re-listed and every audio file is re-read on
# each of the 61 iterations, and `url` first names the data root and is then
# reused for the output file.
for i in range(20,81):
    url = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly"
    files, labels = load_data_ult(url)
    # `files` is rebound from paths to feature vectors with i coefficients each
    files = [preprocessing(file,i) for file in files]
    url = f"C:\\Users\\VIET HOANG - VTS\\Desktop\\tien xu ly\\processed\\dct_{i}.json"
    save_data(url,files,labels)
    print(f"Save data succeed: {url}")

In [8]:

def load_fake(data_dir):
    """Collect the .wav paths of the extra fake folder under ``data_dir``.

    Args:
        data_dir: Directory that contains the ``fake_shit`` sub-folder.

    Returns:
        ``(fake_files, fake_labels)``: paths sorted by file name and a list of
        zeros of the same length.
    """
    folder = os.path.join(data_dir, "fake_shit")
    # Sort the listing: os.listdir() returns entries in arbitrary order.
    wav_names = sorted(name for name in os.listdir(folder) if name.endswith(".wav"))
    fake_files = [os.path.join(folder, name) for name in wav_names]

    fake_labels = [0] * len(fake_files)

    return fake_files,fake_labels 
# Run the framing check on every extra fake clip; detect() deletes the files that fail it.
url = r"C:\Users\VIET HOANG - VTS\Desktop\tien xu ly" 
files, labels = load_fake(url)

# detect() has no return statement, so `files` ends up as a list of None values.
files = [detect(file) for file in files]

In [None]:
def save_data(path, processed_data, labels):
    """Write feature vectors and their labels to ``path`` as JSON.

    Args:
        path: Destination file; it is overwritten if it already exists.
        processed_data: Sequence of feature vectors (NumPy arrays or plain
            sequences of numbers).
        labels: Sequence of labels, one per feature vector.

    The file holds ``{"data": [[...], ...], "label": [...]}``.
    """
    data_dict = {
        # Rows may be arrays or plain lists; np.asarray handles both, whereas
        # calling .tolist() directly fails on a list.
        'data': [np.asarray(features).tolist() for features in processed_data],
        # Converts NumPy scalars/arrays to built-in types that json can encode.
        'label': np.asarray(labels).tolist(),
    }
    with open(path, 'w') as file:
        json.dump(data_dict, file)

In [None]:
def load_data(path):
    """Load cached features from a JSON file with 'data' and 'label' keys.

    Args:
        path: JSON file to read.

    Returns:
        Tuple ``(processed_data, labels)`` taken from the 'data' and 'label' entries.
    """
    with open(path, 'r') as source:
        content = json.load(source)
    features, targets = content['data'], content['label']
    return features, targets

In [20]:
def merg(path1,path2, out_path=None):
    """Append the samples of one feature JSON file to those of another.

    Both files must hold 'data' and 'label' lists. The rows of ``path2`` are
    placed after the rows of ``path1``.

    Args:
        path1: First feature file.
        path2: Second feature file (read only).
        out_path: Where to write the merged result. Defaults to ``path1``,
            i.e. the first file is overwritten in place; running that twice
            appends ``path2`` twice. Pass a separate path to keep both inputs
            untouched and make the merge repeatable.
    """
    with open(path1, 'r') as file:
        data_1 = json.load(file)
    with open(path2, 'r') as file:
        data_2 = json.load(file)
    data_dict = {
        'data': data_1['data'] + data_2['data'],
        'label': data_1['label'] + data_2['label'],
    }
    target = path1 if out_path is None else out_path
    with open(target, 'w') as file:
        json.dump(data_dict, file)
# For every MFCC count from 20 to 80, append the extra fake features to the main feature file.
# merg() overwrites path1 in place, so running this cell twice appends the fake rows twice.
# NOTE(review): path1 points at "data_ne" whereas the export loop writes to
# "processed" -- presumably the files were moved in between; confirm.
for i in range(20,81):
    path1 = f"C:\\Users\\VIET HOANG - VTS\\Desktop\\tien xu ly\\data_ne\\dct_{i}.json"
    path2 = f"C:\\Users\\VIET HOANG - VTS\\Desktop\\tien xu ly\\processed_fake\\fake_{i}.json"
    merg(path1,path2)