## Import Dependencies

In [6]:
#import libraries

import import_ipynb
import requests
import time 
from datetime import timedelta, datetime
import threading

#import ipynb files

from common.Extract_Twitter_Trends import Twitter_Trends
import config.Read_Configs as Read_Configs
from common.App import PubNub_Config
import common.My_Subscribe_Callback as My_Subscribe_Callback
import common.Cassandra_DB_Setup as Cassandra_DB_Setup

#ignore warnings
## Initialize Class for Subscribing Callback for Retrieving Start Time
import warnings
warnings.filterwarnings('ignore')

## Initialize Global Variables

In [2]:
# Shared state for thread1's configuration polling loop. Every value is None
# until thread1 applies a configuration for the first time.

g_end_time = None  # datetime after which thread1 refreshes the top trends even if no setting changed
g_trends = None  # number-of-trends setting most recently applied by thread1
g_country = None  # location setting most recently applied by thread1
g_minutes = None  # time-interval (minutes) setting most recently applied by thread1

## Function to Setup PubNub Consumer

In [3]:
def connect_to_PubNub():
    """Build a PubNub stream client from the PubNub configuration settings.

    Reads the settings through ``Read_Configs.read_pubnub_configurations()`` and
    passes their ``subscribe_key``, ``publish_key`` and ``user_id`` entries to
    ``PubNub_Config``.

    Returns:
        The ``PubNub_Config`` instance, or ``None`` when the settings could not
        be read or the client could not be built (the error is printed).
    """

    # Bind the result before the try block. Previously a failure inside the
    # ``try`` left ``stream`` unassigned, so ``return stream`` raised
    # UnboundLocalError and masked the real error.
    stream = None

    try:

        # Read Configs from configuration.yaml file

        pubnub_configs = Read_Configs.read_pubnub_configurations()

        # Setup Configurations for PubNub Consumer

        stream = PubNub_Config(
            pubnub_configs['subscribe_key'],
            pubnub_configs['publish_key'],
            pubnub_configs['user_id'],
        )

    except Exception as e:

        print("Error connecting to PubNub", str(e))

    return stream

## Function to Extract Top Trends Based on API Configurations 

In [4]:
def update_top_trends(trends, country):
    """Refresh the shared top-trends value for the given settings.

    Args:
        trends: Trend-count setting forwarded to ``Twitter_Trends``.
        country: Location setting forwarded to ``Twitter_Trends``.

    Side effects:
        Resets ``My_Subscribe_Callback.get_hour`` / ``get_date`` to ``None``,
        runs a ``FirstSubscribeCallback`` on a new PubNub stream, waits 10
        seconds, then stores the result of ``get_top_trends()`` in
        ``My_Subscribe_Callback.set_top_trends`` and prints it. Any exception is
        printed rather than raised.
    """

    # Clear the previous stream timestamp before starting a new subscription.
    My_Subscribe_Callback.get_hour = My_Subscribe_Callback.get_date = None

    try:
        clock_stream = connect_to_PubNub()
        start_time_listener = My_Subscribe_Callback.FirstSubscribeCallback()
        clock_stream.run(start_time_listener)

        # Fixed wait before reading the hour/date back.
        # NOTE(review): if they are not set within this window they are still
        # None when handed to Twitter_Trends -- confirm that is acceptable.
        time.sleep(10)

        stream_hour, stream_date = (
            My_Subscribe_Callback.get_hour,
            My_Subscribe_Callback.get_date,
        )

        # Look up the trends for the stream's date/hour and publish them.
        trends_client = Twitter_Trends(country, stream_date, stream_hour, trends)
        My_Subscribe_Callback.set_top_trends = trends_client.get_top_trends()
        print(My_Subscribe_Callback.set_top_trends)

    except Exception as error:
        print("Error extracting top trends from API", str(error))

## Function to Extract Tweets Count for Top Trends Based on API Configurations 

In [5]:
def update_tweets_for_trends():
    """Run a ``MainSubscribeCallback`` on a new PubNub stream.

    Builds the stream with ``connect_to_PubNub()`` and hands it a fresh
    ``My_Subscribe_Callback.MainSubscribeCallback`` instance. Any exception is
    printed rather than raised.
    """

    try:
        tweet_stream = connect_to_PubNub()
        tweet_listener = My_Subscribe_Callback.MainSubscribeCallback()
        tweet_stream.run(tweet_listener)
    except Exception as error:
        print("Error extracting tweets count for top trends", str(error))

## Function to Extract Number of Trends and Location from API

In [6]:
def get_configs_from_trends_API(
    url="https://9188-2a02-a210-2ec2-1480-58ec-cacd-5b36-a6cf.ngrok-free.app/trend_read_configurations/",
    timeout=10,
):
    """Fetch the number-of-trends and location settings from the trends API.

    Args:
        url: Endpoint to query. Defaults to the address this notebook was
            written against.
            NOTE(review): the default looks like a temporary tunnel address --
            confirm it is still valid or pass ``url`` explicitly.
        timeout: Seconds to wait for the server before giving up, so an
            unreachable endpoint cannot block the caller indefinitely.

    Returns:
        ``(trends, country)`` taken from elements 0 and 1 of the JSON response,
        or ``None`` when the request or parsing fails (the error is printed).
    """

    try:
        # Get request to Trends API; fail fast on timeouts and HTTP error codes
        # instead of trying to parse an error page as configuration.

        response = requests.get(url=url, timeout=timeout)
        response.raise_for_status()

        # Load Configurations for Number of Trends and Location (parse once)

        payload = response.json()
        trends = payload[0]
        country = payload[1]

        return trends, country

    except Exception as e:

        print("Error extracting number of trends and location from API", str(e))
        return None

## Function to Extract Time Interval in Minutes for Tracking Tweet Count from API

In [7]:
def get_configs_from_time_API(
    url="https://9188-2a02-a210-2ec2-1480-58ec-cacd-5b36-a6cf.ngrok-free.app/get_timer/",
    timeout=10,
):
    """Fetch the tracking time interval (minutes) from the time API.

    Args:
        url: Endpoint to query. Defaults to the address this notebook was
            written against.
            NOTE(review): the default looks like a temporary tunnel address --
            confirm it is still valid or pass ``url`` explicitly.
        timeout: Seconds to wait for the server before giving up, so an
            unreachable endpoint cannot block the caller indefinitely.

    Returns:
        The decoded JSON body of the response, or ``None`` when the request or
        parsing fails (the error is printed).
    """

    try:

        # Get Request to Time API; fail fast on timeouts and HTTP error codes
        # instead of trying to parse an error page as configuration.

        response = requests.get(url=url, timeout=timeout)
        response.raise_for_status()

        # Load Configurations for Minutes

        minutes = response.json()

        return minutes

    except Exception as e:

        print("Error extracting minutes from API", str(e))
        return None

## Function to Get All Data from Cassandra Table

In [8]:
def get_trends_tweets_from_Cassandra():
    """Select every row of ``twitter_trends.tweets_per_trend`` from Cassandra.

    Returns:
        Whatever ``Cassandra_DB_Setup.select_from_table`` returns, or ``None``
        when connecting or querying fails (the error is printed).
    """

    try:

        # Read Cassandra DB configurations from configuration.yaml.
        # The original body used ``zip_path`` and ``cassandra_configs`` without
        # defining them anywhere in scope, so every call failed with NameError.

        zip_path, cassandra_configs = Read_Configs.read_cassandra_configurations()

        # Connect to Cassandra DB

        session = Cassandra_DB_Setup.cassandra_connection(zip_path, cassandra_configs)

        # Select * from table and hand the result back to the caller

        return Cassandra_DB_Setup.select_from_table(session, "twitter_trends", "tweets_per_trend")

    except Exception as e:

        print("Error selecting table data from Cassandra", str(e))
        return None

## Function to Initiate Thread for Updating Configurations and fetching Top Trends as soon as they get updated on the API

In [9]:
def thread1(poll_seconds=60):
    """Poll the configuration APIs forever and refresh the top trends on change.

    On every iteration the trend settings (number of trends, country) and the
    time interval (minutes) are fetched. The top trends are refreshed when any
    of those values differs from the last applied one, or when the hourly
    deadline stored in ``g_end_time`` has passed.

    Args:
        poll_seconds: Pause between polling iterations. The pause happens on
            every iteration, including failed ones, so an unavailable API is
            retried at this rate instead of in a tight loop.

    Side effects:
        Updates ``g_end_time``, ``g_trends``, ``g_country`` and ``g_minutes``;
        calls ``update_top_trends``; appends a not-yet-seen interval to
        ``My_Subscribe_Callback.set_minutes`` together with a ``None``
        placeholder in ``My_Subscribe_Callback.start_times``.
    """

    # Declare Global Variables

    global g_end_time, g_trends, g_country, g_minutes

    # Infinite Loop

    while True:

        try:

            # Retrieve Number of Top Trends and Country from API.
            # The fetcher returns None on failure; unpacking that directly used
            # to raise before any sleep and made this loop spin.

            trend_configs = get_configs_from_trends_API()

            if trend_configs is None:
                print("Error in Thread 1", "trend configurations unavailable; retrying after the poll interval")
                continue

            trends, country = trend_configs

            # Retrieve Minutes from API. A failed fetch must not be recorded as
            # a new interval.

            minutes = get_configs_from_time_API()

            if minutes is None:
                print("Error in Thread 1", "time configuration unavailable; retrying after the poll interval")
                continue

            # Check if the hourly refresh is due or any configuration changed

            refresh_due = g_end_time is None or g_end_time < datetime.now()
            configs_changed = trends != g_trends or country != g_country or minutes != g_minutes

            if refresh_due or configs_changed:

                # Update Global Variables

                g_trends = trends
                g_country = country
                g_minutes = minutes

                # Update Top Trends based on new configurations

                update_top_trends(trends, country)
                print(minutes, My_Subscribe_Callback.set_minutes)
                if minutes not in My_Subscribe_Callback.set_minutes:
                    My_Subscribe_Callback.set_minutes.append(minutes)
                    My_Subscribe_Callback.start_times.append(None)

                # Modify Time Interval to Automatically Update Trends for the next hour

                g_end_time = datetime.now() + timedelta(hours=1)
                print(g_end_time, datetime.now())

        except Exception as e:

            print("Error in Thread 1", str(e))

        finally:

            # Runs after success, after ``continue`` and after a handled error.
            time.sleep(poll_seconds)

## Function to Initiate Thread to Retrieve Tweet Counts based on API configurations

In [10]:
def thread2():
    """Publish the Cassandra settings to the callback module, then start streaming.

    Stores the two values returned by
    ``Read_Configs.read_cassandra_configurations()`` in
    ``My_Subscribe_Callback.set_zip_path`` and
    ``My_Subscribe_Callback.set_cassandra_configs``, then calls
    ``update_tweets_for_trends()``. Any exception is printed rather than raised.
    """

    try:
        cassandra_settings = Read_Configs.read_cassandra_configurations()

        # Unpack (zip path, connection configs) straight into the shared
        # callback-module attributes.
        (
            My_Subscribe_Callback.set_zip_path,
            My_Subscribe_Callback.set_cassandra_configs,
        ) = cassandra_settings

        # Begin collecting tweet counts for the current top trends.
        update_tweets_for_trends()

    except Exception as error:
        print("Error in Thread 2", str(error))

## MAIN function

In [11]:
def main():
    """Start the configuration poller and the tweet-count consumer, then stay alive.

    ``thread1`` (configuration polling) is started first. After a 120 second
    head start ``thread2`` (tweet-count retrieval from PubNub) is started. Both
    are daemon threads, so they end when the main thread ends; this function
    therefore never returns and has to be interrupted to stop the process.
    """

    # Initiate two threads for parallel update of configurations from API and data retrieval from PubNub

    try:

        # Thread 1 for parallel update of configurations from API

        t1 = threading.Thread(target=thread1, daemon=True)
        t1.start()

        # Head start for thread 1 before the consumer is started

        time.sleep(120)

        # Thread 2 for data retrieval from PubNub

        t2 = threading.Thread(target=thread2, daemon=True)
        t2.start()

    except Exception as e:

        print ("Error: unable to start threads", str(e))

    # Keep the main thread alive for the daemon threads. Sleeping instead of
    # spinning on ``pass`` avoids pinning a CPU core while idle.

    while True:

        time.sleep(1)


## Call MAIN to Start the Process

In [None]:
# Entry point: start the pipeline only when this code runs as the top-level
# program (not when it is imported). main() ends in an endless loop, so this
# cell keeps running until it is interrupted.
if __name__ == "__main__":
    main()

C:\Users\tarvi\Twitter Trends Analysis\Phase 4\src\data\configuration.yaml
13 2023-02-09
https://getdaytrends.com/netherlands/2023-02-09/13/
{1: 'feynec', 2: 'emine', 3: 'unilever'}
6 [1, 5, 10]
2023-05-19 10:59:34.450817 2023-05-19 09:59:34.450817
C:\Users\tarvi\Twitter Trends Analysis\Phase 4\src\data\configuration.yaml
C:\Users\tarvi\Twitter Trends Analysis\Phase 4\src\data\configuration.yaml
Process Started at:  2023-02-09 13:53:24+00:00
<cassandra.cluster.Session object at 0x000002157F0B0F90>
1  minutes have passed  2023-02-09 13:54:24+00:00
Here's twitter tweets {'1675947204_1': {'feynec': 0, 'emine': 0, 'unilever': 0}}
INSERT INTO twitter_trends.tweets_per_trend (StreamTimeKey, TweetCountForTrends) VALUES (%s, %s) 1675947204_1 {'feynec': 0, 'emine': 0, 'unilever': 0}
1  minutes have passed  2023-02-09 13:55:24+00:00
Here's twitter tweets {'1675947264_1': {'feynec': 0, 'emine': 0, 'unilever': 0}}
INSERT INTO twitter_trends.tweets_per_trend (StreamTimeKey, TweetCountForTrends) VAL

1  minutes have passed  2023-02-09 14:14:24+00:00
Here's twitter tweets {'1675948404_1': {'feynec': 0, 'emine': 0, 'unilever': 1}}
INSERT INTO twitter_trends.tweets_per_trend (StreamTimeKey, TweetCountForTrends) VALUES (%s, %s) 1675948404_1 {'feynec': 0, 'emine': 0, 'unilever': 1}
1  minutes have passed  2023-02-09 14:15:24+00:00
Here's twitter tweets {'1675948464_1': {'feynec': 0, 'emine': 0, 'unilever': 1}}
INSERT INTO twitter_trends.tweets_per_trend (StreamTimeKey, TweetCountForTrends) VALUES (%s, %s) 1675948464_1 {'feynec': 0, 'emine': 0, 'unilever': 1}
1  minutes have passed  2023-02-09 14:16:24+00:00
Here's twitter tweets {'1675948524_1': {'feynec': 0, 'emine': 0, 'unilever': 1}}
INSERT INTO twitter_trends.tweets_per_trend (StreamTimeKey, TweetCountForTrends) VALUES (%s, %s) 1675948524_1 {'feynec': 0, 'emine': 0, 'unilever': 1}
1  minutes have passed  2023-02-09 14:17:24+00:00
Here's twitter tweets {'1675948584_1': {'feynec': 0, 'emine': 0, 'unilever': 1}}
INSERT INTO twitter_tre

1  minutes have passed  2023-02-09 14:34:24+00:00
Here's twitter tweets {'1675949604_1': {'feynec': 0, 'emine': 0, 'unilever': 1}}
INSERT INTO twitter_trends.tweets_per_trend (StreamTimeKey, TweetCountForTrends) VALUES (%s, %s) 1675949604_1 {'feynec': 0, 'emine': 0, 'unilever': 1}
1  minutes have passed  2023-02-09 14:35:24+00:00
Here's twitter tweets {'1675949664_1': {'feynec': 0, 'emine': 0, 'unilever': 1}}
INSERT INTO twitter_trends.tweets_per_trend (StreamTimeKey, TweetCountForTrends) VALUES (%s, %s) 1675949664_1 {'feynec': 0, 'emine': 0, 'unilever': 1}
6  minutes have passed  2023-02-09 14:35:24+00:00
Here's twitter tweets {'1675949364_6': {'feynec': 0, 'emine': 0, 'unilever': 1}}
INSERT INTO twitter_trends.tweets_per_trend (StreamTimeKey, TweetCountForTrends) VALUES (%s, %s) 1675949364_6 {'feynec': 0, 'emine': 0, 'unilever': 1}
1  minutes have passed  2023-02-09 14:36:24+00:00
Here's twitter tweets {'1675949724_1': {'feynec': 0, 'emine': 0, 'unilever': 1}}
INSERT INTO twitter_tre

## Create Table in Cassandra

In [None]:
## Uncomment and set column name, data types and primary keys to create table in Cassandra DB 

## Set variables

# columns = ['StreamTimeKey', 'TweetCountForTrends' ]
# data_types = ['text', 'map<text, int>']
# primary_key = ['StreamTimeKey']

## Create table

# Cassandra_DB_Setup.create_table(session, "twitter_trends", "tweets_per_trend", columns, data_types, primary_key)