In [10]:
from experta import *
from schema import *
from experta.utils import *
from functools import reduce, singledispatch

In [11]:
''' helper constant '''


class B_COLORS:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


CF_STRINGS = {
    'Yes': 1.0,
    'No': 0.0
}


class DiseaseStates:
    INITIAL = 'INITIAL'
    SY_CF_INPUTTED = 'SY_CF_INPUTTED'
    DIAGNOSED = 'DIAGNOSED'

    values = [
        'INITIAL',
        'SY_CF_INPUTTED',
        'DIAGNOSED'
    ]

In [12]:
''' data table '''
# P1:
#   D1:
#       s1: 0.3
#       s2: 0.2
#   D2:
#       s1: 0.5
#       s3: 0.5
#
# P2
#   D1:
#       s1: 0.2
#       s2: 0.3
#       s4: 0.6
#   D3:
#       s3: 0.9
#       s4: 0.1


''' static data '''
Disease_Data = {
    'p1': [
        {
            'plant': 'p1',
            'name': 'D1',
            'symptoms': [
                {'name': 's1', 'CF': 0.3},
                {'name': 's2', 'CF': 0.2}
            ]
        },
        {
            'plant': 'p1',
            'name': 'D2',
            'symptoms': [
                {'name': 's1', 'CF': 0.3},
                {'name': 's3', 'CF': 0.5}
            ]
        }
    ],
    'p2': [
        {
            'plant': 'p2',
            'name': 'D1',
            'symptoms': [
                {'name': 's1', 'CF': 0.3},
                {'name': 's2', 'CF': 0.2},
                {'name': 's4', 'CF': 0.6}
            ]
        },
        {
            'plant': 'p2',
            'name': 'D3',
            'symptoms': [
                {'name': 's3', 'CF': 0.9},
                {'name': 's4', 'CF': 0.1}
            ]
        }
    ]
}

plants_Array = ['p1', 'p2']


In [3]:
''' asking helper functions '''


def Ask_Choices_Question(question: str, *choices) -> str:
    '''
        function that ask question with list of choices to allow the user to choose from
    '''
    while True:
        try:
            answer = input('{}\nchoices:\n{}\nanswer:'.format(
                question,
                '\n'.join(
                    map(lambda choice: f"{choice[0] + 1}) {choice[1]}", enumerate(choices)))
            ))
            if answer in choices:
                return answer
            raise IOError(f'wrong choice Try Again')
        except IOError as error:
            print(f'{B_COLORS.FAIL}ERROR: {error}{B_COLORS.ENDC}')


def Ask_Plant_Name() -> str:
    return Ask_Choices_Question(f'{B_COLORS.OKBLUE}what is the plant to be diagnosed ?{B_COLORS.ENDC}', *plants_Array)


def Ask_About_Symptom(question: str) -> float:
    return CF_STRINGS[Ask_Choices_Question(question, *CF_STRINGS.keys())]


In [None]:
''' Custom Defined Facts '''


class Disease(Fact):
    # Description: Defines the types of data describing the details of a Disease
    #
    # Properties:
    # -- name - Name of the disease
    # -- plant - Name of the plant that the disease belongs to
    # -- symptoms - The disease symptoms. It's dict object that map Name of the Symptom with symptoms data
    #             -- Symptom object have CF property that is the symptom contributes to the overall disease
    # -- CF - The total weight that the symptoms of the disease contribute. It's an addition of only the present symptoms.
    # -- state - The current state/stage of the disease in the system

    name = Field(str,mandatory=True)

    plant = Field(str, mandatory=True)
    symptoms = Field({str: dict}, mandatory=True)

    CF: Field(float)
    state: Field(DiseaseStates.values, DiseaseStates.INITIAL)


class UserInput(Fact):
    plant = Field(str, mandatory=True)
    symptoms = Field(
        Or({str: float}, object),
    )



In [582]:
''' helper function in defining disease stage'''


def Accumulate_Symptoms_CF(resultSymptoms: dict, symptom: dict):
    return {
        **resultSymptoms,
        symptom['name']: {
            **symptom
        }
    }


def Build_Disease_Fact(**disease: Disease) -> Disease:
    return Disease(
        **{
            **disease,
            'symptoms': reduce(
                Accumulate_Symptoms_CF,
                disease['symptoms'],
                {}
            ),
            'state': disease['state'] if 'state' in disease else DiseaseStates.INITIAL,
        }
    )



In [583]:
''' helper function in reading symptoms CF stage'''


def Accumulate_UserInput_Symptoms_CF(resultSymptomsCF: dict, symptom: dict) -> dict:
    # print('\nX\n', resultSymptomsCF, '\nW\n', symptom)
    symptom_Name = symptom['name']
    return {
        **resultSymptomsCF,
        symptom_Name: Ask_About_Symptom(f"what about {symptom_Name}?")
        if not symptom_Name in resultSymptomsCF
        else resultSymptomsCF[symptom_Name],
    }



In [584]:
''' engines '''


class Input_Static_Data_Engine(KnowledgeEngine):

    @Rule(NOT(UserInput(plant=W())))
    def read_Plant_Name(self):
        # print('read plant')
        self.declare(UserInput(
            plant=Ask_Plant_Name(),
            symptoms={}
        ))


class Define_Plant_Diseases_Engine(KnowledgeEngine):

    @Rule(UserInput(plant=MATCH.plant), NOT(Fact(Diseases_Defined=True)))
    def define_Disease(self, plant):
        # print('define disease')
        for disease in Disease_Data[plant]:
            disease = {
                'plant': plant,
                **disease
            }
            self.declare(Build_Disease_Fact(**disease))
        self.declare(Fact(Diseases_Defined=True))


class Input_Symptoms_CF_Engine(KnowledgeEngine):

    @ Rule(
        AS.userInput << UserInput(
            plant=MATCH.userInputPlant,
            symptoms=MATCH.userInputSymptoms,
        ),
        AS.disease << Disease(
            plant=MATCH.diseasePlant,
            symptoms=MATCH.diseaseSymptoms,
            state=DiseaseStates.INITIAL,
        ),
        TEST(lambda diseasePlant, userInputPlant: diseasePlant == userInputPlant)
    )
    def readDiseaseSymptomsCF(
            self,
            userInput: UserInput,
            diseaseSymptoms: dict,
            disease: Disease,
            userInputSymptoms: dict,
    ):
        # initialize
        # print('read', userInput, repr(userInput), disease, repr(disease))
        Disease_Symptoms = diseaseSymptoms.values()
        UserInput_Symptoms_CF = unfreeze(userInputSymptoms) or {}

        # processing
        result_UserInput_Symptoms_CF = reduce(
            Accumulate_UserInput_Symptoms_CF,
            Disease_Symptoms,
            UserInput_Symptoms_CF
        )

        # print('result', result_UserInput_Symptoms_CF)

        # updating
        self.modify(userInput, symptoms=result_UserInput_Symptoms_CF)
        self.modify(disease, state=DiseaseStates.SY_CF_INPUTTED)


In [609]:
def Calc_Symptom_CF_With_UserInput(symptom: dict, userInputSymptomsCF: dict):
    symptomName = symptom['name']
    symptomCF = symptom['CF']
    return symptomCF * userInputSymptomsCF[symptomName]


def calcCF(cf1, cf2):
    print(cf1 * (1 - cf2))
    return cf1 * (1 - cf2)
    # TODO: check if it worked


class Diagnose_Disease_Engine(KnowledgeEngine):

    @Rule(
        AS.userInput << UserInput(
            plant=MATCH.userInputPlant,
            symptoms=MATCH.userInputSymptoms,
        ),
        AS.disease << Disease(
            plant=MATCH.diseasePlant,
            symptoms=MATCH.diseaseSymptoms,
            state=DiseaseStates.SY_CF_INPUTTED,
        ),
        TEST(lambda diseasePlant, userInputPlant: diseasePlant == userInputPlant),
        # may Test the cf Here or inside the RHS
    )
    def DiagnoseDisease(
            self,
            diseaseSymptoms: dict,
            disease: Disease,
            userInputSymptoms: dict,
    ):
        # pass
        # # initialize
        # print('read', disease, repr(disease))
        Disease_Symptoms = diseaseSymptoms.values()
        UserInput_Symptoms_CF = unfreeze(userInputSymptoms) or {}

        # # processing
        total_CF = 1 - reduce(
            lambda resultCF, symptom: calcCF(
                resultCF,
                Calc_Symptom_CF_With_UserInput(symptom, UserInput_Symptoms_CF)
            ),
            Disease_Symptoms,
            1
        )
        # print('result', result_UserInput_Symptoms_CF)

        # updating
        self.modify(disease, CF=total_CF, state=DiseaseStates.DIAGNOSED)

In [610]:
class MY_KE(
    Input_Static_Data_Engine,
    Define_Plant_Diseases_Engine,
    Input_Symptoms_CF_Engine,
    Diagnose_Disease_Engine,
    KnowledgeEngine,
):
    pass


engine = MY_KE()

engine.reset()
engine.run()
print(engine.facts)

X D1
checking X D1
<f-1>
checking X D1
<f-2>
checking X D1
<f-3>
checking X D1
<f-4>
checking X D1
<f-5>
checking X D1
<f-6>
0.0
X D1
checking X D1
<f-1>
checking X D1
<f-2>
checking X D1
<f-3>
checking X D1
<f-4>
checking X D1
<f-5>
checking X D1
<f-6>
0.0
X D1
checking X D1
<f-1>
checking X D1
<f-2>
checking X D1
<f-3>
checking X D1
<f-4>
checking X D1
<f-5>
checking X D1
<f-6>
0.0
X D1
checking X D1
<f-2>
checking X D1
<f-3>
checking X D1
<f-4>
checking X D1
<f-5>
checking X D1
<f-6>
checking X D1
<f-8>
0.30000000000000004
X D1
checking X D1
<f-2>
checking X D1
<f-4>
checking X D1
<f-5>
checking X D1
<f-6>
checking X D1
<f-8>
checking X D1
<f-10>
0.65
D1 may be present with CF: 0.65
