In [16]:
import time
import pymongo
import psycopg2
import pandas as pd
import json
from datetime import datetime
import os
import sys
from cache import SearchCache

In [25]:
class SearchEngine_postgre:
    """
    Cached user look-ups against the PostgreSQL Twitter-user table.

    Every public query method follows the same flow: build a cache key, return
    the cached entry if one exists, otherwise query PostgreSQL, store the
    result in the cache as a JSON records string, checkpoint the cache and
    return the result as a pandas DataFrame.

    NOTE(review): on a cache hit the methods return whatever ``SearchCache``
    hands back for the key (presumably the JSON string stored on the miss
    path), not a DataFrame -- confirm against ``SearchCache`` before relying
    on the return type.
    """

    # Column order shared by the SELECT list and the resulting DataFrame.
    _POPULAR_USER_COLUMNS = [
        'user_id', 'name', 'twitter_join_date', 'location',
        'verified', 'followers_count', 'friends_count', 'favourites_count',
    ]

    def __init__(self, cache_size=100, cache_ttl=3600):
        """
        Initializes a SearchEngine_postgre object with its cache and database connection.

        Args:
        - cache_size (int): Maximum number of items to store in cache
        - cache_ttl (int): Time-to-live (in seconds) for cached items
        """
        # initialize a cache object for the search engine using the SearchCache class
        self.cache = SearchCache(cache_size, cache_ttl)
        # NOTE(review): the literal fallback keeps the notebook runnable as before,
        # but a password committed to a notebook should be rotated and the
        # fallback removed once POSTGRES_PASSWORD is set in the environment.
        self.db_conn = psycopg2.connect(
            database="postgres",
            user="postgres",
            password=os.environ.get("POSTGRES_PASSWORD", "priyanka"),
            host="localhost",
        )
        # This engine only reads. Without autocommit psycopg2 keeps one implicit
        # transaction open, and a single failed query would leave the connection
        # in an aborted state for every later call.
        self.db_conn.autocommit = True
        self.users_cursor = self.db_conn.cursor()
        self.user_table = 'twitter_users_partitioned'

    @staticmethod
    def _report_elapsed(start_time):
        """Print the wall-clock time elapsed since ``start_time`` (a ``time.time()`` value)."""
        end_time = time.time()
        print(f"Query took {end_time - start_time:.4f} seconds\n")

    def _store(self, cache_key, frame):
        """Cache ``frame`` as a JSON records string under ``cache_key`` and checkpoint the cache."""
        self.cache[cache_key] = frame.to_json(orient='records')
        self.cache.save_checkpoint()

    ## top 10 Most popular users
    def most_popular_users(self, n=10):
        """
        Returns the n users with the highest follower counts.

        Args:
        - n (int): Number of users to return.

        Returns:
        - pandas.DataFrame with one row per user and the columns listed in
          ``_POPULAR_USER_COLUMNS`` when the result comes from the database;
          the cached entry as stored when it comes from the cache.
        """
        start_time = time.time()
        # The key includes n so that asking for a different number of users is
        # not answered from an entry cached for another n.
        cache_key = f'most_popular_users_{n}'

        if cache_key in self.cache:
            print("Retrieving 'most popular users' from cache!")
            self._report_elapsed(start_time)
            return self.cache[cache_key]
        print("New entry, retrieving 'most popular users' from database!")

        column_list = ", ".join(self._POPULAR_USER_COLUMNS)
        # The table name comes from self.user_table (set in __init__, not user
        # input); the row limit is bound as a query parameter.
        # ROW_NUMBER keeps exactly one row per user_id even when several rows
        # tie on followers_count, matching search_user.
        query = f"""
            SELECT {column_list}
            FROM (
                SELECT {column_list},
                       ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY followers_count DESC) AS rn
                FROM {self.user_table}
            ) AS ranked_users
            WHERE rn = 1
            ORDER BY followers_count DESC
            LIMIT %s
            """

        self.users_cursor.execute(query, (n,))
        results = self.users_cursor.fetchall()

        # Explicit column names keep the frame's schema stable even when the
        # query returns no rows.
        users = pd.DataFrame(results, columns=self._POPULAR_USER_COLUMNS)
        self._store(cache_key, users)
        self._report_elapsed(start_time)

        return users

    ## search by user_name
    def search_user(self, string_user):
        """
        Returns the users whose name contains the given string.

        Args:
        - string_user (str): Substring to look for in the ``name`` column
          (matched with SQL ``LIKE '%...%'``, so ``%`` and ``_`` inside the
          string act as wildcards).

        Returns:
        - pandas.DataFrame with ``name`` and ``screen_name`` columns, ordered by
          verified status and follower count, when the result comes from the
          database. If nothing matches, a one-cell frame containing
          "No User found" indexed by the search string. On a cache hit the
          cached entry is returned as stored.
        """
        start_time = time.time()
        # Use the search string verbatim: stripping spaces would make distinct
        # searches such as "ab c" and "a bc" share one cache entry.
        cache_key = 'users_name' + string_user

        if cache_key in self.cache:
            print("Retrieving 'users_name' from cache!")
            self._report_elapsed(start_time)
            return self.cache[cache_key]
        print("New entry, retrieving 'users_name' from database!")

        query = f"""
            WITH ranked_users AS (
                SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY followers_count DESC) AS rn
                FROM {self.user_table}
                WHERE name LIKE %s
            )
            SELECT name, screen_name FROM ranked_users
            WHERE rn = 1
            ORDER BY verified DESC, followers_count DESC;
            """

        # Surround the search string with wildcards to match it anywhere in the name
        username_pattern = '%' + string_user + '%'
        self.users_cursor.execute(query, (username_pattern,))
        results = self.users_cursor.fetchall()

        users = pd.DataFrame(results, columns=['name', 'screen_name'])
        if users.shape[0] == 0:
            # The index must be a collection; a bare string raises a TypeError in pandas.
            users = pd.DataFrame(["No User found"], index=[string_user])
            print("No User found")
        self._store(cache_key, users)
        self._report_elapsed(start_time)

        return users

In [26]:
# Create a SearchEngine_postgre object with a cache size of 50 and a cache TTL of 3600 seconds
search_engine_postgre = SearchEngine_postgre(cache_size=50, cache_ttl=3600)

Cache file is corrupted.
Creating new cache.
Checkpoint saved!


In [4]:
# Retrieve the most popular users (first call: database query that also updates
# the cache checkpoint) and report the elapsed time.
# `time` is already imported in the notebook's first cell.
starttime = time.perf_counter()
mpu = search_engine_postgre.most_popular_users()
print(time.perf_counter() - starttime)

New entry, retrieving 'most popular users' from database!
Checkpoint saved!
Query took 2.4680 seconds

2.4713431000127457


In [5]:
# Display the most popular users returned by the database query
mpu

Unnamed: 0,user_id,name,twitter_join_date,location,verified,followers_count,friends_count,favourites_count
0,813286,Barack Obama,2007-03-05,"Washington, DC",True,115603427,607612,11
1,18839785,Narendra Modi,2009-01-10,India,True,55786179,2364,0
2,807095,The New York Times,2007-03-02,New York City,True,46361159,904,18483
3,145125358,Amitabh Bachchan,2010-05-18,"Mumbai, India",True,41596464,1833,75
4,101311381,Shah Rukh Khan,2010-01-02,,True,40028019,77,32
5,471741741,PMO India,2012-01-23,India,True,34461808,486,0
6,113419517,Hrithik Roshan,2010-02-11,,True,28170371,90,172
7,92724677,Virender Sehwag,2009-11-26,India,True,20571543,143,4627
8,405427035,Arvind Kejriwal,2011-11-05,India,True,18339248,221,618
9,14293310,TIME,2008-04-03,,True,17057740,494,536


In [None]:
# Request the most popular users a second time and measure how long the lookup
# takes now that the result has been cached.
# `time` is already imported in the notebook's first cell.
starttime = time.perf_counter()
mpu_cache = search_engine_postgre.most_popular_users()
print(time.perf_counter() - starttime)

In [None]:
# Display the most popular users as returned from the cache
mpu_cache

In [27]:
# Search users by name (first call for this string: database query) and report
# the elapsed time. `time` is already imported in the notebook's first cell.
starttime = time.perf_counter()
user_search = search_engine_postgre.search_user("Nikhitha")
print(time.perf_counter() - starttime)

New entry, retrieving 'users_name' from database!


TypeError: Index(...) must be called with a collection of some kind, 'Nikhitha' was passed

In [21]:
# Display the result of the user search for "Nikhitha"
user_search

In [None]:
# Time a user search for "Narendra Modi".
# NOTE(review): despite the variable name, this is the first search for this
# string in the notebook, so it is a cache miss unless an earlier checkpoint
# already holds it -- run the call twice to measure the cached path.
# `time` is already imported in the notebook's first cell.
starttime = time.perf_counter()
user_search_cache = search_engine_postgre.search_user("Narendra Modi")
print(time.perf_counter() - starttime)

In [None]:
# Display the result of the user search for "Narendra Modi"
user_search_cache

In [6]:
# Stand-alone MongoDB client and handle to the twitter_db database.
# The connection string is taken from the MONGODB_URI environment variable when set.
# NOTE(review): the literal fallback keeps the notebook runnable as before, but
# it embeds a username and password -- rotate those credentials and remove the
# fallback once MONGODB_URI is configured.
db_client = pymongo.MongoClient(
    os.environ.get(
        "MONGODB_URI",
        'mongodb+srv://priyankanagasuri:littlegirl369@cluster1.dfkwly1.mongodb.net/',
    )
)
db = db_client.get_database("twitter_db")

In [7]:
class SearchEngine_mongodb:
    """
    Cached tweet look-ups against the MongoDB ``tweets_final`` collection.

    Every public query method follows the same flow: build a cache key, return
    the cached entry if one exists, otherwise query MongoDB, store the result
    in the cache as a JSON records string, checkpoint the cache and return the
    result as a pandas DataFrame.

    NOTE(review): on a cache hit the methods return whatever ``SearchCache``
    hands back for the key (presumably the JSON string stored on the miss
    path), not a DataFrame -- confirm against ``SearchCache`` before relying
    on the return type.
    """

    # Output columns of the two text-search methods.
    _TWEET_COLUMNS = ['Account_name', 'text', 'date', 'Likes']
    # Document fields returned by top_tweets, in display order.
    _TOP_TWEET_COLUMNS = ["Time_stamp", "Account_Name", "tweet", "retweets_count"]

    def __init__(self, cache_size=100, cache_ttl=3600):
        """
        Initializes a SearchEngine_mongodb object with its cache and database client.

        Args:
        - cache_size (int): Maximum number of items to store in cache
        - cache_ttl (int): Time-to-live (in seconds) for cached items
        """
        # initialize a cache object for the search engine using the SearchCache class
        self.cache = SearchCache(cache_size, cache_ttl)
        # NOTE(review): the literal fallback keeps the notebook runnable as before,
        # but it embeds a username and password -- rotate those credentials and
        # remove the fallback once MONGODB_URI is configured.
        self.db_client = pymongo.MongoClient(
            os.environ.get(
                "MONGODB_URI",
                'mongodb+srv://priyankanagasuri:littlegirl369@cluster1.dfkwly1.mongodb.net/',
            )
        )
        self.tweets_collection = self.db_client['twitter_db']['tweets_final']

    @staticmethod
    def _report_elapsed(start_time):
        """Print the wall-clock time elapsed since ``start_time`` (a ``time.time()`` value)."""
        end_time = time.time()
        print(f"Query took {end_time - start_time:.4f} seconds\n")

    def _store(self, cache_key, frame):
        """Cache ``frame`` as a JSON records string under ``cache_key`` and checkpoint the cache."""
        self.cache[cache_key] = frame.to_json(orient='records')
        self.cache.save_checkpoint()

    def _text_search(self, query, path):
        """Run a ``$search`` text query for ``query`` on field ``path`` and return the matches as a DataFrame."""
        matches = self.tweets_collection.aggregate([
            {
                "$search": {
                    "index": "search_tweets",
                    "text": {"query": query, "path": path},
                }
            }
        ])
        records = [
            {
                'Account_name': doc["Account_Name"],
                'text': doc["tweet"],
                'date': doc["Time_stamp"],
                'Likes': doc["Likes"],
            }
            for doc in matches
        ]
        # Explicit columns keep the schema stable when nothing matches.
        return pd.DataFrame(records, columns=self._TWEET_COLUMNS)

    ## Search by string
    def search_by_string(self, string_to_match):
        """
        Returns the tweets whose text matches the given string.

        Args:
        - string_to_match (str): Text to search for in the ``tweet`` field.

        Returns:
        - pandas.DataFrame with ``Account_name``, ``text``, ``date`` and
          ``Likes`` columns when the result comes from the database; the
          cached entry as stored when it comes from the cache.
        """
        start_time = time.time()
        cache_key = 'string_match_cache' + string_to_match
        if cache_key in self.cache:
            print("Retrieving tweets with " + string_to_match + " from cache!")
            self._report_elapsed(start_time)
            return self.cache[cache_key]
        print("New entry, retrieving tweets " + string_to_match + " from database!")

        users = self._text_search(string_to_match, "tweet")
        self._store(cache_key, users)
        self._report_elapsed(start_time)

        return users

    ## Search by hashtag
    def search_by_hashtag(self, hashtag_to_match):
        """
        Returns the tweets tagged with the given hashtag.

        Args:
        - hashtag_to_match (str): Text to search for in the ``hashtags`` field.

        Returns:
        - pandas.DataFrame with ``Account_name``, ``text``, ``date`` and
          ``Likes`` columns when the result comes from the database; the
          cached entry as stored when it comes from the cache.
        """
        start_time = time.time()
        cache_key = 'hashtag_match_cache' + hashtag_to_match
        if cache_key in self.cache:
            print("Retrieving tweets with  hashtags " + hashtag_to_match + " from cache!")
            self._report_elapsed(start_time)
            return self.cache[cache_key]
        print("New entry, retrieving tweets " + hashtag_to_match + " from database!")

        users = self._text_search(hashtag_to_match, "hashtags")
        self._store(cache_key, users)
        self._report_elapsed(start_time)

        return users

    ## Top n hashtags
    def get_top_hashtags(self, n):
        """
        Returns the n most frequently used hashtags.

        Args:
        - n (int): Number of hashtags to return.

        Returns:
        - pandas.DataFrame with ``hashtag`` and ``count`` columns, most frequent
          first, when the result comes from the database; the cached entry as
          stored when it comes from the cache.
        """
        start_time = time.time()
        cache_key = 'top_hashtags_' + str(n)
        if cache_key in self.cache:
            print("Retrieving top " + str(n) + "  hashtags from cache!")
            self._report_elapsed(start_time)
            return self.cache[cache_key]
        print("New entry, retrieving top " + str(n) + " hashtags from database!")

        # Count how often each hashtag occurs and keep the n most frequent
        pipeline = [
            # One document per (tweet, hashtag) pair
            {"$unwind": "$hashtags"},
            # Group by hashtag and count occurrences
            {"$group": {"_id": "$hashtags", "count": {"$sum": 1}}},
            # Most frequent first
            {"$sort": {"count": -1}},
            {"$limit": n},
        ]

        # Iterate the cursor through its public interface; the cursor's private
        # buffer attribute is a pymongo internal and not part of its API.
        rows = list(self.tweets_collection.aggregate(pipeline))

        # Selecting the columns by name (instead of assigning to .columns)
        # also works when the aggregation returns no rows.
        users = pd.DataFrame(rows, columns=["_id", "count"]).rename(columns={"_id": "hashtag"})
        self._store(cache_key, users)
        self._report_elapsed(start_time)

        return users

    ## Top n tweets
    def top_tweets(self, n):
        """
        Returns the n tweets with the highest retweet counts.

        Args:
        - n (int): Number of tweets to return.

        Returns:
        - pandas.DataFrame with ``Time_stamp``, ``Account_Name``, ``tweet`` and
          ``retweets_count`` columns, most retweeted first, when the result
          comes from the database; the cached entry as stored when it comes
          from the cache.
        """
        start_time = time.time()
        cache_key = 'top_tweets_' + str(n)
        if cache_key in self.cache:
            print("Retrieving top " + str(n) + "  tweets from cache!")
            self._report_elapsed(start_time)
            return self.cache[cache_key]
        print("New entry, retrieving top " + str(n) + " tweets from database!")

        # Fetch only the fields that are returned instead of whole documents.
        projection = {"_id": 0}
        projection.update({field: 1 for field in self._TOP_TWEET_COLUMNS})
        cursor = self.tweets_collection.find({}, projection).sort("retweets_count", -1).limit(n)

        # Explicit columns avoid a KeyError when the collection yields no documents.
        users = pd.DataFrame(list(cursor), columns=self._TOP_TWEET_COLUMNS)
        self._store(cache_key, users)
        self._report_elapsed(start_time)

        return users


In [8]:
# Create a SearchEngine_mongodb object with a cache size of 50 and a cache TTL of 3600 seconds
search_engine2 = SearchEngine_mongodb(cache_size=50, cache_ttl=3600)

Cache file is corrupted.
Creating new cache.
Checkpoint saved!


In [9]:
# Run the text search for "ram" (database query that also updates the cache
# checkpoint) and print how long the call took.
starttime = time.perf_counter()
string_search = search_engine2.search_by_string("ram")
elapsed_seconds = time.perf_counter() - starttime
print(elapsed_seconds)

New entry, retrieving tweets ram from database!
Checkpoint saved!
Query took 0.3270 seconds

0.32755220000399277


In [10]:
# Display the tweets returned by the text search
string_search

Unnamed: 0,Account_name,text,date,Likes
0,Jitendra Nikam,@ArvindKejriwal @Olacabs Thank you for opening...,Sat Apr 25 13:09:26 +0000 2020,0
1,Dr. Latha,"Hindus, who voted Modi/BJP, celebrated Ugadi a...",Sat Apr 25 04:10:37 +0000 2020,174
2,Mayank Pandey,Logon se vinti hai kripa kar ke sabji walon se...,Sat Apr 25 13:54:36 +0000 2020,0
3,And yet it moves,@mkatju Please put your question like this: I...,Sun Apr 12 18:43:29 +0000 2020,0
4,MOLLY5 # 100% Follow back to Sanghis,Corona ki dawai nahin \nMacchar ke liye coil n...,Sat Apr 25 14:43:17 +0000 2020,0
5,Vishva Hindu Parishad -VHP,Press release of shri Milind Parande on Ram Na...,Sat Apr 11 10:22:55 +0000 2020,90
6,RAM X pro,Still in Quarrantine to fight covid 19 Corona ...,Sat Apr 25 12:22:02 +0000 2020,0
7,Cool mind üáÆüá≥,@narendramodi There should be limit of appease...,Sat Apr 25 11:58:01 +0000 2020,1
8,Prof Hari Om,"Classical Example Of Secularism\nHoli: No, No....",Sat Apr 25 08:37:49 +0000 2020,41
9,Rashika Dixit,@MSBhatiaIPS Let's fight against CORONA\n\nMA...,Wed Apr 15 08:02:05 +0000 2020,18


In [None]:
# Repeat the text search for "ram" and measure how long the lookup takes now
# that the result has been cached.
# `time` is already imported in the notebook's first cell.
starttime = time.perf_counter()
cache_string_search = search_engine2.search_by_string("ram")
print(time.perf_counter() - starttime)

In [None]:
# Display the text-search result as returned from the cache
cache_string_search

In [11]:
# Run the hashtag search (database query that also updates the cache
# checkpoint) and print how long the call took.
starttime = time.perf_counter()
hashtag_search = search_engine2.search_by_hashtag("COVID19InTurkeyPrisons")
elapsed_seconds = time.perf_counter() - starttime
print(elapsed_seconds)

New entry, retrieving tweets COVID19InTurkeyPrisons from database!
Checkpoint saved!
Query took 0.0436 seconds

0.0444226999534294


In [12]:
# Display the tweets returned by the hashtag search
hashtag_search

Unnamed: 0,Account_name,text,date,Likes
0,Lalekosan,#COVID19InTurkeyPrisons,Sun Apr 12 18:28:07 +0000 2020,0
1,Asya ƒ∞pek√ßi,A lot of children and patient are face to face...,Sun Apr 12 18:41:13 +0000 2020,0
2,Asya ƒ∞pek√ßi,A lot of children and patient are face to face...,Sun Apr 12 18:43:12 +0000 2020,0


In [None]:
# Repeat the hashtag search and print how long the lookup takes now that the
# result has been cached.
starttime = time.perf_counter()
hashtag_search_cache = search_engine2.search_by_hashtag("COVID19InTurkeyPrisons")
elapsed_seconds = time.perf_counter() - starttime
print(elapsed_seconds)

In [None]:
# Display the hashtag-search result as returned from the cache
hashtag_search_cache

In [13]:
# Compute the 10 most used hashtags (database query that also updates the cache
# checkpoint) and print how long the call took.
starttime = time.perf_counter()
top_10_hashtags = search_engine2.get_top_hashtags(10)
elapsed_seconds = time.perf_counter() - starttime
print(elapsed_seconds)

New entry, retrieving top 10 hashtags from database!
Checkpoint saved!
Query took 0.2573 seconds

0.25722710002446547


In [14]:
# Display the top 10 hashtags
top_10_hashtags

Unnamed: 0,hashtag,count
0,Corona,4825
1,corona,2092
2,coronavirus,1521
3,Covid_19,1355
4,COVID19,1050
5,ŸÉŸàÿ±ŸàŸÜÿß,752
6,StayHome,574
7,level4lockdown,501
8,lockdown,412
9,Covid19,408


In [None]:
# Request the top 10 hashtags again and print how long the lookup takes now
# that the result has been cached.
starttime = time.perf_counter()
top_10_hashtags_cache = search_engine2.get_top_hashtags(10)
elapsed_seconds = time.perf_counter() - starttime
print(elapsed_seconds)

In [None]:
# Display the top 10 hashtags as returned from the cache
top_10_hashtags_cache

In [15]:
# Retrieve the 20 most retweeted tweets (this call also updates the cache
# checkpoint) and display the result
search_engine2.top_tweets(20)

New entry, retrieving top 20 tweets from database!
Checkpoint saved!
Query took 0.1033 seconds



Unnamed: 0,Time_stamp,Account_Name,tweet,retweets_count
0,Wed Mar 04 17:31:21 +0000 2020,Pre K ‚ùÑÔ∏è,ALERT‚ÄºÔ∏è‚ÄºÔ∏è‚ÄºÔ∏è\nThe corona virus can be spread th...,298538
1,Fri Apr 24 18:25:00 +0000 2020,Joe Biden,"I can‚Äôt believe I have to say this, but please...",255239
2,Fri Mar 13 00:43:40 +0000 2020,gilbertüöÄ,*corona virus enters my body*\n\nThe 4 Flintst...,237307
3,Sun Mar 01 22:33:16 +0000 2020,MARLEY üí°,this is the BEST set of info I‚Äôve found on cor...,205415
4,Fri Apr 10 21:54:29 +0000 2020,ŸáŸÄ,Dreaming of this moment https://t.co/mxKHQKxZNm,201433
5,Fri Jul 28 13:07:48 +0000 2017,Î∞©ÌÉÑÏÜåÎÖÑÎã®,700üíï https://t.co/MirDwcJSra,191242
6,Mon Jan 27 18:03:19 +0000 2020,night owl,someone said ‚Äúthis is what happened when rats ...,190463
7,Wed Mar 18 17:51:18 +0000 2020,D i l l ü¶¶,When this Corona shit passes we have to promis...,181584
8,Tue Mar 10 17:52:14 +0000 2020,Carlitos ü¶â‚ôõ,THIS MAN IS A GENIUS he figured out the Corona...,179479
9,Fri Mar 13 13:10:10 +0000 2020,WUD?,Corona virus got a 97% survival rate nd the wh...,160253


In [None]:
# Request the top 20 tweets again and print how long the lookup takes now that
# the result has been cached.
starttime = time.perf_counter()
top_tweets_cache = search_engine2.top_tweets(20)
elapsed_seconds = time.perf_counter() - starttime
print(elapsed_seconds)

In [None]:
# Display the top tweets as returned from the cache
top_tweets_cache