In [None]:
from matplotlib.axes import subplot_class_factory
import pandas as pd
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
from numpy import random
import matplotlib as mpl
import matplotlib.cm as cm
from scipy.spatial.distance import hamming

### read data into pandas
df = pd.read_csv('https://raw.githubusercontent.com/weigangq/nov-search/main/data/bd1-subset2.tsv', sep='\t')
print(df.head)
#plt.figure()
#df['Fitness'].plot.hist(bins=10)

### store data into dictionary
recs = df.to_dict("records")

### add edges & assign layers
def seq_dist(wt, var):
    d = 0;
    for i in range(len(wt)):
        if wt[i] != var[i]:
            d += 1
    return(d)

wt = recs[0]['Variants']

dict1 = {}
for rec in recs:
    dict1[rec['Variants']] = {'id': rec['Variants'], 'fit': rec['Fitness'], 'subset': seq_dist(wt, rec['Variants'])}
for level in range(5):
    ndPre = [var for var in  dict1 if dict1[var]['subset'] == level]
    print(level, len(ndPre))

### create graph, add nodes and edges 
DG = nx.DiGraph() 
for var in dict1: 
    DG.add_node(var)         
    for k in dict1[var]: # attach features
        DG.nodes[var][k] = dict1[var][k]
    DG.nodes[var]['alpha'] = dict1[var]['fit'] # assign fitness as alpha (transparency)

for level in range(5): # add edge
    ndPre = [var for var in  dict1 if dict1[var]['subset'] == level]
    ndThis = [var for var in  dict1 if dict1[var]['subset'] == level + 1]
    for i in ndPre:
        for j in ndThis:
            if seq_dist(i, j) == 1:
                if dict1[i]['fit'] < dict1[j]['fit']:   
                    DG.add_edge(i, j)
                else:
                    DG.add_edge(j, i)
#print(len(DG.nodes))
#print(len(DG.edges))

# node coloring by fitness 
#node_colors = range(10)
node_alphas = [ DG.nodes[x]['fit'] for x in DG.nodes]
print(node_alphas)

peaks = [x[0] for x in DG.out_degree if x[1] == 0]
vals = [x[0] for x in DG.in_degree if x[1] == 0]
print(peaks)
print(vals)

### basins of attraction 
def fit_diff(G, nd, suc):
    return G.nodes[suc]['fit'] - G.nodes[nd]['fit'] # must be positive

# recursive function
def greedy_climb(G, nd, pt):
    if nd in peaks: # a peak, no successors; but inclusive as a basin hap
        pt.append(nd)
        return
    best = sorted( [suc for suc in G.successors(nd)], key = lambda x: fit_diff(G, nd, x), reverse = True)
    pt.append(best[0])
    greedy_climb(G, best[0], pt)

# Basin of attraction (should partition the haplotype space)
basins = {} # nuumber of paths to a peak (through greedy hill climbing)
for p in peaks: # initalize dict
    basins[p] = []

for nd_i in DG.nodes: # 625 starting nodes
    #start = np.random.default_rng().choice(DG.nodes)
    path = []
    greedy_climb(DG, nd_i, path) # recursive until a peak
    peak = path[-1] # last item is the peak
    basins[peak].append(path)
    print(path)

for p in basins:
  print(p, "\t", len(basins[p]), "\t", round(dict1[p]['fit'],6))


### escape edge
D = 2 # mini distance between two solutions

# mutating i at a distance d<=D to reach j (for each solution of j): 
def distance_basin(basin, var, d): # mutate var to reach basin at a distance of d or fewer bits
    solutions = [x[0] for x in basin] # starting points in a basin
    dists = 0
    for i in solutions:
        d = hamming(list(i), list(var)) * len(var)
        if d <= D:
            dists += 1
    return(dists)

# mutation distance of i to reach all peaks, self inclusive
def distance_all(nodes, var, d): # within the i, distance to j
    dists = 0
    for nd in nodes:
        basin = basins[nd]
        dists += distance_basin(basin, var, d)
    return(dists)

escape = {}
for i in range(len(peaks)): # to each peak i
    var_i = peaks[i]
    basin_i = basins[var_i]
    total = distance_all(peaks, var_i, D) # distances to peak i from all nodes
    for j in range(len(peaks)): # from aanother peak j
        var_j = peaks[j]
        basin_j = basins[var_j]
        dists = distance_basin(basin_j, var_i, D) # distance of j basins to LOi 
        norm = dists/total
        if dists > 0:
            escape[ (var_i, var_j) ] = {'wt': norm, 'numb': dists, 'total': total}

for e in escape:
  print(dict1[e[0]]['id'], "\t", dict1[e[1]]['id'],  "\t", round(escape[e]['wt'],4), "\t", escape[e]['numb'], "\t", escape[e]['total'])

# plot LON
LON = nx.DiGraph()
for peak in peaks: # var as key
    LON.add_node(peak)
    for k in dict1[peak]: # k includes 'var' and 'fit'
        LON.nodes[peak][k] = dict1[peak][k]
#print([dict1[n]['fit'] for n in LON.nodes])
        
for e in escape:
    LON.add_edge(e[0], e[1])
    LON.edges[e]['wt'] = escape[e]['wt']

# map fitness to color map
norm = mpl.colors.Normalize(vmin=0, vmax=1)
mapper = cm.ScalarMappable(norm=norm, cmap=mpl.cm.cool)
ndColors = [ mapper.to_rgba(LON.nodes[v]['fit']) for v in LON.nodes ] # colored by fitness
ndSizes = [ len(basins[p])*1.5 for p in LON.nodes ] # sized by basin size
egWidth = [ LON.edges[e]['wt']*2 for e in LON.edges ]  # edge width by weight

pos = nx.circular_layout(LON)
plt.figure(figsize = (6,6), dpi=300)
ec = nx.draw_networkx_edges(LON, pos, width=egWidth, connectionstyle="arc3,rad=0.3")
nc = nx.draw_networkx_nodes(LON, pos, 
                            node_color=ndColors, 
                            node_size=ndSizes, 
                            edgecolors=ndColors
                            )
nl = nx.draw_networkx_labels(LON, pos, font_size =10)
plt.show()
#plt.savefig("lon-nk.png")


