# Sensor Position Optimization

### BMW Quantum Computing Challenge

#### Valter Uotila, Unified Database Management Systems
#### Sardana Ivanova, Discovery Research Group

#### Department of Computer Science, University of Helsinki

## Remarks about the implementation

We have been learning, testing and developing some basic quantum algorithms using D-waves own API but we are not very experienced with details related to these quantum computers.

Unfortunately, it seems that Amazon Bracket does not yet support D-waves hybrid solvers. We might be wrong but we could not figure out how to access, for example, LeapHybridSampler using Bracket. It seems that BracketDWaveSampler and BracketSampler are using the standard DWaveSampler (https://docs.ocean.dwavesys.com/projects/system/en/stable/reference/samplers.html). Because our solution is based on LeapHybridSampler, it is not possible to run it in Bracket. Maybe in the future Amazon will implement something like BracketDWaveHybridSampler which offers the functionality of LeapHybridSampler. Anyway, we are positively impressed how Bracket collects various quantum computing resources together.

## Initializing parameters

### Importing D-wave packages

Note that in order to run the codes here, you need to be able to successfully access D-wave quantum cloud computing resources. You can see more info at https://cloud.dwavesys.com/. We are used to develop with D-waves developer's plan which includes 1 min of quantum computing time per month.

In [None]:
import dimod

from dwave.system import LeapHybridSampler

import json
import csv

from itertools import combinations
import os
import math

import numpy as np
from matplotlib import pyplot
from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits import mplot3d
from sympy import *

from ipynb.fs.defs.overlap_3D import overlap

notebook_path = os.path.abspath("main_3D_connecting_Dwave_Leap.ipynb")

### Global parameters

The parameter enviroment_x, enviroment_y, enviroment_z describe the lengths of sides of the space which is considered as environment. Anything outside the environment is supposed not to be accessible by sensors. We took the values for these parameters from the data set criticallity_grid which BMW provided.

In [None]:
car_sample_accuracy = 500

angle_accuracy = 60

variables = dict()

all_variables = list()

criticallity_grid_abs_path = os.path.join(os.path.dirname(notebook_path), "sensor_position_data/criticallity_grid_0_5.csv")

xs, ys, zs = list(), list(), list()

with open(criticallity_grid_abs_path, newline='') as csvfile:
    records = iter(csv.reader(csvfile, delimiter=','))
    next(records)
    for row in records:
        xs.append(int(float(row[0])*100))
        ys.append(int(float(row[1])*100))
        zs.append(int(float(row[2])*100))

environment_x = abs(max(xs)) + abs(min(xs))
print(environment_x)

environment_y = abs(max(ys)) + abs(min(ys))
print(environment_y)

environment_z = abs(max(zs)) + abs(min(zs))
print(environment_z)

### Importing sensors

In [None]:
abs_sensors_file_path = os.path.join(os.path.dirname(notebook_path), "3d_example_data/3D_sensors_set_1.json")

sensor_types = {0: 'lidar', 1: 'radar', 2: 'camera', 3: 'ultrasound'}

f = open(abs_sensors_file_path)

sensor_root = json.load(f)
sensors = sensor_root["sensors"]

def get_sensor_price(sensor_id):
    for sensor in sensors:
        if sensor['id'] == sensor_id:
            return sensor['price']

print(sensors)

### Initializing allowed sensor positions on car

Here we utilize the data BMW provided. Compared to the first-round 2D version, the car model in this second-round version is based on the allowed sensor positions data. In order to create variables, we need to sample some points from the surfaces. To sample points, we need some easier format than vector representation. Thus we calculate the plane equation for the surfaces. The equation allows us to sample points when we are creating the variables.

In [None]:
def euclidean_3d_distance(x, y):
    return math.floor(math.sqrt(pow((x[0] - y[0]), 2) + pow((x[1] - y[1]), 2) + pow((x[2] - y[2]), 2)))

def unit_vector(vector):
    """ Returns the unit vector of the vector.  """
    return vector / np.linalg.norm(vector)

def angle_between(v1, v2):
    """ Returns the angle in radians between vectors 'v1' and 'v2'::

            >>> angle_between((1, 0, 0), (0, 1, 0))
            1.5707963267948966
            >>> angle_between((1, 0, 0), (1, 0, 0))
            0.0
            >>> angle_between((1, 0, 0), (-1, 0, 0))
            3.141592653589793
    """
    v1_u = unit_vector(v1)
    v2_u = unit_vector(v2)
    return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))

def plane_equation(c1, c2, c3):
    p1 = np.array([c1[0], c1[1], c1[2]])
    p2 = np.array([c2[0], c2[1], c2[2]])
    p3 = np.array([c3[0], c3[1], c3[2]])
    
    # These two vectors are in the plane
    v1 = unit_vector(p3 - p1)
    v2 = unit_vector(p2 - p1)
    v2 -= v1.dot(v2)*v1
    
    #if not np.allclose([1,1,0,0], [v1.dot(v1), v2.dot(v2), v1.dot(v2), v1.dot(v2)]):
    #    print("Error in dot products!")
    #    print(v1.dot(v2))
    #    print(v1.dot(v2))
    
    #v2 = unit_vector((p2 - p1) + (-(v1)*math.sin(math.pi/2 - angle_between(p2 - p1, v1))*np.linalg.norm(p2 - p1)))
    
    # cross product is a vector normal to the plane
    cp = unit_vector(np.cross(v1, v2))
    cp = np.array([round(x, 7) for x in cp])
    a, b, c = cp
    
    #if not np.allclose([1,0,0], [cp.dot(cp), cp.dot(v1), cp.dot(v2)]):
    #    print("Error in normal vector dot products!")
    #    print(cp.dot(v1))
    #    print(cp.dot(v2))
    
    # This evaluates a * x3 + b * y3 + c * z3 which equals d
    d = np.dot(cp, p3)
    
    #print('Equation is {0}x + {1}y + {2}z = {3}'.format(a, b, c, d))
    
    #plot_region(a, b, c, d)
    
    return { 'span1': v1, 'span2': v2, 'normal_vector': cp, 'x': a, 'y': b, 'z': c, 'd': d }

def plot_region(a, b, c, d):
    x = np.linspace(-1,1,10)
    y = np.linspace(-1,1,10)

    X,Y = np.meshgrid(x,y)
    if c != 0:
        Z = (d - a*X - b*Y) / c

        fig = plt.figure()
        ax = fig.gca(projection='3d')

        surf = ax.plot_surface(X, Y, Z)
        
def scatter_plot(points, save = False):
    fig = pyplot.figure()
    #ax = pyplot.axes(projection='3d')
    #ax = fig.add_subplot(111, projection='3d')
    ax = Axes3D(fig)
    
    x_vals = [c[0] for c in points]
    y_vals = [c[1] for c in points]
    z_vals = [c[2] for c in points]
    
    #ax.scatter3D(x_vals, y_vals, z_vals, c=z_vals);
    
    ax.scatter(x_vals, y_vals, z_vals)
    #ax.set_xlabel('X Label')
    #ax.set_ylabel('Y Label')
    #ax.set_zlabel('Z Label')
    pyplot.show()
    
    

In [None]:
abs_position_file_path = os.path.join(os.path.dirname(notebook_path), "sensor_position_data/allowed_sensor_positions.csv")

allowed_sensor_positions = dict()

with open(abs_position_file_path, newline='') as csvfile:
    records = iter(csv.DictReader(csvfile, delimiter=';'))
    next(records)
    for row in records:
        c1 = (int(row["x1"]), int(row["y1"]), int(row["z1"]))
        c2 = (int(row["x2"]), int(row["y2"]), int(row["z2"]))
        c3 = (int(row["x3"]), int(row["y3"]), int(row["z3"]))
        c4 = (int(row["x4"]), int(row["y4"]), int(row["z4"]))
        
        region = row["Region"].lower()
        
        allowed_sensor_positions[region] = dict()
        allowed_sensor_positions[region]["corners"] = [c1, c2, c3, c4]
        allowed_sensors = [x.lower() for x in row["Allowed Sensors"].split(", ")]
        allowed_sensor_positions[region]["allowed_sensors"] = allowed_sensors
        
        equation = plane_equation(c1, c2, c3)
        allowed_sensor_positions[region]["equation"] = equation
        
        intervals = dict()
            
        if equation["x"] == 0:
            if c1[0] < c2[0]:
                intervals["x"] = range(c1[0], c2[0], car_sample_accuracy)
            elif c1[0] > c2[0]:
                intervals["x"] = range(c2[0], c1[0], car_sample_accuracy)
            elif c1[0] < c3[0]:
                intervals["x"] = range(c1[0], c3[0], car_sample_accuracy)
            elif c1[0] > c3[0]:
                intervals["x"] = range(c3[0], c1[0], car_sample_accuracy)
                
        if equation["y"] == 0:
            if c1[1] < c2[1]:
                intervals["y"] = range(c1[1], c2[1], car_sample_accuracy)
            elif c1[1] > c2[1]:
                intervals["y"] = range(c2[1], c1[1], car_sample_accuracy)
            elif c1[1] < c3[1]:
                intervals["y"] = range(c1[1], c3[1], car_sample_accuracy)
            elif c1[1] > c3[1]:
                intervals["y"] = range(c3[1], c1[1], car_sample_accuracy)
                
        if equation["z"] == 0:
            if c1[2] < c2[2]:
                intervals["z"] = range(c1[2], c2[2], car_sample_accuracy)
            elif c1[2] > c2[2]:
                intervals["z"] = range(c2[2], c1[2], car_sample_accuracy)
            elif c1[2] < c3[2]:
                intervals["z"] = range(c1[2], c3[2], car_sample_accuracy)
            elif c1[2] > c3[2]:
                intervals["z"] = range(c3[2], c1[2], car_sample_accuracy)
                
        allowed_sensor_positions[region]["fixed_intervals"] = intervals

for elem in allowed_sensor_positions:
    print(elem, allowed_sensor_positions[elem])
    print()
        

### Initializing variables

Compared to the first-round, variables are updated to be triples (x, y, i) where x and y are points in the 3D-space and i refers to the sensor id. We fix the point x so that it belongs to some of the allowed sensor positions on the car's surface. The point y is selected from the environment so that the distance between x and y is range R_i for sensor i.

We are dealing with different cases depending on how the car surface is positioned in the space.

In [None]:
def sample_from_car_surface(corners, equation, fixed_intervals, car_sample_accuracy):
    sample = list()
    
    # This is the simple case that the plane is parallel with some of the three axis. 
    # Thus the parallel axis stays constant.
    # For example, side mirror, back, trunk and sides are parallel to one of the axis
    
    if len(fixed_intervals) == 2:
        if 'x' in fixed_intervals and 'y' in fixed_intervals:
            
            z = corners[0][2]
            
            for x in fixed_intervals['x']:
                for y in fixed_intervals['y']:
                    sample.append((x, y, z))
                    
        elif 'x' in fixed_intervals and 'z' in fixed_intervals:
            
            y = corners[0][1]
            
            for x in fixed_intervals['x']:
                for z in fixed_intervals['z']:
                    sample.append((x, y, z))
                    
        elif 'y' in fixed_intervals and 'z' in fixed_intervals:
            
            x = corners[0][0]
            
            for y in fixed_intervals['y']:
                for z in fixed_intervals['z']:
                    sample.append((x, y, z))

    elif len(fixed_intervals) == 1:
        
        if 'x' in fixed_intervals:
            y, z = symbols('y z')
            expr = equation['y']*y + equation['z']*z - equation['d']
            y_interval = None
            
            c1 = corners[0][1]
            c2 = corners[1][1]
            c3 = corners[2][1]
            
            if c1 < c2:
                y_interval = range(c1, c2, car_sample_accuracy)
            elif c1 > c2:
                y_interval = range(c2, c1, car_sample_accuracy)
            elif c1 < c3:
                y_interval = range(c1, c3, car_sample_accuracy)
            elif c1 > c3:
                y_interval = range(c3, c1, car_sample_accuracy)
        
            
            for x in fixed_intervals['x']:
                for y_var in y_interval:
                    y_expr = expr.subs(y, y_var)
                    z = math.floor(solve(y_expr)[0])
                    sample.append((x, y_var, z))
                    #print((x, y_var, z))
                x += car_sample_accuracy
                    
        elif 'y' in fixed_intervals:
            x, z = symbols('x z')
            expr = equation['x']*x + equation['z']*z - equation['d']
            x_interval = None
            
            c1 = corners[0][0]
            c2 = corners[1][0]
            c3 = corners[2][0]
            
            if c1 < c2:
                x_interval = range(c1, c2, car_sample_accuracy)
            elif c1 > c2:
                x_interval = range(c2, c1, car_sample_accuracy)
            elif c1 < c3:
                x_interval = range(c1, c3, car_sample_accuracy)
            elif c1 > c3:
                x_interval = range(c3, c1, car_sample_accuracy)
        
            
            for y in fixed_intervals['y']:
                for x_var in x_interval:
                    x_expr = expr.subs(x, x_var)
                    z = math.floor(solve(x_expr)[0])
                    sample.append((x_var, y, z))
                    #print((x_var, y, z))
                y += car_sample_accuracy
            
        elif 'z' in fixed_intervals:
                
            x, y = symbols('x y')
            expr = equation['x']*x + equation['y']*y - equation['d']
            x_interval = None

            c1 = corners[0][0]
            c2 = corners[1][0]
            c3 = corners[2][0]

            if c1 < c2:
                x_interval = range(c1, c2, car_sample_accuracy)
            elif c1 > c2:
                x_interval = range(c2, c1, car_sample_accuracy)
            elif c1 < c3:
                x_interval = range(c1, c3, car_sample_accuracy)
            elif c1 > c3:
                x_interval = range(c3, c1, car_sample_accuracy)


            for z in fixed_intervals['z']:
                for x_var in x_interval:
                    x_expr = expr.subs(x, x_var)
                    y = math.floor(solve(x_expr)[0])
                    #print(y)
                    sample.append((x_var, y, z))
                    #print((x_var, y, z))
                z += car_sample_accuracy
        
    return sample



In [None]:
# Sample points from the cars surface
# In the optimal situation we could sample points with accuracy factor 1. This would produce a huge number of points but the system cannot scale that well.

for sensor in sensors:
    variables[sensor["id"]] = dict()
    for pos_name in allowed_sensor_positions:
        pos = allowed_sensor_positions[pos_name]
        #print(sensor_types)
        if sensor_types[sensor["type"]] in pos["allowed_sensors"]:
            
            variables[sensor["id"]][pos_name] = list()
            srange = sensor["view"]["range"]
            #print("Range", srange)
            corners = pos["corners"]
            equation = pos["equation"]
            fixed_intervals = pos["fixed_intervals"]
            
            normal_vector = equation['normal_vector']
            spanning_vector_on_plane1 = equation['span1']
            spanning_vector_on_plane2 = equation['span2']
            
            car_sample = sample_from_car_surface(corners, equation, fixed_intervals, car_sample_accuracy)
            
            #print(pos_name)
            #print(car_sample)
            #print()
            
            #scatter_plot(car_sample)
        
            for car_point in car_sample:
                for angle_on_plane in range(0, 360, angle_accuracy):
                    for angle_for_normal in range(0, 90, angle_accuracy):
                        
                        rad_angle_on_plane = math.radians(angle_on_plane)
                        rad_angle_for_normal = math.radians(angle_for_normal)
                        
                        #print("Car point ", car_point)
                        
                        #print("Normal vector", normal_vector)
                        #print("Spanning vector", spanning_vector_on_plane1)
                        #print("Spanning vector", spanning_vector_on_plane2)
                        
                        # We start moving from the fixed point on the car's surface
                        point_in_environment = np.array([float(x) for x in car_point])
                        car_point_vector = np.array(car_point)
                        
                        #print(float(srange))
                        #print(rad_angle_on_plane)
                        #print(math.cos(rad_angle_on_plane))
                        
                        #print("#11 ", srange*math.cos(rad_angle_on_plane)*spanning_vector_on_plane1)
                        #print("#12 ", srange*math.sin(rad_angle_on_plane)*spanning_vector_on_plane2)
                        #print("#13 ", srange*math.sin(rad_angle_for_normal)*normal_vector)
                        
                        # Move along the plane to the direction of the first spanning vector
                        point_in_environment += srange*math.cos(rad_angle_on_plane)*math.cos(rad_angle_for_normal)*spanning_vector_on_plane1
                        
                        #print("#1 ", point_in_environment)
                        
                        # Move along the plane to the direction of the second spanning vector
                        point_in_environment += srange*math.sin(rad_angle_on_plane)*math.cos(rad_angle_for_normal)*spanning_vector_on_plane2
                        
                        #print("#2 ", point_in_environment)
                        
                        # Move to the orthonogal direction to the plane i.e. "upwards"
                        point_in_environment += srange*math.sin(rad_angle_for_normal)*normal_vector
                        
                        #print("#3 ", point_in_environment)
                        
                        #env_points.append(point_in_environment)
                        #scatter_plot(env_points)
                        
                        point_in_environment = [math.floor(x) for x in point_in_environment]
                        
                        # For bugging purposes:
                        #print("Angles are ", angle_on_plane, angle_for_normal)
                        #print("Distance defined in the sensor data is " + str(srange) + " and the distance between the sampled points is ", euclidean_3d_distance(car_point, tuple(point_in_environment)))
            
                        b_variable = (car_point, tuple(point_in_environment), sensor["id"])
                        variables[sensor["id"]][pos_name].append(b_variable)
                        all_variables.append(b_variable)
                        #break
                    #break
                #break
            #break
        #break
            

#print(variables.keys())
print(len(all_variables))


## Constructing quadratic unconstrained binary optimization model

In [None]:
vartype = dimod.BINARY
main_bqm = dimod.BinaryQuadraticModel({}, {}, 0.0, vartype)

The following functions enable us to append or initialize coefficients for the variables in the BQM. The distance function implements the ordinary Euclidean distance.

In [None]:
def append_linear_safe(variable, value, linear_dict):
    if variable in linear_dict.keys():
        linear_dict[variable] = linear_dict[variable] + value
    else:
        linear_dict[variable] = value

def append_quadratic_safe(variable, value, quadratic_dict):
    if variable in quadratic_dict.keys():
        quadratic_dict[variable] = quadratic_dict[variable] + value
    else:
        quadratic_dict[variable] = value

def sensor_view_volume(sensor_id):
    for sensor in sensors:
        if sensor['id'] == sensor_id:
            # Following code calculates volume of view of sensor
            side1 = 2*sensor["view"]["range"]*float(math.tan(math.radians(sensor["view"]["angle"]["horizontal"])/2))
            side2 = 2*sensor["view"]["range"]*float(math.tan(math.radians(sensor["view"]["angle"]["vertical"])/2))
            return (1/3)*sensor["view"]["range"]*side1*side2

### Constraint 1: selecting sufficiently sensors to cover the environment

Every binary quadratic function which is part of the model contains four parameters: linear terms, quadratic terms, offset (constant) and variable type. Variable type is always BINARY since we are using QUBO. If we use Ising, we set variable type to be SPIN.

In [None]:
# Encoding constraint H1

E = environment_x*environment_y*environment_z

#print(E)

A1 = 1
linear_h1 = {}
quadratic_h1 = {}
offset_h1 = math.pow(E, 2)
C = 1

for sensor_id in variables:
    volume = sensor_view_volume(sensor_id)
    #print(volume)
    coefficient = math.pow(volume, 2)*math.pow(C, 2) - 2*E*C*volume
    for surface in variables[sensor_id]:
        for var in variables[sensor_id][surface]:
            append_linear_safe(var, coefficient, linear_h1)
            
quadratic_terms = combinations(all_variables, 2)

for pair in quadratic_terms:
    b1 = pair[0]
    b2 = pair[1]
    
    volume1 = sensor_view_volume(b1[2])
    volume2 = sensor_view_volume(b2[2])
    
    coefficient_quadratic = 2*math.pow(volume1, 2)*math.pow(volume2, 2)
    append_quadratic_safe((b1, b2), coefficient_quadratic, quadratic_h1)
            
bqm_h1 = dimod.BinaryQuadraticModel(linear_h1, quadratic_h1, offset_h1, vartype)
bqm_h1.scale(A1)
bqm_h1.normalize()
main_bqm.update(bqm_h1)

i = 0
for elem in main_bqm.linear:
    print(elem, main_bqm.linear[elem])
    i+=1
    if i > 50:
        break
i = 0
for elem in main_bqm.quadratic:
    print(elem, main_bqm.quadratic[elem])
    i+=1
    if i > 50:
        break
            

### Constraint 2: optimizing overlap of sensor views

In [208]:
# Encoding constraint H2

A2 = 1
linear_h2 = {}
quadratic_h2 = {}
offset_h2 = 0

quadratic_terms = combinations(all_variables, 2)

for pair in quadratic_terms:
    b1 = pair[0]
    b2 = pair[1]
    
    coefficient_quadratic = overlap(b1, b2)
    
    append_quadratic_safe((b1, b2), coefficient_quadratic, quadratic_h2)
            
bqm_h2 = dimod.BinaryQuadraticModel(linear_h2, quadratic_h2, offset_h2, vartype)
bqm_h2.scale(A2)
bqm_h2.normalize()
main_bqm.update(bqm_h2)
    

### Constraint 3: minimizing total price

In [None]:
# Encoding constraint H3

A3 = 1
linear_h3 = {}
quadratic_h3 = {}
offset_h3 = 0

for variable in all_variables:
    sensor_id = variable[2]
    price = get_sensor_price(sensor_id)
    append_linear_safe(variable, price, linear_h3)

            
bqm_h3 = dimod.BinaryQuadraticModel(linear_h3, quadratic_h3, offset_h3, vartype)
bqm_h3.scale(A3)
main_bqm.update(bqm_h3)
            

### Solve QUBO

Unfortunately, LeapHybridSampler is not available in Amazon Bracket. That is why this code will not work in Bracket. On the other hand, we tried run the code using BracketDwaveSampler but the problem cannot be mapped to the circuits.

In [None]:
main_bqm.normalize()

sampler = LeapHybridSampler()
sampleset = sampler.sample(main_bqm)
sample = sampleset.first.sample
print(sampleset)
print()
# energy = sampleset.first.energy
print("Possible sensor positions in the space (x-coordinate, y-coordinate, sensor id):")
i = 0
for varname, value in sample.items():
    if value == 1:
        i+=1
        print(varname, value)
print(i)