In [7]:
import pandas as pd
from typing import Any

class ColumnEditor:
    def edit(self, value: Any) -> Any:
        raise NotImplementedError("Subclasses must implement the 'edit' method.")

class Column1Editor(ColumnEditor):
    def edit(self, value: int | float) -> int | float:
        if pd.isna(value) or not isinstance(value, int):
            return value
        return 10 if value == 1 else value

class Column2Editor(ColumnEditor):
    def edit(self, value: int | float) -> int | float:
        if pd.isna(value) or not isinstance(value, int):
            return value
        return -1 if value == 0 else value

class Column3Editor(ColumnEditor):
    def edit(self, value: int | float) -> int | float:
        if pd.isna(value) or not isinstance(value, (int, float)):
            return value
        return value * 2

class Column4Editor(ColumnEditor):
    def edit(self, value: int | float) -> int | float:
        if pd.isna(value) or not isinstance(value, int):
            return value
        return 100 if value == 0 else value

class Column5Editor(ColumnEditor):
    def edit(self, value: int | float) -> int | float:
        if pd.isna(value) or not isinstance(value, int):
            return value
        return 0 if value >= 2 else value

class Column6Editor(ColumnEditor):
    def edit(self, value: int | float) -> int | float:
        if pd.isna(value) or not isinstance(value, int):
            return value
        return 10 if value >= 3 else value

class Column7Editor(ColumnEditor):
    def edit(self, value: Any) -> str:
        if pd.isna(value):
            return value
        return str(value) + "_"

In [8]:
# ディシジョンテーブルに書く条件判定関数
# いずれも戻り値はboolである
# add 2024/05/07
def is_4digits(value):
    if pd.isna(value):
        return False
    elif isinstance(value, str):
        return len(value) == 4
    else:
        return False

def is_5digits(value):
    if pd.isna(value):
        return False
    elif isinstance(value, str):
        return len(value) == 5
    else:
        return False

def is_empty(value):
    if pd.isna(value):
        return True
    elif isinstance(value, str):
        return len(value) == 0
    else:
        return False

def is_not_empty(value):
    if pd.isna(value):
        return False
    elif isinstance(value, str):
        return len(value) > 0
    else:
        return False


In [9]:
# Facade定義

#from column_editors import Column1Editor, Column2Editor, Column3Editor, Column4Editor, Column5Editor, Column6Editor, Column7Editor

class DataFrameEditor:
    def __init__(self):
        self.column_editors: dict[str, ColumnEditor] = {}

    def edit_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        edited_df = df.copy()
        for column, editor in self.column_editors.items():
            if column in df.columns:
                edited_df[column] = edited_df[column].apply(editor.edit)
        return edited_df

# どのcolumnに何の編集処理を適用するか定義している、Facadeそのもの
class DataFrameEditor1(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
            'column1': Column1Editor(),
            'column2': Column2Editor(),
            'column3': Column3Editor(),
        }

class DataFrameEditor2(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
            'column4': Column4Editor(),
            'column5': Column5Editor(),
            'column6': Column6Editor(),
        }

class DataFrameEditor3(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
            'column7': Column7Editor(),
        }

class DataFrameEditor4(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
            'column4': Column4Editor(),
            'column5': Column5Editor(),
            'column6': Column6Editor(),
            'column7': Column7Editor(),
        }

class DataFrameEditor5(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
            'column5': Column5Editor(),
            'column6': Column6Editor(),
            'column7': Column7Editor(),
        }

class DataFrameEditor6(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
            'column7': Column7Editor(),
        }

class DataFrameEditor7(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
            'column7': Column7Editor(),
        }

class DataFrameEditor8(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
            'column7': Column7Editor(),
        }

class DataFrameEditor9(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
            'column7': Column7Editor(),
        }

class DataFrameEditor10(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
            'column7': Column7Editor(),
        }

class DataFrameEditorDefault(DataFrameEditor):
    def __init__(self):
        super().__init__()
        self.column_editors = {
        }

In [10]:
#from dataframe_editors import DataFrameEditor1, DataFrameEditor2, DataFrameEditor3

class EditorFactory:
    # ディシジョンテーブル戻り値と
    # Facade名を一致させる必要があります
    # ディシジョンテーブルでの戻り値定義はFacadeのみを指定, ()はつけてはなりません

    # globals()関数は、現在のグローバルシンボルテーブルを表す辞書を返します。
    # グローバルシンボルテーブルとは、現在のモジュールで定義されている
    # グローバル変数、関数、クラスなどの名前と値のマッピングです。
    # globals()関数を使用すると、文字列として表現された名前からオブジェクトを取得することができます。
    # 例えば、globals()['MyClass']は、MyClassという名前のクラスオブジェクトを返します。
    def __init__(self, decision_table: pd.DataFrame):
        self.decision_table = decision_table

    def evaluate_conditions(self, row):
        for _, decision_row in self.decision_table.iterrows():
            if all(check_condition(row[col], decision_row[col]) for col in row.index):
                print(f'判定結果: {decision_row["判定結果"]}')
                return decision_row['判定結果']
        print(f'判定結果:Not Match -> Default')
        return 'DataFrameEditorDefault'

    def create_editor(self, row):
        # ディシジョンテーブルにより決定されたFacade名(戻り値)から
        # glovals()を使用して、その名前のObjectを取得→Facade実体取得
        editor_class_name = self.evaluate_conditions(row)
        editor_class = globals()[editor_class_name]
        return editor_class()


# v0.1での実装
#    def evaluate_conditions(self, row: pd.Series) -> str:
#        for _, decision_row in self.decision_table.iterrows():
#            # decision table と明細(行)の一致確認 series同士の比較,column単位
#            if all(check_condition(row[col], decision_row[col]) for col in row.index):
#                return decision_row['判定結果']
#        return 'No Match'
#
#    def create_editor(self, row):
#        # Facade判定情報取得
#        result = self.evaluate_conditions(row)
#
#        # Facade生成
#        if result == "Facade1":
#            return DataFrameEditor1()
#        elif result == "Facade2":
#            return DataFrameEditor2()
#        elif result == "Facade3":
#            return DataFrameEditor3()
#        elif result == "Facade4":
#            return DataFrameEditor4()
#        elif result == "Facade5":
#            return DataFrameEditor5()
#        elif result == "Facade6":
#            return DataFrameEditor6()
#        elif result == "Facade7":
#            return DataFrameEditor7()
#        elif result == "Facade8":
#            return DataFrameEditor8()
#        elif result == "Facade9":
#            return DataFrameEditor9()
#        elif result == "Facade10":
#            return DataFrameEditor10()
#        else:
#            return DataFrameEditorDefault()

In [11]:
# utils
import pandas as pd

def check_condition(value: str | int, condition: str | int) -> bool:
    # anyに対するアーリーリターン
    if pd.isna(condition) or condition == 'any':
        return True

    # or条件対応
    # 1,2,3のようなor記述→str属性でExcelから読み取り
    if isinstance(condition, str):
        if ',' in condition:
            return str(value) in [str(c.strip()) for c in condition.split(',')]
        #return str(value) == str(condition)
        # ディシジョンテーブルに記載のある条件判定関数を呼び出す
        #print('-'*80)
        #print(f'value: {value}, condition: {condition}')
        #print(globals()[condition](str(value)))
        return globals()[condition](str(value))

    # str属性以外はそのままセット
    return value == condition

def process_row(row: pd.Series, factory: EditorFactory) -> pd.Series:
    # Factory->Facadeを結びつける
    editor = factory.create_editor(row)
    if editor is None:
        return row

    # columnsをindex化してpd.Seriesを返す
    return editor.edit_dataframe(pd.DataFrame([row])).squeeze()

In [13]:
import pandas as pd
#from factory import EditorFactory
#from utils import process_row

# ディシジョンテーブルとデータサンプルの読み取り
decision_table = pd.read_excel('decision_table.xlsx')
data_sample = pd.read_excel('data_sample.xlsx')

# 結果を表示する
print('-'*80)
print(data_sample)
print(decision_table)

# Factoryを使用してFacadeクラスのインスタンスを取得し、データフレームの編集処理を行う
factory = EditorFactory(decision_table)

# pd.DatarFrameにapplyを適用することで
# row: Seriesを渡している
# DataFrameに対するiterループは書かないで対処する
data_sample = data_sample.apply(lambda row: process_row(row, factory), axis=1)

# 結果を表示する
print('-'*80)
print(data_sample)

--------------------------------------------------------------------------------
    column1  column2  column3  column4  column5  column6  column7
0         1        1        1        1        1        1        1
1         1        0     1111        1        1        1        1
2         1        1        0    10000        1        1        1
3         1        1        1        0        3        1        1
4         1        1        1        1        2        2        1
5         0        1        1        1        1        0        1
6         0        0        1        1        1        1        1
7         0        1        0        1        1        1        1
8         1        1        1        1        0        1        1
9         1        1        1        1        0        2        1
10        1        1        1        1        0        3        1
11        1        1        1        1        0        4        1
                判定結果  column1       column2     column3     c