In [None]:
from math import nan
import numpy as np
import pandas as pd

In [None]:
def analyze_sheetnames(sheet_names):
    """ Identify which data to read from which sheet.

    A sheet is used when its name has the form ``NAME (FROM->TO)`` where
    FROM and TO are integer state numbers, e.g. ``DIS1 (0->1)``. Every
    other sheet is reported via ``print`` and skipped.

    Parameters
    ----------
    sheet_names : iterable of str
        The sheet names as they appear in the workbook.

    Returns
    -------
    list of tuple
        One ``(sheet_name, transition_name, from_state, to_state)`` tuple per
        accepted sheet. ``sheet_name`` is the name exactly as passed in, so it
        can be used to look the sheet up again; ``transition_name`` is the
        stripped, upper-cased text in front of the opening bracket.
    """
    sheet_data = []
    for raw_name in sheet_names:
        # normalise only for parsing; the raw name is kept as the lookup key
        sheetname = raw_name.strip().upper()

        # check that there are brackets, in the right order and with a name
        # in front of them
        pos1 = sheetname.find("(")
        pos2 = sheetname.find(")")
        if pos1 <= 0 or pos2 <= pos1:
            print("Skipping sheet", sheetname, "(no brackets found)")
            continue

        transition_name = sheetname[:pos1].strip()
        transition = sheetname[pos1+1:pos2]
        pos_arrow = transition.find("->")
        if pos_arrow <= 0:
            print("Skipping sheet", sheetname, "(no arrow found)")
            continue

        try:
            from_state = int(transition[:pos_arrow].strip())
            to_state = int(transition[pos_arrow+2:].strip())
        except ValueError:
            # one unparsable sheet must not abort the scan of the others
            print("Skipping sheet", sheetname, "(states are not integers)")
            continue

        print("Sheet", sheetname, "describes the transition", from_state, "==>", to_state)
        sheet_data.append((raw_name, transition_name, from_state, to_state))

    return sheet_data


In [None]:
class BaseAssumption:
    """ Class to represent a core assumption table as read in from Excel.

    Parameters
    ----------
    sheet_data : sequence
        ``(sheet_name, transition_name, from_state, to_state)``; only the
        last three entries are stored.
    values
        The assumption table itself. It is also reachable (read and write)
        under the name ``df_table``.
    vert_rf, horz_rf
        Names of the vertical / horizontal risk factor, or ``None``.
    vert_attr_values, horz_attr_values
        The attribute values along the vertical / horizontal axis.
    """

    def __init__(self, sheet_data, values, vert_rf, horz_rf, vert_attr_values, horz_attr_values):
        self.values = values
        self.vert_rf = vert_rf
        self.horz_rf = horz_rf
        self.vert_attr_values = vert_attr_values
        self.horz_attr_values = horz_attr_values
        self.transition_name = sheet_data[1]
        self.from_state_num = sheet_data[2]
        self.to_state_num = sheet_data[3]

    @property
    def df_table(self):
        # Alias for ``values``: other code in this notebook addresses the
        # table as ``df_table``.
        return self.values

    @df_table.setter
    def df_table(self, table):
        self.values = table

    def __repr__(self):
        return "<BaseAssumption: {}>".format(self.transition_name)

def generate_zero_transition(from_state_num, to_state_num):
    """ Helper method to avoid dealing with case
        distinction for implicitly impossible transitions.

        Returns a BaseAssumption for the transition from_state_num -> to_state_num
        whose table is a single 0.0 cell labelled NaN on both axes and which
        has no risk factor on either axis.
    """
    tr_name = 'Implicit Impossible Transition ({}->{})'.format(from_state_num, to_state_num)
    sheet_data = (tr_name, tr_name, from_state_num, to_state_num)
    df_table = pd.DataFrame({nan: [0.0]}, index=[nan])
    vert_rf = None
    # plain None, not a one-element tuple: a trailing comma here would make
    # horz_rf == (None,)
    horz_rf = None
    vert_attr_values = [nan]
    horz_attr_values = [nan]

    return BaseAssumption(sheet_data, df_table, vert_rf, horz_rf, vert_attr_values, horz_attr_values)


def read_sheet(xl_file, sheet_data):
    """ Create a BaseAssumption object from the data in the sheet.

    The sheet is read without a header row and must be laid out as follows
    (0-based row numbers):

    * row 0: ``VERTICAL_RISK_FACTOR`` | risk factor name or ``NONE``
    * row 1: ``HORIZ_RISK_FACTOR``    | risk factor name or ``NONE``
    * row 3: ``TABLE``                | horizontal attribute values ...
    * rows 4 and below: vertical attribute value | table values ...

    Parameters
    ----------
    xl_file
        Anything ``pd.read_excel`` accepts as its first argument.
    sheet_data : sequence
        ``(sheet_name, transition_name, from_state, to_state)``; the first
        entry selects the sheet, the whole sequence is handed on to
        BaseAssumption.

    Raises
    ------
    ValueError
        If one of the marker / risk factor cells is missing, is not text or
        does not hold the expected keyword.
    """
    sheet_name = sheet_data[0]
    print("Processing sheet", sheet_name)
    df = pd.read_excel(xl_file, sheet_name=sheet_name, header=None)

    def _label(row, col):
        """ Upper-cased text of cell (row, col); ValueError if absent or not text. """
        if row >= df.shape[0] or col >= df.shape[1]:
            raise ValueError(
                "Invalid format: sheet has no cell at row {}, column {}".format(row, col))
        cell = df.iloc[row, col]
        if not isinstance(cell, str):
            # empty cells arrive as NaN (a float) and have no .upper()
            raise ValueError(
                "Invalid format: expected text at row {}, column {}, found {!r}".format(row, col, cell))
        return cell.upper()

    def _expect(row, keyword):
        """ Check that the first cell of the given row holds the keyword. """
        if _label(row, 0) != keyword:
            raise ValueError(
                "Invalid format: expected '{}' at row {}, column 0".format(keyword, row))

    _expect(3, "TABLE")
    _expect(0, 'VERTICAL_RISK_FACTOR')
    _expect(1, 'HORIZ_RISK_FACTOR')

    # read risk factor names; the keyword NONE means "no risk factor"
    vert_rf = _label(0, 1)
    horz_rf = _label(1, 1)

    vert_rf = None if vert_rf == 'NONE' else vert_rf
    # the first column still needs a usable label so it can become the index
    vert_rf_for_headers = "DUMMY_FOR_NONE" if vert_rf is None else vert_rf
    horz_rf = None if horz_rf == 'NONE' else horz_rf

    # vertical attribute values: first column below the TABLE row;
    # horizontal attribute values: the TABLE row (index 3) without its first cell
    vert_attr_values = list(df.iloc[4:, 0])
    horz_attr_values = list(df.iloc[3:4].values[0, 1:])

    df_table = df.iloc[4:].copy()
    # object dtype lets text and NaN labels live side by side
    df_table.columns = pd.Series([vert_rf_for_headers] + list(horz_attr_values), dtype="object")
    df_table = df_table.set_index(vert_rf_for_headers)
    return BaseAssumption(sheet_data, df_table, vert_rf, horz_rf, vert_attr_values, horz_attr_values)

In [None]:
# only for testing
# Reads every sheet that analyze_sheetnames accepts; there is no error
# handling here, so the first sheet that read_sheet rejects stops the cell.

path = "../data/assumptions/base_assumption.xlsx"
base_assumptions = []
with pd.ExcelFile(path) as inp:
    sheet_data = analyze_sheetnames(inp.sheet_names)

    for sd in sheet_data:
        base_assumpt = read_sheet(inp, sd)
        base_assumptions.append(base_assumpt)


In [None]:
# Scratch check: an object-dtype Series holding a string next to NaN
# (read_sheet builds its column labels with the same construction).
pd.Series(["TEXT", nan], dtype="object")

In [None]:
# Load all base assumptions from the workbook. A sheet for which read_sheet
# raises is reported and skipped, so the remaining sheets are still loaded.
path = "../data/assumptions/base_assumption.xlsx"
base_assumptions = []
with pd.ExcelFile(path) as inp:
    sheet_data = analyze_sheetnames(inp.sheet_names)

    for sd in sheet_data:
        try:
            base_assumpt = read_sheet(inp, sd)
            base_assumptions.append(base_assumpt)
        except Exception as e:
            # best effort: print the sheet name and the reason, then continue
            print("Skipping sheet", sd[0], e)

In [None]:
# Display the list of BaseAssumption objects collected above.
base_assumptions

In [None]:
# Inspect the first assumption: show the vertical and the horizontal risk factor.
ba = base_assumptions[0]
ba.vert_rf, ba.horz_rf

In [None]:
# BaseAssumption keeps its table in the `values` attribute.
ba.values.columns

In [None]:
# Only the last expression of a cell is displayed, so show both as a tuple.
ba.vert_attr_values, ba.horz_attr_values

In [None]:
# Class name of the object (the same lookup RiskFactor uses to derive its name).
ba.__class__.__name__

In [None]:
from enum import IntEnum, unique
from commons import check_states


@unique
class RiskFactor(IntEnum):
    """ Base class for risk factor enums.

    It defines no members itself; a concrete risk factor subclasses it and
    lists its attribute values as members. The upper-cased name of that
    subclass is the risk factor name matched against an assumption's
    ``horz_rf`` / ``vert_rf``.
    """

    def is_applicable(self, base_assumption):
        """ Return True if the assumption uses this risk factor on either axis. """
        _name = self.__class__.__name__.upper()
        return bool(base_assumption.horz_rf == _name or base_assumption.vert_rf == _name)

    def validate_assumptions(self, base_assumptions):
        """ Validate an assumption for this risk factor; subclasses must override. """
        raise NotImplementedError("Method must be implemented in subclass")

    def __repr__(self):
        return "RiskFactor:{}".format(self.__class__.__name__.upper())


@unique
class Gender(RiskFactor):
    """ Risk factor with the two attribute values M and F.

    NOTE(review): the methods read and replace ``base_assumption.df_table``;
    confirm that the assumption objects passed in expose the table under
    that name.
    """
    M = 0
    F = 1

    def validate_assumptions(self, base_assumption):
        """ Check the GENDER axis of the assumption and put it into M, F order.

        Nothing happens if neither axis uses GENDER. Raises ValueError if
        the labels on the GENDER axis are not exactly the member names.
        """
        if base_assumption.horz_rf == "GENDER":
            self._validate_horizontal(base_assumption)
        elif base_assumption.vert_rf == "GENDER":
            # turn the vertical axis into columns, validate, then turn it back
            base_assumption.df_table = base_assumption.df_table.T
            try:
                self._validate_horizontal(base_assumption)
            finally:
                # restore the orientation even when validation fails
                base_assumption.df_table = base_assumption.df_table.T

    def _validate_horizontal(self, base_assumption):
        """ Require the columns to be exactly the member names and reorder them. """
        required_rf = [g.name for g in Gender]
        if set(base_assumption.df_table.columns) != set(required_rf):
            # list the names; the members' repr does not show them
            raise ValueError("Risk Factors for Gender must be " + str(required_rf))
        # enforce column ordering
        base_assumption.df_table = base_assumption.df_table[required_rf]


### Base Assumptions to Assumption Lookup

In [None]:
from state import States

In [None]:
# Check states: the integer values of States must be exactly
# 0 .. len(States) - 1, i.e. start at zero and leave no gaps.
state_vals = set()
for st in States:
    print(st, int(st))
    state_vals.add(int(st))

# -1 / 999999 are the starting bounds, so both names are defined even if
# States has no members
max_val = max({-1, *state_vals})
min_val = min({999999, *state_vals})

assert len(States) > 0
assert min_val == 0
assert max_val == len(States) - 1
assert len(state_vals) == len(States)

In [None]:
# lookup: (from_state, to_state) -> assumption read from the workbook
base_assumptions_map_tmp = {(ba.from_state_num, ba.to_state_num): ba for ba in base_assumptions}

# Enumerate all state transitions as a square list of lists;
# entry [i][j] is the tuple (assumption, i, j).
transition_assumptions = []
for i in range(max_val + 1):
    this_list = []
    transition_assumptions.append(this_list)
    for j in range(max_val + 1):
        if i == j:
            # no assumption on the diagonal
            ts = None
        else:
            ts = base_assumptions_map_tmp.get((i, j))
            if ts is None:
                # nothing provided for (i, j): treat the transition as impossible
                ts = generate_zero_transition(i, j)
        this_list.append((ts, i, j))

In [None]:
# Display the full matrix of (assumption, from_state, to_state) tuples.
transition_assumptions

In [None]:
# Each entry is an (assumption, from_state, to_state) tuple; position 0 is
# the assumption object (position 2 would be the target state number).
ta = transition_assumptions[2][3][0]
ta

In [None]:
# Horizontal risk factor of the transition picked above.
ta.horz_rf

# TEST

In [1]:
from protolinc.assumptions import _read_sheet

In [2]:
# Try the packaged _read_sheet on a single sheet.
# NOTE(review): absolute, machine-specific path; the cells above use a path
# relative to the notebook ("../data/assumptions/...") - consider the same here.
# NOTE(review): the output recorded below this cell ends in an AttributeError
# about a missing 'df_table' attribute on BaseAssumption.
path = r"D:\programming\py\PyMultiState\protolinc\data\assumptions\base_assumption.xlsx"
_read_sheet(path, ("DIS1 (0->1)", "DIS1", 0, 1))

  return Index(sequences[0], name=names)


AttributeError: 'BaseAssumption' object has no attribute 'df_table'

In [None]:
import numpy as np

In [None]:
# Scratch check: reshape two values to shape (1, 2), then transpose.
np.array([1, 2]).reshape((1, 2)).transpose()