### This notebook constructs an asynchronous query to do a heavyduty calculation periodically and then store the resulting data (top 1000 tag categories) into a separate key-value DynamoDB data store for faster more efficient and performant retrieval for near real-time processing. 
#### This enables us to efficiently store semi-permanent data into an efficient key-value data store which is optimized for reads, and split the workload of the client facing component of the project. 

In [17]:
import pymongo
from IPython.display import display
from pymongo import MongoClient
import pandas as pd
from dotenv import dotenv_values


config = dotenv_values(".env")

# setup mongo connection
client = MongoClient('mongodb+srv://' + config['MONGO_USER'] + ':' + config['MONGO_PASS'] + '@final-cluster.uucno.mongodb.net')
db = client.final
col = db.StackOverflowPosts

In [3]:
# Aggregation pipeline - count of each tag
pipeline = [
    # Project only the 'Tags' field
    {
        "$project": {
            "Tags": 1
        }
    },
    # Filter out documents where 'Tags' is missing or is not an array
    {
        "$match": {
            "Tags": {"$exists": True, "$type": "array"}
        }
    },
    # Unwind the 'Tags' array to create a document for each tag
    {
        "$unwind": "$Tags"
    },
    # Group by tag and count occurrences
    {
        "$group": {
            "_id": "$Tags",
            "count": {"$sum": 1}
        }
    },
    # Sort by count in descending order
    {
        "$sort": {"count": -1}
    },
    # Limit to the top 1000 tags
    {
        "$limit": 1000
    }
]



In [4]:
# Execute the aggregation query
docs = list(col.aggregate(pipeline))

In [19]:
import boto3
from botocore.exceptions import ClientError

# Initialize a DynamoDB client
dynamodb = boto3.resource('dynamodb', aws_access_key_id = config['ACCESS_ID'], aws_secret_access_key=config['SECRET_KEY'],region_name='us-east-1')

# Reference to your DynamoDB table
table_name = "final"
table = dynamodb.Table(table_name)

### Careful running the AWS code, double check if the table is cleared when uploading the 1000 top tags

In [None]:
# delete and recreate table - mimics clearing the data for new calculated content

# Delete the table
table.delete()
print(f"Table '{table_name}' deleted.")

# Wait for the table to be deleted
table.wait_until_not_exists()

# Recreate the table (example schema)
new_table = dynamodb.create_table(
    TableName=table_name,
    KeySchema=[{'AttributeName': 'tag', 'KeyType': 'HASH'}],
    AttributeDefinitions=[{'AttributeName': 'tag', 'AttributeType': 'S'}],
    ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}
)

# Wait for the table to be created
new_table.wait_until_exists()
print(f"Table '{table_name}' recreated.")

In [20]:
print(len(docs))
print(docs)
# upload precalculated top 1000 records into dynamoDB
for tag in docs:
    tag_id = tag["_id"]
    try:
        table.put_item(Item={"tag": tag_id})
        print(f"Uploaded tag: {tag_id}")
    except ClientError as e:
        print(f"Error uploading tag: {e}")

1000
[{'_id': 'javascript', 'count': 2496567}, {'_id': 'python', 'count': 2138942}, {'_id': 'java', 'count': 1898054}, {'_id': 'c#', 'count': 1592815}, {'_id': 'php', 'count': 1458812}, {'_id': 'android', 'count': 1404681}, {'_id': 'html', 'count': 1174530}, {'_id': 'jquery', 'count': 1033935}, {'_id': 'c++', 'count': 794407}, {'_id': 'css', 'count': 792766}, {'_id': 'ios', 'count': 681166}, {'_id': 'sql', 'count': 661392}, {'_id': 'mysql', 'count': 660061}, {'_id': 'r', 'count': 490741}, {'_id': 'node.js', 'count': 462679}, {'_id': 'reactjs', 'count': 456594}, {'_id': 'arrays', 'count': 413135}, {'_id': 'c', 'count': 396698}, {'_id': 'asp.net', 'count': 373010}, {'_id': 'json', 'count': 356195}, {'_id': 'python-3.x', 'count': 338362}, {'_id': 'ruby-on-rails', 'count': 336846}, {'_id': '.net', 'count': 331660}, {'_id': 'sql-server', 'count': 330256}, {'_id': 'swift', 'count': 327523}, {'_id': 'django', 'count': 306986}, {'_id': 'angular', 'count': 296194}, {'_id': 'objective-c', 'count

In [22]:
# mockup of code to be used in javascript server query

# retrieve list of tags from DynamoDB
response = table.scan(ProjectionExpression='tag')
tags_list = [item['tag'] for item in response['Items']]
print(tags_list)
print(len(tags_list))

['export', 'angularjs-directive', 'ssis', 'windows-runtime', 'function', 'ffmpeg', 'stripe-payments', 'paypal', 'retrofit', 'global-variables', 'opencv', 'centos', 'drag-and-drop', 'sonarqube', 'codeigniter', 'winapi', 'binary', 'in-app-purchase', 'maps', 'android-listview', 'optimization', 'typescript', 'angularjs-ng-repeat', 'filesystems', 'uitextfield', 'webpack', 'sprite-kit', 'jsx', 'excel', 'gradle', 'nuget', 'pyqt5', 'terminal', 'active-directory', 'wordpress-theming', 'salesforce', 'mfc', 'repository', 'continuous-integration', 'location', 'textview', 'elasticsearch', 'vue.js', 'spring-data', 'calendar', 'ruby', 'cross-browser', 'https', 'web-config', 'uibutton', 'gpu', 'mockito', 'flutter-layout', 'android-fragments', 'xsd', 'collections', 'jpa', 'http', 'asp.net-mvc-2', 'thymeleaf', 'powerbi', 'variables', 'ef-code-first', 'heroku', 'ipad', 'timeout', 'kotlin', 'onclick', 'output', 'google-sheets-formula', 'soap', 'excel-formula', 'servlets', 'uiview', 'pagination', 'dependen

In [28]:
# make mongo query, compare every document to see if tags are in the tags_set
# including optimizations to increase performance and manage bandwidth:
# projection of only necessary fields (tags)
# cache dynamodb call?
# index the tags field for efficient reading of tags

# since our data is so large we want to ensure that we are offloading the bulk of the calculations onto the cluster rather than doing the calculations locally or on clientside. to do this we make use of MongoDB aggregations

# Aggregation pipeline to count tags
pipeline = [
    # Project only the 'Tags' field
    {
        "$project": {
            "Tags": 1
        }
    },
    # Filter out documents where 'Tags' is missing or is not an array
    {
        "$match": {
            "Tags": {"$exists": True, "$type": "array"}
        }
    },
    # Unwind the 'Tags' array to create a document for each tag
    {
        "$unwind": "$Tags"
    },
    # Filter to include only documents with tags in the specified list
    {
        "$match": {
            "Tags": {"$in": tags_list}
        }
    },
    # Group by tag and count occurrences
    {
        "$group": {
            "_id": "$Tags",
            "count": {"$sum": 1}
        }
    },
    # Sort by count in descending order
    {
        "$sort": {"count": -1}
    },
    # Limit to the top 1000 tags
    {
        "$limit": 1000
    }
]

top_tags = list(col.aggregate(pipeline))


OperationFailure: PlanExecutor error during aggregation :: caused by :: $strLenCP requires a string argument, found: array, full error: {'ok': 0.0, 'errmsg': 'PlanExecutor error during aggregation :: caused by :: $strLenCP requires a string argument, found: array', 'code': 34471, 'codeName': 'Location34471', '$clusterTime': {'clusterTime': Timestamp(1702432730, 8034), 'signature': {'hash': b'2\xe7\xf6\x1f+\xf3\xcdt\x03M\x84~\r+\xdd\x1dK\xda"}', 'keyId': 7309311056709419012}}, 'operationTime': Timestamp(1702432730, 8034)}

In [27]:
print(list(top_tags))
# pretty display the resulting docs using pandas (_id: tag name, count: number of posts with that tag)
df = pd.DataFrame(top_tags)
display(df)

[]
