# MASS: Howto Mini App Streaming Source (MASS)

This notebook demonstrates the usage of the data source mini apps, which can be used for developing and characterizing streaming applications.

For the light source application, see the TomoPy manual: <https://tomopy.readthedocs.io/en/latest/ipynb/tomopy.html>

In [None]:
%%capture

%pylab inline
%matplotlib inline

import pandas as pd
import matplotlib.pyplot as plt
import os, sys, time
sys.path.append("..")
import tomopy
import pandas as pd
import numpy
import ast
import dxchange
import tempfile
import pykafka
import base64
import io
import binascii
import tempfile
import pykafka
import datetime, time, json
import mass.kafka

## logging
# Raise the threshold to ERROR on the root logger and on the py4j and
# radical.utils loggers.
import logging
logging.basicConfig(level=logging.ERROR)
for quiet_logger_name in (None, "py4j", "radical.utils"):
    # getLogger(None) returns the root logger.
    logging.getLogger(quiet_logger_name).setLevel(logging.ERROR)
 
import boto3
boto3.setup_default_session(profile_name='dev')    
    
# Pilot-Streaming
import pilot.streaming
sys.modules['pilot.streaming']

# Resource Setup

## Kafka

In [None]:
# Description of the Kafka pilot; the resource URL uses the slurm+ssh scheme.
kafka_pilot_description1 = dict(
    resource="slurm+ssh://login1.wrangler.tacc.utexas.edu",
    working_directory=os.path.join('/work/01131/tg804093/wrangler/', "work"),
    number_cores=48,
    project="TG-MCB090174",
    queue="normal",
    walltime=159,
    type="kafka",
)

# Create the pilot, wait on it, then keep its details for the cells below.
kafka_pilot1 = pilot.streaming.PilotComputeService.create_pilot(
    kafka_pilot_description1
)
kafka_pilot1.wait()
kafka_details = kafka_pilot1.get_details()
kafka_details

## Kinesis

In [None]:
# Number of partitions requested for the Kinesis pilot; also reused by the
# mini app cells further down.
number_partitions = 4
pilot_compute_description = {
    "resource": "kinesis://awscloud.com",
    "number_cores": number_partitions,
    "type": "kinesis"
}
kinesis_pilot = pilot.streaming.PilotComputeService.create_pilot(pilot_compute_description)
kinesis_pilot.wait()
kinesis_details = kinesis_pilot.get_details()
# Display the stored details instead of calling get_details() a second time,
# matching the Kafka and Dask setup cells.
kinesis_details

## Dask

In [None]:
# Description of the Dask pilot; the resource URL uses the slurm+ssh scheme.
dask_pilot_description = dict(
    resource="slurm+ssh://login1.wrangler.tacc.utexas.edu",
    working_directory=os.path.join('/work/01131/tg804093/wrangler/', "work"),
    number_cores=48,
    project="TG-MCB090174",
    queue="normal",
    walltime=159,
    type="dask",
)

# Create the pilot, wait on it, then keep its details for later cells.
dask_pilot = pilot.streaming.PilotComputeService.create_pilot(
    dask_pilot_description
)
dask_pilot.wait()
dask_details = dask_pilot.get_details()
dask_details

# Mini App Test

## KMeans

### Kinesis

In [None]:
# Run the KMeans mini app against the Kinesis stream described by
# kinesis_details. No Dask scheduler is passed (alternative kept as a comment).
miniapp = mass.kafka.MiniApp(
    # dask_scheduler=dask_details['master_url'],
    dask_scheduler=None,
    # --- broker / target ---
    resource_url=kinesis_details["master_url"],
    broker_service="kinesis",
    topic_name="test",
    number_partitions=number_partitions,
    # --- workload ---
    application_type="kmeans",
    number_parallel_tasks=2,
    number_produces=1,
    number_messages=1,
    number_clusters=10,
    number_points_per_cluster=10000,
    number_points_per_message=1000,
    number_dim=3,
)
miniapp.run()

In [None]:
boto3.setup_default_session(profile_name='dev')
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
stream_name = kinesis_details["master_url"].split("/")[1]
print("Stream Name: %s"%stream_name)
stream = kinesis_client.describe_stream(StreamName=stream_name)['StreamDescription']

# A single GetRecords call may return no records even though the shard holds
# data, so every shard is polled through NextShardIterator until the reader
# has caught up with the tip of the shard or the poll budget is used up.
max_polls_per_shard = 20
messages = []
for shard in stream['Shards']:
    print("### %s - %s"%(stream_name, shard['ShardId']))
    shard_iterator = kinesis_client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard['ShardId'],
        ShardIteratorType='AT_TIMESTAMP',  #'TRIM_HORIZON'|'LATEST'
        Timestamp=datetime.datetime.utcnow() - datetime.timedelta(minutes=30)
    )['ShardIterator']

    shard_record_count = 0
    for poll in range(max_polls_per_shard):
        out = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=1000)
        for record in out["Records"]:
            messages.append(record["Data"])
        shard_record_count += len(out["Records"])
        shard_iterator = out.get("NextShardIterator")
        # No next iterator means the shard is closed; MillisBehindLatest == 0
        # means there is nothing newer left to read.
        if shard_iterator is None or out.get("MillisBehindLatest", 0) == 0:
            break
        # Pause between polls to stay below the per-shard read rate limit.
        time.sleep(0.25)

    if shard_record_count == 0:
        print("No records found in shard %s"%shard['ShardId'])

In [None]:
# Number of raw records collected into `messages`.
len(messages)

In [None]:
# Tally the messages collected in `messages` and the points they contain.
count = 0
number_total_points = 0
# Initialised up front so the summary still prints when `messages` is empty;
# otherwise the name would only be bound inside the loop.
number_dimensions = 0
for message in messages:
    # Each message is decoded as UTF-8 and parsed as a Python literal; the
    # resulting array is indexed as (points, dimensions).
    data_np = np.array(ast.literal_eval(message.decode("utf-8")))
    num_points = data_np.shape[0]
    number_dimensions = data_np.shape[1]
    count += 1
    number_total_points += num_points

print("Total Messages: %d, Total Points: %d, Number Dimensions: %d"%(count, number_total_points, number_dimensions))

### Kafka Broker

Ensure that the correct amount of data was successfully written to Kafka.

In [None]:
%%capture

%pylab inline
%matplotlib inline

import pandas as pd
import matplotlib.pyplot as plt
import os, sys, time
sys.path.append("..")
import tomopy
import pandas as pd
import numpy
import ast
import dxchange
import tempfile
import pykafka
import base64
import io
import binascii
import tempfile
import pykafka
import datetime, time, json
import mass.kafka

## logging
# Raise the threshold to ERROR on the root logger and on the py4j and
# radical.utils loggers.
import logging
logging.basicConfig(level=logging.ERROR)
for quiet_logger_name in (None, "py4j", "radical.utils"):
    # getLogger(None) returns the root logger.
    logging.getLogger(quiet_logger_name).setLevel(logging.ERROR)
 
import boto3
boto3.setup_default_session(profile_name='dev')    
    
# Pilot-Streaming
import pilot.streaming
sys.modules['pilot.streaming']

In [None]:
%%time
miniapp=mass.kafka.MiniApp(
                            #dask_scheduler=dask_details['master_url'],
                            #dask_scheduler='tcp://c251-101:8786',
                            dask_scheduler=None,
                            resource_url='c251-126:2181',
                            #resource_url=kafka_details["master_url"],
                            broker_service="confluent",
                            number_parallel_tasks=1,
                            number_clusters=3,
                            number_points_per_cluster=1000,
                            number_points_per_message=1000,
                            number_messages=1,
                            number_dim=3,
                            number_produces=1,
                            number_partitions=1,
                            topic_name="test-kmeans",
                            application_type="kmeans"
                           )
miniapp.run()

In [None]:
# NOTE(review): this rebinds kafka_details to a hard-coded ZooKeeper address,
# replacing whatever an earlier cell stored under that name — confirm the
# host is still the intended one.
kafka_details = {"master_url": 'c251-126:2181'}

# Connect through ZooKeeper and open a consumer on the KMeans test topic.
client = pykafka.KafkaClient(zookeeper_hosts=kafka_details["master_url"])
topic = client.topics["test-kmeans"]
consumer = topic.get_simple_consumer(reset_offset_on_start=True)
print("Brokers: %s, Number Brokers: %s" % (client.brokers, len(client.brokers)))

In [None]:
# Poll the Kafka consumer a fixed number of times and tally what arrives.
# (A stray bare `i` expression used to open this cell; it raised NameError on
# a fresh kernel and has been removed.)
count = 0
number_total_points = 0
number_dimensions = 0
for attempt in range(100):
    # Non-blocking: returns None when no message is currently available.
    message = consumer.consume(block=False)
    if message is None:
        continue
    # The payload is decoded as UTF-8 and parsed as a Python literal; the
    # resulting array is indexed as (points, dimensions).
    data_np = np.array(ast.literal_eval(message.value.decode("utf-8")))
    num_points = data_np.shape[0]
    number_dimensions = data_np.shape[1]
    count += 1
    number_total_points += num_points

print("Total Messages: %d, Total Points: %d, Number Dimensions: %d"%(count, number_total_points, number_dimensions))

##  Light Source

This section tests the Mini App that produces light source reconstruction data.

Example GridRec Reconstruction

In [None]:
import binascii
import tempfile
import pykafka
import dxchange
import tomopy

def reconstruct(message, start=0, end=2):
    """Run a GridRec reconstruction on a serialized light source data file.

    The payload is written to a temporary file because
    ``dxchange.read_aps_32id`` is given a file name rather than raw bytes.

    Parameters
    ----------
    message : bytes
        Raw file content, e.g. the value of a consumed broker message.
    start, end : int, optional
        Range passed to the reader as ``sino=(start, end)``. The defaults
        reproduce the previously hard-coded ``(0, 2)``.

    Returns
    -------
    The result of ``tomopy.circ_mask`` applied to the reconstruction.
    """
    # The context manager closes (and, with delete=True, removes) the
    # temporary file even if reading fails. Previously the file was never
    # closed explicitly.
    with tempfile.NamedTemporaryFile(delete=True) as tf:
        tf.write(message)
        tf.flush()  # the reader opens the file by name, so flush buffered bytes first
        # The theta returned by the reader is discarded; angles are regenerated below.
        proj, flat, dark, _ = dxchange.read_aps_32id(tf.name, sino=(start, end))

    theta = tomopy.angles(proj.shape[0])
    proj = tomopy.normalize(proj, flat, dark)
    rot_center = tomopy.find_center(proj, theta, init=290, ind=0, tol=0.5)
    proj = tomopy.minus_log(proj)
    recon = tomopy.recon(proj, theta, center=rot_center, algorithm='gridrec')
    recon = tomopy.circ_mask(recon, axis=0, ratio=0.95)
    return recon

### Kinesis

In [None]:
%%time

pilot_compute_description = {
    "resource":"kinesis://awscloud.com",
    "number_cores": number_partitions,
    "type":"kinesis"
}
kinesis_light_pilot = pilot.streaming.PilotComputeService.create_pilot(pilot_compute_description)
kinesis_light_pilot.wait()
kinesis_light_details=kinesis_light_pilot.get_details()

miniapp=mass.kafka.MiniApp(
                           #dask_scheduler=dask_details['master_url'],
                           dask_scheduler=None,
                           resource_url=kinesis_light_details["master_url"],
                           broker_service="kinesis",
                           number_parallel_tasks=1,
                           number_messages=1,
                           number_produces=1,
                           number_partitions=number_partitions,
                           topic_name="light_test8",
                           application_type = "light"
                          )
miniapp.run()

In [None]:
boto3.setup_default_session(profile_name='dev')
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
stream_name = kinesis_light_details["master_url"].split("/")[1]
print("Stream Name: %s"%stream_name)
stream = kinesis_client.describe_stream(StreamName=stream_name)['StreamDescription']

# A single GetRecords call may return no records even though the shard holds
# data, so every shard is polled through NextShardIterator until the reader
# has caught up with the tip of the shard or the poll budget is used up.
max_polls_per_shard = 20
messages = []
for shard in stream['Shards']:
    print("### %s - %s"%(stream_name, shard['ShardId']))
    shard_iterator = kinesis_client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard['ShardId'],
        ShardIteratorType='AT_TIMESTAMP',  #'TRIM_HORIZON'|'LATEST'
        Timestamp=datetime.datetime.utcnow() - datetime.timedelta(minutes=30)
    )['ShardIterator']

    shard_record_count = 0
    for poll in range(max_polls_per_shard):
        out = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=10)
        for record in out["Records"]:
            messages.append(record["Data"])
        shard_record_count += len(out["Records"])
        shard_iterator = out.get("NextShardIterator")
        # No next iterator means the shard is closed; MillisBehindLatest == 0
        # means there is nothing newer left to read.
        if shard_iterator is None or out.get("MillisBehindLatest", 0) == 0:
            break
        # Pause between polls to stay below the per-shard read rate limit.
        time.sleep(0.25)

    if shard_record_count == 0:
        print("No records found in shard %s"%shard['ShardId'])
print("Read %d messages from Kinesis"%len(messages))

In [None]:
# Reconstruct the first collected message and show the first slice of the
# result in grayscale.
recon = reconstruct(messages[0])
first_slice = recon[0, :, :]
plt.imshow(first_slice, cmap='Greys_r')
plt.show()

### Kafka

In [None]:
%%time
topic_name = "light_test"
miniapp=mass.kafka.MiniApp(
                           #dask_scheduler=dask_details['master_url'],
                           dask_scheduler=None,
                           resource_url=kafka_details["master_url"],
                           broker_service="kafka",
                           number_parallel_tasks=1,
                           number_messages=2,
                           number_produces=2,
                           number_partitions=number_partitions,
                           topic_name=topic_name,
                           application_type = "light"
                          )

In [None]:
print("Read from topic: %s"%topic_name)
client = pykafka.KafkaClient(zookeeper_hosts=kafka_details["master_url"])
topic = client.topics[topic_name]
consumer = topic.get_simple_consumer(reset_offset_on_start=True,fetch_message_max_bytes=10000000)
miniapp.run()

# consume(block=False) returns None when nothing is available yet, and the
# reconstruction cell dereferences message.value. Poll for a bounded time
# instead of relying on one immediate attempt.
message = None
for attempt in range(50):
    message = consumer.consume(block=False)
    if message is not None:
        break
    time.sleep(0.1)
if message is None:
    print("No message received from topic: %s"%topic_name)

In [None]:
# Inspect what the consumer returned; NoneType here means no message was consumed.
type(message)

In [None]:
%%time
recon=reconstruct(message.value)
plt.imshow(recon[0, :,:], cmap='Greys_r')
plt.show()

# Stop Pilots

In [None]:
# Cancel every pilot independently: a pilot that was never started, or whose
# cancel() fails, must not prevent the remaining pilots from being stopped.
for pilot_name in ("kafka_pilot1", "dask_pilot", "kinesis_pilot", "kinesis_light_pilot"):
    pilot_job = globals().get(pilot_name)
    if pilot_job is None:
        print("Skipping %s: pilot is not defined" % pilot_name)
        continue
    try:
        pilot_job.cancel()
    except Exception as exc:
        # Report and continue so the other pilots are still cancelled.
        print("Failed to cancel %s: %s" % (pilot_name, exc))

**END**

---

---

***Scratch***

For testing serialization

In [None]:
import pkg_resources, base64
import binascii
import tempfile

# Round-trip the sample file through hex encoding to check serialization.
# The file is binary, so it is opened in "rb" mode; text mode breaks both
# the read and binascii.hexlify on Python 3.
with open("../mass/tooth.h5", "rb") as f:
    data = f.read()

data_enc = binascii.hexlify(data)
# print() calls work on both Python 2 and Python 3.
print(data_enc[:20])
print(type(data_enc))

# Write the decoded bytes to a temporary file and read them back with
# dxchange. The context manager closes and removes the file afterwards.
with tempfile.NamedTemporaryFile(delete=True) as tf:
    tf.write(binascii.unhexlify(data_enc))
    tf.flush()  # the reader opens the file by name, so flush buffered bytes first
    proj, flat, dark, theta = dxchange.read_aps_32id(tf.name, sino=(0, 2))

In [None]:
# Show the dtype of the theta value returned by dxchange.read_aps_32id.
theta.dtype

In [None]:
# Poll the consumer a fixed number of times, reconstruct every message that
# arrives, and tally message count and payload size.
count = 0
read_bytes = 0
for attempt in range(100):
    # Non-blocking: returns None when no message is currently available.
    message = consumer.consume(block=False)
    if message is None:
        continue
    payload = message.value
    print("Message %d, Bytes: %d"%(count, len(payload)))
    # reconstruct() writes its argument to a file, so it needs the raw
    # payload rather than the Message object.
    reconstruct(payload)
    read_bytes += len(payload)
    count += 1

print("Total Messages: %d, Read Bytes: %d"%(count, read_bytes))

General Kafka Test

In [None]:
# No visible cell creates `producer`, so obtain a synchronous producer from
# the current topic. The payload is passed as bytes, which pykafka expects.
with topic.get_sync_producer() as producer:
    producer.produce(b"hello")

In [None]:
message = consumer.consume(block=False)
# consume(block=False) returns None when no message is pending; print() is
# used so the cell also runs on Python 3.
print(message.value if message is not None else None)

In [None]:
# Show the partitions attribute of the current consumer.
consumer.partitions

In [None]:
from distributed import Client
# Notebook-level Dask client connected to a hard-coded scheduler address.
dask_distributed_client = Client('tcp://c251-136:8786')

#def map_test():
#    return 1


class DaskTest():
    """Minimal smoke test that submits trivial tasks through a Dask client.

    Parameters
    ----------
    scheduler_address : str, optional
        Address passed to ``Client``. Defaults to the previously hard-coded
        ``'tcp://c251-136:8786'``.
    """

    def __init__(self, scheduler_address='tcp://c251-136:8786'):
        self.dask_distributed_client = Client(scheduler_address)

    def map_test(self):
        """Trivial task payload; always returns 1."""
        return 1

    def run(self, number_tasks=3):
        """Submit ``map_test`` ``number_tasks`` times and return the gathered results.

        The default of 3 matches the previously hard-coded task count.
        """
        tasks = [
            self.dask_distributed_client.submit(self.map_test)
            for _ in range(number_tasks)
        ]
        return self.dask_distributed_client.gather(tasks)
        

In [None]:
# Instantiate the scratch Dask test; the gathered results of run() are the cell output.
t = DaskTest()
t.run()

In [None]:
def map_test():
    """Trivial task payload; always returns 1."""
    return 1


# map_test only existed as commented-out code, so this cell raised NameError.
# It is defined above so the cell runs on its own.
tasks = [dask_distributed_client.submit(map_test) for task_index in range(3)]

dask_distributed_client.gather(tasks)