# Unstructured mesh done properly with Scala + GraphX

Here we'll do the unstructured mesh with graphx in scala, the native language of Spark.

First we import the necessary modules:

In [1]:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

Now we're going to define the mesh geometry; we'll define some constants about the mesh and distribute them to all worker processes:

In [2]:
val n = 11
val npts = n*n
val dx_val = 1./(n-1.)
val dy_val = 1./(n-1)

val mesh = sc.broadcast((n, npts, dx_val, dy_val))

val npts = mesh.value._2
val nx = mesh.value._1
val ny = mesh.value._1
val dx = mesh.value._3
val dy = mesh.value._4

Functions to create the grid geometry follow; we'll use a triangular mesh which in this case is regular but
we're not asusming that anywhere in the calculations

In [3]:
type nodetype = (Double, (Double, Double))  // (dens, (x, y))
type edgetype = (Double, Double)            // (deltax, deltay)

val idx_to_grid : Long => (Long, Long) = idx => (idx % nx, idx/nx)
val grid_to_idx = (i:Long, j:Long) => j*nx+i
val valid_point = (i:Long, j:Long) => i >= 0 && i < nx && j >= 0 && j < ny
val idx_to_x = (idx:Long) => idx_to_grid(idx)._1*dx
val idx_to_y = (idx:Long) => idx_to_grid(idx)._2*dy
val deltax = (start:Long, end:Long) => idx_to_x(end) - idx_to_x(start)
val deltay = (start:Long, end:Long) => idx_to_y(end) - idx_to_y(start)
   
def edges_from_idx( idx:Long ): List[Edge[edgetype]] = {
    val (i, j) = idx_to_grid( idx )
    val deltas : List[(Long, Long)] = List((+1,0), (0,+1), (+1,+1))
    return deltas.map(d => (i+d._1, j+d._2)).
                  filter(ij => valid_point(ij._1, ij._2)).
                  map(ij => grid_to_idx(ij._1, ij._2)).
                  map(end => Edge(idx, end, (deltax(idx, end), deltay(idx, end))))
}

def node_from_idx( idx:Long ) : (VertexId, nodetype) = {
    val initial_posx = 0.3
    val initial_posy = 0.3
    val sigma = 0.15
    
    val x = idx_to_x(idx)
    val y = idx_to_y(idx)
    val distx = x - initial_posx
    val disty = y - initial_posy
    val density = scala.math.exp(-(distx*distx + disty*disty)/(sigma*sigma))
    
    return (idx, (density, (x, y)))
}

Now we construct the graph from the list of edges and nodes.  The nodes contain their density value and also their
position, simply for easy of plotting

In [4]:
val nodelist = (0L to npts-1).toList.map(node_from_idx)
val vertices: RDD[(VertexId, (Double, (Double, Double)))] = sc.parallelize(nodelist) 

val edgelist = (0L to npts-1).toList.map(edges_from_idx).flatten
val edges: RDD[Edge[(Double, Double)]] = sc.parallelize(edgelist)

val graph = Graph(vertices, edges)

To plot the graph we'll save it as a text file, read it in in python which has nice plotting capabilities, and plot:

In [5]:
def save_graph(name:String, graph:Graph[nodetype, edgetype]) : Unit = {
    graph.edges.map(e => e.srcId + "," + e.dstId).saveAsTextFile(name+"-edges.txt")
    graph.vertices.map(v => v._1 + "," + v._2._1 + "," + v._2._2._1 + "," + v._2._2._2).saveAsTextFile(name+"-vtxs.txt")
}

In [6]:
save_graph("init", graph)

In [7]:
%%PySpark
import matplotlib
matplotlib.use('Agg')
import matplotlib.pylab as plt

def load_graph(name):
    edgeRDD = sc.textFile(name+'-edges.txt').collect()
    edges = [tuple(int(item) for item in tuple(edge.split(','))) for edge in edgeRDD]
    
    vertexRDD = sc.textFile(name+'-vtxs.txt').collect()
    vertexdata = [tuple(float(item) for item in tuple(line.split(','))) 
                  for line in vertexRDD]
                  
    xydict = {int(id):(x,y) for id, dens, x, y in vertexdata}
    denss = [dens for id, dens, x, y in vertexdata]
    xs = [x for id, dens, x, y in vertexdata]
    ys = [y for id, dens, x, y in vertexdata]
    
    lines = [(xydict[s], xydict[e]) for s, e in edges]
    return xs, ys, denss, lines

def show_results(name):
    x, y, dens, lines = load_graph(name)
    plt.tricontourf(x, y, dens, cmap=plt.cm.Blues)
    plt.plot(x, y, 'ko', markersize=1)
    
    xlines = [x for line in lines for x in [line[0][0], line[1][0], None]]
    ylines = [y for line in lines for y in [line[0][1], line[1][1], None]]

    plt.plot(xlines, ylines, 'k-', linewidth=.5, alpha=0.25)
    plt.savefig(name+'.png')
    plt.close()

show_results('init')

  return np.alltrue(x[1:] - x[0:-1] >= 0)


![Initial](init.png)

Ok, so now we define the advection.  We'll set up a velocity and a timestep, and then create the advection 
routine: we collect all the terms necessary at each vertex and do the update.

In [8]:
val u = sc.broadcast(Array(1., 1.))
val dt = sc.broadcast(0.025)

In [9]:
type msgtype = (Double, (Double, Double), Double, Double, Double, Double, Double)

def src_msg(dxy:(Double, Double), srcAttr: nodetype, dstAttr: nodetype) : msgtype = {
   val (dx, dy) = dxy 
   val (srcDens, (xs, ys)) = srcAttr
   val (dstDens, (xd, yd)) = dstAttr
   return (srcDens, (xs, ys), dx*dx, dy*dy, dx*dy, (dstDens-srcDens)*dx, (dstDens-srcDens)*dy)
}

def dest_msg(dxy:(Double, Double), srcAttr: nodetype, destAttr: nodetype) : msgtype = {
   val (dx, dy) = (-dxy._1, -dxy._2)
   return src_msg((dx, dy), destAttr, srcAttr)
}

def apply_update(id:VertexId, attr:msgtype) : nodetype = {
    val (dens_0, point, dx2, dy2, dxdy, drhodx, drhody) = attr
    val weighted_dy_dx = dxdy/dx2
    val grad_y = (drhody - weighted_dy_dx * drhodx) / ( dy2 - weighted_dy_dx * dxdy + 0.001)
    val grad_x = (drhodx - dxdy*grad_y)/(dx2 + 0.001)
    return ((dens_0 - dt.value*(grad_x*u.value(0) + grad_y*u.value(1))), point)
}

In [10]:
def step(g:Graph[nodetype, edgetype]) : Graph[nodetype, edgetype] = {
    val terms = g.aggregateMessages[msgtype](
        // Map
        triplet => { 
            triplet.sendToSrc(src_msg(triplet.attr, triplet.srcAttr, triplet.dstAttr))
            triplet.sendToDst(dest_msg(triplet.attr, triplet.srcAttr, triplet.dstAttr))
          },
        // Reduce
        (a, b) => (a._1, a._2, a._3 + b._3, a._4 + b._4, a._5 + b._5, a._6 + b._6, a._7 + b._7)
    )
    
    val new_nodes = terms.mapValues((id, attr) => apply_update(id, attr))
    
    return Graph(new_nodes, graph.edges)
}

All done! We now have a fault-tolerant unstructured mesh advection program.  We can run 200 steps and see the results:

In [11]:
var i = 0
var newgraph = graph
for (i <- 1 to 200) {
    newgraph = step(graph)
}
save_graph("final", newgraph)

In [12]:
%%PySpark
show_results("final")

![one step](final.png)