# A First Spark Notebook

Here we set up a small 1-d thermal diffusion simulation in Spark, and let Spark handle both the domain decomposition and the updates, which are expressed on a cell-by-cell basis.

In this first set of imports we get the environment ready. If this is the first notebook you're running in this VM, matplotlib may emit warnings about generating font caches for the first time - that's normal.

In [None]:
import numpy as np
import matplotlib.pylab as plt
%matplotlib inline
import findspark

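Now we use findspark to figure out where `SPARK_HOME` is and put that in the Python path, so that `pyspark` can be imported. The Spark context itself (a local master with 4 workers) is created further down, when we actually run the simulation.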

In [None]:
# Locate the Spark installation and put its Python bindings on the Python
# path; this has to run before the pyspark import below can succeed.
findspark.init()
from pyspark import SparkContext

That done, we write some code:

In [None]:
def thermal_simulation_1d(sc, ncells, nsteps, nprocs,
                           leftX=-10., rightX=+10.,
                           sigma=1.5, ao=1., coeff=.45):
    """Run a 1-d explicit diffusion simulation on a Spark RDD and plot it.

    The interval [leftX, rightX] is sampled at ``ncells`` evenly spaced
    points (``np.linspace``), each held in the RDD as an
    ``(index, temperature)`` pair.  The initial condition is a Gaussian of
    amplitude ``ao`` and width ``sigma`` centred on x = 0.

    On every step each cell emits its own value plus its stencil
    contributions, and ``reduceByKey`` sums them per index, so an interior
    cell ``i`` becomes ``T[i] + coeff*(T[i-1] - 2*T[i] + T[i+1])``.
    Contributions aimed at the two end cells are dropped, so those cells
    keep their initial temperature throughout.

    The initial state is scatter-plotted in red and the final state in
    black on the current matplotlib axes.  Nothing is returned.

    Parameters
    ----------
    sc : SparkContext
        Context used to create the RDD.
    ncells : int
        Number of cells (grid points) in the domain.
    nsteps : int
        Number of update steps to apply.
    nprocs : int
        Number of RDD partitions to use, both for the initial
        ``parallelize`` and for each ``reduceByKey``.
    leftX, rightX : float
        Coordinates of the first and last cell.
    sigma, ao : float
        Width and amplitude of the initial Gaussian.
    coeff : float
        Stencil weight applied to each neighbour (see the update above).
    """
    # Single source of truth for cell positions: used both for the initial
    # condition and for plotting, so the two can never disagree.
    grid = np.linspace(leftX, rightX, ncells)

    # helper functions:
    # - find out if a point is a boundary point or interior
    # - calculate the 1d diffusion stencil
    # - collect the data and plot
    def is_interior(cell):
        """Is this an interior point (i.e. not one of the two end cells)?"""
        index, temperature = cell
        return (index > 0) and (index < ncells-1)

    def stencil(cell):
        """Return this cell plus its contributions to itself and neighbours.

        Contributions whose target index is not an interior cell are
        discarded, which is what keeps the end cells fixed.
        """
        index, temperature = cell
        updates = [(index,  -2*coeff*temperature),
                   (index-1, coeff*temperature),
                   (index+1, coeff*temperature)]
        return [cell] + [delta for delta in updates if is_interior(delta)]

    def plot_data(data, usecolor=None):
        """Collect the RDD and scatter-plot temperature against position.

        Each point is tagged with the number of the partition it lives in;
        that number is used as the colour unless ``usecolor`` is given.
        """
        def tag_with_partition(partition, iterator):
            yield [(index, value, partition) for index, value in iterator]

        alltriples = [triple
                      for item in data.mapPartitionsWithIndex(tag_with_partition).collect()
                      for triple in item]
        if not alltriples:
            # Nothing to draw (e.g. ncells == 0); zip(*[]) could not be unpacked.
            return

        indices, y, color = zip(*alltriples)
        # Look positions up in the same grid the initial condition was built
        # on, rather than re-deriving them with a different spacing.
        x = grid[np.array(indices)]
        if usecolor is not None:
            color = usecolor
        # 'none' (not the empty string) is how matplotlib spells "no edge".
        plt.scatter(x, y, c=color, edgecolors='none')

    # The main calculation
    # Step 1: Calculate initial conditions
    temperatures = ao * np.exp(-grid*grid/(2.*sigma*sigma))
    temps_and_idxs = list(enumerate(temperatures))

    # Step 2: Distribute data across tasks
    # Plot initial conditions in red
    data = sc.parallelize(temps_and_idxs, nprocs)
    plot_data(data, usecolor='red')

    # Main loop: For each iteration,
    #  - calculate terms in the next step
    #  - and sum
    # These are lazy transformations; the work is only carried out when
    # plot_data() collects the result below.
    for step in range(nsteps):
        data = data.flatMap(stencil) \
                   .reduceByKey(lambda a, b: a + b, numPartitions=nprocs)

    # Plot final results in black
    plot_data(data, usecolor='black')

In [None]:
# Run against a local Spark master with 4 workers ("local[4]"), matching the
# nprocs=4 partitions requested below.  Using the context as a with-block
# stops it once the simulation and its plots are done.
with SparkContext("local[4]") as sc:
    thermal_simulation_1d(sc, ncells=50, nsteps=20, nprocs=4)