In [None]:
import datetime
import numpy as np
import pandas as pd
import typing as t

In [None]:
# Creating the timestamps for the index
current_date = datetime.datetime.now()
previous_90d_date = current_date - datetime.timedelta(days=90)

# Snap the start back to midnight on the first day of the month that
# contains the date 90 days ago, so the first month is always complete.
start_date = datetime.datetime(year=previous_90d_date.year,
                               month=previous_90d_date.month,
                               day=1)

# One timestamp every 4 hours. A timedelta is used for the step instead of
# the '4H' string because the uppercase 'H' alias is deprecated in recent pandas.
index = pd.date_range(start=start_date, end=current_date, freq=datetime.timedelta(hours=4))

In [None]:
# Seeded generator: for a given index length the random draws below are
# repeatable, so re-running the notebook does not reshuffle the sample data.
rng = np.random.default_rng(42)

# Generating the sample points (one column of normal draws per circuit)
samples = rng.normal(loc=10, scale=3, size=(len(index), 2))

# Creating the dataframe
data = pd.DataFrame(data=samples, index=index, columns=['Circuit 1', 'Circuit 2'])

# Randomly drop some samples: 10% of the rows are blanked out (both circuits
# at once) by overwriting them with NaN.
drop_pct = 0.1
drop_index = rng.choice(len(index), size=int(len(index) * drop_pct), replace=False)
data.iloc[drop_index] = np.nan

# Printing the resulting dataframe
data.head()

In [None]:
def calculate_monthly_cap_index(df, spec_limits=None):
    """Compute per-month capability statistics for every column of ``df``.

    The rows are grouped with ``pd.Grouper(freq='BMS')`` (business month
    start) and, for each column, the aggregates ``mean``, ``std``, ``ppi``
    and ``pps`` are computed. A ``PPK`` column holding the smaller of ``ppi``
    and ``pps`` is then appended for each column.

    Parameters
    ----------
    df : pd.DataFrame
        Data to aggregate; it is grouped by time, so it needs a
        datetime-like index.
    spec_limits : dict, optional
        Mapping with the keys ``'LSL'`` and ``'USL'`` (lower / upper
        specification limit) applied to every column. Defaults to
        ``{'LSL': 6.0, 'USL': 12.0}``.

    Returns
    -------
    pd.DataFrame
        One row per group, with columns keyed by ``(column, statistic)``.
    """
    if spec_limits is None:
        spec_limits = {
            'LSL': 6.0,
            'USL': 12.0
        }
    lsl, usl = spec_limits['LSL'], spec_limits['USL']

    # The function names matter: agg() labels the resulting columns after
    # them, and the PPK step below looks the columns up as 'ppi' / 'pps'.
    # NOTE: np.std defaults to ddof=0, while the 'std' aggregate reported
    # next to these indices uses pandas' default ddof=1.
    def ppi(x):
        return (np.mean(x) - lsl) / np.std(x) / 3

    def pps(x):
        return (usl - np.mean(x)) / np.std(x) / 3

    # Grouping by business month start frequency
    groups = df.groupby(pd.Grouper(freq='BMS'))
    monthly_ppk = groups.agg(['mean', 'std', ppi, pps])

    # Level 0 repeats each column name once per aggregate; unique() makes
    # sure every PPK column is computed exactly once.
    for circ in monthly_ppk.columns.get_level_values(level=0).unique():
        monthly_ppk[(circ, 'PPK')] = monthly_ppk[[(circ, 'ppi'), (circ, 'pps')]].min(axis=1)

    return monthly_ppk

In [None]:
# Monthly capability statistics for the sample dataframe built above
monthly_ppk = calculate_monthly_cap_index(data)

# Display the result
monthly_ppk

## 1. Creating a class for the production process data

In [None]:
class ProcessData():
    """Measurements and specification limits for the circuits of one plant.

    Parameters
    ----------
    plant_name : str
        Name of the plant, stored as ``self.plant_name``.
    circuit_names : str or sequence of str
        A single circuit name, or any sequence of names (list, tuple, ...).
        Always stored as a list in ``self.circuit_names``.
    specifications_limits : dict
        Mapping keyed by circuit name. Its keys must match ``circuit_names``
        exactly. When sample data is generated, each value is read through
        its ``'LLS'`` and ``'ULS'`` entries.
    data : pd.DataFrame, optional
        Measurements for the plant. When omitted, random sample data is
        generated (see ``_create_sample_data``).

    Raises
    ------
    OSError
        If a circuit has no specification limits, or if limits are given
        for a circuit that is not listed in ``circuit_names``.
    """

    def __init__(self,
                plant_name: str,
                circuit_names: t.Union[str, t.Sequence[str]],
                specifications_limits: dict,
                data: t.Optional[pd.DataFrame] = None):
        self.plant_name = plant_name

        self.data = data
        self.specifications_limits = specifications_limits

        # Only a bare string means "one circuit"; every other sequence
        # (tuple, list, ...) is copied into a list of names.
        if isinstance(circuit_names, str):
            self.circuit_names = [circuit_names]
        else:
            self.circuit_names = list(circuit_names)

        self._check_for_specifications_limits()

        if data is None:
            self.data = self._create_sample_data()

    def _check_for_specifications_limits(self):
        """Ensure circuit names and specification-limit keys match exactly.

        Raises
        ------
        OSError
            With a single readable message naming the offending circuits.
            (The exception type is kept as OSError so existing callers that
            catch it keep working.)
        """
        set_circ_names = set(self.circuit_names)
        set_circ_spec_lim = set(self.specifications_limits.keys())

        # The message is built as ONE string: OSError called with two
        # arguments would interpret them as (errno, strerror).
        missing = set_circ_names.difference(set_circ_spec_lim)
        if missing:
            raise OSError(
                "There are missing values for specification limits for the circuit(s): "
                + ", ".join(sorted(map(str, missing))))

        extra = set_circ_spec_lim.difference(set_circ_names)
        if extra:
            raise OSError(
                "There are extra values of specification limits for the circuit(s): "
                + ", ".join(sorted(map(str, extra))))

    def _create_sample_data(self) -> pd.DataFrame:
        """Generate random sample measurements for every circuit.

        The index runs in 4-hour steps from the first day of the month that
        contains the date 90 days ago up to the current time. Each circuit
        gets normally distributed values, 10% of which are replaced by NaN.

        Returns
        -------
        pd.DataFrame
            One column per circuit name, indexed by timestamp.
        """
        # Creating the timestamps for the index
        current_date = datetime.datetime.now()
        previous_90d_date = current_date - datetime.timedelta(days=90)
        start_date = datetime.datetime(year=previous_90d_date.year,
                                       month=previous_90d_date.month,
                                       day=1)
        # A timedelta step avoids the deprecated uppercase 'H' frequency alias.
        index = pd.date_range(start=start_date, end=current_date,
                              freq=datetime.timedelta(hours=4))

        n_points = len(index)
        drop_pct = 0.1
        n_dropped = int(n_points * drop_pct)

        columns = []
        for circ in self.circuit_names:
            limits = self.specifications_limits[circ]
            lls, uls = limits['LLS'], limits['ULS']

            # Mean: midpoint of the limits shifted up by a random 0-15 offset;
            # standard deviation: random value between 0 and 15.
            mu = (lls + uls) / 2 + np.random.uniform(0.0, 15.0)
            sigma = np.random.uniform(0.0, 15.0)

            samples = np.random.normal(loc=mu, scale=sigma, size=n_points)

            # Randomly drop some samples
            drop_index = np.random.choice(n_points, size=n_dropped, replace=False)
            samples[drop_index] = np.nan

            columns.append(pd.Series(samples, index=index, name=circ))

        if not columns:
            return pd.DataFrame(index=index)

        # Assemble all circuits in a single concat instead of growing the
        # frame inside the loop.
        return pd.concat(columns, axis=1)


In [None]:
# Single-circuit plant: the circuit name is passed as a bare string rather
# than a list. No `data` argument is given, so sample data is generated.
data_plant_A = ProcessData(
    plant_name='Plant A',
    circuit_names='Circuit 1',
    specifications_limits={
        'Circuit 1': {'LLS': 60.0, 'ULS': 70.0},
    },
)

In [None]:
# Display the sample data generated for Plant A
data_plant_A.data

In [None]:
# Same plant, now with three circuits passed as a list; each circuit has its
# own pair of specification limits. No `data` argument is given, so sample
# data is generated.
data_plant_A = ProcessData(
    plant_name='Plant A',
    circuit_names=['Circuit 1', 'Circuit 2', 'Circuit 3'],
    specifications_limits={
        'Circuit 1': {'LLS': 60.0, 'ULS': 70.0},
        'Circuit 2': {'LLS': 40.0, 'ULS': 70.0},
        'Circuit 3': {'LLS': 85.0, 'ULS': 90.0},
    },
)

In [None]:
# Display the sample data generated for the three circuits of Plant A
data_plant_A.data

Reformulating the function that calculates the Ppk so that it takes a `ProcessData` object

In [None]:
def calculate_cap_index_ppk(process_data_obj, freq='BMS'):
    """Compute capability statistics per time bin for every circuit.

    For each circuit the rows of ``process_data_obj.data`` are grouped with
    ``pd.Grouper(freq=freq)`` and the aggregates ``count``, ``mean``,
    ``std``, ``ppi`` and ``pps`` are computed using that circuit's own
    limits. ``PPK`` is the smaller of ``ppi`` and ``pps``.

    Parameters
    ----------
    process_data_obj : object
        Must expose ``data`` (a time-indexed DataFrame), ``circuit_names``
        and ``specifications_limits`` (per-circuit mapping with the keys
        ``'LLS'`` and ``'ULS'``).
    freq : str, default 'BMS'
        Grouping frequency handed to ``pd.Grouper``.

    Returns
    -------
    pd.DataFrame
        One row per group, with columns keyed by ``(circuit, statistic)``.
    """
    # Grouping by the requested frequency (business month start by default)
    groups = process_data_obj.data.groupby(pd.Grouper(freq=freq))

    def make_indices(limits):
        # Bind the limits of ONE circuit to a fresh pair of functions, so
        # nothing depends on a variable that the loop below reassigns.
        lower, upper = limits['LLS'], limits['ULS']

        # The names 'ppi' / 'pps' become the column labels produced by agg().
        # NOTE: np.std defaults to ddof=0, while the 'std' aggregate uses
        # pandas' default ddof=1.
        def ppi(x):
            return (np.mean(x) - lower) / np.std(x) / 3

        def pps(x):
            return (upper - np.mean(x)) / np.std(x) / 3

        return ppi, pps

    per_circuit = []
    for circ in process_data_obj.circuit_names:
        ppi, pps = make_indices(process_data_obj.specifications_limits[circ])

        stats = groups[[circ]].agg(['count', 'mean', 'std', ppi, pps])
        stats[(circ, 'PPK')] = stats[[(circ, 'ppi'), (circ, 'pps')]].min(axis=1)
        per_circuit.append(stats)

    if not per_circuit:
        # No circuits: return an empty frame that still carries the group labels.
        return pd.DataFrame(index=list(groups.groups.keys()))

    # Join all circuits side by side in one concat.
    return pd.concat(per_circuit, axis=1, ignore_index=False)

In [None]:
# Capability statistics for Plant A with the default grouping (freq='BMS')
monthly_ppk = calculate_cap_index_ppk(data_plant_A)

In [None]:
# Display the monthly result
monthly_ppk

In [None]:
# Same calculation, grouped by calendar day instead of business month start
daily_ppk = calculate_cap_index_ppk(data_plant_A, freq='D')

In [None]:
# Display the daily result
daily_ppk

## 2. Creating a class to hold multiple `ProcessData` objects

In [None]:
class SetProcessData():
    """Collection of process-data objects that can be looked up by plant name.

    Parameters
    ----------
    process_data_objs : object or iterable of objects
        Either a single object exposing a ``plant_name`` attribute or an
        iterable of such objects. A list is stored as given; any other
        iterable is copied into a list.

    Attributes
    ----------
    process_data_objs : list
        The stored objects, in the order given.
    list_plant_names : list
        ``plant_name`` of every stored object, in the same order.
    """

    def __init__(self, process_data_objs):

        # A lone object (recognised by its `plant_name` attribute) is wrapped
        # in a list instead of being handed to list(), which would try to
        # iterate over it.
        if hasattr(process_data_objs, 'plant_name'):
            process_data_objs = [process_data_objs]
        elif not isinstance(process_data_objs, list):
            process_data_objs = list(process_data_objs)
        self.process_data_objs = process_data_objs

        self.list_plant_names = [obj.plant_name for obj in process_data_objs]

    def __getitem__(self, plant_name):
        """Return the first stored object whose ``plant_name`` matches.

        Raises
        ------
        ValueError
            If no stored object has that plant name.
        """
        try:
            idx_plant_name = self.list_plant_names.index(plant_name)
        except ValueError:
            # Same exception type as the bare list.index() failure, but with
            # a message that tells the caller which names are available.
            raise ValueError(
                "Unknown plant name {!r}; available plants: {}".format(
                    plant_name, self.list_plant_names)
            ) from None
        return self.process_data_objs[idx_plant_name]
    

        

In [None]:
# Plant A: three circuits, each with its own specification limits.
# NOTE: this builds a new Plant A object, so its sample data is generated again.
data_plant_A = ProcessData(
    plant_name='Plant A',
    circuit_names=['Circuit 1', 'Circuit 2', 'Circuit 3'],
    specifications_limits={
        'Circuit 1': {'LLS': 60.0, 'ULS': 70.0},
        'Circuit 2': {'LLS': 40.0, 'ULS': 70.0},
        'Circuit 3': {'LLS': 85.0, 'ULS': 90.0},
    },
)

# Plant B: two circuits.
data_plant_B = ProcessData(
    plant_name='Plant B',
    circuit_names=['Circuit 1', 'Circuit 2'],
    specifications_limits={
        'Circuit 1': {'LLS': 60.0, 'ULS': 70.0},
        'Circuit 2': {'LLS': 40.0, 'ULS': 70.0},
    },
)

# Bundle both plants so that they can be looked up by plant name.
process_data_ind_park = SetProcessData(
    process_data_objs=[data_plant_A, data_plant_B],
)



In [None]:
# Look up Plant A by name and average each circuit per business-month-start bin
process_data_ind_park['Plant A'].data.resample('BMS').mean()

In [None]:
# Specification limits stored for Plant B
process_data_ind_park['Plant B'].specifications_limits