## Capture Tweets from Twitter and Save Them to a Cassandra Database

The objective is to capture tweets from Twitter for a specific keyword so that the trend can be analyzed. The notebook can be converted into a simple Python script and run in the background.

The captured data can be used to find patterns such as the number of tweets, retweets, and followers, and to run text mining on the tweet text itself.

## Prerequisites
Please see the blog post https://ramanakothi.com/1554/

#### Set up a Twitter account and get an API key
To start, you need a Twitter developer account and credentials (API key, API secret, access token, and access token secret) to access the Twitter API. Follow these steps:

1. Create a Twitter developer account at https://developer.twitter.com/ if you do not already have one.
2. Go to https://developer.twitter.com/en/apps and log in with your Twitter user account.
3. Click “Create an app”.
4. Fill out the form and click “Create”.
5. A pop-up window will appear for reviewing the Developer Terms. Click the “Create” button again.
6. On the next page, click the “Keys and Access Tokens” tab and copy your “API key” and “API secret” from the Consumer API keys section.
7. Scroll down to the Access token & access token secret section and click “Create”. Then copy your “Access token” and “Access token secret”.

### Install required Python modules

In [None]:
# tweepy.StreamListener (used below) was removed in Tweepy 4.0, so pin a 3.x release.
!pip install "tweepy<4" cassandra-driver

In [5]:
import json

import tweepy
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

#### Create a keys.py file: fill in your Twitter API values in the block below and run it.

In [1]:
%%writefile keys.py
## keys.py lives in the working directory and holds the attributes below.
#### User credentials from the Twitter API (see the account setup steps above).
access_token = "ENTER ACCESS TOKEN"
access_token_secret = "ENTER ACCESS TOKEN SECRET"
consumer_key = "ENTER CONSUMER KEY"
consumer_secret = "ENTER CONSUMER SECRET"

## Cassandra credentials
cassandrauser = 'cassandra'
casspassword = 'cassandra'
host_list = 'localhost'
cassandra_port = 9042

Overwriting keys.py
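
As an alternative to keeping secrets in a file, the same four Twitter values could be read from environment variables. The sketch below is only an illustration: the variable names and the `load_twitter_credentials` helper are hypothetical and not part of the original notebook.

```python
import os

# Hypothetical variable names; choose whatever fits your environment.
REQUIRED_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)


def load_twitter_credentials(env=os.environ):
    """Return the four Twitter credentials as a dict, or raise if any is missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

The returned values can then be passed to `tweepy.OAuthHandler` and `set_access_token` in place of the names imported from keys.py.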


In [2]:
## Import the Twitter and database parameters from keys.py.
from keys import *

#### This function takes the consumer key, consumer secret, access token, and access token secret issued for the app created in your Twitter account and builds a Tweepy auth handler from them.

In [8]:
def autorize_twitter_api():
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    return auth

#### This is a helper function that creates a connection to Cassandra.

In [9]:
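def connect_cassandra():
    """Connect to the cluster and return a session bound to the 'twitter' keyspace.

    The keyspace must already exist; set_keyspace raises an error otherwise.
    """
    auth_provider = PlainTextAuthProvider(username=cassandrauser, password=casspassword)
    cluster = Cluster([host_list], protocol_version=4,
                      auth_provider=auth_provider, port=cassandra_port)
    session = cluster.connect()
    session.set_keyspace('twitter')
    return session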

cursor_twitter = connect_cassandra()
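
The helper above assumes that the `twitter` keyspace already exists. If it does not, it could be created once with something like the sketch below. `ensure_keyspace` is a hypothetical helper, and `SimpleStrategy` with a replication factor of 1 is an assumption that only suits a single-node development setup. The `session` argument is any object with an `execute(cql)` method, for example the result of `cluster.connect()` before `set_keyspace` is called.

```python
import re


def ensure_keyspace(session, keyspace="twitter", replication_factor=1):
    """Create the keyspace if it does not exist yet and return the CQL that was run."""
    # Keyspace names cannot be bound as query parameters, so validate before formatting.
    if not re.fullmatch(r"[A-Za-z][A-Za-z0-9_]*", keyspace):
        raise ValueError("Unexpected keyspace name: %r" % keyspace)
    cql = (
        "CREATE KEYSPACE IF NOT EXISTS %s "
        "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': %d}"
        % (keyspace, int(replication_factor))
    )
    session.execute(cql)
    return cql
```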

#### This function creates a new table, in the already created keyspace, to store tweets related to a subject specified by the user


In [10]:
def create_tweets_table_cassandra(term_to_search):
    """
    Create, in the already created keyspace, a table named tweets_<term_to_search>
    to store tweets related to a subject specified by the user. The term becomes part
    of the table name, so it may contain only letters, digits, and underscores.
    """
    query_create = (
        "CREATE TABLE IF NOT EXISTS %s (id UUID PRIMARY KEY, created_at text, tweet text, "
        "user_id text, retweetcount int, location text, place text, fullmessage text);"
        % ("tweets_" + term_to_search)
    )
    cursor_twitter.execute(query_create)

    return
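
Because the search term is pasted directly into the table name, a term containing spaces or symbols (for example `$AAPL`) would produce invalid CQL. One possible way to derive a safe name is sketched below. `table_name_for` is a hypothetical helper, not part of the original code, and the 48-character cap reflects Cassandra's limit on table name length.

```python
import re

MAX_TABLE_NAME_LENGTH = 48  # Cassandra's limit for table names


def table_name_for(term_to_search):
    """Turn a free-form search term into a name like 'tweets_<slug>'."""
    # Lowercase, collapse every run of non-alphanumerics into one underscore,
    # and drop underscores left at either end.
    slug = re.sub(r"[^a-z0-9]+", "_", term_to_search.lower()).strip("_")
    if not slug:
        raise ValueError("Search term has no usable characters: %r" % term_to_search)
    return ("tweets_" + slug)[:MAX_TABLE_NAME_LENGTH]
```

For instance, `table_name_for("corona virus")` yields `tweets_corona_virus`, which could be used wherever the code currently builds `"tweets_" + term_to_search`.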

#### This function inserts tweets related to the selected topic into the corresponding table, using the open Cassandra session

In [15]:
def store_tweets_in_table_cassandra(term_to_search, user_id, created_at, tweet, user_name, retweetcount,location, place, data):
    """
    Insert one tweet into the tweets_<term_to_search> table using the open Cassandra session.
    user_name is accepted from the listener but is not stored in the table.
    """
    query_insert = (
        "INSERT INTO %s (id, created_at, tweet, user_id, retweetcount, location, place, fullmessage) "
        "VALUES (now(), %%s, %%s, %%s, %%s, %%s, %%s, %%s);" % ("tweets_" + term_to_search)
    )
    cursor_twitter.execute(query_insert,
                           (created_at, tweet, user_id, retweetcount, location, place, data))

    return
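
Since the same INSERT runs for every incoming tweet, a prepared statement may be a reasonable alternative: the driver parses the query once and only the values are sent afterwards. The sketch below assumes a session object exposing `prepare` and `execute`, as the cassandra-driver `Session` does; `make_tweet_writer` is a hypothetical helper name.

```python
def make_tweet_writer(session, table_name):
    """Prepare the INSERT once and return a function that writes one tweet."""
    # Prepared statements use '?' placeholders rather than '%s'.
    insert = session.prepare(
        "INSERT INTO " + table_name + " "
        "(id, created_at, tweet, user_id, retweetcount, location, place, fullmessage) "
        "VALUES (now(), ?, ?, ?, ?, ?, ?, ?)"
    )

    def write(created_at, tweet, user_id, retweetcount, location, place, data):
        session.execute(
            insert,
            (created_at, tweet, user_id, retweetcount, location, place, data),
        )

    return write
```

A writer created with `make_tweet_writer(cursor_twitter, "tweets_coronavirus")` could then be called from the stream listener in place of `store_tweets_in_table_cassandra`.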

#### This class streams the data and writes it to the table. It can be extended with more specific exception handling.

In [18]:
class MyStreamListener(tweepy.StreamListener):
    """Listener that stores every incoming tweet in the Cassandra table.

    To only print tweets instead, override on_status(self, status) and
    print status.text.
    """

    def on_data(self, raw_data):
        try:
            # term_to_search is the module-level variable set in the main block.
            data = json.loads(raw_data)
            user_id = data['user']['id_str']
            created_at = data['created_at']
            tweet = data['text']
            user_name = data['user']['screen_name']
            retweetcount = data['retweet_count']
            if data['place'] is not None:
                place = data['place']['country']
            else:
                place = None
            location = data['user']['location']

            # Store them in the Cassandra table
            store_tweets_in_table_cassandra(term_to_search, user_id, created_at, tweet, user_name,
                                            retweetcount, location, place, raw_data)
        except Exception as e:
            # Skip messages that are not tweets or that fail to insert,
            # but report the error instead of silently discarding it.
            print("Skipped message: %r" % (e,))
    
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_error disconnects the stream
            return False
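
The stream also delivers messages that are not tweets (for example delete and limit notices), which is one reason the field lookups in `on_data` can fail. A possible way to separate parsing from storage is sketched below. `extract_tweet_fields` is a hypothetical helper, and the choice to return `None` for non-tweet messages is an assumption made for illustration.

```python
import json


def extract_tweet_fields(raw_data):
    """Parse one raw stream message; return a dict of fields, or None if it is not a tweet."""
    data = json.loads(raw_data)
    user = data.get("user")
    if not isinstance(user, dict) or "text" not in data:
        return None  # e.g. {"limit": ...} or {"delete": ...} notices
    place = data.get("place") or {}  # "place" is null for most tweets
    return {
        "user_id": user.get("id_str"),
        "user_name": user.get("screen_name"),
        "created_at": data.get("created_at"),
        "tweet": data["text"],
        "retweetcount": data.get("retweet_count", 0),
        "location": user.get("location"),
        "place": place.get("country"),
    }
```

With a helper like this, `on_data` could skip `None` results explicitly and reserve the `except` branch for genuine failures such as database errors.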

#### You can use any search term. I used "coronavirus" to study tweet patterns; you can also pass a list of keywords.

In [None]:
if __name__ == "__main__": 
    #Creates the table for storing the tweets
    term_to_search = "coronavirus"
    create_tweets_table_cassandra(term_to_search)
    
    #Connect to the streaming twitter API
    api = tweepy.API(wait_on_rate_limit_notify=True)
    
    #Stream the tweets
    streamer = tweepy.Stream(auth=autorize_twitter_api(), listener=MyStreamListener(api=api))
    streamer.filter(languages=["en"], track=[term_to_search])
    #streamer.filter(languages=["en"],track=['$BABA', '$TCEHY', '$BIDU', '$AAPL', '$TSLA','Trump','Barnie','Warren'])

#### You can copy this code into a .py script and run it in the background.

In [20]:
%%writefile Twit_cassandra.py
#!/usr/bin/env python

import json

import tweepy
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

from keys import *



def autorize_twitter_api():
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    return auth

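def connect_cassandra():
    """Connect to the cluster and return a session bound to the 'twitter' keyspace.

    The keyspace must already exist; set_keyspace raises an error otherwise.
    """
    auth_provider = PlainTextAuthProvider(username=cassandrauser, password=casspassword)
    cluster = Cluster([host_list], protocol_version=4,
                      auth_provider=auth_provider, port=cassandra_port)
    session = cluster.connect()
    session.set_keyspace('twitter')
    return session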

cursor_twitter = connect_cassandra()

def create_tweets_table_cassandra(term_to_search):
    """
    Create, in the already created keyspace, a table named tweets_<term_to_search>
    to store tweets related to a subject specified by the user. The term becomes part
    of the table name, so it may contain only letters, digits, and underscores.
    """
    query_create = (
        "CREATE TABLE IF NOT EXISTS %s (id UUID PRIMARY KEY, created_at text, tweet text, "
        "user_id text, retweetcount int, location text, place text, fullmessage text);"
        % ("tweets_" + term_to_search)
    )
    cursor_twitter.execute(query_create)

    return

def store_tweets_in_table_cassandra(term_to_search, user_id, created_at, tweet, user_name, retweetcount,location, place, data):
    """
    Insert one tweet into the tweets_<term_to_search> table using the open Cassandra session.
    user_name is accepted from the listener but is not stored in the table.
    """
    query_insert = (
        "INSERT INTO %s (id, created_at, tweet, user_id, retweetcount, location, place, fullmessage) "
        "VALUES (now(), %%s, %%s, %%s, %%s, %%s, %%s, %%s);" % ("tweets_" + term_to_search)
    )
    cursor_twitter.execute(query_insert,
                           (created_at, tweet, user_id, retweetcount, location, place, data))

    return

class MyStreamListener(tweepy.StreamListener):
    """Listener that stores every incoming tweet in the Cassandra table."""

    def on_data(self, raw_data):
        try:
            # term_to_search is the module-level variable set in the main block.
            data = json.loads(raw_data)
            user_id = data['user']['id_str']
            created_at = data['created_at']
            tweet = data['text']
            user_name = data['user']['screen_name']
            retweetcount = data['retweet_count']
            if data['place'] is not None:
                place = data['place']['country']
            else:
                place = None
            location = data['user']['location']

            # Store them in the Cassandra table
            store_tweets_in_table_cassandra(term_to_search, user_id, created_at, tweet, user_name,
                                            retweetcount, location, place, raw_data)
        except Exception as e:
            # Skip messages that are not tweets or that fail to insert,
            # but report the error instead of silently discarding it.
            print("Skipped message: %r" % (e,))
    
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_error disconnects the stream
            return False


if __name__ == "__main__": 
    #Creates the table for storing the tweets
    term_to_search = "coronavirus"
    create_tweets_table_cassandra(term_to_search)
    
    #Connect to the streaming twitter API
    api = tweepy.API(wait_on_rate_limit_notify=True)
    
    #Stream the tweets
    streamer = tweepy.Stream(auth=autorize_twitter_api(), listener=MyStreamListener(api=api))
    streamer.filter(languages=["en"], track=[term_to_search])
    #streamer.filter(languages=["en"],track=['$BABA', '$TCEHY', '$BIDU', '$AAPL', '$TSLA','Trump','Barnie','Warren'])

Writing Twit_cassandra.py


In [21]:
!pwd

/Users/ramanakothi/Downloads
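
To run the saved script in the background from Python itself, one option is to start it as a separate process whose output goes to a log file. This is a minimal sketch rather than the author's method: `launch_in_background` and the log file name are hypothetical, and `start_new_session=True` only detaches the child on POSIX systems.

```python
import subprocess
import sys


def launch_in_background(script_path, log_path):
    """Start script_path with the current interpreter, appending its output to log_path."""
    log_file = open(log_path, "ab")
    process = subprocess.Popen(
        [sys.executable, script_path],
        stdout=log_file,
        stderr=subprocess.STDOUT,   # keep errors in the same log
        stdin=subprocess.DEVNULL,
        start_new_session=True,     # POSIX only: detach from the launching session
    )
    log_file.close()  # the child keeps its own handle to the log
    return process
```

For example, `launch_in_background("Twit_cassandra.py", "twit_cassandra.log")` returns immediately, and the returned object's `pid` can be used later to stop the capture.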
