After loading data from Timber, it has to be preprocessed: it has to be decided when the source was stable, when voltage breakdowns occurred, and so on. In practice these tasks only have to be done once, which is why this notebook exists. One can specify a raw data file, and this notebook will export a labeled one together with visualizations of the performed tasks, so that a visual quality check can be done.

In [1]:
# %load ../ionsrcopt/load_data.py
import pandas as pd
import numpy as np

def read_data_from_csv(filename, cols_to_read, rows_to_read):
    """ Read a csv file into a DataFrame and forward-fill missing values.

    Data is only registered when a parameter changes, so empty cells are
    filled with the last known value of the same column.

    Parameters:
        filename (string): Filename (anything accepted by ``pd.read_csv``)
        cols_to_read (list of string): The column names to read, None if everything should be read
        rows_to_read (list of int): The row positions to read (selected with ``iloc``), None if everything should be read

    Returns:
        DataFrame: The loaded data; the column 'Timestamp (UTC_TIME)' is renamed to 'Timestamp'
    """

    print("Loading data from csv file \'{}\'".format(filename))
    # usecols=None already means "read every column", so no separate branch is needed.
    # DataFrame.ffill() replaces the deprecated fillna(method='ffill').
    df = pd.read_csv(filename, usecols=cols_to_read).ffill()

    df = df.rename(columns={'Timestamp (UTC_TIME)': 'Timestamp'})

    if rows_to_read is None:
        return df
    return df.iloc[rows_to_read]

def convert_column(df, column, type):
    """ Cast a single column of a DataFrame to another dtype, if that column exists.

    Parameters:
        df (DataFrame): The DataFrame that may contain the column
        column (string): The column name
        type (string): dtype the column should be converted to, e.g. 'float32'

    Returns:
        DataFrame: The result of ``df.astype`` if ``column`` is present, otherwise ``df`` itself, unchanged
    """

    # Unknown columns are skipped silently so callers can list every column they know about.
    if column not in df.columns:
        return df

    print("Converting column \'{}\' to \'{}\'".format(column, type))
    return df.astype({column: type})

def convert_column_types(df):
    """ Convert the known columns of a DataFrame of measurements to compact dtypes.

    If a 'Timestamp' column exists, it is parsed to datetime and becomes the index.
    Measurement columns and 'duration_seconds' are cast to float32. The label columns
    'source_stable', 'is_breakdown' and 'optigrid_cluster' are cast to int32.
    Columns that are not present are skipped.

    Parameters:
        df (DataFrame): DataFrame to be altered. Its 'Timestamp' column (if any) is
            overwritten in place before the index is set.

    Returns:
        DataFrame
    """

    print("Started type conversion of columns...")
    if 'Timestamp' in df.columns:
        print("Converting column \'{}\' to \'{}\'".format('Timestamp', 'datetime'))
        df['Timestamp'] = pd.to_datetime(df['Timestamp'])
        df = df.set_index('Timestamp')

    # (column, target dtype). The order only determines the order of the progress messages.
    target_dtypes = (
        ('IP.NSRCGEN:BIASDISCAQNV', 'float32'),
        ('IP.NSRCGEN:GASSASAQN', 'float32'),
        ('IP.NSRCGEN:SOURCEHTAQNI', 'float32'),
        ('IP.SAIREM2:FORWARDPOWER', 'float32'),
        ('IP.SOLCEN.ACQUISITION:CURRENT', 'float32'),
        ('IP.SOLEXT.ACQUISITION:CURRENT', 'float32'),
        ('IP.SOLINJ.ACQUISITION:CURRENT', 'float32'),
        ('ITF.BCT15:CURRENT', 'float32'),
        ('ITF.BCT25:CURRENT', 'float32'),
        ('ITH.BCT41:CURRENT', 'float32'),
        ('ITL.BCT05:CURRENT', 'float32'),
        ('source_stable', 'int32'),
        ('is_breakdown', 'int32'),
        ('duration_seconds', 'float32'),
        ('optigrid_cluster', 'int32'),
    )
    for column_name, dtype in target_dtypes:
        df = convert_column(df, column_name, dtype)
    return df

def clean_data(df):
    """ Clean the data of measurements that are outliers, e.g. spikes in the extraction current.

    Each rule replaces the out-of-range values of ONE column with NaN and leaves
    the other columns untouched. Rules whose column is not present in ``df`` are
    skipped. (Previously every rule overwrote 'ITF.BCT25:CURRENT' with another
    column's values, and two rules referenced non-existent column names.)

    Parameters:
        df (DataFrame): DataFrame containing the measurements. It is modified in place.

    Returns:
        DataFrame: Cleaned data (the same object that was passed in).
    """

    # (column, predicate that marks a value as an outlier). A column may appear
    # more than once; the rules are applied in this order.
    outlier_rules = [
        ('ITF.BCT15:CURRENT', lambda s: s < 0),
        ('ITF.BCT25:CURRENT', lambda s: s < 0),
        ('ITH.BCT41:CURRENT', lambda s: s < 0),
        ('ITL.BCT05:CURRENT', lambda s: s < 0),
        ('IP.NSRCGEN:OVEN1AQNP', lambda s: s < 4.5),
        ('IP.SOLEXT.ACQUISITION:CURRENT', lambda s: s < 1200),
        ('IP.NSRCGEN:BIASDISCAQNV', lambda s: s == 0),
        ('IP.SAIREM2:FORWARDPOWER', lambda s: s < 500),
        ('IP.NSRCGEN:SOURCEHTAQNI', lambda s: s > 2.5),
        ('IP.NSRCGEN:SOURCEHTAQNI', lambda s: s < 0.5),
    ]

    print("Filtering data...")
    for column, is_outlier in outlier_rules:
        if column in df.columns:
            # Series.mask puts NaN where the predicate is True; NaN inputs compare
            # False and therefore stay NaN.
            df[column] = df[column].mask(is_outlier(df[column]))

    return df

First we need to read the data into a dataframe that we will manipulate and save afterwards. We will not do any column preselection at this point.

In [2]:
# Name of the Timber export to preprocess; change this single value to label another dataset.
# The raw file is read from ../Data_Raw and the labeled result is written to
# ../Data_Preprocessed under the same name.
dataset_name = 'Nov2018'
input_file = '../Data_Raw/{}.csv'.format(dataset_name)
output_file = '../Data_Preprocessed/{}.csv'.format(dataset_name)

In [3]:
# Load the complete raw export (all columns, all rows), convert dtypes and index by
# timestamp, then drop the rows that still contain gaps after the forward-fill.
df = convert_column_types(read_data_from_csv(input_file, cols_to_read=None, rows_to_read=None))
df = df.dropna()
df.shape

Loading data from csv file '../Data_Raw/Nov2018.csv'
Started type conversion of columns...
Converting column 'Timestamp' to 'datetime'
Converting column 'IP.NSRCGEN:BIASDISCAQNV' to 'float32'
Converting column 'IP.NSRCGEN:GASSASAQN' to 'float32'
Converting column 'IP.NSRCGEN:SOURCEHTAQNI' to 'float32'
Converting column 'IP.SAIREM2:FORWARDPOWER' to 'float32'
Converting column 'IP.SOLCEN.ACQUISITION:CURRENT' to 'float32'
Converting column 'IP.SOLEXT.ACQUISITION:CURRENT' to 'float32'
Converting column 'IP.SOLINJ.ACQUISITION:CURRENT' to 'float32'
Converting column 'ITF.BCT15:CURRENT' to 'float32'
Converting column 'ITF.BCT25:CURRENT' to 'float32'
Converting column 'ITH.BCT41:CURRENT' to 'float32'
Converting column 'ITL.BCT05:CURRENT' to 'float32'


(2866351, 12)

The first thing we are going to do is mark the source as stable or unstable. The parameters used are from experiments on the Nov2018 data.

In [7]:
def timedelta_to_seconds(timedelta):
    """ Return the length of ``timedelta`` in seconds, or NaN if it is missing (NaT/None/NaN). """
    if pd.isnull(timedelta):
        return np.nan
    return timedelta.total_seconds()
    
# A data point lasts until the next one is recorded. diff(-1) yields (t_i - t_{i+1}),
# so the result is negated. The last row has no successor and gets NaN.
df['duration_seconds'] = -1 * df.index.to_series().diff(-1).apply(timedelta_to_seconds).values

In [9]:
# %load ../ionsrcopt/source_stability.py
import pandas as pd
import numpy as np

def stability_mean_variance_classification(df, value_column, weight_column, sliding_window_size_mean=500, sliding_window_size_std=1000, minimum_mean=0.025, maximum_variance=0.00005):
    """ Classifies all points in the data frame into the categories source stable/unstable, based on a weighted rolling mean and a weighted rolling variance.

    A point is labelled 1 (stable) when the weighted mean of ``value_column`` in its
    window is strictly greater than ``minimum_mean`` AND the weighted variance is
    strictly smaller than ``maximum_variance``. Every other point, including points
    whose mean or variance is NaN, is labelled 0.

    Side effect: the temporary columns 'wvalue' and 'wdeviation' are added to ``df``
    and dropped again (in place) before returning.

    Parameters:
        df (DataFrame): The data input loaded as a DataFrame. The rolling windows are
            time based ('<n>s'), so ``df`` needs a datetime-like index.
        value_column (string): name of the column that contains the beam current we are interested in, typically BCT25
        weight_column (string): name of the column holding the weight of every point (in this notebook 'duration_seconds')
        sliding_window_size_mean (int): window length in seconds. It is used for the mean AND the
            variance window, and half of it is the shift that centres both windows.
        sliding_window_size_std (int): not used by the current implementation.
            NOTE(review): the variance uses sliding_window_size_mean; confirm whether this
            parameter was meant to size the variance window.
        minimum_mean (double): the weighted mean in the window must exceed this for the point to be considered stable
        maximum_variance (double): the weighted variance in the window must be below this for the point to be considered stable

    Returns:
        Series: 1 where the source is classified as stable, 0 otherwise. Its index is the index
        of ``df`` shifted by ``-sliding_window_size_mean // 2`` seconds (see the
        ``.shift(..., freq='s')`` calls). NOTE(review): assigning the result back to ``df`` aligns
        on exact timestamps, so rows without a sample exactly half a window (rounded up to whole
        seconds) later end up NaN. Confirm this is intended.
    """

    # Weighted values; the weighted mean over a window is sum(w * x) / sum(w).
    df['wvalue'] = df[value_column] * df[weight_column]

    # Rolling sums over a time window of `sliding_window_size_mean` seconds. With
    # closed='left' the window at label u covers [u - size, u), i.e. it excludes the row itself.
    mean_weight_sum = df[['wvalue', weight_column]].rolling('{}s'.format(sliding_window_size_mean), closed='left').sum()
    # Move every label back by half a window so the window is (roughly) centred on the label.
    # This changes the index labels, not the row positions.
    mean_weight_sum = mean_weight_sum.shift(-sliding_window_size_mean // 2, freq='s')
    wmean = mean_weight_sum['wvalue'] / mean_weight_sum[weight_column]
    wmean.name = 'wmean'

    # Weighted squared deviation of each point from its window mean.
    # NOTE(review): `wmean` carries the shifted index, so this subtraction aligns by label;
    # rows of df without a matching shifted label get NaN here.
    df['wdeviation'] = df[value_column] - wmean
    df['wdeviation'] = df['wdeviation'] ** 2
    df['wdeviation'] *= df[weight_column]
    # Same window length (sliding_window_size_mean, not sliding_window_size_std) and shift as for the mean.
    var_weight_sum = df[['wdeviation', weight_column]].rolling('{}s'.format(sliding_window_size_mean), closed='left').sum().shift(-sliding_window_size_mean // 2, freq='s')
    # Weighted variance with a "sum of weights - 1" denominator (Bessel-style correction).
    wvar = var_weight_sum['wdeviation'] / (var_weight_sum[weight_column] - 1)
    wvar.name = 'wvar'

    # Remove the helper columns again; df is modified in place.
    df.drop(['wvalue', 'wdeviation'], axis=1, inplace=True)

    # Default 0; set 1 where both criteria hold. Comparisons with NaN are False, so NaN stats give 0.
    stats = pd.concat([wmean, wvar], axis=1)
    stats['result'] = 0
    stats.loc[(stats['wmean'] > minimum_mean) & (stats['wvar'] < maximum_variance), 'result'] = 1

    return stats['result']


In [12]:
# Parameters of the stability classification (determined experimentally).
# NOTE(review): the text above says the parameters come from the Nov2018 data, but the
# active minimum_mean (0.022) differs from the value annotated "for Nov 2018" (0.027). Confirm.
value_column, weight_column = 'ITF.BCT25:CURRENT', 'duration_seconds'
sliding_window_size_mean, sliding_window_size_std = 1500, 2000
minimum_mean = 0.022
#minimum_mean=0.027 #for Nov 2018
#minimum_mean=0.035 #for Nov 2016
maximum_variance = 0.000035

# Positional arguments in the order of the function signature.
df['source_stable'] = stability_mean_variance_classification(
    df, value_column, weight_column,
    sliding_window_size_mean, sliding_window_size_std,
    minimum_mean, maximum_variance)

The next thing we are interested in are the high voltage breakdowns.

In [17]:
# %load ../ionsrcopt/voltage_breakdowns.py
import pandas as pd
import numpy as np

def classify_using_std_threshold(values, threshold):
    """ Return 1 if the (population, ddof=0) standard deviation of ``values`` is at least ``threshold``, otherwise 0. """
    return 1 if np.std(values) >= threshold else 0

def detect_breakdowns(df, column='IP.NSRCGEN:SOURCEHTAQNI', window_size=40, threshold=0.5):
    """ Detection of high voltage breakdowns based on the standard deviation exceeding a certain threshold that has to be determined by experiments.

    A window of ``window_size`` consecutive samples is flagged when the standard
    deviation of ``column`` inside it is at least ``threshold``. Every sample of a
    flagged window is labelled with the start time of its breakdown. Overlapping
    flagged windows are merged into one breakdown and keep the time of its first sample.

    Parameters:
        df (DataFrame): The frame containing the data. Its index is converted with
            ``astype('int64') * 1E-9``, i.e. treated as nanosecond timestamps.
        column (string): High voltage current, typically this should be 'IP.NSRCGEN:SOURCEHTAQNI'
        window_size (int): Number of consecutive samples in the rolling window.
        threshold (double): Threshold for the standard deviation.

    Returns:
        np.array: float array that is 0 where no breakdown was found and otherwise holds the
        start time (seconds since the epoch) of the breakdown the sample belongs to.

    Raises:
        ValueError: if ``column`` is not a column of ``df``.
    """

    if column not in df:
        raise ValueError("Error: The column cannot be found in the dataframe.")

    result = np.zeros(len(df.index))
    values = df[column].values
    # Nanoseconds -> seconds; used as the label of a breakdown.
    times = (df.index.astype('int64') * 1E-9).values

    current_breakdown = 0
    # `+ 1` so that the last complete window (ending at the final sample) is checked too.
    for start in range(len(values) - window_size + 1):
        end = start + window_size
        if not classify_using_std_threshold(values[start:end], threshold):
            continue
        # A sample that is still 0 is not covered by an earlier flagged window,
        # so a new breakdown starts here; otherwise the current one is extended.
        if not result[start]:
            current_breakdown = times[start]
        result[start:end] = current_breakdown

    return result

In [18]:
# Breakdown detection settings: HT current column, window length in samples, std threshold.
column, window_size, threshold = 'IP.NSRCGEN:SOURCEHTAQNI', 40, 0.5

df['is_breakdown'] = detect_breakdowns(df, column=column, window_size=window_size, threshold=threshold)
# detect_breakdowns returns floats; store the labels as integers.
df = df.astype({'is_breakdown': 'int64'})

Because data is only registered when a parameter changes, the data points can correspond to different time durations. When we do the clustering, we need to take this into account using some form of weight. This is why the `duration_seconds` column, which gives the duration of each data point in seconds, was created above.

### Visualizations
#### Source classification

In [16]:
%matplotlib notebook

import matplotlib.pyplot as plt
import matplotlib
import matplotlib.dates
plt.rcParams["figure.figsize"] = (30,6)
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()

# Matplotlib date numbers of the samples labelled stable (1) and unstable (0).
# Rows with any other label (e.g. NaN) are not plotted.
dates_stable = matplotlib.dates.date2num(df.loc[df['source_stable'] == 1].index.values)
dates_unstable = matplotlib.dates.date2num(df.loc[df['source_stable'] == 0].index.values)

# plt.subplots() instead of plt.subplot('111'): current Matplotlib requires a single
# subplot argument to be a three-digit integer, so the string form fails.
fig, ax = plt.subplots()
# Axes.plot + xaxis_date() instead of the deprecated Axes.plot_date; '.' draws markers only.
ax.plot(dates_stable, df.loc[df['source_stable'] == 1, 'ITF.BCT25:CURRENT'].values, '.', c='orange', label='stable')
ax.plot(dates_unstable, df.loc[df['source_stable'] == 0, 'ITF.BCT25:CURRENT'].values, '.', c='blue', label='unstable')
ax.xaxis_date()
ax.set_ylim(-0.01, None)
ax.legend()

plt.show()

<IPython.core.display.Javascript object>

#### Voltage Breakdowns

In [22]:
%matplotlib notebook

import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (30, 6)

# Short time window around a single breakdown, for zoomed-in inspection.
# NOTE(review): `s` is computed but the plot below uses the whole `df`; either plot `s`
# or drop this line. '17:58.500' uses an unusual fractional-minute format; confirm it
# parses to the intended time.
s = df[(df.index >= '2018-11-09 17:58.500') & (df.index <= '2018-11-09 18:02')].copy()

# HT current on the primary axis, breakdown labels on the secondary axis.
ax1 = df.plot(y=['IP.NSRCGEN:SOURCEHTAQNI', 'is_breakdown'], secondary_y=['is_breakdown'])
plt.show()

<IPython.core.display.Javascript object>

Now we can save the frame as a csv file. To save storage and reduce loading time, we set consecutive duplicates to NaN. This can be reversed while loading with a forward fill (e.g. `DataFrame.ffill`, as `read_data_from_csv` does).

In [24]:
# Blank out every value that repeats the previous row's value in the same column, so the
# CSV mostly stores changes; read_data_from_csv restores them with a forward fill.
# DataFrame.mask builds a new frame (integer columns become float) instead of assigning
# NaN into them in place.
# NOTE(review): values that were already NaN before saving are forward-filled on reload too.
df = df.mask(df.eq(df.shift(1)))
df.to_csv(output_file)