In [None]:
#import modules for CarbonIface
import socket, pickle, struct, urllib2

#import general modules
import threading
import io, os.path, time, math, threading, sys, random
import ConfigParser, ast

#custom consumer and carbon/graphite-interface modules
import avro.schema, avro.io
from carboniface import CarbonIface
from kafka import KafkaConsumer

#data analysis
import pandas as pd
import numpy as np


In [None]:
#from sc_watchdog.logger import log
class CarbonIface(object):
    '''Thread-safe metric buffer and sender for a carbon daemon (pickle protocol).

    Metrics are queued as ``(metric, (timestamp, value))`` tuples by the
    add_* methods and flushed by send_data() as one length-prefixed pickle
    message. add_event() posts an event to the web app over HTTP.
    '''

    def __init__(self, host, port, event_url = 'events'):
        '''Initialize Carbon Interface.
        host: host where the carbon daemon is running
        port: port where carbon daemon is listening for pickle protocol on host
        event_url: web app url suffix where events can be added. It is only
                   used by add_event(...), which posts to http://<host>/<event_url>/
        '''
        self.host = host
        self.port = port
        self.url_post_event = "http://%s/%s/" % (host, event_url)
        self.__data = []
        self.__data_lock = threading.Lock()

    def add_data(self, metric, value, ts=None):
        '''Queue one data point; ts defaults to the current time. Returns True.'''
        #compare against None so that an explicit timestamp of 0 is kept
        if ts is None:
            ts = time.time()
        with self.__data_lock:
            self.__data.append((metric, (ts, value)))
        return True

    def add_data_dict(self, dd):
        '''
        dd must be a dictionary where keys are the metric name,
        each key contains a dictionary which at least must have 'value' key (optionally 'ts')

        dd = {'experiment1.subsystem.block.metric1': {'value': 12.3, 'ts': 1379491605.55},
              'experiment1.subsystem.block.metric2': {'value': 1.35},
             ...}

        Returns True once every entry has been queued.
        '''
        #build the entries before taking the lock so a malformed dd can neither
        #leave the lock held nor queue only part of the batch
        entries = [(metric, (point.get('ts', time.time()), point.get('value')))
                   for metric, point in dd.items()]
        with self.__data_lock:
            self.__data.extend(entries)
        return True

    def add_data_list(self, dl):
        '''
        dl must be a list of tuples like:
        dl = [('metricname', (timestamp, value)),
              ('metricname', (timestamp, value)),
              ...]

        Returns True once the list has been queued.
        '''
        with self.__data_lock:
            self.__data.extend(dl)
        return True

    def send_data(self, data=None):
        '''If data is empty, current buffer is sent. Otherwise data must be like:
        data = [('metricname', (timestamp, value)),
              ('metricname', (timestamp, value)),
              ...]

        Returns True when the message was handed to the socket (or when there
        was nothing to send) and False when connecting or sending failed. On
        failure, data taken from the internal buffer is put back so a later
        call can retry it; explicitly passed data is left to the caller.
        '''
        from_buffer = not data
        if from_buffer:
            with self.__data_lock:
                data, self.__data = self.__data, []
            if not data:
                #nothing queued: do not open a connection just to send an empty list
                return True
        #protocol 2 is readable by both Python 2 and Python 3 unpicklers
        payload = pickle.dumps(data, protocol=2)
        #carbon expects a 4-byte big-endian length header before the pickle
        message = struct.pack("!L", len(payload)) + payload
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
            #sendall keeps writing until the whole message is out; send may stop early
            sock.sendall(message)
        except socket.error:
            #log.exception('Error when sending data to carbon')
            if from_buffer:
                with self.__data_lock:
                    #restore in front of anything queued in the meantime
                    self.__data[:0] = data
            return False
        finally:
            sock.close()
        #log.debug('Sent data to {host}:{port}: {0} metrics, {1} bytes'.format(len(data), len(message), host = self.host, port=self.port))
        return True

    @staticmethod
    def _event_payload(what, data=None, tags=None, when=None):
        '''Return the JSON body for an event; when defaults to the current time.'''
        import json
        event = {'what': str(what),
                 'when': time.time() if when is None else when}
        if data:
            event['data'] = str(data)
        if tags:
            event['tags'] = str(tags)
        #json.dumps escapes quotes and backslashes that plain string formatting would not
        return json.dumps(event)

    def add_event(self, what, data=None, tags=None, when=None):
        '''Post an event to the web app at url_post_event.

        what: event description
        data: optional payload, sent as str(data)
        tags: optional tags, sent as str(tags)
        when: event time, defaults to the current time

        Sending is best effort: returns True on success and False when the
        request failed, it never raises for a failed request.
        '''
        postdata = self._event_payload(what, data, tags, when)
        req = urllib2.Request(self.url_post_event, postdata)
        try:
            urllib2.urlopen(req)
        except Exception:
            #log.exception('Error when sending event to carbon')
            return False
        return True

In [None]:
#from MR, SVDS
class PythonConsumerThread(object):
    """Consume CSV sensor messages from Kafka on a daemon thread and queue them for Carbon.

    Creating an instance reads the configuration, builds the CarbonIface
    client and the history buffer, and immediately starts a daemon thread
    executing run(). Call cancel() to ask that thread to stop.

    A message is a sequence laid out like ``channel_schema``: the channel
    name first, the timestamp last (the code assumes this), and one value per
    schema column in between.
    """

    #channel names used when generating synthetic messages
    #AF3 (left frontal), AF4 (right frontal), T7 (left temporal), T8 (right temporal), and Pz (central parietal).
    SENSOR_TYPES = ['AF3', 'AF4', 'T7', 'T8', 'Pz']
    #number of rows in the history ring buffer
    HISTORY_LENGTH = 100

    def __init__(self,flag):
        """Load config, set up state and start the background thread.

        flag: stored on the instance as ``self.flag``; not otherwise used here.
        """
        self.cancelled = False
        self.read_config()
        self.carboniface_loader()
        self.flag = flag
        #instantiate variables for spreading out graphite messages
        self.last_send_time = time.time()
        self.data = []
        self.msg_num = 0
        #decode_message() writes every message into the history buffer, so it
        #has to exist before the thread starts consuming
        self.setup_history()
        #spin up the thread
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True                            # Daemonize thread
        thread.start()                                  # Start the execution
        print('%s created' % (self,))

    def run(self):
        """Thread body: keep consuming until cancel() has been called."""
        while not self.cancelled:
            self.consume_messages()
            #self.consume_messages_synthetic()
        print('Consumer Thread Terminated')

    def cancel(self):
        """Ask the consumer thread to stop; it notices on the next received message."""
        self.cancelled = True

    def read_config(self):
        """Load the [MSG] section of ../conf/rpi_consumer.cfg (relative to the working directory).

        Raises IOError when the file cannot be read, and the ConfigParser
        errors when the section or an option is missing.
        """
        config_directory = os.path.abspath(os.path.join(os.path.dirname(''), '..', 'conf'))
        config_path = os.path.join(config_directory, 'rpi_consumer.cfg')
        config = ConfigParser.RawConfigParser()
        #read() skips unreadable files silently; fail here with the path instead
        #of later with a missing-section error
        if not config.read(config_path):
            raise IOError('Could not read config file: %s' % config_path)
        self.host = str(config.get('MSG','host'))
        self.port = int(config.get('MSG','port'))
        self.event_url = str(config.get('MSG','event_url'))
        self.topic = str(config.get('MSG','topic'))
        self.input_kafka_topic = str(config.get('MSG','input_kafka_topic'))
        self.bootstrap_servers = ast.literal_eval(config.get('MSG','bootstrap_servers'))
        self.group_id = str(config.get('MSG','group_id'))
        self.fps = float(config.get('MSG','fps'))
        #get schema for csv
        self.channel_schema = ast.literal_eval(config.get('MSG','channel_schema'))
        self.msg_per_sec = float(config.get('MSG','msg_per_sec'))
        print('schema: %s' % (self.channel_schema,))

    def carboniface_loader(self):
        """Create the CarbonIface client and reset the outgoing batch."""
        self.data = []
        self.carbon = CarbonIface(host=self.host, port=self.port, event_url=self.event_url)
        print('%s created' % (self.carbon,))

    def add_consumer_message_to_carbon_queue(self,datum,topic):
        """Append one ``(metric, (timestamp, value))`` tuple per value in datum to self.data.

        datum: sequence of channel name, values..., timestamp
        topic: base metric name; the channel name and the schema column name
               are appended to it, separated by dots
        """
        #find channel name and add it to the general topic
        channel_topic = topic + '.' + str(datum[0])
        #assume the time is the last entry in the schema
        timestamp = datum[-1]
        #the time,value pair needs to be a tuple
        self.data.extend(
            (channel_topic + '.' + str(self.channel_schema[i]), (timestamp, datum[i]))
            for i in range(1, len(datum) - 1))

    def create_artificial_message(self):
        """Store a random synthetic message (sensor, five band values, time) in self.msg."""
        sensor = random.choice(self.SENSOR_TYPES)
        Theta = 1410.735220 * random.uniform(0.5, 1.5)
        Alpha = 1821.903537 * random.uniform(0.8, 1.2)
        Low_beta = 1125.776789 * random.uniform(0.2, 1.8)
        High_beta = 1823.893874 * random.uniform(0.6, 1.4)
        Gamma = 346.522470 * random.uniform(0.9, 1.1)
        Time = time.time()
        self.msg = [sensor,Theta,Alpha,Low_beta,High_beta,Gamma,Time]

    def consume_messages_synthetic(self):
        """Generate and process synthetic messages at msg_per_sec until cancelled."""
        print('consuming synthetic messages')
        while not self.cancelled:
            self.create_artificial_message()
            #advance the counter so the history ring buffer moves on to the next row
            self.msg_num += 1
            self.decode_message(self.msg)
            time.sleep(1.0/self.msg_per_sec)

    def consume_messages(self):
        """Connect to Kafka and process messages until cancelled or the iterator ends.

        Only messages arriving in the first half of each wall-clock second are
        processed; the rest are counted and skipped. A message that fails to
        process is reported and skipped. The consumer is closed on the way out.
        """
        #from:https://gist.github.com/ChristianKniep/9580204
        print('self.input_kafka_topic %s' % (self.input_kafka_topic,))
        print('self.group_id %s' % (self.group_id,))
        print('self.bootstrap_servers %s' % (self.bootstrap_servers,))
        consumer = KafkaConsumer(self.input_kafka_topic,
                                 group_id=self.group_id,
                                 bootstrap_servers=self.bootstrap_servers)
        print('%s connected' % (consumer,))
        self.consumer = consumer
        #From: http://stackoverflow.com/questions/31047163/with-bottledwater-pg-how-to-read-data-by-a-python-consumer/31085531#31085531
        sending_announced = False
        print('waiting')
        time.sleep(1)
        try:
            #TODO: CREATE TEST TO MAKE SURE THE TOPIC EXISTS IN THE PARTITION
            print(consumer.partitions_for_topic(self.input_kafka_topic))
            for msg in consumer:
                self.msg_num += 1
                if self.cancelled:
                    print('exiting consumer loop')
                    break
                #throttle: skip messages received in the second half of each second
                if (time.time() % 1) >= (1.0/2.0):
                    continue
                try:
                    self.decode_message(msg)
                except Exception:
                    print('Unexpected error: %s' % (sys.exc_info()[0],))
                    print('Message not able to be sent: %s' % (msg,))
                    sending_announced = False
                else:
                    #announce once after the first success (and again after a failure)
                    if not sending_announced:
                        print('Messages are now being sent to a Carbon-Whisper-Graphite server')
                        sending_announced = True
        finally:
            #run() creates a new consumer on every pass, so release this one
            consumer.close()

    def decode_message(self,msg):
        """Queue one message for Carbon, record it in the history and flush about once a second.

        msg: either an object with a ``value`` attribute holding a CSV string
             (the Kafka structure) or an already split sequence (the synthetic
             structure).
        """
        if hasattr(msg, 'value'):
            raw = msg.value #THIS IS FOR THE KAFKA STRUCTURE
            if isinstance(raw, bytes) and not isinstance(raw, str):
                #bytes payload under Python 3; a no-op under Python 2 where bytes is str
                raw = raw.decode('utf-8')
            #convert the csv to an array
            datum_array = raw.split(",")
        else:
            datum_array = msg #THIS IS FOR THE SYNTHETIC STRUCTURE
        #call function to send the datum to carbon/graphite (each time send the old base topic)
        self.add_consumer_message_to_carbon_queue(datum_array,self.topic)
        self.add_to_history(datum_array)
        #flush the batch once more than a second has passed since the last send
        if (time.time()-1) > self.last_send_time:
            self.send_message_to_carbon()
            #start a new batch
            self.data = []

    def send_message_to_carbon(self):
        """Send the current batch through the CarbonIface client and note the send time."""
        self.carbon.send_data(self.data)
        self.last_send_time = time.time()
        print('messages sent to carbon %s' % (time.time(),))

    def setup_history(self):
        """Create the zero-filled ring buffer ``self.history``.

        Columns are 'time', 'sensor_type' and then every schema column between
        the first (sensor) and last (time) entries, which is the row layout
        add_to_history() writes.
        """
        rois = list(self.channel_schema[1:-1]) #avoid columns with sensor and time
        columns = ['time', 'sensor_type'] + rois
        #object dtype because a row mixes the sensor name with numeric or CSV string values
        self.history = pd.DataFrame(
            np.zeros((self.HISTORY_LENGTH, len(columns)), dtype=object),
            index=np.arange(self.HISTORY_LENGTH),
            columns=columns)

    def add_to_history(self,datum_array):
        """Write one message into the ring buffer row selected by msg_num.

        The row is reordered to time, sensor, values... before being stored.
        """
        if getattr(self, 'history', None) is None:
            self.setup_history()
        value = [datum_array[-1], datum_array[0]]
        value.extend(datum_array[1:-1])
        print(value)
        self.history.iloc[self.msg_num % self.history.shape[0]] = value  # set the new value in the ring buffer


In [None]:
#create the consumer; its constructor reads the config and starts the daemon thread right away
#flag is only stored on the instance (self.flag) by the constructor
flag=False
p_consumer = PythonConsumerThread(flag)

In [None]:
#ask the consumer thread to stop (sets its cancelled flag)
p_consumer.cancel()
#recording notes (presumably wall-clock h:mm:ss -- TODO confirm):
#7:55:00 to 7:55:30 relaxed
#7:58:03 to 7:58:18 blinking

In [None]:
from kafka import KafkaConsumer, TopicPartition

#stand-alone consumer for inspecting the broker by hand; no topic is subscribed or assigned here
consumer = KafkaConsumer(
    bootstrap_servers='ec2-54-213-9-89.us-west-2.compute.amazonaws.com:9092'
)
#scratch commands kept for reference:
#print consumer.topics()
#consumer.assign([TopicPartition('sink1', 1)])
#print consumer
#msg = next(consumer)

In [None]:
#fractional part of the current epoch time, in [0, 1); consume_messages compares this same expression against 0.5
time.time()%1