In [None]:
# %% [markdown]
# # Demo MongoDB Replication Lag
# Demonstrates the replication lag phenomenon between the Primary and the Secondary

# %%
import pymongo
import time
import datetime
from pprint import pprint
import threading

# %%
# Connect to the nodes
# The primary client uses replica-set discovery, so operations that need the
# primary are routed to whichever member currently holds that role.
primary_client = pymongo.MongoClient("mongodb://localhost:27027/?replicaSet=rs0")
# The secondary client connects directly to the node on port 27028. With
# `replicaSet=rs0` the driver would discover the whole set and send default
# (primary) reads to the primary, so the "secondary" reads would never
# actually hit this node.
secondary_client = pymongo.MongoClient("mongodb://localhost:27028/?directConnection=true")

# %%
# Check the connection to the primary
try:
    primary_db = primary_client.admin
    primary_status = primary_db.command('ismaster')
    print("Primary Node Status:")
    pprint(primary_status)
except Exception as e:
    print(f"L·ªói k·∫øt n·ªëi Primary: {e}")

# %%
# Check the connection to the secondary and show its status
try:
    secondary_db = secondary_client.admin
    secondary_status = secondary_db.command('ismaster')
    print("\nSecondary Node Status:")
    # Print the reply so the header above is followed by real information
    pprint(secondary_status)
except Exception as e:
    print(f"L·ªói k·∫øt n·ªëi Secondary: {e}")

# %%
# Create the handles for the test database and collection
db_name = "test_replica"
collection_name = "lag_demo"

# From here on the *_db names refer to the test database, not to `admin`
primary_db = primary_client[db_name]
secondary_db = secondary_client[db_name]

# %%
# Function that continuously writes data to the primary
def write_to_primary(total_docs=50, interval_seconds=1):
    """Insert numbered documents into the primary at a fixed pace.

    Each document carries a running ``counter``, a UTC ``timestamp`` and the
    epoch time ``written_at`` at which it was created, so a reader can
    compute how old the newest replicated document is.

    Args:
        total_docs: Number of documents to insert (default 50).
        interval_seconds: Pause between two inserts, in seconds (default 1).
    """
    collection = primary_db[collection_name]

    print("B·∫Øt ƒë·∫ßu ghi d·ªØ li·ªáu v√†o Primary...")
    for counter in range(total_docs):
        doc = {
            "message": f"Document s·ªë {counter}",
            # PyMongo stores naive datetimes as if they were UTC, so a naive
            # local now() would be saved with the wrong instant. Use an
            # aware UTC timestamp instead.
            "timestamp": datetime.datetime.now(datetime.timezone.utc),
            "counter": counter,
            "written_at": time.time(),
        }

        result = collection.insert_one(doc)
        print(f"‚úÖ ƒê√£ ghi document {counter} - ID: {result.inserted_id}")
        time.sleep(interval_seconds)  # pace the writes

# %%
# Function that reads data from the secondary and measures the lag
def read_from_secondary(expected_docs=50, poll_seconds=2):
    """Poll the secondary and report how far it trails the primary.

    On every poll the document counts of both nodes are compared, and the
    age of the newest document visible on the secondary (now minus its
    ``written_at`` field) is reported as the time lag. The loop stops once
    both nodes hold at least ``expected_docs`` documents.

    Driver errors are printed and the poll is retried, so this function
    keeps running for as long as the nodes stay unreachable.

    Args:
        expected_docs: Document count at which the demo is considered
            finished (default 50).
        poll_seconds: Pause between two polls, in seconds (default 2).
    """
    # There is no `setSlaveOk` server command; reading from a secondary is
    # requested through the read preference of the collection handle.
    collection = secondary_db[collection_name].with_options(
        read_preference=pymongo.ReadPreference.SECONDARY
    )
    primary_collection = primary_db[collection_name]
    max_lag = 0

    print("B·∫Øt ƒë·∫ßu theo d√µi replication lag t·ª´ Secondary...")

    while True:
        try:
            # State of the secondary: count plus the newest document.
            # find_one returns None on an empty collection, unlike indexing
            # a cursor, which raises.
            secondary_count = collection.count_documents({})
            latest_doc = collection.find_one(sort=[("counter", -1)])

            # State of the primary, for comparison
            primary_count = primary_collection.count_documents({})

            lag = primary_count - secondary_count
            time_lag = None
            if latest_doc is not None and "written_at" in latest_doc:
                time_lag = time.time() - latest_doc["written_at"]
                max_lag = max(max_lag, time_lag)

            # Nothing replicated yet: there is no time lag to format
            time_lag_text = f"{time_lag:.2f}s" if time_lag is not None else "n/a"

            print(f"‚è±Ô∏è  Lag: {lag} documents | "
                  f"Time lag: {time_lag_text} | "
                  f"Primary: {primary_count} | "
                  f"Secondary: {secondary_count}")

            if lag > 10:  # warn when the secondary falls far behind
                print("üö® C·∫¢NH B√ÅO: Replication lag l·ªõn!")

            if primary_count >= expected_docs and secondary_count >= expected_docs:
                print(f"üìä K·∫øt th√∫c - Max lag: {max_lag:.2f} seconds")
                break

        except pymongo.errors.PyMongoError as e:
            # Only driver/server errors are retried; programming errors
            # should surface instead of looping forever.
            print(f"‚ùå L·ªói khi ƒë·ªçc t·ª´ Secondary: {e}")

        time.sleep(poll_seconds)

# %%
# Function that displays the real-time status
def monitor_status():
    """Print the replica-set status every five seconds, forever.

    For each member the state and its replication lag are printed. The lag
    is the difference between the primary's ``optimeDate`` and the member's
    own ``optimeDate``; it is reported as 0 when either value is missing
    (for example while no primary is known).

    Errors are printed and the loop continues. The function never returns,
    so it is meant to run in a daemon thread.
    """
    # replSetGetStatus is only accepted on the admin database, while
    # primary_db points at the test database by the time this runs.
    admin_db = primary_db.client.admin

    while True:
        try:
            primary_status = admin_db.command('isMaster')
            print(f"\nüü¢ Primary: {primary_status['ismaster']} - Host: {primary_status['hosts']}")

            # Fetch the replica-set status
            rs_status = admin_db.command('replSetGetStatus')
            members = rs_status['members']

            # Last operation time applied on the primary; the reference
            # point for every member's lag.
            primary_optime = next(
                (m.get('optimeDate') for m in members if m.get('stateStr') == 'PRIMARY'),
                None,
            )

            for member in members:
                state_str = member['stateStr']
                member_optime = member.get('optimeDate')
                if primary_optime is not None and member_optime is not None:
                    # Clamp at zero: a member cannot meaningfully be ahead
                    lag_seconds = max((primary_optime - member_optime).total_seconds(), 0.0)
                else:
                    lag_seconds = 0.0
                print(f"  {member['name']}: {state_str} - Lag: {lag_seconds:.1f}s")

        except Exception as e:
            print(f"‚ùå L·ªói monitoring: {e}")

        time.sleep(5)

# %%
# Run the demo
# Announce the demo and its configuration in a single write
print("\n".join([
    "=== DEMO REPLICATION LAG ===",
    "C·∫•u h√¨nh:",
    "- Primary: localhost:27027",
    "- Secondary: localhost:27028 (c√≥ delay 3s)",
    "- S·∫Ω ghi 50 documents v√†o primary",
    "- Theo d√µi replication lag tr√™n secondary",
]))

# %%
# Create the threads that will run concurrently; the monitor is a daemon
# so that it does not keep the process alive once the demo is over
monitor_thread = threading.Thread(target=monitor_status, daemon=True)
write_thread = threading.Thread(target=write_to_primary)
read_thread = threading.Thread(target=read_from_secondary)

# %%
# Start monitoring first and give it a head start
monitor_thread.start()

time.sleep(3)

# %%
# Start the writer, wait a little, then start the reader
write_thread.start()
time.sleep(5)
read_thread.start()

# %%
# Wait for the writer and the reader to finish
write_thread.join()
read_thread.join()

print("üéâ Demo ho√†n th√†nh!")

# %%
# Analyze the results
print("\n=== PH√ÇN T√çCH K·∫æT QU·∫¢ ===")
# Collection handles on both nodes
primary_collection = primary_db[collection_name]
secondary_collection = secondary_db[collection_name]

# Final document counts on each node
final_primary_count = primary_collection.count_documents({})
final_secondary_count = secondary_collection.count_documents({})

print(f"T·ªïng documents tr√™n Primary: {final_primary_count}")
print(f"T·ªïng documents tr√™n Secondary: {final_secondary_count}")
print(
    f"Replication lag cu·ªëi c√πng: {final_primary_count - final_secondary_count} documents"
)

# %%
# Show the last 5 documents (highest counters first) from both nodes
print("\n5 documents cu·ªëi c√πng t·ª´ PRIMARY:")
primary_docs = list(
    primary_collection.find(sort=[("counter", pymongo.DESCENDING)], limit=5)
)
for doc in primary_docs:
    print(f"Counter: {doc['counter']} - Time: {doc['timestamp']}")

print("\n5 documents cu·ªëi c√πng t·ª´ SECONDARY:")
secondary_docs = list(
    secondary_collection.find(sort=[("counter", pymongo.DESCENDING)], limit=5)
)
for doc in secondary_docs:
    print(f"Counter: {doc['counter']} - Time: {doc['timestamp']}")

# %%
# Check user creation (to show that authorization exists only on the primary)
print("\n=== KI·ªÇM TRA PH√ÇN QUY·ªÄN ===")
# NOTE(review): credentials are hard-coded for the demo; do not reuse them
# outside a throwaway environment.
# directConnection=true pins each client to the node named in its URI.
# Without it the driver may discover the replica set and send the command
# to a different member, so the "secondary" check would not test the
# secondary.
primary_auth_ok = False
auth_primary = None
try:
    # Try to authenticate as the admin user on the primary
    auth_primary = pymongo.MongoClient(
        "mongodb://admin:password123@localhost:27027/admin?directConnection=true"
    )
    auth_primary.admin.command('ismaster')
    primary_auth_ok = True
    print("‚úÖ User admin ho·∫°t ƒë·ªông tr√™n Primary")
except Exception as e:
    print(f"‚ùå User admin kh√¥ng ho·∫°t ƒë·ªông tr√™n Primary: {e}")
finally:
    # Release the sockets and monitor threads of this throwaway client
    if auth_primary is not None:
        auth_primary.close()

secondary_auth_ok = False
auth_secondary = None
try:
    # Try to authenticate as the admin user on the secondary
    auth_secondary = pymongo.MongoClient(
        "mongodb://admin:password123@localhost:27028/admin?directConnection=true"
    )
    auth_secondary.admin.command('ismaster')
    secondary_auth_ok = True
    print("‚ùå User admin ho·∫°t ƒë·ªông tr√™n Secondary (KH√îNG ƒê√öNG)")
except Exception as e:
    print(f"‚úÖ User admin KH√îNG ho·∫°t ƒë·ªông tr√™n Secondary (ƒê√öNG): {e}")
finally:
    if auth_secondary is not None:
        auth_secondary.close()

# Only state the conclusion when the observations actually support it
if primary_auth_ok and not secondary_auth_ok:
    print("üëâ Ch·ª©ng t·ªè ph√¢n quy·ªÅn ch·ªâ ƒë∆∞·ª£c replicate sau khi d·ªØ li·ªáu")
