# Description:
In this notebook we explore the MongoDB instance where we keep our news documents. We show some example documents and the time at which each batch of documents was inserted, and provide a function to remove a batch of documents given its insertion time. We then analyze the distribution of the times at which documents were published, the distribution of document categories and sources, and the relationship between category and source. Finally, we provide a function to remove documents with no content or description, and a function to check for and remove duplicate documents based on description and content combined. The duplicate-removal function was used before the unique index on content and description was created; with this index in place there should not be any duplicate documents.

# TODO:
- UPDATE THIS TO GET DATA FROM ELASTICSEARCH OVER MONGODB   

In [1]:
import os
from datetime import datetime, timedelta
from pprint import pprint
from elasticsearch import Elasticsearch
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
# Connecting to Elasticsearch
es = Elasticsearch(
    hosts=['odfe-node1', '0.0.0.0'],
    http_auth=('admin', 'admin'),
    scheme="https",
    verify_certs=False
)

## Indices
The document store is composed of indices. These indices in turn hold documents.

In [3]:
# List indices
es.indices.get_alias("*")

{'label': {'aliases': {}},
 '.opendistro_security': {'aliases': {}},
 'security-auditlog-2021.08.06': {'aliases': {}},
 'document': {'aliases': {}}}

In [14]:
# How many documents does the 'document' index hold?
es.indices.refresh('document')
count = es.cat.count('document', params={"format": "json"})[0]['count']
print(f"There are {count} documents in the index 'document'")

There are 335654 documents in the index 'document'


## Get random document

In [18]:
# Print a random document (the fixed seed returns the same document on every run while the index is unchanged)
result = es.search(
    {
        "size": 1,
        "query": {
            "function_score": {
                "functions": [
                    {
                        "random_score": {
                            "seed": "1477072619038"
                        }
                    }
                ]
            }
        }
    },
    index="document"
)['hits']['hits'][0]
print(f"Document ID: {result['_id']}", "\n", result['_source'])

Document ID: 3bddeb1b-b947-46e7-8a11-acf273e48784 
 {'text': 'The US wants startups to get a piece of the $16 billion spent on space tech – TechCrunch The U.S. government is one of the biggest spenders in the nascent space industry and the man who handles the money for the Air Force’s $16 billion checkbook wants startups to know that his door is open for them. In all, Will Roper, the Assistant Secretary of  The U.S. government is one of the biggest spenders in the nascent space industry and the man who handles the money for the Air Force’s $16 billion checkbook wants startups to know that his door is op', 'embedding': [-0.0016897115856409073, 0.07757193595170975, -0.015606055036187172, 0.5347102880477905, -0.0044190650805830956, -0.3730097711086273, -1.0053037405014038, -0.21396642923355103, -0.2809211313724518, -0.16508913040161133, -0.1657077521085739, 0.07875555008649826, 0.23647645115852356, 0.19197793304920197, -0.5011623501777649, -0.5944852828979492, 0.4417313039302826, -0.39503
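Because the seed above is hard-coded, the cell keeps returning the same "random" document. A minimal sketch of a query builder that draws a new seed unless one is given; `random_document_query` is a hypothetical helper, and the `"field": "_seq_no"` setting assumes Elasticsearch 7+, where using a seed without a field is deprecated and the Elasticsearch documentation suggests `_seq_no` as the field.

```python
import random


def random_document_query(seed=None, size=1):
    """
    Build a search body that returns `size` pseudo-random documents.

    A fixed seed returns the same documents on every call (while the index does not
    change); seed=None draws a fresh seed so each call samples new documents.
    """
    if seed is None:
        seed = random.randint(0, 2**31 - 1)
    return {
        "size": size,
        "query": {
            "function_score": {
                "query": {"match_all": {}},
                "functions": [
                    # Assumption: Elasticsearch 7+, which expects a field alongside the seed
                    {"random_score": {"seed": seed, "field": "_seq_no"}}
                ],
            }
        },
    }
```

Usage: `es.search(random_document_query(), index="document")['hits']['hits'][0]` samples a different document on each run, while `random_document_query(seed=1477072619038)` keeps it reproducible.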

## Published date

In [25]:
# Get documents published between two dates (without "size", Elasticsearch returns at most 10 hits)
result = es.search(
    {
        "query": {
            "range": {
                "publishedat": {
                    "gte": "2021-08-01",
                    "lte": "2021-09-01"
                }
            }
        }
    },
    index="document"
)['hits']['hits']

print(f"There are {len(result)} documents in the search result. An example of these documents:\n")
print(f"Document ID: {result[0]['_id']}", "\n", result[0]['_source'])

There are 10 documents in the search result. An example of these documents:

Document ID: 198fe490-df8b-4ae5-a08a-c8065b6f381e 
 {'text': 'Tampa Bay doctor weighs in on delta plus variant and if you should be concerned - WFLA There’s growing concern surrounding the coronavirus delta variant, and now, there’s reports about a delta plus variant. TAMPA, Fla. (WFLA) — There’s growing concern surrounding the coronavirus delta variant, and now, there are reports about a delta plus variant. The delta plus variant is similar to the existing delt', 'embedding': [0.5252478718757629, -0.7922804355621338, 0.5082009434700012, 0.19313007593154907, 0.3913789987564087, -0.17472167313098907, -0.7063270211219788, -0.03448726236820221, -0.6324267387390137, -0.3566722571849823, -0.1480695903301239, 0.26366034150123596, -0.49925512075424194, -0.28616660833358765, 0.661585807800293, 0.20501725375652313, 0.31656157970428467, 0.4915027320384979, 0.13757525384426117, 0.09903568029403687, -0.03624914586544037, 
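The "10 documents" reported above is the size of the returned page, not the number of matches. A minimal sketch, assuming the `publishedat` field used above, of a query body that also asks for an exact total and a helper that reads it from the response; `published_between` and `total_hits` are hypothetical names.

```python
def published_between(gte, lte, size=10, field="publishedat"):
    """Search body for documents whose `field` falls in [gte, lte]."""
    return {
        "size": size,
        # Count every match exactly instead of stopping the count at 10,000 (the 7.x default)
        "track_total_hits": True,
        "query": {"range": {field: {"gte": gte, "lte": lte}}},
    }


def total_hits(response):
    """Total number of matches in a search response, independent of `size`."""
    total = response["hits"]["total"]
    # Elasticsearch 7+ returns {"value": n, "relation": ...}; 6.x returns a plain integer
    return total["value"] if isinstance(total, dict) else total
```

Usage: `total_hits(es.search(published_between("2021-08-01", "2021-09-01"), index="document"))`. To iterate over every matching document rather than one page, `elasticsearch.helpers.scan` is the usual tool.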

In [27]:
# Get most recent documents
result = es.search(
    {
        "size": 10,
        "sort": {
            "publishedat": "desc"
        },
        "query": {
            "match_all": {}
        }
    },
    index="document"
)['hits']['hits']
print("The most recent document is: \n")
print(f"Document ID: {result[0]['_id']}", "\n", result[0]['_source'])

Document ID: f49241cc-21f8-4d56-8019-4bebdf0a5abf 
 {'text': "News24.com | Big win for Amazon in battle with Indian conglomerate US e-commerce giant Amazon won a major legal victory in India on Friday as the country's top court blocked a $3.4-billion deal struck by domestic rival Reliance. Amazon won a major legal victory in India on Friday as the country's top court blocked a $3.4-billion deal struck by a domestic e-commerce rival.Last year, the rival, Reliance, struc", 'embedding': [-0.477854460477829, -1.0442570447921753, -0.06881744414567947, 0.22596564888954163, 0.0019466019002720714, 0.5015498995780945, -0.321696400642395, 0.3806595504283905, -0.26726099848747253, 0.3787592053413391, 1.067002534866333, 0.3469347357749939, 0.6093743443489075, 0.2808688282966614, 0.11111714690923691, 0.006571550387889147, -0.2259233146905899, 0.10582209378480911, -0.13152094185352325, 0.11749044060707092, 0.344808965921402, -0.378091037273407, 0.4888850450515747, 0.6534454226493835, 0.2962729930877

## Exploratory Data Analysis

### publishedAt

In [None]:
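# Assumes `db` (a pymongo Database) and `collection_list` (e.g. ['everything', 'top_headlines'])
# were defined in an earlier cell
pipeline = [
    {  # project publishedAtDay (a day-and-hour key) and publishedAtHour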
        '$project': {
            'publishedAtDay': {
                '$dateToString': {
                    'format': '%d-%m-%YT%H', 
                    'date': {'$toDate': '$publishedAt'}
                }
            },
            'publishedAtHour': {
                '$hour': {
                    'date': {'$toDate': '$publishedAt'}
                }
            }
        }
    },
    {  # groups on publishedAtDay and gets number of documents per day and hour (document_count) and publishedAtHour
        '$group': {
            '_id': '$publishedAtDay',
            'document_count': {'$sum': 1},
            'publishedAtHour': {'$first': '$publishedAtHour'}
        }
    },
    {  # groups on publishedAtHour and gets average of documents per hour over days
        '$group': {
            '_id': '$publishedAtHour',
            'avg_document_count': {'$avg': '$document_count'}
        }
    },
    {  # sort results in descending order by _id
        '$sort': {'_id': -1}
    }
]

fig, axes = plt.subplots(2, 1, figsize=(13, 9))
for ax, col in zip(axes.flatten(), collection_list):
    x, y = [], []
    for i in list(db[col].aggregate(pipeline)):
        x.append(i['_id'])
        y.append(i['avg_document_count'])
    ax.plot(x, y, linestyle="-")
    ax.set_xticks(x)
    ax.set_title(f"Average number of documents per publishedAt Hour - {col} collection")
    
plt.show()
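The pipeline above counts documents per (day, hour) bucket and then averages those counts for each hour of the day. A minimal pandas sketch of the same computation on a list of `publishedAt` strings, useful for checking the pipeline on a small sample; `avg_documents_per_hour` is a hypothetical helper. As in the pipeline, an hour with no documents on a given day contributes no bucket rather than a zero.

```python
import pandas as pd


def avg_documents_per_hour(published_at):
    """
    Count documents per (day, hour) bucket, then average those bucket counts
    for each hour of the day.
    """
    # Parse as UTC, matching $dateToString/$hour, which default to UTC
    ts = pd.to_datetime(pd.Series(published_at), utc=True)
    # Equivalent of the '%d-%m-%YT%H' grouping key: one bucket per day and hour
    counts = ts.groupby([ts.dt.date, ts.dt.hour]).size()
    # Average over days for each hour (index level 1), hours in descending order
    return counts.groupby(level=1).mean().sort_index(ascending=False)
```

For example, two documents at 10h on one day and one at 10h on the next give an average of 1.5 for hour 10.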

### category

In [None]:
pipeline = [
    {  # project category
        '$project': {
            '_id': 0,
            'category': 1
        }
    },
    {  # groups on category and gets number of documents for each category
        '$group': {
            '_id': '$category',
            'document_count': {'$sum': 1},
        }
    },
    {  # sort results in ascending order by _id
        '$sort': {'_id': 1}
    }
]

fig, axes = plt.subplots(2, 1, figsize=(13, 9))
for ax, col in zip(axes.flatten(), collection_list):
    x, y = [], []
    for i in list(db[col].aggregate(pipeline)):
        x.append(i['_id'])
        y.append(i['document_count'])
    ax.bar(x, y)
    ax.set_xticks(x)
    ax.set_title(f"Number of documents per category - {col} collection")
    
plt.show()
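Documents without a category end up in a `None` group, and the cell above passes that `_id` straight to `ax.bar`, which matplotlib's categorical axis may reject; the source cell guards against this by relabeling it "Null". A minimal plain-Python sketch of the same per-field count with that relabeling, over documents given as dicts; `count_by_field` is a hypothetical helper.

```python
from collections import Counter


def count_by_field(docs, field, null_label="Null"):
    """Count documents per value of `field`; missing or None values are counted under `null_label`."""
    counts = Counter(
        doc.get(field) if doc.get(field) is not None else null_label for doc in docs
    )
    # Sort by label ascending, like {'$sort': {'_id': 1}}
    return sorted(counts.items())
```

The same relabeling can be applied to the pipeline output before plotting, e.g. `x.append(i['_id'] if i['_id'] is not None else "Null")`.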

### source

In [None]:
pipeline = [
    {  # project source
        '$project': {
            '_id': 0,
            'source': 1
        }
    },
    {  # groups on source and gets number of documents for each source
        '$group': {
            '_id': '$source',
            'document_count': {'$sum': 1},
        }
    },
    {  # sort results in ascending order by _id
        '$sort': {'_id': 1}
    }
]

fig, axes = plt.subplots(2, 1, figsize=(19, 9))
for ax, col in zip(axes.flatten(), collection_list):
    x, y = [], []
    for i in list(db[col].aggregate(pipeline)):
        if i['_id'] is None:
            x.append("Null")
        else:
            x.append(i['_id'])
        y.append(i['document_count'])
    ax.bar(x, y)
    ax.set_xticks(x)
    ax.set_xticklabels(x, rotation=30, ha='right')
    ax.set_title(f"Number of documents per source - {col} collection")

plt.subplots_adjust(hspace=0.4)
plt.show()

### relationship between categories and sources

In [None]:
# use stacked bar chart
pipeline = [
    {  # project source and category
        '$project': {
            '_id': 0,
            'source': 1,
            'category': 1
        }
    },
    {  # groups on (category, source) pairs and gets number of documents for each pair
        '$group': {
            '_id': {
                'category': '$category',
                'source': '$source'
            },
            'document_count': {'$sum': 1},
        }
    },
    {
        '$project':{
            '_id': 0,
            'document_count': 1,
            'category': '$_id.category',
            'source': '$_id.source'            
        }
    },
    {  # sort results in ascending order by category
        '$sort': {'category': 1}
    }
]

fig, axes = plt.subplots(2, 1, figsize=(19, 11))
for ax, col in zip(axes.flatten(), collection_list):
    plt_data = pd.DataFrame(list(db[col].aggregate(pipeline))).pivot(index="source", columns="category", values="document_count")
    plt_data["sum"] = plt_data.sum(axis=1)
    plt_data.sort_values("sum", ascending=False).drop("sum", axis=1).plot(kind='bar', stacked=True, rot=90, ax=ax)
    ax.set_title(f"Source frequencies by category - {col} collection")

plt.subplots_adjust(hspace=0.9)
plt.show()
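The pipeline plus `pivot` above builds a source-by-category count table. A minimal pandas sketch of the same table with `pd.crosstab`, over documents given as dicts; `source_category_table` is a hypothetical helper. Unlike `pivot`, `crosstab` fills absent (source, category) combinations with 0 instead of NaN, and the sketch labels missing values "Null" as the source plot does.

```python
import pandas as pd


def source_category_table(docs):
    """Count documents per (source, category), sources ordered by total count."""
    df = pd.DataFrame(docs, columns=["source", "category"]).fillna("Null")
    table = pd.crosstab(df["source"], df["category"])
    # Order sources by total count, largest first, as the stacked bar chart does
    return table.loc[table.sum(axis=1).sort_values(ascending=False).index]
```

Usage: `source_category_table(docs).plot(kind='bar', stacked=True, rot=90)` draws the same kind of stacked chart.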

## missing values

In [None]:
def remove_missing_values(db, collection=None):
    """
    Remove documents whose description and content are both missing (absent, None or '')
    from the given collection of db, or from all of its collections (default).
    """
    # Each field must be absent/None or the empty string; this covers the same four
    # (None/'') combinations as listing them one by one in an $or
    missing_filter = {
        "description": {"$in": [None, ""]},
        "content": {"$in": [None, ""]}
    }

    collection_list = db.list_collection_names() if collection is None else [collection]
    for col in collection_list:
        result = db[col].delete_many(missing_filter)
        print(f"{result.deleted_count} documents with missing values were removed from {col}\n")

remove_missing_values(db)
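The filter above treats a field as missing when it is absent, None or the empty string, and removes a document only when both fields are missing. A minimal plain-Python sketch of that predicate for testing on sample documents; `has_no_text` is a hypothetical helper. Whitespace-only strings are not treated as missing, in the sketch or in the filter.

```python
def has_no_text(doc):
    """True when both description and content are absent, None or ''."""
    missing = (None, "")
    # dict.get returns None for absent keys, so absent and None are handled alike
    return doc.get("description") in missing and doc.get("content") in missing
```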

## duplicates

In [None]:
def remove_duplicates(db, collection=None):
    """
    Remove duplicate documents (same description and content) from the given collection of db,
    or from all of its collections (default). The first document of each duplicate group is kept.
    """
    pipeline_remove = [
        {  # group on description and content, pushing every `_id` of the group to an array
            "$group": {
                "_id": {'description': '$description', 'content': '$content'},
                "_idsNeedsToBeDeleted": {"$push": "$$ROOT._id"}
            }
        },
        {  # drop the first `_id` of each group so that one copy of every document is kept
            "$project": {
                "_id": 0,
                "_idsNeedsToBeDeleted": {
                    "$slice": [
                        "$_idsNeedsToBeDeleted", 1, {"$size": "$_idsNeedsToBeDeleted"}
                    ]
                }
            }
        },
        {  # one output document per `_id` to delete (groups without duplicates disappear here)
            "$unwind": "$_idsNeedsToBeDeleted"
        },
        {  # group without a condition and push all remaining `_id`s to a single array
            "$group": {"_id": "", "_idsNeedsToBeDeleted": {"$push": "$_idsNeedsToBeDeleted"}}
        },
        {  # optional stage: hide the grouping key
            "$project": {"_id": 0}
        }
        # The result is [{'_idsNeedsToBeDeleted': [...]}], or [] when there are no duplicates
    ]

    collection_list = db.list_collection_names() if collection is None else [collection]
    for col in collection_list:
        result = list(db[col].aggregate(pipeline_remove))
        if not result:
            print(f"0 duplicate documents found in {col}\n")
            continue
        idsList = result[0]["_idsNeedsToBeDeleted"]
        db[col].delete_many({'_id': {'$in': idsList}})
        print(f"{len(idsList)} duplicate documents were removed from {col}\n")

remove_duplicates(db)
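The duplicate-removal pipeline groups documents by (description, content) and marks every `_id` after the first in each group for deletion. A minimal plain-Python sketch of that keep-first logic for checking it on sample documents; `duplicate_ids` is a hypothetical helper. "First" here means first in input order; in the pipeline it is whatever order `$group` receives documents in, since there is no `$sort` before it.

```python
from collections import defaultdict


def duplicate_ids(docs):
    """Return the _id of every document that repeats an earlier (description, content) pair."""
    groups = defaultdict(list)
    for doc in docs:
        groups[(doc.get("description"), doc.get("content"))].append(doc["_id"])
    # Keep the first _id of every group; everything after it is a duplicate
    return [_id for ids in groups.values() for _id in ids[1:]]
```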

### duplicates across collections

In [None]:
pipeline = [
    {  # project fields
        '$project': {
            '_id': 0,
            'text': {
                '$concat': [
                    {'$ifNull': ['$title', '']},
                    ' - ',
                    {'$ifNull': ['$description', '']},
                    ' - ',
                    {'$ifNull': ['$content', '']},
                ]
            }
        }
    }
]

r_everything = list(map(lambda x: x['text'], db.everything.aggregate(pipeline)))
r_top_headlines = list(map(lambda x: x['text'], db.top_headlines.aggregate(pipeline)))

In [None]:
# union joins the elements of both sets and discards duplicates: this is the number of distinct texts across both collections
len(set(r_top_headlines).union(set(r_everything)))
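The union size alone does not say how many texts the two collections share. A minimal sketch that also reports the intersection and the repeats inside each list; `cross_collection_overlap` is a hypothetical helper meant to be called as `cross_collection_overlap(r_everything, r_top_headlines)`.

```python
def cross_collection_overlap(texts_a, texts_b):
    """Summarize how two lists of document texts overlap."""
    set_a, set_b = set(texts_a), set(texts_b)
    return {
        "distinct_total": len(set_a | set_b),             # same value as the union above
        "in_both": len(set_a & set_b),                    # texts present in both collections
        "repeated_within_a": len(texts_a) - len(set_a),   # extra copies inside the first list
        "repeated_within_b": len(texts_b) - len(set_b),   # extra copies inside the second list
    }
```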