# Aufgabe Beschreibung

- Mindestens eine NoSQL DB verwenden (Docker, Docker-compose)
- Lessons Learned wichtiger als optimale Lösung (Was hätte ich anders gemacht?)

## Abzugeben:
- Dockerfile / Dockercompose pro DB
- PDF Datenmodell -> Aufbau von System
- Skript / Programm zum Laden von Daten in die DB
- Abfragen zum Szenarien
- PDF Lessons Learned (`lessons-learned.pdf`)

## DB:
- Sollte auf mehrere Container / Knoten laufen
  - Wenn es nicht geht, erklären wieso das nicht ging / was dafür gebraucht ist

## App (Optional)
- Laden von Init-Daten
- Abfragen feuern
- Inhalt anzeigen

## Systemanforderungen
- Es gibt:
  - Follower-Beziehungen
  - Posts von Prominenten
- Aufgaben:
  - Posts von Prominenten auf die 100 IDs verteilen, die am meisten Follower haben (Influencer)
  - Posts können geliked werden (von welchem User wurde ein Post eines anderen Users geliked)
    - Zufällig generiert
- Anfragen:
  - Auflistung von zu einem Account zugeordneten Posts
  - Auflistung der 100 Accounts mit den meisten Followern (Influencer)
  - Auflistung der 100 Accounts, die den meisten der Influencer folgen
  - Startseite für ein beliebiges Account (Influencer sind hier gut):
    - Anzahl Followers
    - Anzahl gefolgte Accounts
    - 25 Posts:
      - Neueste
      - Meisten Likes von gefolgte Accounts
    - Caching der Posts für die Startseite:
      - Fan-Out in den Cache jedes Followers beim Schreiben eines neuen Posts (Fragen nicht von zentraler Tabelle, sondern jedes Account hat eigenen Tweets-Feed)
    - Auflistung der 25 Posts, die ein Wort beinhalten:
      - (Optional: Und-verknüpfte Wörter)


# Setup

## Install needed libraries and import components

In [2]:
%pip install tqdm
%pip install motor

from pymongo import MongoClient, InsertOne, UpdateOne
from pymongo.errors import ConnectionFailure
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

from tqdm import tqdm
import pandas as pd

import random

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


## Define the MongoDB server details

In [5]:
# Define the MongoDB server details
host = 'localhost'
port = 27017
username = 'devroot'  # Replace with your MongoDB username
password = 'devroot'  # Replace with your MongoDB password

# Create the connection string
connection_string = f'mongodb://{username}:{password}@{host}:{port}'

## Connect to DB and test connection

In [6]:
# Connect to the MongoDB server
client = MongoClient(connection_string)

In [7]:
try:
    # Verify connection
    client.admin.command('ping')
    print("Connected successfully to MongoDB")
    
    # List all databases
    databases = client.list_database_names()
    print("Databases:", databases)
        
except ConnectionFailure as e:
    print(f"Could not connect to MongoDB: {e}")

Connected successfully to MongoDB
Databases: ['admin', 'config', 'local', 'social_network']


# Global Definitions

## Collection definition

In [8]:
# Select the database and collections
db = client['social_network']
users_collection = db['users']
followers_collection = db['followers']
posts_collection = db['posts']
likes_collection = db['likes']
feeds_collection = db['feeds']

## Helper functions

In [51]:
def get_most_followed_users(n):
    return users_collection.find().sort("followers_count", -1).limit(n)

def get_posts_created_by_user(user_id):
    return posts_collection.find({"user_id": user_id})

def find_top_users_following_influencers(top_influencers, n):    
    # Aggregation pipeline
    pipeline = [
        # Match followers who follow one of the top influencers
        {"$match": {"followed_id": {"$in": top_influencers}}},
        
        # Group by follower_id and count followers
        {"$group": {
            "_id": "$follower_id",
            "count": {"$sum": 1}
        }},
        
        # Sort by count in descending order
        {"$sort": {"count": -1}},
        
        # Limit to top n users
        {"$limit": n}
    ]
    
    # Execute aggregation pipeline and fetch results
    cursor = followers_collection.aggregate(pipeline)
    
    # Extract and return the results as a list of dictionaries
    top_users = list(cursor)
    
    # Return the result
    return top_users

def get_user_profile(user_id):
    user = users_collection.find_one({"_id": user_id})
    followers_count = user["followers_count"]
    following_count = user["following_count"]

    user_posts = posts_collection.find({"user_id": user_id})
    
    user_feed_document = feeds_collection.find_one({"user_id": user_id})
    user_feed_posts = user_feed_document.get('posts', [])
    
    # Sort posts by date
    posts_sorted_by_date = sorted(user_feed_posts, key=lambda x: x.get('timestamp', ''), reverse=True)

    # Sort posts by likes count
    posts_sorted_by_likes = sorted(user_feed_posts, key=lambda x: x['likes'], reverse=True)
    

    profile = {
        "user_id": user_id,
        "followers_count": followers_count,
        "following_count": following_count,
        "user_posts": user_posts,
        "feed": list(user_feed_posts),
        "recent_posts": list(posts_sorted_by_date),
        "popular_posts": list(posts_sorted_by_likes)
    }
    return profile

def print_user_profile(user_profile, n):
    print("User profile with id:", user_profile["user_id"])
    print("Followers count:", user_profile["followers_count"])
    print("Following count:", user_profile["following_count"])
    print("______________________")
    
    print("User posts:")
    for i, post in enumerate(user_profile["user_posts"][:n]):
        print(post["content"], "date:", post["timestamp"])
    print("______________________")
    
    print("Feed (recent):")
    for i, post in enumerate(user_profile["recent_posts"][:n]):
        print(post["content"], "date:", post["timestamp"])
    print("______________________")
    
    print("Feed (popular):")
    for i, post in enumerate(user_profile["popular_posts"][:n]):
        print(post["content"], "likes", post["likes"])
    print("______________________")
    
def get_posts_with_word(word, n):
    return posts_collection.find({"content": {"$regex": word, "$options": "i"}}).limit(n)

# Function to get a random user ID from the users collection
def get_random_user_id():
    count = users_collection.count_documents({})
    if count > 0:
        random_index = random.randint(0, count - 1)
        random_user = users_collection.find().skip(random_index).limit(1)
        for user in random_user:
            return user['_id']
    else:
        return None

# Insert Data into DB

## Insert users and following relationships to db

In [7]:
file_path = './InputData/twitter_combined.txt'

# Read file content
with open(file_path, 'r') as file:
    lines = file.readlines()

# Prepare data
user_pairs = [tuple(map(int, line.strip().split())) for line in lines]

# Get a unique set of all users involved
all_users = {user for pair in user_pairs for user in pair}

# Check which users already exist in the database
existing_users = set(users_collection.distinct("_id", {"_id": {"$in": list(all_users)}}))

# Identify new users
new_users = all_users - existing_users

# Prepare bulk operations for new users
user_bulk_operations = [
    InsertOne({"_id": user_id, "following_count": 0, "followers_count": 0})
    for user_id in new_users
]

# Execute bulk insert for new users
if user_bulk_operations:
    users_collection.bulk_write(user_bulk_operations)

# Prepare bulk operations for relationships and updating counts
relationship_bulk_operations = []
user_update_operations = []

for user1, user2 in user_pairs:
    relationship_bulk_operations.append(InsertOne({"follower_id": user1, "followed_id": user2}))
    user_update_operations.append(UpdateOne({"_id": user1}, {"$inc": {"following_count": 1}}))
    user_update_operations.append(UpdateOne({"_id": user2}, {"$inc": {"followers_count": 1}}))

# Execute bulk insert for relationships
if relationship_bulk_operations:
    followers_collection.bulk_write(relationship_bulk_operations)

# Execute bulk update for user counts
if user_update_operations:
    users_collection.bulk_write(user_update_operations)

## Find out the top 100 most followed users (Influencers)

In [8]:
top_influencers = get_most_followed_users(100)
top_influencers_list = list(top_influencers)
print("Top influencers:")
for influencer in top_influencers_list:
    print("user id:", influencer["_id"], "Follower count", influencer["followers_count"])

Top influencers:
user id: 40981798 Follower count 8660
user id: 43003845 Follower count 7700
user id: 22462180 Follower count 7623
user id: 34428380 Follower count 7558
user id: 115485051 Follower count 4798
user id: 15913 Follower count 4337
user id: 3359851 Follower count 3986
user id: 11348282 Follower count 3850
user id: 7861312 Follower count 3712
user id: 27633075 Follower count 3655
user id: 31331740 Follower count 3623
user id: 18996905 Follower count 3255
user id: 7860742 Follower count 3197
user id: 813286 Follower count 3172
user id: 22784458 Follower count 2974
user id: 17868918 Follower count 2904
user id: 10671602 Follower count 2874
user id: 117674417 Follower count 2858
user id: 48485771 Follower count 2725
user id: 34068984 Follower count 2693
user id: 18927441 Follower count 2680
user id: 83943787 Follower count 2678
user id: 15853668 Follower count 2634
user id: 1183041 Follower count 2593
user id: 238260874 Follower count 2560
user id: 8088112 Follower count 2539
us

## Assign the input tweets to the influencers

In [9]:
def add_post_without_likes(user_id, content, date):
    post = {
        "user_id": user_id,
        "content": content,
        "timestamp": date,
        "likes": 0
    }
    post_id = posts_collection.insert_one(post).inserted_id
    return post_id

# Path to the CSV file
file_path = './InputData/tweets.csv'

# Read the CSV file into a DataFrame
df = pd.read_csv(file_path)

# Generate random influencer selections once to avoid repeated random.choice calls
influencer_ids = [random.choice(top_influencers_list)['_id'] for _ in range(len(df))]

# Function to process each tweet
def process_tweet(idx):
    tweet_data = df.iloc[idx]
    influencer_id = influencer_ids[idx]
    date = datetime.strptime(tweet_data['date_time'], '%d/%m/%Y %H:%M')
    content = tweet_data['content']
    post_id = add_post_without_likes(influencer_id, content, date)
    return post_id, influencer_id, content, date

# Step 1: Insert all posts first
with ThreadPoolExecutor() as executor:
    post_data = list(tqdm(executor.map(process_tweet, range(len(df))), total=len(df), desc="Inserting posts"))

Inserting posts: 100%|██████████| 52542/52542 [00:10<00:00, 4808.51it/s]


In [10]:
import concurrent.futures
from tqdm import tqdm
import random
from pymongo import InsertOne, UpdateOne

def get_random_users(pool_size, exclude_user_id):
    pipeline = [
        {"$match": {"_id": {"$ne": exclude_user_id}}},
        {"$sample": {"size": pool_size}}
    ]
    return list(users_collection.aggregate(pipeline))

def add_likes_chunk(chunk, random_user_pool):
    like_operations = []
    like_updates = []

    for post_id, user_id, content, date in chunk:
        # Generate a random number of likes between 0 and 100
        number_of_likes = random.randint(0, 100)
        
        # Get a random sample of users from the pool
        random_users = random.sample(random_user_pool, number_of_likes)
        
        # Prepare bulk operations for likes
        like_operations.extend([
            InsertOne({"userid": user["_id"], "postid": post_id})
            for user in random_users
        ])

        like_updates.append((post_id, number_of_likes))

    return like_operations, like_updates

def add_likes_bulk(post_data, random_user_pool, max_workers=4):
    chunk_size = len(post_data) // max_workers
    chunks = [post_data[i:i + chunk_size] for i in range(0, len(post_data), chunk_size)]
    
    like_operations = []
    like_updates = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(add_likes_chunk, chunk, random_user_pool) for chunk in chunks]
        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Processing likes"):
            chunk_like_operations, chunk_like_updates = future.result()
            like_operations.extend(chunk_like_operations)
            like_updates.extend(chunk_like_updates)

    if like_operations:
        likes_collection.bulk_write(like_operations)

    return like_updates

# Step 1: Fetch a pool of random users once
user_pool_size = 10000  # Adjust the pool size as needed
random_user_pool = get_random_users(user_pool_size, exclude_user_id=None)

# Step 2: Insert likes for posts in bulk
like_updates = add_likes_bulk(post_data, random_user_pool, max_workers=4)

# Step 3: Update post like counts in bulk
bulk_updates = [
    UpdateOne({"_id": post_id}, {"$set": {"likes": likes}})
    for post_id, likes in like_updates
]
if bulk_updates:
    posts_collection.bulk_write(bulk_updates)


Processing likes: 100%|██████████| 5/5 [00:01<00:00,  3.46it/s]


In [19]:
# Add indices to critical parameters on collections
followers_collection.create_index([("followed_id", 1), ("follower_id", 1)])

posts_collection.create_index([("user_id", 1)])

'user_id_1'

In [31]:
temp_feeds_collection = db['temp_feeds']

batch_size = 1000  # Define a suitable batch size
post_count = posts_collection.count_documents({})
num_batches = (post_count // batch_size) + 1

# Step 1: Aggregate data into a temporary collection
for batch in tqdm(range(num_batches), desc="Processing Batches"):
    pipeline = [
        {"$skip": batch * batch_size},
        {"$limit": batch_size},
        {"$lookup": {
            "from": "followers",
            "localField": "user_id",
            "foreignField": "followed_id",
            "as": "followers"
        }},
        {"$unwind": "$followers"},
        {"$project": {
            "_id": 0,
            "follower_id": "$followers.follower_id",
            "post": {
                "post_id": "$_id",
                "user_id": "$user_id",
                "content": "$content",  # Include only the necessary fields
                "timestamp": "$timestamp",
                "likes": "$likes"  # Adding likes field
            }
        }},
        {"$group": {
            "_id": "$follower_id",
            "posts": {"$push": "$post"}
        }},
        {"$out": "temp_feeds"}
    ]
    posts_collection.aggregate(pipeline)

# Step 2: Process the temporary collection to split large documents
temp_docs = list(temp_feeds_collection.find())
for doc in tqdm(temp_docs, desc="Processing Temp Feeds"):
    follower_id = doc['_id']
    posts = doc['posts']
    
    chunk_size = 100
    chunked_posts = [posts[i:i + chunk_size] for i in range(0, len(posts), chunk_size)]
    
    for chunk in chunked_posts:
        feeds_collection.update_one(
            {"user_id": follower_id},
            {"$push": {"posts": {"$each": chunk}}},
            upsert=True
        )

# Step 3: Ensure posts arrays are unique and in order
# Use aggregation pipeline to deduplicate and sort posts
pipeline = [
    {
        "$addFields": {
            "unique_posts": {"$setUnion": "$posts"}
        }
    },
    {
        "$project": {
            "user_id": 1,
            "posts": {"$slice": [{"$sortArray": {"input": "$unique_posts", "sortBy": {"timestamp": -1}}}, 16793600]}
        }
    },
    {
        "$merge": {
            "into": "feeds",
            "whenMatched": "replace",
            "whenNotMatched": "insert"
        }
    }
]

feeds_collection.aggregate(pipeline)

# Clean up temporary collection
temp_feeds_collection.drop()

Processing Batches: 100%|██████████| 53/53 [07:03<00:00,  7.98s/it]
Processing Temp Feeds: 100%|██████████| 28459/28459 [02:49<00:00, 168.36it/s]


NameError: name 'E' is not defined

# Request data from DB

## Find out top 100 most followed

In [53]:
# Get and print top influencers
top_influencers = get_most_followed_users(100)
print("Top influencers:")
for influencer in top_influencers:
    print("user id:", influencer["_id"], "Follower count", influencer["followers_count"])

Top influencers:
user id: 40981798 Follower count 8660
user id: 43003845 Follower count 7700
user id: 22462180 Follower count 7623
user id: 34428380 Follower count 7558
user id: 115485051 Follower count 4798
user id: 15913 Follower count 4337
user id: 3359851 Follower count 3986
user id: 11348282 Follower count 3850
user id: 7861312 Follower count 3712
user id: 27633075 Follower count 3655
user id: 31331740 Follower count 3623
user id: 18996905 Follower count 3255
user id: 7860742 Follower count 3197
user id: 813286 Follower count 3172
user id: 22784458 Follower count 2974
user id: 17868918 Follower count 2904
user id: 10671602 Follower count 2874
user id: 117674417 Follower count 2858
user id: 48485771 Follower count 2725
user id: 34068984 Follower count 2693
user id: 18927441 Follower count 2680
user id: 83943787 Follower count 2678
user id: 15853668 Follower count 2634
user id: 1183041 Follower count 2593
user id: 238260874 Follower count 2560
user id: 8088112 Follower count 2539
us

## Find out top 100 influencer followers

In [54]:
# Call the function
top_users = find_top_users_following_influencers(top_influencers, 100)

# Print the top users
for i, doc in enumerate(top_users):
    print(f"#{i+1}: User ID {doc['_id']} follows {doc['count']} influencers")

InvalidDocument: cannot encode object: <pymongo.cursor.Cursor object at 0x16812ae20>, of type: <class 'pymongo.cursor.Cursor'>

## Show user profile

In [49]:
# Get and print user profile
user_id = 22462180
tweets_to_show = 25
user_profile = get_user_profile(user_id)
print_user_profile(user_profile, tweets_to_show)

User profile with id: 22462180
Followers count: 7623
Following count: 701
______________________
User posts:
It's time to turn words into action❗️There are so many steps to take, but my first vow is to… https://t.co/1DwETGXbed date: 2016-11-12 01:19:00
WHAT IT MEANS TO SMILE WITH YOUR MOUTH #YOUGOTTHIS https://t.co/QjyReSoivF date: 2016-10-10 01:49:00
@pacifyher @basedkatyperry 👀 date: 2016-07-23 07:18:00
@katyskettle ok I'll leave 👋🏼 date: 2016-07-23 06:31:00
@dclyde18 ur right, I'm flawed date: 2016-07-23 06:03:00
#AppStoreChat https://t.co/wDdbmx6Mm8 date: 2015-12-18 22:05:00
conciousness = creativity date: 2015-09-02 22:12:00
I'm your butterfly. Sugar. Baby. https://t.co/MrVDRflz9e date: 2015-05-08 13:00:00
Just had 1st FaceTime Thanksgiving! What I would give 2b scraping marshmallow off of sweet potato, wearing fuzzy socks &amp; holding my niece! 😩 date: 2014-11-28 02:51:00
He soy famo with no name-o tho 😳 http://t.co/e8OnnPYG0x date: 2014-11-22 15:25:00
rt if u used to wear love 

## Posts containing word

In [None]:
# Get and print posts with a given word
posts_with_word = get_posts_with_word("test", 2)
print("Posts containing 'beautiful':")
for post in posts_with_word:
    print(post["content"])

# Cleanup

In [None]:
db.users.drop()
db.followers.drop()
db.posts.drop()
db.likes.drop()
db.feeds.drop()
print("Database cleared.")

In [None]:
# Close the connection
client.close()