## Creating the pipeline with Apache Beam.

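This notebook builds an Apache Beam pipeline that turns the PHM2010 cutter CSV files into labelled GASF (Gramian Angular Summation Field) images and stores them as TFRecords for training.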

## Pipeline


In [2]:
import csv
import os
import random
import re
import uuid
from typing import Dict

import apache_beam as beam
import numpy as np
import pyts.approximation as pyta
import pyts.image as pyti

# Wear-stage class names, ordered by increasing cut-event number
# (the same order retrieve_label checks its thresholds in).
LABELS = ["Break_in", "Steady_state", "Severe", "Failure"]


def retrieve_label(filename: str, boundaries=(50, 175, 250)):
    """Map a cutter CSV path to its wear-stage label.

    The base name is expected to look like ``c_1_037.csv``: it is split on
    ``_`` and ``.`` into exactly four fields, and the third field is the
    cut-event number. With the default boundaries, events below 50 are
    'Break_in', below 175 'Steady_state', below 250 'Severe', and everything
    else 'Failure'.

    Args:
        filename: path to the CSV file; only its base name is parsed.
        boundaries: ascending cut-event thresholds; must contain exactly
            ``len(LABELS) - 1`` values. The event is assigned the label of the
            first threshold it is strictly below, or the last label otherwise.

    Returns:
        One of the strings in ``LABELS``.

    Raises:
        ValueError: if the base name does not split into four fields, if the
            cut-event field is not an integer, or if ``boundaries`` has the
            wrong length.
    """
    if len(boundaries) != len(LABELS) - 1:
        raise ValueError(
            f"expected {len(LABELS) - 1} boundaries, got {len(boundaries)}")
    file_string = os.path.basename(filename)
    fields = re.split(r"_|\.", file_string)
    if len(fields) != 4:
        raise ValueError(
            f"unexpected file name {file_string!r}; expected e.g. 'c_1_037.csv'")
    cut_event = int(fields[2])
    for boundary, label in zip(boundaries, LABELS):
        if cut_event < boundary:
            return label
    return LABELS[len(boundaries)]

def rescale(x, low=0, high=1):
    """Linearly map the values of ``x`` onto the interval [low, high].

    The minimum and maximum are taken over the whole array (all axes at once),
    so a multi-column input is scaled jointly, not per column.

    Args:
        x: non-empty array-like of numbers (``np.min`` raises on empty input).
        low: lower bound of the target interval.
        high: upper bound of the target interval.

    Returns:
        A floating-point ndarray with the same shape as ``x``. If every value of
        ``x`` is equal the range is zero; every element is then set to ``low``
        (the same convention as scikit-learn's MinMaxScaler). Without this
        check, the 0/0 division would return NaNs.
    """
    x = np.asarray(x)
    x_min = np.min(x)
    span = np.max(x) - x_min
    if span == 0:
        return np.full(x.shape, low, dtype=float)
    return (high - low) * (x - x_min) / span + low


def get_gasf(cutter_sample, window_size=4):
    """Turn a window of cutter signals into a Gramian Angular Summation Field image.

    Args:
        cutter_sample: 2-D array; ProduceGasf passes shape (numrows, 3), one
            CSV row per line and one column per channel (presumably consecutive
            time samples -- confirm against the dataset description).
        window_size: Piecewise Aggregate Approximation window; the image side
            is (number of elements in ``cutter_sample``) / ``window_size``.

    Returns:
        The GASF produced by pyts for a single series (expected shape
        ``(1, nelem, nelem)``), rescaled globally to [0, 1].
    """
    # Transpose to (channels, rows) and flatten, which joins the channels end
    # to end into one series.
    # NOTE(review): this yields one image for the concatenated channels rather
    # than one image per channel -- confirm that is intended.
    series = np.asarray(rescale(cutter_sample, low=-1, high=1)).transpose().reshape(1, -1)
    paa = pyta.PiecewiseAggregateApproximation(window_size=window_size)
    series_paa = paa.transform(series)  # Piecewise Aggregate Approximation of the time-domain data
    nelem = series_paa.shape[1]
    # GramianAngularField itself rescales its input to [-1, 1] and treats the
    # values as cosines (the arccos/polar encoding). The series must therefore
    # stay in the value domain; applying np.arccos beforehand would encode the
    # angles twice and not produce a GASF.
    gasf = pyti.GramianAngularField(image_size=nelem, method='summation')  # Gramian Angular Summation Field
    image = gasf.transform(series_paa)
    return rescale(image, low=0, high=1)

class ProduceGasf(beam.DoFn):
    """Produces GASF images from a given CSV file.

    For every input CSV path, draws ``nparts[label]`` random windows of
    ``numrows`` consecutive rows (first three columns only). For each window it
    yields one ``{'label': label, 'image': image}`` dict. ``label`` comes from
    ``retrieve_label`` and ``image`` from ``get_gasf``.
    """

    def __init__(self, nparts: Dict[str, int], numrows: int = 2048, seed=None):
        """
        Args:
            nparts: number of windows to draw per file, keyed by class label.
                This lets the caller under- or over-sample classes.
            numrows: number of consecutive CSV rows per window.
            seed: optional int seed for the window sampler. ``None`` uses fresh
                OS entropy. With a fixed seed, every DoFn instance (e.g. one
                per worker) replays the same sequence of start positions.
        """
        super().__init__()
        self._numrows = numrows
        self._nparts = nparts
        self._seed = seed
        self._rng = None  # built lazily by _generator()

    def _generator(self):
        """Return this instance's random generator, creating it on first use."""
        # Created lazily rather than in __init__, so each deserialized copy of
        # the DoFn builds its own generator instead of inheriting a pickled
        # state from the pipeline-construction process.
        if self._rng is None:
            self._rng = np.random.default_rng(self._seed)
        return self._rng

    def process(self, filename: str):
        """Yield ``nparts[label]`` GASF samples drawn from one CSV file.

        Raises:
            ValueError: if the file has fewer than ``numrows`` rows.
        """
        label = retrieve_label(filename)
        # The csv module requires newline='' so it can handle line endings itself.
        # The whole file is read up front, so it can be closed before yielding.
        with open(filename, newline='') as f:
            rows = list(csv.reader(f))
        num_rows = len(rows)
        # Window starts 0 .. num_rows - numrows are all valid (inclusive).
        num_starts = num_rows - self._numrows + 1
        if num_starts < 1:
            raise ValueError(
                f"{filename} has {num_rows} rows; at least {self._numrows} are required")
        rng = self._generator()
        for _ in range(self._nparts[label]):
            start = int(rng.integers(num_starts))  # random window start
            window = rows[start:start + self._numrows]
            # Take only the required columns.
            samples = np.array([[float(row[0]), float(row[1]), float(row[2])] for row in window])
            yield {
                'label': label,
                'image': get_gasf(samples),
            }
            




# Test the pipeline on one file per class

In [10]:
# One CSV per wear class; the names in parentheses are alternative picks for the same class.
filenames = [
    f"../../Datasets/PHM2010Challenge/c1/c1/{fn}"
    for fn in (
        "c_1_037.csv",  # Break_in      (alt: "c_1_003.csv")
        "c_1_151.csv",  # Steady_state  (alt: "c_1_062.csv")
        "c_1_205.csv",  # Severe        (alt: "c_1_240.csv")
        "c_1_294.csv",  # Failure       (alt: "c_1_305.csv")
    )
]
# Windows to draw per file, per class (how Steady_state gets under-sampled).
sampling = dict(Break_in=4, Steady_state=2, Severe=4, Failure=4)

# Smoke test: with one file per class, each label should be counted
# sampling[label] times.
# NOTE(review): the saved output below (2/1/4/3) does not match `sampling`
# (4/2/4/4); it appears to be stale -- re-run the cell to refresh it.
with beam.Pipeline() as pipeline:
    gasf_records = (
        pipeline
        | "Create file patterns" >> beam.Create(filenames)
        | "Transform data into GASF images" >> beam.ParDo(ProduceGasf(sampling))
    )
    class_counts = (
        gasf_records
        | 'Get the classes' >> beam.Map(lambda record: record['label'])
        | 'Count elements per key' >> beam.combiners.Count.PerElement()
    )
    class_counts | beam.Map(print)



('Break_in', 2)
('Steady_state', 1)
('Severe', 4)
('Failure', 3)


# Count how many files of each class there are

In [11]:
# Count how many CSV files fall into each wear class. Only the paths are needed,
# and the FileMetadata emitted by MatchFiles already carries them, so the files
# are not wrapped with ReadMatches.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Create file patterns" >> beam.io.fileio.MatchFiles("../../Datasets/PHM2010Challenge/c1/c1/*.csv")
        | "Get labels" >> beam.Map(lambda metadata: retrieve_label(metadata.path))
        | "Count labels" >> beam.combiners.Count.PerElement()
        | beam.Map(print)
    )



('Steady_state', 125)
('Failure', 66)
('Severe', 75)
('Break_in', 49)


# Serialize into TFRecords

Serialize the images and labels into TFRecords. The `Steady_state` class is undersampled: fewer windows are drawn per file from it than from the other classes (see `sampling`).

#### TODO: Use PuLP or scipy.optimize to find the integer number of windows per class (`sampling`) that best balances the class counts.

In [33]:
import tensorflow as tf

# Integer class ids stored in the TFRecords. They are derived from LABELS so the
# two can never disagree: Break_in -> 0, Steady_state -> 1, Severe -> 2, Failure -> 3.
labels_to_int = {label: index for index, label in enumerate(LABELS)}

# Serialize the label and image
def _bytes_feature(value):
    """Wrap a single bytes value in a ``tf.train.Feature`` holding a bytes_list."""
    bytes_list = tf.train.BytesList(value=[value])
    return tf.train.Feature(bytes_list=bytes_list)

def _int64_feature(value):
    """Wrap a single integer (e.g. a class id) in a ``tf.train.Feature`` holding an int64_list."""
    int64_list = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=int64_list)

def serialize_example(image, label):
    """Serialize an image and its class label into a ``tf.train.Example``.

    Args:
        image: NumPy array. It is stored as its raw bytes (``image.tobytes()``),
            so the reader must decode it with the same dtype (WriteToTFRecords
            passes float32).
        label: class name. It is stored as its integer id from ``labels_to_int``.

    Returns:
        The serialized ``Example`` as bytes.

    Raw bytes carry no shape, so the array shape is also stored as an int64
    list feature ``"shape"``, which readers can use to reshape the decoded
    buffer. Readers that only parse ``"image"`` and ``"label"`` are unaffected.
    """
    feature = {
        "image": _bytes_feature(image.tobytes()),
        "label": _int64_feature(labels_to_int[label]),
        "shape": tf.train.Feature(int64_list=tf.train.Int64List(value=list(image.shape))),
    }
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()



class WriteToTFRecords(beam.DoFn):
    """Write each element to its own single-example TFRecord file.

    Files are named ``<label><digits>.TFRecord`` inside ``OUTPUT_FOLDER``. The
    class-balance check later in this notebook recovers the label from that
    pattern.

    NOTE(review): one file per example is costly for large datasets;
    ``beam.io.WriteToTFRecord`` shards many examples per file. Switching to it
    would change the naming scheme the label count relies on.
    """

    def process(self, element, OUTPUT_FOLDER):
        """Serialize the element into a TFRecord.

        Args:
            element: dict ``{'label': label, 'image': np.array}`` as yielded by
                ProduceGasf.
            OUTPUT_FOLDER: destination directory; created if it does not exist.
        """
        image = element["image"].astype(dtype=np.float32)
        label = element["label"]
        # The TFRecordWriter cannot create missing parent directories.
        tf.io.gfile.makedirs(OUTPUT_FOLDER)
        # A random 128-bit UUID written in decimal gives an explicitly unique
        # name that still matches the <label><digits>.TFRecord pattern.
        path = os.path.join(OUTPUT_FOLDER, f"{label}{uuid.uuid4().int}.TFRecord")

        with tf.io.TFRecordWriter(path) as writer:
            writer.write(serialize_example(image, label))
    




In [34]:
# Windows drawn per file, per class. Steady_state has the most files (125 vs
# 49/75/66 in the count above), so it gets the fewest windows per file.
sampling = {"Break_in": 5, "Steady_state": 2, "Severe": 4, "Failure": 4}
OUTPUT_FOLDER = "GASF_Images"

# Every record gets a fresh random file name, so re-running this cell would add
# a second copy of the dataset next to the first and double the class counts
# below. Skip generation when records already exist; delete the folder to
# regenerate (e.g. after changing `sampling`).
existing_records = (
    [name for name in os.listdir(OUTPUT_FOLDER) if name.endswith(".TFRecord")]
    if os.path.isdir(OUTPUT_FOLDER)
    else []
)
if existing_records:
    print(f"{OUTPUT_FOLDER}/ already holds {len(existing_records)} TFRecord files; skipping generation.")
else:
    with beam.Pipeline() as pipeline:
        (
            pipeline
            | "Create file patterns" >> beam.io.fileio.MatchFiles("../../Datasets/PHM2010Challenge/c1/c1/*.csv")
            | "Get filenames" >> beam.Map(lambda metadata: metadata.path)
            | "Transform data into GASF images" >> beam.ParDo(ProduceGasf(sampling))
            | "Write into TFRecords" >> beam.ParDo(WriteToTFRecords(), OUTPUT_FOLDER=OUTPUT_FOLDER)
        )



## Check that the classes are roughly balanced

In [49]:
# Count the written images per class from the file names alone. Names follow
# `<label><digits>.TFRecord`, so stripping the numeric suffix leaves the label
# (names that do not match are counted unchanged).
with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Create file patterns" >> beam.io.fileio.MatchFiles("GASF_Images/*.TFRecord")
        | "Get filenames" >> beam.Map(lambda metadata: os.path.basename(metadata.path))
        | "Extract classes" >> beam.Map(lambda name: re.sub(r"[0-9]+\.TFRecord$", "", name))
        | "Count classes" >> beam.combiners.Count.PerElement()
        | beam.Map(print)
    )



('Break_in', 245)
('Failure', 264)
('Steady_state', 250)
('Severe', 300)


The classes are roughly balanced: each has between 245 and 300 images.


# Read files from TFRecord and train the model.