In [2]:
import datetime
import os
import re
import time
import warnings

import pandas as pd
# Silence DeprecationWarning for every module so the run output stays clean.
# NOTE(review): with no module restriction this also hides deprecation notices
# raised by third-party libraries used below (e.g. pymongo); consider narrowing it.
warnings.simplefilter("ignore", category=DeprecationWarning)

## Read Config File

In [None]:
import configparser 
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Fail fast with a clear message instead of the
# confusing KeyError('IP') that the first lookup would otherwise raise.
if not config.read('config.ini'):
    raise FileNotFoundError("config.ini not found or unreadable in " + os.getcwd())
settings = config['DEFAULT']
ip = settings['IP']                        # MongoDB host
port = settings['MongoDB-Port']            # kept as text; int() is applied when connecting
slack_token = settings['Slack-Token']
channel = settings['Channel']              # Slack channel that receives the alerts
file_path = settings['File-Path']          # counts from the previous run (overwritten each run)
log_file_path = settings['Log-File-Path']  # append-only history of every run
out_folder = settings['Out-Folder']        # created at start-up if missing

## Connect MongoDB

In [None]:
import pymongo
from pymongo import MongoClient
# MongoDB client for the host/port from config.ini (the port is read as text,
# hence the int() conversion).
client = MongoClient(host=ip, port=int(port))

## Connect Slack

In [None]:
from slacker import Slacker
# Slack client authenticated with the token from config.ini; the monitoring
# code below sends its alerts through slack.chat.post_message(channel, ...).
slack = Slacker(slack_token)

## Supporting Functions

In [None]:
def connect_database(client,slack):
    """Open the ``Twitter`` database and list its collections.

    Args:
        client: MongoDB client, indexable by database name.
        slack: Slack client; not used here, kept for call compatibility.

    Returns:
        tuple: ``(db_twitter, collections_twitter)``, i.e. the database handle
        and the list of its collection names.
    """
    db_twitter = client["Twitter"]
    # Database.collection_names() was deprecated in PyMongo 3.7 and removed in
    # 4.0. Look the method up on the class, not the instance: a PyMongo
    # Database turns unknown attribute access into a Collection, so hasattr()
    # on the instance would always be True.
    if hasattr(type(db_twitter), "list_collection_names"):
        collections_twitter = db_twitter.list_collection_names()
    else:
        # Older drivers (< 3.6) only offer the legacy call.
        collections_twitter = db_twitter.collection_names()
    return db_twitter, collections_twitter

In [None]:
def _resolve_year_week(year_list, year_timestamp_list, current_timestamp, current_year):
    """Map a millisecond timestamp to ``(week-numbering year, week number)``.

    ``year_timestamp_list[i]`` is the millisecond timestamp at which week 1 of
    ``year_list[i]`` begins. Neighbouring rows are treated as the previous/next
    year, so the rows must be consecutive years in ascending order.

    Raises:
        ValueError: if a required year (current, previous or next) has no row.
    """
    ms_per_week = 604800 * 1000
    if current_year not in year_list:
        raise ValueError("year_timestamp.csv has no entry for %d" % current_year)
    year_index = year_list.index(current_year)
    ref_timestamp = year_timestamp_list[year_index]
    if current_timestamp < ref_timestamp:
        # Before week 1 of this calendar year: we are still in the previous
        # year's weeks, so both the reference AND the year move back.
        if year_index == 0:
            # Without this guard, index -1 would silently wrap to the last row.
            raise ValueError("year_timestamp.csv has no entry for %d" % (current_year - 1))
        ref_timestamp = year_timestamp_list[year_index - 1]
        current_year -= 1
    else:
        if year_index + 1 >= len(year_timestamp_list):
            raise ValueError("year_timestamp.csv has no entry for %d" % (current_year + 1))
        # Already at or past week 1 of next year (late-December dates).
        if current_timestamp >= year_timestamp_list[year_index + 1]:
            ref_timestamp = year_timestamp_list[year_index + 1]
            current_year += 1
    # Integer arithmetic avoids float rounding exactly at week boundaries.
    current_week = (current_timestamp - ref_timestamp) // ms_per_week + 1
    return current_year, current_week


def current_year_week():
    """Return ``(year, week)`` for the current moment.

    Reads reference timestamps from ``year_timestamp.csv`` (no header, rows of
    ``<year>,<millisecond timestamp>``) and prints the result. On any error the
    message is posted to Slack (module-level ``slack``/``channel``) and the
    exception is re-raised.
    """
    try:
        # get the reference timestamp list for each year
        year_timestamp_df = pd.read_csv("year_timestamp.csv", header = None, encoding = "UTF-8",dtype = int)
        year_list = year_timestamp_df[0].tolist()
        year_timestamp_list = year_timestamp_df[1].tolist()

        # get current year and current week number
        current_timestamp = int(time.time() * 1000)
        current_year = int(datetime.datetime.now().year)
        current_year, current_week = _resolve_year_week(
            year_list, year_timestamp_list, current_timestamp, current_year)

        print("current year : " + str(current_year))
        print("current week : " + str(current_week))
        return current_year, current_week
    except Exception as e:
        slack.chat.post_message(channel,str(e))
        # Re-raise: returning None would only make the caller fail later on
        # tuple unpacking with a misleading TypeError.
        raise

In [None]:
# Create a folder (and any missing parents) if it does not exist yet.
def create_folder(folder):
    """Ensure ``folder`` exists as a directory.

    ``os.makedirs(..., exist_ok=True)`` removes the gap between the existence
    check and the creation, so a folder created concurrently no longer makes
    this raise. It raises ``FileExistsError`` if ``folder`` exists but is not
    a directory. An empty path refers to the current directory, which always
    exists, so it is a no-op; ``os.makedirs('')`` would raise instead.
    """
    if folder:
        os.makedirs(folder, exist_ok=True)

In [None]:
def get_lastest_number(db_twitter,collections_twitter,current_year,current_week):
    """Count the documents of every collection belonging to the given year/week.

    A collection is selected when its name starts with ``current_year`` and
    contains ``"W<current_week>"`` that is not followed by another digit.

    Args:
        db_twitter: database handle, indexable by collection name.
        collections_twitter: iterable of collection names to consider.
        current_year: year prefix to match (e.g. ``2020``).
        current_week: week number to match (e.g. ``1``).

    Returns:
        dict mapping each selected collection name to its document count as a string.
    """
    year_prefix = str(current_year)
    # Without the digit look-ahead, week 1 ("W1") would also match the
    # collections of weeks 10-19, because "W1" is a substring of "W12".
    week_pattern = re.compile(re.escape("W" + str(current_week)) + r"(?!\d)")
    dic_collection = {}
    for name in collections_twitter:
        if name.startswith(year_prefix) and week_pattern.search(name):
            collection = db_twitter[name]
            # Cursor.count() was deprecated in PyMongo 3.7 and removed in 4.0.
            # Check the class, not the instance: a PyMongo Collection turns
            # unknown attribute access into a sub-collection.
            if hasattr(type(collection), "count_documents"):
                count = collection.count_documents({})
            else:
                count = collection.find({}).count()
            dic_collection[name] = str(count)
    return dic_collection

In [None]:
def read_file(file_path):
    """Return every line of ``file_path`` (newlines kept) as a list.

    The file is opened in ``"a+"`` mode, so a missing file is created empty and
    an empty list is returned; the stream is rewound before reading because
    append mode starts positioned at the end.
    """
    with open(file_path, "a+") as handle:
        handle.seek(0)
        return list(handle)

In [None]:
def get_old_number(line):
    """Parse the previous run's counts file into ``{collection: count}``.

    Args:
        line: iterable of text lines in the format written by ``write_file``,
            one ``<collection>: <count>`` entry per line.

    Returns:
        dict mapping each collection name to its count as a string, with
        surrounding whitespace removed. Lines without a ``:`` separator (e.g.
        blank lines) are skipped instead of raising ``IndexError``.
    """
    old_dic_collection = {}
    for raw in line:
        # Split on the LAST colon: the count is always the final field, so a
        # collection name that itself contains ':' still parses correctly.
        collection_name, sep, document_number = raw.rstrip("\r\n").rpartition(":")
        if not sep:
            continue
        old_dic_collection[collection_name] = document_number.strip()
    return old_dic_collection

In [None]:
def send_msg_slack(dic_collection,old_dic_collection,slack):
    """Alert Slack about collections whose document count did not grow.

    For every collection present in both runs, posts to the module-level
    ``channel`` when the new count equals the previous one ("is the same")
    or is smaller ("is reduced"). Collections that grew, or that are new in
    this run, produce no message.

    Args:
        dic_collection: ``{collection: count}`` for this run (counts as str/int).
        old_dic_collection: ``{collection: count}`` from the previous run.
        slack: Slack client exposing ``chat.post_message(channel, text)``.
    """
    for collection, count in dic_collection.items():
        if collection not in old_dic_collection:
            continue  # nothing to compare against
        new_count = int(count)
        old_count = int(old_dic_collection[collection])
        if new_count == old_count:
            verdict = "is the same last run"
        elif new_count < old_count:
            verdict = "is reduced last run"
        else:
            continue
        # Slack mrkdwn renders *text* in bold; terminal ANSI escapes such as
        # \033[1m are not interpreted by Slack and show up as stray characters.
        message = "[" + collection + "] " + verdict + ": *" + str(count) + "*"
        slack.chat.post_message(channel,message)
                

In [None]:
def write_file(dic_collection, file_path):
    """Overwrite ``file_path`` with the counts of this run.

    Writes one ``<collection>: <count>`` line per entry, ordered by collection
    name, then prints a confirmation.
    """
    with open(file_path, "w+") as result_file:
        result_file.writelines(
            "%s: %s\n" % (name, dic_collection[name])
            for name in sorted(dic_collection)
        )
    print("The result file is ready.")

In [None]:
def write_log_file(dic_collection, log_file_path):
    """Append one run record to ``log_file_path``.

    The record is a dashed separator line, the current local timestamp, and
    one ``<collection>: <count>`` line per entry ordered by collection name.
    Prints a confirmation afterwards.
    """
    with open(log_file_path, "a+") as log_file:
        log_file.write("---------------------------\n")
        log_file.write("%s\n" % (datetime.datetime.now(),))
        # Keys are unique, so sorting the items orders purely by collection name.
        log_file.writelines(
            "%s: %s\n" % (name, count)
            for name, count in sorted(dic_collection.items())
        )
    print("The log file is ready.")

## Monitor NiFi

In [None]:
def _report_failure(error):
    """Record a monitoring failure in the log file, then alert Slack.

    The log entry is written first so that it survives even when Slack itself
    is what is failing.
    """
    with open(log_file_path,"a+") as output_file:
        output_file.write("---------------------------"+"\n")
        output_file.write(str(datetime.datetime.now())+"\n")
        output_file.write(str(error) + "\n")
    slack.chat.post_message(channel,str(error))


create_folder(out_folder)

try:
    db_twitter, collections_twitter = connect_database(client,slack)
except Exception as e:
    _report_failure(e)
    raise

year_week = current_year_week()
if year_week is None:
    # current_year_week() may report its own failure to Slack and return None;
    # stop with a clear message instead of a TypeError on tuple unpacking.
    raise RuntimeError("Could not determine the current year/week; see the Slack alert for details.")
current_year, current_week = year_week

try:
    # compare this run's document counts with the previous run
    dic_collection = get_lastest_number(db_twitter,collections_twitter,current_year,current_week)
    line = read_file(file_path)
    old_dic_collection = get_old_number(line)
    # send alerts to Slack
    send_msg_slack(dic_collection,old_dic_collection,slack)
    # persist this run's counts for the next comparison, and log them
    write_file(dic_collection, file_path)
    write_log_file(dic_collection, log_file_path)
except Exception as e:
    # Failures while counting/reporting must be alerted too, not only
    # database-connection failures.
    _report_failure(e)
    raise