In [None]:
import os
import pathlib

**This notebook uses Google Cloud Storage instead of local storage**

All data for the current pipeline is stored in the root folder of the bucket.

In [None]:
# Service-account key file. The standard GOOGLE_APPLICATION_CREDENTIALS environment
# variable takes precedence so the notebook is not tied to one machine; the old
# hard-coded path is only a fallback.
KEY = os.environ.get(
    'GOOGLE_APPLICATION_CREDENTIALS',
    r'C:\Users\User\Desktop\Strat\ml_pipeline-hafizur\stratuscent\combined\keys\testproject8008-24541b9591e3.json',
)
# Google Cloud Storage bucket that every pipeline component below reads from and writes to.
BUCKET_NAME = 'data-store-strat'


# Breakdown of the ML components implemented for the training pipeline:

1. Read Positive Data
2. Read Negative Data
3. Process Data
    1. Merge Data
    2. Pad Data
4. Data_Split
5. Model_Training
6. Model Evaluation

## Current Architecture of the ML Pipeline

![Architecture](./architecture.jpeg)

## Read Positive Data


In [None]:
def read_postive_data(raw_files_path: str = None, analyte_name: str ='no2', run_name: str = 'no2', training:bool = True):
    """Copy the raw files holding positive samples of an analyte to ``data/no2_positive``.

    When ``training`` is True a raw CSV is kept only if it
      * has an ``analyte_name`` column with at least 10 distinct values,
      * has a ``run_name`` column whose first value is a string containing ``_<run_name>_``,
      * has every sensor column ``s1`` .. ``s32``.
    When ``training`` is False every listed file is copied without filtering.

    Args:
        raw_files_path: Bucket folder holding the raw CSV files (listed recursively).
        analyte_name: Column whose number of distinct values is checked.
        run_name: Run identifier that must appear, wrapped in underscores, in ``run_name``.
        training: Whether to apply the filters above.

    Raises:
        ValueError: If an argument is None, ``raw_files_path`` is not a folder, or it is empty.

    Side effects:
        Deletes and recreates ``data/no2_positive`` in bucket ``BUCKET_NAME`` and copies the
        selected files into it. The output folder name does not depend on ``analyte_name``.
    """
    # Loading Dependencies
    import os
    import pandas as pd
    from tqdm import tqdm
    from cloud_wrapper import GStore

    gstore = GStore(BUCKET_NAME)
    # Checking for edge cases
    if raw_files_path is None or analyte_name is None or run_name is None:
        raise ValueError('Reading positive data failed: Component Configuration Invalid!')
    if not gstore.is_dir(raw_files_path):
        raise ValueError('Reading positive data failed: Raw files path do not exist')

    # Read files
    analyte_files = gstore.list_files(raw_files_path, recurse=True)
    if len(analyte_files) == 0:
        raise ValueError('Reading positive data failed: Raw files folder empty')

    sensor_cols = ["s" + str(i) for i in range(1, 33)]
    run_tag = '_' + run_name + '_'

    def _keep_file(content):
        """Return True when a raw data frame passes every positive-sample filter."""
        # Check that the columns exist before reading them, so a file without the
        # analyte or run_name column is filtered out instead of raising KeyError.
        if analyte_name not in content.columns or 'run_name' not in content.columns:
            return False
        # Too few distinct analyte values (this also rejects empty files).
        if len(content[analyte_name].unique()) < 10:
            return False
        first_run = content['run_name'].iloc[0]
        # A non-string run name (e.g. NaN) cannot identify the run.
        if not isinstance(first_run, str) or run_tag not in first_run:
            return False
        return all(col in content.columns for col in sensor_cols)

    # Create Output folder
    output_file_path = 'data/no2_positive'
    if gstore.is_dir(output_file_path):
        gstore.rmdir(output_file_path)
    gstore.mkdir(output_file_path)

    print("Num of {} files before filter: {}".format(analyte_name, len(analyte_files)))
    print('Processing files...')
    if training:
        # Build the kept list directly: linear instead of the O(n^2) "not in removed" scan.
        pos_files = []
        for analyte_file in tqdm(analyte_files, colour='red'):
            content = pd.read_csv(f'gs://{gstore.get_bucket_name()}/{analyte_file}')
            if _keep_file(content):
                pos_files.append(analyte_file)
        print("Num of {} files after filter: {}".format(analyte_name, len(pos_files)))
    else:
        pos_files = analyte_files
        print('Not training model so not filtering data')

    print('Moving positive data files...')
    for x in tqdm(pos_files, colour='green'):
        gstore.copy_file(x, f'{output_file_path}/{os.path.basename(x)}')
    print('Moving Postive Data Completed!')
# Filter the sample positive files and copy them to data/no2_positive.
read_postive_data(raw_files_path='raw_data_sample/no2_positive', training=True)


### Function 
-   Traverses the directory containing raw data files and keeps only the positive samples of a given analyte for a particular run
### Input
-   Path to Folder containing raw data files
-   Analyte name for which files are being filtered
-   Run name under consideration
-   Training flag: True if the files will be used to train a new model; the filters are only applied when it is True
    
### Output
-   The filtered files are copied into the output folder; the function itself returns nothing
-   The output folder is deleted (if present) and recreated on every run
-   Saved Path: `data/no2_positive`

## Read Negative Data



### Function 
-   Traverses the directory containing raw data files and keeps only the negative samples, i.e. files whose run name does not contain the given analyte name
### Input
-   Path to Folder containing raw data files
-   Analyte name for which files are being filtered
-   Run name under consideration
-   Training flag: True if the files will be used to train a new model; the filters are only applied when it is True
    
### Output
-   The filtered files are copied into the output folder; the function itself returns nothing
-   The output folder is deleted (if present) and recreated on every run
-   Saved Path: `data/no2_negative`

In [None]:
def read_negative_data(raw_files_path: str = None, analyte_name: str ='no2', run_name: str = 'no2', training:bool = True):
    """Copy the raw files holding negative samples of an analyte to ``data/no2_negative``.

    When ``training`` is True a raw CSV is kept only if it
      * has a non-empty ``run_name`` column whose first value is a string that does NOT
        contain ``_<analyte_name>_``,
      * has every sensor column ``s1`` .. ``s32``.
    When ``training`` is False every listed file is copied without filtering.

    Args:
        raw_files_path: Bucket folder holding the raw CSV files (listed recursively).
        analyte_name: Analyte whose runs must be excluded.
        run_name: Only validated for None; not otherwise used by this component.
        training: Whether to apply the filters above.

    Raises:
        ValueError: If an argument is None, ``raw_files_path`` is not a folder, or it is empty.

    Side effects:
        Deletes and recreates ``data/no2_negative`` in bucket ``BUCKET_NAME`` and copies the
        selected files into it.
    """
    # Loading Dependencies
    import os
    import pandas as pd
    from tqdm import tqdm
    from cloud_wrapper import GStore

    gstore = GStore(BUCKET_NAME)
    # Checking for edge cases
    if raw_files_path is None or analyte_name is None or run_name is None:
        raise ValueError('Reading negative data failed: Component Configuration Invalid!')
    if not gstore.is_dir(raw_files_path):
        raise ValueError('Reading negative data failed: Raw files path do not exist')

    # Read files
    analyte_files = gstore.list_files(raw_files_path, recurse=True)
    if len(analyte_files) == 0:
        raise ValueError('Reading negative data failed: Raw files folder empty')

    sensor_cols = ["s" + str(i) for i in range(1, 33)]
    analyte_tag = '_' + analyte_name + '_'

    def _keep_file(content):
        """Return True when a raw data frame passes every negative-sample filter."""
        if 'run_name' not in content.columns:
            return False
        # An empty file has no run name to inspect (previously a KeyError).
        if content.empty:
            return False
        first_run = content['run_name'].iloc[0]
        # A non-string run name (e.g. NaN) cannot prove the file is negative
        # (previously a TypeError), and a run of the analyte itself is positive.
        if not isinstance(first_run, str) or analyte_tag in first_run:
            return False
        return all(col in content.columns for col in sensor_cols)

    # Create Output folder
    output_file_path = 'data/no2_negative'
    if gstore.is_dir(output_file_path):
        gstore.rmdir(output_file_path)
    gstore.mkdir(output_file_path)

    print("Num of {} files before filter: {}".format(analyte_name, len(analyte_files)))
    print('Processing files...')
    if training:
        # Build the kept list directly: linear instead of the O(n^2) "not in removed" scan.
        neg_files = []
        for neg_file in tqdm(analyte_files, colour='red'):
            content = pd.read_csv(f'gs://{gstore.get_bucket_name()}/{neg_file}')
            if _keep_file(content):
                neg_files.append(neg_file)
        print("Num of {} file after filter: {}".format(analyte_name, len(neg_files)))
    else:
        neg_files = analyte_files
        print('Not training model so not filtering data')

    print('Moving negative data files...')
    for x in tqdm(neg_files, colour='green'):
        gstore.copy_file(x, f'{output_file_path}/{os.path.basename(x)}')
    print('Moving Negative Data Completed!')
# Filter the sample negative files and copy them to data/no2_negative.
read_negative_data(raw_files_path='raw_data_sample/no2_negative', training=True)

## Merge Data

### Function 
-   Reads the filtered positive and negative files and combines them into a single folder
### Input
-   Path to Folder containing filtered data files
-   The path points to the parent directory. The component assumes that the following two folders already exist:
    - `no2_negative`
    - `no2_positive`
-   Analyte name under consideration
-   Run name under consideration
-   Training flag: True if the files are going to 
    be used for training a new model or not
    
### Output
-   Output folder path where merge data is saved
-   Output folder automatically created, deletes previously created folders
-   Saved Path: `data/merged`
-   The merged data is saved in the following way:
    - Each data frame read is saved in `data/merged` folder as a csv file
    - An accompanying `all_data.json` file maps each run name (taken from the `run_name` column, or `-1` when it is unavailable) to the list of its merged CSV files


In [None]:
def merge_data(data_folder: str = None, analyte_name: str ='no2', run_name: str = 'no2', training:bool = True):
    """Combine the filtered positive and negative files into ``data/merged``.

    Every CSV under ``<data_folder>/no2_positive`` and ``<data_folder>/no2_negative``
    (except paths containing ``checkpoint``) is re-uploaded to ``data/merged`` with an added
    ``filename`` column (base name without extension); a ``trial`` column is renamed to
    ``trial_id``. ``data/merged/all_data.json`` maps each run name (first value of
    ``run_name``, or -1 if unavailable) to the list of merged CSV paths for that run.

    Args:
        data_folder: Bucket folder containing ``no2_positive`` and ``no2_negative``.
        analyte_name: Only validated for None.
        run_name: Only validated for None.
        training: Not used by this component.

    Raises:
        ValueError: If an argument is None, an input folder is missing, or no files are found.
    """
    # Loading Dependencies
    import os
    import json
    import pandas as pd
    from tqdm import tqdm
    from cloud_wrapper import GStore

    gstore = GStore(BUCKET_NAME)
    # Checking for edge cases
    if data_folder is None or analyte_name is None or run_name is None:
        raise ValueError('Merging data failed: Component Configuration Invalid!')

    pos_folder = f'{data_folder}/no2_positive'
    neg_folder = f'{data_folder}/no2_negative'
    if not gstore.is_dir(pos_folder) or not gstore.is_dir(neg_folder):
        raise ValueError('Merging data failed: Data folder does not exist')

    # Read files
    all_files = gstore.list_files(pos_folder, recurse=True) + gstore.list_files(neg_folder, recurse=True)
    if len(all_files) == 0:
        raise ValueError('Merge data failed: No files found')

    # run name -> list of merged CSV paths
    all_data = {}

    # Create output folders
    output_folder_path = 'data/merged'
    if gstore.is_dir(output_folder_path):
        gstore.rmdir(output_folder_path)
    gstore.mkdir(output_folder_path)

    print("Num of all files: {}".format(len(all_files)))
    print('Merging files...')
    for ctrial in tqdm(all_files, colour='red'):
        # Skip editor checkpoint copies that may sit next to the data.
        if 'checkpoint' in ctrial:
            continue
        df = pd.read_csv(f'gs://{gstore.get_bucket_name()}/{ctrial}')
        # Base name without the 4-character extension (e.g. ".csv"). Using it directly
        # instead of df["filename"].values[0] also works for header-only files.
        file_stem = os.path.basename(ctrial)[:-4]
        df["filename"] = file_stem
        if 'trial' in df.columns:
            df = df.rename(columns={"trial": "trial_id"})
        try:
            run = df.run_name.iloc[0]
        except (AttributeError, IndexError):
            # No run_name column (AttributeError) or no rows (IndexError).
            run = -1
        # NOTE(review): a positive and a negative file with the same base name map to the
        # same merged path, so the later upload overwrites the earlier one — confirm names are unique.
        merged_path = f'{output_folder_path}/{file_stem}.csv'
        all_data.setdefault(run, []).append(merged_path)
        gstore.upload_from_memory(df.to_csv(index=False), merged_path, content_type='text/csv')

    gstore.upload_from_memory(json.dumps(all_data), f'{output_folder_path}/all_data.json', content_type='application/json')
    print('Merged files saved!')

# Merge data/no2_positive and data/no2_negative into data/merged.
merge_data(data_folder='data', training=True)


## Process Data

### Function 
- Reads the merged data and processes the data in two steps:
    - Preparation stage
    - Padding stage
### Input
-   Path to Folder containing filtered data files
-   The path points to the parent directory. The component assumes that the following subfolder already exists:
    - `merged`
-   Analyte name under consideration
-   Run name under consideration
-   Training flag: True if the files are going to 
    be used for training a new model or not
-   The following additional parameters determine how the data is processed:
    -   div_by=800
    -   start_from=0
    -   norm_bias_factor=-1 
    -   sensormoduleid_or_deviceid='sensor_module_id', 
    -   zeros=True
    -   max_len=None
    
### Output
-   Output folder path where prepared data is saved
-   Output folder automatically created, deletes previously created folders
-   Saved Path: `data/processed`
-   The processed data is saved in the following way:
    - `meta_data.json` file: stores information about types and length of processed data headers
    - `processed_data.json` file stores processed data that is stored as list
    - a `numpy/` directory is created, and every header holding numpy arrays is saved there as a separate `.npy` binary file

In [None]:
def process_data(merged_data_path: str = None, analyte_name: str ='no2', run_name: str = 'no2', training=True,
                        div_by=800, start_from=0, norm_bias_factor=-1, 
                        sensormoduleid_or_deviceid='sensor_module_id', 
                        zeros=True, max_len=None, seed=None):
    """Prepare and pad the merged trials, then upload them to ``data/processed``.

    Preparation, for every trial CSV listed in ``<merged_data_path>/merged/all_data.json``:
      * Labels: for trials whose first ``run_name`` (lower-cased) contains ``run_name``,
        the ``analyte_name`` column is shifted so its minimum is 0 and divided by
        ``div_by``. Other trials get all-zero labels.
      * Sensor columns ``s1``..``s32`` are divided by their median over rows
        ``start_from:start_from+100``, then ``norm_bias_factor`` is added. This step is
        skipped when ``norm_bias_factor`` is None.
      * When ``training`` is True, the first 200 rows of the sensor, humidity and
        temperature signals are resampled (cubic interpolation) to a random length in
        [100, 500). The labels of those rows are replaced by zeros.
    Padding: every per-trial array is cut or padded to ``max_len`` rows (the longest trial
    when None). Sensor responses are padded with -1; labels, humidity and temperature are
    padded with 0 when ``zeros`` is True and with 1 otherwise.

    Args:
        merged_data_path: Bucket folder that contains the ``merged`` subfolder.
        analyte_name: Label column.
        run_name: Substring identifying positive runs.
        training: Enables the augmentation described above.
        div_by: Label scale divisor.
        start_from: First row kept / first row of the baseline window.
        norm_bias_factor: Offset added after baseline normalisation, or None.
        sensormoduleid_or_deviceid: Column used for ``moduleID``; falls back to
            ``sensor_module_id``.
        zeros: Padding value selector (see above).
        max_len: Padded length, or None.
        seed: Optional seed for the augmentation lengths. None uses NumPy's global
            random state, as before.

    Raises:
        ValueError: On invalid configuration, missing or malformed merged data, a training
            trial too short to augment, or a padding failure.

    Side effects:
        Deletes and recreates ``data/processed`` (and ``data/processed/numpy``) in the
        bucket. It uploads ``meta_data.json``, ``processed_data.json`` and one ``.npy``
        file per array-valued header.
    """
    # Loading Dependencies
    import io
    import json
    import numpy as np
    import pandas as pd
    from scipy.interpolate import interp1d
    from tqdm import tqdm
    from cloud_wrapper import GStore

    # Number of leading rows of each training trial that are resampled for augmentation.
    augment_rows = 200

    # Helper Class
    class NumpyEncoder(json.JSONEncoder):
        """ Special json encoder for numpy types """
        def default(self, obj):
            if isinstance(obj, np.integer):
                return int(obj)
            elif isinstance(obj, np.floating):
                return float(obj)
            elif isinstance(obj, np.ndarray):
                return obj.tolist()
            return json.JSONEncoder.default(self, obj)

    # Helper Function 
    def interpolate_vector(original_signal, desired_signal_length, kind="cubic"):
        """Modifies the signal shape by sampling it based on the provided shape.

        Args:
            original_signal: np.ndarray, a 1-D numpy array containing the original signal.
            desired_signal_length: int, specifying what is the length of the interpolated output signal.
            kind: str, function to be used for fitting points on the signal. For more info refer to:
                https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.interp1d.html 

        Returns:
            np.ndarray: A numpy array containing the artificial signal, sampled from the original signal.
        """
        if len(original_signal.shape) != 1:
            raise ValueError("Function is specific to vectors! Input has {} dimensions.".format(len(original_signal.shape)))
        interp_func = interp1d(range(len(original_signal)), original_signal, kind=kind)
        xnew = np.linspace(0, len(original_signal) - 1, desired_signal_length, endpoint=True)
        return interp_func(xnew)

    def interpolate_tensor2d(input_signals, desired_signal_length, axis=0, kind="cubic"):
        """Modifies the signal shape by sampling it based on the provided shape.

        Args:
            input_signals: np.ndarray, a numpy array containing the original signal. The original signal must be a 2d tensor.
            desired_signal_length: int, specifying what is the length of the interpolated output signal.
            axis: int, axis which contains the sequence of the data.
            kind: str, function to be used for fitting points on the signal. For more info refer to:
                https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.interp1d.html 

        Returns:
            np.ndarray: A numpy array containing the artificial signals, sampled from the original signals.
        """
        if len(input_signals.shape) != 2:
            raise ValueError("Function is specific to 2D tensors! Input has {} dimensions.".format(len(input_signals.shape)))
        original_signal = np.moveaxis(input_signals, axis, 1)
        interpolated = [interpolate_vector(curr_sig, desired_signal_length, kind) for curr_sig in original_signal]
        return np.moveaxis(interpolated, 1, axis)

    def get_length_adjusted_features(features, desired_len=None, print_progress_bar=False):
        """Resample every sequence in ``features`` to the same length.

        Args:
            features: three dimensional array-like (n_sequences, length, channels).
            desired_len: int, final sequence length. If None, the median of all sequence lengths is used.
            print_progress_bar: bool, show a tqdm progress bar.

        Returns:
            np.ndarray: numpy array containing features with the same sequence length.
        """
        features = np.array(features)
        if desired_len is None:
            dimensions = np.array([len(cf) for cf in features])
            desired_len = int(np.median(dimensions))
        final_features = np.zeros((features.shape[0], desired_len, features[0].shape[-1]))
        iterator = tqdm(features, desc="Feature length adjustment:") if print_progress_bar else features
        for i, cf in enumerate(iterator):
            current_features = cf
            if len(cf) != desired_len:
                current_features = interpolate_tensor2d(cf, desired_len, 0)
            final_features[i] = current_features
        return final_features

    gstore = GStore(BUCKET_NAME)
    print('Loading merged data!')
    sensor_cols = ["s" + str(i) for i in range(1, 33)]
    # Checking for edge cases
    if merged_data_path is None or analyte_name is None or run_name is None:
        raise ValueError('Data processing failed: Component Configuration Invalid!')

    if not gstore.is_dir(f'{merged_data_path}/merged'):
        raise ValueError('Data processing failed: Merged Data folder does not exist')

    if not gstore.is_file(f'{merged_data_path}/merged/all_data.json'):
        raise ValueError('Data processing failed: Merged Data does not exist')

    # Read data: all_data.json maps each run to the list of merged CSV paths.
    data_files_path = json.loads(gstore.download_into_memory(f'{merged_data_path}/merged/all_data.json'))

    if sum(len(x) if type(x) == list else 0 for x in data_files_path.values()) == 0:
        raise ValueError('Data processing failed: Merged Data file format error')

    all_data = {}
    for run_id in tqdm(data_files_path, colour='red'):
        all_data[run_id] = [pd.read_csv(f'gs://{gstore.get_bucket_name()}/{x}') for x in data_files_path[run_id]]

    # Random lengths for the training augmentation; a seed makes runs reproducible.
    randint = np.random.randint if seed is None else np.random.RandomState(seed).randint

    # Create output folders
    output_folder_path = 'data/processed'
    if gstore.is_dir(output_folder_path):
        gstore.rmdir(output_folder_path)
    gstore.mkdir(output_folder_path)

    # Create numpy output folders
    numpy_folders_output = f'{output_folder_path}/numpy'
    if gstore.is_dir(numpy_folders_output):
        gstore.rmdir(numpy_folders_output)
    gstore.mkdir(numpy_folders_output)

    print('     Initiating data preparation stage!')
    ds = {"run":[], "sensor_responses":[], "labels": [], "extra_inputs": [], 'last_index': [],
                "moduleID": [], "humidity": [], "temperature": [], "trial_id": [], 'original_label_values': [], "filename": []}
    for k in all_data:
        for trial_idx, cdata in enumerate(tqdm(all_data[k], colour='red')):

            fin_labels = np.zeros((len(cdata), 1))
            if run_name not in cdata.run_name[0].lower():
                all_labels = np.zeros(len(cdata))
            else:
                # Shift so the minimum label is 0, then scale.
                cdata[analyte_name] = cdata[analyte_name] - min(cdata[analyte_name])
                cdata[analyte_name] = cdata[analyte_name].div(div_by)
                all_labels = cdata[analyte_name].values
            fin_labels[:, 0] = all_labels

            # Per-sensor median over the baseline window.
            baseline = np.median(cdata[sensor_cols].iloc[start_from:start_from+100], axis=0).reshape(1, len(sensor_cols))

            if norm_bias_factor is None:
                norm = cdata[sensor_cols].values
            else:
                norm = cdata[sensor_cols].values / baseline + norm_bias_factor

            if training:
                # Augmentation reads augment_rows rows (after start_from for humidity and
                # temperature); fail with a clear message instead of a reshape error.
                if len(cdata) < start_from + augment_rows:
                    raise ValueError(
                        f'Data processing failed: trial {trial_idx} of run {k} has {len(cdata)} rows, '
                        f'at least {start_from + augment_rows} are needed for training augmentation')
                random_adjust = randint(100, 500)
                # Resample the first augment_rows rows of the normalised signal to random_adjust rows.
                head = norm[0:augment_rows].reshape(1, augment_rows, len(sensor_cols))
                head = get_length_adjusted_features(head, random_adjust)[0]
                norm = np.concatenate((head, norm[augment_rows:]), axis=0)
                # The resampled head is labelled with zeros.
                fin_labels = np.concatenate((np.zeros((random_adjust, 1)), fin_labels[augment_rows:]), axis=0)
                adjusted = {}
                for column in ["humidity", "temperature"]:
                    values = cdata[column].values[start_from:]
                    column_head = values[:augment_rows].reshape(1, augment_rows, 1)
                    column_head = get_length_adjusted_features(column_head, random_adjust)[0]
                    tail = values[augment_rows:].reshape(-1, 1)
                    adjusted[column] = np.concatenate((column_head, tail), axis=0).reshape(-1)
                ds["sensor_responses"].append(norm[start_from:, :])
                ds['last_index'].append(len(cdata) + random_adjust - augment_rows)
                ds["labels"].append(fin_labels[start_from:])
                ds["humidity"].append(adjusted["humidity"])
                ds["temperature"].append(adjusted["temperature"])
            else:
                ds["sensor_responses"].append(norm[start_from:, :])
                ds['last_index'].append(len(cdata))
                ds["labels"].append(fin_labels[start_from:])
                ds["humidity"].append(cdata["humidity"].to_numpy())
                ds["temperature"].append(cdata["temperature"].to_numpy())
            try:
                ds["moduleID"].append(cdata[sensormoduleid_or_deviceid].iloc[0])
            except (KeyError, IndexError):
                ds["moduleID"].append(cdata["sensor_module_id"].iloc[0])
            try:
                ds["trial_id"].append(cdata["trial_id"].iloc[0])
            except (KeyError, IndexError):
                ds["trial_id"].append(-1)
            ds["run"].append(k)
            ds["filename"].append(cdata["filename"].iloc[0])

    print('     Initiating data padding stage!')

    if max_len is None:
        max_len = max(len(trial) for trial in ds["sensor_responses"])
    try:
        # Sensor responses are always padded with -1; the other arrays with 0 or 1.
        fill = np.zeros if zeros else np.ones
        new_data = np.negative(np.ones((len(ds["sensor_responses"]), max_len, len(sensor_cols))))
        new_labels = fill((len(ds["labels"]), max_len, ds["labels"][0].shape[1]))
        new_humidity = fill((len(ds["humidity"]), max_len))
        new_temperature = fill((len(ds["temperature"]), max_len))

        for i, trial in enumerate(tqdm(ds["sensor_responses"], colour='red')):
            length = min(len(trial), max_len)
            new_data[i, :length, :] = trial[:length, :]
            new_labels[i, :length, :] = ds["labels"][i][:length]
            new_humidity[i, :length] = ds["humidity"][i][:length]
            new_temperature[i, :length] = ds["temperature"][i][:length]
    except Exception as err:
        raise ValueError('Data processing failed: Data padding failed') from err

    ds["sensor_responses"] = new_data
    ds["labels"] = new_labels
    ds["humidity"] = new_humidity
    ds["temperature"] = new_temperature

    print('     Saving Data!')
    # Array-valued headers go to numpy/<header>.npy; everything else into processed_data.json.
    # meta_data[header] = (is_numpy, length[, npy path]).
    meta_data = {}
    processed_data = {}
    for i in tqdm(ds, colour='green'):
        if len(ds[i]) > 0 and type(ds[i][0]) == np.ndarray:
            npy_path = f'{numpy_folders_output}/{i}.npy'
            meta_data[i] = True, len(ds[i]), npy_path
            # Serialise in memory instead of through a local ./tmp folder that used to be
            # rmtree'd (and could delete unrelated user files).
            buffer = io.BytesIO()
            np.save(buffer, ds[i], allow_pickle=True)
            gstore.upload_from_memory(buffer.getvalue(), npy_path, content_type='application/octet-stream')
        else:
            meta_data[i] = False, len(ds[i])
            processed_data[i] = ds[i]

    gstore.upload_from_memory(json.dumps(meta_data), f'{output_folder_path}/meta_data.json', content_type='application/json')
    gstore.upload_from_memory(json.dumps(processed_data, cls=NumpyEncoder), f'{output_folder_path}/processed_data.json', content_type='application/json')
    print('Data processing complete!')
# Prepare and pad data/merged into data/processed.
process_data(merged_data_path='data', training=True)

## Split Data

### Function 
- Splits the indices of the processed trials into train and test sets (80/20 when training; otherwise every index goes to the test set)

### Input
-   Path to Folder containing processed data 
-   The path points to the parent directory. The component assumes that the following subfolder already exists:
    - `processed`
-   Training flag: True if the files are going to 
    be used for training a new model or not
    
### Output
-   Output folder path where split_data idx are saved
-   Output folder automatically created, deletes previously created folders
-   Saved Path: `data/split_data_idx`
-   The split indices are saved as follows:
    - `train_idx` numpy file containing indices used for training
    - `test_idx`  numpy file containing indices used for testing

In [None]:
def split_data(processed_data_path: str = None, training:bool = True, seed=None):
    """Shuffle the processed trials and save train/test index arrays to ``data/split_data_idx``.

    Loads ``processed/meta_data.json`` and ``processed/processed_data.json`` (plus the
    ``.npy`` files they reference) from ``processed_data_path``. When ``training`` is True,
    trials whose labels contain NaN are dropped first, and 20% of the remaining trials go
    to the test split. Otherwise every trial goes to the test split.

    Args:
        processed_data_path: Bucket folder that contains the ``processed`` subfolder.
        training: Whether to drop NaN-label trials and keep a training split.
        seed: Optional seed for the shuffle; None uses NumPy's global random state.

    Raises:
        ValueError: If the path is None or any processed artifact is missing.

    Side effects:
        Deletes and recreates ``data/split_data_idx`` and uploads ``train_idx.npy`` and
        ``test_idx.npy`` to it.
    """
    # Loading Dependencies
    import json
    import io
    import numpy as np
    from tqdm import tqdm
    from cloud_wrapper import GStore

    # Helper Functions
    def remove_dupl(ds):
        """Drop, in place, every trial whose labels contain NaN.

        Array-valued per-trial headers are re-indexed and list-valued per-trial headers
        are filtered with the same mask, so all headers stay aligned. (Deleting list items
        one by one in ascending index order shifted the remaining indices and removed the
        wrong trials.)
        """
        print('Remove duplicates')
        for key in ds.keys():
            print(key, ":   ", type(ds[key]))

        n_trials = len(ds["labels"])
        keep = np.array([not np.isnan(row).any() for row in ds["labels"]], dtype=bool)
        # Integer dtype even when nothing is kept (np.array([]) would be float).
        final_indices = np.flatnonzero(keep)

        for key, values in ds.items():
            if isinstance(values, np.ndarray) and len(values) == n_trials:
                ds[key] = values[final_indices]
            elif isinstance(values, list) and len(values) == n_trials:
                ds[key] = [value for value, kept in zip(values, keep) if kept]

    gstore = GStore(BUCKET_NAME)
    print('Splitting data...')
    # Checking for edge cases
    if processed_data_path is None:
        raise ValueError('Splitting data failed: Component Configuration Invalid!')

    processed_folder = f'{processed_data_path}/processed'
    if not gstore.is_dir(processed_folder):
        raise ValueError('Splitting data failed: Processed does not exist')

    if not gstore.is_dir(f'{processed_folder}/numpy'):
        raise ValueError('Splitting data failed: Processed data corrupted')

    if not gstore.is_file(f'{processed_folder}/meta_data.json'):
        raise ValueError('Splitting data failed: Processed data meta-data missing')

    if not gstore.is_file(f'{processed_folder}/processed_data.json'):
        raise ValueError('Splitting data failed: Processed data missing')

    # Read processed data
    meta_data = json.loads(gstore.download_into_memory(f'{processed_folder}/meta_data.json'))
    ds = json.loads(gstore.download_into_memory(f'{processed_folder}/processed_data.json'))

    for header in tqdm(meta_data, colour='red'):
        if meta_data[header][0]:
            # allow_pickle=True: only safe because these files are written by process_data
            # into the pipeline's own bucket — do not point this at untrusted data.
            ds[header] = np.load(io.BytesIO(gstore.download_into_memory(meta_data[header][2])), allow_pickle=True)

    # Create output folders
    output_folder_path = 'data/split_data_idx'
    if gstore.is_dir(output_folder_path):
        gstore.rmdir(output_folder_path)
    gstore.mkdir(output_folder_path)

    if training:
        # NOTE(review): after this call the indices refer to the trials left after NaN
        # removal, not to positions in the stored processed data — confirm the consumer
        # applies the same filtering.
        remove_dupl(ds)
        test_perc = 0.2
    else:
        test_perc = 1

    all_idx = np.arange(len(ds["run"]))
    test_size = int(len(ds["run"]) * test_perc)

    if seed is None:
        np.random.shuffle(all_idx)
    else:
        np.random.RandomState(seed).shuffle(all_idx)

    test_idx = all_idx[:test_size]
    train_idx = all_idx[test_size:]

    for name, indices in (('train_idx', train_idx), ('test_idx', test_idx)):
        temp_bytes = io.BytesIO()
        np.save(temp_bytes, indices, allow_pickle=True)
        gstore.upload_from_memory(temp_bytes.getvalue(), f'{output_folder_path}/{name}.npy', content_type='application/octet-stream')

    print('Data split!')
# Write shuffled train/test indices for data/processed to data/split_data_idx.
split_data(processed_data_path='data', training=True)

## Train Model


### Function 
- Uses processed data and data split information to train new model

### Input
-   Path to Folder containing processed data 
-   The path points to the parent directory. The component assumes that the following subfolder already exists:
    - `processed`
-   Path to Folder containing data split files 
-   The path points to the parent directory. The component assumes that the following subfolder already exists:
    - `split_data_idx` 
-   Additional following parameters to determine the training:
    -   analyte_name, 
    -   epochs
    -   batch_size
    -   div_by    
### Output
-   Output folder path where model is saved
-   Output folder automatically created, deletes previously created folders
-   Saved Path: `models/models_path`

In [None]:
def model_training(processed_data_path: str = None, data_split_path: str = None, analyte_name: str = 'no2', 
                epochs: int = 1, max_len:int = 10000, div_by:int = 800, before_after:int = 100, 
                start_from: int = 0, norm_bias_factor: int = -1, batch_size:int = 16, layers:int = 32,
                learning_rate:float = 5e-3
                ):
    """Train the analyte regression model and upload it to the bucket.

    Loads the processed dataset from ``{processed_data_path}/processed`` and the
    training indices from ``{data_split_path}/split_data_idx/train_idx.npy``,
    trains an LSTM + Dense model on them and uploads the result to
    ``models/<timestamp>/<analyte_name>_model.h5``. The timestamp string is also
    written to ``models/latest.txt``.

    Args:
        processed_data_path: Bucket prefix that contains the ``processed`` folder.
        data_split_path: Bucket prefix that contains the ``split_data_idx`` folder.
        analyte_name: Used to name the saved model file.
        epochs: Maximum number of epochs (EarlyStopping may stop sooner).
        max_len, div_by, before_after, start_from, norm_bias_factor: Accepted for
            interface compatibility; not used by this component.
        batch_size: Keras batch size.
        layers: Number of units in the LSTM / Dense layers.
        learning_rate: Initial Adam learning rate.

    Raises:
        ValueError: If a path is missing, an expected input is absent from the
            bucket, or there are too few training samples to carve out a
            validation set.
    """
    # Loading Dependencies
    import io
    import json
    import os
    import tempfile
    import time
    import numpy as np
    import tensorflow as tf
    import tensorflow.keras.backend as K
    from cloud_wrapper import GStore

    # Helper Functions
    def get_callbacks(checkpoint_path):
        """Early stopping, LR decay and best-val_loss weight checkpointing."""
        callbacks = []
        callbacks.append(tf.keras.callbacks.EarlyStopping(
            monitor="val_loss", patience=10, verbose=0, mode="auto", baseline=None))
        callbacks.append(tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss', factor=0.1, patience=4, verbose=1, mode='auto', min_delta=0.000001,
            cooldown=0, min_lr=.0000001))
        callbacks.append(tf.keras.callbacks.ModelCheckpoint(
            checkpoint_path, monitor="val_loss", verbose=1, save_best_only=True, mode="auto",
            save_weights_only=True))
        return callbacks

    def get_model_sequential(batch_input_shape, layers=20, number_of_classes=1, stateful=False):
        """Build the two-branch model: stacked LSTMs added to a Dense skip branch.

        NOTE(review): an identical builder lives inside evaluate(); the two must
        stay in sync or saved weights will no longer load there.
        """
        dropouts = 0.2

        inlayer = tf.keras.Input(shape=batch_input_shape[1:], batch_size=batch_input_shape[0], name="input1")
        shared1 = tf.compat.v1.keras.layers.LSTM(layers, return_sequences=True, stateful=stateful, #go_backwards=True,
                                                    batch_input_shape=batch_input_shape)(inlayer)
        shared1 = tf.keras.layers.BatchNormalization()(shared1)
        shared1 = tf.keras.layers.Dropout(dropouts)(shared1)

        shared1 = tf.compat.v1.keras.layers.LSTM(layers, return_sequences=True, stateful=stateful, #go_backwards=True,
                                                    batch_input_shape=batch_input_shape)(shared1)
        shared1 = tf.keras.layers.BatchNormalization()(shared1)
        shared1 = tf.keras.layers.Dropout(dropouts)(shared1)

        shared2 = tf.keras.layers.Dense(layers, activation='relu')(inlayer)
        shared2 = tf.keras.layers.BatchNormalization()(shared2)
        shared2 = tf.keras.layers.Dropout(dropouts)(shared2)

        combined = tf.keras.layers.Add()([shared1, shared2])

        x = tf.keras.layers.Dense(layers-4, activation='relu')(combined)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Dropout(dropouts)(x)
        x_out = tf.keras.layers.Dense(1, activation='relu')(x)
        model = tf.keras.models.Model(inputs=[inlayer], outputs=[x_out])
        return model

    def train_model(model, ds, train_idx, epochs, checkpoint_path, batch_size=2):
        """Fit ``model`` on the ``train_idx`` samples and return it with its best weights.

        The first ~80% of the training samples are fitted; samples after a small
        gap are used for validation. The lowest-val_loss weights are checkpointed
        to ``checkpoint_path`` and restored before returning.
        """
        data = ds["sensor_responses"]
        print(data.shape)
        train_data = data[train_idx]
        train_labels_analyte = ds["labels"][train_idx]

        # Fit on the first 80% of samples, rounded up to an even count.
        train_frac = int(len(train_data) * .8)
        if train_frac % 2 != 0:
            train_frac += 1
        # Validation starts a few samples past the fitted slice (offsets kept from
        # the original implementation).
        # NOTE(review): adding 2 does not change parity, so this check looks like
        # it does not do what was intended — confirm with the author.
        val_frac = train_frac
        if (len(train_data) - val_frac) % 2 != 0:
            val_frac += 2
        val_start = val_frac + 1 + 2

        if train_frac == 0 or val_start >= len(train_data):
            raise ValueError('Training failed: not enough training samples for a train/validation split')

        # Copy into a dense float array with an explicit feature axis
        # (presumably to turn a pickled/object array into a numeric one — TODO confirm).
        n_features = data[0].shape[1]
        dense_train = np.zeros((train_data.shape[0], train_data.shape[1], n_features))
        dense_train[:, :, :n_features] = train_data
        train_data = dense_train

        print("Size of training data: ", train_frac)
        print("Size of validation data: ", len(train_data[val_start:]))
        model.fit(train_data[:train_frac],
                  [train_labels_analyte[:train_frac]],
                  batch_size=batch_size, epochs=epochs, callbacks=get_callbacks(checkpoint_path), shuffle=True,
                  validation_data=([train_data[val_start:]], [train_labels_analyte[val_start:]]))

        # fit() leaves the model at the final epoch's weights, which can be up to
        # `patience` epochs past the best one; restore the checkpointed best weights.
        if os.path.isfile(checkpoint_path):
            model.load_weights(checkpoint_path)
        return model

    def custom_loss(y_true, y_pred):
        # Mean squared error (abs() before square() does not change the value).
        return K.mean(K.square(K.abs(y_true - y_pred)))

    gstore = GStore(BUCKET_NAME)
    print('Starting Model Training...')
    # Checking for edge cases
    if processed_data_path is None or data_split_path is None:
        raise ValueError('Training failed: Component Configuration Invalid!')

    if not gstore.is_dir(f'{processed_data_path}/processed'):
        raise ValueError('Training failed: Processed does not exist')

    if not gstore.is_dir(f'{processed_data_path}/processed/numpy'):
        raise ValueError('Training failed: Processed data corrupted')

    if not gstore.is_file(f'{processed_data_path}/processed/meta_data.json'):
        raise ValueError('Training failed: Processed data meta-data missing')    

    if not gstore.is_file(f'{processed_data_path}/processed/processed_data.json'):
        raise ValueError('Training failed: Processed data missing')  

    # Read processed data; meta_data[header] = [stored_as_numpy, ..., numpy_blob_path]
    meta_data = json.loads(gstore.download_into_memory(f'{processed_data_path}/processed/meta_data.json'))
    ds = json.loads(gstore.download_into_memory(f'{processed_data_path}/processed/processed_data.json'))   

    for header in meta_data:
        if meta_data[header][0]:
            ds[header] = np.load(io.BytesIO(gstore.download_into_memory(meta_data[header][2])), allow_pickle=True)

    if not gstore.is_file(f'{data_split_path}/split_data_idx/train_idx.npy'):
        raise ValueError('Training failed: Data splits could not be load')  
        
    if not gstore.is_file(f'{data_split_path}/split_data_idx/test_idx.npy'):
        raise ValueError('Training failed: Data splits could not be load')  

    train_idx = np.load(io.BytesIO(gstore.download_into_memory(f'{data_split_path}/split_data_idx/train_idx.npy')), allow_pickle=True)

    # Create the cloud output folder for this run
    cur_time = time.strftime("_%Y_%m_%d_%H_%M_%S")
    model_path = f'models/{cur_time}'
    if gstore.is_dir(model_path):
        gstore.rmdir(model_path)
    gstore.mkdir(model_path)

    # Build and compile the model
    model_name = "{}_model.h5".format(analyte_name)
    model = get_model_sequential((None, None, ds["sensor_responses"][0].shape[1]), layers=layers)
    model.compile(optimizer=tf.optimizers.Adam(learning_rate), loss=custom_loss, metrics=[tf.keras.losses.MeanSquaredError()])

    # Stage the checkpoint and the saved model in a private temp dir that is
    # removed even when training or the upload fails.
    with tempfile.TemporaryDirectory() as local_model_path:
        checkpoint_path = os.path.join(local_model_path, 'checkpoint.weights.h5')
        local_model_file = os.path.join(local_model_path, model_name)
        model = train_model(model, ds, train_idx=train_idx, epochs=epochs,
                            checkpoint_path=checkpoint_path, batch_size=batch_size)
        # model.save stores architecture and weights in a single .h5 file.
        model.save(local_model_file)
        gstore.upload_file_local(local_model_file, f'{model_path}/{model_name}')

    # Only record this run as the latest once its model has been uploaded.
    gstore.upload_from_memory(cur_time.encode('utf-8'), 'models/latest.txt', content_type='text/plain')

    print('Training Complete!')
# Train on the processed data and split indices stored under the `data` prefix.
model_training('data', 'data')

## Evaluate

### Function 
- Use processed data to evaluate performance of a trained model

### Input
-   Path to the folder containing the processed data.
-   The path is to the parent directory. The component assumes that the following subfolder already exists:
    - `processed`
-   Path to the folder containing the split data files.
-   The path is to the parent directory. The component assumes that the following subfolder already exists:
    - `split_data_idx`
-   Path to the folder containing the model file (optional; when omitted, the latest model recorded in `models/latest.txt` is used)
    - The folder is named after the time the model was trained, and the weights file has the following name:
    `no2_model.h5`
-   Ground Truth flag: True if the data used for evaluation has ground truth labels
-   Additional parameter used to rescale the predicted labels:
    - div_by = 800

### Output
-   Output folder path where evaluation results are saved
-   The output folder is created automatically; a previously created folder with the same name is deleted first
-   Saved path: `results/<model folder name>/`
    -   `csv/`
    -   `plots/`
    -   `results_path/`

In [None]:
def evaluate(processed_data_path: str = None, data_split_path: str = None, model_path:str=None, groundtruth:bool=True,
            div_by:int=800, layers:int=32, analyte_name:str='no2'):
    """Evaluate a trained model on the test split and upload plots and metrics.

    Loads the processed dataset and split indices from the bucket, restores the
    weights from ``{model_path}/{analyte_name}_model.h5`` (or from the model named
    in ``models/latest.txt`` when ``model_path`` is not given), predicts on the
    test indices and writes under ``results/<model folder name>/``:

    * ``plots/`` -- one PNG per evaluated test run,
    * ``csv/`` -- per-run ground truth vs. prediction,
    * ``results_path/`` -- ``metrics.csv`` and, with ground truth, one
      ``result_stats_<analyte>.csv`` per analyte.

    Args:
        processed_data_path: Bucket prefix that contains the ``processed`` folder.
        data_split_path: Bucket prefix that contains the ``split_data_idx`` folder.
        model_path: Bucket folder holding the model file; defaults to the latest
            trained model.
        groundtruth: When True, per-analyte R^2 / RMSE statistics are computed.
        div_by: Multiplier applied to predictions and labels before plotting and
            scoring (presumably the inverse of a scaling applied during processing).
        layers: Width used to rebuild the model; must match the trained model.
        analyte_name: Prefix of the model file name (``<analyte_name>_model.h5``).

    Raises:
        ValueError: If a required input is missing from the bucket.
    """
    # Loading Dependencies
    import csv
    import io
    import json
    import os
    import re
    import tempfile
    import warnings
    from uuid import uuid4
    import numpy as np
    import pandas as pd
    import tensorflow as tf
    import matplotlib.pyplot as plt
    from sklearn.metrics import r2_score, mean_squared_error
    from cloud_wrapper import GStore
    # NOTE(review): this silences warnings for the whole process, not just this call.
    warnings.filterwarnings('ignore')

    # Variables
    analytes = ["no2", "no", "nh3", "dry_vape", "ethanol"]

    # Helper Functions
    def plot_pred(pp, ds, idx, plot_path, csv_path, groundtruth=True):
        """Upload a 2x2 diagnostic figure and a prediction CSV for sample ``idx``.

        ``groundtruth`` is accepted for interface compatibility but not used.
        """
        fig, axes = plt.subplots(2, 2, figsize=(15, 10), dpi=50, constrained_layout=True)
        (ax_pred, ax_temp), (ax_sensor, ax_humid) = axes
        last_idx = ds['last_index'][idx]

        ax_pred.set_title('prediction')
        ax_pred.plot(ds["labels"][idx][2:last_idx], label="actual_analyte")
        ax_pred.plot(pp[2:last_idx], label="pred_analyte")
        ax_pred.legend()

        ax_temp.set_title('temperature')
        ax_temp.plot(ds['temperature'][idx][2:last_idx], 'blue', label='baseline')
        ax_temp.legend()

        ax_sensor.set_title('sensor_responses')
        ax_sensor.plot(ds['sensor_responses'][idx][2:last_idx])

        ax_humid.set_title('humidity')
        ax_humid.plot(ds['humidity'][idx][2:last_idx], 'blue', label='baseline')
        ax_humid.legend()

        module_id = ds['moduleID'][idx]
        run_id = ds['run'][idx]
        trial = ds['trial_id'][idx]
        filename = ds['filename'][idx]
        fig.suptitle(f'{module_id} \n'
                     f'{run_id} \n'
                     f'{trial} \n'
                     f'{filename}')
        file_name = r"plot_{}_{}_{}.png".format(module_id, run_id, trial)
        if gstore.is_file(f'{plot_path}/{file_name}'):
            # Do not overwrite a plot already uploaded under the same name.
            file_name = os.path.splitext(file_name)[0] + str(uuid4()) + '.png'
        buf = io.BytesIO()
        fig.savefig(buf, bbox_inches='tight', facecolor='white')
        # getvalue() returns the whole buffer; read() would start at the
        # post-write position and upload an empty file.
        gstore.upload_from_memory(buf.getvalue(), f'{plot_path}/{file_name}', content_type='image/png')
        plt.close(fig)

        # Write per-sample ground truth vs. prediction as CSV
        results_to_csv = pd.DataFrame(ds['labels'][idx][0:last_idx])
        results_to_csv.columns = ['ground_truth']
        results_to_csv['prediction'] = pp[0:last_idx]
        results_to_csv['filename'] = filename
        results_to_csv['module_ID'] = module_id
        results_to_csv['run_ID'] = run_id
        results_to_csv['trial_ID'] = trial
        results_to_csv.to_csv(f'gs://{gstore.get_bucket_name()}/{csv_path}/{filename}_predictions.csv', header=['ground_truth', 'prediction', 'filename', 'module_ID', 'run_ID', 'trial_ID'], index = False)

    def get_model_sequential(batch_input_shape, layers=20, number_of_classes=1, stateful=False):
        """Rebuild the training architecture so saved weights can be loaded.

        NOTE(review): duplicated from model_training(); keep the two in sync.
        """
        dropouts = 0.2

        inlayer = tf.keras.Input(shape=batch_input_shape[1:], batch_size=batch_input_shape[0], name="input1")
        shared1 = tf.compat.v1.keras.layers.LSTM(layers, return_sequences=True, stateful=stateful, #go_backwards=True,
                                                    batch_input_shape=batch_input_shape)(inlayer)
        shared1 = tf.keras.layers.BatchNormalization()(shared1)
        shared1 = tf.keras.layers.Dropout(dropouts)(shared1)

        shared1 = tf.compat.v1.keras.layers.LSTM(layers, return_sequences=True, stateful=stateful, #go_backwards=True,
                                                    batch_input_shape=batch_input_shape)(shared1)
        shared1 = tf.keras.layers.BatchNormalization()(shared1)
        shared1 = tf.keras.layers.Dropout(dropouts)(shared1)

        shared2 = tf.keras.layers.Dense(layers, activation='relu')(inlayer)
        shared2 = tf.keras.layers.BatchNormalization()(shared2)
        shared2 = tf.keras.layers.Dropout(dropouts)(shared2)

        combined = tf.keras.layers.Add()([shared1, shared2])

        x = tf.keras.layers.Dense(layers-4, activation='relu')(combined)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Dropout(dropouts)(x)
        x_out = tf.keras.layers.Dense(1, activation='relu')(x)
        model = tf.keras.models.Model(inputs=[inlayer], outputs=[x_out])
        return model

    gstore = GStore(BUCKET_NAME)
    print('Evaluating Model...')
    # Checking for edge cases
    if processed_data_path is None or data_split_path is None:
        raise ValueError('Evaluation failed: Component Configuration Invalid!')

    if not gstore.is_dir(f'{processed_data_path}/processed'):
        raise ValueError('Evaluation failed: Processed does not exist')

    if not gstore.is_dir(f'{processed_data_path}/processed/numpy'):
        raise ValueError('Evaluation failed: Processed data corrupted')

    if not gstore.is_file(f'{processed_data_path}/processed/meta_data.json'):
        raise ValueError('Evaluation failed: Processed data meta-data missing')    

    if not gstore.is_file(f'{processed_data_path}/processed/processed_data.json'):
        raise ValueError('Evaluation failed: Processed data missing')  

    # Resolve which model to evaluate: the explicit path, else the latest trained one.
    # latest.txt is only required when no model_path was given.
    if not model_path:
        if not gstore.is_file('models/latest.txt'):
            raise ValueError('Evaluation failed: Latest model weights not found')
        model_time = gstore.download_into_memory('models/latest.txt').decode().strip()
        model_path = f'models/{model_time}'
    # A trailing slash would make basename() empty and collapse result_path to 'results/'.
    model_path = model_path.rstrip('/')
    model_name = f'{analyte_name}_model.h5'

    # Read processed data; meta_data[header] = [stored_as_numpy, ..., numpy_blob_path]
    meta_data = json.loads(gstore.download_into_memory(f'{processed_data_path}/processed/meta_data.json'))
    ds = json.loads(gstore.download_into_memory(f'{processed_data_path}/processed/processed_data.json'))   

    for header in meta_data:
        if meta_data[header][0]:
            ds[header] = np.load(io.BytesIO(gstore.download_into_memory(meta_data[header][2])), allow_pickle=True)

    if not gstore.is_file(f'{data_split_path}/split_data_idx/train_idx.npy'):
        raise ValueError('Evaluation failed: Data splits could not be loaded')  
        
    if not gstore.is_file(f'{data_split_path}/split_data_idx/test_idx.npy'):
        raise ValueError('Evaluation failed: Data splits could not be loaded')  

    test_idx = np.load(io.BytesIO(gstore.download_into_memory(f'{data_split_path}/split_data_idx/test_idx.npy')), allow_pickle=True)

    if not gstore.is_file(f'{model_path}/{model_name}'):
        raise ValueError('Evaluation failed: Model weights could not be load') 
    model = get_model_sequential((None, None, ds["sensor_responses"][0].shape[1]), layers=layers)
    # Stage the weights in a private temp dir that is removed even on failure.
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_model_file = os.path.join(tmp_dir, model_name)
        gstore.download_file_local(f'{model_path}/{model_name}', local_model_file)
        model.load_weights(local_model_file)

    # Create cloud output folders (previous results for this model are replaced)
    result_path = f'results/{os.path.basename(model_path)}'
    if gstore.is_dir(result_path):
        gstore.rmdir(result_path)
    gstore.mkdir(result_path)

    plot_path = f'{result_path}/plots'
    csv_path = f'{result_path}/csv'
    results_path = f'{result_path}/results_path'
    for folder in (plot_path, csv_path, results_path):
        gstore.mkdir(folder)

    y_pred = model.predict(ds['sensor_responses'][test_idx]) * div_by
    y_test = ds["labels"][test_idx] * div_by
    # Rescale the stored labels too, so plot_pred's plots/CSVs use the same units.
    ds['labels'] = ds['labels'] * div_by
    print("Prediction finished!")

    with tempfile.TemporaryDirectory() as tmp_dir:
        local_metrics = os.path.join(tmp_dir, 'metrics.csv')
        # newline='' is required by csv.writer (avoids blank rows on Windows).
        with open(local_metrics, 'w', newline='') as f:
            writer = csv.writer(f)
            for analyte in analytes:
                device_id = []
                analyte_GT_max = []
                analyte_pred_max = []
                R_squared = []
                RMSE = []
                delta_temp = []
                delta_humid = []
                filename = []
                for i, idx in enumerate(test_idx):
                    # Runs are attributed to an analyte by a `_<analyte>_` token in their name.
                    if not re.search('_' + analyte + '_', ds['run'][idx].lower()):
                        continue
                    last_idx = ds['last_index'][idx]
                    plot_pred(y_pred[i], ds, idx, plot_path, csv_path, groundtruth=groundtruth)
                    if groundtruth:
                        gt = y_test[i][2:last_idx]
                        pred = y_pred[i][2:last_idx]
                        R_squared.append(r2_score(gt, pred))
                        # RMSE without the `squared=` flag (deprecated in newer
                        # scikit-learn); sqrt-per-output then mean matches it exactly.
                        RMSE.append(np.sqrt(mean_squared_error(gt, pred, multioutput='raw_values')).mean())
                        device_id.append(ds['moduleID'][idx])
                        filename.append(ds['filename'][idx])
                        analyte_GT_max.append(float(max(gt)))
                        analyte_pred_max.append(float(max(pred)))
                        delta_temp.append(max(ds['temperature'][idx][2:last_idx]) - min(ds['temperature'][idx][2:last_idx]))
                        delta_humid.append(max(ds['humidity'][idx][2:last_idx]) - min(ds['humidity'][idx][2:last_idx]))
                if groundtruth:
                    scores_r2_mean = np.mean(R_squared)
                    scores_r2_std = np.std(R_squared)
                    row = f'{analyte} r2 scores mean = {scores_r2_mean}, std =  {scores_r2_std}'
                    writer.writerow([row])

                if device_id:
                    results = pd.DataFrame(device_id)
                    results.columns = ['device_id']
                    results['filename'] = filename
                    results['R_squared'] = R_squared
                    results['RMSE'] = RMSE
                    results['analyte_GT_max'] = analyte_GT_max
                    results['analyte_pred_max'] = analyte_pred_max
                    results['delta_temp'] = delta_temp
                    results['delta_humid'] = delta_humid

                    results = results.sort_values(by=['R_squared'], ascending=False)

                    results.to_csv(f'gs://{gstore.get_bucket_name()}/{results_path}/result_stats_{analyte}.csv', header=['device_id', 'filename', 'R_squared', 'RMSE', 'analyte_GT_max', 'analyte_pred_max', 'delta_temp', 'delta_humid'], index = False)

        gstore.upload_file_local(local_metrics, f'{results_path}/metrics.csv')
    print('Model Evaluation Complete!')
# No model_path is given, so this evaluates the model recorded in models/latest.txt
# against the processed data and split indices under the `data` prefix.
evaluate('data', 'data', groundtruth=True)

## Testing End to End Training Pipeline

In [None]:
def test_end_to_end_pipeline():
    """Run every training-pipeline component, in order, against the sample data.

    Later steps read artifacts that earlier steps wrote under the `data` prefix
    (e.g. training reads the split indices), so the order matters.
    """
    sample_root = 'raw_data_sample'
    work_dir = 'data'

    # 1-2. Ingest the raw positive and negative runs.
    read_postive_data(f'{sample_root}/no2_positive', training=True)
    read_negative_data(f'{sample_root}/no2_negative', training=True)

    # 3. Merge, then process (pad) the data.
    merge_data(work_dir, training=True)
    process_data(work_dir, training=True)

    # 4. Split into train / test indices.
    split_data(work_dir, training=True)

    # 5-6. Train the model, then evaluate it.
    model_training(processed_data_path=work_dir, data_split_path=work_dir)
    evaluate(processed_data_path=work_dir, data_split_path=work_dir, groundtruth=True)

# Executes the whole pipeline; every component above runs again from scratch.
test_end_to_end_pipeline()