## Twitter Sentiment Analysis  +  iCampus-Seminar  +  WS1718
***

The following program collects Tweets from Twitter and assigns an emotion to each Tweet based on its text, using a Neural Network. <br>
To do so, the program covers collecting the Tweets, preprocessing them, training a Neural Network, scoring new Tweets with the trained model, and plotting the result.

This notebook is structured as follows:
1. [Imports](#imports) and [Global Variables](#globalvar)
2. Main functions to control the execution flow:
	2. [Function](#maincatch) to catch training tweets based on a given query.
	2. [Function](#maintrain) to train the Neural Network model.
	2. [Function](#mainscore) to score emotions of tweets and plot spider graphs.
3. Classes which contain the logic:
	3. [Class](#database) to create the Database-API and manage the Database.
	3. [Class](#twitter) to create the Twitter-API and to load Tweets based on a given search query.
	3. [Class](#preprocessing) to preprocess a given Tweet. 
	3. [Class](#train) to train the Neural Network.
	3. [Class](#score) to score a given text based on the trained Neural Network.
	3. [Class](#plot) to plot the emotions as a spider graph.
4. Helper classes
	4. Helper [Class](#helptweets) to get information about Tweets in database.
	4. Helper [Class](#helpdelete) to delete a given Database.

<br>
© Tobias Brähler | tobias.braehler@mni.thm.de

<a id="imports"></a>
### Imports

In [None]:
# SYS
from time import strftime
from datetime import datetime
from typing import Dict, Union, List, Tuple
from sys import stdout
from re import sub
import pprint
import json

# MATH
from math import pi

# MATPLOT
import matplotlib.pyplot as plt

# PANDAS + NUMPY
import pandas as pd
import numpy as np

# PYMONGO
from pymongo import MongoClient, DESCENDING
from pymongo.errors import ConnectionFailure, BulkWriteError
from pymongo.cursor import Cursor

# TWEEPY
from tweepy import AppAuthHandler, API, Cursor as Tweepy_Cursor, TweepError

# GENSIM + WORD2VEC
import warnings
warnings.filterwarnings(action='ignore', category=UserWarning, module='gensim')
from gensim.models.keyedvectors import KeyedVectors
from gensim.models.doc2vec import FAST_VERSION

# NLTK
from nltk.tokenize import word_tokenize

# SCIKIT-LEARN
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

# KERAS
from keras.layers import Convolution1D, MaxPooling1D, Flatten, Dense
from keras.models import Sequential, load_model
from keras import metrics

# TQDM
from tqdm import tqdm
tqdm.pandas(desc="progress-bar")


<a id="globalvar"></a>
### Global variables

In [None]:
_db_collection_training = "training_tweets"
_db_collection_production = "production_tweets"

# search term; filter out retweets
_twitter_search_query = "#realDonaldTrump lang:en -filter:retweets"

_trained_model_savepath = 'C:\\Users\\admin\\Desktop\\twitter_sentiment_model_%s.h5' \
                          % strftime('%d%m%Y-%H%M')
_classlabels_savepath = 'C:\\Users\\admin\\Desktop\\classlabels.json'

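# Note: the class cells further below (Database, Twitter, Preprocessing, Training, Score)
# must be executed before this cell, because the objects are instantiated here.
_db_client_train = Database(db_collection=_db_collection_training)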
_db_client_production = Database(db_collection=_db_collection_production)
_twitter_client = Twitter()
_preprocessing = Preprocessing()
_training = Training()
_score = Score(model=_trained_model_savepath, classlabels=_classlabels_savepath)

_word2vec = KeyedVectors.load_word2vec_format('C:/GoogleNews-vectors-negative300.bin', binary=True)

<a id="maincatch"></a>
### MAIN: Load Training-Tweets based on a given search query.
- Catch Tweets from Twitter using the [Twitter-Class](#twitter)
- Preprocess each Tweet using the [Preprocess-Class](#preprocessing)
- Load Tweets into Database using the [Database-Class](#database)

In [None]:
def catch_training_tweets_from_twitter():
        print('\n', '[', strftime("%d.%m.%Y %H:%M:%S"), ']', ' Start mining Tweets ...', sep='')
        print('Exit with CTRL+C', '\n')

        # get since_id; 'None' if no Tweet is persisted yet; needed to stop collecting Tweets 
        # once all Tweets have been caught back to the newest Tweet persisted in database 
        # (Tweepy returns Tweets from most recent to oldest)
        __since_id = _db_client_train.get_newest_tweet_id(tweet_id_column='id')
        __max_id = None

        try:
            while True:
                # get tweets
                tweets = _twitter_client.load_tweets(search_query=_twitter_search_query,
                                                     since_id=__since_id,
                                                     max_id=__max_id)
                if not tweets:
                    break

                # set max_id, i.e. lowest tweet id completely processed so far in this run.
                # note: tweepy is going from recent to oldest, therefore, the last tweet processed 
                # is the oldest so far.
                __max_id = tweets[-1]['id'] - 1

                # preprocess tweets for training; add to list if its not 'None'
                processed_tweets = []
                for tweet in tweets:
                    tweet = _preprocessing.preprocess_tweet_for_training(tweet)
                    if tweet:
                        processed_tweets.append(tweet)

                # persist tweets in database
                # Note: If the items tweet["user"]["id"] or tweet["created_at_week_year"] change 
                # (e.g. the information is stored elsewhere), the function in class "Database" 
                # has to be updated.
                _db_client_train.persist_tweets(tweets=processed_tweets)

        except KeyboardInterrupt:
            pass
        
        
catch_training_tweets_from_twitter()

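The interplay of `since_id` and `max_id` in the loop above can be sketched without any network access. The following is only an illustration under stated assumptions: `FAKE_TIMELINE`, `fake_load_tweets` and `collect_all` are hypothetical stand-ins (not part of this notebook or of Tweepy), and the window is modelled as `since_id < id <= max_id`, newest first.

```python
from typing import Dict, List, Optional

# Hypothetical stand-in for the Twitter search: Tweet IDs 25..1, newest first.
FAKE_TIMELINE: List[Dict] = [{"id": i} for i in range(25, 0, -1)]


def fake_load_tweets(since_id: Optional[int],
                     max_id: Optional[int],
                     per_query: int = 10) -> List[Dict]:
    """Return at most 'per_query' Tweets with since_id < id <= max_id, newest first."""
    window = [t for t in FAKE_TIMELINE
              if (since_id is None or t["id"] > since_id)
              and (max_id is None or t["id"] <= max_id)]
    return window[:per_query]


def collect_all(since_id: Optional[int]) -> List[int]:
    """Page backwards through the timeline until no Tweet is left in the window."""
    collected: List[int] = []
    max_id = None
    while True:
        batch = fake_load_tweets(since_id, max_id)
        if not batch:
            break
        collected.extend(t["id"] for t in batch)
        # the last Tweet of a batch is the oldest one, so continue just below it
        max_id = batch[-1]["id"] - 1
    return collected
```

With an empty database (`since_id=None`) the sketch walks through all 25 IDs in three batches; with `since_id=20` it stops after the five newer Tweets.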
<a id="maintrain"></a>
### MAIN: Train the Neural Network Model.
- Get training Tweets using the [Database-Class](#database)
- Train the model using the [Train-Class](#train)

In [None]:
def train(limit: int = 100000):
        # get training data
        __data = _db_client_train.get_trainingdata(limit=limit)
        # split training data into training and test
        __text_train, __text_test, __label_train, __label_test = \
            _training._split_into_test_and_training(__data)
        # convert data to embedded vectors resp. binary labels
        __text_train_embedvec = _training._texts_to_embedded_vec_matrices(__text_train)
        __text_test_embedvec = _training._texts_to_embedded_vec_matrices(__text_test)
        __label_train_binary = _training._labels_to_binary(__label_train)
        __label_test_binary = _training._labels_to_binary(__label_test)
        # train CNN model
        _training._train_model(__text_train_embedvec, __label_train_binary, 
                               __text_test_embedvec, __label_test_binary)
        
        
train()

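The step "convert labels to binary" in the training flow can be illustrated in plain Python. This is a minimal sketch, not the notebook's implementation: `one_hot_labels` is a hypothetical helper that mimics what `LabelBinarizer` does for three or more classes (one indicator column per class, classes in sorted order).

```python
from typing import List, Sequence, Tuple


def one_hot_labels(labels: Sequence[str]) -> Tuple[List[str], List[List[int]]]:
    """Return the sorted class list and one indicator row per label."""
    classes = sorted(set(labels))
    index = {c: i for i, c in enumerate(classes)}
    rows = [[1 if index[label] == j else 0 for j in range(len(classes))]
            for label in labels]
    return classes, rows


classes, rows = one_hot_labels(["joy", "anger", "joy", "fear"])
print(classes)
print(rows)
```

The position of the `1` in each row corresponds to the position of the label in `classes`, which is why the class list has to be stored alongside the model to map predictions back to emotions.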
<a id="mainscore"></a>
### MAIN: Score emotions of tweets and plot spider graphs.
- Get production tweets using the [Database-Class](#database)
- Score the tweets using the [Score-Class](#score)
- Plot spider graphs showing the emotions using the [Plot-Class](#plot)

In [None]:
def score_and_plot():
    # score emotions for tweets which are not yet scored;
    # materialise the cursor as a list, because a cursor can only be iterated once
    unscored_tweets = list(_db_client_production.get_unscored_tweets())
    for tweet in unscored_tweets:
        tweet['emotion'] = _score.score(tweet['full_text_processed'])
    # persist the calculated emotions
    _db_client_production.persist_tweets(unscored_tweets)
    
    # get each emotion and its percentage from the database
    percentage_emotions = _db_client_production.get_percentage_of_emotions()
    # add current date
    percentage_emotions.loc[len(percentage_emotions)] = [{'Datum': [datetime.today().
                                                                    strftime('%b. \'%y')]}]
    
    # plot
    _plot = Plot(dataframe=percentage_emotions, graphtitle='AddTitle', scalegraph=40)
    _plot.plot_graphs(percentage_emotions)
    
    
score_and_plot()

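The percentages that feed the spider graph are a simple share calculation. The sketch below shows the arithmetic only, assuming exactly one scored emotion per Tweet; `emotion_percentages` is a hypothetical helper and works on a plain list instead of the database.

```python
from collections import Counter
from typing import Dict, Iterable


def emotion_percentages(emotions: Iterable[str]) -> Dict[str, float]:
    """Map each emotion to its share (in percent) of all scored Tweets."""
    counts = Counter(emotions)
    total = sum(counts.values())
    if total == 0:
        # nothing scored yet, so there is nothing to plot
        return {}
    return {emotion: 100.0 * count / total for emotion, count in counts.items()}


print(emotion_percentages(["joy", "joy", "fear", "anger"]))
```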
<a id="database"></a>
### CLASS: Create the Database-API and manage the Database.

In [None]:
class Database:
    def __init__(self,
                 db_collection: str,
                 db_name: str = "textmining",
                 db_host='localhost',
                 db_port: int = 27017
                 ):
        self._db_name = db_name
        self._db_collection = db_collection
        self._db_host = db_host
        self._db_port = db_port
        try:
            # create database client for given parameters
            self._client = self.__create_db_client()
        except ConnectionFailure as e:
            print('MongoDB error while establishing a connection : ', str(e))
            raise

    def __create_db_client(self):
        try:
            # create client connected to MongoDB
            client = MongoClient(self._db_host, self._db_port)
            # test if client is valid; else: exception is raised
            client.server_info()
            # return client for MongoDB database named 'db_name'
            return client[self._db_name]
        except ConnectionFailure:
            raise

    def persist_tweets(self,
                       tweets: List[Dict]
                       ) -> None:
        """
        Persists a list of dictionaries in the database where one dictionary represents one 
        Tweet in JSON format. If a specific user ID already has a persisted Tweet for a 
        specific week, the persisted Tweet will be overwritten. On the one hand this should 
        curb Twitter bots which do not give any valuable input for the system (e.g. bots
        which hijack trending hashtags to post lots of advertising or the like). On the 
        other hand this should prevent an opinion bias since angry people tend to speak 
        out more frequently (i.e. one person = one opinion per week).
        :param tweets: a list of dictionaries (represents Tweets in JSON format)
        """
        # check if list of tweets is empty
        if not tweets:
            return

        # Note: Unordered bulk write operations are batched and sent to the server in arbitrary 
        # order. Any errors (e.g. DuplicateKeyError) that occur are reported after ALL operations 
        # are attempted.
        bulk = self._client[self._db_collection].initialize_unordered_bulk_op()

        for tweet in tweets:
            # Compare "user id" and "created at" from database with the Tweet to detect if the 
            # specific person already has stored a Tweet for the specific week. Update the stored 
            # Tweet with the new one if so. Note: Update this query if "user id" or 
            # "created at" is stored somewhere else.
            bulk.find({"$and": [{"user.id": tweet["user"]["id"]},
                                {"created_at_week_year": tweet["created_at_week_year"]}]}) \
                .upsert() \
                .update_one({"$set": tweet})
        try:
            # execute bulk operation
            db_return = bulk.execute()

            print('[', strftime("%d.%m.%Y %H:%M:%S"), '] ', 'Database: ', 'Upserted(', 
                  db_return['nUpserted'], '), Modified(', db_return['nModified'],
                  '), WriteErrors(', db_return['writeErrors'], ')', sep='')

        except BulkWriteError as e:
            print('MongoDB error while persisting Tweets : ', str(e.details))
            raise

    def get_unscored_tweets(self) -> Cursor:
        # get cursor with all tweets from database where field 'emotion' does not exist yet
        cursor = self._client[self._db_collection] \
            .find({'emotion': {"$exists": False}})
        return cursor

    def get_trainingdata(self, limit: int) -> pd.DataFrame:
        # get data from database
        cursor = self._client[self._db_collection] \
            .find({},
                  {'full_text_processed': 1,
                   'emotions': 1,
                   "_id": 0}) \
            .limit(limit)

        # load data into pandas DataFrame and expand the emotions into multiple rows:
        # from:  | Lorem ipsum dolor  | [emojis_joy, emojis_love] |
        # to:    | Lorem ipsum dolor  | emojis_joy  |
        #        | Lorem ipsum dolor  | emojis_love |
        df = pd.DataFrame(data=list(cursor))
        df_expanded = pd.DataFrame([({'emotion': e, 'text': t.full_text_processed})
                                    for t in df.itertuples()
                                    for e in t.emotions])
        # drop duplicate rows if neural network should consider only one label per text
        # df_expanded.drop_duplicates(subset=['text'], inplace=True)
        return df_expanded

    def get_newest_tweet_id(self, tweet_id_column: str = 'id') -> Union[int, type(None)]:
        """
        Returns the highest Tweet ID persisted in database. This ID represents the newest Tweet 
        processed so far.
        :param tweet_id_column: the name of the database column containing the Tweet IDs
        :returns: the highest Tweet ID persisted in database as Integer, or 'None' if no 
        Tweet is persisted yet (the StopIteration raised by the empty cursor is caught 
        internally)
        """
        try:
            # Sort all entries for 'tweet_id' in descending order, limit the result to one, 
            # dereference the list of 'dict' with 'next()' to get the requested 'dict' and 
            # return only the value of 'tweet_id'.
            tweet_id = self._client[self._db_collection].find() \
                .sort(tweet_id_column, DESCENDING) \
                .limit(1) \
                .next() \
                .get(tweet_id_column)
            return tweet_id
        except StopIteration:
            # No tweet returned, i.e. database is empty.
            return None

    def get_percentage_of_emotions(self) -> pd.DataFrame:
        """
        Returns a dataframe with each emotion and its percentage. For that it counts 
        all entries and each emotion separately in the database and calculates the percentage.
        :return: dataframe with emotions and its percentage
        """
        # create dataframe
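        # the columns must be declared up front, otherwise rows cannot be appended via 'loc'
        df = pd.DataFrame(columns=['emotion', 'percentage'])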
        
        try:
            # get count of all entries where the field 'emotion' exists
            count_all = self._client[self._db_collection] \
                .find({'emotion': {"$exists": True}}).count()
            
            # query counts emotions and returns:
            # { "count" : 370, "emotion" : "joy" }
            # { "count" : 530, "emotion" : "fear" }
            # etc.
            # note: 'aggregate' expects the pipeline stages as ONE list
            for e in self._client[self._db_collection].aggregate([
                        {"$unwind": "$emotion"},
                        {"$group": {"_id": "$emotion", "count": {"$sum": 1}}},
                        {"$project": {"emotion": "$_id", "count": 1}}
                    ]):
                # calc the percentage of this emotion
                percentage = (e['count']/count_all)*100
                # write emotion with its percentage in the dataframe
                df.loc[len(df)] = [e['emotion'], percentage]         
        except StopIteration:
            return pd.DataFrame()
        
        return df

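The "one person = one opinion per week" rule of `persist_tweets` can be sketched with an in-memory dictionary instead of MongoDB. This is an illustration only: `upsert_tweets` and the `store` dictionary are hypothetical, and the "modified" counter here counts every matched Tweet, whereas MongoDB only counts documents that actually changed.

```python
from typing import Dict, List, Tuple

Key = Tuple[int, str]  # (user id, 'week-year')


def upsert_tweets(store: Dict[Key, Dict], tweets: List[Dict]) -> Tuple[int, int]:
    """Insert new Tweets and overwrite an existing one for the same user and week."""
    upserted = modified = 0
    for tweet in tweets:
        key = (tweet["user"]["id"], tweet["created_at_week_year"])
        if key in store:
            # same user, same week: the stored Tweet is updated with the new one
            store[key].update(tweet)
            modified += 1
        else:
            store[key] = dict(tweet)
            upserted += 1
    return upserted, modified


store: Dict[Key, Dict] = {}
batch = [
    {"user": {"id": 1}, "created_at_week_year": "52-2017", "full_text": "first"},
    {"user": {"id": 1}, "created_at_week_year": "52-2017", "full_text": "second"},
    {"user": {"id": 2}, "created_at_week_year": "52-2017", "full_text": "other user"},
]
print(upsert_tweets(store, batch))
```

After the call, only two entries remain in `store`, and the entry for user 1 holds the later Tweet.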
<a id="twitter"></a>
### CLASS: Create the Twitter-API and load Tweets based on a given search query.

In [None]:
class Twitter:
    def __init__(self,
                 consumer_key: str = "XXXX",
                 consumer_secret: str = "XXXX",
                 tweets_per_query: int = 1000,
                 ):
        self._consumer_key = consumer_key
        self._consumer_secret = consumer_secret
        self._tweets_per_query = tweets_per_query

        try:
            # create Twitter API for given parameters
            self._twitter_api = self.__create_twitter_api()
        except TweepError as e:
            print('Tweepy error while creating the API : ', str(e))
            raise

    def __create_twitter_api(self):
        try:
            # Create an AppAuthHandler instance and pass the consumer token and secret.
            # AppAuthHandler gives you higher limits than OAuthHandler.
            auth = AppAuthHandler(self._consumer_key, self._consumer_secret)
            # Create Tweepy API.
            # 'wait_on_rate_limit' makes the Tweepy API call auto wait (sleep) when it hits 
            # the rate limit.
            return API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True,
                       retry_count=3, retry_delay=5, retry_errors={401, 404, 500, 503})
        except TweepError:
            raise

    def load_tweets(self,
                    search_query: str,
                    since_id=None,
                    max_id=None
                    ) -> List[Dict]:
        """
        Loads matching Tweets for the given parameters by the given Twitter API.
        :param search_query: the search query, consists of a search term and possible options
                (e.g. "#HelloWorld -filter:retweets" which filters out all retweets)
        :param since_id: the lowest tweet id considered, if 'none' go as far back as API allows
        :param max_id: the highest tweet id considered, if 'none' start at most recent tweet
        :return: a list of dictionaries where each dictionary represents one Tweet in JSON format
        """
        tweets = []

        try:
            # Parameters for API.search cannot be passed directly to the method but have to be 
            # passed to the Cursor constructor.
            # Parameter: "tweet_mode: 'extended'" -> get full tweet text
            for i, tweet in enumerate(Tweepy_Cursor(self._twitter_api.search, q=search_query,
                                                    since_id=since_id, max_id=max_id,
                                                    tweet_mode='extended')
                                      .items(self._tweets_per_query)):
                # Get Tweepy status object as json and append to list.
                tweets.append(tweet._json)
                stdout.write('\r' + str(i + 1) + '    ')

        except TweepError as e:
            print('Tweepy error while getting Tweets : ', str(e))
        except StopIteration:
            raise

        print('\n', '[', strftime("%d.%m.%Y %H:%M:%S"), '] ', 'Twitter: ', str(len(tweets)),
              ' Tweets loaded', sep='')
        return tweets


<a id="preprocessing"></a>
### CLASS: Preprocess a given Tweet.

In [None]:
class Preprocessing:
    def __init__(self):
        # create emoji list once
        self.__create_emoji_emotion_lists()

    def preprocess_tweet(self, tweet: Dict) -> Dict:
        """
        Processes the given Tweet by:
            - adding an item with 'created at' date formatted as 'week-year' 
              ('created_at_week_year')
            - adding an item with summarized place information ('place_summarized')
                - the x-y-coordinate
                - the country
                - the name and type of the exact location (e.g. Honolulu - City)
            - adding an item with preprocessed text ('full_text_processed'):
                - convert each character to lowercase
                - remove tab, linefeed, carriage return
                - remove the '#' from all hashtags
                - remove all handles and URLs
                - remove separating characters, e.g. this/that -> this that
                - reduce repeating characters to max three characters, 
                  e.g. yaaaayyyyyy -> yaaayyy
                - reduce repeating punctuation to one character, e.g. !!! -> !
        :param tweet: a Tweet with related meta information given as json file (dictionary)
        :return: the same Tweet with added items as json file (dictionary)
        """
        tweet = self.__format_date_to_week_year(tweet)
        tweet = self.__summarize_place_information(tweet)
        tweet = self.__process_text(tweet)
        return tweet

    def preprocess_tweet_for_training(self, tweet: Dict) -> Union[Dict, None]:
        """
        Processes the given Tweet if it should be used as training data:
            - execute the function 'preprocess_tweet(tweet: Dict)'
            - assign the emojis in the tweet text to one of six emotions: anger, fear, sadness,
              joy, love, surprise. The found emotions will be stored in the new item 'emotions' 
              and all emojis will be removed from text.
            - if the Tweet text contains too few words (<7) the whole Tweet is set to 'None'.
            - if the Tweet does not contain any emotions (i.e. no emoji can be assigned to an 
              emotion) the whole Tweet is set to 'None'.
        :param tweet: a Tweet with related meta information given as json file (dictionary)
        :return: the same Tweet with added items as json file (dictionary) or None
        """
        tweet = self.preprocess_tweet(tweet)
        tweet = self.__process_text_for_training(tweet)
        tweet = self.__remove_unusable_tweet_for_training(tweet)
        return tweet

    @staticmethod
    def __format_date_to_week_year(tweet: Dict) -> Dict:
        """
        Takes the 'created_at' date from the Tweet, formats it to 'week-year' and adds it
        as a new item 'created_at_week_year' to the Tweet.
        """
        # convert to datetime object, e.g. Fri Dec 29 18:16:29 +0000 2017
        datetime_object = datetime.strptime(tweet['created_at'], "%a %b %d %H:%M:%S %z %Y")
        tweet['created_at_week_year'] = datetime_object.strftime("%V-%Y")
        return tweet

    @staticmethod
    def __summarize_place_information(tweet: Dict) -> Dict:
        """
        Constructs a dictionary containing summarized place information:
            - the x-y-coordinate
            - the country
            - the name and type of the exact location (e.g. Honolulu - City)
        and adds it as a new item 'place_summarized' to the Tweet.
        """
        place = tweet['place']
        if place:
            # averaging (list of lists of) coordinates -> 4 coordinates gets averaged 
            # to 1 coordinate
            avg_coordinates = [float(sum(col)) / len(col) for col in 
                               zip(*place['bounding_box']['coordinates'][0])]
            x_coordinates = avg_coordinates[0]
            y_coordinates = avg_coordinates[1]

            tweet['place_summarized'] = {'country': place['country'], 'name': place['name'],
                                         'place_type': place['place_type'],
                                         'coordinates': [x_coordinates, y_coordinates]}
        return tweet

    @staticmethod
    def __process_text(tweet: Dict) -> Dict:
        """
        Preprocesses the Tweet text:
            - convert each character to lowercase
            - remove tab, linefeed, carriage return
            - remove the '#' from all hashtags
            - remove all handles and URLs
            - remove separating characters, e.g. this/that -> this that
            - reduce repeating characters to max three characters, e.g. yaaaayyyyyy -> yaaayyy
            - reduce repeating whitespaces to one whitespace
            - reduce repeating punctuation to one character, e.g. !!! -> !
        and adds it as new item 'full_text_processed' to the Tweet.
        """
        # differentiate between retweeted and original Tweet;
        # according to which is true the text has to be accessed from different items in 
        # the dictionary
        if 'retweeted_status' in tweet:
            text = tweet['retweeted_status']['full_text']
        else:
            text = tweet['full_text']

        # convert each character to lowercase
        text = text.lower()
        # remove tab, linefeed, carriage return
        text = sub(r"[\t\n\r]", r" ", text)
        # remove the '#' from all hashtags longer than three characters
        text = sub(r"\#(\w{4,})", r" \1 ", text)
        # remove all handles (i.e. any reference to a username)
        text = sub(r"\@(\w+)", " ", text)
        # remove all URLs
        text = sub(r"(http|https|ftp)://[a-zA-Z0-9\\./]+", " ", text)
        # remove separating characters, e.g. this/that -> this that
        text = sub(r"[,\"\:;\(\)\[\]\{\}&\-_\|/\\\=~\*]+", " ", text)
        # reduce repeating characters to max three characters, e.g. yaaaayyyyyy -> yaaayyy
        text = sub(r"(.)\1{3,}", r"\1\1\1", text)
        # reduce repeating whitespaces to one whitespace
        text = sub(r"\s{2,}", " ", text)
        # reduce repeating punctuation which represents the end of a sentence to one character:
        # '!!!' -> '!', '!' -> '!', '???' -> '?', '?' -> '?', '.' -> '.', etc
        # but '...' -> '...' because '..' or '...' etc does not mark the end of a sentence
        text = sub(r"((?<!\.)\.(?!\.))\1|([\!\?])\2+", r" \1\2 ", text)

        tweet['full_text_processed'] = text
        return tweet

    def __process_text_for_training(self, tweet: Dict) -> Dict:
        """
        Processes the Tweet text so it can be used for training the neural network. For that 
        the emojis in the text will be assigned to one of six emotions: anger, fear, sadness, 
        joy, love, surprise. The found emotions will be stored in the new item 'emotions' 
        and all emojis will be removed from text.
        """
        # make sure everything needed was created beforehand, i.e. correct order of function 
        # execution
        assert 'full_text_processed' in tweet, "Tweet does not have an item " \
                                               "'full_text_processed' yet."
        assert self._emoji_emotion_lists, "Emoji list was not created yet."

        # get the text
        text = tweet['full_text_processed']
        # initialise a set to store the emotions contained in the text (set ensures uniqueness)
        emotions = set()

        # iterate through the lists
        for emotion, emoji_list in self._emoji_emotion_lists.items():
            # for every emoji in the list ...
            for emoji in emoji_list:
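                # ... check if it is present in the text
                if emoji in text:
                    # remove emoji from text; plain string replacement is used because many
                    # emoticons (e.g. ':)') contain regex metacharacters and would break 'sub'
                    text = text.replace(emoji, " ")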
                    # add this emotion
                    emotions.add(emotion)

        # remove skin tones which can be found in the text if an emoji with non standard skin 
        #  is removed
        text = sub(r"[🏻🏼🏽🏾🏿]", " ", text)

        tweet['full_text_processed'] = text
        tweet['emotions'] = list(emotions)
        return tweet

    @staticmethod
    def __remove_unusable_tweet_for_training(tweet: Dict) -> Union[Dict, type(None)]:
        """
        Returns 'None' instead of the Tweet if the Tweet text
        - contains too few words (<7); Tweets with too few words do not give any valuable 
          information for training
        - does not contain any emotions (i.e. no emoji can be assigned to an emotion)
        """
        # make sure everything needed was created beforehand, i.e. correct order of function 
        # execution
        assert 'full_text_processed' in tweet, "Tweet does not have an item " \
                                               "'full_text_processed' yet."

        # check if text has less than 7 words
        if len(tweet['full_text_processed'].split()) < 7:
            return None

        # check if text does not contain any emotions (i.e. list with emotions is empty)
        if not tweet['emotions']:
            return None

        return tweet

    def __create_emoji_emotion_lists(self):
        # Basic Emotions by Parrott, W.: Emotions in Social Psychology. Psychology Press (2001)
        # https://unicode.org/emoji/charts/full-emoji-list.html
        emojis_anger = [
            '>:/', '>:\\', '>:[', '>:(', ':@',
            '😤', '😡', '😠', '😣',
            '🤢', '🤥', '💩', '👎', '🙅', '🤮',
            '🙄'
            # '😬' = unfavorable situation but often used as grinning; better not use
        ]
        emojis_fear = [
            '😧', '😨', '😩', '😰', '😱', '😳', '😵',
            '💀', '☠', '👺',
        ]
        emojis_sadness = [
            # sad
            ':-(', ')-:', ':(', '):', ':-[', ':[', ':-<', ':<', '=(', ':-/', ':/', '=/', ':L',
            '=L',
            ':S', ':\\', ':-c', ':c', ':{',
            '☹', '🙁', '😖', '😞', '😟', '😦', '😥', '😫', '😒', '😓', '😕', '😐',
            '💔',
            # crying
            ':,(', ":'-(", ":'(", ':"(', ':((',
            '😢', '😭',
        ]
        emojis_joy = [
            # smiling
            ':-)', ':-))', ':)', ':))', '(:', '(-:', '=)', '=]', ':o)', ':]', ':c)', ':>', '8)',
            ':}', ':^)',
            '😊', '😎', '☺', '🙂', '🤗', '🤤', '😇',
            # laughing/grinning
            ':-D', ':D', 'D:', 'X-D', 'x-D', 'XD', 'xD', '=D', '8-D', '8D',
            '😀', '😁', '😂', '🤣', '😃', '😄', '😅', '😆', '😛', '😜', '😝', '🙃',
            # winking
            ';-D', ';D', ':-P', ':P', ":')", ':-p', ':p', '=p', ':-b', ':b', '😋', '😏',
            # ';-)', '(-;', ';)', '(;', '😉', joy or sarcasm, better not use these
            # other
            '👌', '👍', '🎉', '👏',
        ]
        emojis_love = [
            # heart/kiss
            '<3', ':3', ':*', ':^*',
            '❤', '♥', '💕', '💖', '💜', '💙', '💛', '💚', '💗', '💘', '💞', '💋', '💓', '😍', 
            '😘', '😗', '😙', '😚',
            '💏',
        ]
        emojis_surprise = [
            '🤔', '🤨', '😮', '😯', '😲'
        ]

        self._emoji_emotion_lists = {"emojis_anger": emojis_anger, "emojis_fear": emojis_fear,
                                     "emojis_sadness": emojis_sadness, "emojis_joy": emojis_joy,
                                     "emojis_love": emojis_love, "emojis_surprise": emojis_surprise}

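The 'week-year' key built in `__format_date_to_week_year` can be reproduced in isolation. The sketch below is a hedged variant rather than the notebook's code: `week_year_key` is a hypothetical helper that uses `isocalendar()` instead of `strftime("%V-%Y")`, on the assumption that the ISO week should be paired with the ISO year so that the last days of December do not collide with the first week of the same calendar year.

```python
from datetime import datetime


def week_year_key(created_at: str) -> str:
    """Turn Twitter's 'created_at' string into an 'ISO week-ISO year' key."""
    dt = datetime.strptime(created_at, "%a %b %d %H:%M:%S %z %Y")
    iso_year, iso_week, _ = dt.isocalendar()
    return f"{iso_week:02d}-{iso_year}"


print(week_year_key("Fri Dec 29 18:16:29 +0000 2017"))
print(week_year_key("Mon Dec 31 10:00:00 +0000 2018"))
```

For most dates both variants give the same key; they differ only around the turn of the year, where the ISO week belongs to the neighbouring ISO year.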

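How emojis are turned into training labels can be shown with a reduced emoji table. This is a minimal sketch under assumptions: `EMOJI_EMOTIONS` is a tiny hypothetical subset of the lists above, and `label_and_strip` is not part of the notebook. It uses `str.replace` so that emoticons such as `:)` are treated as literal text.

```python
from typing import Dict, List, Tuple

# Hypothetical, heavily reduced emoji table (the notebook uses six emotions).
EMOJI_EMOTIONS: Dict[str, List[str]] = {
    "joy": [":)", "😊"],
    "sadness": [":(", "😢"],
}


def label_and_strip(text: str) -> Tuple[str, List[str]]:
    """Remove known emojis from the text and return the emotions they stand for."""
    emotions = set()
    for emotion, emojis in EMOJI_EMOTIONS.items():
        for emoji in emojis:
            if emoji in text:
                text = text.replace(emoji, " ")
                emotions.add(emotion)
    # collapse the whitespace left behind by the removed emojis
    return " ".join(text.split()), sorted(emotions)


print(label_and_strip("what a day :) 😊"))
print(label_and_strip("mixed :) :("))
```

One thing to keep in mind: the notebook lowercases the text before this step, so emoticons that contain capital letters (for example `:D`) would presumably never be found unless they are lowercased as well.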
<a id="train"></a>
### CLASS: Train the Neural Network

In [None]:
class Training:
    def __init__(self,
                 n_gram: int = 2,
                 vecsize: int = 300,  # word2vec has vector size of 300
                 nb_filters: int = 1200,
                 maxlen: int = 20
                 ):
        self.n_gram = n_gram
        self.vecsize = vecsize
        self.nb_filters = nb_filters
        self.maxlen = maxlen

    @staticmethod
    def _split_into_test_and_training(data) -> Tuple[np.ndarray, np.ndarray,
                                                     np.ndarray, np.ndarray]:
        return train_test_split(np.array(data.text), np.array(data.emotion),
                                test_size=0.2)

    def _texts_to_embedded_vec_matrices(self, text: np.ndarray) -> np.ndarray:
        """
        Converts the training texts to matrices of embedded vectors. Accordingly, each text
        is a matrix of embedded vectors, whereby, each embedded vector represents one word 
        of the text. To convert a word to an embedded vector a pretrained word2vec model is 
        used. Therefore, each vector represents the position of the specific
        word in the pretrained model (the used pretrained model consists of 300 dimensions).
        :param text: the training texts as array
        :return: the converted training data as matrix of matrices of embedded vectors
        """
        text_embedvec = np.zeros(shape=(len(text), self.maxlen, self.vecsize))
        for i in range(len(text)):
            # tokenize into a local variable so the caller's array is not modified
            tokens = word_tokenize(text[i].lower())
            for j in range(min(self.maxlen, len(tokens))):  # maximal 'maxlen' words per text
                text_embedvec[i, j] = self.__word_to_embedded_vec(tokens[j])
        return text_embedvec

    def __word_to_embedded_vec(self, word: str):
        assert FAST_VERSION > -1, "This will be slow otherwise"
        return _word2vec[word] if word in _word2vec else np.zeros(self.vecsize)

    # convert labels to binary because nominal values cannot be used in training.
    def _labels_to_binary(self, labels: np.ndarray) -> np.ndarray:
        lb = preprocessing.LabelBinarizer()
        lb.fit(labels)
        # create lookup for used labels (prediction output has the same order as this list)
        self.__classlabels = lb.classes_
        return lb.transform(labels)

    def _train_model(self, x_train, y_train, x_test, y_test):
        # get number of unique labels (= number of different emotions)
        nr_classes = len(y_train[0])
        # build the deep neural network model
        model = Sequential()
        model.add(Convolution1D(nb_filter=self.nb_filters,
                                filter_length=self.n_gram,
                                border_mode='valid',
                                activation='relu',
                                input_shape=(self.maxlen, self.vecsize)))
        model.add(MaxPooling1D(pool_length=self.maxlen - self.n_gram + 1))
        model.add(Flatten())
        model.add(Dense(nr_classes, activation='softmax'))
        model.compile(loss='categorical_crossentropy',
                      optimizer='rmsprop',
                      metrics=['accuracy'])
        # train and evaluate the model
        model.fit(x_train, y_train, validation_data=(x_test, y_test))
        # save the trained model on disc
        model.save(_trained_model_savepath)
        self.__model = model
        # save class labels as JSON so they can be mapped back to the model output later.
        # 'classes_' is a numpy array, which json cannot serialize, so convert it to a list.
        with open(_classlabels_savepath, 'w') as fp:
            json.dump(self.__classlabels.tolist(), fp)
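
The conversion done by `_texts_to_embedded_vec_matrices` can be illustrated on a small scale. The following is a minimal sketch, not the notebook's implementation: `toy_word2vec` is a hypothetical stand-in for the pretrained word2vec model (3 dimensions instead of 300), `texts_to_matrices` is an illustrative helper name, and a plain `str.split()` replaces the tokenizer.

```python
import numpy as np

# Hypothetical stand-in for the pretrained word2vec model: a plain dict that
# maps words to 3-dimensional vectors (the real model uses 300 dimensions).
toy_word2vec = {
    "i": np.array([0.1, 0.2, 0.3]),
    "love": np.array([0.9, 0.1, 0.0]),
    "python": np.array([0.4, 0.4, 0.2]),
}


def texts_to_matrices(texts, word_vectors, maxlen, vecsize):
    """Return an array of shape (len(texts), maxlen, vecsize)."""
    matrices = np.zeros(shape=(len(texts), maxlen, vecsize))
    for i, text in enumerate(texts):
        tokens = text.lower().split()  # simplified tokenizer (assumption)
        for j, token in enumerate(tokens[:maxlen]):  # truncate long texts
            # unknown words are represented by the zero vector
            matrices[i, j] = word_vectors.get(token, np.zeros(vecsize))
    return matrices


example = texts_to_matrices(
    ["I love Python", "I love unknownword a lot more"],
    toy_word2vec, maxlen=4, vecsize=3)
print(example.shape)
```

The first text has three words, so its fourth row stays a zero vector (padding). The second text is cut off after four words, and its unknown third word also maps to a zero vector.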


<a id="score"></a>
### CLASS: Score a given text based on the trained Neural Network

In [None]:
class Score:
    def __init__(self,
                 model: Sequential,  # trained Neural Network
                 classlabels: dict,  # class labels to assign labels back to the model
                 ):
        self.model = model
        self.classlabels = classlabels

    def score(self, text: str):
        # retrieve vector; note: 'ndmin=1' enforces a 1-D array because a 0-D array does
        # not support the len() function
        matrix = np.array([Training._texts_to_embedded_vec_matrices
                           (np.array(text, ndmin=1, dtype=object))])
        # remove extra dimension induced above
        matrix_reduced = matrix[0, :, :, :]
        # classification using the neural network
        predictions = self.model.predict(matrix_reduced)
        # assign predictions to their class labels
        score = {}
        for idx, classlabel in enumerate(self.classlabels):
            score[classlabel] = predictions[0][idx]
        return score
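
The last step of `score` pairs each output value of the network with the class label at the same index. Here is a minimal sketch of that mapping, assuming made-up network outputs and hypothetical English labels; `softmax` and `to_score` are illustrative helpers, not part of the notebook.

```python
import math

# Hypothetical class labels in the order stored during training
# (the prediction output follows the same order as this list).
classlabels = ["anger", "fear", "joy"]


def softmax(logits):
    """Turn raw scores into probabilities that sum to 1."""
    shifted = [x - max(logits) for x in logits]  # for numerical stability
    exps = [math.exp(x) for x in shifted]
    total = sum(exps)
    return [e / total for e in exps]


def to_score(prediction_row, labels):
    """Pair each probability with the label at the same index."""
    return {label: prob for label, prob in zip(labels, prediction_row)}


# made-up network output for a single text
prediction = softmax([0.5, 0.1, 2.0])
score = to_score(prediction, classlabels)
top_emotion = max(score, key=score.get)
print(score, top_emotion)
```

Because the mapping relies purely on position, the label list loaded at scoring time has to be in the same order as the one saved during training.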


<a id="plot"></a>
### CLASS: Plot spider graph

In [None]:
class Plot:
    def __init__(self,
                 # dataframe contains the values to plot. each row gives one individual plot.
                 # the default value shows what the dataframe has to look like.
                 dataframe: pd.DataFrame = pd.DataFrame({
                                                'Datum': ['Nov. \'17', 'Jan. \'18'],
                                                'Wut': [34, 28],
                                                'Angst': [22, 16],
                                                'Traurigkeit': [10, 14],
                                                'Freude': [18, 26],
                                                'Liebe': [10, 15],
                                                'Überraschung': [6, 7],
                                            }),
                 graphtitle: str = "ExampleTitle",  # title of graph
                 scalegraph: int = 40,  # scale graph in %
                 ):
        self.df = dataframe
        self.graphtitle = graphtitle
        self.scalegraph = scalegraph
     
    def plot_graphs(self):
        # initialize the figure shared by all individual plots.
        my_dpi = 96
        plt.figure(figsize=(1000 / my_dpi, 1000 / my_dpi), dpi=my_dpi)
        
        # create a color palette. the different graphs will get created using a color of
        # the specified color palette.
        my_palette = plt.cm.get_cmap("Spectral", len(self.df.index))
        
        # loop to plot
        for row in range(0, len(self.df.index)):
            self.make_one_graph(row=row, title=self.df['Datum'][row], color=my_palette(row))
        
        # show graph
        plt.show(block=True) 
      
    # function to plot one row of the dataset
    def make_one_graph(self, row, title, color):
        # number of variables (all columns except 'Datum')
        categories = list(self.df)
        categories.remove('Datum')
        N = len(categories)
    
        # angle of each axis: divide the full circle evenly by the number of variables
        angles = [n / float(N) * 2 * pi for n in range(N)]
        angles += angles[:1]
    
        # initialise the spider plot (2x2 grid, so at most four rows can be plotted)
        ax = plt.subplot(2, 2, row + 1, polar=True)
    
        # first axis to be on top
        ax.set_theta_offset(pi / 2)
        ax.set_theta_direction(-1)
    
        # draw one axis per variable and add the labels
        plt.xticks(angles[:-1], categories, color='#4c4c4c', size=8, fontweight='bold')
    
        # draw ylabels
        ax.set_rlabel_position(0)
        plt.yticks([10, 20, 30, 40, 50, 60, 70, 80, 90],
                   ["10%", "20%", "30%", "40%", "50%", "60%", "70%", "80%", "90%"],
                   color="#4c4c4c", size=7)
        plt.ylim(0, self.scalegraph)
    
        # plot the values of this row; repeat the first value to close the polygon
        values = self.df.loc[row].drop('Datum').values.flatten().tolist()
        values += values[:1]
        ax.plot(angles, values, color=color, linewidth=2, linestyle='solid')
        ax.fill(angles, values, color=color, alpha=0.4)
    
        plt.suptitle(self.graphtitle, fontsize=14, color='#4c4c4c')
    
        # add a graph title
        plt.title(title, size=11, color=color, y=1.1)
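
The geometry behind `make_one_graph` can be checked without matplotlib. The sketch below uses the first row of the default dataframe and only computes the angles and the closed outline; the variable names `closed_angles` and `closed_values` are illustrative.

```python
from math import pi

# first row of the default dataframe (without the 'Datum' column)
categories = ["Wut", "Angst", "Traurigkeit", "Freude", "Liebe", "Überraschung"]
values = [34, 22, 10, 18, 10, 6]

n = len(categories)
# one evenly spaced angle (in radians) per category
angles = [k / float(n) * 2 * pi for k in range(n)]

# repeat the first point at the end so the outline forms a closed polygon
closed_angles = angles + angles[:1]
closed_values = values + values[:1]

for angle, value in zip(closed_angles, closed_values):
    print(f"{angle:.3f} rad -> {value}%")
```

With six categories the axes are 60 degrees apart, and the extra seventh point coincides with the first one, which is why the tick labels are set with `angles[:-1]`.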


<a id="helptweets"></a>
### HELPER-CLASS: Get information about Tweets in database.

In [None]:
# MongoDB
db_host = "localhost"
db_port = 27017
db_name = "textmining"
db_collection = "training_tweets"

# create client connected to MongoDB
client = MongoClient(db_host, db_port)
# get a handle for the MongoDB database named 'db_name'
db = client[db_name]

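# count tweets in database (count_documents() requires PyMongo >= 3.7;
# Collection.count() was removed in PyMongo 4)
pprint.pprint("Tweets in Database: " + str(db[db_collection].count_documents({})))
# print information about tweets in database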
for tweet in db[db_collection].find():
    pprint.pprint("Tweet-ID: " + str(tweet["id"]))
    #pprint.pprint("Tweet-Text: " + str(tweet["full_text"]))
    #pprint.pprint("Tweet-Text-Processed: " + str(tweet["full_text_processed"]))
    pprint.pprint("Tweet-Emotions: " + str(tweet["emotions"]))

<a id="helpdelete"></a>
### HELPER-CLASS: Delete a given Database.

In [None]:
# MongoDB
db_host = "localhost"
db_port = 27017
db_name = "textmining"
db_collection = "training_tweets"


# create client connected to MongoDB
client = MongoClient(db_host, db_port)
# get a handle for the MongoDB database named 'db_name'
db = client[db_name]

# delete all documents in the collection and print how many were removed
print(db[db_collection].delete_many({}).deleted_count)