### Básico para todos os algoritmos

In [1]:
!pip install pyswarms



In [2]:
import math
import random
import json
import numpy as np
import pandas as pd
import matplotlib.patheffects as PathEffects
from matplotlib         import pyplot as plt
from pyswarm            import pso
from matplotlib.patches import Circle

%matplotlib inline

In [3]:
# Map Informations
x_under_lim = 1
x_upper_lim = 44
y_under_lim = 1
y_upper_lim = 15

aps_positions = {'WAP1' : [4.3 , 5.0 ],
                 'WAP2' : [8.0 , 6.0 ],
                 'WAP3' : [16.5, 3.0 ],
                 'WAP4' : [23.0, 3.5 ],
                 'WAP5' : [33.0, 5.0 ],
                 'WAP6' : [38.0, 4.3 ],
                 'WAP7' : [39.0, 11.2],
                 'WAP8' : [33.5, 12.0],
                 'WAP9' : [28.5, 12.0],
                 'WAP10': [20.0, 12.3],
                 'WAP11': [1.5 , 11.4],
                 'WAP12': [4.0 , 17.5],
                 'WAP13': [13.5, 10.0],
                 'WAP14': [17.8, 9.0 ],
                 'WAP15': [32.3, 9.0 ]}

list_aps = list(aps_positions.keys())

#Constantes Log-Distance

N   = 4
d0  = 1
PL0 = 55
STD_DEV = 1
TXPOWER = 0
WALLL_LOSS = 3

#PSO Infomations
population = 20
dimension = 2
generation = 20
fitness_criterion = 10e-8

### Calculo de Paredes

In [4]:
# Abre o json com as informações das salas (dimensão)
with open('rooms2.json', 'r') as json_file:
    rooms = json.load(json_file)

# Calcula a quantidade de paredes por intersecção de retas.
class Wall:
    def __init__(self, x1, y1, x2, y2):
        self.x1 = x1
        self.y1 = y1
        self.x2 = x2
        self.y2 = y2

    def interceptPoint(self, x1, y1, x2, y2):
        p0_x = self.x1
        p0_y = self.y1
        p1_x = self.x2
        p1_y = self.y2

        p2_x = x1
        p2_y = y1
        p3_x = x2
        p3_y = y2
        
        s1_x = p1_x - p0_x
        s1_y = p1_y - p0_y
        s2_x = p3_x - p2_x
        s2_y = p3_y - p2_y

        s = (-s1_y * (p0_x - p2_x) + s1_x * (p0_y - p2_y)) / (-s2_x * s1_y + s1_x * s2_y)
        t = ( s2_x * (p0_y - p2_y) - s2_y * (p0_x - p2_x)) / (-s2_x * s1_y + s1_x * s2_y)

        if (s >= 0) and (s <= 1) and (t >= 0) and (t <= 1):
            intX = p0_x + (t * s1_x)
            intY = p0_y + (t * s1_y)
            
            return [intX, intY];

        return -1

# Calcula o tamanho de todas as paredes do cenário (top, bottom, right, left) 
walls = []

for r in rooms['rooms']:
    room = rooms['rooms'][r]
    
    if room['type'] == 'room':
        xWidth = room['x']+room['width']
        yHeight = room['y']+room['height']

        walls.append(Wall(room['x'], room['y'], xWidth   , room['y'] )); #top
        walls.append(Wall(xWidth   , room['y'], xWidth   , yHeight   )); #right
        walls.append(Wall(xWidth   , yHeight  , room['x'], yHeight   )); #bottom
        walls.append(Wall(room['x'], yHeight  , room['x'], room['y'] )); #left
        
# Realiza o cálculo de paredes entre dois pontos usando as funções acima. O primeiro ponto é a posição do beacon, o segundo é  a posição do ponto.
def intercept(x1, y1, x2, y2):

    if (x1 == x2) and (y1 == y2):
        return []

    allInterceptions = []
    
    for wall in walls:
        intercept = wall.interceptPoint(x1, y1, x2, y2)

        if intercept != -1:
            allInterceptions.append(intercept);
    
    result = []

    for testPoint in allInterceptions:
        foundClose = 'false'

        for resultPoint in result:
            dist = math.sqrt(math.pow(testPoint[0] - resultPoint[0], 2) + math.pow(testPoint[1]-resultPoint[1], 2))

            if dist < 5:
                foundClose = 'true'
                break

        if foundClose == 'false':
            result.append(testPoint)

    return result

#Exemplo
p1 = [397,416]
p2 = [2645,607]

res = len(intercept(p1[0],p1[1],p2[0],p2[1]))
print('Paredes:', res)

Paredes: 0


In [5]:
def euclidian_distance(P1, P2):
    X1, Y1 = P1[0], P1[1]
    X2, Y2 = P2[0], P2[1]
    return round(math.sqrt((X2-X1)**2 + (Y2-Y1)**2),1)

def toPixels(x, y):
    return (x*100, y*100)

def generateMap(particles, costs, realPosition, currentRound):
    low_cost = 100000
    low_position = [0,0]
    
    perfPixels = toPixels(realPosition[0], realPosition[1])
    
    plt.clf()
    #plt.close()
    plt.gca().set_axis_off()
    plt.subplots_adjust(top = 1, bottom = 0, right = 1, left = 0, hspace = 0, wspace = 0)
    plt.margins(0,0)                    
    plt.gca().xaxis.set_major_locator(plt.NullLocator())
    plt.gca().yaxis.set_major_locator(plt.NullLocator())
    
    img = plt.imread("map.png")
    plt.imshow(img)
    
    txt = plt.text(perfPixels[0], perfPixels[1], "✖", weight='bold', fontsize=8, color="white", verticalalignment='center', horizontalalignment='center', zorder=5)
    txt.set_path_effects([PathEffects.withStroke(linewidth=2, foreground='black')])

    v_positions = []
    groups      = {}
    
    for i in range(len(particles)):
        pos = particles[i]
        posPixels = toPixels(pos[0], pos[1])
        label     = str(pos[0]) + str(pos[1])
        
        cost = costs[i]
        
        if cost < low_cost:
            low_cost = cost
            low_position = pos
        
        if label not in groups:
            groups[label] = [1, posPixels[0], posPixels[1], cost]
        else:
            groups[label][0] += 1
            
        circ = Circle((posPixels[0] + np.random.normal(0, 5), posPixels[1] + np.random.normal(0, 5)), radius=10, alpha=0.5, color="red", zorder=10)
        plt.gca().add_patch(circ)
    
    for group in groups.values():
        plt.annotate("Cost: " + str(round(group[3], 1)), xy=(group[1]+5, group[2]-5), size=2.7, va="center", ha="left", xytext=(group[1] + 50, group[2] - 80), zorder=8,
              bbox=dict(boxstyle="square", facecolor="#ffffffcc", edgecolor="#aaaaaa88", linewidth=0.4),
              arrowprops=dict(arrowstyle="-", antialiased=True, color="#444444", connectionstyle="arc3,rad=-0.2", linewidth=0.15))
        
    #Plot Low Position
    lowPosPixels = toPixels(low_position[0], low_position[1])
    circ = Circle((lowPosPixels[0], lowPosPixels[1]), radius=10, color="#2ca05aff", zorder=12)
    plt.gca().add_patch(circ)
    
    imgFilename = "sample-" + str(currentRound) + ".png"
    plt.savefig(imgFilename, dpi=300, bbox_inches="tight", pad_inches=0)

## Algoritmo 1

In [6]:
def update_velocity(particle, velocity, pbest, gbest, w_min=0.5, max=0.5, c=0.5):
    # Initialise new velocity array
    num_particle = len(particle)
    new_velocity = np.array([0.0 for i in range(num_particle)])
    
    # Randomly generate r1, r2 and inertia weight from normal distribution
    r1 = random.uniform(0,max)
    r2 = random.uniform(0,max)
    w = random.uniform(w_min,max)
    c1 = c
    c2 = c
    
    # Calculate new velocity
    for i in range(num_particle):
        new_velocity[i] = w*velocity[i] + c1*r1*(pbest[i]-particle[i])+c2*r2*(gbest[i]-particle[i])
        
    return new_velocity

def update_position(particle, velocity):
    # Move particles by adding velocity
    new_particle = particle + velocity
    
    return new_particle

def pso_2d(population, dimension, generation, fitness_criterion, sample, real_position):
    # Population
    particles = [[round(random.uniform(x_under_lim, x_upper_lim),1), round(random.uniform(y_under_lim, y_upper_lim),1)] for i in range(population)]
    # Particle's best position
    pbest_position = particles
    
    # Fitness
    pbest_fitness = [fitness_function([p[0],p[1]], sample) for p in particles]
    
    # Index of the best particle
    gbest_index = np.argmin(pbest_fitness)
    
    # Global best particle position
    gbest_position = pbest_position[gbest_index]
    
    # Velocity (starting from 0 speed)
    velocity = [[0.0 for j in range(dimension)] for i in range(population)]

    # Loop for the number of generation
    for t in range(generation):
    # Stop if the average fitness value reached a predefined success criterion
        if np.average(pbest_fitness) <= fitness_criterion:
            break
        else:
            for n in range(population):
                # Update the velocity of each particle
                velocity[n] = update_velocity(particles[n], velocity[n], pbest_position[n], gbest_position)
                
                # Move the particles to new position
                particles[n] = update_position(particles[n], velocity[n])
        
        # Calculate the fitness value
        pbest_fitness = [fitness_function([p[0],p[1]], sample) for p in particles]
        
        # Find the index of the best particle
        gbest_index = np.argmin(pbest_fitness)
        
        # Update the position of the best particle
        gbest_position = pbest_position[gbest_index]
        
        ##print('Round', t+1, ', Best Position:', gbest_position, ', Cost:', pbest_fitness[gbest_index])
        ##generateMap(particles, pbest_fitness, real_position, t)

    # Print the results
    #print('Global Best Position: ', gbest_position)
    #print('Best Fitness Value: ', min(pbest_fitness))
    #print('Average Particle Best Fitness Value: ', np.average(pbest_fitness))
    #print('Number of Generation: ', t)
    
    return gbest_position

## Algoritmo 2

cont = 1
primeira=0
rodada = 1

real_position = [550, 1250]
particulas_rodada = []
costs_rodada = []

def cost(x):
    global primeira
    global cont
    global particulas_rodada
    global costs_rodada
    global rodada
    
    atualPos = [round(x[0], 1), round(x[1], 1)]
    cost = euclidian_distance(atualPos, real_position)
    
    if (primeira == 0 and cont <= (population*2)) or (primeira == 1 and cont <=population):
        cont += 1       
        particulas_rodada.append(atualPos)
        costs_rodada.append(cost)
    else:
        generateMap(particulas_rodada, costs_rodada, real_position, rodada)
        cont = 1
        particulas_rodada = []
        primeira = 1
        rodada += 1
    return cost

lb = [x_under_lim, y_under_lim]
ub = [x_upper_lim, y_upper_lim]

#xopt, fopt = pso(cost, lb, ub, swarmsize=population, omega=0.5, phip=0.5, phig=0.5, maxiter=generation, minstep=1e-8, minfunc=1e-8, debug=True)
#print('Best position: [' + str(round(xopt[0],1)) + ',' + str(round(xopt[1],1)) + ']')

### TESTES

In [7]:
df = pd.read_csv('db_yuri_training5_semsala42_1-todos-aps-maxValue.csv')
df = df[(df["DEVICE"] == '2055') | (df["DEVICE"] == '121B') | (df["DEVICE"] == '20B5')].reset_index(drop=True)

In [8]:
def attenuation():
    u = 0
    v = 0
    
    u = random.uniform(0, 1)
    v = random.uniform(0, 1)
    
    normal = math.sqrt(-2.0 * math.log(u)) * math.cos(2.0 * math.pi * v)
    return normal * STD_DEV

# a linha de um dataframe (sample) vem na forma de um object e aqui transformamos em um dicionário para obter 
# apenas o número do wap e o respectivo RSSI. Isso será utilizado no cálculo da função de custo para comparar os RSSI's.
def toDict(sample):
    dict_sample = {}

    for i in range(len(list_aps)):
        dict_sample[list_aps[i]] = sample[list_aps[i]]
    
    return dict_sample

# Cada particula tem uma posição [x,y] e pegamos essa posição para obter as distâncias pros respectivos waps
# e através da distância obter o RSSI entre eles. No final será retornado um dicionário com o RSSI para todos os WAPS.
def particle_RSSI(particle_position):
    particle_sample = {}
    
    for i in range(len(list_aps)):
        wap_position = aps_positions[list_aps[i]]
        dist = euclidian_distance(wap_position, particle_position)
        
        if particle_position[0] == wap_position[0]:
            particle_position[0] += 0.1
        if particle_position[1] == wap_position[1]:
            particle_position[1] += 0.1
            
        walls_qtd = len(intercept(wap_position[0],wap_position[1],particle_position[0],particle_position[1]))
        #print("particula:", particle_position, "wap:", wap_position, 'Paredes:', walls_qtd)
        
        if dist < 0.1:
            RSSI = -PL0
        else:
            distLoss = PL0 + 10 * N * math.log10(dist/d0)
            wallLoss = walls_qtd * WALLL_LOSS
            pathLoss = distLoss + wallLoss
            
            RSSI = round((TXPOWER - ( pathLoss)) + attenuation() ,0)
    
        if RSSI < -95: RSSI = -105
        particle_sample[list_aps[i]] = RSSI
    
    return particle_sample

#RMSD entre os RSSI da particula e do sample
def fitness_function(particle_position, sample):
    particula = particle_RSSI(particle_position)
    error = 0
    
    for i in range(len(list_aps)):
        error += math.pow((particula[list_aps[i]] - sample[list_aps[i]]) , 2)
    
    error = error/len(list_aps)
    return error

In [None]:
erro = []
erro_global = []

list_rooms = df['ROOM_ID'].unique()

for room in list_rooms:
    room_df     = df[df['ROOM_ID'] == room]
    room_points = room_df['LABEL'].unique()

    for point in room_points:
        point_df = room_df[room_df['LABEL'] == point]
        
        for index, row in point_df.iterrows():
            sample = toDict(row)
            real_position = [row.X, row.Y]

            estimated_position = pso_2d(population, dimension, generation, fitness_criterion, sample, real_position)

            estimated_error = euclidian_distance(real_position, estimated_position)
            #print('O erro foi de:', estimated_error, 'm')
            
            erro.append(estimated_error)
            erro_global.append(estimated_error)

    print('Sala:', room, 'Erro:', round(np.mean(erro),2))
    erro = []
print('Erro médio global:', round(np.mean(erro_global),2))

Sala: 11 Erro: 3.33
Sala: 5 Erro: 2.75
Sala: 3 Erro: 2.8
Sala: 1 Erro: 3.05
Sala: 9 Erro: 3.46
Sala: 8 Erro: 3.28
Sala: 10 Erro: 4.19
Sala: 6 Erro: 4.43
