<a href="https://colab.research.google.com/github/s21sm/Python/blob/master/CNN_Positioning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# This program is used for CNN based RF fingerprinting positioning calculation

!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics.pairwise import euclidean_distances
from scipy.spatial import distance
import time

import keras
import tensorflow as tf
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation, Flatten
from keras.layers.convolutional import Convolution2D, MaxPooling2D

# Authenticate and create the PyDrive client.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)


link1 = 'https://drive.google.com/open?id=1fMPzxJ1rjzoAHCrAEZwMgJFnuelq1f6l' # training rss
link2 = 'https://drive.google.com/open?id=1_3oR_DmfO4MUVkjzmiPp4ze8ZVd9lQYH' # test rss
link3 = 'https://drive.google.com/open?id=1oqe9BmyMy994z0TgUsIzujIcRhbjd7N4' # training coordinates
link4 = 'https://drive.google.com/open?id=1w7ym-3OtIDNLSvSS3HI3VKJCcxR4zeZc' # test coordinates

# Row indices of the erroneous fingerprints. Each list is defined once and
# applied to both the RSS matrix and the coordinate matrix of the same split,
# so the two always stay row-aligned.
err_train = 620
err_test = [1267, 2066, 2192, 3084, 3487, 3542]


def _fetch_csv(link, filename):
  """Download a Google Drive file to `filename` and load it as CSV.

  Args:
    link: Drive share URL of the form '...open?id=<file id>'.
    filename: local path the file content is written to and read back from.

  Returns:
    A `(DataFrame, ndarray)` pair: the parsed CSV and its `np.array` form.
  """
  # Everything after the first '=' is the Drive file id.
  file_id = link.split('=', 1)[1]
  drive.CreateFile({'id': file_id}).GetContentFile(filename)
  frame = pd.read_csv(filename)
  return frame, np.array(frame)


df1, train_rss = _fetch_csv(link1, 'Training_rss_21Aug17.csv')
train_rss = np.delete(train_rss, err_train, 0) # erroneous training FP

df2, test_rss = _fetch_csv(link2, 'Test_rss_21Aug17.csv')
test_rss = np.delete(test_rss, err_test, 0) # erroneous test FP

df3, train_coordinate = _fetch_csv(link3, 'Training_coordinates_21Aug17.csv')
train_coordinate = np.delete(train_coordinate, err_train, 0)

df4, test_coordinate = _fetch_csv(link4, 'Test_coordinates_21Aug17.csv')
test_coordinate = np.delete(test_coordinate, err_test, 0)

In [0]:
# floor mapping 0 = 0, 3.7 = 1, 7.4 = 2 ........
def floor_mapping(coordinate):
  """Map the height column of `coordinate` to integer floor indices.

  The value in column index 2 of every row is matched against the known
  floor heights 0, 3.7, 7.4, 11.1 and 14.8, which map to floors 0..4.
  Matching uses a small tolerance instead of exact float equality, so a
  height such as `3 * 3.7` (which is not bit-identical to `11.1`) is still
  recognised.

  Args:
    coordinate: 2-D array whose column index 2 holds the height.

  Returns:
    1-D integer ndarray with exactly one floor index per input row.

  Raises:
    ValueError: if any height does not match a known floor. Dropping such
      rows silently would leave the labels misaligned with the samples.
  """
  floor_heights = np.array([0.0, 3.7, 7.4, 11.1, 14.8])
  tolerance = 1e-6

  heights = np.asarray(coordinate)[:, 2].astype(float)
  # gaps[r, f] is the distance between row r's height and floor f's height.
  gaps = np.abs(heights[:, np.newaxis] - floor_heights)
  nearest = gaps.argmin(axis=1)

  # Written as a negated "<=" so NaN heights are reported as unmatched too.
  unmatched = ~(gaps.min(axis=1) <= tolerance)
  if unmatched.any():
    raise ValueError(
        'unknown floor height(s) {} at row(s) {}'.format(
            heights[unmatched].tolist(), np.flatnonzero(unmatched).tolist()))
  return nearest

In [0]:
# Floor labels: integer floor index per sample, then one-hot over 5 floors.
train_floors = floor_mapping(train_coordinate)
train_floors = np.array(
    keras.utils.to_categorical(train_floors, num_classes=5))

test_floors = floor_mapping(test_coordinate)
test_floors = np.array(
    keras.utils.to_categorical(test_floors, num_classes=5))

In [0]:

def _prepare_rss(rss, pad_cols=32, missing_marker=100, rss_min=-103, rss_max=-30):
  """Pad, clean and scale an RSS matrix.

  Steps, applied to every element at once:
    1. Append `pad_cols` columns filled with `missing_marker`.
    2. Replace every `missing_marker` value with `rss_min`.
    3. Cap every value above `rss_max` at `rss_max`.
    4. Scale with (value - rss_min) / (rss_max - rss_min); with the defaults
       this is (value + 103) / 73.

  Args:
    rss: 2-D array of RSS readings, one fingerprint per row.
    pad_cols: number of marker-filled columns appended on the right.
    missing_marker: value that stands for "no reading".
    rss_min: replacement for missing readings and lower end of the scale.
    rss_max: cap for strong readings and upper end of the scale.

  Returns:
    `(cleaned, normalized)`: the padded and capped matrix, and its scaled
    float version. The row count is taken from `rss` itself rather than
    being hard-coded.
  """
  padding = np.full((rss.shape[0], pad_cols), missing_marker)
  cleaned = np.hstack([rss, padding])
  # Order matters: markers become rss_min first, so the cap below cannot
  # touch them.
  cleaned[cleaned == missing_marker] = rss_min
  cleaned[cleaned > rss_max] = rss_max
  normalized = (cleaned - rss_min) / (rss_max - rss_min)
  return cleaned, normalized


# For training data : changing the initial RSS value 100 to -103, then normalization
train_rss, train_rss_normal = _prepare_rss(train_rss)

# For test data : changing the initial RSS value 100 to -103, then normalization
test_rss, test_rss_normal = _prepare_rss(test_rss)

In [0]:
# Creating fingerprint image
# Every normalized fingerprint row is laid out as one 32x32 image.
train_rss_pixel = train_rss_normal.reshape(-1, 32, 32)
test_rss_pixel = test_rss_normal.reshape(-1, 32, 32)

# Add a trailing axis of size 1: (samples, 32, 32) -> (samples, 32, 32, 1).
x_train = train_rss_pixel.reshape(train_rss_pixel.shape + (1,))
x_test = test_rss_pixel.reshape(test_rss_pixel.shape + (1,))

# Targets for the floor model are the one-hot floor labels.
y_train, y_test = train_floors, test_floors


In [0]:
# Floor detection model
# Build and train a fresh network 20 times; store the test accuracy of each run.
floor_result_box = []
for run in range(20):
  floor_model = Sequential([
      Convolution2D(32, 3, data_format='channels_last', activation='relu',
                    input_shape=(32, 32, 1)),
      MaxPooling2D(pool_size=(2, 2)),
      Flatten(),
      Dense(100),
      Dropout(0.7),
      Dense(5),
      Activation('softmax'),
  ])
  floor_model.compile(
      loss='categorical_crossentropy',
      optimizer='adadelta',
      metrics=['accuracy'],
  )
  floor_model.fit(x_train, y_train, epochs=30)
  # evaluate() returns [loss, accuracy] for this compile configuration;
  # only the accuracy is kept.
  loss_and_accuracy = floor_model.evaluate(x_test, y_test)
  floor_result_box.append(loss_and_accuracy[1])

In [0]:
# Labeling for the positioning model: each training fingerprint is its own label.
# The number of classes is taken from the training set, so it stays correct
# if the list of removed fingerprints changes.
num_train_fingerprints = x_train.shape[0]
training_coordinate_label = np.arange(num_train_fingerprints)
y_train_all = keras.utils.to_categorical(training_coordinate_label,num_classes=num_train_fingerprints)

In [0]:
# for 3D positioning model
# Train a fresh classifier 20 times. The predicted class of a test fingerprint
# is the index of a training fingerprint, whose coordinate is used as the
# position estimate. Each run records
# [mean positioning error, floor match rate, mean prediction time].
num_reference_points = y_train_all.shape[1]  # one output class per training fingerprint
num_test = x_test.shape[0]

result_box =[]
for k in range(20):
  positioning_model = Sequential()
  positioning_model.add(Convolution2D(32,3,data_format='channels_last',activation='relu',input_shape=(32,32,1)))
  positioning_model.add(MaxPooling2D(pool_size=(2,2)))
  positioning_model.add(Flatten())
  positioning_model.add(Dense(1024))
  #positioning_model.add(Dropout(0.7))
  positioning_model.add(Dense(num_reference_points))
  positioning_model.add(Activation('softmax'))
  positioning_model.compile(loss='categorical_crossentropy', optimizer = 'adadelta', metrics = ['accuracy'])
  positioning_model.fit(x_train,y_train_all,epochs=30)

  posi_error = 0
  # NOTE: despite its name this counts samples whose estimated floor height
  # EQUALS the true one, so x2 below is a match rate, not an error rate.
  floor_error = 0
  needed_time = 0
  for i in range(num_test):
    # Samples are predicted one at a time on purpose: the timing measures
    # single-fingerprint latency. perf_counter() is the clock meant for
    # measuring short intervals.
    start_time = time.perf_counter()
    prediction = positioning_model.predict(x_test[i].reshape(1, 32, 32, 1)) # for specific images
    pre = np.argmax(prediction)
    a = train_coordinate[pre]
    end_time = time.perf_counter()
    time_diff = (end_time-start_time)
    needed_time += time_diff
    b = test_coordinate[i]
    x = distance.euclidean(a,b)
    posi_error += x
    if a[2] == b[2]:
      floor_error += 1
  # Average over the number of samples actually evaluated (the previous
  # divisor was one larger than the loop count).
  x1 = posi_error/num_test
  x2 = floor_error/num_test
  x3 = needed_time/num_test
  line=[x1,x2,x3]
  print(line)
  result_box.append(line)