In [1]:
import json
from pymongo import MongoClient, errors
from datetime import datetime

In [2]:
import time
from datetime import datetime
from functools import wraps

def log_execution_time(func):
    """A decorator to log the execution time of a function."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = datetime.now()
        result = func(*args, **kwargs)
        end_time = datetime.now()
        print(f"Execution time: {end_time - start_time}")
        return result
    return wrapper

def handle_exceptions(func):
    """A decorator to handle exceptions."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except FileNotFoundError as e:
            print(f"Error: File not found - {e}")
        except Exception as e:
            print(f"Error occurred in '{func.__name__}': {e}")
    return wrapper

In [3]:
class MongoDBHandler:
    """Handles MongoDB operations."""

    def __init__(self, host: str, port: int):
        """Initialize the MongoDB connection."""
        try:
            self.client = MongoClient(host, port)
            print(f"Connected to MongoDB at {host}:{port}")
            
            self.user_collection = self.client["userDatabase"]["User"]
            self.article_collection = self.client["articleDatabase"]["Article"]
            self.read_collection = self.client["readDatabase"]["Read"]
            self.be_read_collection = self.client["beReadDatabase"]["BeRead"]
            self.popular_rank_collection = self.client["popularRankDatabase"]["PopularRank"]
            
        except errors.ConnectionFailure as e:
            print(f"Error connecting to MongoDB: {e}")
            raise

    def get_database(self, db_name: str):
        """Retrieve a database."""
        return self.client[db_name]
    
    


In [4]:
class TableHandler:
    """Base class for handling common database operations."""    
    def __init__(self, collection):
        """Initialize with a MongoDB collection."""
        self.collection = collection
    
    @handle_exceptions
    @log_execution_time
    def bulk_insert(self, json_file: str, batch_size: int = 5000):
        """
        Bulk inserts data from a JSON file into the MongoDB collection.
        Uses the duplication_logic method for optional record transformation.
        """
        with open(json_file, 'r', encoding='utf-8') as file:
            buffer = []
            count = 0
            for line in file:
                record = json.loads(line)
                # Apply duplication logic (can be overridden in subclasses)
                records = self._process_record(record)
                buffer.extend(records)
                count += 1
                # Bulk write when buffer reaches batch_size
                if len(buffer) >= batch_size:
                    self._write_to_db(buffer)
                    buffer = []
            # Final flush of any remaining records
            if buffer:
                self._write_to_db(buffer)
        print(f"Finished processing {count} records.")

    def _process_record(self, record):
        """
        Default process record logic: No special handling.
        Override this method in subclasses for custom logic.
        """
        return [record]
    
    def _write_to_db(self, data):
        """Write data to the specified collection."""
        try:
            result = self.collection.insert_many(data, ordered=False)
            print(f"Inserted {len(result.inserted_ids)} records into {self.collection.name}")
        except errors.BulkWriteError as e:
            print(f"Error during bulk insert: {e.details}")

In [5]:
cache = {}

class UserTableHandler(TableHandler):
    def __init__(self, db_handler: MongoDBHandler):
        super().__init__(db_handler.user_collection)
        
    def get_region_by_uid(self, uid):
        if uid in cache:
            return cache[uid]
        else:
            user = self.collection.find_one({"uid": uid})
            cache[uid] = user.get("region") if user else None
        return cache[uid]
    
    def fetch_users(self, conditions={}, count=100, offset=0):
        if count == None:
            users = self.collection.find(conditions, { "_id": 0, "timestamp": 0, "id": 0 })
        else:
            users = self.collection.find(conditions, { "_id": 0, "timestamp": 0, "id": 0 }).skip(offset).limit(count)
        return list(users)
    
    def fetch_users_by_region(self, region: str, count=100, offset=0):
        return self.fetch_users({"region": region}, count, offset)
        
class ArticleTableHandler(TableHandler):
    def __init__(self, db_handler: MongoDBHandler):
        super().__init__(db_handler.article_collection)

    def _process_record(self, record):
        if record.get("category") == "science":
            r1 = record.copy()
            r2 = record.copy()
            r1["shardCopy"] = 1
            r2["shardCopy"] = 2
            return [r1, r2]
        return [record]
    
    def fetch_articles(self, conditions={}, count=100, offset=0):
        pipeline = [
            {
                "$match": conditions
            },
            {
                "$group": {
                    "_id": "$aid",  # Group by the unique identifier
                    "deduplicatedDoc": { "$first": "$$ROOT" }
                }
            },
            {
                "$replaceRoot": { "newRoot": "$deduplicatedDoc" }
            }
        ]
        if count != None:
            pipeline.append({"$skip": offset})
            pipeline.append({"$limit": count})

        articles = self.collection.aggregate(pipeline)
        return list(articles)
    
    def fetch_articles_by_category(self, category: str, count=100, offset=0):
        return self.fetch_articles({"category": category}, count, offset)
    
class ReadTableHandler(TableHandler):
    def __init__(self, db_handler: MongoDBHandler):
        super().__init__(db_handler.read_collection)
        self.userTableHandler = UserTableHandler(db_handler)
        
    def _process_record(self, record):
        region = self.userTableHandler.get_region_by_uid(record['uid'])
        record['region'] = region
        return [record]

    def fetch_reads(self, conditions={}, count=100, offset=0):
        ### time check, when feaching beijing user's reads. 
        ### implementation: read with added region field, which is the shard key
        fields = {
                    "_id": 0,
                    "timestamp": 0,
                    "uid": 0,
                    "region": 0
                }
        if count == None:
            reads = self.collection.find(conditions, fields)
        else:
            reads = self.collection.find(conditions, fields).skip(offset).limit(count)
        return list(reads)
    
    def fetch_reads_by_user(self, uid: int):
        return self.fetch_reads({"uid": uid}, None, None)
    
    
    def fetch_aggregated_reads_by_category(self, category: str):
        limit = 10
        """
        Aggregate reads for a specific article category.
        """
        pipeline = [
            {
                "$lookup": {
                    "from": "articleDatabase.Article",
                    "localField": "aid",
                    "foreignField": "aid",
                    "as": "article_details",
                }
            },
            {"$unwind": "$article_details"},  # Flatten the article details array
            {"$match": {"article_details.category": category}},  # Filter condition
            {"$group": {"_id": "$uid", "total_reads": {"$sum": 1}}},  # Group by user
            {"$limit": limit},
        ]
        return list(self.collection.aggregate(pipeline))
    
    def fetch_reads_with_details(self):
        limit = 10
        """
        Fetch reads with user and article details.
        Simulates a join between Reads, User, and Article collections.
        """
        pipeline = [
            {
                "$lookup": {
                    "from": "articleDatabase.Article",
                    "localField": "aid",
                    "foreignField": "aid",
                    "as": "article_details",
                }
            },
            {
                "$lookup": {
                    "from": "userDatabase.User",
                    "localField": "uid",
                    "foreignField": "uid",
                    "as": "user_details",
                }
            },
            {"$limit": limit},
        ]
        return list(self.collection.aggregate(pipeline))
    
    
        
class BeReadTableHandler(TableHandler):
    def __init__(self, db_handler: MongoDBHandler):
        super().__init__(db_handler.be_read_collection)
        self.userTableHandler = UserTableHandler(db_handler)
        
        
class PopularRankTableHandler(TableHandler):
    def __init__(self, db_handler: MongoDBHandler):
        super().__init__(db_handler.popular_rank_collection)
        
        
class QueryHandeler():
    def __init__(self, db_handler: MongoDBHandler):
        self.userTableHandler = UserTableHandler(db_handler)
        self.articleTableHandler = ArticleTableHandler(db_handler)
        self.readTableHandler = ReadTableHandler(db_handler)
        self.beReadTableHandler = BeReadTableHandler(db_handler)
        self.popularRankTableHandler = PopularRankTableHandler(db_handler)
        
    def fetch_users(self, conditions={}, count=100, offset=0):
        return self.userTableHandler.fetch_users(conditions, count, offset)
    
    def fetch_users_by_region(self, region: str, count=100, offset=0):
        return self.userTableHandler.fetch_users_by_region(region, count, offset)
    
    def fetch_articles(self, conditions={}, count=100, offset=0):
        return self.articleTableHandler.fetch_articles(conditions, count, offset)
    
    def fetch_articles_by_category(self, category: str, count=100, offset=0):
        return self.articleTableHandler.fetch_articles_by_category(category, count, offset)
    
    def fetch_reads(self, conditions={}, count=100, offset=0):
        return self.readTableHandler.fetch_reads(conditions, count, offset)
    
    def fetch_user_read(self, uid: str):
        user = self.userTableHandler.fetch_users({"uid": uid})
        print(user)
        reads = self.readTableHandler.fetch_reads_by_user(uid)   
        return {
            "user": user,
            "reads": reads
        }
        
        
    # 3) Populate the empty Be-Read table by inserting newly computed records into the
    # Be-Read table.
    # 4) Query the top-5 daily/weekly/monthly popular articles with articles details (text,
    # image, and video if existing) (involving the join of Be-Read table and Article
    # table)

In [6]:
# Configuration
host = "localhost"
port = 60000
db_handler = MongoDBHandler(host, port)

Connected to MongoDB at localhost:60000


In [12]:
q = QueryHandeler(db_handler)
q.fetch_user_read("1")

[]


{'user': [],
 'reads': [{'id': 'r953',
   'aid': '6322',
   'readTimeLength': '35',
   'agreeOrNot': '0',
   'commentOrNot': '0',
   'shareOrNot': '0',
   'commentDetail': 'comments to this article: (1,6322)'},
  {'id': 'r10313',
   'aid': '8411',
   'readTimeLength': '43',
   'agreeOrNot': '0',
   'commentOrNot': '1',
   'shareOrNot': '0',
   'commentDetail': 'comments to this article: (1,8411)'},
  {'id': 'r17779',
   'aid': '2974',
   'readTimeLength': '38',
   'agreeOrNot': '0',
   'commentOrNot': '0',
   'shareOrNot': '0',
   'commentDetail': 'comments to this article: (1,2974)'},
  {'id': 'r24719',
   'aid': '8804',
   'readTimeLength': '70',
   'agreeOrNot': '1',
   'commentOrNot': '1',
   'shareOrNot': '0',
   'commentDetail': 'comments to this article: (1,8804)'},
  {'id': 'r29358',
   'aid': '3151',
   'readTimeLength': '44',
   'agreeOrNot': '0',
   'commentOrNot': '0',
   'shareOrNot': '0',
   'commentDetail': 'comments to this article: (1,3151)'},
  {'id': 'r29880',
   'ai

In [15]:
user_handler = UserTableHandler(db_handler)
json_file = "../db-generation/user.dat"
user_handler.bulk_insert(json_file)

Inserted 5000 records into User
Inserted 5000 records into User
Finished processing 10000 records.
Execution time: 0:00:00.231713


In [16]:
len(user_handler.fetch_users({"region": "Beijing"}))

100

In [9]:
article_handler = ArticleTableHandler(db_handler)
json_file = "../db-generation/article.dat"
article_handler.bulk_insert(json_file)

Inserted 5000 records into Article
Inserted 5000 records into Article
Inserted 4547 records into Article
Finished processing 10000 records.
Execution time: 0:00:00.254352


In [17]:
article_handler.fetch_articles({"aid": "1840"})

[{'_id': ObjectId('67582a685b433a7d9408c53d'),
  'id': 'a1840',
  'timestamp': '1506000001840',
  'aid': '1840',
  'title': 'title1840',
  'category': 'technology',
  'abstract': 'abstract of article 1840',
  'articleTags': 'tags25',
  'authors': 'author1651',
  'language': 'zh',
  'text': 'text_a1840.txt',
  'image': 'image_a1840_0.jpg,image_a1840_1.jpg,image_a1840_2.jpg,',
  'video': ''}]

In [11]:
read_handler = ReadTableHandler(db_handler)
read_handler.bulk_insert("../db-generation/read.dat")

# be_read_handler = BeReadTableHandler(db_handler)
# be_read_handler.bulk_insert("../db-generation/beRead.dat")

Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted 5000 records into Read
Inserted

In [18]:
read_handler.fetch_user_reads("1")

AttributeError: 'ReadTableHandler' object has no attribute 'fetch_user_reads'

In [19]:
read_handler.fetch_reads_with_details()

[{'_id': ObjectId('67582a855b433a7d9408f371'),
  'timestamp': '1506332297000',
  'id': 'r0',
  'uid': '8267',
  'aid': '194',
  'readTimeLength': '5',
  'agreeOrNot': '0',
  'commentOrNot': '1',
  'shareOrNot': '0',
  'commentDetail': 'comments to this article: (8267,194)',
  'region': None,
  'article_details': [],
  'user_details': []},
 {'_id': ObjectId('67582a855b433a7d9408f372'),
  'timestamp': '1506332307000',
  'id': 'r1',
  'uid': '8166',
  'aid': '2533',
  'readTimeLength': '44',
  'agreeOrNot': '0',
  'commentOrNot': '0',
  'shareOrNot': '0',
  'commentDetail': 'comments to this article: (8166,2533)',
  'region': None,
  'article_details': [],
  'user_details': []},
 {'_id': ObjectId('67582a855b433a7d9408f373'),
  'timestamp': '1506332317000',
  'id': 'r2',
  'uid': '8517',
  'aid': '7739',
  'readTimeLength': '36',
  'agreeOrNot': '1',
  'commentOrNot': '0',
  'shareOrNot': '0',
  'commentDetail': 'comments to this article: (8517,7739)',
  'region': None,
  'article_details'