In [None]:
import os, time, random, logging , datetime , cv2 , csv , subprocess , json
import numpy as np


import tensorflow as tf
print("tf",tf.version.VERSION)
from tensorflow import keras


import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.cm import get_cmap
from scipy.interpolate import griddata

import plotly.express as px
import plotly.graph_objs as go
import plotly.io as pio
import plotly.subplots as sp
import pandas as pd


from utils import globo , xdv , tfh5 , sinet


''' GPU CONFIGURATION '''

#os.environ["CUDA_VISIBLE_DEVICES"]="1,2"
#tfh5.set_tf_loglevel(logging.ERROR)
#tfh5.tf.debugging.set_log_device_placement(False) #Enabling device placement logging causes any Tensor allocations or operations to be printed.
#tfh5.set_memory_growth()
#os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'


# Configuration of the FSD-SINet audio tagger and of the class subsets that
# the plotting cells treat as anomaly-related.
CFG_SINET = {

    # Model identity and the files that describe it.
    'sinet_version': 'fsd-sinet-vgg42-tlpf_aps-1',
    'graph_filename': os.path.join(globo.FSDSINET_PATH, "fsd-sinet-vgg42-tlpf_aps-1.pb"),
    'metadata_file': os.path.join(globo.FSDSINET_PATH, "fsd-sinet-vgg42-tlpf_aps-1.json"),

    # Inference settings (presumably forwarded to the SiNet predictor -- TODO confirm).
    'audio_fs_input': 22050,
    'batchSize': 64,
    'lastPatchMode': 'repeat',
    'patchHopSize': 50,

    # Anomaly-related classes. 'anom_labels_i' holds, position by position, the
    # class index of each name in 'anom_labels'; the plotting cells use these
    # indices to select columns of each clip's p_es_array.
    'anom_labels': [
        "Alarm", "Boom", "Crowd", "Dog", "Drill", "Explosion", "Fire",
        "Gunshot and gunfire", "Hammer", "Screaming", "Screech",
        "Shatter", "Shout", "Siren", "Slam", "Squeak", "Yell",
    ],
    'anom_labels_i': [4, 18, 51, 59, 65, 72, 78, 92, 94, 145, 146, 147, 148, 152, 154, 161, 198],

    # Reduced subset. The index paired with "Screaming" was 147, which the full
    # mapping above assigns to "Shatter"; "Screaming" is 145.
    'anom_labels2': [
        "Boom", "Explosion", "Fire", "Gunshot and gunfire", "Screaming",
        "Shout", "Siren", "Yell",
    ],
    'anom_labels_i2': [18, 72, 78, 92, 145, 148, 152, 198],

    # Choose the output shape: (timesteps, labels_total) or (1, labels_total).
    # Presumably 'full' keeps every timestep and 'max' reduces over time -- TODO confirm.
    'full_or_max': 'max',
    'labels_total': 200

}
# Settings for the downstream model / training run that consumes the SiNet scores.
CFG_WAV = {

    # Mirrored from CFG_SINET so both configurations always agree.
    "full_or_max": CFG_SINET["full_or_max"],
    "sinet_aas_len": CFG_SINET["labels_total"],

    "shuffle": False,

    "ativa": "relu",    # presumably the activation function name -- TODO confirm
    "optima": "sgd",    # presumably the optimizer name -- TODO confirm

    # batch_type:
    #   0 -> every batch has frame_max frames or the video length
    #   1 -> the last batch has frame_max frames
    #   2 -> the last batch has no repeated frames
    "batch_type": 0,
    "frame_max": 8000,

    # Used in train_model: "00000000" starts from scratch, any other value
    # resumes from the checkpoint with that id for this CFG_WAV.
    "ckpt_start": f"{0:0>8}",

    "epochs": 1,
    "batch_size": 1

}

In [None]:
# Load the per-clip SiNet outputs stored for the training split.
# NOTE(review): allow_pickle=True unpickles arbitrary objects -- only load
# archives produced by this project / from a trusted source.
npz_path = os.path.join(globo.AAS_PATH,f"{CFG_SINET['sinet_version']}--fl_train.npz")
with np.load(npz_path, allow_pickle=True) as npz_file:   # closes the archive handle
    data = npz_file["data"]

# One score array and one label per clip.
p_es_list = [clip['p_es_array'] for clip in data]
labels_total = np.array([clip['label'] for clip in data])

# Clips can differ in number of timesteps. np.array() on such a ragged list
# raises on recent NumPy, so only stack into a dense array when every clip has
# the same shape; otherwise keep a 1-D object array holding one array per clip.
if len({np.shape(scores) for scores in p_es_list}) <= 1:
    p_es_arr_total = np.array(p_es_list)
else:
    p_es_arr_total = np.empty(len(p_es_list), dtype=object)
    for clip_idx, scores in enumerate(p_es_list):
        p_es_arr_total[clip_idx] = scores

# Check the shapes of the resulting NumPy arrays
print("Shape of p_es_arr_total_np:", p_es_arr_total.shape)
if p_es_list:
    print("Shape of first p_es_array:", np.shape(p_es_list[0]))
print("Shape of labels_total_np:", labels_total.shape)

In [None]:
# Plotly view: for each clip, one filled score curve per anomaly-related class.
SHOW_3D_ANIMATION = False   # True -> also show the cumulative 3D line animation (slow)
SHOW_3D_SCATTER = False     # True -> also show a 3D scatter of every class
MAX_CLIPS = 101             # the loop used to break after index 100, i.e. 101 clips

# Class names; the context manager closes the metadata file after reading.
with open(CFG_SINET['metadata_file'], "r") as metadata_file:
    metadata = json.load(metadata_file)
labels = metadata["classes"]

num_anom_classes = len(CFG_SINET["anom_labels_i"])


def _anom_class_colors(n_colors):
    """Return `n_colors` Plotly colour strings, cycling through the Dark24 palette.

    `generate_colors` is not defined or imported anywhere in this notebook, so
    the palette is built here to keep the cell runnable on a fresh kernel.
    """
    palette = px.colors.qualitative.Dark24
    return [palette[i % len(palette)] for i in range(n_colors)]


def _anom_subplots(df_filtered, colors):
    """Build one row of subplots, one filled 'Values' curve per anomaly class.

    Args:
        df_filtered: DataFrame with 'Timesteps', 'Classes', 'Values' columns,
            already restricted to CFG_SINET['anom_labels_i'].
        colors: one Plotly colour per entry of CFG_SINET['anom_labels_i'].

    Returns:
        The plotly Figure (not shown yet).
    """
    anom_idx = CFG_SINET["anom_labels_i"]
    anom_names = CFG_SINET["anom_labels"]

    fig = sp.make_subplots(rows=1, cols=len(anom_idx), subplot_titles=anom_names)
    for i, cls in enumerate(anom_idx):
        cls_data = df_filtered[df_filtered['Classes'] == cls]
        fig.add_trace(
            go.Scatter(
                x=cls_data['Timesteps'],
                y=cls_data['Values'],
                mode='lines',
                fill='tozeroy',          # fill the area under the line
                fillcolor=colors[i],     # same colour as the line
                line=dict(color=colors[i], width=2),
                name=anom_names[i]
            ),
            row=1,
            col=i + 1
        )
        # Same fixed [0, 1] value range on every subplot so they are comparable.
        fig.update_yaxes(domain=[0, 1], range=[0, 1], row=1, col=i + 1)

    fig.update_layout(
        title_text='Values by Timesteps and Classes',
        xaxis_title='Timesteps',
        yaxis_title='Values',
        width=4000,
        height=600
    )
    return fig


def _anom_animation(df_filtered, timesteps, colors):
    """Build a 3D animation in which each anomaly class' curve grows over time.

    Frame t contains, for every class, all points with Timesteps <= t.

    Args:
        df_filtered: DataFrame with 'Timesteps', 'Classes', 'Values' columns,
            already restricted to CFG_SINET['anom_labels_i'].
        timesteps: number of frames to generate.
        colors: one Plotly colour per entry of CFG_SINET['anom_labels_i'].

    Returns:
        The plotly Figure with Play / Pause buttons (not shown yet).
    """
    anom_idx = CFG_SINET["anom_labels_i"]
    anom_names = CFG_SINET["anom_labels"]

    cumulative_data = {cls: {'x': [], 'y': [], 'z': []} for cls in anom_idx}
    frames = []
    for t in range(timesteps):
        at_t = df_filtered[df_filtered['Timesteps'] == t]
        frame_data = []
        for i, cls in enumerate(anom_idx):
            current_data = at_t[at_t['Classes'] == cls]
            if not current_data.empty:
                cumulative_data[cls]['x'].extend(current_data['Timesteps'].tolist())
                cumulative_data[cls]['y'].extend(current_data['Classes'].tolist())
                cumulative_data[cls]['z'].extend(current_data['Values'].tolist())

            frame_data.append(
                go.Scatter3d(
                    # copies, so later extensions never alter earlier frames
                    x=list(cumulative_data[cls]['x']),
                    y=list(cumulative_data[cls]['y']),
                    z=list(cumulative_data[cls]['z']),
                    mode='lines+markers',
                    line=dict(color=colors[i], width=2),
                    marker=dict(size=4, color=colors[i]),
                    name=anom_names[i],
                )
            )
        frames.append(go.Frame(data=frame_data, name=f'Time step {t}'))

    animation = go.Figure(data=frames[0].data if frames else None)
    animation.frames = frames
    animation.update_layout(
        updatemenus=[
            {
                'type': 'buttons',
                'showactive': False,
                'buttons': [
                    {
                        'label': 'Play',
                        'method': 'animate',
                        'args': [None, {'frame': {'duration': 200, 'redraw': True}, 'fromcurrent': True}],
                    },
                    {
                        'label': 'Pause',
                        'method': 'animate',
                        'args': [[None], {'frame': {'duration': 0, 'redraw': False}, 'mode': 'immediate'}],
                    },
                ],
                'pad': {'r': 10, 't': 60},
                'direction': 'left',
            }
        ],
        scene=dict(
            bgcolor='white',
            yaxis=dict(
                # label only every second class to keep the axis readable
                tickvals=anom_idx[::2],
                ticktext=anom_names[:len(anom_idx)][::2],
            ),
        ),
        width=1200,
        height=800
    )
    return animation


# The palette does not depend on the clip, so build it once.
colors = _anom_class_colors(num_anom_classes)

for k in range(min(len(data), MAX_CLIPS)):
    vname = os.path.basename(data[k]['vpath'])
    label = data[k]['label']
    p_es_arr = data[k]['p_es_array']
    print("*************************\n",vname,"\n", \
        np.shape(p_es_arr),"\n",label)

    timesteps = np.shape(p_es_arr)[0]

    # Long-format table: one row per (timestep, class) pair.
    x = np.repeat(np.arange(timesteps), len(labels))
    y = np.tile(np.arange(len(labels)), timesteps)
    z = p_es_arr.ravel()
    df = pd.DataFrame({'Timesteps': x, 'Classes': y, 'Values': z})

    # Keep only the anomaly-related classes.
    df_filtered = df[df['Classes'].isin(CFG_SINET["anom_labels_i"])]

    if SHOW_3D_ANIMATION:
        animation = _anom_animation(df_filtered, timesteps, colors)
        animation.show(config={'toImageButtonOptions': {'format': 'png', 'filename': 'custom_image', 'height': 600, 'width': 1000, 'scale': 1}})
        # To export instead of showing inline:
        # pio.write_html(animation, file='animation.html', auto_open=True)

    # 2D subplots, one per anomaly class
    fig = _anom_subplots(df_filtered, colors)
    fig.show()

    if SHOW_3D_SCATTER:
        fig_scatter = px.scatter_3d(df, x='Timesteps', y='Classes', z='Values', color='Classes', opacity=0.8)
        fig_scatter.update_layout(scene=dict(bgcolor='white'))
        fig_scatter.show()

In [None]:
# Display the class name stored at index 4 of the metadata class list
# (4 is the first entry of CFG_SINET['anom_labels_i'], which CFG_SINET pairs with "Alarm").
labels[4]

MATPLOTLIB

In [None]:
def rescale_sigmoid(data):
    """Squash each feature into (0, 1) with a sigmoid centred on the feature mean.

    Every column ``data[:, j]`` is shifted by its own mean and passed through
    ``1 / (1 + exp(-x))``, so a value equal to the column mean maps to 0.5.

    Args:
        data: array-like whose axis 1 indexes the features (typically
            ``(n_samples, n_features)``). A 1-D input is treated as a single
            feature. The input is never modified.

    Returns:
        A new array with the shape of ``data``. Floating-point inputs keep
        their dtype; integer / boolean inputs are returned as float64 (the
        result used to be written into an integer buffer, which truncated
        every value to 0).
    """
    values = np.asarray(data)
    if not np.issubdtype(values.dtype, np.inexact):
        values = values.astype(np.float64)
    if values.size == 0:
        return values.copy()

    # A feature is everything that shares an index on axis 1, so the mean is
    # taken over all remaining axes (axis 0 only, for the usual 2-D input).
    reduce_axes = tuple(axis for axis in range(values.ndim) if axis != 1)
    centered = values - values.mean(axis=reduce_axes, keepdims=True)

    # exp() overflowing to inf for very negative inputs still yields the
    # correct limit (1 / inf == 0), so only the warning is silenced.
    with np.errstate(over="ignore"):
        return 1 / (1 + np.exp(-centered))

def min_max_scaling(data):
    """Linearly rescale each feature to the [0, 1] range.

    Every column ``data[:, j]`` is mapped to ``(x - min) / (max - min)`` using
    that column's own minimum and maximum. A constant column (max == min) maps
    to all zeros instead of dividing by zero.

    Args:
        data: array-like whose axis 1 indexes the features (typically
            ``(n_samples, n_features)``). A 1-D input is treated as a single
            feature. The input is never modified.

    Returns:
        A new array with the shape of ``data``. Floating-point inputs keep
        their dtype; integer / boolean inputs are returned as float64 (the
        result used to be written into an integer buffer, which truncated
        it, and ``max - min`` could overflow for narrow integer types).
        Empty input returns an empty array.
    """
    values = np.asarray(data)
    if not np.issubdtype(values.dtype, np.inexact):
        values = values.astype(np.float64)
    if values.size == 0:
        return values.copy()

    # A feature is everything that shares an index on axis 1, so min / max are
    # taken over all remaining axes (axis 0 only, for the usual 2-D input).
    reduce_axes = tuple(axis for axis in range(values.ndim) if axis != 1)
    min_val = values.min(axis=reduce_axes, keepdims=True)
    max_val = values.max(axis=reduce_axes, keepdims=True)

    # Avoid division by zero for constant features.
    feature_range = max_val - min_val
    feature_range[feature_range == 0] = 1

    return (values - min_val) / feature_range

In [None]:
typee = 2            # 0 -> 3D scatter, 1 -> 3D surface, any other value -> no figure
printt = False       # True -> print max / mean score of every class, per clip
printt_anom = False  # True -> print max / mean only for CFG_SINET['anom_labels_i2']

# Class names; the context manager closes the metadata file after reading.
with open(CFG_SINET['metadata_file'], "r") as metadata_file:
    metadata = json.load(metadata_file)
labels = metadata["classes"]

# Create a colormap with a unique color for each class
cmap = get_cmap('RdYlGn', len(labels))
# Assigning a color to each class
colors = cmap(np.arange(len(labels)))


for k in range(len(data)):
    label = data[k]['label']
    p_es_arr = data[k]['p_es_array']
    print("*************************\n", \
        np.shape(p_es_arr),"\n",label)

    timesteps = np.shape(p_es_arr)[0]

    # Only build a figure when a plot type is selected; creating one for every
    # clip regardless left an empty 3D axes per clip and never released them.
    if typee in (0, 1) and timesteps > 0:
        # Coordinates of every (timestep, class) score
        x = np.repeat(np.arange(timesteps), len(labels))
        y = np.tile(np.arange(len(labels)), timesteps)
        z = p_es_arr.ravel()

        fig = plt.figure(figsize=(12,8))
        ax = fig.add_subplot(111, projection='3d')

        if typee == 0:
            # 3D scatter, one colour per class, drawn in a single call.
            # (No legend: the points carry no labels, so one would be empty.)
            ax.scatter(x, y, z, c=colors[y])
        else:
            # 3D surface. The scores already lie on the regular
            # (timestep, class) grid, so they are reshaped directly instead
            # of being re-interpolated onto the very same grid points.
            X, Y = np.meshgrid(np.arange(timesteps), np.arange(len(labels)))
            Z = z.reshape(timesteps, len(labels)).T
            surf = ax.plot_surface(X, Y, Z, cmap='viridis', linewidth=0, antialiased=True)
            fig.colorbar(surf, ax=ax, shrink=0.5, aspect=10)

        ax.set_xlabel('Timesteps')
        ax.set_ylabel('Classes')
        ax.set_zlabel('Values')
        ax.set_title(f'3D Visualization for Position {k}')
        plt.show()
        plt.close(fig)   # release the figure; one is created per clip

    # Per-class statistics of the raw scores. To inspect scaled scores
    # instead, p_es_arr can first be passed through min_max_scaling() or
    # rescale_sigmoid().
    if printt:
        for label_i, class_name in enumerate(labels):
            print("\n\n",class_name)
            label_array = p_es_arr[:, label_i]

            maxx = np.amax(label_array,axis=0)
            avgg = np.mean(label_array)
            print("@ MAX", maxx)
            print("@ AVG", avgg)

    # Same statistics, restricted to the reduced anomaly subset.
    if printt_anom:
        for label_i, class_name in zip(CFG_SINET['anom_labels_i2'], CFG_SINET['anom_labels2']):
            print("\n\n********\n",label_i, class_name, label)

            label_array = p_es_arr[:, label_i]

            maxx = np.amax(label_array,axis=0)
            avgg = np.mean(label_array)

            print("@ MAX", maxx)
            print("@ AVG", avgg)

In [None]:
# Disabled experiment, kept inside a string literal so that none of it runs:
# it collects every clip's 'p_es_array' and post-pads the variable-length
# sequences to a common length as float32 with Keras `pad_sequences`.
# NOTE(review): as the only expression in the cell, the string itself may be
# echoed as the cell output -- confirm whether this cell is still wanted.
'''
from tensorflow.keras.preprocessing.sequence import pad_sequences

p_es = [] #np.empty((len(data) ,None, CFG_SINET["labels_total"]), dtype=np.float32)

for j in range(len(data)):
    p_es.append(data[j]['p_es_array'])
    

# Assuming X is a list of input sequences with varying lengths
X_padded = pad_sequences(p_es, dtype='float32', padding='post')

# Now, you can use X_padded to train or evaluate your model
# '''

In [None]:
# Load data through the project helper `xdv` (the commented call below is
# presumably the one-off step that creates the files this loads -- TODO confirm;
# the meaning of the `True` argument is not visible from this notebook).
# NOTE(review): this rebinds `data`, which the cells above index as a sequence
# of dicts with 'label' / 'p_es_array' keys. Re-running those cells after this
# one will operate on whatever this helper returns -- confirm that is intended.
#xdv.create_train_valdt_test_from_xdvtest_bg()
data=xdv.train_valdt_test_from_xdvtest_bg_from_npy(True)
