# TensorFlow with Apache Spark
This notebook is an example of how to use Apache Spark with TensorFlow. To run it, both Apache Spark and TensorFlow must be installed. Then start the notebook from the command line with:
&nbsp;<code>PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --port=7777" pyspark</code> (you can add further configuration parameters). Launching through <code>pyspark</code> provides the SparkContext <code>sc</code> that the code below relies on.
&nbsp;
Results are plotted with <a href='https://plotly.com'>Plotly</a> and were obtained on a single local machine. We expect Apache Spark to perform much better when it is properly configured and run across a cluster.

In [1]:
import tensorflow as tf
import numpy as np
import time

from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.graph_objs as go

# Enable Plotly offline rendering so iplot() draws figures inline in the
# notebook. connected=True loads plotly.js from the web instead of embedding it
# (per the plotly.offline documentation).
init_notebook_mode(connected=True)


# NOTE(review): every figure in this notebook is drawn with plotly's iplot, not
# matplotlib, so this magic looks unnecessary; kept as-is.
%matplotlib inline 

## Plot Function
The <code>plot</code> function draws four lines, one for each way the benchmark was run:
- Spark + GPU
- Spark + CPU
- Sequential on GPU
- Sequential on CPU

In [15]:
def plot(x_ax, data_p, data_s, title_value, x_name = 'Square matrix size', y_name = 'Time (seconds)'):
    """Render one Plotly line per (device, execution mode) combination.

    NOTE: this definition shadows ``plotly.offline.plot`` imported at the top
    of the notebook; rendering here goes through ``iplot``.

    Args:
        x_ax: x-axis values. The i-th value pairs with the i-th result of a
            given device in ``data_p`` / ``data_s``, so the lists must be
            ordered consistently with ``x_ax``.
        data_p: result dicts from the Spark (parallel) runs; each dict is
            read for its 'DEVICE' and 'EXECUTION_TIME' keys.
        data_s: result dicts from the sequential runs, same layout.
        title_value: figure title.
        x_name: x-axis label.
        y_name: y-axis label.
    """
    def times_for(results, device):
        # Preserve input order so times line up with x_ax.
        return [row['EXECUTION_TIME'] for row in results if row['DEVICE'] == device]

    # (legend name, result list, device, Spark-style line?) in legend order.
    trace_specs = [
        ('gpu_seq', data_s, '/gpu:0', False),
        ('gpu_spark', data_p, '/gpu:0', True),
        ('cpu_seq', data_s, '/cpu:0', False),
        ('cpu_spark', data_p, '/cpu:0', True),
    ]

    traces = []
    for trace_name, results, device, spark_style in trace_specs:
        scatter_kwargs = dict(
            x=x_ax,
            y=times_for(results, device),
            mode='lines+markers',
            name=trace_name,
        )
        if spark_style:
            # Spark runs are drawn as thick dash-dot lines to set them apart.
            scatter_kwargs['line'] = dict(width=4, dash='dashdot')
        traces.append(go.Scatter(**scatter_kwargs))

    figure = dict(
        data=traces,
        layout=dict(
            title=title_value,
            xaxis=dict(title=x_name),
            yaxis=dict(title=y_name),
        ),
    )
    iplot(figure)

## Execute
The two functions <code>execute</code> and <code>make_expression</code> build and run the TensorFlow computation. <code>make_expression</code> is the function passed to Spark's <code>RDD.map</code>, so that it is executed in parallel.

In [3]:
def execute(expression):
    """Evaluate ``expression`` in a new TensorFlow session and time it.

    The session enables ``gpu_options.allow_growth`` so TensorFlow allocates
    GPU memory on demand instead of reserving it up front.

    Args:
        expression: the TensorFlow tensor/op to evaluate with ``Session.run``.

    Returns:
        Wall-clock seconds (``time.time`` difference) spent in ``run``;
        session construction is not included. The computed value is discarded.
    """
    session_config = tf.ConfigProto()
    session_config.gpu_options.allow_growth = True

    with tf.Session(config=session_config) as session:
        started = time.time()
        session.run(expression)
        elapsed = time.time() - started

    return elapsed

In [4]:
def make_expression(size, device, dtype=tf.float16):
    """Multiply two random ``size`` x ``size`` matrices on ``device`` and time it.

    The ops are built in a brand-new ``tf.Graph`` on every call. Adding them to
    the process-wide default graph (as before) made that graph grow with every
    call, both in the sequential loop and in reused Spark Python workers. That
    leaked memory, and each new session then had to load an ever-larger graph
    inside the timed ``run`` call, which skewed the measurements.

    Args:
        size: side length of the square matrices.
        device: TensorFlow device string, e.g. '/gpu:0' or '/cpu:0'.
        dtype: element type of the random matrices; defaults to the original
            ``tf.float16``. NOTE(review): float16 matmul on CPU is typically
            much slower than float32, which may exaggerate the CPU/GPU gap.

    Returns:
        Seconds spent evaluating the product, as measured by ``execute``.
    """
    shape = (size, size)
    graph = tf.Graph()
    with graph.as_default():
        with tf.device(device):
            a = tf.random_normal(shape=shape, dtype=dtype)
            b = tf.random_normal(shape=shape, dtype=dtype)
            product = tf.matmul(a, b)
        # execute() opens tf.Session() without an explicit graph, so it must be
        # called while `graph` is still the default graph.
        return execute(product)

## Test
For the given square-matrix sizes and number of operations, <code>test</code> first measures the execution time with Spark (on GPU and on CPU), and then measures the execution time of the same operations run sequentially in a plain <code>for</code> loop.

In [5]:
def _elapsed(job):
    """Call ``job()`` and return the wall-clock seconds it took."""
    start_time = time.time()
    job()
    return time.time() - start_time


def test(sizes, number, devices=('/gpu:0', '/cpu:0')):
    """Benchmark ``number`` matrix multiplications with and without Spark.

    For every device and size, ``number`` calls to
    ``make_expression(size, device)`` are timed end to end:
      1. as one Spark job (one RDD element per call, via
         ``sc.parallelize(...).map(...).collect()``), then
      2. sequentially in a plain Python loop.

    Relies on the global SparkContext ``sc`` provided by the ``pyspark``
    launcher.

    Args:
        sizes: square-matrix side lengths to benchmark.
        number: how many multiplications to run per (device, size) batch.
        devices: TensorFlow device strings to benchmark. Defaults to the
            original GPU-then-CPU pair; pass e.g. ('/cpu:0',) on a machine
            without a GPU.

    Returns:
        (parallel_results, sequential_results): two lists of dicts with keys
        'DEVICE', 'MATRIX_SHAPE', 'EXECUTION_TIME' (seconds) and 'EXPR_NUM',
        ordered by device first, then by size.
    """
    def record(device, size, seconds):
        return {'DEVICE': device, 'MATRIX_SHAPE': size,
                'EXECUTION_TIME': seconds, 'EXPR_NUM': number}

    parallel_results = []
    sequential_results = []

    # WITH SPARK
    print('SPARK')
    for device in devices:
        for size in sizes:
            seconds = _elapsed(
                lambda: sc.parallelize([(size, device)] * number)
                          .map(lambda job: make_expression(job[0], job[1]))
                          .collect())
            parallel_results.append(record(device, size, seconds))
            # No bare tf.Session() here: an unconfigured session (no
            # allow_growth) makes TensorFlow reserve most of the GPU memory in
            # the driver process, starving the GPU runs that follow.

    # WITHOUT SPARK
    print('SEQUENTIAL')
    for device in devices:
        for size in sizes:
            def run_sequentially():
                for _ in range(number):
                    make_expression(size, device)
            seconds = _elapsed(run_sequentially)
            sequential_results.append(record(device, size, seconds))

    return parallel_results, sequential_results
        

In [16]:
# Benchmark knobs: square-matrix side lengths, and how many multiplications
# each run performs (one full test() run per entry of n).
sizes, n = [100, 500, 1000, 2000], [10, 30, 100, 200]
# Plot-title template; filled in with the number of operations.
title_shape = 'Process {} operations with and without Spark using CPU and GPU'

## Results

In [27]:
r1 = test(sizes, n[0])
# r1 is (parallel_results, sequential_results); unpack it straight into plot.
plot(sizes, *r1, title_value=title_shape.format(n[0]))

In [28]:
r2 = test(sizes, n[1])
# The title is plot's 4th positional argument. A stray n[1] used to occupy that
# slot, pushing the real title into the x-axis label.
plot(sizes, r2[0], r2[1], title_shape.format(n[1]))

In [29]:
r3 = test(sizes, n[2])
# The title is plot's 4th positional argument. A stray n[2] used to occupy that
# slot, pushing the real title into the x-axis label.
plot(sizes, r3[0], r3[1], title_shape.format(n[2]))

In [17]:
# Slowest run (largest N). It must actually execute: this cell and the next one
# read r4, which would otherwise be undefined on a fresh kernel.
r4 = test(sizes, n[3])
plot(sizes, r4[0], r4[1], title_shape.format(n[3]))

In [19]:
# Time vs. N at the largest matrix size. Take the size-sizes[-1] entry from each
# run r1..r4 (which used N = n[0]..n[3]), so the i-th point pairs with n[i].
# Previously r4's per-size timings were plotted against n, mixing up the axes.
largest_size = sizes[-1]
runs_by_n = (r1, r2, r3, r4)
parallel_at_largest = [row for run in runs_by_n for row in run[0]
                       if row['MATRIX_SHAPE'] == largest_size]
sequential_at_largest = [row for run in runs_by_n for row in run[1]
                         if row['MATRIX_SHAPE'] == largest_size]
plot(n, parallel_at_largest, sequential_at_largest,
     'Time needed for N matrix multiplication of size {}'.format(largest_size),'# of matrix multiplication N')