In [1]:
'''
Author: Matthew Sun
Date: August 24, 2022
Description: Program pulls messages from stream and displays word count, 
potential trends, and hashtag relations across a moving time window.
'''

'\nAuthor: Matthew Sun\nDate: August 24, 2022\nDescription: Program pulls messages from stream and displays word count, \npotential trends, and hashtag relations across a moving time window.\n'

In [2]:
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

from collections import Counter
from collections import deque

import os
import json
import sys
import re
import time
from math import*

import requests

In [3]:
def convert_counter_to_dict(counter):
    '''
    Builds a plain dictionary out of (word, count) pairs
    Parameters: counter - an iterable of (word, count) tuples, such as the list
    returned by Counter.most_common(n)
    Returns: a dictionary mapping each word to its count; when a word shows up in
    more than one pair, the last pair wins
    '''
    return {entry[0]: entry[1] for entry in counter}
            
def rank_hashtags(counter):
    '''
    Converts a counter into a dictionary with additional information including rank
    and relative mentions
    Parameters: counter - a counter (or any mapping of hashtag -> mention count)
    Returns: rankDict - a dictionary mapping each hashtag to a dictionary with the
    keys 'rank', 'total_mentions' and 'relative_mentions'. Rank 1 is the most
    mentioned hashtag; hashtags with equal counts keep the order they have in
    counter. 'relative_mentions' is the hashtag's share of all mentions rounded
    to 4 decimals (0.0 when the counts sum to zero).
    '''
    rankDict = {}
    hashtagTotal = sum(counter.values())

    # Rank by mention count rather than by iteration order. sorted() is stable, so
    # an input that is already ordered by frequency gets exactly the same ranks
    # as before.
    ranked = sorted(counter.items(), key=lambda item: item[1], reverse=True)

    for rank, (word, mentions) in enumerate(ranked, start=1):
        # Guard the division: a counter whose counts sum to zero has no shares.
        share = round(mentions / hashtagTotal, 4) if hashtagTotal else 0.0
        rankDict[word] = {'rank': rank, 'total_mentions': mentions, 'relative_mentions': share}

    return rankDict

def lin_slope_per_word(counterQueue, n):
    '''
    Fits a least-squares line to the frequency of each of the n most common words
    across the queue and returns the slope of that line. The x value of a
    counter is its 1-based position in the queue and the y value is the word's
    count in that counter (0 when the word is absent).
    Parameters: counterQueue - a deque of counters
    n - an integer determining how many hashtags are in the returned dictionary
    Returns: slopeDict - a dictionary with words and their slope rounded to 4
    decimals. With fewer than two counters no line can be fitted, so every word
    gets a slope of 0.0.
    '''
    counters = list(counterQueue)

    queueSum = Counter()
    for counter in counters:
        queueSum += counter

    topWords = [pair[0] for pair in queueSum.most_common(n)]

    # A single point has no slope; the x deviations would sum to zero and the
    # division below would fail.
    if len(counters) < 2:
        return {word: 0.0 for word in topWords}

    # The x values are identical for every word, so their mean and squared
    # deviations are computed once instead of once per word.
    xVals = range(1, len(counters) + 1)
    xMean = sum(xVals) / len(counters)
    xDevs = [x - xMean for x in xVals]
    xxSum = 0
    for dev in xDevs:
        xxSum += dev ** 2

    slopeDict = {}
    for word in topWords:
        yVals = [counter.get(word, 0) for counter in counters]
        yMean = sum(yVals) / len(yVals)

        xySum = 0
        for dev, y in zip(xDevs, yVals):
            xySum += dev * (y - yMean)

        slopeDict[word] = round(xySum / xxSum, 4)

    return slopeDict

def avg_lin_slope(counterQueue, n):
    '''
    Calculates the average linear slope value for the most common n words from the
    sum of the counterQueue
    Parameters: counterQueue - a deque of counters
    n - an integer determining how many of the most common words are averaged
    Returns: the average slope of those words rounded to 4 decimals. The average
    is taken over the words actually found, which can be fewer than n. Returns
    0.0 when there are no words or fewer than two counters.
    '''
    counters = list(counterQueue)

    queueSum = Counter()
    for counter in counters:
        queueSum += counter

    topWords = [pair[0] for pair in queueSum.most_common(n)]

    # No words to average, or too few points to fit a line through.
    if not topWords or len(counters) < 2:
        return 0.0

    # The x values are identical for every word, so compute their statistics once.
    xVals = range(1, len(counters) + 1)
    xMean = sum(xVals) / len(counters)
    xDevs = [x - xMean for x in xVals]
    xxSum = 0
    for dev in xDevs:
        xxSum += dev ** 2

    slopeSum = 0
    for word in topWords:
        yVals = [counter.get(word, 0) for counter in counters]
        yMean = sum(yVals) / len(yVals)

        xySum = 0
        for dev, y in zip(xDevs, yVals):
            xySum += dev * (y - yMean)

        slopeSum += xySum / xxSum

    # Divide by the number of slopes that were summed; dividing by n would
    # understate the average whenever fewer than n distinct words exist.
    return round(slopeSum / len(topWords), 4)

def square_rooted(x):
    '''
    Computes the Euclidean length of a vector
    Parameters: x - a list of numbers
    Returns: the square root of the sum of the squared values in x
    '''
    squares = (value * value for value in x)
    return sqrt(sum(squares))
 
def cosine_similarity(x,y):
    '''
    Calculates the cosine similarity between two vectors
    Parameters: x - a list of numbers
    y - a list of numbers
    Returns: the cosine similarity rounded to 4 decimals, or 0.0 when either
    vector has zero length (empty or all zeros), where the similarity is
    undefined
    '''

    numerator = sum(a * b for a, b in zip(x, y))
    # Product of the two Euclidean lengths.
    denominator = sqrt(sum(a * a for a in x)) * sqrt(sum(b * b for b in y))

    # A zero-length vector has no direction; report "no similarity" instead of
    # dividing by zero.
    if denominator == 0:
        return 0.0

    return round(numerator / float(denominator), 4)

def related_hashtags_dict(counterQueue, targetWords, windowSize=5):
    '''
    Creates a dictionary where each key is a potential trend word and the value is a
    list of the other most popular hashtags and their cosine similarity score to
    that key in descending order
    Parameters: counterQueue - a queue of counters
    targetWords - a list of tuples in the format [('hashtag word x', M), ...] where M is
    the slope value of 'hashtag word x'
    windowSize - how many consecutive counters form one time window (defaults
    to 5); must be at least 2
    Returns: similarityDict - dictionary of potential trend words and their cosine
    similarities to the other most popular hashtags. Target words that are not
    among the 50 most common hashtags of the whole queue are left out.
    Raises: ValueError when windowSize is smaller than 2. Exceptions raised by
    cosine_similarity or lin_slope_per_word are not caught here.
    '''
    if windowSize < 2:
        raise ValueError('windowSize must be at least 2')

    counters = list(counterQueue)

    queueSum = Counter()
    # A sum of all counters is necessary to determine what words are most popular
    # overall.
    for counter in counters:
        queueSum += counter

    # Separate the queue into consecutive windows of windowSize counters.
    # Slicing stops at the end of the queue, so a queue whose length is not a
    # multiple of windowSize no longer raises IndexError. A trailing window with
    # a single counter is dropped because no slope can be fitted to one point.
    timeWindows = []
    for start in range(0, len(counters), windowSize):
        window = counters[start:start + windowSize]
        if len(window) >= 2:
            timeWindows.append(window)

    # One dictionary per time window, mapping a word to its slope in that window.
    slopeDictList = [lin_slope_per_word(window, 100) for window in timeWindows]

    mostCommonWords = [pair[0] for pair in queueSum.most_common(50)]
    # For each of the top 50 hashtags, the list of its slopes in every time
    # window. A word that is missing from a window's slopes gets 0 there.
    commonWordSlopes = {
        word: [slopes.get(word, 0) for slopes in slopeDictList]
        for word in mostCommonWords
    }

    # Similarity test.
    # Each trending word is compared with all other words.
    similarityDict = {}
    for targetWord in targetWords:
        target = targetWord[0]
        if target not in commonWordSlopes:
            continue
        cosSimList = [
            (word, cosine_similarity(commonWordSlopes[target], commonWordSlopes[word]))
            for word in mostCommonWords
            if word != target
        ]
        # Most similar hashtags first.
        cosSimList.sort(key=lambda x: x[1], reverse=True)
        similarityDict[target] = cosSimList

    return similarityDict
            

In [4]:
# Buffer of decoded message payloads: callback() appends to it, and pull_count()
# reads it and then resets it to an empty list.
MSGLIST = []
# Pub/Sub identifiers kept for reference only (commented out, unused at runtime).
# NOTE(review): 'stringray' below differs from the 'stingray-intelligence-dev'
# subscription path used in pull_count() - confirm which spelling is correct.
# project_id = 'stingray-295922'
# subscription_id = 'stringray-intelligence-dev'
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    '''
    Handles one message received from the stream: decodes its data, parses it as
    JSON, stores the result in MSGLIST and acknowledges the message
    Parameters: message - the received message
    Returns: None
    '''
    # append() changes the module-level list in place, so no global declaration
    # is needed to reach it.
    payload = json.loads(message.data.decode())
    MSGLIST.append(payload)

    # Acknowledge only after the payload was parsed and stored.
    message.ack()
    
def word_count_hashtags(text):
    '''
    Counts all hashtags in a collection of strings
    Parameters: text - a list (or other iterable) of strings; a single string is
    also accepted and treated as a one-element list
    Returns: hashtagCount - a counter with case-folded hashtags and their number
    of mentions
    '''

    # Iterating over a bare string yields single characters, none of which can
    # match '#' followed by a word, so wrap it into a list first.
    if isinstance(text, str):
        text = [text]

    hashtagCount = Counter()
    for entry in text:
        # Raw string: '\w' inside a normal literal is an invalid escape sequence.
        for hashTag in re.findall(r'#(\w+)', entry):
            hashtagCount[hashTag.casefold()] += 1

    return hashtagCount

def preprocess(str):
    '''
    Cleans a message by running it through remove_forward, then remove_url, then
    remove_reply
    Parameters: str - the message text
    Returns: the cleaned text
    '''
    return remove_reply(remove_url(remove_forward(str)))

def remove_emojis(str):
    '''
    Removes emoji characters in the ranges listed below from a string
    Parameters: str - the text to clean
    Returns: the text without those characters; a value that is not a string is
    returned unchanged
    '''
    emojis = re.compile(pattern = "["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           "]+", flags = re.UNICODE)
    try:
        return emojis.sub(r'', str)
    except TypeError:
        # Best effort: input that is not a string (e.g. None) is passed through.
        # Only TypeError is caught so KeyboardInterrupt and real bugs stay visible.
        return str

def remove_forward(str):
    '''
    Removes forward markers of the form '//@name:' from a string
    Parameters: str - the text to clean
    Returns: the text without forward markers; a value that is not a string is
    returned unchanged
    '''
    try:
        return re.sub(r'//@.+?:', '', str)
    except TypeError:
        # Best effort: input that is not a string (e.g. None) is passed through.
        # Only TypeError is caught so KeyboardInterrupt and real bugs stay visible.
        return str
    
def remove_url(str):
    '''
    Removes every run of non-whitespace characters that starts with 'http'
    Parameters: str - the text to clean
    Returns: the text without urls; a value that is not a string is returned
    unchanged
    '''
    try:
        return re.sub(r"http\S+", '', str)
    except TypeError:
        # Best effort: input that is not a string (e.g. None) is passed through.
        # Only TypeError is caught so KeyboardInterrupt and real bugs stay visible.
        return str

def remove_reply(str):
    '''
    Removes reply markers of the form '回复@name:' from a string
    Parameters: str - the text to clean
    Returns: the text without reply markers; a value that is not a string is
    returned unchanged
    '''
    try:
        return re.sub(r'回复@.+?:', '', str)
    except TypeError:
        # Best effort: input that is not a string (e.g. None) is passed through.
        # Only TypeError is caught so KeyboardInterrupt and real bugs stay visible.
        return str

def translate_zhHans_to_eng(chineseWord, timeout=10):
    '''
    Translate string from simplified Chinese to English
    Parameters: chineseWord - a string of chinese characters
    timeout - seconds to wait for the translation service (defaults to 10)
    Returns: engWord - translated string in English, or chineseWord unchanged
    when the service cannot be reached, times out, or its reply does not
    contain a 'translatedText' entry
    '''

    url = 'http://35.243.155.154:8080/api/translate'
    myObj = {"q": chineseWord,"source":"zh-Hans","target":"en"}
    try:
        # Without a timeout the request can wait forever on an unresponsive server.
        r = requests.post(url, json=myObj, timeout=timeout)
        engWord = json.loads(r.text)['translatedText']
        return engWord
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # requests.RequestException: connection failure or timeout
        # ValueError: the reply is not valid JSON
        # KeyError / TypeError: the JSON has no 'translatedText' entry
        return chineseWord
    

def pull_count():
    '''
    Streams messages from the subscription in passes of up to 1.5 seconds for
    about 60 seconds (flow control is set to max_messages=100) and word counts
    the hashtags of every status or comment text that was received
    Parameters: None
    Returns: wordCounter - a counter containing hashtags words and their frequencies
    '''

    global MSGLIST

    wordCounter = Counter()
    # Keep pulling until about 60 seconds have passed.
    deadline = int(time.time()) + 60

    while time.time() <= deadline:
        # A new client is created for every pass; the with-block below closes it.
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = 'projects/stingray-295922/subscriptions/stingray-intelligence-dev'
        timeout = 1.5
        flow_control = pubsub_v1.types.FlowControl(max_messages=100)
        streaming_pull_future = subscriber.subscribe(
            subscription_path,
            callback=callback,
            flow_control=flow_control,
        )

        with subscriber:
            try:
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                # Trigger the shutdown, then block until it is complete.
                streaming_pull_future.cancel()
                streaming_pull_future.result()

        # Collect the cleaned text of every buffered message that carries a
        # status or a comment; other messages are skipped.
        textList = []
        for msg in MSGLIST:
            fields = msg['text'].keys()
            if 'status' in fields:
                rawText = msg['text']['status']['text']
            elif 'comment' in fields:
                rawText = msg['text']['comment']['status']['text']
            else:
                continue
            textList.append(preprocess(rawText))

        wordCounter += word_count_hashtags(textList)

        # Start the next pass with an empty buffer.
        MSGLIST = []

    return wordCounter

In [None]:
def _select_trending_words(slopeDict, avgSlope):
    '''
    Picks the potential trend words out of a dictionary of slopes
    Parameters: slopeDict - a dictionary with words and their slope value
    avgSlope - the slope a word has to exceed to be considered at all
    Returns: (cutOff, trendingWords) - cutOff is the mean slope of the words above
    avgSlope (0 when there are none). trendingWords is a list of (word, slope)
    tuples whose slope is above both cutOff and 0, sorted by slope in
    descending order.
    '''
    # First cut: only words that have a greater slope value than avgSlope move on.
    potentialRisers = [(word, slope) for word, slope in slopeDict.items() if slope > avgSlope]

    # No word beat the average, so there is nothing to take a mean of.
    if not potentialRisers:
        return 0, []

    slopeSum = 0
    for _, slope in potentialRisers:
        slopeSum += slope
    cutOff = slopeSum / len(potentialRisers)

    # To further reduce noise, only words with above average slope values among
    # potentialRisers are kept.
    trendingWords = [pair for pair in potentialRisers if pair[1] > cutOff and pair[1] > 0]
    trendingWords.sort(key=lambda pair: pair[1], reverse=True)

    return cutOff, trendingWords


def main():
    '''
    Put counters from every minute into queue and every minute the oldest counter will
    be taken out. Display the sum of all counters of the last 5 minutes. bigQueue will
    contain the information from the last 25 minutes which is needed to compare with
    the most recent 5 minutes of information in order to find potential trends.
    Runs until it is interrupted.
    Parameters: None
    Returns: None
    '''

    maxLength = 25
    # maxlen makes the deque drop its oldest counter automatically once it is full.
    bigQueue = deque(maxlen=maxLength)

    chnQueue = deque()
    chnQueueSum = Counter()

    # Fill the 5-counter window before the first display.
    for _ in range(5):
        chnCounter = pull_count()
        chnQueue.append(chnCounter)
        chnQueueSum += chnCounter
        bigQueue.append(chnCounter)

    print('Initial: ' + str(chnQueueSum.most_common(100)) + '\n')

    timesUpdated = 0
    while True:
        # Updating Chinese queue: the oldest counter leaves the running sum and
        # the newly pulled one enters it.
        chnQueueSum -= chnQueue[0]
        chnCounter = pull_count()
        chnQueueSum += chnCounter
        chnQueue.popleft()
        chnQueue.append(chnCounter)

        timesUpdated += 1
        print('Update ' + str(timesUpdated) + ': ' + str(chnQueueSum.most_common(100)) + '\n')

        bigQueue.append(chnCounter)

        # Once bigQueue reaches wanted length the program will look for potential trends
        if len(bigQueue) == maxLength:
            slopeDict = lin_slope_per_word(chnQueue, 50)
            avgSlope = avg_lin_slope(bigQueue, 100)

            # Returns a cut-off of 0 and no trends when no word rises faster
            # than average, instead of dividing by zero.
            cutOff, trendingWords = _select_trending_words(slopeDict, avgSlope)

            print(avgSlope)
            print(cutOff)
            print('Potential trends:', trendingWords, '\n')

            # Finding related hashtags
            if len(trendingWords) > 0:
                try:
                    hashtagRelations = related_hashtags_dict(bigQueue, trendingWords)
                    # The 3 most related hashtags for each potential trend are displayed.
                    relationsDict = {word: related[:3] for word, related in hashtagRelations.items()}
                    print('Hashtag relations:', relationsDict, '\n\n')
                # Occasionally the program will run into a zero division error.
                # This is most likely because words do not appear in certain time windows.
                except ZeroDivisionError:
                    print('Unable to generate hashtag relations: zero division error')

# Start the program when this cell/script is executed directly. main() contains
# a `while True` loop with no exit, so this call does not return on its own.
if __name__ == "__main__":
    main()