In [44]:
# IMPORT REQUIRED DEPENDENCIES
import json
import os

import pandas as pd
from pymongo import MongoClient

In [68]:
# MONGODB HOST, WRITTEN WITH THE LEADING '//' OF THE CONNECTION URI, e.g. //localhost
host = '//localhost'

# MONGODB PORT (AS A STRING, IT IS CONCATENATED INTO THE URI), e.g. 27017
port = '27017'

# CONNECT TO MONGODB
# THE RESULTING URI IS mongodb://<host>:<port>/?readPreference=primary&appname=MongoDB%20Compass&ssl=false
client = MongoClient('mongodb:'+host+':'+port+'/?readPreference=primary&appname=MongoDB%20Compass&ssl=false')

# PATH OF THE PRODUCED JSON FILE (THIS IS THE FILE LOADED INTO MONGODB BELOW)
json_output_path = "../data/output/netflix_titles.json"

# TASK OUTPUT PATHS (KEEP THE TRAILING '/', FILE NAMES ARE APPENDED BY CONCATENATION)
task_1_output = 'output/task_1/'
task_2_output = 'output/task_2/'
task_3_output = 'output/task_3/'
task_4_output = 'output/task_4/'
task_5_output = 'output/task_5/'

# CREATE THE OUTPUT DIRECTORIES UP FRONT: to_csv / to_json DO NOT CREATE MISSING
# PARENT DIRECTORIES, SO WITHOUT THIS THE TASK CELLS FAIL ON A FRESH CHECKOUT
for task_output_dir in (task_1_output, task_2_output, task_3_output, task_4_output, task_5_output):
    os.makedirs(task_output_dir, exist_ok=True)

In [69]:
# LOAD JSON DATA
# THE ENCODING IS EXPLICIT SO THAT READING THE FILE DOES NOT DEPEND ON THE
# OPERATING SYSTEM'S DEFAULT LOCALE ENCODING
with open(json_output_path, encoding='utf-8') as f:
    file_data = json.load(f)

# DATABASE NAME
dbname = 'netflix'

# COLLECTION NAME
collection = 'movies'

# GET DB NAMES
dblist = client.list_database_names()

# HANDLE USED BY ALL TASK CELLS
# NOTE: DESPITE ITS NAME, `db` IS THE COLLECTION client[dbname][collection], NOT A DATABASE
db = client[dbname][collection]

# IMPORT DATA IN COLLECTION.
# IF COLLECTION AND DB ALREADY EXIST, DELETE EVERYTHING FIRST SO THAT
# RE-RUNNING THIS CELL DOES NOT DUPLICATE THE DOCUMENTS
if dbname in dblist:
    collnames = client[dbname].list_collection_names()
    if collection in collnames:
        db.delete_many({})

# THE INSERT HAPPENS IN EVERY CASE (NEW DB, NEW COLLECTION OR EMPTIED COLLECTION)
db.insert_many(file_data)


In [70]:
# TASK 1: TITLES WHOSE date_added STRING ENDS WITH '2019'

# STAGE 1 - KEEP ONLY DOCUMENTS WHOSE date_added MATCHES THE REGEX '2019$'
added_in_2019_stage = {'$match': {'date_added': {'$regex': '2019$'}}}

# STAGE 2 - REQUEST ONLY THE FIELDS NEEDED FOR THE TASK
task_1_fields_stage = {'$project': {'show_id': 1, 'type': 1, 'title': 1}}

task_1 = db.aggregate([added_in_2019_stage, task_1_fields_stage])

# COLUMNS REQUIRED FOR THE TASK, IN OUTPUT ORDER
columns = ['show_id', 'title', 'type']

# MATERIALISE THE CURSOR INTO A DATAFRAME AND KEEP ONLY THE REQUIRED COLUMNS
df = pd.DataFrame(list(task_1))[columns]

# WRITE THE CSV AND JSON OUTPUT FILES
df.to_csv(task_1_output+"task_1.csv", index = False)
df.to_json(
    task_1_output+"task_1.json",
    orient = "records",
    date_format = "epoch",
    force_ascii = True,
    default_handler = None,
)

In [71]:
# TASK 2: NUMBER OF TV SHOWS PER COUNTRY, HIGHEST COUNT FIRST
task_2 = db.aggregate([
        {
            # FILTER FIRST: THE 'type' FILTER DOES NOT DEPEND ON THE COMPUTED
            # 'countries' FIELD, SO DOCUMENTS OF OTHER TYPES ARE DISCARDED
            # BEFORE ANY STRING SPLITTING IS DONE ON THEM
            '$match': {
                'type': 'TV Show'
            }
        }, {
            # TURN THE ', '-SEPARATED country STRING INTO AN ARRAY
            '$addFields': {
                'countries': {
                    '$split': [
                        '$country', ', '
                    ]
                }
            }
        }, {
            # ONE DOCUMENT PER (SHOW, COUNTRY) PAIR
            '$unwind': {
                'path': '$countries'
            }
        }, {
            # COUNT THE DOCUMENTS PER COUNTRY
            '$group': {
                '_id': '$countries',
                'count': {
                    '$sum': 1
                }
            }
        }, {
            # HIGHEST COUNT FIRST
            '$sort': {
                'count': -1
            }
        }
    ])

# APPEND QUERY RESULTS TO DATAFRAME
df =  pd.DataFrame(list(task_2))

# SELECT THE COLUMNS REQUIRED FOR THE TASK (_id HOLDS THE COUNTRY NAME)
columns = ['_id', 'count']
df = df[columns]

# CREATE THE CSV AND JSON OUTPUT FILES
df.to_csv(task_2_output+"task_2.csv", index=False)
df.to_json(task_2_output+"task_2.json", orient = "records", date_format = "epoch", force_ascii = True, default_handler = None)

In [73]:
# TASK 3: NUMBER OF TITLES PER GENRE, HIGHEST COUNT FIRST

# SPLIT THE ', '-SEPARATED listed_in STRING INTO A 'genres' ARRAY
split_genres_stage = {'$addFields': {'genres': {'$split': ['$listed_in', ', ']}}}

# ONE DOCUMENT PER (TITLE, GENRE) PAIR
unwind_genres_stage = {'$unwind': {'path': '$genres'}}

# COUNT THE DOCUMENTS PER GENRE
count_per_genre_stage = {'$group': {'_id': '$genres', 'count': {'$sum': 1}}}

# HIGHEST COUNT FIRST
genre_count_desc_stage = {'$sort': {'count': -1}}

task_3 = db.aggregate([
    split_genres_stage,
    unwind_genres_stage,
    count_per_genre_stage,
    genre_count_desc_stage,
])

# COLUMNS REQUIRED FOR THE TASK (_id HOLDS THE GENRE)
columns = ['_id', 'count']

# MATERIALISE THE CURSOR INTO A DATAFRAME AND KEEP ONLY THE REQUIRED COLUMNS
df = pd.DataFrame(list(task_3))[columns]

# WRITE THE CSV AND JSON OUTPUT FILES
df.to_csv(task_3_output+"task_3.csv", index=False)
df.to_json(
    task_3_output+"task_3.json",
    orient = "records",
    date_format = "epoch",
    force_ascii = True,
    default_handler = None,
)

In [74]:
# TASK 4: NUMBER OF TITLES PER CAST MEMBER, HIGHEST COUNT FIRST
task_4_pipeline = [
    # SPLIT THE ', '-SEPARATED cast STRING INTO AN 'actors' ARRAY
    {'$addFields': {'actors': {'$split': ['$cast', ', ']}}},
    # ONE DOCUMENT PER (TITLE, ACTOR) PAIR
    {'$unwind': {'path': '$actors'}},
    # COUNT THE DOCUMENTS PER ACTOR
    {'$group': {'_id': '$actors', 'count': {'$sum': 1}}},
    # HIGHEST COUNT FIRST
    {'$sort': {'count': -1}},
]
task_4 = db.aggregate(task_4_pipeline)

# COLUMNS REQUIRED FOR THE TASK (_id HOLDS THE ACTOR NAME)
columns = ['_id', 'count']

# MATERIALISE THE CURSOR INTO A DATAFRAME AND KEEP ONLY THE REQUIRED COLUMNS
df = pd.DataFrame(list(task_4))[columns]

# WRITE THE CSV AND JSON OUTPUT FILES
df.to_csv(task_4_output+"task_4.csv", index=False)
df.to_json(
    task_4_output+"task_4.json",
    orient = "records",
    date_format = "epoch",
    force_ascii = True,
    default_handler = None,
)

In [75]:
# TASK 5: FOR EVERY CAST MEMBER, THE GENRE THEY APPEAR IN MOST OFTEN AND HOW OFTEN
#
# THE PIPELINE COUNTS TITLES PER (ACTOR, GENRE) PAIR, ORDERS THE PAIRS BY COUNT
# AND THEN TAKES THE FIRST PAIR OF EVERY ACTOR. THIS YIELDS THE SAME
# (_id, type, count) DOCUMENTS AS PUSHING THE PAIRS INTO AN ARRAY, UNWINDING,
# RE-SORTING, RE-PUSHING AND READING ELEMENT 0, WITHOUT THE INTERMEDIATE
# ARRAYS AND WITHOUT SORTS WHOSE ORDER IS THROWN AWAY BY A LATER STAGE.
task_5 = db.aggregate([
    {
        # SPLIT THE ', '-SEPARATED cast STRING INTO AN 'actors' ARRAY
        '$addFields': {
            'actors': {
                '$split': [
                    '$cast', ', '
                ]
            }
        }
    }, {
        # ONE DOCUMENT PER (TITLE, ACTOR) PAIR
        '$unwind': {
            'path': '$actors'
        }
    }, {
        # SPLIT THE ', '-SEPARATED listed_in STRING INTO A 'genre' ARRAY
        '$addFields': {
            'genre': {
                '$split': [
                    '$listed_in', ', '
                ]
            }
        }
    }, {
        # ONE DOCUMENT PER (TITLE, ACTOR, GENRE) TRIPLE
        '$unwind': {
            'path': '$genre'
        }
    }, {
        # COUNT THE DOCUMENTS PER (ACTOR, GENRE) PAIR
        '$group': {
            '_id': {
                'id': '$actors',
                'genre': '$genre'
            },
            'count': {
                '$sum': 1
            }
        }
    }, {
        # HIGHEST COUNT FIRST; THE GENRE NAME IS A TIE-BREAKER SO THAT AN ACTOR
        # WITH SEVERAL EQUALLY FREQUENT GENRES GETS THE SAME ONE ON EVERY RUN
        '$sort': {
            'count': -1,
            '_id.genre': 1
        }
    }, {
        # THE FIRST PAIR OF EVERY ACTOR IS THEIR MOST FREQUENT GENRE
        # (THE OUTPUT FIELD IS NAMED 'type' BUT HOLDS THE GENRE)
        '$group': {
            '_id': '$_id.id',
            'type': {
                '$first': '$_id.genre'
            },
            'count': {
                '$first': '$count'
            }
        }
    }, {
        # FINAL OUTPUT ORDERED BY ACTOR NAME
        '$sort': {
            '_id': 1
        }
    }
])
# APPEND QUERY RESULTS TO DATAFRAME
df =  pd.DataFrame(list(task_5))

# SELECT THE COLUMNS REQUIRED FOR THE TASK (_id = ACTOR, type = GENRE)
columns = ['_id', 'count', 'type']
df = df[columns]

# CREATE THE CSV AND JSON OUTPUT FILES
df.to_csv(task_5_output+"task_5.csv", index=False)
df.to_json(task_5_output+"task_5.json", orient = "records", date_format = "epoch", force_ascii = True, default_handler = None)