In [None]:
import pymongo
from bson.objectid import ObjectId
import unittest

mongo = pymongo.MongoClient("mongodb://root:hhn@mongo/admin")
print(mongo.server_info()['version'])
db = mongo.examples

In [None]:
db.users.delete_many({})
db.projects.delete_many({})

# Initialize some data

Let's create some functions to save some users with each one following some others:

In [None]:
def find_user(id):
    doc = db.users.find_one({'_id': ObjectId(id)})
    return doc or False

def find_users():
    return [user for user in db.users.find()]

Let's create three users and save them:

In [None]:
martin = {'name': 'Martin Marsal', 'tweets': [], 'followers': [], 'timeline': []}
christian = {'name': 'Christian Diegmann', 'tweets': [], 'followers': [], 'timeline': []}
robin = {'name': 'Robin Schüle', 'tweets': [], 'followers': [], 'timeline': []}

In [None]:
def save_user(user):
    db.users.insert_one(user)
    return user

In [None]:
save_user(martin)

In [None]:
save_user(christian)

In [None]:
save_user(robin)

Let's add the other users to each followers list.

In [None]:
def add_to_followers(user_id, follower_id):
    db.users.update_one(
        {'_id': ObjectId(user_id)},
        {'$push': {'followers': {'_id': ObjectId(follower_id)}}}
    )
    return db.users.find_one({'_id': user_id})

In [None]:
add_to_followers(martin['_id'], christian['_id'])

In [None]:
add_to_followers(christian['_id'], robin['_id'])

In [None]:
add_to_followers(robin['_id'], martin['_id'])

# 1st access pattern: Post a tweet

In [None]:
tweet = {'_id': ObjectId(), 'text': 'Moin, moin.', 'likes': 0, 'replies': []}
tweet_2 = {'_id': ObjectId(), 'text': 'MongoDB ist super!', 'likes': 0, 'replies': []}

In [None]:
newMartin = find_user(martin['_id'])

In [None]:
def post_tweet(user, tweet):
    db.users.update_one(
        {'_id': ObjectId(user['_id'])},
        {'$push': {'tweets': tweet}}
    )
    db.users.update_many(
        {'_id': {'$in': [follower['_id'] for follower in user['followers']]}},
        {'$push': {'timeline': tweet}}
    )
    return [user for user in db.users.find()]

In [None]:
post_tweet(newMartin, tweet)
post_tweet(newMartin, tweet_2)

In [None]:
def read_timeline(user):
    return db.users.find_one(
        {'_id': ObjectId(user['_id'])},
        {'_id': 0, 'timeline': 1}
    )

class TestPostTweet(unittest.TestCase):
    def test_post_tweet_enriched_timeline(self):
        timeline = read_timeline(christian)
        timeline_length = len(timeline['timeline'])
        self.assertEqual(timeline_length, 2)

    def test_post_tweet_empty_timeline(self):
        timeline = read_timeline(robin)
        timeline_length = len(timeline['timeline'])
        self.assertEqual(timeline_length, 0)

In [None]:
if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestPostTweet)
    unittest.TextTestRunner(verbosity=2).run(suite)

# 2nd access pattern: Post a reply

In [None]:
reply = {'_id': ObjectId(), 'text': 'Hallo zurück.', 'likes': 0}

In [None]:
newMartin = find_user(martin['_id'])

In [None]:
def post_reply(tweet, reply):
    db.users.update_one(
        {'tweets._id': ObjectId(tweet['_id'])},
        {'$push': {'tweets.$.replies': reply}}
    )
    db.users.update_many(
        {'timeline._id': ObjectId(tweet['_id'])},
        {'$push': {'timeline.$.replies': reply}}
    )
    return [user for user in db.users.find()]

In [None]:
post_reply(newMartin['tweets'][0], reply)

In [None]:
class TestPostReply(unittest.TestCase):
    def test_post_reply_enriched_replies(self):
        timeline = read_timeline(christian)
        replies_length = len(timeline['timeline'][0]['replies'])
        self.assertEqual(replies_length, 1)

    def test_post_reply_empty_replies(self):
        timeline = read_timeline(christian)
        replies_length = len(timeline['timeline'][1]['replies'])
        self.assertEqual(replies_length, 0)

In [None]:
if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestPostReply)
    unittest.TextTestRunner(verbosity=2).run(suite)

# 3rd access pattern: Edit a tweet

In [None]:
newMartin = find_user(martin['_id'])

In [None]:
def edit_tweet(tweet):
    new_text = 'Moin, moin. Wie geht es euch?'
    db.users.update_one(
        {'tweets._id': ObjectId(tweet['_id'])},
        {'$set': {'tweets.$.text': new_text}}
    )
    db.users.update_many(
        {'timeline._id': ObjectId(tweet['_id'])},
        {'$set': {'timeline.$.text': new_text}}
    )
    return [user for user in db.users.find()]

In [None]:
edit_tweet(newMartin['tweets'][0])

In [None]:
class TestEditTweet(unittest.TestCase):
    user = find_user(newMartin['_id'])
    
    def test_edit_tweet_edited_tweet(self):
        text = self.user['tweets'][0]['text']
        self.assertEqual(text, 'Moin, moin. Wie geht es euch?')

    def test_edit_tweet_unedited_tweet(self):
        text = self.user['tweets'][1]['text']
        self.assertEqual(text, 'MongoDB ist super!')

In [None]:
if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestEditTweet)
    unittest.TextTestRunner(verbosity=2).run(suite)

# 4th access pattern: Read a timeline

In [None]:
newChristian = find_user(christian['_id'])

In [None]:
def read_timeline(user):
    return db.users.find_one(
        {'_id': ObjectId(user['_id'])},
        {'_id': 0, 'timeline': 1}
    )

In [None]:
read_timeline(newChristian)

In [None]:
class TestReadTimeline(unittest.TestCase):
    def test_read_timeline(self):
        timeline = read_timeline(christian)
        timeline_length = len(timeline['timeline'])
        self.assertEqual(timeline_length, 2)

In [None]:
if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestReadTimeline)
    unittest.TextTestRunner(verbosity=2).run(suite)

# 5th access pattern: Delete a Tweet

In [None]:
newMartin = find_user(martin['_id'])

In [None]:
def delete_tweet(user, tweet):
    db.users.update_one(
        {'_id': ObjectId(user['_id'])},
        {'$pull': {'tweets': {'_id': ObjectId(tweet['_id'])}}}
    )
    
    db.users.update_many(
        {'_id': {'$in': [follower['_id'] for follower in user['followers']]}},
        {'$pull': {'timeline': {'_id': ObjectId(tweet['_id'])}}}
    )
    
    return [user for user in db.users.find()]

In [None]:
delete_tweet(newMartin, tweet)

In [None]:
class TestDeleteTweet(unittest.TestCase):
    def test_delete_tweet(self):
        timeline = read_timeline(christian)
        timeline_length = len(timeline['timeline'])
        self.assertEqual(timeline_length, 1)

In [None]:
if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestDeleteTweet)
    unittest.TextTestRunner(verbosity=2).run(suite)