<div class="clearfix" style="padding: 10px; padding-left: 0px">
<img src="resources/img/jupyter-logo.png" class="pull-left"  style="display: block; height: 20px; margin-top: 10px;">
<img src="resources/img/softbutterfly-logo.png" class="pull-right" style="display: block; height: 40px; margin: 0;">
</div>

<h1>Jupyter Notebook: Un entorno de programación interactivo<br><small>Parte II: Monitoreo de tareas de Spark en Jupyter Notebook</small></h1>


<b>Martín Josemaría Vuelta Rojas</b><br><br>
<i>Universidad Nacional Mayor de San Marcos</i><br>
<span>Facultad de Ciencias Físicas</span><br><br>
<i>SoftButterfly</i><br>
<span>Líder del Área de Desarrollo</span>

# Ejemplos

<div style="padding: 16px; border: 1px solid #dedede; border-left: 5px solid #1b809e; border-radius: 4px;margin:0;">
<h3 style="color:#1b809e; margin: 0; font-weight: normal;">Suma de números</h3>

<p>
Obtener la suma de una lista de un millón de números aleatorios.
</p>
</div>

In [1]:
from IPython.display import HTML
from IPython.display import Javascript
from IPython.display import display_javascript
from IPython.display import display_html

from urllib import request
from time import sleep
from uuid import uuid4

import threading
import json


class SparkNumberSum(object):
    """Sum numbers with Spark while rendering a live progress widget.

    ``apply`` distributes the data, displays the HTML widget and polls the
    Spark monitoring REST API from a background thread, forwarding every
    snapshot to the widget's JavaScript callbacks.

    NOTE(review): relies on a SparkContext bound to the global name ``sc``
    and on the Spark UI answering at 127.0.0.1:4040 -- confirm for your
    deployment.
    """

    # Widget template, rendered with %-formatting using the keys ``uuid`` and
    # ``appid``. Read inside ``with`` so the file handle is closed right away.
    with open("spark_monitor_widget.html") as _template_file:
        widget = _template_file.read()
    del _template_file

    # Seconds to wait for one REST request before giving up.
    request_timeout = 5

    # Job states after which the progress can no longer change.
    _FINAL_STATUSES = ("SUCCEEDED", "FAILED")

    def __init__(self):
        self.uuid = uuid4().hex
        self.is_widget_displayed = False
        self.sum = 0
        self.rdd = None
        self.monitoring = threading.Thread(target=self.monitor)
        self.jobID = None
        self.appID = None
        # Set by ``apply`` once the Spark action returned (or raised), so the
        # monitor thread always terminates.
        self._job_finished = threading.Event()

    def _display_html(self):
        """Render the widget template once and display it in the notebook."""
        if self.is_widget_displayed:
            return

        self.widget = self.widget % {
            'uuid': self.uuid,
            'appid': self.rdd.ctx.applicationId
        }
        display_html(HTML(self.widget))
        self.is_widget_displayed = True

    def _fetch_json(self, url):
        """Return the decoded JSON document served at ``url``."""
        with request.urlopen(url, timeout=self.request_timeout) as response:
            return json.loads(response.read().decode("utf8"))

    def _call_widget(self, function, payload):
        """Call the JavaScript function ``<function>_<uuid>`` with ``payload``.

        The function is presumably defined by the widget template.
        """
        script = "%(function)s_%(uuid)s(%(data)s)" % {
            'function': function,
            'uuid': self.uuid,
            'data': json.dumps(payload),
        }
        display_javascript(Javascript(script))

    def _select_job(self, jobs, finished):
        """Pick the job to report from the REST job list, or ``None``.

        Until a job is being tracked, the first entry is taken (the list is
        assumed to be ordered newest first). A ``SUCCEEDED`` first entry is
        treated as a previous run and skipped, unless the action has already
        finished, in which case it is the job that just completed.
        """
        if self.jobID is None:
            if not jobs:
                # The application has not launched any job yet.
                return None

            newest = jobs[0]
            if newest["status"] == "SUCCEEDED" and not finished:
                return None

            self.jobID = newest["jobId"]
            return newest

        # Look the job up by id instead of by position so a non-contiguous
        # job list cannot select the wrong entry.
        for job in jobs:
            if job["jobId"] == self.jobID:
                return job
        return None

    def monitor(self):
        """Poll the Spark REST API and push job progress to the widget.

        Runs until the tracked job reaches a final state, all its tasks are
        completed, or ``apply`` signals that the Spark action is over.
        """
        url_base = "http://127.0.0.1:4040/api/v1/applications/"
        url_pattern = url_base + '%(appid)s/jobs'

        if self.appID is None:
            application = self._fetch_json(url_base)[0]
            self.appID = application["id"]
            self._call_widget("getApplicationData", application)

        sleep(0.05)
        url = url_pattern % {'appid': self.appID}

        while True:
            # Read the flag BEFORE polling: when it is set, the snapshot
            # fetched below already reflects the final state of the job.
            finished = self._job_finished.is_set()
            job = self._select_job(self._fetch_json(url), finished)

            if job is None:
                if finished:
                    return
                sleep(0.05)
                continue

            self._call_widget("refreshJobProgress", job)

            if (
                finished
                or job["status"] in self._FINAL_STATUSES
                or job["numCompletedTasks"] >= job["numTasks"]
            ):
                break

            sleep(0.005)

    def _start_monitoring(self):
        """Start the monitor thread, recreating it if it already ran once."""
        if self.monitoring.ident is not None:
            # A Thread object can only be started once.
            self.monitoring = threading.Thread(target=self.monitor)
            self.jobID = None

        self._job_finished.clear()
        self.monitoring.start()

    def apply(self, data, numSlices=20):
        """Sum every number contained in ``data`` and return the total.

        Parameters
        ----------
        data : iterable of pairs whose second item is an iterable of numbers.
        numSlices : number of partitions handed to ``sc.parallelize``.

        Returns
        -------
        The total, which is also stored in ``self.sum``.
        """
        self.rdd = sc.parallelize(data, numSlices=numSlices)

        self._display_html()
        self._start_monitoring()

        try:
            self.sum = self.rdd.flatMap(lambda t: t[1]).sum()
        finally:
            # Let the monitor render the final state and stop, even when the
            # Spark action raised.
            self._job_finished.set()
            self.monitoring.join()

        return self.sum

In [2]:
from numpy import random

spark_job = SparkNumberSum()
# 50,000 (index, array) pairs, each array holding 1,000 random floats:
# 50 million numbers in total.
data = [(i, random.random(1000)) for i in range(50000)]

# Distribute the pairs over 50 partitions and sum every number with Spark.
spark_job.apply(data, numSlices=50)

25003530.105893306

In [3]:
from numpy import random

spark_job_2 = SparkNumberSum()
# 5,000 (index, array) pairs, each array holding 1,000 random floats:
# 5 million numbers in total.
data = [(i, random.random(1000)) for i in range(5000)]

# Distribute the pairs over 50 partitions and sum every number with Spark.
spark_job_2.apply(data, numSlices=50)

2500100.0192016135

In [4]:
from numpy import random

spark_job_3 = SparkNumberSum()
# 100 (index, array) pairs, each array holding 1,000 random floats:
# 100,000 numbers in total.
data = [(i, random.random(1000)) for i in range(100)]

# Distribute the pairs over 50 partitions and sum every number with Spark.
spark_job_3.apply(data, numSlices=50)

49937.157435067966

<div style="padding: 16px; border: 1px solid #dedede; border-left: 5px solid #1b809e; border-radius: 4px;margin:0;">
<h3 style="color:#1b809e; margin: 0; font-weight: normal;">Histograma de letras</h3>

<p>
Obtener el histograma de letras de un texto.
</p>
</div>

In [105]:
from IPython.display import HTML
from IPython.display import Javascript
from IPython.display import display_javascript
from IPython.display import display_html

from urllib import request
from time import sleep
from uuid import uuid4

import threading
import json


# The 26 lowercase ASCII letters, in order, one list item per letter.
alphabet = list("abcdefghijklmnopqrstuvwxyz")


class SparkLetterHistogram(object):
    """Count the letters of a text with Spark, with live progress and a plot.

    ``apply`` distributes one ``(letter, 1)`` pair per accepted character,
    displays the progress widget and polls the Spark monitoring REST API from
    a background thread. ``plot`` sends the resulting histogram to the plot
    widget.

    NOTE(review): relies on a SparkContext bound to the global name ``sc``,
    on the module-level ``alphabet`` list and on the Spark UI answering at
    127.0.0.1:4040 -- confirm for your deployment.
    """

    # Widget templates, rendered with %-formatting (``uuid`` and ``appid`` for
    # the monitor, ``uuid`` for the plot). Read inside ``with`` so the file
    # handles are closed right away.
    with open("spark_monitor_widget.html") as _template_file:
        widget = _template_file.read()
    with open("spark_monitor_plot_widget.html") as _template_file:
        plot_widget = _template_file.read()
    del _template_file

    # Seconds to wait for one REST request before giving up.
    request_timeout = 5

    # Job states after which the progress can no longer change.
    _FINAL_STATUSES = ("SUCCEEDED", "FAILED")

    def __init__(self):
        self.uuid = uuid4().hex
        self.is_widget_displayed = False
        self.is_plot_widget_displayed = False
        self.rdd = None
        self.monitoring = threading.Thread(target=self.monitor)
        self.jobID = None
        self.appID = None
        # A dict of zero counts until ``apply`` runs; afterwards a sorted list
        # of ``(letter, count)`` pairs.
        self.histogram = dict.fromkeys(alphabet, 0)
        # Set by ``apply`` once the Spark action returned (or raised), so the
        # monitor thread always terminates.
        self._job_finished = threading.Event()

    def _display_html(self):
        """Render the monitor template once and display it in the notebook."""
        if self.is_widget_displayed:
            return

        self.widget = self.widget % {
            'uuid': self.uuid,
            'appid': self.rdd.ctx.applicationId
        }
        display_html(HTML(self.widget))
        self.is_widget_displayed = True

    def _fetch_json(self, url):
        """Return the decoded JSON document served at ``url``."""
        with request.urlopen(url, timeout=self.request_timeout) as response:
            return json.loads(response.read().decode("utf8"))

    def _call_widget(self, function, payload):
        """Call the JavaScript function ``<function>_<uuid>`` with ``payload``.

        The function is presumably defined by the widget template.
        """
        script = "%(function)s_%(uuid)s(%(data)s)" % {
            'function': function,
            'uuid': self.uuid,
            'data': json.dumps(payload),
        }
        display_javascript(Javascript(script))

    def _select_job(self, jobs, finished):
        """Pick the job to report from the REST job list, or ``None``.

        Until a job is being tracked, the first entry is taken (the list is
        assumed to be ordered newest first). A ``SUCCEEDED`` first entry is
        treated as a previous run and skipped, unless the action has already
        finished, in which case it is the job that just completed.
        """
        if self.jobID is None:
            if not jobs:
                # The application has not launched any job yet.
                return None

            newest = jobs[0]
            if newest["status"] == "SUCCEEDED" and not finished:
                return None

            self.jobID = newest["jobId"]
            return newest

        # Look the job up by id instead of by position so a non-contiguous
        # job list cannot select the wrong entry.
        for job in jobs:
            if job["jobId"] == self.jobID:
                return job
        return None

    def monitor(self):
        """Poll the Spark REST API and push job progress to the widget.

        Runs until the tracked job reaches a final state, all its tasks are
        completed, or ``apply`` signals that the Spark action is over.
        """
        url_base = "http://127.0.0.1:4040/api/v1/applications/"
        url_pattern = url_base + '%(appid)s/jobs'

        if self.appID is None:
            application = self._fetch_json(url_base)[0]
            self.appID = application["id"]
            self._call_widget("getApplicationData", application)

        sleep(0.05)
        url = url_pattern % {'appid': self.appID}

        while True:
            # Read the flag BEFORE polling: when it is set, the snapshot
            # fetched below already reflects the final state of the job.
            finished = self._job_finished.is_set()
            job = self._select_job(self._fetch_json(url), finished)

            if job is None:
                if finished:
                    return
                sleep(0.05)
                continue

            self._call_widget("refreshJobProgress", job)

            if (
                finished
                or job["status"] in self._FINAL_STATUSES
                or job["numCompletedTasks"] >= job["numTasks"]
            ):
                break

            sleep(0.005)

    def _start_monitoring(self):
        """Start the monitor thread, recreating it if it already ran once."""
        if self.monitoring.ident is not None:
            # A Thread object can only be started once.
            self.monitoring = threading.Thread(target=self.monitor)
            self.jobID = None

        self._job_finished.clear()
        self.monitoring.start()

    def _display_html_plot(self):
        """Render the plot template once and display it in the notebook."""
        if self.is_plot_widget_displayed:
            return

        self.plot_widget = self.plot_widget % {
            'uuid': self.uuid,
        }
        display_html(HTML(self.plot_widget))
        self.is_plot_widget_displayed = True

    def plot(self, height=None, width=None):
        """Send this instance's histogram to the plot widget.

        ``height`` and ``width`` are serialized as JSON, so ``None`` reaches
        the JavaScript side as ``null``.
        """
        self._display_html_plot()

        # Use this instance's data (not a notebook global); before ``apply``
        # the histogram is still a dict of zero counts.
        pairs = self.histogram
        if isinstance(pairs, dict):
            pairs = sorted(pairs.items())

        data = json.dumps({
            "categories": [letter for letter, _ in pairs],
            "frequency": [count for _, count in pairs],
        })

        jso = Javascript("plot_%(uuid)s(%(data)s, %(height)s, %(width)s)" % {
            'uuid': self.uuid,
            'data': data,
            'height': json.dumps(height),
            'width': json.dumps(width),
        })
        display_javascript(jso)

    def apply(self, data, numSlices=20):
        """Count how often each letter of ``alphabet`` occurs in ``data``.

        Parameters
        ----------
        data : iterable of characters (typically a ``str``); characters are
            lower-cased and those not in ``alphabet`` are ignored.
        numSlices : number of partitions handed to ``sc.parallelize``.

        Returns
        -------
        A list of ``(letter, count)`` pairs sorted by letter, with a zero count
        for letters that never occur. It is also stored in ``self.histogram``.
        """
        lowered = (character.lower() for character in data)
        pairs = [(character, 1) for character in lowered if character in alphabet]

        self.rdd = sc.parallelize(pairs, numSlices=numSlices)

        self._display_html()
        self._start_monitoring()

        try:
            # reduceByKey combines counts inside each partition before the
            # shuffle instead of shipping every single pair.
            counted = self.rdd.reduceByKey(lambda left, right: left + right).collect()
        finally:
            # Let the monitor render the final state and stop, even when the
            # Spark action raised.
            self._job_finished.set()
            self.monitoring.join()

        # Start from fresh zero counts so calling ``apply`` again works.
        counts = dict.fromkeys(alphabet, 0)
        counts.update(counted)
        self.histogram = sorted(counts.items())

        return self.histogram

In [107]:
spark_job_4 = SparkLetterHistogram()

# Read the whole document first so the file is closed before the Spark job
# starts; the handle gets its own name instead of being rebound to its content.
with open('./Lorem ipsum.txt', 'r') as text_file:
    text = text_file.read()

spark_job_4.apply(text)

# Print the histogram as a right-aligned two-column table.
print("{0:>6s}  {1:>8s}".format("Letter", "Freq"))
for letter, frequency in spark_job_4.histogram:
    print("{0:>6s}  {1:>8d}".format(letter, frequency))

Letter      Freq
     a      4249
     b       656
     c      2181
     d      1509
     e      5889
     f       532
     g       612
     h       291
     i      5261
     j        65
     k         0
     l      3090
     m      2439
     n      3193
     o      2184
     p      1233
     q       712
     r      2910
     s      4364
     t      4338
     u      4697
     v       896
     w         0
     x       101
     y         0
     z         0


In [108]:
# Draw the letter histogram in the plot widget; height and width are left as None.
spark_job_4.plot()

In [114]:
# Scratch check: None serializes to the JSON literal null, which is what
# plot() passes to the JavaScript side when height/width are omitted.
a = json.dumps(None)
a

'null'