In [26]:
pip install pymongo




In [27]:
import json
import os
import pprint
from datetime import datetime

from pymongo import MongoClient

from lrucache import LRUCache

In [28]:
# Connection string for the local MongoDB server.
MONGODB_URL = "mongodb://localhost:27017"

# Client plus a handle on the `tweets` collection of the `dbms_project` database.
client = MongoClient(MONGODB_URL)
tweets_collection = client["dbms_project"]["tweets"]

In [29]:
# Clear the entire collection
# tweets_collection.delete_many({})
# print("Collection cleared. Ready to start anew.")

In [30]:
# Unique index on tweet_id so that duplicate documents don't get inserted
# (inserting an already-stored tweet_id raises an error).
tweets_collection.create_index("tweet_id", unique=True)

'tweet_id_1'

In [31]:
def get_tweet_text(data):
    """Return the fullest available text for a tweet.

    For a retweet, the text of the original (retweeted) tweet is used. In
    either case the untruncated ``extended_tweet.full_text`` is preferred over
    the plain ``text`` field when it is present.

    A tweet is treated as a retweet only when its text starts with ``RT`` and
    it carries a non-empty ``retweeted_status`` object, the same rule
    ``get_hashtags`` applies. A tweet that merely begins with "RT" therefore
    still gets its own extended text rather than the truncated ``text``.

    Args:
        data: Decoded tweet JSON object (dict).

    Returns:
        The tweet text as a string.

    Raises:
        KeyError: If ``data`` has no ``text`` field.
    """
    retweeted = data.get('retweeted_status')
    is_retweet = data['text'].startswith('RT') and bool(retweeted)

    # Read from the original tweet for retweets, otherwise from the tweet itself.
    source = retweeted if is_retweet else data

    if 'extended_tweet' in source:
        return source['extended_tweet']['full_text']
    # Fall back to the outer tweet's text if the chosen source has none.
    return source.get('text', data['text'])


In [32]:
def get_hashtags(data):
    """Return the hashtag strings attached to a tweet.

    For a retweet (text starts with ``RT`` and ``retweeted_status`` is present)
    the hashtags are read from the retweeted tweet, otherwise from the tweet
    itself. Hashtags under ``extended_tweet.entities`` take precedence; when
    that path has no ``hashtags`` key, the regular ``entities.hashtags`` list is
    used, and an empty list if neither exists.

    Args:
        data: Decoded tweet JSON object (dict).

    Returns:
        List of the ``text`` value of every hashtag entry.
    """
    source = data
    if data['text'].startswith('RT') and 'retweeted_status' in data:
        source = data['retweeted_status']

    extended_entities = source.get('extended_tweet', {}).get('entities', {})
    standard_hashtags = source.get('entities', {}).get('hashtags', [])
    chosen = extended_entities.get('hashtags', standard_hashtags)

    tags = []
    for entry in chosen:
        tags.append(entry['text'])
    return tags

In [33]:
def read_and_insert(file_name):
    """Load tweets from a JSON-lines file into ``tweets_collection``.

    Each line is parsed as one tweet object. A tweet whose ``tweet_id`` is
    already stored is skipped. Lines that are not valid JSON, are not JSON
    objects, lack a required field, or carry a ``created_at`` value that
    ``parse_date`` cannot parse are skipped as well.

    Args:
        file_name: Path of the file to read (one JSON document per line).

    Raises:
        OSError: If the file cannot be opened (e.g. ``FileNotFoundError``).
    """
    # Read as UTF-8 explicitly. Without this, the platform default encoding
    # (e.g. cp1252 on Windows) is used and non-ASCII tweet text is mis-decoded
    # or rejected. Assumes the dump is UTF-8 JSON; confirm for other sources.
    with open(file_name, 'r', encoding='utf-8') as file:
        for line in file:
            try:
                data = json.loads(line)
                if tweets_collection.count_documents({"tweet_id": data["id"]}) == 0:
                    tweet_text = get_tweet_text(data)
                    hashtags = get_hashtags(data)

                    tweet_document = {
                        "tweet_id": data["id"],
                        "text": tweet_text,
                        "hashtags": hashtags,
                        "user": {
                            "user_id": data['user']['id'],
                            "name": data['user']['name'],
                            "screen_name": data['user']['screen_name']
                        },
                        "created_at": parse_date(data['created_at'])
                    }

                    tweets_collection.insert_one(tweet_document)
            except (json.JSONDecodeError, KeyError, TypeError, ValueError):
                # JSONDecodeError: the line is not valid JSON.
                # KeyError: a required field is missing.
                # TypeError: valid JSON but not an object (e.g. a bare number or list).
                # ValueError: created_at does not match the expected date format.
                continue  # Skip invalid or incomplete lines


In [34]:
def parse_date(date_str):
    """Convert a tweet ``created_at`` string into a ``datetime``.

    Args:
        date_str: Timestamp such as ``'Sat Apr 25 12:21:41 +0000 2020'``.

    Returns:
        The parsed ``datetime`` (timezone-aware, because the format has ``%z``).

    Raises:
        ValueError: If ``date_str`` does not match the expected format.
    """
    created_at_format = '%a %b %d %H:%M:%S %z %Y'
    parsed = datetime.strptime(date_str, created_at_format)
    return parsed

# File paths
# The data directory can be overridden with the TWEETS_DATA_DIR environment
# variable, so the notebook is not tied to one machine. The default is the
# original hard-coded location.
DATA_DIR = os.environ.get(
    "TWEETS_DATA_DIR", "C:/Users/lpnhu/Downloads/694-2024-team-13/data"
)
file_1 = f"{DATA_DIR}/corona-out-2"
file_2 = f"{DATA_DIR}/corona-out-3"

# Process each file
for tweet_file in (file_1, file_2):
    read_and_insert(tweet_file)

print("Documents inserted")

FileNotFoundError: [Errno 2] No such file or directory: 'C:/Users/lpnhu/Downloads/694-2024-team-13/data/corona-out-2'

# Integrate cache with MongoDB 

In [35]:
# Handle on the `tweets` collection of the `dbms_project` database
# (the same expression as in the earlier connection cell).
tweets_collection = client.dbms_project.tweets

def fetch_tweet_from_mongodb(tweet_id):
    """Fetch one tweet document from ``tweets_collection`` by its tweet id.

    ``read_and_insert`` stores ``data["id"]`` unchanged, which is an integer in
    Twitter JSON, while callers in this notebook pass ids as strings. A
    decimal string is therefore matched against both its integer and its
    string form, so the lookup works whichever type was stored.

    Args:
        tweet_id: Tweet id as an ``int`` or as a decimal string.

    Returns:
        The matching document (dict), or ``None`` when no tweet has that id.
    """
    if isinstance(tweet_id, str) and tweet_id.isdecimal():
        query = {"tweet_id": {"$in": [int(tweet_id), tweet_id]}}
    else:
        query = {"tweet_id": tweet_id}
    return tweets_collection.find_one(query)

# LRU cache for tweets: capacity 100, ttl=3600 (presumably seconds; confirm in
# lrucache), persisted at 'cache.json'. restore() is called right away; the
# recorded output shows it reports a missing cache file and starts empty.
cache = LRUCache(capacity=100, ttl=3600, persistence_path='cache.json')
cache.restore()

# Modify the get method in LRUCache to fetch data from MongoDB if not in cache
# Define the fallback function outside the LRUCache class
def get_with_mongo_fallback(self, key):
    """Read-through ``get``: serve from the cache, else from MongoDB.

    Args:
        self: The ``LRUCache`` instance this function is bound to.
        key: Tweet id used both as cache key and as MongoDB lookup value.

    Returns:
        The cached or freshly fetched value, or ``None`` if neither the cache
        nor MongoDB has an entry for ``key``.
    """
    # Call the class's own get explicitly; the instance attribute `get` is
    # meant to be replaced by this function.
    cached = LRUCache.get(self, key)
    if cached is not None:
        return cached

    fetched = fetch_tweet_from_mongodb(key)
    if fetched is None:
        return None

    # Remember the document so the next lookup is a cache hit.
    self.put(key, fetched)
    return fetched

# Bind the fallback function as the `get` attribute of this one cache instance.
# The function itself calls LRUCache.get(self, key) on the class, so the
# instance-level replacement does not recurse into itself.
import types
cache.get = types.MethodType(get_with_mongo_fallback, cache)

Cache file not found, starting with an empty cache.


In [36]:
# Usage: look up one tweet through the cache (falls back to MongoDB on a miss).
# NOTE(review): the id is passed as a string, while read_and_insert stores
# data["id"] unchanged. Confirm both sides use the same type for tweet_id.
tweet_id = '1254022772558368768'
tweet_data = cache.get(tweet_id)

# Set up timing and logging mechanism 

In [37]:
import time
import logging

# Set up basic logging to a file: records at INFO level and above are written
# to cache_performance.log.
logging.basicConfig(filename='cache_performance.log', level=logging.INFO)

def log_performance(start_time, end_time, operation, key, hit_or_miss):
    """Log how long a cache operation took, in milliseconds.

    The key and the hit/miss label are part of the message so that individual
    log lines can be attributed to a specific lookup.

    Args:
        start_time: Start timestamp in seconds (e.g. from ``time.perf_counter``).
        end_time: End timestamp in seconds.
        operation: Short description of the operation being timed.
        key: Cache key the operation was performed for.
        hit_or_miss: Label such as ``"hit"`` or ``"miss"``.
    """
    duration = (end_time - start_time) * 1000  # measure in milliseconds
    logging.info(f"{operation} for key {key} ({hit_or_miss}) took {duration:.2f} ms")

# Modify cache methods to include timing

In [42]:
def get_with_mongo_fallback(self, key):
    """Timed read-through ``get``: serve from the cache, else from MongoDB.

    The elapsed time is logged via ``log_performance`` as "Cache hit" when the
    cache had the value, and as "MongoDB fetch" otherwise (whether or not
    MongoDB returned a document).

    Args:
        self: The ``LRUCache`` instance this function is bound to.
        key: Tweet id used both as cache key and as MongoDB lookup value.

    Returns:
        The cached or freshly fetched value, or ``None`` if neither source has
        an entry for ``key``.
    """
    start_time = time.perf_counter()
    # Zero-argument super() only works inside a class body. This function is
    # defined at module level and bound to an instance afterwards, so the
    # class's own get must be called explicitly.
    data = LRUCache.get(self, key)
    if data is not None:
        # Cache hit, log the performance
        end_time = time.perf_counter()
        log_performance(start_time, end_time, "Cache hit", key, "hit")
    else:
        # Cache miss, fetch from MongoDB and then put it in the cache
        data = fetch_tweet_from_mongodb(key)
        if data is not None:
            self.put(key, data)
        end_time = time.perf_counter()
        log_performance(start_time, end_time, "MongoDB fetch", key, "miss")
    return data


In [43]:
# Create a new cache instance for the timing run and restore persisted state.
# NOTE(review): nothing in this cell assigns cache.get, so this instance uses
# the get defined by LRUCache; get_with_mongo_fallback is not attached here.
import types
cache = LRUCache(capacity=100, ttl=3600, persistence_path='cache.json')
cache.restore()


Cache file not found, starting with an empty cache.


In [44]:
# Function to test the cache
def test_cache_performance(cache, test_keys):
    for key in test_keys:
        # First access will always be a miss since we're not assuming pre-loading
        start_time = time.perf_counter()
        data = cache.get(key)
        end_time = time.perf_counter()
        log_performance(start_time, end_time, "Access", key, "miss")

        # Subsequent accesses should be hits if the key is still in the cache
        for _ in range(3):  # Access the same key three times to test cache hits
            start_time = time.perf_counter()
            data = cache.get(key)
            end_time = time.perf_counter()
            log_performance(start_time, end_time, "Access", key, "hit")


In [45]:
# Example tweet ids (as strings) taken from the MongoDB collection.
test_keys = ['1249403767108668930', '1249403768023678982', '1249403769193779202']

# Time repeated lookups of every sample key.
test_cache_performance(cache, test_keys)

# Testing for cache miss 

In [None]:
import time
import logging

# Same logging setup as before. logging.basicConfig leaves an already-configured
# root logger untouched (unless force=True is passed), so this only takes
# effect when no handler has been installed yet in this kernel.
logging.basicConfig(filename='cache_performance.log', level=logging.INFO)

In [47]:
def log_performance(start_time, end_time, operation, key, hit_or_miss):
    """Log the elapsed time of a cache operation, in seconds.

    Args:
        start_time: Start timestamp in seconds.
        end_time: End timestamp in seconds.
        operation: Short description of the operation being timed.
        key: Cache key the operation was performed for.
        hit_or_miss: Label such as ``"hit"`` or ``"miss"``.
    """
    duration = end_time - start_time
    message = f"{operation} for key {key} ({hit_or_miss}) took {duration:.6f} seconds"
    logging.info(message)

In [48]:
def test_cache_miss_timing(cache, test_keys, fetch_from_db):
    for key in test_keys:
        # Clear the key from the cache to ensure a cache miss
        cache.cache.pop(key, None)
        
        # Now access the key, which should trigger a cache miss and a database fetch
        start_time = time.perf_counter()
        data = fetch_from_db(key)
        
        # Assume the fetch_from_db function updates the cache after a miss
        cache.put(key, data)
        
        end_time = time.perf_counter()
        log_performance(start_time, end_time, "Database fetch", key, "miss")

In [49]:
def fetch_from_db(key):
    """Placeholder database read: wait 10 ms, then return a fixed payload.

    Args:
        key: Requested key (ignored by this placeholder).

    Returns:
        The constant string ``"data_from_db"``.
    """
    simulated_latency_seconds = 0.01
    time.sleep(simulated_latency_seconds)
    return "data_from_db"

In [50]:
# Initialize a new cache instance for the miss test
# (restore() is not called in this cell).
cache = LRUCache(capacity=100, ttl=3600, persistence_path='cache.json')

In [51]:
# Sample tweet ids used as cache keys for the miss test.
test_keys = ['1249403767108668930', '1249403768023678982', '1249403769193779202']

# Run the test for cache misses
test_cache_miss_timing(cache, test_keys, fetch_from_db)

# Testing for cache hit

In [46]:
import time
import logging

# Logging setup for the hit test. logging.basicConfig does nothing when the
# root logger already has handlers (unless force=True is passed), so this only
# takes effect on a kernel where logging has not been configured yet.
logging.basicConfig(filename='cache_performance.log', level=logging.INFO)

def log_performance(start_time, end_time, operation, key, hit_or_miss):
    """Log the elapsed time of a cache operation, in milliseconds.

    Args:
        start_time: Start timestamp in seconds.
        end_time: End timestamp in seconds.
        operation: Short description of the operation being timed.
        key: Cache key the operation was performed for.
        hit_or_miss: Label such as ``"hit"`` or ``"miss"``.
    """
    elapsed_seconds = end_time - start_time
    duration = elapsed_seconds * 1000  # seconds -> milliseconds
    message = f"{operation} for key {key} ({hit_or_miss}) took {duration:.3f} ms"
    logging.info(message)

def test_cache_hit_timing(cache, test_keys, fetch_from_db):
    for key in test_keys:
        # Ensure the key is in the cache by fetching it from the db if it's not already there
        if key not in cache.cache:
            data = fetch_from_db(key)
            cache.put(key, data)

        # Now access the key, which should be a cache hit
        start_time = time.perf_counter()
        data = cache.get(key)  # Should be a cache hit as the data is already in the cache
        end_time = time.perf_counter()

        log_performance(start_time, end_time, "Cache access", key, "hit")

def fetch_from_db(key):
    """Placeholder database read used to prime the cache.

    Waits 10 ms to imitate database latency and returns a fixed payload.

    Args:
        key: Requested key (ignored by this placeholder).

    Returns:
        The constant string ``"data_from_db"``.
    """
    initial_fetch_latency_seconds = 0.01
    time.sleep(initial_fetch_latency_seconds)
    return "data_from_db"

# New cache instance for the hit test; restore() is called on it right away.
cache = LRUCache(capacity=100, ttl=3600, persistence_path='cache.json')
cache.restore()

# Sample tweet ids used as cache keys for the test.
test_keys = ['1249403767108668930', '1249403768023678982', '1249403769193779202']

# Run the test for cache hits
test_cache_hit_timing(cache, test_keys, fetch_from_db)


Cache file not found, starting with an empty cache.
