In [None]:
!pip install \
  dwave-ocean-sdk \
  dwave-system \
  dwave-neal \
  #dwave-inspector \
  #mplcyberpunk
#!dwave install inspector -y
import json
import dimod
import math
from collections import defaultdict
from dwave.system import LeapHybridSampler, DWaveSampler, EmbeddingComposite
# import dwavebinarycsp
import dwave.inspector
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics.cluster import rand_score, adjusted_rand_score
import pandas as pd
import random
import matplotlib.colors as mcolors
import matplotlib.cm as cm
import os
#import mplcyberpunk
from matplotlib import colors
import colorsys

In [None]:
def get_max_coeff(mydict):
	return max([abs(v) for v in mydict.values()])

def angle_diff(a, b):
	return 2*abs((a - b + 0.5) % 1.0 - 0.5)

In [None]:

def g_from_AJpaper_QUBO(m, Dij):
	return 1 + math.log(Dij*m)

def create_qubo(df, m, nv):
	Z = df['z']
	T = df['theta']
	P = df['momentum']
	nT = len(Z)

	qubo = defaultdict(float)
	Dij_max = 0

	for k in range(nv):
		for i in range(nT):
			for j in range(i+1, nT):
				Dij = ((Z[i] - Z[j])**2 + angle_diff(T[i], T[j])**2) ** 0.5
				Dij_max = max(Dij_max, Dij)
				qubo[(i+nT*k, j+nT*k)] = g_from_AJpaper_QUBO(m, Dij) + min(P[i], P[j])

	lam = 1.0 * get_max_coeff(qubo)

	for i in range(nT):
		for k in range(nv):
			qubo[(i+nT*k, i+nT*k)] -= lam
			for l in range(k+1, nv):
				qubo[(i+nT*k, i+nT*l)] += 2 * lam

	return qubo

def run_qa(df, nv):
	nv = nv
	qubo = create_qubo(df, nv-1, nv)
	strength = math.ceil(get_max_coeff(qubo))
	sampler = EmbeddingComposite(DWaveSampler(token='DEV-(redacted)'))
	response = sampler.sample_qubo(qubo, num_reads=100, chain_strength=strength, annealing_time = 50) # Don't mess with this until you understand it
	best = response.first.sample
	set_solution_from_annealer_response(df, best)

def set_solution_from_annealer_response(df, response):
	nT = len(df)
	track_to_vertex = [None] * nT

	if len(response.items()) == 0:
		print("Invalid solution! Annealer returned an error.")
		df['qagroup'] = None
		return False

	for num, bool in response.items():
		if bool == 1:
			i = num % nT # track number
			k = num // nT # vertex number

			if track_to_vertex[i] != None:
				print("Invalid solution! Track assigned to multiple vertices.")
				df['qagroup'] = None
				return False
			else:
				track_to_vertex[i] = k

	if None in track_to_vertex:
		print("Invalid solution! Track assigned to no vertex.")
		track_to_vertex = None
		return False

	else:
		pass
	df['qagroup'] = track_to_vertex
	return True


In [None]:

def get_reasonable_sizes_for_plotting_momentum(P):
	return [500 * p**0.6 for p in P]

def rand_cmap_for_plotting(nlabels, type='bright'):

	if type == 'bright':
		randHSVcolors = [((np.random.uniform(low=0.0, high=1)),
						  np.random.uniform(low=0.7, high=1),
						  np.random.uniform(low=0.9, high=1)) for _ in range(nlabels)]

		randRGBcolors = []
		for HSVcolor in randHSVcolors:
			randRGBcolors.append(colorsys.hsv_to_rgb(HSVcolor[0], HSVcolor[1], HSVcolor[2]))

		return randRGBcolors
	if type == 'soft':
		low = 0.6
		high = 0.95
		randRGBcolors = [(np.random.uniform(low=low, high=high),
						  np.random.uniform(low=low, high=high),
						  np.random.uniform(low=low, high=high)) for _ in range(nlabels)]
		return randRGBcolors


palette = rand_cmap_for_plotting(30, type='bright')


def plot_clusters(df, grouping, title, saveto=None):
	all_zs = df['z']
	all_thetas = df['theta']
	all_ps = df['momentum']

	colors = [palette[i] for i in grouping]
	scaled = get_reasonable_sizes_for_plotting_momentum(all_ps)
	plt.figure()
	plt.title(title)
	plt.xlabel("Z (normalized)")
	plt.ylabel("θ (normalized)")
	plt.grid()
	for (i, z) in enumerate(all_zs):
		plt.scatter(z, all_thetas[i], color=colors[i], s=scaled[i], alpha=0.5)
		# mplcyberpunk.make_scatter_glow()
	if saveto is not None:
		plt.savefig(saveto)
		plt.close() # avoid displaying the plot

In [None]:
def score_clusters(truth, solution):
	if len(solution) == 0:
		return 0.0
	if solution.isnull().any():
		return 0.0
	return 100.0 * adjusted_rand_score(truth, solution)

def get_rand_from_number_of_vertices(v):
	elem = v[np.random.randint(len(v))]
	v.remove(elem)
	return elem

def generate_clusters(nt=16, nv=2, std=0.02):

	z_range = 1 # 0 to 1
	theta_range = 1 # 0 to 1 (we're acting like we squished the ranges)
	num_tracks = nt

	p = np.random.rand(num_tracks)
	p = 1/(p**2 + 0.001) # 3 orders of magnitude diff between min and max. corresponds to: 30 MeV, 30 GeV scaled
	p /= p.max()

	p = np.sort(p)[::-1].tolist()

	hard_ps = p[:nv]
	soft_ps = p[nv:]

	vertex_zs = np.random.rand(nv) * z_range
	vertex_thetas = np.random.rand(nv) * theta_range

	points_per_cluster = num_tracks // nv
	remainder = num_tracks % nv

	all_zs = []
	all_thetas = []
	all_ps = []
	truth = [] # which cluster each point belongs to
	for i in range(nv):
		z = vertex_zs[i]
		theta = vertex_thetas[i]

		# -1 because we're going to add the vertex itself too
		toadd = points_per_cluster - 1 + (1 if i < remainder else 0)
		for j in range(toadd):
			all_zs.append(z + np.random.normal(scale=std))
			all_thetas.append((theta + np.random.normal(scale=std)) % 1.0) # modulo 1.0 to keep it in the range
			all_ps.append(get_rand_from_number_of_vertices(soft_ps))
			truth.append(i)
		all_zs.append(z)
		all_thetas.append(theta)
		all_ps.append(get_rand_from_number_of_vertices(hard_ps))
		truth.append(i)
	# all_zs = np.array(all_zs)
	# all_thetas = np.array(all_thetas)
	DF = pd.DataFrame({'z': all_zs, 'theta': all_thetas, 'momentum': all_ps, 'truegroup': truth})
	DF.to_csv('TEST_DATA_WEEK_6.csv')
	return DF

	# return (n, all_zs, all_thetas, all_ps, truth)


In [None]:
def run_test(nv, nt, chain_length, chain_strength, annealing_time, sweeps):
    # Set standard deviation for cluster generation
    std = 0.06

    # Generate synthetic clusters
    df = generate_clusters(nt=nt, nv=nv, std=std)

    # Run Quantum Annealing
    qubo = create_qubo(df, chain_length, nv)
    sampler = EmbeddingComposite(DWaveSampler(token='DEV-0c064bac1884ffbe99c32c0c572a9390eb918320'))

    # Perform sampling with provided parameters
    response = sampler.sample_qubo(
        qubo,
        num_reads=sweeps,
        chain_strength=chain_strength,
        annealing_time=annealing_time
    )

    # Extract and validate solution
    best_sample = response.first.sample
    valid_solution = set_solution_from_annealer_response(df, best_sample)

    if not valid_solution:
        return None, None

    # Check and score the solution
    qascore = score_clusters(df['truegroup'], df['qagroup'])
    print("Score:", qascore)

    # Save results to CSV for this test
    df['qascore'] = qascore
    df.to_csv(f"results_nv_{nv}_nt_{nt}.csv", index=False)

    # Return DataFrame and score for comparison and plotting
    return qascore, df

def test_parameter_combinations(test_parameters):
    best_overall_score = 0
    best_overall_params = {}
    best_df = None

    # Set fixed values for nv and nt
    nv = 4
    nt = 10

    for params in test_parameters:
        print("Test parameters:", params)
        # Run test for current parameter set
        qascore, df = run_test(
            nv=nv,
            nt=nt,
            chain_length=params["chain_length"],
            chain_strength=params["chain_strength"],
            annealing_time=params["annealing_time"],
            sweeps=params["sweeps"]
        )

        # Track best configuration and score
        if qascore is not None and qascore > best_overall_score:
            best_overall_score = qascore
            best_overall_params = {**params, "qascore": qascore}
            best_df = df

    # Plot the clusters of the best configuration if found
    if best_df is not None:
        title = f"Best Annealer Solution with ARI={best_overall_score:.1f}%"
        plot_clusters(best_df, best_df['qagroup'], title)

    # Print and return best configuration
    print(f"Best configuration with highest Adjusted Rand Index:\n{best_overall_params}")
    return best_overall_params

In [None]:
import matplotlib.pyplot as plt

# New function to graph the results of the parameter tests
def graph_results(parameter_test_results):
    chain_lengths = [result["chain_length"] for result in parameter_test_results]
    scores = [result["qascore"] for result in parameter_test_results]

    plt.figure(figsize=(10, 6))
    plt.plot(chain_lengths, scores, marker='o', color='b', label="ARI Score")
    plt.xlabel("Chain Length")
    plt.ylabel("Adjusted Rand Index (ARI) Score (%)")
    plt.title("ARI Scores for Different Chain Lengths")
    plt.legend()
    plt.grid(True)
    plt.show()

# Modified run_dynamic_chain_length_test function to store results for graphing
def run_dynamic_chain_length_test_with_graph(nv, nt, chain_strength, annealing_time, sweeps, chain_length_range):
    best_overall_score = 0
    best_overall_params = {}
    parameter_test_results = []

    for chain_length in chain_length_range:
        print(f"Testing chain length: {chain_length:.1f}")
        trials = 30
        avg_score = 0
        for i in range(trials):
            qascore, df = run_test(
                nv=nv,
                nt=nt,
                chain_length=chain_length,
                chain_strength=chain_strength,
                annealing_time=annealing_time,
                sweeps=sweeps
            )
            if qascore is None:
                qascore = 0
            avg_score += qascore

        avg_score /= trials
        parameter_test_results.append({
            "chain_length": chain_length,
            "qascore": avg_score
        })

        print(f"Average score for chain length {chain_length:.1f}: {avg_score}")

        if avg_score > best_overall_score:
            best_overall_score = avg_score
            best_overall_params = {
                "chain_length": chain_length,
                "chain_strength": chain_strength,
                "annealing_time": annealing_time,
                "sweeps": sweeps,
                "qascore": avg_score
            }

    # Graph the results after all tests are complete
    graph_results(parameter_test_results)

    print(f"Best configuration with highest ARI for chain lengths {chain_length_range}:\n{best_overall_params}")
    return best_overall_params

# Run the modified test with nv, nt, and a dynamic chain length range
nv = 4  # Number of vertices
nt = 10  # Number of tracks
chain_length_range = np.arange(1.5, 3.1, 0.1)  # Chain lengths from 1.5 to 3 in 0.1 increments
base_parameters = {
    "chain_strength": 1.5,
    "annealing_time": 80,
    "sweeps": 50
}

# Execute the function with additional plotting
best_params_found = run_dynamic_chain_length_test_with_graph(
    nv=nv,
    nt=nt,
    chain_strength=base_parameters["chain_strength"],
    annealing_time=base_parameters["annealing_time"],
    sweeps=base_parameters["sweeps"],
    chain_length_range=chain_length_range
)
