In [7]:
# Import libraries
import numpy as np
import mdptoolbox as mdp
import csv

In [None]:
'''
Datenstruktur zur Repräsentation der States - String mit 8 Zeichen: [Pos0, Pos1, 
Pos2, Pos3, Pos4, Pos5, Transition, Colour]

Pos0-5: 0 = keine Belegung, 1 = White, 2 = Blue, 3 = Red
Transition: 0 = Store, 1 = Restore
Colour: 1 = White, 2 = Blue, 3 = Red

Graphische Belegung im Warehouse:

  Action (numerisch)             Action(als Koordinate)
   _________________              _________________
  |     |     |     |            |     |     |     |
  |  5  |  4  |  3  |            |(2,3)|(2,2)|(2,1)|
  |_____|_____|_____|            |_____|_____|_____|
  |     |     |     |            |     |     |     |
  |  2  |  1  |  0  |            |(1,3)|(1,2)|(1,1)|
  |_____|_____|_____|            |_____|_____|_____| (Start/Stopp)

(1,1) = erstes Feld, das betreten werden kann
'''

In [8]:
# Functions for scanning training data for later calculation of probabilities
def scan_training_data(file_name):
    with open(file_name) as f:
        lineList = f.readlines()

    data = np.empty([len(lineList), 2], dtype=str)
    for idx, line in enumerate(lineList):
        words = line.split()
        data[idx, 0] = words[0]
        data[idx, 1] = words[1]

    countSW = 0
    countSB = 0
    countSR = 0
    countRW = 0
    countRB = 0
    countRR = 0

    successorSW = [0, 0, 0, 0, 0, 0]
    successorSB = [0, 0, 0, 0, 0, 0]
    successorSR = [0, 0, 0, 0, 0, 0]
    successorRW = [0, 0, 0, 0, 0, 0]
    successorRB = [0, 0, 0, 0, 0, 0]
    successorRR = [0, 0, 0, 0, 0, 0]

    for idxData, dataLine in enumerate(data):
        if data[idxData, 0] == 's' and data[idxData, 1] == 'w':
            countSW += 1
            successorSW = get_successor(data, idxData, successorSW)
        elif data[idxData, 0] == 's' and data[idxData, 1] == 'b':
            countSB += 1
            successorSB = get_successor(data, idxData, successorSB)
        elif data[idxData, 0] == 's' and data[idxData, 1] == 'r':
            countSR += 1
            successorSR = get_successor(data, idxData, successorSR)
        elif data[idxData, 0] == 'r' and data[idxData, 1] == 'w':
            countRW += 1
            successorRW = get_successor(data, idxData, successorRW)
        elif data[idxData, 0] == 'r' and data[idxData, 1] == 'b':
            countRB += 1
            successorRB = get_successor(data, idxData, successorRB)
        elif data[idxData, 0] == 'r' and data[idxData, 1] == 'r':
            countRR += 1
            successorRR = get_successor(data, idxData, successorRR)

    probabiltiesSW = calculate_probabilities(countSW, successorSW)
    probabiltiesSB = calculate_probabilities(countSB, successorSB)
    probabiltiesSR = calculate_probabilities(countSR, successorSR)
    probabiltiesRW = calculate_probabilities(countRW, successorRW)
    probabiltiesRB = calculate_probabilities(countRB, successorRB)
    probabiltiesRR = calculate_probabilities(countRR, successorRR)
    probabilities = [probabiltiesSW, probabiltiesSB, probabiltiesSR, 
                     probabiltiesRW, probabiltiesRB, probabiltiesRR]

    occurances = {'(Re)store White': countSW + countRW,
                  '(Re)store Blue': countSB + countRB,
                  '(Re)store Red': countSR + countRR }

    with open("2x3_probabilities.csv", "w", newline="") as g:
        cw = csv.writer(g)
        cw.writerows(probabilities)

    return occurances

def get_successor(inputList, dataIdx, successorList):
    if dataIdx < (len(inputList) - 2):
        nextAct = inputList[dataIdx + 1, 0]
        nextCol = inputList[dataIdx + 1, 1]

        if nextAct == 's' and nextCol == 'w':
            successorList[0] += 1
        elif nextAct == 's' and nextCol == 'b':
            successorList[1] += 1
        elif nextAct == 's' and nextCol == 'r':
            successorList[2] += 1
        elif nextAct == 'r' and nextCol == 'w':
            successorList[3] += 1
        elif nextAct == 'r' and nextCol == 'b':
            successorList[4] += 1
        elif nextAct == 'r' and nextCol == 'r':
            successorList[5] += 1

    return successorList

def calculate_probabilities(countTransition, successors):
    probabilities = [0, 0, 0, 0, 0, 0]
    for idxProb, suc in enumerate(successors):
        probabilities[idxProb] = suc / countTransition

    return probabilities

In [9]:
# Creating the data structure for representing the states within the MDP
def create_states_successor_list():
    transitionPossibilities = {}
    for first in range(4):
        for second in range(4):
            for third in range(4):
                for fourth in range(4):
                    for fifth in range(4):
                        for sixth in range(4):
                            for action in range(2):
                                for colour in range(3):
                                    storagePlace = str(first) + str(second) + str(third) + str(fourth) + str(fifth) + str(sixth) + str(action) + str(colour + 1)
                                    transitionPossibilities[storagePlace] = find_transition_possibilities(storagePlace)

    # Map dictionary to list
    successorList = []
    for key, value in transitionPossibilities.items():
        temp = [key, value]
        successorList.append(temp)
    successorList.sort()

    return successorList


# Find possible successors for the given state
def find_transition_possibilities(currentState):
    # Adding item
    possibleTransitions = []
    store = currentState[6]
    col = currentState[7]

    for idx, char in enumerate(currentState):
        if store == '0':
            if idx < 6:
                if char == '0':
                    nextState = list(currentState)
                    nextState[idx] = str(col)
                    nextStateStr = ''.join(nextState)
                    for actIdx in range(2):
                        for colIdx in range(3):
                                if prob_feasible(nextStateStr, actIdx, colIdx) == True:
                                    nextState[len(nextState) - 2] = str(actIdx)
                                    nextState[len(nextState) - 1] = str(colIdx + 1)
                                    modfied = "".join(nextState)
                                    possibleTransitions.append(modfied)

        else:
            if idx < 6:
                nextState = list(currentState)
                if nextState[idx] == col:
                    nextState[idx] = str('0')
                    nextStateStr = ''.join(nextState)
                    for actIdx in range(2):
                        for colIdx in range(3):
                            if prob_feasible(nextStateStr, actIdx, colIdx) == True:
                                nextState[len(nextState) - 2] = str(actIdx)
                                nextState[len(nextState) - 1] = str(colIdx + 1)
                                modfied = "".join(nextState)
                                possibleTransitions.append(modfied)

    return possibleTransitions


def prob_feasible(currentState, idxAct, idxCol):
    subStringCurrentState = currentState[0:6]
    col = str(idxCol + 1)
    feasible = False

    if idxAct == 0:
        if subStringCurrentState.find('0') != -1:
            feasible = True
    else:
        if subStringCurrentState.find(col) != -1:
            feasible = True

    return feasible


In [10]:
# Create probability Matrix
def create_probability_matrix(states_list, index_list, customProbabilities):
    # Create empty matrix
    matrix = np.zeros([6, len(index_list), len(index_list)], dtype=np.float16)

    # Cycle through all states
    for idx, state in enumerate(states_list):
        tempStorageAt0 = []
        tempStorageAt1 = []
        tempStorageAt2 = []
        tempStorageAt3 = []
        tempStorageAt4 = []
        tempStorageAt5 = []

        currState = list(state[0])
        allCurr = currState[6] + currState[7]

        for i, substate in enumerate(states_list[idx][1]):
            for charIdx, char in enumerate(currState):
                if charIdx < 6:
                    if currState[6] == '0': # Store as next input for warehouse
                        if currState[charIdx] != substate[charIdx]:
                            if substate[charIdx] == currState[7]:  # What Colour?
                                if charIdx == 0:
                                    tempStorageAt0.append(substate)
                                elif charIdx == 1:
                                    tempStorageAt1.append(substate)
                                elif charIdx == 2:
                                    tempStorageAt2.append(substate)
                                elif charIdx == 3:
                                    tempStorageAt3.append(substate)
                                elif charIdx == 4:
                                    tempStorageAt4.append(substate)
                                elif charIdx == 5:
                                    tempStorageAt5.append(substate)
                    else:   # Restore as next input for warehouse
                        if currState[charIdx] == currState[7]:
                            if substate[charIdx] == '0':
                                if charIdx == 0:
                                    tempStorageAt0.append(substate)
                                elif charIdx == 1:
                                    tempStorageAt1.append(substate)
                                elif charIdx == 2:
                                    tempStorageAt2.append(substate)
                                elif charIdx == 3:
                                    tempStorageAt3.append(substate)
                                elif charIdx == 4:
                                    tempStorageAt4.append(substate)
                                elif charIdx == 5:
                                    tempStorageAt5.append(substate)

        storages = [tempStorageAt0, tempStorageAt1, tempStorageAt2, tempStorageAt3, 
                    tempStorageAt4, tempStorageAt5]


        for stIdx, storage in enumerate(storages):
            # Calculate probabilities with equal allocation
            if len(storage) > 0:
                proportionateMatrix = np.zeros([2, len(storage)])

                temp = []
                for succIdx, successor in enumerate(storage):
                    # Get Allocation for later calculation of probability
                    allocation = successor[6] + successor[7]
                    allocationCurrState = currState[6] + currState[7]
                    nextSt = mapping_change(allocation)
                    currSt = mapping_change(allocationCurrState)

                    # Retrieve current Probability from csv-input
                    prob = customProbabilities[currSt]
                    prob = float(prob[nextSt])

                    # TransitionIndex
                    transitionIndex = index_list.index(successor)

                    proportionateMatrix[0, succIdx] = prob
                    proportionateMatrix[1, succIdx] = transitionIndex


                # Calculate proportionate probability
                weight = sum(proportionateMatrix[0])
                for propIdx in range(len(storage)):
                    proportionateProbability = proportionateMatrix[0, propIdx] / weight

                    transitionIndex = int(proportionateMatrix[1, propIdx])
                    temp.append(proportionateProbability)
                    matrix[stIdx, idx, transitionIndex] = proportionateProbability


            else:
                matrix[stIdx, idx, idx] = 1.0

    return matrix

def mapping_change(allocation):
    nextSt = 0

    if allocation == '01':
        nextSt = 0
    elif allocation == '02':
        nextSt = 1
    elif allocation == '03':
        nextSt = 2
    elif allocation == '11':
        nextSt = 3
    elif allocation == '12':
        nextSt = 4
    elif allocation == '13':
        nextSt = 5

    return nextSt


In [11]:
# Create reward matrix 
def create_3d_reward_matrix(index_list, probMatrix):
    R = np.zeros([6, len(index_list), len(index_list)], dtype=np.float16)

    for act in range(6):
        print('current Action: ', act)
        for i in range(len(index_list)):
            for j in range(len(index_list)):
                if probMatrix[act,i,j] != 0:
                    if probMatrix[act, i, j] != 1.0:
                        currState = list(index_list[i])
                        nextState = list(index_list[j])

                        # Calculate Distance
                        distance= 0
                        for cellIdx, cell in enumerate(currState):
                            if cellIdx < 6:
                                if currState[cellIdx] != nextState[cellIdx]:
                                    if cellIdx == 0:
                                        distance = 35
                                    elif cellIdx == 1 or cellIdx == 3:
                                        distance = 25
                                    elif cellIdx == 2 or cellIdx == 4:
                                        distance = 20
                                    else:
                                        distance = 10

                        R[act, i, j] = distance

                    else:
                        R[act, i, j] = -10000
                else:
                    R[act, i, j] = -100
    return R

In [12]:
# Evaluate the given Policy
def evaluatePolicy(states, policy, idxList):
    with open('SAKI Exercise 3 warehouseordernew.txt') as f:
        lineList = f.readlines()

    lineList.pop(0) # Remove first item

    testdata = np.empty([len(lineList), 2], dtype=str)
    for idx, line in enumerate(lineList):
        words = line.split()
        testdata[idx, 0] = words[0]
        testdata[idx, 1] = words[1]

    state = states[0] # Start with store white

    stateTrans = state[0][6]
    stateCol = state[0][7]

    distance = 0        # Calculate distance for later evaluation
    for transIdx, transition in enumerate(testdata):
        trans = transition[0]
        col = transition[1]

        transitionIndex = index_list.index(state[0])
        placement = policy[transitionIndex]         #Saving Action

        # Add distance
        if placement == 0:
            distance += 2 # 1 step to location and 1 step back
        elif placement == 1 or placement == 3:
            distance += 4  # 2 steps to location and 2 steps back
        elif placement == 2 or placement == 4:
            distance += 6  # 3 steps to location and 3 steps back
        else:
            distance += 8  # 4 steps to location and 4 steps back

        nextState = eval_get_succ(stateTrans, stateCol, trans, col, state, state[1], placement)

        print('------')
        mappingIdx = idxList.index(nextState)
        state = states[mappingIdx]
        stateTrans = state[0][6]
        stateCol = state[0][7]

        mapping_matrix = {
            "0": '-',
            "1": 'W',
            "2": 'B',
            "3": 'R'
        }

        print(' _____ _____ _____')
        print('|     |     |     |')
        print('| ',mapping_matrix[nextState[5]], ' | ',mapping_matrix[nextState[4]],' | ',mapping_matrix[nextState[3]],' |')
        print('|_____|_____|_____|')
        print('|     |     |     |')
        print('| ',mapping_matrix[nextState[2]], ' | ',mapping_matrix[nextState[1]],' | ',mapping_matrix[nextState[0]],' |')
        print('|_____|_____|_____|', ' (1,1)')

        print('')
        print(nextState)
        print('')

    print('The robot has covered the following distance: ',distance)
    print('')


def eval_get_succ(stateTrans, stateCol, trans, col, currState, succList, placement):
    if trans == 's':
        trans = 0
    else:
        trans = 1

    if col == 'w':
        col = 1
    elif col == 'b':
        col = 2
    else:
        col = 3

    nextState = currState[0]
    for successor in succList:
        if successor[6] == str(trans) and successor[7] == str(col):
            if stateTrans == '0' and currState[0][placement] == '0' and successor[placement] == stateCol:
                nextState = successor
                break
            elif stateTrans == '1' and currState[0][placement] == stateCol and successor[placement] == '0':
                nextState = successor
                break

    return nextState

In [13]:
# Main Script starting here!

if __name__ == "__main__":

    # Create states List with successors
    states_list = create_states_successor_list()

    # Create index list for later mapping
    index_list = []
    for value in states_list:
        index_list.append(value[0])

    print('List created')

    # Scan Training Data
    occurances_Colours = scan_training_data('Exercise 3 - Reinforcement Learning - warehousetraining.txt')

    probabilities = []
    with open('2x3_probabilities.csv') as f:
        reader = csv.reader(f)
        probabilities = list(reader)


    # Create Matrices
    P = create_probability_matrix(states_list, index_list, probabilities)
    print('P matrix done!')
    R = create_3d_reward_matrix(index_list, P)

    # In the right shape?
    mdp.util.check(P, R)

    print('Preprocessing Done!')

    # Run MDP and create Policy
    sdp = mdp.mdp.ValueIteration(P, R, 0.99, max_iter = 1000)
    sdp.run()

    print('Number of Iterations: ',sdp.iter)
    
    # Write policy into txt-file
    text_file =  open("2x3_policy.txt", "w", newline="")
    for stateIdx, state in enumerate(sdp.policy):
        stringState = str(index_list[stateIdx]) + ' -> ' + 'Action: '+ str(state) + '\n'
        text_file.write(stringState)
    text_file.close()

    # Evaluate Policy
    evaluatePolicy(states_list, sdp.policy, index_list)

    print('------')
    print('Done')


List created
P matrix done!
current Action:  0
current Action:  1
current Action:  2
current Action:  3
current Action:  4
current Action:  5
Preprocessing Done!
Number of Iterations:  1833
------
 _____ _____ _____
|     |     |     |
|  -  |  -  |  -  |
|_____|_____|_____|
|     |     |     |
|  -  |  W  |  -  |
|_____|_____|_____|  (1,1)

01000003

------
 _____ _____ _____
|     |     |     |
|  -  |  -  |  -  |
|_____|_____|_____|
|     |     |     |
|  -  |  W  |  R  |
|_____|_____|_____|  (1,1)

31000013

------
 _____ _____ _____
|     |     |     |
|  -  |  -  |  -  |
|_____|_____|_____|
|     |     |     |
|  -  |  W  |  -  |
|_____|_____|_____|  (1,1)

01000002

------
 _____ _____ _____
|     |     |     |
|  -  |  -  |  B  |
|_____|_____|_____|
|     |     |     |
|  -  |  W  |  -  |
|_____|_____|_____|  (1,1)

01020003

------
 _____ _____ _____
|     |     |     |
|  -  |  -  |  B  |
|_____|_____|_____|
|     |     |     |
|  -  |  W  |  R  |
|_____|_____|_____|  (1,1)

