# Ingest Twitter Feed Data and Sentiments into iguazio Stream & Time-Series DB 

## Initialization 
Install the required packages and set the environment variables.<br>
Fill in the following environment variables with real credentials:

```
    V3IO_PASSWORD = <V3IO-Password>
    V3IO_USER = <V3IO-Username>
    V3IO_ADDRESS = <address of V3IO API end point>
    FRAMESD_URL = <V3IO Framesd URL (Columnar & TSDB API)>
    
    # Twitter credentials
    app_key = <..>
    app_secret = <..>
    oauth_token = <..>
    oauth_token_secret = <..>

```
> Note: in nuclio, environment variables and build dependencies are specified in the function's configuration tab; in a notebook they can be initialized from a file, as implemented below.

In [None]:
# nuclio: ignore
# TextBlob provides the sentiment scoring and Twython the Twitter streaming client used below
!pip install textblob
!pip install twython

### Initialize nuclio emulation (this section is ignored by nuclio nbconvert)

In [1]:
# nuclio: ignore
from nuclio import Context, Event
from v3io import env_fromfile
# Initialize the environment variables listed at the top of the notebook from a local file
env_fromfile('tweet_env.txt')
# Emulated nuclio context, later passed to init_context() and handler()
context = Context()

### Twitter stream handling class

In [2]:
from twython import TwythonStreamer
import json
import re
import v3io
import os
from textblob import TextBlob
import v3io_frames as v3f
import pandas as pd

# Twitter OAuth credentials, each read from the environment variable of the same name.
oauth = {key: os.getenv(key)
         for key in ('app_key', 'app_secret', 'oauth_token', 'oauth_token_secret')}
# NOTE(review): lastText appears unused in this notebook.
lastText = ''

# iguazio clients: `v3` writes tweet records to the stream, `client` (v3io-frames)
# writes sentiment samples to the TSDB. 'bigdata' is presumably the target container.
v3 = v3io.v3io(
    os.getenv('V3IO_ADDRESS'),
    os.getenv('V3IO_USER'),
    os.getenv('V3IO_PASSWORD'),
    'bigdata',
)
client = v3f.Client()

# Twitter stream handler  
class MyStreamer(TwythonStreamer):
    """Twython streamer that forwards tweets to a callback and stops after `limit` messages.

    Each tweet is reduced to a record dict with keys 'text', 'user', 'id' and
    'created_at'. The record is passed to the callback given to `start()` and
    then stored on `context.last_message`.
    """

    def __init__(self, context, name, **kw):
        """
        :param context: nuclio context; provides `logger` and receives `last_message`.
        :param name: label passed through to the callback as its second argument.
        :param kw: keyword arguments forwarded to TwythonStreamer (here the OAuth credentials).
        """
        self.name = name
        self.context = context
        # Defaults so on_success() is safe even before start() configures them.
        self.cb = None
        self.limit = 10
        TwythonStreamer.__init__(self, **kw)

    def start(self, cb, limit=10, **kw):
        """Consume the statuses/filter stream.

        :param cb: callable(context, name, record) invoked for each tweet, or None.
        :param limit: number of stream messages to consume before disconnecting.
        :param kw: filter parameters forwarded to `statuses.filter` (e.g. track, lang).
        """
        self.cb = cb
        self.limit = limit
        self.statuses.filter(**kw)

    def on_success(self, data):
        """Handle one decoded stream message; only messages carrying 'text' are treated as tweets."""
        if 'text' in data:
            record = {'text': data['text'],
                      'user': '@' + data['user']['screen_name'],
                      'id': data['id'],
                      'created_at': data['created_at'],
                      }
            if self.cb:
                try:
                    self.cb(self.context, self.name, record)
                except Exception as exc:
                    # One failed enrichment/write must not kill the whole listener
                    # thread; log it and keep consuming the stream.
                    self.context.logger.error_with('Error processing tweet',
                                                   id=record['id'], error=str(exc))
            # Publish only after the callback has enriched the record in place, so a
            # reader on another thread never sees a record missing the added fields.
            self.context.last_message = record

        # Every stream message (tweet or not) counts toward the limit.
        self.limit -= 1
        if self.limit <= 0:
            self.disconnect()

    def on_error(self, status_code, data, headers=None):
        """Log an error response from the stream.

        `headers` is optional so this override matches Twython's base signature
        `on_error(status_code, data, headers=None)`. Twython may call it with or
        without the response headers.
        """
        self.context.logger.error_with('Error in stream', status_code=status_code,
                                       data=str(data))

# Pattern used to strip tweet text before sentiment analysis. It is a raw string so
# the \w, \S and \/ escapes reach the regex engine without invalid-escape warnings.
# At each position the alternatives are tried in order:
#   1. @mentions (Twitter handles may contain letters, digits and '_')
#   2. any character that is not alphanumeric, a space or a tab
#   3. URLs of the form scheme://non-space-chars
_TWEET_NOISE_RE = re.compile(r"(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)")


def _clean_tweet(text):
    """Return `text` with mentions, URLs and punctuation removed and whitespace collapsed."""
    return ' '.join(_TWEET_NOISE_RE.sub(' ', text).split())


def process_event(context, name, record):
    """Enrich a tweet record with sentiment and write it to the stream and the TSDB.

    :param context: nuclio context (used for logging).
    :param name: symbol label for this tweet stream; used as the TSDB 'symbol' index level.
    :param record: dict with at least 'text' and 'created_at'. It is mutated in place
        to add 'polarity' and 'subjectivity'.
    """
    clean = _clean_tweet(record['text'])

    # Enrich the record with natural-language metadata; compute the sentiment once.
    sentiment = TextBlob(clean).sentiment
    record['polarity'] = sentiment.polarity
    record['subjectivity'] = sentiment.subjectivity

    # Write the enriched record into the iguazio stream.
    context.logger.debug_with('writing data to Stream', record=record)
    v3.putrecords('stocks_stream', [json.dumps(record)])

    # Write one sentiment sample to the iguazio Time-Series DB, indexed by the
    # tweet's creation time and the stream's symbol.
    index = pd.MultiIndex.from_arrays(
        [[pd.to_datetime(record['created_at'])], [name]],
        names=['time', 'symbol'],
    )
    df = pd.DataFrame({'sentiment': [float(sentiment.polarity)]}, index=index)
    client.write(backend='tsdb', table='stock_metrics', dfs=df)

## Nuclio service initialization (init_context) and event handler implementation

In [3]:
import threading

def start_listener(context, symbol='GOOG', track='@Google', limit=200):
    """Stream tweets matching `track` and feed each one to process_event.

    Intended to run on the background thread started by init_context().

    :param context: nuclio context (logger, last_message).
    :param symbol: label passed to process_event as its `name` argument.
    :param track: Twitter track filter expression.
    :param limit: number of stream messages to consume before disconnecting.
    """
    try:
        stream = MyStreamer(context, symbol, **oauth)
        stream.start(process_event, limit, track=track, lang='en')
    except Exception as exc:
        # Exceptions on a worker thread never reach nuclio; log them explicitly.
        context.logger.error_with('Twitter listener stopped', error=str(exc))
        raise

def handler(context, event):
    """nuclio entry point: return the latest cached tweet record as a JSON string.

    `event` is not inspected; every call just serializes `context.last_message`.
    """
    latest = context.last_message
    return json.dumps(latest)

def init_context(context):
    """nuclio init hook: reset the cached tweet and launch the Twitter listener.

    The listener runs on its own thread, so this returns immediately and
    handler() can be invoked while tweets stream in.
    """
    context.last_message = {}
    threading.Thread(target=start_listener, args=(context,)).start()

## Function invocation
The following section simulates nuclio function initialization and invocation.

In [4]:
# nuclio: ignore
init_context(context)
event = Event(body='')
# Simulate one invocation: the handler returns the latest tweet seen so far.
# Right after init this may still be '{}', since the listener runs on its own thread.
handler(context, event)

2018-10-25 23:21:12,006 nuclio (D) writing data to Stream {"record": {"text": "RT @NextBillionFH: News via @MarketWatch: @Google touts 30 million monthly users for its mobile-payment platform in India: https://t.co/bkr\u2026", "user": "@PickDraftDFS", "id": 1055554888657190917, "created_at": "Thu Oct 25 20:21:11 +0000 2018", "polarity": 0.0, "subjectivity": 0.0}}
2018-10-25 23:21:14,804 nuclio (D) writing data to Stream {"record": {"text": "Hey @Google, 'tha' should be 'than' near the bottom of this page on the note. Just a heads up! https://t.co/8CQ1b7N9wd", "user": "@RoryCollinsSEO", "id": 1055554898438352896, "created_at": "Thu Oct 25 20:21:13 +0000 2018", "polarity": 0.1, "subjectivity": 0.4}}
