In [None]:
import pandas as pd
import numpy as np
import gzip


class SOMToolBox_Parse:
    """Parser for SOMToolBox-style vector/weight files (plain text or gzip).

    The file is expected to consist of header lines starting with ``$`` (of
    which ``$XDIM``, ``$YDIM`` and ``$VEC_DIM`` are evaluated) and of data
    lines holding whitespace-separated numbers, optionally followed by
    further tokens (e.g. a label) which are ignored.
    """

    def __init__(self, filename):
        """Remember the path of the file to parse; nothing is read yet."""
        self.filename = filename

    def read_weight_file(self,):
        """Parse the file and return ``(df, vec_dim, xdim, ydim)``.

        Returns:
            df: ``float64`` DataFrame with ``xdim * ydim`` rows and ``vec_dim``
                columns; rows for which the file has no data line stay NaN.
            vec_dim, xdim, ydim: the integers read from the ``$`` header lines
                (0 when the respective header is missing).

        Raises:
            ValueError: if a data line appears before the dimensions are known
                or if a data line does not fit the declared dimensions.
        """
        # Files ending in '.gz' are decompressed on the fly; both openers are
        # context managers, so no explicit close() is required afterwards.
        opener = gzip.open if str(self.filename).endswith('.gz') else open
        with opener(self.filename, 'rb') as file:
            df, vec_dim, xdim, ydim = self._read_vector_file_to_df(pd.DataFrame(), file)
        return df.astype('float64'), vec_dim, xdim, ydim


    def _read_vector_file_to_df(self, df, file):
        """Consume ``file`` line by line and fill ``df`` with the data rows.

        ``file`` must yield ``bytes`` lines. The frame is (re)allocated as soon
        as both grid dimensions are known and it still has no columns, which
        also covers headers where ``$VEC_DIM`` follows ``$XDIM``/``$YDIM``.
        """
        xdim, ydim, vec_dim, position = 0, 0, 0, 0
        for byte in file:
            line = byte.decode('UTF-8')
            if not line.strip():
                # Blank lines carry neither metadata nor data.
                continue
            if line.startswith('$'):
                xdim, ydim, vec_dim = self._parse_vector_file_metadata(line, xdim, ydim, vec_dim)
                if xdim > 0 and ydim > 0 and len(df.columns) == 0:
                    df = pd.DataFrame(index=range(0, ydim * xdim), columns=range(0, vec_dim))
            else:
                if len(df.columns) == 0 or vec_dim == 0:
                    raise ValueError('Weight file has no correct Dimensional information.')
                position = self._parse_weight_file_data(line, position, vec_dim, df)
        return df, vec_dim, xdim, ydim


    def _parse_weight_file_data(self, line, position, vec_dim, df):
        """Store the first ``vec_dim`` numbers of ``line`` in row ``position``.

        Returns the position of the next row to fill.

        Raises:
            ValueError: if the line has fewer than ``vec_dim`` tokens, a token
                is not a number, or the frame has no row ``position``.
        """
        mismatch = 'The input-vector file does not match its unit-dimension.'
        tokens = line.split()[:vec_dim]
        if len(tokens) != vec_dim or position >= len(df.index):
            raise ValueError(mismatch)
        try:
            values = [float(token) for token in tokens]
        except ValueError as exc:
            raise ValueError(mismatch) from exc
        # Write through the indexer rather than through ``df.values``: the
        # latter may be a copy or a read-only view, depending on the pandas
        # version and its copy-on-write mode.
        df.iloc[position] = values
        return position + 1


    def _parse_vector_file_metadata(self, line, xdim, ydim, vec_dim):
        """Update the dimensions from one ``$`` header line.

        Header keys other than ``$XDIM``, ``$YDIM`` and ``$VEC_DIM`` (and
        headers without a value) leave the dimensions unchanged.
        """
        tokens = line.split()
        if len(tokens) < 2:
            return xdim, ydim, vec_dim
        key, value = tokens[0], tokens[1]
        if key == '$XDIM':
            xdim = int(value)
        elif key == '$YDIM':
            ydim = int(value)
        elif key == '$VEC_DIM':
            vec_dim = int(value)
        return xdim, ydim, vec_dim
        

In [None]:
import itertools
import networkx as nx

class MetroSolver:
    """ Metro Map Solver

    Snaps lines (sequences of 2D points) onto integer grid positions such that
    every station after the first lies horizontally, vertically or diagonally
    from its predecessor, while keeping the stations close to their original
    positions and penalising sharp turns.
    """
    def __init__(self, lines, input_grid, metro_grid=None):
        """Store the lines and the grid sizes.

        Args:
            lines: iterable of lines, each a sequence of 2D points.
            input_grid: size of the grid the lines are given in.
            metro_grid: size of the grid to snap onto; defaults to ``input_grid``.
        """
        self.lines = lines
        if metro_grid is None:
            metro_grid = input_grid
        self.input_grid = input_grid
        self.metro_grid = metro_grid


    def lines_to_graph(self, lines):
        """Convert list of lines to graph

        This function converts a list of lines, each composed of an array of points, to a
        networkx graph. The positions are encoded in each node's 'pos' attribute; every
        edge carries the index of its line ('line') and its direction ('sector').
        Nodes are numbered consecutively over all lines.
        """
        G = nx.Graph()

        offset = 0
        for nr, line in enumerate(lines):
            # Register the nodes explicitly so that lines with a single point
            # (which contribute no edge) still end up in the graph.
            for i, point in enumerate(line):
                G.add_node(offset + i, pos=point)
            for i in range(len(line) - 1):
                G.add_edge(offset + i, offset + i + 1,
                           line=nr, sector=self.get_sector(line[i], line[i + 1]))
            offset += len(line)

        return G


    def neighborhood_generator(self):
        """Sequentially point generator

        Here we generate 2D points at discrete positions starting from the origin.
        The generated points are spiraling outwards towards infinity, one square
        ring after the other. Every yielded array is a fresh object, so callers
        may keep references to earlier points.
        """
        directions = [np.array([1,0]), np.array([0,-1]), np.array([-1,0]), np.array([0,1])]
        yield np.array([0,0])

        r = 1
        d_idx = 0
        pos = np.array([0,r])
        yield pos.copy()
        while True:
            if np.abs(pos[0]) == np.abs(pos[1]): # corner
                if d_idx == 3: # completed 4th direction -> continue on the next ring
                    r += 1
                    d_idx = 0
                    pos = np.array([pos[0], r])
                    yield pos.copy()
                    continue
                d_idx += 1
            pos = pos + directions[d_idx]
            yield pos.copy()


    def gen_neighbors(self, pt, n_neighbors = 5):
        """Return ``n_neighbors`` grid points around ``pt`` as ``(point, distance)``.

        Candidates are visited in spiral order around the rounded ``pt`` and the
        result is sorted by ascending distance to ``pt``.
        """
        neighbors = []
        nearest = np.round(pt)

        for offset in self.neighborhood_generator():
            new_pt = nearest + offset

            dist = np.linalg.norm(new_pt-pt)
            # NOTE(review): only candidates at distance >= 1 are accepted, which
            # excludes the closest grid points - confirm that this is intended.
            if dist >= 1:
                neighbors.append((new_pt, dist))
            if len(neighbors) >= n_neighbors:
                break

        neighbors.sort(key=lambda a: a[1])
        return neighbors


    def get_sector(self, v1, v2):
        """Return the direction sector (0-7) of the step from ``v1`` to ``v2``.

        Returns -1 when no direction can be determined (e.g. equal points).
        """
        # 3  2  1
        # 4 -1  0
        # 5  6  7

        if v1[0] < v2[0] and v1[1] == v2[1]:   # right
            return 0
        elif v1[0] < v2[0] and v1[1] < v2[1]:  # right up
            return 1
        elif v1[0] == v2[0] and v1[1] < v2[1]: # up
            return 2
        elif v1[0] > v2[0] and v1[1] < v2[1]:  # left up
            return 3
        elif v1[0] > v2[0] and v1[1] == v2[1]: # left
            return 4
        elif v1[0] > v2[0] and v1[1] > v2[1]:  # left down
            return 5
        elif v1[0] == v2[0] and v1[1] > v2[1]: # down
            return 6
        elif v1[0] < v2[0] and v1[1] > v2[1]:  # right down
            return 7
        if np.any(v1 != v2):
            # Unequal points that match no sector: report them.
            print(v1, v2)

        # points equal
        return -1 


    def calc_penalty(self, corner_penalty, sector_diff):
        """Penalty for changing direction by ``sector_diff`` sectors.

        The eight sectors form a circle, so a difference of ``d`` sectors is the
        same turn as ``8 - d`` sectors (e.g. sector 0 -> 7 is a one-sector turn).

        Returns:
            0 for no turn or a one-sector turn, ``corner_penalty`` for two
            sectors, ``6 * corner_penalty`` for three sectors and
            ``8 * corner_penalty`` for a full reversal (four sectors).
        """
        turn = int(np.mod(sector_diff, 8))
        turn = min(turn, 8 - turn)
        if turn == 4:
            return corner_penalty * 8
        if turn == 3:
            return corner_penalty * 6
        if turn == 2:
            return corner_penalty
        return 0


    def gen_feasible_neighbors(self, snapped, pt, n_neighbors = 4, corner_penalty = 0.7):
        """Return candidate grid points for ``pt`` given the already snapped points.

        A candidate is feasible when it shares a row, a column or a diagonal with
        the last snapped point. Each result is ``(point, cost)`` where the cost is
        the distance to ``pt`` plus the turn penalty relative to the previous
        segment. The ``n_neighbors`` cheapest candidates are returned.
        """
        # search more neighbors than return, increases quality of neighbors with a 
        # relatively small additional computational effort
        MULTIPLIER = 2

        neighbors = []
        nearest = np.round(pt)
        prev = snapped[-1]

        # Direction of the previous segment; unknown for the first segment.
        prev_sector = None
        if len(snapped) > 1:
            prev_sector = self.get_sector(snapped[-2], snapped[-1])

        for offset in self.neighborhood_generator():
            new_pt = nearest + offset

            diff = np.abs(new_pt - prev)
            if new_pt[0] == prev[0] or new_pt[1] == prev[1] or diff[0] == diff[1]:
                penalty = 0
                if prev_sector is not None:
                    new_sector = self.get_sector(prev, new_pt)
                    abs_sec_diff = np.abs(new_sector - prev_sector)
                    penalty = self.calc_penalty(corner_penalty, abs_sec_diff)

                dist = np.linalg.norm(new_pt - pt)
                if dist >= 1:
                    neighbors.append((new_pt, dist + penalty))

            if len(neighbors) >= n_neighbors * MULTIPLIER:
                break

        neighbors.sort(key=lambda a: a[1])
        return neighbors[:n_neighbors]


    def snap_line(self, line, snapped, dist, lb):
        """Recursively snap the remaining points of ``line``.

        Args:
            line: points that still have to be snapped.
            snapped: list of points snapped so far (not modified).
            dist: cost accumulated for ``snapped``.
            lb: cost of the best complete solution known so far; partial
                solutions exceeding it are pruned.

        Returns:
            ``(cost, snapped_points)`` of the best completion found, or
            ``(None, None)`` if every candidate was pruned.
        """
        if len(line) == 0:
            return dist, snapped
        pt = line[0]
        best_line = None
        best = None
        neighbors = self.gen_feasible_neighbors(snapped, pt)
        for n_pt, n_dist in neighbors:
            if (dist + n_dist) > lb:
                continue
            n_snapped = snapped[:]
            n_snapped.append(n_pt)
            total_dist, new_snapped = self.snap_line(line[1:], n_snapped, dist + n_dist, lb)
            if total_dist is not None and total_dist < lb:
                lb = total_dist
                best = total_dist
                best_line = new_snapped
        return best, best_line


    def solve(self):
        """Snap all lines and return them in input grid coordinates.

        Returns:
            A list with one array of snapped points per input line.

        Raises:
            RuntimeError: if no snapping could be found for a line.
        """
        # Calculate a scaling factor for transforming the lines from the SOM grid
        # into the metro grid coordinates
        scale = np.array([md / sd for sd, md in zip(self.input_grid, self.metro_grid)])
        snapped_lines = []
        for idx, line in enumerate(self.lines):
            print(f"snapping line {idx+1}/{len(self.lines)}")
            if len(line) == 0:
                # Nothing to snap; keep the result aligned with the input lines.
                snapped_lines.append(np.empty((0, len(scale))))
                continue
            # Apply the transformation from SOM grid coordinates into metro grid coordinates
            line = np.asarray(line, dtype=float) * scale
            best_dist = np.inf
            snapped = None
            for pt, dist in self.gen_neighbors(line[0]):
                n_dist, n_snapped = self.snap_line(line[1:], [pt], dist, best_dist)
                if n_dist is None:
                    continue
                if n_dist < best_dist:
                    best_dist = n_dist
                    snapped = n_snapped
            if snapped is None:
                raise RuntimeError(f"no feasible snapping found for line {idx+1}")
            # Transform the line back from metro grid coordinates into SOM grid coordinates
            snapped_lines.append(np.asarray(snapped) / scale)
        return snapped_lines

In [None]:
import numpy as np
from scipy.spatial import distance_matrix, distance
from ipywidgets import Layout, HBox, Box, widgets, interact
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.colors


class SomViz:
    """Plotly visualisations for a self-organising map.

    Attributes:
        weights: array with one row per map unit (``m * n`` rows) and one
            column per component.
        m, n: the two side lengths of the map grid; unit ``i * n + j`` is
            treated as grid position ``(i, j)``.
    """

    def __init__(self, weights=[], m=None, n=None):
        # Converting to an array guarantees the array indexing used by the
        # methods below and avoids sharing the mutable default between instances.
        self.weights = np.asarray(weights)
        self.m = m
        self.n = n

    def umatrix(self, som_map=None, color="Viridis", interp = "best", title=""):
        """Compute the U-matrix and plot it.

        For every unit the mean weight-space distance to all units within grid
        distance 1 is computed; that set includes the unit itself.

        Returns a new figure if ``som_map`` is None; otherwise the first trace of
        ``som_map`` is updated in place and None is returned.
        """
        units = self.m * self.n
        neuron_locs = np.array([(i, j) for i in range(self.m) for j in range(self.n)])
        neuron_distmat = distance_matrix(neuron_locs, neuron_locs)

        um = np.zeros((units, 1))
        for i in range(units):
            neighbor_weights = self.weights[neuron_distmat[i] <= 1]
            um[i] = distance_matrix(self.weights[i][np.newaxis, :], neighbor_weights).mean()
        um = um.reshape(self.m, self.n)

        if som_map is None:
            return self.plot(um, color=color, interp=interp, title=title)
        som_map.data[0].z = um

    def hithist(self, som_map=None, idata = [], color='RdBu', interp = "best", title=""):
        """Count, per unit, how many vectors of ``idata`` it is closest to.

        ``idata`` may be any 2D array-like with one vector per row (including a
        DataFrame). Returns a new figure if ``som_map`` is None; otherwise the
        first trace of ``som_map`` is updated in place and None is returned.
        """
        hist = np.zeros(self.m * self.n, dtype=int)
        # np.asarray makes iteration go over rows; iterating a DataFrame
        # directly would yield its column labels instead.
        for v in np.asarray(idata):
            position = np.argmin(np.sqrt(np.sum(np.power(self.weights - v, 2), axis=1)))
            hist[position] += 1
        hist = hist.reshape(self.m, self.n)

        if som_map is None:
            return self.plot(hist, color=color, interp=interp, title=title)
        som_map.data[0].z = hist


    def component_plane(self, som_map=None, component=0, color="Viridis", interp = "best", title=""):
        """Plot the values of one weight component over the map grid.

        Returns a new figure if ``som_map`` is None; otherwise the first trace of
        ``som_map`` is updated in place and None is returned.
        """
        plane = self.weights[:,component].reshape(-1, self.n)
        if som_map is None:
            return self.plot(plane, color=color, interp=interp, title=title)
        som_map.data[0].z = plane


    def __gen_sequential_colors(self, levels, colors=None):
        """Generate a color sequence

        Generates a color sequence with the specified number of levels based on the
        provided colormap (a list of 'rgb(r,g,b)' strings). When ``colors`` is
        None, ``px.colors.sequential.Jet`` is used.
        """
        if colors is None:
            # Looked up lazily so that defining the class does not require plotly.
            colors = px.colors.sequential.Jet
        n_colors = len(colors)
        n_levels = levels

        if n_colors <= 1 or n_levels <= 1:
            # Nothing to interpolate: repeat the first color.
            return [colors[0]] * max(n_levels, 0)

        color_step = 1 / (n_colors - 1)
        color_sequence = [colors[0]]
        for i in range(1, n_levels-1):
            level_pos = i / (n_levels - 1)
            # Clamp so that rounding can never index past the last color pair.
            color_index = min(int(level_pos/color_step), n_colors - 2)

            intermediate = (level_pos - color_index * color_step)/color_step
            color_sequence.append(plotly.colors.find_intermediate_color(colors[color_index], colors[color_index+1], intermediate, colortype='rgb'))

        color_sequence.append(colors[-1])
        return color_sequence


    def metro(self, som_map=None, bins=6, metro_grid=(10,10)):
        """Draw one "metro line" per weight component on top of a map figure.

        For every component the map is split into ``bins`` value ranges; the
        centroid of the units of each non-empty range becomes a station. The raw
        lines are drawn dotted, the lines snapped by ``MetroSolver`` are drawn
        solid. The snapped lines are also stored in ``self.lines2``.

        Returns the figure (``som_map``, or a new U-matrix figure if it is None).
        """
        water = [
            (0.0, 'rgb(255,255,255)'),
            (0.33, 'rgb(255,255,255)'),
            (0.33, 'rgb(198,219,239)'),
            (1.0, 'rgb(198,219,239)')
        ]
        if som_map is None:
            som_map = self.umatrix(color=water, interp='best', title='U-matrix SOMToolBox') 

        lines = []
        n_lines = self.weights.shape[1]
        for component in range(n_lines):
            raw = self.weights[:,component].reshape(self.m, self.n)
            ranges = np.linspace(raw.min(), raw.max(), bins)
            binned = np.digitize(raw, ranges)
            stations = []
            for i in range(1, bins+1):
                match = np.argwhere(binned == i)
                if match.shape[0] == 0:
                    print("layer empty")
                    continue
                # Station = centroid of the grid positions falling into bin i.
                stations.append(match.mean(axis=0))
            lines.append(stations)

        solver = MetroSolver(lines=lines, input_grid=(self.m, self.n), metro_grid=metro_grid)
        lines2 = solver.solve()
        self.lines2 = lines2

        colors = self.__gen_sequential_colors(len(lines), colors=px.colors.diverging.Portland)

        # Points are (row, column) pairs, hence the first coordinate is y.
        for i, (line, col) in enumerate(zip(lines, colors)):
            y, x = list(zip(*line))
            som_map.add_trace(go.Scatter(x=x, y=y, mode='lines+markers', name=f'Component {i}', line_shape='linear', line=dict(dash='dot', color=col)))

        for i, (line, col) in enumerate(zip(lines2, colors)):
            y, x = list(zip(*line))
            som_map.add_trace(go.Scatter(x=x, y=y, mode='lines+markers', name=f'Component {i}', line_shape='linear', line=dict(width=6, color=col), marker=dict(size=10, line=dict(width=2,color='black'))))

        return som_map


    def plot(self, matrix, color="Viridis",interp = "none", title="", showscale=False):
        """Return a 700x700 heatmap ``FigureWidget`` of ``matrix``."""
        return go.FigureWidget(go.Heatmap(z=matrix, zsmooth=interp, showscale=showscale, colorscale=color), layout=go.Layout(width=700, height=700,title=title, title_x=0.5, plot_bgcolor='rgb(255,255,255)'))


In [None]:
import minisom as som
from sklearn import datasets, preprocessing

#smap = SOMToolBox_Parse('iris.wgt.gz')
#smap, sdim, smap_x, smap_y = smap.read_weight_file()

# Visualization
#viz = SomViz(smap.values.reshape(-1,sdim), smap_y, smap_x)
# Side lengths of the SOM grid (m x n units).
m = 20
n = 20

# Fixed seed so that Restart & Run All reproduces the same trained map.
SEED = 42

# Min-max scale every iris feature before training; the raw data stays
# available under its own name.
iris_raw = datasets.load_iris().data
min_max_scaler = preprocessing.MinMaxScaler()
iris = min_max_scaler.fit_transform(iris_raw)

s = som.MiniSom(m, n, iris.shape[1], sigma=0.8, learning_rate=0.7, random_seed=SEED)
s.train_random(iris, 10000, verbose=False)

In [None]:
# Visualization
# Flatten the trained weights to one row per map unit; the number of columns
# follows the training data instead of being hard-coded.
viz = SomViz(s.get_weights().reshape(-1, iris.shape[1]), m, n)
display(viz.metro(bins=10, metro_grid=(20, 20)))

In [None]:
def unravel_network(G):
    """Flatten a graph with 'pos' node attributes into coordinate lists.

    Returns ``(edge_x, edge_y, node_x, node_y)``. The edge lists hold, per edge,
    the start coordinate, the end coordinate and a ``None`` separator; the node
    lists hold one coordinate per node.
    """
    # Edges: start, end, gap
    edge_x, edge_y = [], []
    for edge in G.edges():
        src, dst = edge[0], edge[1]
        x0, y0 = G.nodes[src]['pos']
        x1, y1 = G.nodes[dst]['pos']
        edge_x.extend([x0, x1, None])
        edge_y.extend([y0, y1, None])

    # Nodes
    node_x, node_y = [], []
    for node in G.nodes():
        x, y = G.nodes[node]['pos']
        node_x += [x]
        node_y += [y]

    return edge_x, edge_y, node_x, node_y


def create_traces(edge_x, edge_y, node_x, node_y):
    """Build the plotly scatter traces for edges (thin grey lines) and nodes (markers).

    Returns ``(edge_trace, node_trace)``.
    """
    # Edges
    edge_style = dict(width=0.5, color='#888')
    edge_trace = go.Scatter(x=edge_x, y=edge_y, line=edge_style, hoverinfo='none', mode='lines')

    # Nodes
    marker_style = dict(size=10, line_width=2)
    node_trace = go.Scatter(x=node_x, y=node_y, mode='markers', hoverinfo='text', marker=marker_style)

    return edge_trace, node_trace


def create_figure(edge_trace, node_trace):
    """Combine both traces into one figure without legend, grid or zero lines.

    The y axis is anchored to the x axis with ratio 1.
    """
    x_axis = dict(zeroline=False, showgrid=False)
    y_axis = dict(zeroline=False, scaleanchor="x", scaleratio=1, showgrid=False)
    layout = go.Layout(showlegend=False, xaxis=x_axis, yaxis=y_axis)
    return go.Figure(data=[edge_trace, node_trace], layout=layout)


# lines_to_graph is a method of MetroSolver, not a module-level function, so it
# has to be called on a solver instance.
metro_graph = MetroSolver(lines=viz.lines2, input_grid=(m, n)).lines_to_graph(viz.lines2)
traces = unravel_network(metro_graph)
edge_trace, node_trace = create_traces(*traces)
fig = create_figure(edge_trace, node_trace)
fig

In [None]:
import pandas as pd
import minisom as som
from sklearn import datasets, preprocessing
#interp: False, 'best', 'fast', 
#color = 'viridis': https://plotly.com/python/builtin-colorscales/



#############################
######## miniSOM ############1/0
#############################
# NOTE(review): m and n are only read by the commented-out miniSOM code below,
# but assigning them here rebinds the names that earlier cells use - confirm
# whether they are still needed.
m=10
n=10

# Pre-processing 
#iris = datasets.load_iris().data
#min_max_scaler = preprocessing.MinMaxScaler()
#iris = min_max_scaler.fit_transform(iris)

# Train
#s = som.MiniSom(m, n, iris.shape[1], sigma=0.8, learning_rate=0.7)
#s.train_random(iris, 10000, verbose=False)

# Visualization
#viz_miniSOM = SomViz(s._weights.reshape(-1,4), m, n)
#um1 = viz_miniSOM.umatrix(color='magma', interp='best', title='U-matrix miniSOM')

#display(um1)

##########################################
######## read from SOMToolBox ############
##########################################
# Input vectors
trainedmap = SOMToolBox_Parse('iris.vec')
idata, idim, idata_x, idata_y = trainedmap.read_weight_file()

#display(idata)
#print(f'idim: {idim}')
#print(f'idata_x: {idata_x}')
#print(f'idata_y: {idata_y}')

# Trained map weights. The parser gets its own name so that `smap` only ever
# refers to the weight DataFrame.
weight_parser = SOMToolBox_Parse('iris.wgt.gz')
smap, sdim, smap_x, smap_y = weight_parser.read_weight_file()

#display(smap)
#print(f'sdim: {sdim}')
#print(f'smap_x: {smap_x}')
#print(f'smap_y: {smap_y}')

# Visualization
viz_SOMToolBox = SomViz(smap.values.reshape(-1,sdim), smap_y, smap_x)
#um2 = viz_SOMToolBox.umatrix(color='viridis', interp=None, title='U-matrix SOMToolBox') 
#um3 = viz_SOMToolBox.hithist(som_map=None, idata=idata, color='RdBu', interp="best", title="Hithist")

display(viz_SOMToolBox.metro(bins=7, metro_grid=(20,20)))
#display(um2)
#display(um3)
#display(HBox([um2, um3]))