# Cache Engine
Purpose: build an engine that caches Neo4j and MongoDB query results in Redis.<br>
By: Jonathan Lo<br>
Date: 8/12/23

## Overhead

In [1]:
# Imports
from ast import literal_eval
from json import dumps, load, loads
from time import perf_counter, time
from urllib.parse import quote_plus

import redis
from py2neo import Graph
from pymongo import MongoClient

## Engine

In [2]:
class CacheEngine():
    """ Engine to improve query time of queries by using Redis
        as an L1 Cache.

    Neo4j results are cached in the Redis hash "neo_cache" and MongoDB
    results in the hash "mongo_cache". Results are stored as JSON text, and
    both a cache miss and a later cache hit return the decoded JSON, so
    callers always receive the same data structure.
    """

    # Redis hash names owned by this engine
    NEO_CACHE = "neo_cache"
    MONGO_CACHE = "mongo_cache"

    def __init__(self, secrets_path):
        """ Connect to Neo4j, MongoDB and Redis
        secrets_path: Path to a JSON file containing 'MongoUser',
                      'MongoPass' and 'neo4jKey'
        """
        self.redis = None
        self.neo4j = None
        self.mongo_tweets = None
        self.mongo_airports = None

        self._setup(secrets_path)

    @staticmethod
    def _encode(result):
        """ Serialize a query result to JSON text for storage in Redis.
        Values JSON cannot represent natively (e.g. ObjectId, graph
        entities) are stored as their str() form.
        """
        return dumps(result, default=str)

    def execute_neo4j(self, query):
        """ Executes a Cypher query, answering from the Redis cache when possible
        query: The cypher query to be executed against
        returns: List of records, each a dict of column name -> value
        """
        # If in Redis Cache hit, return the decoded records
        cached = self.redis.hget(self.NEO_CACHE, key=query)
        if cached is not None:
            return loads(cached)

        # Cache miss: materialize every record. str() of the cursor returned
        # by run() is a display string, not the query result itself.
        records = self.neo4j.run(query).data()

        # Save query results
        payload = self._encode(records)
        self.redis.hset(self.NEO_CACHE, key=query, value=payload)

        # Decode the stored payload so a miss returns exactly what a hit will
        return loads(payload)

    def execute_mongo(self, target, query, projection=False):
        """ Executes a MongoDB find, answering from the Redis cache when possible
        target: Must be tweets or airports, specifies the target db::collection
        query: The mongo query (filter document) to be executed
        projection[optional]: Projection of the result
        returns: List of matching documents (as decoded JSON)
        raises ValueError: if target is not 'tweets' or 'airports'
        """
        # Determine target
        if target == "tweets":
            mongo_db = self.mongo_tweets
        elif target == "airports":
            mongo_db = self.mongo_airports
        else:
            raise ValueError("Target must be 'tweets' or 'airports'")

        # Treat False / None / {} uniformly as "no projection" (as before,
        # a falsy projection means all fields are returned)
        projection = projection or None

        # The key must cover everything that changes the result: the same
        # filter on another collection or with another projection returns
        # different documents.
        cache_key = repr((target, query, projection))

        # If in Redis Cache hit, return the decoded documents
        cached = self.redis.hget(self.MONGO_CACHE, key=cache_key)
        if cached is not None:
            return loads(cached)

        # Execute mongo query instead
        documents = list(mongo_db.find(query, projection))

        # Save query results
        payload = self._encode(documents)
        self.redis.hset(self.MONGO_CACHE, key=cache_key, value=payload)

        # Decode the stored payload so a miss returns exactly what a hit will
        return loads(payload)

    def clear_cache(self, target=False):
        """ Clears the engine's cached results
        target[optional]: 'neo4j' or 'mongo' to clear only that cache;
                          when omitted/falsy both caches are cleared
        returns: True
        raises ValueError: for any other target
        """
        if not target:
            # Only remove this engine's own hashes. KEYS '*' + DELETE would
            # wipe every key in the Redis database, including unrelated data.
            self.redis.delete(self.NEO_CACHE, self.MONGO_CACHE)
            return True

        # Determine target
        if target == "neo4j":
            self.redis.delete(self.NEO_CACHE)
        elif target == "mongo":
            self.redis.delete(self.MONGO_CACHE)
        else:
            raise ValueError("Target must be 'neo4j' or 'mongo'")
        return True

    def _setup(self, secrets_path):
        """ Private method to open the Neo4j, MongoDB and Redis connections
        secrets_path: Path to the JSON secrets file
        """
        # Obtain secrets information; the context manager closes the file
        with open(secrets_path, "r", encoding="utf-8") as secrets_file:
            secrets_data = load(secrets_file)
        # Credentials must be percent-escaped before being embedded in a URI,
        # otherwise characters such as '@', ':' or '/' corrupt it.
        mongo_user = quote_plus(secrets_data['MongoUser'])
        mongo_pass = quote_plus(secrets_data['MongoPass'])
        neo4j_password = secrets_data['neo4jKey']

        # Neo4j Connection
        neo4j_url = "neo4j+s://65b6d554.databases.neo4j.io"
        neo4j_user = "neo4j"
        self.neo4j = Graph(neo4j_url, auth=(neo4j_user, neo4j_password))

        # MongoDB Connection
        connection_string = f"mongodb+srv://{mongo_user}:{mongo_pass}@dsc104-final-project.6oeuizv.mongodb.net/"
        client = MongoClient(connection_string)

        # Obtain MongoDB collections
        self.mongo_tweets = client.tweets.tweets
        self.mongo_airports = client.airports.prices

        # Setup Redis; decode_responses makes hget return str instead of bytes.
        # `encoding` replaces the deprecated `charset` keyword.
        self.redis = redis.Redis(host='localhost', port=6379, db=0, encoding="utf-8", decode_responses=True)

## Demonstration

In [3]:
# Build the engine from the credentials file, then start from an empty cache
secrets_file = "./secrets.json"
engine = CacheEngine(secrets_path=secrets_file)

# Drop any previously cached results before timing the demo queries
engine.clear_cache()

True

In [4]:
# A sample Neo4j query: the first run misses the cache and queries Neo4j.
# perf_counter is a monotonic, high-resolution clock meant for timing.
st = perf_counter()
sample_query = """
MATCH (l:Hashtag)<-[:`Has_HT`]-(t:Tweet)
WITH l, count(t) AS tweetCount
WHERE tweetCount >= 10
MATCH (l)<-[]-(:Tweet)-[:Has]->(s:Sentiment)
WITH l, CASE s.airline_sentiment WHEN 'positive' THEN 1 WHEN 'neutral' THEN 0 WHEN 'negative' THEN -1 ELSE 0 END AS sentimentScore
WITH l, collect(sentimentScore) AS sentimentScores
RETURN l.hashtag AS location, apoc.coll.avg(sentimentScores) AS averageSentiment
ORDER BY averageSentiment
"""
engine.execute_neo4j(sample_query)
ed = perf_counter()
print(f"Runtime: {ed-st}")

Runtime: 0.614105224609375


In [5]:
# Same query again: this time it is answered from the Redis cache.
# perf_counter is a monotonic, high-resolution clock meant for timing.
st = perf_counter()
sample_query = """
MATCH (l:Hashtag)<-[:`Has_HT`]-(t:Tweet)
WITH l, count(t) AS tweetCount
WHERE tweetCount >= 10
MATCH (l)<-[]-(:Tweet)-[:Has]->(s:Sentiment)
WITH l, CASE s.airline_sentiment WHEN 'positive' THEN 1 WHEN 'neutral' THEN 0 WHEN 'negative' THEN -1 ELSE 0 END AS sentimentScore
WITH l, collect(sentimentScore) AS sentimentScores
RETURN l.hashtag AS location, apoc.coll.avg(sentimentScores) AS averageSentiment
ORDER BY averageSentiment
"""
engine.execute_neo4j(sample_query)
ed = perf_counter()
print(f"Runtime: {ed-st}")

Runtime: 0.00099945068359375


In [6]:
# A sample MongoDB query: the first run misses the cache and queries MongoDB.
# perf_counter is a monotonic, high-resolution clock meant for timing.
st = perf_counter()
query = {
    "State Name": "MA",
    "Year": 2018
}
projection = {
    "_id": 0,
    "Airport Code": 1,
    "City Name": 1,
    "Average Fare ($)": 1
}
engine.execute_mongo("airports", query, projection)
ed = perf_counter()
print(f"Runtime: {ed-st}")

Runtime: 0.3580482006072998


In [7]:
# Same query and projection again: this time answered from the Redis cache.
# perf_counter is a monotonic, high-resolution clock meant for timing.
st = perf_counter()
query = {
    "State Name": "MA",
    "Year": 2018
}
projection = {
    "_id": 0,
    "Airport Code": 1,
    "City Name": 1,
    "Average Fare ($)": 1
}
engine.execute_mongo("airports", query, projection)
ed = perf_counter()
print(f"Runtime: {ed-st}")

Runtime: 0.0010013580322265625
