# Atlas Online Archive

[Atlas Online Archive](https://docs.atlas.mongodb.com/online-archive/manage-online-archive/) moves infrequently accessed immutable data from your Atlas cluster to MongoDB-managed read-only blob storage without user action. Once Atlas archives the data, you have a unified view of your Atlas and Online Archive data.

<img src="./images/online_archive_architecture.png">

In this demo we will generate 1000 IoT events for the current year. Here's an example event:

```python
{
  '_id': ObjectId('5ef4ff46cf35f6a16e7f88a9'),
  'username': 'rogerrhodes',
  'remote_ipv4': '82.180.218.173',
  'httpMethod': 'PATCH',
  'hostName': 'desktop-51.freeman.net',
  'portNum': 52048,
  'location': {
    'type': 'Point',
    'coordinates': [
      Decimal128('-158.511919'),
      Decimal128('24.326279')
    ]
  },
  'dateAccessed': datetime.datetime(2020,  6,  15,  0,  0)
}
```

The events will be written to ```test.iot``` and Online Archive has been configured to archive documents whose ```dateAccessed``` field is older than 30 days:

<img src="./images/online_archive.png">


## Python Setup

In [None]:
# Install dependencies
# Each line below is a notebook shell escape (`!`) that runs pip through sudo;
# run this cell once per environment before executing the rest of the notebook.
# NOTE(review): dnspython is presumably needed for `mongodb+srv://` connection
# strings - confirm against the values in the .env file.
!sudo -E pip install --upgrade pip
!sudo -E pip install python-dotenv
!sudo -E pip install faker
!sudo -E pip install --upgrade pymongo
!sudo -E pip install dnspython

In [None]:
# Imports
import time
import datetime
from timeit import default_timer as timer
import settings
from pymongo import MongoClient
from faker import Faker
from bson.decimal128 import Decimal128
import requests
from requests.auth import HTTPDigestAuth
import json
import pprint


# Constants loaded from .env file
# (read through the project-local `settings` module imported above).
# Connection string passed to MongoClient for the cluster-only view.
MDB_CONNECTION = settings.MDB_CONNECTION
# Connection string passed to MongoClient for the cluster + Online Archive view.
MDB_CONNECTION_ARCHIVE = settings.MDB_CONNECTION_ARCHIVE
# Certificate file handed to MongoClient for the cluster-only connection.
PEM_FILE = settings.PEM_FILE
# Database and collection names used by both connections.
MDB_DATABASE = settings.MDB_DATABASE
MDB_COLLECTION = settings.MDB_COLLECTION
# Number of events to generate; converted with int() where it is used,
# so it may arrive as a string - TODO confirm against settings.
NUM_DOCS = settings.NUM_DOCS


## Some Helper Functions

In [None]:
def get_cluster_collection():
    """Return the demo collection as seen through the cluster-only connection.

    Connects with ``MDB_CONNECTION``, presenting ``PEM_FILE`` as the client
    certificate, and returns the ``MDB_COLLECTION`` collection of the
    ``MDB_DATABASE`` database.

    A new ``MongoClient`` is created on every call.
    """
    # ``ssl_certfile`` was removed in PyMongo 4; ``tlsCertificateKeyFile``
    # (available since PyMongo 3.9) is the supported spelling. The file must
    # hold both the certificate and its private key, exactly as before when
    # ``ssl_certfile`` was given without a separate ``ssl_keyfile``.
    mongo_client = MongoClient(MDB_CONNECTION, tlsCertificateKeyFile=PEM_FILE)
    return mongo_client[MDB_DATABASE][MDB_COLLECTION]

def get_cluster_archive_collection():
    """Return the demo collection via the cluster + Online Archive connection.

    Connects with ``MDB_CONNECTION_ARCHIVE`` and returns the
    ``MDB_COLLECTION`` collection of the ``MDB_DATABASE`` database.
    A new ``MongoClient`` is created on every call.
    """
    federated_client = MongoClient(MDB_CONNECTION_ARCHIVE)
    return federated_client[MDB_DATABASE][MDB_COLLECTION]

# Verify records still remain in the cluster.
def refresh_needed():
    """Report whether the cluster collection is empty.

    Counts every document in the cluster-only collection, prints the result,
    and returns ``True`` when the count is zero (the demo data must be
    regenerated) or ``False`` when documents are still present.
    """
    # Ensure there are still unarchived documents for the demo
    remaining = get_cluster_collection().count_documents({})

    if remaining != 0:
        print(f'There are {remaining} documents in the cluster')
        return False

    print('There are no records in the cluster')
    return True
            
def generate_events():
    """Replace the cluster collection's contents with freshly faked IoT events.

    Deletes every existing document from the cluster collection (the
    collection itself is kept), then inserts ``NUM_DOCS`` generated events one
    at a time, printing a dot per insert. Each event's ``dateAccessed`` is a
    midnight datetime on a fake date produced by ``Faker.date_this_year()``.
    Start time, end time and elapsed seconds are printed around the run.
    """
    fake = Faker()
    stamp_format = "%Y-%m-%d %H:%M:%S"

    # Start script
    started_at = time.gmtime()
    clock_start = timer()
    print("================================")
    print("   Generating Sample IoT Data   ")
    print("================================")
    print("\nStarting " + time.strftime(stamp_format, started_at) + "\n")

    print('NUM DOCS TO GENERATE: ' + str(NUM_DOCS))

    events = get_cluster_collection()

    # Remove the existing documents (don't drop the collection from underneath the archive)
    events.delete_many({})

    for _ in range(int(NUM_DOCS)):
        # Draw the date first so the Faker call order matches the field order below.
        access_day = fake.date_this_year()

        # Define IoT Document
        event = {
            "username": fake.user_name(),
            "remote_ipv4": fake.ipv4(),
            "httpMethod": fake.http_method(),
            "hostName": fake.hostname(),
            "portNum": fake.port_number(),
            "location": {
                "type": "Point",
                "coordinates": [
                    Decimal128(fake.longitude()),
                    Decimal128(fake.latitude()),
                ],
            },
            # Midnight on the generated day.
            "dateAccessed": datetime.datetime.combine(access_day, datetime.time()),
        }
        print(".", end="")
        events.insert_one(event)

    # Indicate end of script
    clock_end = timer()
    finished_at = time.gmtime()
    print("\nEnding " + time.strftime(stamp_format, finished_at))
    print('===============================')
    print('Total Time Elapsed (in seconds): ' + str(clock_end - clock_start))
    print('===============================')
              

# Use after a new archive is created to verify the data has been archived.
def wait_for_data_to_archive(poll_interval=1, timeout=None):
    """Poll the cluster until no document is older than the archive cutoff.

    Counts the cluster documents whose ``dateAccessed`` is earlier than
    ``get_archive_date()`` and re-counts every ``poll_interval`` seconds until
    that count reaches zero. A progress line is printed whenever the count
    drops; a dot is printed otherwise. The "Archive complete" summary is
    printed once the count is zero, whether or not any waiting was needed.

    Args:
        poll_interval: Seconds to sleep between counts (default 1).
        timeout: Maximum number of seconds to wait, or ``None`` (default) to
            wait indefinitely. When the timeout expires a message is printed
            and the function returns without printing the completion summary.

    Returns:
        None.
    """
    my_collection = get_cluster_collection()
    query = {'dateAccessed': {'$lt': get_archive_date()}}

    deadline = None if timeout is None else time.monotonic() + timeout

    docs_waiting_for_archive = my_collection.count_documents(query)
    if docs_waiting_for_archive > 0:
        print(str(docs_waiting_for_archive) + " documents remaining to be archived")

    while docs_waiting_for_archive > 0:
        if deadline is not None and time.monotonic() >= deadline:
            print("\nTimed out with " + str(docs_waiting_for_archive)
                  + " documents still waiting to be archived")
            return

        # Sleep before re-counting so there is no redundant immediate recount
        # and no trailing sleep after the count has already reached zero.
        time.sleep(poll_interval)

        current_docs_waiting_for_archive = my_collection.count_documents(query)
        if current_docs_waiting_for_archive < docs_waiting_for_archive:
            print(str(current_docs_waiting_for_archive) + " documents remaining to be archived")
            docs_waiting_for_archive = current_docs_waiting_for_archive
        else:
            print(".", end="")

    # Previously this summary was only reachable when nothing was pending at
    # entry; report completion after a successful wait as well.
    print("Archive complete. " + str(my_collection.count_documents({}))
          + " documents remain in the Atlas Cluster")
        
def get_archive_date(days=30):
    """Return the archive cutoff: the current UTC time minus ``days`` days.

    The result is a naive ``datetime`` expressed in UTC. PyMongo interprets
    naive datetimes as UTC when encoding queries, so using the local wall
    clock here would skew the cutoff by the machine's UTC offset.

    Args:
        days: Age threshold in days (default 30, matching the demo's
            Online Archive rule).

    Returns:
        A naive ``datetime.datetime`` in UTC.
    """
    utc_now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return utc_now - datetime.timedelta(days=days)

def print_row(count, source):
    """Print one report row: a count column followed by a description.

    ``count`` is right-justified to 5 characters and then left-aligned in a
    10-character column; ``source`` is right-aligned in a 45-character
    column. Values wider than their column are printed in full.
    """
    count_cell = str(count).rjust(5)
    print(f" {count_cell:<10} {str(source):>45}")
    

## Archive Test and Setup

If the cluster still contains unarchived events, the demo's good to go. Otherwise, the archive should be rebuilt. To keep the demo consistent, we'll keep 1000 records in the collection, across the cluster and the archive. 

To rebuild the demo, delete and recreate the Online Archive (the archive itself is immutable).

<img src="./images/config_archive.png">

Then open a new cell and run the ```generate_events()``` function. *Note, I played with using a flag that could be set to call this function, but I kept forgetting to unset the flag and generating more events. Forcing the creation of a cell for this uncommon event should prevent this.*

Note, if attempting to demo this live, it can take several minutes for documents to archive after the archive is created. You can run the ```wait_for_data_to_archive()``` function to verify the demo is ready.

In [None]:
# Check whether the cluster still holds documents; if it does not, print the
# manual steps for rebuilding the demo data and the Online Archive.
if not refresh_needed():
    print("Online Archive is ready to demo!")
else:
    print(
        '\nRecreate the Online Archive:',
        '   Step 1: Delete and Online Archive.',
        '   Step 2: Return here, open a new cell and run the generate_events() function.',
        '   Step 3: Create the Online Archive.',
        sep='\n',
    )


## Cluster Query

With our archive configuration, the cluster only contains 30 days of events. Let's validate that's true by counting the number of documents older than 30 days...

In [None]:
# Cluster-only view: count the documents older than the archive cutoff.
my_collection = get_cluster_collection()

archive_date = get_archive_date()
query = {'dateAccessed': {'$lt': archive_date}}

print(f'All documents older than {archive_date.date()} have been archived.\n')
print(f'A query for documents older than {archive_date.date()} should return zero.\n')

doc_count = my_collection.count_documents(query)
print(f'{doc_count} documents returned from the query\n')

# Show any one document that is still stored on the cluster.
print("Here's an example document:\n")
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(my_collection.find_one())

## Cluster and Online Archive

Atlas now provides two connection options: **Connect to Cluster** and **Connect to Cluster and Online Archive**:

<img src="./images/connection_options.png">

Connecting to the Cluster and Online Archive will give you a unified view of the documents. Let's see...

In [None]:
# Unified view: the same cutoff query, issued through the
# Cluster and Online Archive connection.
my_collection_archive = get_cluster_archive_collection()

archive_date = get_archive_date()
query = {'dateAccessed': {'$lt': archive_date}}

print(f'All documents older than {archive_date.date()} have been archived.\n')
print(f'A query for documents older than {archive_date.date()} should return documents.\n')

doc_count = my_collection_archive.count_documents(query)
print(f'{doc_count} documents returned from the query\n')

# Show one document matching the cutoff query.
print("Here's an example document from the archive:\n")
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(my_collection_archive.find_one(query))

## Some Math Across the Cluster and Online Archive
While the Atlas Cluster has some subset of the documents, there are still 1000 documents across the cluster and archive.

In [None]:
# Establish both connections in this cell so it can be run on its own:
# the cluster-only view and the Cluster and Online Archive view.
# (Previously `my_collection` had to be left over from an earlier cell,
# which raised NameError when this cell was run first.)
my_collection = get_cluster_collection()
my_collection_archive = get_cluster_archive_collection()

archive_date = get_archive_date()
query = {'dateAccessed':{'$lt': archive_date}}

# Count the documents older than the cutoff through each connection.
cluster_count = my_collection.count_documents(query)
cluster_archive_count = my_collection_archive.count_documents(query)

print("Archive date (30 days ago): " + str(archive_date.date()))
print('')
print_row(my_collection.count_documents({}), "Total number of documents in the Atlas Cluster")
print_row(cluster_count, "Total number of documents in the Atlas Cluster older than 30 days")
print_row(cluster_archive_count, "Total number of documents across the Atlas Cluster and the Online Archive older than 30 days")
print('------')
print_row(my_collection_archive.count_documents({}), "Total number of documents across the Atlas Cluster and Online Archive")