# Stingray

In [7]:
import os
import argparse

import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
import librosa

from stingray.lightcurve import Lightcurve
from stingray.bispectrum import Bispectrum

In [8]:
save_folder_path = "here/stingray"
start = 0
end = 4
SEGMENT_SIZE = 400
SEGMENT_OVERLAP = 200

In [9]:
real_audio_df = pd.read_csv("/home/mericdemirors/Desktop/490_datasets/release_in_the_wild/clean_data/real.csv")
fake_audio_df = pd.read_csv("/home/mericdemirors/Desktop/490_datasets/release_in_the_wild/clean_data/fake.csv")
real_audio_df = real_audio_df.sort_values(["seconds"]).reset_index(drop=True)
fake_audio_df = fake_audio_df.sort_values(["seconds"]).reset_index(drop=True)

features = ["absolutes","angles","reals","imags", "cum3s"]
for f in features:
    os.makedirs(os.path.join(save_folder_path, f, "real_audio"), exist_ok=True)
    os.makedirs(os.path.join(save_folder_path, f, "fake_audio"), exist_ok=True)

In [10]:
def get_features(audio_path, segment_size=SEGMENT_SIZE, overlap=SEGMENT_OVERLAP, max_K=-1):
    data, samplerate = librosa.load(audio_path, sr=16000)
    num_segments = (len(data) - segment_size) // overlap + 1
    if max_K > 0:
        num_segments = min(num_segments, max_K)

    RC_layers = np.zeros((num_segments, segment_size+1, segment_size+1), dtype=complex)
    cum3_sum = np.zeros((segment_size+1, segment_size+1))

    time_values = np.linspace(0, segment_size / samplerate, segment_size)
    for idx, segment_start in enumerate(range(0, len(data), overlap)):
        if idx == num_segments:
            break
        segment = data[segment_start:segment_start + segment_size]
        
        lc = Lightcurve(time_values, segment)
        bs = Bispectrum(lc, window="hamming")

        mag, phase, cum3 = bs.bispec_mag, bs.bispec_phase, bs.cum3
        cum3_sum = cum3_sum + cum3

        R = mag * np.cos(phase)
        C = mag * np.sin(phase)
        RC_layers[idx] = R + C * 1j

    return RC_layers, cum3_sum/num_segments

def create_signature_image(RC_layers):
    RC_layers = RC_layers[..., np.newaxis]
    signature_image = np.zeros(RC_layers.shape[1:], dtype=complex)
    tops = np.sum(RC_layers, axis=0)

    signature_image = np.reshape(np.array([tops[r][c]/(np.sqrt(np.dot(RC_layers[:,r,c,:].T,np.conjugate(RC_layers[:,r,c,:])).real) + 0.0001) 
                                        for r in range(signature_image.shape[0]) 
                                        for c in range(signature_image.shape[1])]), signature_image.shape)

    # list comprehension is for this for loop
    # for r in range(signature_image.shape[0]):
    #     for c in range(signature_image.shape[1]):
    #         L = RC_layers[:,r,c,:]
    #         top = tops[r][c]
    #         bottom = np.sqrt(np.dot(L.T, np.conjugate(L)).real)
    #         signature_image[r,c] = top/(bottom + 0.0001)

    return signature_image

def save_images(signature_image, cum3_avg, absolute_path, angle_path, real_path, imag_path, cum3_path):
    absolute = np.absolute(signature_image)
    absolute_norm = (absolute - absolute.min()) / (absolute.max() - absolute.min())
    cv2.imwrite(absolute_path, (absolute_norm*255).astype(np.uint8))

    angle = np.angle(signature_image)
    angle_norm = (angle - angle.min()) / (angle.max() - angle.min())
    cv2.imwrite(angle_path, (angle_norm*255).astype(np.uint8))

    real = signature_image.real
    real_norm = (real - real.min()) / (real.max() - real.min())
    cv2.imwrite(real_path, (real_norm*255).astype(np.uint8))

    imag = signature_image.imag
    imag_norm = (imag - imag.min()) / (imag.max() - imag.min())
    cv2.imwrite(imag_path, (imag_norm*255).astype(np.uint8))

    cum3_norm = (cum3_avg - cum3_avg.min()) / (cum3_avg.max() - cum3_avg.min())
    cv2.imwrite(cum3_path, (cum3_norm*255).astype(np.uint8))


In [11]:
for i in tqdm(range(start, end)):
    path = real_audio_df["path"][i]
    image_name = path.split(os.sep)[-1][:-4]

    [absolute_path,angle_path,real_path,imag_path,cum3_path] = [os.path.join(save_folder_path, f, "real_audio" ,image_name + ".png") for f in features]

    if os.path.isfile(absolute_path) and os.path.isfile(angle_path) and os.path.isfile(real_path) and os.path.isfile(imag_path) and os.path.isfile(cum3_path):
        continue

    RC_layers, cum3_avg = get_features(path, max_K=-1)
    signature_image = create_signature_image(RC_layers)
    save_images(signature_image, cum3_avg, absolute_path, angle_path, real_path, imag_path, cum3_path)

for i in tqdm(range(start, end)):
    path = fake_audio_df["path"][i]
    image_name = path.split(os.sep)[-1][:-4]
    [absolute_path,angle_path,real_path,imag_path,cum3_path] = [os.path.join(save_folder_path, f, "fake_audio" ,image_name + ".png") for f in features]

    if os.path.isfile(absolute_path) and os.path.isfile(angle_path) and os.path.isfile(real_path) and os.path.isfile(imag_path) and os.path.isfile(cum3_path):
        continue

    RC_layers, cum3_avg = get_features(path, max_K=-1)
    signature_image = create_signature_image(RC_layers)
    save_images(signature_image, cum3_avg, absolute_path, angle_path, real_path, imag_path, cum3_path)

100%|██████████| 4/4 [00:23<00:00,  5.88s/it]
100%|██████████| 4/4 [00:27<00:00,  6.87s/it]


# Numpy

In [63]:
import numpy as np
from scipy.linalg import toeplitz

In [67]:
save_folder_path = "here/numpy"
start = 0
end = 4
SEGMENT_SIZE = 400
SEGMENT_OVERLAP = 200
maxlag = int(SEGMENT_SIZE / 2)

In [68]:
real_audio_df = pd.read_csv("/home/mericdemirors/Desktop/490_datasets/release_in_the_wild/clean_data/real.csv")
fake_audio_df = pd.read_csv("/home/mericdemirors/Desktop/490_datasets/release_in_the_wild/clean_data/fake.csv")
real_audio_df = real_audio_df.sort_values(["seconds"]).reset_index(drop=True)
fake_audio_df = fake_audio_df.sort_values(["seconds"]).reset_index(drop=True)

features = ["absolutes","angles","reals","imags", "cum3s"]
for f in features:
    os.makedirs(os.path.join(save_folder_path, f, "real_audio"), exist_ok=True)
    os.makedirs(os.path.join(save_folder_path, f, "fake_audio"), exist_ok=True)

In [69]:
def get_cum3(data):
    cum3_dim = 2 * maxlag
    cum3 = np.zeros((cum3_dim, cum3_dim))

    ind = np.arange((SEGMENT_SIZE - maxlag), SEGMENT_SIZE)
    ind_t = np.arange(maxlag, SEGMENT_SIZE)
    zero_maxlag = np.zeros((1, maxlag))
    zero_maxlag_t = zero_maxlag.transpose()

    signal = np.reshape(data, (1, SEGMENT_SIZE))
    signal = signal - np.mean(data)
    sig = signal.transpose()

    rev_signal = np.array([signal[0][::-1]])
    col = np.concatenate((sig[ind], zero_maxlag_t), axis=0)
    row = np.concatenate((rev_signal[0][ind_t], zero_maxlag[0]), axis=0)

    toep = toeplitz(col, row)
    rev_signal = np.repeat(rev_signal, [2 * maxlag], axis=0)

    cum3 = cum3 + np.matmul(np.multiply(toep, rev_signal), toep.transpose())
    cum3 = cum3/SEGMENT_SIZE

    return cum3

def get_hamming_window():
    N = 2 * maxlag

    n = np.arange(N)
    window_even = 0.54 - 0.46 * np.cos((2 * np.pi * n) / (N - 1))

    # 2d even window
    window2d = np.array(
        [
            window_even,
        ]
        * N
    )

    ## One-sided window with zero padding
    window = np.zeros(N)
    window[: maxlag] = window_even[maxlag :]
    window[maxlag :] = 0

    # 2d window function to apply to bispectrum
    row = np.concatenate(([window[0]], np.zeros(2 * maxlag-1)))
    toep_matrix = toeplitz(window, row)
    toep_matrix += np.tril(toep_matrix, -1).transpose()
    window = toep_matrix[..., ::-1] * window2d * window2d.transpose()
    return window

def get_mag_and_phase(cum3):
    window = get_hamming_window()
    bispec = np.fft.fftshift(np.fft.fft2(np.fft.ifftshift(cum3 * window)))

    mag = np.abs(bispec)
    phase = np.angle(bispec)

    return mag, phase

def get_features(audio_path, segment_size=SEGMENT_SIZE, overlap=SEGMENT_OVERLAP, max_K=-1):
    data, samplerate = librosa.load(audio_path, sr=16000)
    num_segments = (len(data) - segment_size) // overlap + 1
    if max_K > 0:
        num_segments = min(num_segments, max_K)

    RC_layers = np.zeros((num_segments, segment_size, segment_size), dtype=complex)
    cum3_sum = np.zeros((segment_size, segment_size))

    for idx, segment_start in enumerate(range(0, len(data), overlap)):
        if idx == num_segments:
            break
        segment = data[segment_start:segment_start + segment_size]
        
        cum3 = get_cum3(segment)
        mag, phase = get_mag_and_phase(cum3)

        cum3_sum = cum3_sum + cum3

        R = mag * np.cos(phase)
        C = mag * np.sin(phase)
        RC_layers[idx] = R + C * 1j

    return RC_layers, cum3_sum/num_segments

def create_signature_image(RC_layers):
    RC_layers = RC_layers[..., np.newaxis]
    signature_image = np.zeros(RC_layers.shape[1:], dtype=complex)
    tops = np.sum(RC_layers, axis=0)

    signature_image = np.reshape(np.array([tops[r][c]/(np.sqrt(np.dot(RC_layers[:,r,c,:].T,np.conjugate(RC_layers[:,r,c,:])).real) + 0.0001) 
                                        for r in range(signature_image.shape[0]) 
                                        for c in range(signature_image.shape[1])]), signature_image.shape)

    # list comprehension is for this for loop
    # for r in range(signature_image.shape[0]):
    #     for c in range(signature_image.shape[1]):
    #         L = RC_layers[:,r,c,:]
    #         top = tops[r][c]
    #         bottom = np.sqrt(np.dot(L.T, np.conjugate(L)).real)
    #         signature_image[r,c] = top/(bottom + 0.0001)

    return signature_image

def save_images(signature_image, cum3_avg, absolute_path, angle_path, real_path, imag_path, cum3_path):
    absolute = np.absolute(signature_image)
    absolute_norm = (absolute - absolute.min()) / (absolute.max() - absolute.min())
    cv2.imwrite(absolute_path, (absolute_norm*255).astype(np.uint8))

    angle = np.angle(signature_image)
    angle_norm = (angle - angle.min()) / (angle.max() - angle.min())
    cv2.imwrite(angle_path, (angle_norm*255).astype(np.uint8))

    real = signature_image.real
    real_norm = (real - real.min()) / (real.max() - real.min())
    cv2.imwrite(real_path, (real_norm*255).astype(np.uint8))

    imag = signature_image.imag
    imag_norm = (imag - imag.min()) / (imag.max() - imag.min())
    cv2.imwrite(imag_path, (imag_norm*255).astype(np.uint8))

    cum3_norm = (cum3_avg - cum3_avg.min()) / (cum3_avg.max() - cum3_avg.min())
    cv2.imwrite(cum3_path, (cum3_norm*255).astype(np.uint8))

In [70]:
for i in tqdm(range(start, end)):
    path = real_audio_df["path"][i]
    image_name = path.split(os.sep)[-1][:-4]

    [absolute_path,angle_path,real_path,imag_path,cum3_path] = [os.path.join(save_folder_path, f, "real_audio" ,image_name + ".png") for f in features]

    if os.path.isfile(absolute_path) and os.path.isfile(angle_path) and os.path.isfile(real_path) and os.path.isfile(imag_path) and os.path.isfile(cum3_path):
        continue

    RC_layers, cum3_avg = get_features(path, max_K=-1)
    signature_image = create_signature_image(RC_layers)
    save_images(signature_image, cum3_avg, absolute_path, angle_path, real_path, imag_path, cum3_path)

for i in tqdm(range(start, end)):
    path = fake_audio_df["path"][i]
    image_name = path.split(os.sep)[-1][:-4]
    [absolute_path,angle_path,real_path,imag_path,cum3_path] = [os.path.join(save_folder_path, f, "fake_audio" ,image_name + ".png") for f in features]

    if os.path.isfile(absolute_path) and os.path.isfile(angle_path) and os.path.isfile(real_path) and os.path.isfile(imag_path) and os.path.isfile(cum3_path):
        continue

    RC_layers, cum3_avg = get_features(path, max_K=-1)
    signature_image = create_signature_image(RC_layers)
    save_images(signature_image, cum3_avg, absolute_path, angle_path, real_path, imag_path, cum3_path)

100%|██████████| 4/4 [00:16<00:00,  4.07s/it]
100%|██████████| 4/4 [00:16<00:00,  4.17s/it]
