In [33]:
%%writefile slack.avsc
{
"namespace": "slack_kafka.avro",
"type": "record",
"name": "slack_message_schema",
"fields" : [
    {
        "name": "user_id",
        "type": ["string", "null"],
        "doc": "Code given by slack api for username"
    },
    {
        "name": "text",
        "type": ["string", "null"],
        "doc": "The text of the message"
    },
    {
        "name": "channel",
        "type": ["string", "null"],
        "doc": "The code given by slack api for channel"
    },
    {
        "name": "timestamp",
        "type": ["string", "null"],
        "doc": "Unix timestamp of record assigned by slack api"}
 ],
"doc": "A Schema for storing Slack messages."
}

Overwriting slack.avsc


In [49]:
from slackclient import SlackClient
from kafka import KafkaClient, KafkaConsumer, KafkaProducer
import threading, logging, time
import avro.schema
import avro.io
import io
import boto3
import sys
import time

def getSlackToken(auth_file):
    '''Function that returns the slack token contained
    in the supplied file.

    Only the first line of the file is used; surrounding
    whitespace (including the trailing newline) is stripped.

    args: {
        auth_file: A single line text file containing
            a valid slack authorization token.
    }
    returns: (str) Slack authorization token
    raises: ValueError if the file is empty.
    '''
    # `with` guarantees the handle is closed even if reading fails.
    with open(auth_file, 'r') as token_file:
        first_line = token_file.readline()
    # readline() returns '' only at EOF, i.e. for an empty file.
    if not first_line:
        raise ValueError('Token file {} is empty'.format(auth_file))
    return first_line.strip()


def getSlackTeamName(token):
    '''Function that returns the team id
    from the slack api using the supplied authorization
    token.

    args: {
        token: String containing
            a valid slack authorization token.
    }

    returns: (str) The team id as reported by the slack API
        'team.info' method.
    raises: RuntimeError if the response carries no 'team' object
        (e.g. an invalid token).
    '''
    sc = SlackClient(token)
    response = sc.api_call('team.info')

    # A failed call comes back without a 'team' object (Slack reports
    # ok=False plus an 'error' string); surface that instead of KeyError.
    if 'team' not in response:
        raise RuntimeError('Slack team.info failed: {}'.format(
            response.get('error', 'unknown error')))

    return response['team']['id']

def initializeApp(team_name, domain_name='awaybot'):
    '''Function that starts a new slack-pack application in
    a stateless manner. First, checks that the simpleDB domain
    exists; if it does, reads the last timestamp recorded in
    it for the item named after the input team.

    args: {
        team_name: A string representing the team name as
            aliased by the slack API (also the simpleDB item name).
        domain_name: The simpleDB domain where the latest
            timestamp values are stored for each team.
            default: 'awaybot'
    }

    returns: {
        1. (bool) False if the simpleDB domain does not exist.
        2. (str) '0' if the domain exists but holds no 'ts'
            attribute for the input team name.
        3. (str) Unix timestamp of the last message sent to kafka
            for the input team name.
    }

    Exits the process with status 1 if the AWS client cannot be
    created or the timestamp lookup raises.
    '''
    try:
        client = boto3.client('sdb')
    except Exception as e:
        print('Failed to connect to aws')
        print('Exception: {}'.format(e))
        sys.exit(1)

    # ListDomains returns a limited number of names per call, so walk
    # every page before concluding that the domain is missing.
    domain_names = []
    for page in client.get_paginator('list_domains').paginate():
        domain_names.extend(page.get('DomainNames', []))
    if domain_name not in domain_names:
        print('Domain does not exist')
        return False

    try:
        ts_request = client.get_attributes(
            DomainName=domain_name,
            ItemName=team_name,
            AttributeNames=['ts'],
            ConsistentRead=True)
    except Exception as e:
        print('Failure to fetch timestamp')
        print('Exception: {}'.format(e))
        sys.exit(1)

    # Select the attribute by name rather than relying on its position.
    for attribute in ts_request.get('Attributes', []):
        if attribute['Name'] == 'ts':
            return attribute['Value']
    # No item (or no 'ts' attribute) yet for this team: start from scratch.
    return '0'


def fetchSlackHistory(token, team_name, timestamp='0', page_size=1000):
    '''Generator that retrieves the history, channel
    by channel, of the input slack team.

    Within each channel, messages are yielded in chronological
    order (oldest first). This way a caller that records the
    timestamp of every message it handles ends on the newest one.
    Only messages that carry a 'user' key are yielded. Each yielded
    dict is given a 'channel' key holding the channel id.

    args: {
        token: (str) Valid slack api authorization token
        team_name: (str) The name of the slack team as
            aliased by the slack api. (Currently unused.)
        timestamp: (str) Unix timestamp representing
            the last time a message from the channel was
            produced to kafka; only newer messages are fetched.
            Default value: '0' (entire history)
        page_size: (int) Number of messages requested per
            'channels.history' call. Default value: 1000
    }
    yields: (dict) One slack message at a time.
    raises: RuntimeError if a Slack API call returns no data.
    '''
    sc = SlackClient(token)

    # NOTE(review): Slack has since retired the channels.* methods in favor
    # of conversations.*; these calls match the slackclient version used here.
    channel_response = sc.api_call("channels.list")
    if 'channels' not in channel_response:
        raise RuntimeError('Slack channels.list failed: {}'.format(
            channel_response.get('error', 'unknown error')))
    channels = [
        channel_dict['id'] for channel_dict in channel_response['channels']]

    for channel in channels:
        channel_messages = []
        latest = None
        # Page through the channel until Slack reports no more messages.
        while True:
            params = {
                'channel': channel,
                'oldest': timestamp,
                'count': str(page_size)}
            if latest is not None:
                # 'latest' is exclusive by default, so the next page starts
                # just before the oldest message already collected.
                params['latest'] = latest
            history = sc.api_call("channels.history", **params)
            if 'messages' not in history:
                raise RuntimeError('Slack channels.history failed: {}'.format(
                    history.get('error', 'unknown error')))
            page = history['messages']
            channel_messages.extend(page)
            if not history.get('has_more') or not page:
                break
            # Pages are returned newest first; continue from the oldest seen.
            latest = page[-1]['ts']

        for message_dict in reversed(channel_messages):
            # Skip entries without a user (e.g. bot/system messages).
            if 'user' in message_dict:
                message_dict['channel'] = channel
                yield message_dict


def produceMessage(
    message_dict, team_name,
    avro_schema, avro_writer, avro_byte_writer,
    avro_encoder, kafka_producer):
    '''Function that Avro-encodes one slack message and sends it
    to the Kafka topic named after the team.

    args: {
        message_dict: Dictionary representing one
            slack message from the slack api, with a
            'channel' key holding the channel id.
        team_name: Kafka topic the encoded message is sent to.
        avro_schema: Avro schema object (unused here; the
            writer is expected to carry the schema)
        avro_writer: Avro datum writer associated with the
            schema object
        avro_byte_writer: BytesIO object that avro_encoder
            writes into; it is cleared before every message
        avro_encoder: Avro binary encoder used for encoding
            messages as binary
        kafka_producer: A kafka producer object
    }

    returns: None'''
    # Every field in slack.avsc is nullable, so a missing key is
    # encoded as null rather than raising KeyError.
    message = {
        'user_id': message_dict.get('user'),
        'text': message_dict.get('text'),
        'channel': message_dict.get('channel'),
        'timestamp': message_dict.get('ts')
        }
    # The buffer is shared across calls. Without resetting it, each Kafka
    # record would also contain every previously encoded message.
    avro_byte_writer.seek(0)
    avro_byte_writer.truncate()
    avro_writer.write(message, avro_encoder)
    raw_bytes = avro_byte_writer.getvalue()
    kafka_producer.send(team_name, raw_bytes)
    return


def updateTimestamp(
    timestamp, slack_channel,
    team_name, domain_name='awaybot', sdb_client=None):
    '''Function that updates the timestamp of the
    last successfully produced Kafka message for
    the input slack_team.

    Writes the 'Team', 'Channel' and 'ts' attributes of the simpleDB
    item named after the team, replacing any previous values.

    args: {
        timestamp: (str) Slack 'ts' of the produced message
        slack_channel: (str) Channel id the message came from
        team_name: (str) Slack team; also used as the simpleDB item name
        domain_name: simpleDB domain holding the timestamps.
            default: 'awaybot'
        sdb_client: Optional existing boto3 'sdb' client to reuse.
            When None, a new client is created for this call.
    }
    returns: None
    '''
    if sdb_client is None:
        try:
            sdb_client = boto3.client('sdb')
        except Exception as e:
            # Without a client the write below cannot run; exit with the
            # same policy as initializeApp.
            print('Failed to connect to aws')
            print('Exception: {}'.format(e))
            sys.exit(1)
    item_attrs = [
        {'Name': 'Team', 'Value': team_name, 'Replace': True},
        {'Name': 'Channel', 'Value': slack_channel, 'Replace': True},
        {'Name': 'ts', 'Value': timestamp, 'Replace': True}
        ]
    sdb_client.put_attributes(
        DomainName=domain_name,
        ItemName=team_name,
        Attributes=item_attrs)
    return

def rtmConnect(token, team_name, poll_interval=5):
    '''Generator that connects to the slack Real Time Messaging
    API and yields the events that carry text.

    args: {
        token: (str) Valid slack api authorization token
        team_name: (str) Slack team (currently unused)
        poll_interval: (int/float) Seconds to sleep before each
            read. default: 5
    }
    yields: (list) Non-empty list of the events returned by one
        rtm_read() call that contain a 'text' key.

    Exits the process with status 1 if connecting or reading fails.
    '''
    sc = SlackClient(token)
    if not sc.rtm_connect():
        print('Failed to connect to the RTM API')
        sys.exit(1)
    while True:
        try:
            time.sleep(poll_interval)
            events = sc.rtm_read()
        except Exception as e:
            print('Failure to read from the RTM API')
            print('Exception: {}'.format(e))
            sys.exit(1)
        # Keep every text event in the batch, not only the first one.
        text_events = [event for event in events if 'text' in event]
        if text_events:
            yield text_events
            
                    
        

from decimal import Decimal

# Set up Avro: parse the schema written by the %%writefile cell above.
with open("slack.avsc", "rb") as schema_file:
    schema = avro.schema.parse(schema_file.read())
writer = avro.io.DatumWriter(schema)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)

# Set up the Kafka producer; messages are sent to a topic named after the team.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

slackToken = getSlackToken('token.txt')
teamName = getSlackTeamName(slackToken)
print(teamName)
# False when the simpleDB domain is missing, otherwise the stored ts string.
# NOTE(review): with a missing domain, updateTimestamp's put_attributes is
# expected to fail; run the create_domain cell first.
latestValue = initializeApp(teamName)


def _publish(msg, watermark):
    '''Produce one slack message to Kafka and checkpoint its timestamp.

    The stored timestamp only moves forward. It is written to simpleDB
    only when msg['ts'] is newer than ``watermark``, so handling an older
    message (out-of-order history or a replayed RTM event) cannot roll
    the checkpoint back.

    Returns the (possibly advanced) watermark as a Decimal.
    '''
    produceMessage(
        msg, teamName, schema, writer,
        bytes_writer, encoder, producer)
    # Decimal compares Slack's 'seconds.fraction' strings exactly.
    msg_ts = Decimal(msg['ts'])
    if msg_ts > watermark:
        # NOTE(review): KafkaProducer.send is asynchronous, so this checkpoint
        # may be written before the broker has acknowledged the message.
        updateTimestamp(msg['ts'], msg['channel'], teamName)
        return msg_ts
    return watermark


watermark = Decimal(latestValue) if latestValue else Decimal(0)
if latestValue:
    print(latestValue)
    n_produced = 0
    # Resume from the stored timestamp instead of re-sending the whole history.
    for msg in fetchSlackHistory(slackToken, teamName, timestamp=latestValue):
        watermark = _publish(msg, watermark)
        n_produced += 1
    print('Produced {} messages from history'.format(n_produced))

for msg_list in rtmConnect(slackToken, teamName):
    # Handle every event in the batch, not only the first one.
    for msg in msg_list:
        # Same rule as the history fetch: only user messages (with a ts).
        if 'user' not in msg or 'ts' not in msg:
            continue
        print(msg)
        watermark = _publish(msg, watermark)

T2BT8MVE3
0
{u'text': u"haha won't do", u'ts': u'1476409541.000010', u'user': u'U2C9M9GP5', u'reply_to': 9279, u'type': u'message', u'channel': u'C2CUMH2BA'}
{u'text': u'sending messages', u'ts': u'1476411626.000002', u'user': u'U2C9M9GP5', u'team': u'T2BT8MVE3', u'type': u'message', u'channel': u'C2CNEMD0S'}
{u'text': u'for the rtm api', u'ts': u'1476411632.000003', u'user': u'U2C9M9GP5', u'team': u'T2BT8MVE3', u'type': u'message', u'channel': u'C2CNEMD0S'}
{u'text': u'is a good time', u'ts': u'1476411635.000004', u'user': u'U2C9M9GP5', u'team': u'T2BT8MVE3', u'type': u'message', u'channel': u'C2CNEMD0S'}


KeyboardInterrupt: 

In [46]:
import boto3

# SimpleDB client; the teardown cell below reuses this `client` name.
client = boto3.client('sdb')

# Create the 'awaybot' domain (roughly a table, in RDBMS terms) that holds
# the per-team timestamp checkpoints read by initializeApp and written by
# updateTimestamp. The response is left as the cell's displayed output.
client.create_domain(DomainName='awaybot')

{'ResponseMetadata': {'BoxUsage': '0.0055590278',
  'HTTPHeaders': {'connection': 'keep-alive',
   'content-type': 'text/xml',
   'date': 'Fri, 14 Oct 2016 02:12:37 GMT',
   'server': 'Amazon SimpleDB',
   'transfer-encoding': 'chunked'},
  'HTTPStatusCode': 200,
  'RequestId': '4e44b458-732a-2977-e9e0-3a1903ea50ab',
  'RetryAttempts': 0}}

In [45]:
import boto3

# Teardown: deleting the domain discards every stored team timestamp.
# Guarded so that "Run All" (which runs this cell right after the create
# cell) does not wipe the checkpoints by accident. Set to True to reset.
RESET_DOMAIN = False
if RESET_DOMAIN:
    # Create the client here rather than relying on one from another cell.
    client = boto3.client('sdb')
    response = client.delete_domain(
        DomainName='awaybot'
    )