In [None]:
"""For google colab workflow - mounts Google Drive and go to it"""
import os
from google.colab import drive
# Mount Google Drive at /content/gdrive (google.colab API, so this cell only
# works inside Colab).
drive.mount('/content/gdrive')
# The path is relative: it resolves only when the current directory is the
# parent of the mount point (presumably /content, Colab's default - confirm).
os.chdir('./gdrive/MyDrive/Projects/RNN_for_GCPL/Notebooks')

In [1]:
"""Imports all necessary libs"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import sklearn
from Code.setup import *
import datetime as dt
import torch.nn as nn
import copy
from sklearn.model_selection import KFold
from sklearn.model_selection import GroupKFold
from RNN_for_GCPL import setup
import plotly
import plotly.graph_objs as go
import plotly.express as px
from plotly.subplots import make_subplots


# Seed the random number generators before any stochastic code runs.
# NOTE(review): `seedEverything` and `DEFAULT_RANDOM_SEED` are not defined in
# this notebook; presumably they arrive via `from Code.setup import *` - confirm.
seedEverything(seed=DEFAULT_RANDOM_SEED)
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
"""For compatibility - cd to folder with data and models"""
# Go two levels up and into RNN_for_GCPL/ so that the relative ./data/...
# paths used by the cells below resolve.
# NOTE(review): `os` is imported explicitly only in the Colab cell; outside
# Colab it presumably comes from the star import of Code.setup - confirm, or
# add `import os` to the imports cell.
os.chdir('../../RNN_for_GCPL/')

In [8]:
def check_correct_range(cycle:dict, correct_range:dict):
    """
    Report, for every key listed in ``correct_range``, whether all samples of
    that signal lie strictly inside the allowed interval.

    Args:
        cycle (): Cycle from resampled dataset, dictionary
        correct_range (): Allowed bounds for selected keys, in notation
            {key:(min, max)}; both bounds are exclusive

    Returns: dictionary {key: flag}, where the flag is true only when every
        value of ``cycle[key]`` is greater than min and less than max

    """
    verdicts = {}
    for name, bounds in correct_range.items():
        values = cycle[name]
        above_lower = values > bounds[0]
        below_upper = values < bounds[1]
        # A single sample on or outside a bound fails the whole signal.
        verdicts[name] = np.all(above_lower & below_upper)
    return verdicts


def segmentation(current:np.array):
    """
    Split one cycle into five phases by thresholding the current signal.

    Args:
        current (np.array): Current from experiment cycle

    Returns: segmentation - list of five index arrays, in the order
        [constant-current charge, tapering (CV) charge, first rest,
        discharge, second rest]

    """
    # All thresholds are written as x / 150.
    # NOTE(review): presumably the current is normalised by 150 - confirm.
    is_cc = np.isclose(current, 75 / 150, atol=1 / 150)
    is_discharge = np.isclose(current, -75 / 150, atol=1 / 150)
    is_taper = (current > 7.4 / 150) & (current < 74 / 150)
    is_idle = np.isclose(current, 0, atol=0.1 / 150)

    cc_idx = np.where(is_cc)[0]
    discharge_idx = np.where(is_discharge)[0]
    cv_idx = np.where(is_taper)[0]
    idle_idx = np.where(is_idle)[0]

    has_cc = cc_idx.shape[0] > 0
    has_discharge = discharge_idx.shape[0] > 0

    # Keep only the tapering samples located between the end of the
    # constant-current charge and the start of the discharge.
    if has_cc and has_discharge:
        after_cc = cv_idx > cc_idx.max()
        before_discharge = cv_idx < discharge_idx.min()
        cv_idx = cv_idx[after_cc & before_discharge]

    # First rest: near-zero samples between the CV charge and the discharge.
    # Without both neighbours, every near-zero sample is kept.
    first_rest = idle_idx
    if cv_idx.shape[0] > 0 and has_discharge:
        before_discharge = idle_idx < discharge_idx.min()
        after_cv = idle_idx > cv_idx.max()
        first_rest = idle_idx[before_discharge & after_cv]

    # Second rest: near-zero samples after the discharge; if there is no
    # discharge, whatever near-zero samples the first rest did not take.
    if has_discharge:
        second_rest = idle_idx[idle_idx > discharge_idx.max()]
    else:
        second_rest = idle_idx[np.isin(idle_idx, first_rest, invert=True)]

    return [cc_idx, cv_idx, first_rest, discharge_idx, second_rest]


def find_bad_cycles(dataset_full:torch.utils.data.Dataset, sampling_size=None):
    """
    Collect the numbers of the cycles that should be excluded from the dataset.

    A cycle is reported when at least one of the following holds:
      * "I" or "E" has a value outside its allowed range
        (see ``check_correct_range``);
      * one of the five segments returned by ``segmentation`` is empty;
      * the index span of a segment exceeds its number of samples, i.e. the
        segment is split into separate pieces;
      * the segments overlap (more labelled samples than the cycle has), or
        at least ``35*30/sampling_size`` samples belong to no segment;
      * the cycle belongs to a pouch with fewer than 50 cycles;
      * its file name contains "T50" or "ref" (case-insensitive).

    Args:
        dataset_full (Dataset): Dataset indexable by the cycle numbers found
            in the index of the ``info`` frame returned by ``statistics``;
            every item must provide the keys "I" and "E".
        sampling_size: Sampling step of ``dataset_full``, used to scale the
            threshold for unassigned samples. When omitted, the notebook-level
            variable ``sampling_size`` is used (the behaviour older calls
            relied on).

    Returns: set of cycle numbers to drop. Its size is printed as a side
        effect.

    Raises:
        NameError: ``sampling_size`` was not passed and no notebook-level
            ``sampling_size`` exists.

    """
    if sampling_size is None:
        # Backward compatibility: the function used to read this value from
        # notebook state defined in a later cell.
        try:
            sampling_size = globals()["sampling_size"]
        except KeyError:
            raise NameError("name 'sampling_size' is not defined") from None

    # The first value returned by statistics() is not needed here.
    _soh, info = statistics(dataset_full)
    cycle_numbers = info.index.to_numpy()

    correct_range = {'I':(-76/150, 76/150), 'E':(-0.01/1.45, 1+ .01/1.45)}
    range_error = {key: [] for key in correct_range}  # Values out of correct range
    zero_size_error = {i: [] for i in range(5)}  # Some segments have zero size
    not_unite_error = {i: [] for i in range(5)}  # Some segments are not all together
    undefined_length = []  # Per cycle: samples that belong to no segment

    for cycle_number in cycle_numbers:
        cycle = dataset_full[cycle_number]
        for key, in_range in check_correct_range(cycle, correct_range).items():
            if not in_range:
                range_error[key].append(cycle_number)

        current = cycle["I"]
        segments = segmentation(current)
        undefined_length.append(len(current) - sum(len(segment) for segment in segments))
        for i, segment in enumerate(segments):
            if segment.shape[0] == 0:
                zero_size_error[i].append(cycle_number)
            # np.ptp() instead of the ndarray.ptp() method, which NumPy 2.0 removed.
            # A contiguous run of n indices has ptp == n - 1, so this fires only
            # when at least two indices are missing from the span.
            # NOTE(review): confirm that tolerating one missing index is intended.
            elif np.ptp(segment) - segment.shape[0] > 0:
                not_unite_error[i].append(cycle_number)

    undefined_length = np.array(undefined_length)
    # Select by mask on cycle_numbers: np.where() alone would give positions
    # in the loop, which differ from cycle numbers whenever info.index is not
    # simply 0..N-1.
    unknown_values_error = {
        'Below zero': cycle_numbers[undefined_length < 0],
        'High values': cycle_numbers[undefined_length >= 35*30/sampling_size],
    }

    broken_cycles = set()
    for error_group in (range_error, zero_size_error, not_unite_error, unknown_values_error):
        for flagged in error_group.values():
            broken_cycles.update(flagged)

    # Pouches with fewer than 50 cycles are dropped entirely.
    all_pouches = info.groupby('Pouch').count()['SoH']
    short_pouches = all_pouches[all_pouches<50].index
    short_pouches_cycles = info[info.Pouch.isin(short_pouches)].index.tolist()
    broken_cycles.update(short_pouches_cycles)

    # Cycles selected by file name; the original code calls these groups
    # "high temp" ('T50') and "refilled" ('ref').
    high_temp_cycles = info[info.Filename.str.contains('T50', case=False)].index
    broken_cycles.update(high_temp_cycles)
    refilled_cycles = info[info.Filename.str.contains('ref', case=False)].index
    broken_cycles.update(refilled_cycles)

    print(len(broken_cycles))
    return broken_cycles

In [9]:
# Selects the ./data/v5/<sampling_size>/ folder. find_bad_cycles() also reads
# this notebook-level name, so it must be set before the call below.
sampling_size = 30
dataset_full_path = os.path.normpath(fr'./data/v5/{sampling_size}/')
# NOTE(review): GCPL_dataset_resampled3 is not defined in this notebook;
# presumably it comes from the star import of Code.setup - confirm.
dataset_full = GCPL_dataset_resampled3(dataset_full_path)
# Set of cycle numbers to exclude; the function also prints the set's size.
broken_cycles = find_bad_cycles(dataset_full)

3793


In [10]:
# Drop the cycles listed in broken_cycles from every resampled variant of the
# dataset and write the filtered copies under ./data/v6/<sampling_size>/.
# NOTE(review): broken_cycles was computed on the sampling_size=30 dataset;
# this assumes position i refers to the same cycle in every variant - confirm.
# NOTE(review): the loop rebinds the notebook-level names `sampling_size` and
# `dataset_full`; after this cell they hold the last iteration's values.
for sampling_size in [30, 60, 120, 180, 300]:
    dataset_full_path = os.path.normpath(fr'./data/v5/{sampling_size}/')
    dataset_full = GCPL_dataset_resampled3(dataset_full_path)
    # Keep a cycle only if its position in the dataset is not flagged.
    dataset_full.data = [cycle for i, cycle in enumerate(dataset_full) if i not in broken_cycles]
    # overwrite=True: existing files in the v6 folder are replaced.
    dataset_full.save(os.path.normpath(fr'./data/v6/{sampling_size}/'), overwrite=True)