In [None]:
import os
from pathlib import Path
import glob
import pandas as pd
import numpy as np
from scipy.signal import find_peaks
from scipy.stats import kurtosis
import csv
from math import floor
import npeet.entropy_estimators as ee
import random
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.utils import Sequence


## File paths

In [None]:
# Path related configuration
DATASET_BASE_DIR = Path("/home/italolanza/workspace/TG/dataset_raw/")


def _csv_files(*relative_dirs):
    """Return the '*.csv' paths (as strings) found directly inside each given
    sub-directory of DATASET_BASE_DIR, concatenated in argument order."""
    matches = []
    for relative_dir in relative_dirs:
        matches += glob.glob(str(DATASET_BASE_DIR / relative_dir / '*.csv'))
    return matches


# normal data
NORMAL_BASE_DIR = DATASET_BASE_DIR / 'normal'
NORMAL_FILES = _csv_files('normal')

# imbalance data, grouped into low / medium / high severity by attached mass
IMBALANCE_BASE_DIR = DATASET_BASE_DIR / 'imbalance'
IMBALANCE_LOW_FILES = _csv_files('imbalance/6g', 'imbalance/10g')
IMBALANCE_MEDIUM_FILES = _csv_files('imbalance/15g', 'imbalance/20g', 'imbalance/25g')
IMBALANCE_HIGH_FILES = _csv_files('imbalance/30g', 'imbalance/35g')

# horizontal misalignment data, grouped into low / medium / high severity
HOR_MISALIGNMENT_BASE_DIR = DATASET_BASE_DIR / 'horizontal-misalignment'
HOR_MISALIGNMENT_LOW_FILES = _csv_files('horizontal-misalignment/0.5mm')
HOR_MISALIGNMENT_MEDIUM_FILES = _csv_files('horizontal-misalignment/1.0mm',
                                           'horizontal-misalignment/1.5mm')
HOR_MISALIGNMENT_HIGH_FILES = _csv_files('horizontal-misalignment/2.0mm')

# vertical misalignment data, grouped into low / medium / high severity
VER_MISALIGNMENT_BASE_DIR = DATASET_BASE_DIR / 'vertical-misalignment'
VER_MISALIGNMENT_LOW_FILES = _csv_files('vertical-misalignment/0.51mm',
                                        'vertical-misalignment/0.63mm')
VER_MISALIGNMENT_MEDIUM_FILES = _csv_files('vertical-misalignment/1.27mm',
                                           'vertical-misalignment/1.40mm')
VER_MISALIGNMENT_HIGH_FILES = _csv_files('vertical-misalignment/1.78mm',
                                         'vertical-misalignment/1.90mm')

# overhang data
OVERHANG_MISALIGNMENT_BASE_DIR = DATASET_BASE_DIR / 'overhang'
# underhang data
UNDERHANG_MISALIGNMENT_BASE_DIR = DATASET_BASE_DIR / 'underhang'

# Directory where the generated datasets are written
OUTPUT_DATA_DIR = Path("/home/italolanza/workspace/TG/dataset/")

## Creating dataset

In [None]:
sampling_rate = 50000  # Sample rate
T = 1/sampling_rate  # Sampling period (inverse of the sample rate)
time = np.arange(0, 5, T)  # Time array from 0 to 5 seconds, one entry per sampling period

# Column names assigned to the raw CSV files, in file order
sensors = [
    'Tacom',
    'Aceler_Underhang_X',
    'Aceler_Underhang_Y',
    'Aceler_Underhang_Z',
    'Aceler_Overhang_X',
    'Aceler_Overhang_Y',
    'Aceler_Overhang_Z',
    'Audio',
]

# Each sensor contributes five columns: the three harmonic features (1f0, 2f0, 3f0),
# then kurtosis and entropy. The class label is the last column.
features = [
    sensor + '_' + suffix
    for sensor in sensors
    for suffix in ('1f0', '2f0', '3f0', 'kurtosis', 'entropy')
]
features.append('Class')
print(features)



In [None]:
# Numeric label written to the 'Class' column for each known defect directory name.
_DEFECT_CLASS_VALUES = {
    'normal': 0,
    'imbalance': 1,
    'horizontal-misalignment': 2,
    'vertical-misalignment': 3,
    'overhang': 4,
    'underhang': 5,
}


def _resolve_defect_class(directory, fallback=""):
    """Return the deepest component of `directory` that names a known defect class, else `fallback`."""
    for part in reversed(Path(directory).parts):
        if part in _DEFECT_CLASS_VALUES:
            return part
    return fallback


def _harmonic_bin(frequencies, target, tolerance=0.1):
    """Return the index of the first bin within `tolerance` of `target`, or the nearest bin if none is."""
    offsets = np.abs(frequencies - target)
    within_tolerance = np.argwhere(offsets < tolerance)
    if within_tolerance.size:
        return int(within_tolerance[0, 0])
    # No bin falls inside the tolerance window (e.g. the recording is short, so
    # the bins are widely spaced): use the closest bin instead of failing.
    return int(np.argmin(offsets))


def _extract_file_features(file_path):
    """Build one feature record (without the class label) from a raw sensor CSV file."""
    file_data = pd.read_csv(file_path, names=sensors)
    tacho = file_data['Tacom'].to_numpy()

    # Samples above half of the maximum form a boolean pulse train; the centre
    # of each pulse is reported as a peak.
    pulse_train = (tacho > np.amax(tacho) / 2).astype(float)
    peaks, _ = find_peaks(pulse_train)
    if len(peaks) < 2:
        raise ValueError(
            f"Cannot estimate the rotation frequency of {file_path}: "
            f"found {len(peaks)} tachometer pulse(s), need at least 2"
        )

    # Fundamental frequency = inverse of the mean time between pulses.
    rotation_frequency = 1 / (np.mean(np.diff(peaks)) * T)

    # Frequency axis of the one-sided spectrum; every column of the file has the
    # same length, so the harmonic bins are located once and reused.
    n_bins = len(tacho) // 2 + 1
    frequencies = np.linspace(0, sampling_rate / 2, n_bins)
    harmonic_bins = [
        _harmonic_bin(frequencies, harmonic * rotation_frequency)
        for harmonic in (1, 2, 3)
    ]

    feature_record = []
    for column in file_data:
        values = file_data[column].to_numpy()

        # Spectrum amplitude at f0, 2*f0 and 3*f0
        amplitudes = np.abs(np.fft.rfft(values))
        feature_record.extend(amplitudes[bin_index] for bin_index in harmonic_bins)

        # Kurtosis and entropy of the time-domain signal; the entropy estimator
        # is given a list of one-element samples.
        feature_record.append(kurtosis(values))
        feature_record.append(ee.entropy([[value] for value in values.tolist()]))

    return feature_record


def process_files(root_dir="", defect_class="", single_file=True, suffle_data=True, test_size=0.3):
    """
    Extract one feature row per raw CSV file under `root_dir` and append the rows to CSV.

    For every sensor column the row holds the spectrum amplitude at the first
    three harmonics of the rotation frequency (estimated from the 'Tacom'
    column), the kurtosis and the entropy, followed by the numeric class label.

    Parameters
    ----------
    root_dir : str or Path
        Directory that is walked recursively for '.csv' files.
    defect_class : str
        Class name used when no component of a directory path names a known
        class ('normal', 'imbalance', 'horizontal-misalignment',
        'vertical-misalignment', 'overhang', 'underhang'). Unknown names are
        labelled -1.
    single_file : bool
        If True, rows are appended to 'dataset_completo.csv'; otherwise rows are
        appended to '<class name>_data.csv', one file per class found.
    suffle_data, test_size
        Currently unused; kept so existing callers keep working.

    Raises
    ------
    ValueError
        If fewer than two tachometer pulses are found in a file, so no rotation
        frequency can be estimated.

    Notes
    -----
    Output is written in append mode without a header, so running the function
    twice on the same directory duplicates the rows.
    """
    sensor_features = []  # One feature record per processed file
    record_classes = []   # Class name of each record, parallel to sensor_features

    for root, dirs, files_list in os.walk(root_dir):
        dirs.sort()  # Walk sub-directories in a deterministic order

        print("Checking directory:" + root)
        current_class = _resolve_defect_class(root, fallback=defect_class)
        defect_value = _DEFECT_CLASS_VALUES.get(current_class, -1)  # -1: unknown class

        for file in sorted(files_list):
            if not file.lower().endswith('.csv'):
                continue  # Ignore stray non-data files

            file_path = os.path.join(root, file)
            print(f'Opening file: {file_path}')

            feature_record = _extract_file_features(file_path)
            feature_record.append(defect_value)

            sensor_features.append(feature_record)
            record_classes.append(current_class)

    if not sensor_features:
        print(f"No CSV files found under: {root_dir}")
        return

    # Already processed all files, creates a DataFrame
    dataset = pd.DataFrame(sensor_features, columns=features)
    OUTPUT_DATA_DIR.mkdir(parents=True, exist_ok=True)

    if single_file:
        dataset.to_csv(f'{str(OUTPUT_DATA_DIR)}/dataset_completo.csv', header=False, mode='a', index=False)
    else:
        # One output file per class, so a root directory holding several classes
        # does not end up in a single mislabelled file.
        class_names = pd.Series(record_classes, index=dataset.index)
        for class_name, class_rows in dataset.groupby(class_names, sort=False):
            class_rows.to_csv(f'{str(OUTPUT_DATA_DIR)}/{class_name}_data.csv', header=False, mode='a', index=False)


In [None]:
# Normal data
def process_normal_data(suffle_data=True, test_size=0.3):
    """
    Split the raw normal-condition CSV files into training and validation CSV files.

    Every row of every file in NORMAL_FILES gets the labels [0, 0.0] appended.
    For each file, the first floor(len * test_size) rows (after the optional
    shuffle) are appended to 'normal_data_validacao.csv' and the remaining rows
    to 'normal_data_treinamento.csv', both inside OUTPUT_DATA_DIR.

    Parameters
    ----------
    suffle_data : bool
        Shuffle the rows of each file before splitting.
    test_size : float
        Fraction of each file's rows written to the validation file.
    """
    # NOTE(review): this guard looks for 'normal_data.csv', which this function
    # never writes (it writes the '_treinamento' / '_validacao' files). Confirm
    # which file is meant to mark the work as already done.
    if OUTPUT_DATA_DIR.joinpath("normal_data.csv").exists():
        return

    OUTPUT_DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Y values appended to every row
    normal_labels = [0, 0.0]
    training_path = OUTPUT_DATA_DIR.joinpath("normal_data_treinamento.csv")
    test_path = OUTPUT_DATA_DIR.joinpath("normal_data_validacao.csv")

    total_lines = 0
    test_lines = 0

    for file_name in NORMAL_FILES:

        # newline='' is what the csv module expects for files it reads or writes
        with open(file_name, 'r', newline='') as data_file:
            data_list = [row + normal_labels for row in csv.reader(data_file, delimiter=",")]

        if suffle_data:
            random.shuffle(data_list)

        test_index = floor(len(data_list) * test_size)
        total_lines += len(data_list)
        test_lines += test_index

        with open(training_path, 'a', newline='') as training_file, \
             open(test_path, 'a', newline='') as test_file:
            csv.writer(test_file).writerows(data_list[:test_index])
            csv.writer(training_file).writerows(data_list[test_index:])

    # Report the number of rows actually written to each file
    print(f"Dataset size: {total_lines} lines")
    print(f"Test dataset size {test_lines} lines")
    print(f"Training dataset size {total_lines - test_lines} lines")

In [None]:
# Imbalance data
def process_imbalance_data(suffle_data=True, test_size=0.3):
    """
    Split the raw imbalance CSV files into training and validation CSV files,
    one pair of files per severity level.

    Rows get the labels [1, severity] appended, with severity 1.0 (low: 6g, 10g),
    2.0 (medium: 15g, 20g, 25g) or 3.0 (high: 30g, 35g). For each file, the first
    floor(len * test_size) rows (after the optional shuffle) are appended to
    '<stem>_validacao.csv' and the rest to '<stem>_treinamento.csv' inside
    OUTPUT_DATA_DIR. A level is skipped when '<stem>.csv' already exists there.

    Parameters
    ----------
    suffle_data : bool
        Shuffle the rows of each file before splitting.
    test_size : float
        Fraction of each file's rows written to the validation file.
    """
    OUTPUT_DATA_DIR.mkdir(parents=True, exist_ok=True)

    def _export_level(title, file_names, labels, stem):
        """Split one severity level's files and print the row counts written."""
        total_lines = 0
        test_lines = 0

        if not OUTPUT_DATA_DIR.joinpath(f"{stem}.csv").exists():
            training_path = OUTPUT_DATA_DIR.joinpath(f"{stem}_treinamento.csv")
            test_path = OUTPUT_DATA_DIR.joinpath(f"{stem}_validacao.csv")

            for file_name in file_names:
                # newline='' is what the csv module expects for files it reads or writes
                with open(file_name, 'r', newline='') as data_file:
                    data_list = [row + labels for row in csv.reader(data_file, delimiter=",")]

                if suffle_data:
                    random.shuffle(data_list)

                test_index = floor(len(data_list) * test_size)
                total_lines += len(data_list)
                test_lines += test_index

                with open(training_path, 'a', newline='') as training_file, \
                     open(test_path, 'a', newline='') as test_file:
                    csv.writer(test_file).writerows(data_list[:test_index])
                    csv.writer(training_file).writerows(data_list[test_index:])

        print(title)
        print("######################################")
        print(f"Dataset size: {total_lines} lines")
        print(f"Test dataset size {test_lines} lines")
        print(f"Training dataset size {total_lines - test_lines} lines\n")

    # Y values appended to every row: [defect class, severity]
    _export_level("Low data", IMBALANCE_LOW_FILES, [1, 1.0], "imbalance_low_data")
    _export_level("Medium data", IMBALANCE_MEDIUM_FILES, [1, 2.0], "imbalance_medium_data")
    _export_level("High data", IMBALANCE_HIGH_FILES, [1, 3.0], "imbalance_high_data")

In [None]:
# Horizontal misalignment
def process_hor_misalignment_data(suffle_data=True, test_size=0.3):
    """
    Split the raw horizontal-misalignment CSV files into training and validation
    CSV files, one pair of files per severity level.

    Rows get the labels [2, severity] appended, with severity 1.0 (low: 0.5mm),
    2.0 (medium: 1.0mm, 1.5mm) or 3.0 (high: 2.0mm). For each file, the first
    floor(len * test_size) rows (after the optional shuffle) are appended to
    '<stem>_validacao.csv' and the rest to '<stem>_treinamento.csv' inside
    OUTPUT_DATA_DIR. A level is skipped when '<stem>.csv' already exists there.

    Parameters
    ----------
    suffle_data : bool
        Shuffle the rows of each file before splitting.
    test_size : float
        Fraction of each file's rows written to the validation file.
    """
    OUTPUT_DATA_DIR.mkdir(parents=True, exist_ok=True)

    def _export_level(title, file_names, labels, stem):
        """Split one severity level's files and print the row counts written."""
        total_lines = 0
        test_lines = 0

        if not OUTPUT_DATA_DIR.joinpath(f"{stem}.csv").exists():
            training_path = OUTPUT_DATA_DIR.joinpath(f"{stem}_treinamento.csv")
            test_path = OUTPUT_DATA_DIR.joinpath(f"{stem}_validacao.csv")

            for file_name in file_names:
                # newline='' is what the csv module expects for files it reads or writes
                with open(file_name, 'r', newline='') as data_file:
                    data_list = [row + labels for row in csv.reader(data_file, delimiter=",")]

                if suffle_data:
                    random.shuffle(data_list)

                test_index = floor(len(data_list) * test_size)
                total_lines += len(data_list)
                test_lines += test_index

                with open(training_path, 'a', newline='') as training_file, \
                     open(test_path, 'a', newline='') as test_file:
                    csv.writer(test_file).writerows(data_list[:test_index])
                    csv.writer(training_file).writerows(data_list[test_index:])

        print(title)
        print("######################################")
        print(f"Dataset size: {total_lines} lines")
        print(f"Test dataset size {test_lines} lines")
        print(f"Training dataset size {total_lines - test_lines} lines\n")

    # Y values appended to every row: [defect class, severity].
    # The inputs are the horizontal-misalignment recordings, not the imbalance ones.
    _export_level("Low data", HOR_MISALIGNMENT_LOW_FILES, [2, 1.0], "hor_misalignment_low_data")
    _export_level("Medium data", HOR_MISALIGNMENT_MEDIUM_FILES, [2, 2.0], "hor_misalignment_medium_data")
    _export_level("High data", HOR_MISALIGNMENT_HIGH_FILES, [2, 3.0], "hor_misalignment_high_data")

In [None]:
# Vertical misalignment
def process_ver_misalignment_data(suffle_data=True, test_size=0.3):
    """
    Split the raw vertical-misalignment CSV files into training and validation
    CSV files, one pair of files per severity level.

    Rows get the labels [3, severity] appended, with severity 1.0 (low: 0.51mm,
    0.63mm), 2.0 (medium: 1.27mm, 1.40mm) or 3.0 (high: 1.78mm, 1.90mm). For each
    file, the first floor(len * test_size) rows (after the optional shuffle) are
    appended to '<stem>_validacao.csv' and the rest to '<stem>_treinamento.csv'
    inside OUTPUT_DATA_DIR. A level is skipped when '<stem>.csv' already exists
    there.

    Parameters
    ----------
    suffle_data : bool
        Shuffle the rows of each file before splitting.
    test_size : float
        Fraction of each file's rows written to the validation file.
    """
    OUTPUT_DATA_DIR.mkdir(parents=True, exist_ok=True)

    def _export_level(title, file_names, labels, stem):
        """Split one severity level's files and print the row counts written."""
        total_lines = 0
        test_lines = 0

        if not OUTPUT_DATA_DIR.joinpath(f"{stem}.csv").exists():
            training_path = OUTPUT_DATA_DIR.joinpath(f"{stem}_treinamento.csv")
            test_path = OUTPUT_DATA_DIR.joinpath(f"{stem}_validacao.csv")

            for file_name in file_names:
                # newline='' is what the csv module expects for files it reads or writes
                with open(file_name, 'r', newline='') as data_file:
                    data_list = [row + labels for row in csv.reader(data_file, delimiter=",")]

                if suffle_data:
                    random.shuffle(data_list)

                test_index = floor(len(data_list) * test_size)
                total_lines += len(data_list)
                test_lines += test_index

                with open(training_path, 'a', newline='') as training_file, \
                     open(test_path, 'a', newline='') as test_file:
                    csv.writer(test_file).writerows(data_list[:test_index])
                    csv.writer(training_file).writerows(data_list[test_index:])

        print(title)
        print("######################################")
        print(f"Dataset size: {total_lines} lines")
        print(f"Test dataset size {test_lines} lines")
        print(f"Training dataset size {total_lines - test_lines} lines\n")

    # Y values appended to every row: [defect class, severity].
    # The inputs are the vertical-misalignment recordings, not the imbalance ones.
    _export_level("Low data", VER_MISALIGNMENT_LOW_FILES, [3, 1.0], "ver_misalignment_low_data")
    _export_level("Medium data", VER_MISALIGNMENT_MEDIUM_FILES, [3, 2.0], "ver_misalignment_medium_data")
    _export_level("High data", VER_MISALIGNMENT_HIGH_FILES, [3, 3.0], "ver_misalignment_high_data")

## Loading and processing data

In [None]:
# Normal data: extract features from every file under NORMAL_BASE_DIR and
# append them to a per-class CSV (single_file=False)
process_files(root_dir=NORMAL_BASE_DIR, defect_class='normal', single_file=False)

In [None]:
# Imbalance data: extract features from every file under IMBALANCE_BASE_DIR and
# append them to a per-class CSV (single_file=False)
process_files(root_dir=IMBALANCE_BASE_DIR, defect_class='imbalance', single_file=False)

In [None]:
# Horizontal misalignment data: extract features from every file under
# HOR_MISALIGNMENT_BASE_DIR and append them to a per-class CSV (single_file=False)
process_files(root_dir=HOR_MISALIGNMENT_BASE_DIR, defect_class='horizontal-misalignment', single_file=False)

In [None]:
# Vertical misalignment data: extract features from every file under
# VER_MISALIGNMENT_BASE_DIR and append them to a per-class CSV (single_file=False)
process_files(root_dir=VER_MISALIGNMENT_BASE_DIR, defect_class='vertical-misalignment', single_file=False)

In [None]:
# Overhang data: extract features from every file under
# OVERHANG_MISALIGNMENT_BASE_DIR and append them to a per-class CSV (single_file=False)
process_files(root_dir=OVERHANG_MISALIGNMENT_BASE_DIR, defect_class='overhang', single_file=False)

In [None]:
# Underhang data: extract features from every file under
# UNDERHANG_MISALIGNMENT_BASE_DIR and append them to a per-class CSV (single_file=False)
process_files(root_dir=UNDERHANG_MISALIGNMENT_BASE_DIR, defect_class='underhang', single_file=False)

In [None]:
class DataGenerator(Sequence):
    """
    Keras Sequence that assembles each batch from the split CSV files in OUTPUT_DATA_DIR.

    Batch `index` is built by reading rows
    [index * batch_size, (index + 1) * batch_size) from each of the ten
    per-class / per-severity files listed in DATASET_STEMS and stacking them,
    so one batch holds up to 10 * batch_size rows.

    Parameters
    ----------
    batch_size : int
        Number of rows read from each file per batch.
    is_validation : bool
        Read the '_validacao' files when True, the '_treinamento' files otherwise.
    """

    MAX_DATASET_SIZE_TRAINING = 8575000 # The number of lines of the training set of the Normal data (the smaller one)
    MAX_DATASET_SIZE_VALIDATION = 3675000 # The number of lines of the validation set of the Normal data (the smaller)

    # File-name stems read for every batch, in stacking order
    DATASET_STEMS = (
        "normal_data",
        "imbalance_low_data",
        "imbalance_medium_data",
        "imbalance_high_data",
        "hor_misalignment_low_data",
        "hor_misalignment_medium_data",
        "hor_misalignment_high_data",
        "ver_misalignment_low_data",
        "ver_misalignment_medium_data",
        "ver_misalignment_high_data",
    )

    def __init__(self, batch_size=8575, is_validation=False):
        # Let the Keras base class set up its own state
        super().__init__()
        self.batch_size = batch_size
        self.is_validation = is_validation

    def __len__(self):
        """Return the number of batches in the sequence."""
        if self.is_validation:
            dataset_size = DataGenerator.MAX_DATASET_SIZE_VALIDATION
        else:
            dataset_size = DataGenerator.MAX_DATASET_SIZE_TRAINING
        return int(floor(dataset_size / self.batch_size))

    def __getitem__(self, index):
        """
        Return batch number `index` as a tuple (data_x, data_y).

        data_x holds columns 0-7 of the stacked rows and data_y holds columns
        8-9 cast to int.
        """
        name_modifier = "_validacao" if self.is_validation else "_treinamento"
        first_row = index * self.batch_size

        # skiprows + nrows reads exactly one slice and closes the file afterwards,
        # instead of leaving a chunked reader open for every file on every batch.
        frames = [
            pd.read_csv(
                OUTPUT_DATA_DIR.joinpath(f"{stem}{name_modifier}.csv"),
                header=None,
                skiprows=first_row,
                nrows=self.batch_size,
            )
            for stem in self.DATASET_STEMS
        ]

        # DataFrame.append no longer exists in pandas 2.x; concat stacks the slices
        data = pd.concat(frames, ignore_index=True)

        data_x = data.iloc[:, 0:8].to_numpy()
        data_y = data.iloc[:, 8:10].to_numpy().astype(int)

        return (data_x, data_y)