In [None]:
import pandas as pd
import numpy as np
import os
import re
import ast

In [None]:
# Relative directory that holds the input files read below
DATA_PATH = '../data'
# CSV file containing the simplified Solvency 2 taxonomy
FILENAME_TAXO = 'simple_taxo.csv'
# Excel workbook containing the additional validation rules
FILENAME_RULES = '2020-01-09 Set aanvullende controleregels Solvency II_tcm46-386880.xlsx'

## Read simplified taxonomy of Solvency 2

In [None]:
# Load the taxonomy CSV (latin-1 encoded) and discard the "Unnamed: 0" column
df_taxo = (
    pd.read_csv(os.path.join(DATA_PATH, FILENAME_TAXO), encoding='latin-1')
    .drop(columns="Unnamed: 0")
)

In [None]:
# Preview the first five rows of the taxonomy
df_taxo.head(5)

## Construct test Solvency 2 instance (replace with your own data)

In [None]:
# Map each datapoint to its dtype string; a later duplicate overwrites an earlier one
taxo_datatypes = dict(zip(df_taxo['datapoint'], df_taxo['dtype']))

# Column labels of the test instance: every datapoint once, in order of first appearance
unique_dp = list(df_taxo['datapoint'].unique())

# Placeholder value per dtype string; any other dtype gets the string "text"
placeholder_by_dtype = {'int64': 0, 'float64': 0.0}
data_dp = [[placeholder_by_dtype.get(taxo_datatypes[name], "text") for name in unique_dp]]

# Single-row instance with one column per datapoint
df = pd.DataFrame(data=data_dp, columns=unique_dp)

## Read DNB's Additional Validation Rules

In [None]:
# Read the rule workbook (column names are on the second sheet row), index it by
# rule code, leave out rule 'S.28.01_129' and turn empty cells into empty strings
df_vr = (
    pd.read_excel(os.path.join(DATA_PATH, FILENAME_RULES), header=1)
    .set_index('ControleRegelCode')
    .drop(index='S.28.01_129')
    .fillna("")
)

In [None]:
# Show the 'Formule' column of the rule table
df_vr['Formule']

In [None]:
def _find_top_level_logical(text):
    """Return the index of the right-most '&' or '|' outside quotes and brackets, or -1."""
    depth = 0
    quote = None
    found = -1
    for pos, char in enumerate(text):
        if quote is not None:
            # Inside a string literal: only the matching quote character ends it
            if char == quote:
                quote = None
        elif char in "'\"":
            quote = char
        elif char in "([{":
            depth += 1
        elif char in ")]}":
            depth -= 1
        elif char in "&|" and depth == 0:
            found = pos
    return found


def _is_fully_wrapped(text):
    """Return True when text starts with '(' whose matching ')' is its last character."""
    if len(text) < 2 or text[0] != '(' or text[-1] != ')':
        return False
    depth = 0
    quote = None
    for pos, char in enumerate(text):
        if quote is not None:
            if char == quote:
                quote = None
        elif char in "'\"":
            quote = char
        elif char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
            if depth == 0:
                # The opening bracket closes here; wrapped only if this is the end
                return pos == len(text) - 1
    return False


def add_brackets(s):
    """Add brackets around the operands of & and | (this is not consistent in EVA2).

    The expression is split on its right-most '&' or '|' that is outside string
    literals and outside brackets; both sides are bracketed and processed
    recursively. An expression that is one bracketed group is processed on its
    inside. When no such operator is left, surrounding whitespace and whitespace
    next to the comparison characters > < ! = is removed.

    Logical operators are looked for before comparison operators, so
    "a == b & c == d" becomes "(a==b) & (c==d)" instead of splitting at the
    last '='.

    Parameters
    ----------
    s : str
        Expression in which AND/OR have already been replaced by & and |.

    Returns
    -------
    str
        The expression with explicit brackets around logical operands.

    Notes
    -----
    Whitespace next to a comparison character is also removed when that
    character sits inside a string literal.
    """
    text = s.strip()
    split_at = _find_top_level_logical(text)
    if split_at >= 0:
        left = add_brackets(text[:split_at])
        right = add_brackets(text[split_at + 1:])
        return '(' + left + ') ' + text[split_at] + ' (' + right + ')'
    if _is_fully_wrapped(text):
        # Look inside the group so nested logical operators get brackets too
        return '(' + add_brackets(text[1:-1]) + ')'
    return re.sub(r'\s*([><!=])\s*', r'\1', text)
    
def preprocess(s):
    """Transform EVA2 code to Python Pandas code.

    Rewrites operators and punctuation by plain text substitution:
    a lone '=' becomes '==', '<>' and '< >' become '!=', double quotes become
    single quotes, ' OR ' / ' AND ' (also when directly followed by '{')
    become ' | ' / ' & ', ' )' becomes ')' and ';' becomes ','.

    Parameters
    ----------
    s : str
        Formula in EVA2 notation.

    Returns
    -------
    str
        The formula with the substitutions applied. Substitutions are not
        aware of string literals, so text inside quotes is rewritten as well.
    """
    res = s
    # Double only a lone '='; the '=' in '>=', '<=', '!=' and '==' is left alone,
    # otherwise '!=' would become '!==' and '==' would become '===='
    res = re.sub(r'(?<![<>!=])=(?!=)', '==', res)
    res = res.replace(">==", ">=")
    res = res.replace("<==", "<=")
    res = res.replace("<>", "!=")
    res = res.replace("< >", "!=") # the space between < and > should be deleted in EVA2
    res = res.replace('"', "'")
    res = res.replace(' OR{', " | {")
    res = res.replace(' AND{', " & {") # same missing space as handled for OR above
    res = res.replace(' OR ', " | ")
    res = res.replace(' AND ', " & ")
    res = res.replace(" )", ")")
    res = res.replace(';', ",") # this should be corrected in EVA2
    return res

def transform_datapoints(s, columns):
    """Transform EVA2 datapoints to Python Pandas datapoints.

    Every "{name}" in the expression is replaced by "df['name']".

    Parameters
    ----------
    s : str
        Expression containing datapoints in curly braces.
    columns : iterable of hashable
        Labels that are considered available (for example DataFrame.columns).

    Returns
    -------
    tuple of (str, list)
        The rewritten expression and the datapoint names that do not occur in
        columns, each reported once in order of first appearance.
    """
    res = s
    # Build the lookup once instead of converting columns on every iteration
    known = set(columns)
    not_found = []
    # dict.fromkeys drops repeated datapoints while keeping their first-seen order
    for item in dict.fromkeys(re.findall(r'{(.*?)}', res)):
        res = res.replace("{"+item+"}", "df['"+item+"']")
        if item not in known:
            not_found.append(item)
    return res, not_found

def transform_conditional_expression(g):
    """Build the pandas selection strings for an EVA2 (conditional) expression.

    For "IF <condition> THEN <consequence>" the confirmation string selects rows
    where both hold and the exception string selects rows where the condition
    holds but the consequence does not. Without IF/THEN the whole expression is
    the test: confirmations satisfy it, exceptions do not.

    Returns a tuple (co_str, ex_str) of strings indexing a frame named df.
    """
    match = re.search(r'IF\s*(.*)\s*THEN\s*(.*)\s*', g)
    if match is None:
        # Plain expression: no condition part
        whole = add_brackets(g)
        return 'df[(' + whole + ')]', 'df[~(' + whole + ')]'
    condition = add_brackets(match[1])
    consequence = add_brackets(match[2])
    # Both strings share the condition and differ only in negating the consequence
    selector = 'df[(' + condition + ') & '
    return selector + '(' + consequence + ")]", selector + '~(' + consequence + ")]"

def evaluate_strings(co_str, ex_str, data=None):
    """Evaluate Python Pandas string for confirmation and exceptions.

    Parameters
    ----------
    co_str : str
        Expression selecting the confirming rows; it refers to the data as df.
    ex_str : str
        Expression selecting the exception rows; it refers to the data as df.
    data : pandas.DataFrame, optional
        Frame made available to the expressions as df. When omitted, the
        module-level variable df is used.

    Returns
    -------
    str
        "Correctly parsed (#co=<n>, #ex=<m>)" with the lengths of both results,
        or "Parse error: <co_str>" when evaluation raises an Exception.
    """
    try:
        # Resolved inside the try so a missing module-level df is reported as a
        # parse error rather than raised
        frame = df if data is None else data
        namespace = {'df': frame, 'MAX': np.maximum, 'MIN': np.minimum, 'SUM': np.sum}
        # NOTE(review): eval executes arbitrary code; only feed it rules from a trusted source
        co = len(eval(co_str, dict(namespace)))
        ex = len(eval(ex_str, dict(namespace)))
        return "Correctly parsed (#co=" + str(co)+", #ex="+str(ex)+")"
    except Exception:
        # Not a bare except: KeyboardInterrupt and SystemExit must still propagate
        return "Parse error: " + co_str
        
def get_all_datapoints(template, dim):
    """List the taxonomy datapoints of a template that share the given row or column.

    dim is treated as a row when its lower-case form contains 'r', otherwise as
    a column when it contains 'c'; in any other case None is returned. The
    taxonomy is read from the module-level df_taxo and matched on the values of
    template and dim as passed in, while the returned datapoint strings use the
    lower-case form of dim.
    """
    key = dim.lower()
    if 'r' in key:
        fixed_axis, free_axis = 'row', 'column'
    elif 'c' in key:
        fixed_axis, free_axis = 'column', 'row'
    else:
        return None

    in_scope = (df_taxo['template'] == template) & (df_taxo[fixed_axis] == dim)
    counterparts = list(df_taxo[in_scope][free_axis].values)

    # A single empty counterpart yields a datapoint with only one dimension part
    if counterparts == ['']:
        return [template + "," + key]
    if fixed_axis == 'row':
        return [template + "," + key + "," + other for other in counterparts]
    return [template + "," + other + "," + key for other in counterparts]

In [None]:
def evaluate_rules(df_data, df_rules, df_taxo):
    """Translate every rule to pandas code and print the outcome per rule.

    Parameters
    ----------
    df_data : pandas.DataFrame
        Instance data; its column labels decide whether the datapoints used in
        a rule are available.
    df_rules : pandas.DataFrame
        Rules indexed by rule code, with the EVA2 formula in column 'Formule'.
    df_taxo : pandas.DataFrame
        Taxonomy; not used by this function at the moment.

    Returns
    -------
    None
        One line per rule is printed: the evaluation result, or a notice that
        the rule needs datapoint expansion, which is not implemented yet.

    Notes
    -----
    evaluate_strings is called without a data argument, so the expressions are
    evaluated against the module-level df rather than against df_data.
    """
    for rule_code in df_rules.index:
        # str() keeps the message working for non-string rule codes
        print("Rule " + str(rule_code) + ": ", end='')
        formula = preprocess(df_rules.loc[rule_code, 'Formule'])
        formula, not_found = transform_datapoints(formula, df_data.columns)
        if not not_found:
            # Simple expression with complete datapoints
            co_str, ex_str = transform_conditional_expression(formula)
            print(evaluate_strings(co_str, ex_str))
        else:
            # TODO: datapoints missing from df_data should be expanded with the
            # content of the rule's 'Rijen' and 'Kolommen' columns. The earlier
            # draft split each missing datapoint into template = datapoint[0:13]
            # and dim = datapoint[14:] and looked up the candidates with
            # get_all_datapoints(template, dim).
            print("Complex expression, not yet implemented")

In [None]:
# Run all validation rules against the test instance and print one line per rule
evaluate_rules(df, df_vr, df_taxo)

In [None]:
# Example selection: rows of the instance where datapoint S.15.01.04.01,c0070 is positive
df[(df['S.15.01.04.01,c0070']>0)]