# Assumptions:
1. Collection is already opened. A function does not need to open a collection.

2. Job_id index will start from 0.

3. Data Model will be:
   
   MongoDB - 
   Job_id will be unique per job (and, as a result, each job_id belongs to exactly one company)
   
   Redis - 
   To improve performance, 4 different pieces of state will be outsourced to Redis. <br>
   a. storing the next Job_id index.<br>
   b. storing the job status.<br>
   c. storing the mapping between job_id and the applied candidates' emails (redis set). <br>
   d. storing the mapping between a candidate's email and the companies the candidate has applied to (sorted set, scored by the application date).<br>
   
4. company_name and location have 1-1 relationship (e.g: TAU is only located at 'Tel-Aviv')
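
The Redis layout from assumption 3 can be summarised as four key patterns. The sketch below only builds the key names. The helper names are hypothetical and introduced here for illustration; the key formats themselves are the ones used by the functions in this notebook.

```python
PREFIX = 'stud16'  # namespace prefix shared by every Redis key in this notebook


def next_job_id_key():
    # (a) counter holding the next job_id to hand out
    return '{}:job_id'.format(PREFIX)


def job_status_key(job_id):
    # (b) string value: the current status of the job
    return '{}:{}'.format(PREFIX, job_id)


def job_emails_key(job_id):
    # (c) Redis set: emails of the candidates who applied to the job
    return '{}:job{}_e'.format(PREFIX, job_id)


def candidate_companies_key(email):
    # (d) Redis sorted set: companies applied to, scored by application time
    return '{}:{}'.format(PREFIX, email)
```

Keeping the patterns in one place makes it easier to see that (b) and (d) share the same `stud16:<value>` shape and are told apart only by whether the value is a job id or an email.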

# Imports:

In [1]:
import pymongo
import time
import datetime
import redis 
import pandas as pd
import numpy as np
from pymongo import MongoClient

# Functions:

<b> add_company function: </b><br>
Receives a dictionary (a.k.a. document) with 2 keys and checks whether the company name is unique. <br>
If it is, adds 2 more attributes (`jobs_list` and `num_of_jobs`) and inserts the document into the collection.

In [2]:
def add_company(company):
    # validation: only insert if the company does not exist yet
    if collection.find_one({"company_name": company['company_name']}) is None:
        # insertion: start with an empty jobs list and a zero job counter
        company['jobs_list'] = []
        company['num_of_jobs'] = 0
        collection.insert_one(company)
        print company['company_name'], "was successfully added"
    else:
        print company['company_name'], "already exists in the system"
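
To make the uniqueness check and the two added attributes concrete without a database, here is a minimal sketch that uses a plain Python list as a stand-in for the collection. The function `add_company_in_memory` is hypothetical and is not part of the notebook.

```python
def add_company_in_memory(companies, company):
    """Hypothetical stand-in: `companies` is a plain list acting as the collection."""
    exists = any(c['company_name'] == company['company_name'] for c in companies)
    if exists:
        return False
    doc = dict(company)        # copy so the caller's dict is not mutated
    doc['jobs_list'] = []      # jobs are embedded in the company document
    doc['num_of_jobs'] = 0
    companies.append(doc)
    return True


companies = []
first = add_company_in_memory(
    companies, {'company_name': 'TAU', 'company_description': 'University'})
second = add_company_in_memory(
    companies, {'company_name': 'TAU', 'company_description': 'University'})
```

The second call is rejected because a company with the same name is already stored, so the list still holds a single document.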


<b> add_job function: </b><br>
Receives a dictionary describing the job and the company's name as a string. Validates that the company exists in the DB, <br>
then adds 2 attributes (`candidates_list` and `job_id`) to the job document. <br>
We chose to involve Redis in the insert path: it supplies the next job_id and stores the job status.


In [3]:
def add_job(job, company_name):
    # validation: only insert if there is an associated company
    company_doc = collection.find_one({"company_name": company_name})
    if company_doc is None:
        print "Company does not exist in the system"
    else:
        # insertion
        # add candidates_list attribute
        job['candidates_list'] = []
        # assign an auto-increment job id: INCR is atomic and returns the next
        # free id, so the id reserved for this job is one less (ids start from 0)
        jobID = str(r.incr('stud16:{}'.format('job_id')) - 1)
        job["job_id"] = jobID

        # store the job_id -> job status mapping in Redis
        r.set('stud16:{}'.format(jobID), job['status'])

        # update the jobs list for the company
        collection.update_many({"company_name": company_name}, {"$addToSet": {"jobs_list": job}})
        #collection.update_many({"company_name":company_name},{"$inc":{"num_of_jobs":1}})
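
One way to hand out auto-incrementing job ids that start from 0 is to rely on the semantics of the Redis `INCR` command, which treats a missing key as 0 and returns the value after the increment. The sketch below imitates that behaviour with an in-memory class. `FakeCounterStore` and `allocate_job_id` are hypothetical names used only for illustration.

```python
class FakeCounterStore(object):
    """Hypothetical in-memory stand-in for the single Redis counter key."""

    def __init__(self):
        self.data = {}

    def incr(self, key):
        # Like Redis INCR: a missing key counts as 0, and the new value is returned
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]


def allocate_job_id(store, key='stud16:job_id'):
    # incr returns the *next* free id, so the id just reserved is one less
    return str(store.incr(key) - 1)


store = FakeCounterStore()
ids = [allocate_job_id(store) for _ in range(3)]
```

After three allocations the counter holds 3, which is exactly the id the next job would receive.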

<b> application function: </b><br>
Takes the candidate's details (including the application date), an *existing* job_id and the company name.
Checks that the candidate hasn't applied yet and that the job status is currently open.
If both conditions are met, the system adds the application.

In [4]:
def application(candidate, job_id, company_name):
    # look up the job status in Redis based on job_id
    job_status = r.get('stud16:{}'.format(job_id))

    # validation: the job must be open and this email must not have applied to it yet
    # (sismember returns a boolean indicating whether the value is a member of the set)
    if job_status != 'open':
        print "The position is not open for applications"
    elif r.sismember('stud16:job{}_e'.format(job_id), candidate['email']):
        print "Candidate already applied for this position"
    else:
        # insert
        # add the application time to the candidate (dates are day-first)
        application_time = pd.to_datetime(candidate['application_date'], dayfirst=True)
        candidate["application_time"] = application_time
        collection.update_one({"company_name": company_name, "jobs_list.job_id": job_id},
                              {"$push": {"jobs_list.$.candidates_list": candidate}})
        # add the candidate's email to the job's set of applicants in Redis
        r.sadd('stud16:job{}_e'.format(job_id), candidate['email'])

        # add the company to the candidate's sorted set in Redis,
        # using the application time (seconds since the epoch) as the score
        score = int(time.mktime(application_time.timetuple()))
        r.zadd('stud16:{}'.format(candidate['email']), score, company_name)
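
The two validation conditions and the sorted-set score can be sketched without Redis or pandas. This example swaps `pd.to_datetime(..., dayfirst=True)` for `datetime.strptime` from the standard library and assumes the `DD-MM-YYYY HH:MM:SS` format seen in the sample data. `application_score` and `can_apply` are hypothetical helper names.

```python
import time
from datetime import datetime


def application_score(application_date):
    # Parse a day-first timestamp such as '05-02-2020 15:00:00' (5 February 2020)
    parsed = datetime.strptime(application_date, '%d-%m-%Y %H:%M:%S')
    # Seconds since the epoch in local time, truncated to an int
    return int(time.mktime(parsed.timetuple()))


def can_apply(applied_emails, job_status, email):
    # Both conditions must hold: the job is open and the email has not applied yet
    return job_status == 'open' and email not in applied_emails


earlier = application_score('05-02-2020 15:00:00')
later = application_score('06-02-2020 15:00:00')
```

Because the score grows with the application time, a sorted set keyed by the candidate's email keeps that candidate's companies in chronological order of application.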

<b> update_job_status function: </b><br>
Changes the status of a job according to the job id and company. <br>
We also update the status in Redis so that it can be retrieved quickly.

In [5]:
def update_job_status(company_name, job_id, new_status):
    # Updating the status in MongoDB:
    query = {'$and': [{"company_name": company_name}, {"jobs_list.job_id" : job_id}]}
    collection.update_one(query, {"$set": {"jobs_list.$.status":new_status}, 
                          "$currentDate": {"jobs_list.$.lastModified": True}})
    # Updating the status in Redis:
    r.set('stud16:{}'.format(job_id), new_status)
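
The positional `$` operator in the update above targets the first element of `jobs_list` that matches the query. A plain-Python sketch of the same effect on a single company document follows. `set_job_status` is a hypothetical helper, and it leaves out the `lastModified` timestamp.

```python
def set_job_status(company_doc, job_id, new_status):
    """Update the first job in `jobs_list` whose job_id matches."""
    for job in company_doc['jobs_list']:
        if job['job_id'] == job_id:
            job['status'] = new_status
            return True
    return False   # no job with that id, nothing was changed


company = {'company_name': 'TAU',
           'jobs_list': [{'job_id': '0', 'status': 'open'},
                         {'job_id': '1', 'status': 'open'}]}
changed = set_job_status(company, '1', 'close')
```

Only the job with id `'1'` is modified. The other embedded job keeps its `open` status.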

<b> show_number_of_jobs function: </b><br>
Shows the number of open jobs for the requested location and job title. <br>
The result is only displayed to the user, so there is no need to save it in Redis.

In [6]:
def show_number_of_jobs(location, job_title):
    # unwind the jobs and keep only open jobs matching the requested location and title
    aggregation_function = collection.aggregate([
      { "$unwind": "$jobs_list" },
      { "$match": {"jobs_list.status": "open", "jobs_list.location": location,
                   "jobs_list.job_title": job_title} }
    ])
    number_of_open_jobs_at_location = len(list(aggregation_function))
    print "\nNumber of open {} positions at {} is: {}".format(job_title, location, number_of_open_jobs_at_location)
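
The `$unwind` followed by `$match` pattern amounts to a nested loop with a filter. The sketch below reproduces the count over plain dictionaries. `count_open_jobs` and the sample data are hypothetical and only illustrate the idea.

```python
def count_open_jobs(companies, location, job_title):
    count = 0
    for company in companies:
        for job in company.get('jobs_list', []):       # plays the role of $unwind
            if (job.get('status') == 'open'             # plays the role of $match
                    and job.get('location') == location
                    and job.get('job_title') == job_title):
                count += 1
    return count


companies = [{'company_name': 'TAU', 'jobs_list': [
    {'job_title': 'product', 'location': 'Tel Aviv', 'status': 'open'},
    {'job_title': 'product', 'location': 'Tel Aviv', 'status': 'close'},
    {'job_title': 'analyst', 'location': 'Tel Aviv', 'status': 'open'}]}]
```

With this data, one open `product` job and one open `analyst` job are found in Tel Aviv, and the closed job is ignored.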

<b> show_candidates function: </b><br>
Shows the candidates' emails for a specific job id, sorted by the number of matches between the candidates' skills and the job requirements. <br> 
Similarly to the previous function, the result is only displayed to the user, 
hence there is no need to save it in Redis. <br>
<b> Assumption: </b> the output is sorted so that emails with fewer matches between the candidate's skills and the required skills appear at the top, and emails with more matches appear at the bottom.

In [7]:
def show_candidates(company_name, job_id):
    # Getting the list of skills as required for the job
    skills = collection.aggregate([
            { "$unwind": "$jobs_list" },
            { "$match": {"jobs_list.job_id" : job_id}},
            { "$project": {"skills" : "$jobs_list.skills"}}
    ])
    jobs_list_of_skills = pd.DataFrame(list(skills))['skills'].iloc[0]
    
    # Getting a df with the candidates' email and skills
    emails = collection.aggregate([
          { "$unwind": "$jobs_list" },
          { "$match": { "jobs_list.job_id": job_id }},
          { "$unwind": "$jobs_list.candidates_list" },
          { "$project": {"_id": "$jobs_list.job_id", "skills": "$jobs_list.candidates_list.skills",
                         "emails": '$jobs_list.candidates_list.email' } }
    ])
        
    job_emails_skills_df = pd.DataFrame(list(emails)).rename(columns={"_id":"Job_id"})
    
    # Creating a similarity index that counts the required skills a candidate is missing,
    # so a lower score indicates higher similarity (a higher number of matches).
    # e.g.: a value of 0 for this index means that there's a perfect match!
    job_emails_skills_df['similarity_to_requirements'] = job_emails_skills_df['skills'].apply(lambda row:
                                                         len(set(jobs_list_of_skills) - set(row)))
    # Sorting by the index in descending order, such that the lowest number of matches is first
    # and the highest number of matches is last
    job_emails_skills_df.sort_values(by= 'similarity_to_requirements', ascending= False, inplace= True)
    return job_emails_skills_df['emails']
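
The ranking rule can be illustrated with a set difference: count the required skills a candidate does not list, then sort so that the candidates missing the most skills come first. `rank_candidates` and the example emails below are hypothetical.

```python
def rank_candidates(required_skills, candidates):
    required = set(required_skills)

    def missing(candidate):
        # Number of required skills the candidate does not list
        return len(required - set(candidate['skills']))

    # Most missing skills first, i.e. the fewest matches at the top
    ranked = sorted(candidates, key=missing, reverse=True)
    return [c['email'] for c in ranked]


required = ['python', 'big data', 'mongodb']
candidates = [
    {'email': 'a@example.com', 'skills': ['python', 'mongodb']},              # 1 missing
    {'email': 'b@example.com', 'skills': ['java', 'sql']},                    # 3 missing
    {'email': 'c@example.com', 'skills': ['python', 'big data', 'mongodb']},  # 0 missing
]
order = rank_candidates(required, candidates)
```

Here `b@example.com` matches none of the requirements and is listed first, while the perfect match `c@example.com` is listed last.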

<b> count_jobs_by_date Report 1: </b><br>
Returns a DataFrame indexed by date with two columns, number of open jobs and number of closed jobs, ordered by date (in ascending order). <br>
<b> Note: </b> if a job's status was changed to 'close', it is counted as closed from its publish date (because `lastModified` is always in 2021, outside the reported 2020 range).


In [8]:
def count_jobs_by_date():
    # Creating the DataFrame - one row per day of 2020, indexed by date
    # (ISO dates avoid any day-first / month-first ambiguity)
    df = pd.DataFrame(columns=['# of opened jobs', '# of closed jobs'], index=pd.date_range(start='2020-01-01', end='2020-12-31'))
    
    # aggregation to get the number of opened jobs for each date:
    aggregation_function = collection.aggregate([
    { "$unwind": "$jobs_list"},
    { "$match": {"jobs_list.status": "open"} },
    { "$group": {"_id": "$jobs_list.publish_date", "open_jobs": {"$sum": 1}}}
    ])
    
    # Filling the DF with the opened jobs in the correct dates:
    for row in aggregation_function:
        df.loc[row["_id"],"# of opened jobs"] = row["open_jobs"]
    
    # aggregation to get the number of closed jobs for each date:
    aggregation_function = collection.aggregate([
    { "$unwind": "$jobs_list"},
    { "$match": {"jobs_list.status": "close"} },
    { "$group": {"_id": "$jobs_list.publish_date", "closed_jobs": {"$sum": 1}}}
    ])
    
    # Filling the DF with the closed jobs in the correct dates:
    for row in aggregation_function:
        df.loc[row["_id"],"# of closed jobs"] = row["closed_jobs"]
    
    # Now we'd like to apply a cumulative sum to reflect the correct distribution of job statuses:
    df = df.fillna(0)   
    df['# of opened jobs'] = df['# of opened jobs'].cumsum()
    df['# of closed jobs'] = df['# of closed jobs'].cumsum()
    
    return df   
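
The fill-with-zero and cumulative-sum step can be sketched with the standard library alone: walk over every day in the range and keep a running total of the jobs published so far. `cumulative_counts` and the sample counts are hypothetical.

```python
from datetime import date, timedelta


def cumulative_counts(per_day, start, end):
    """per_day maps a date to the number of jobs published on that date."""
    rows = []
    running = 0
    day = start
    while day <= end:
        running += per_day.get(day, 0)   # days without jobs add nothing
        rows.append((day, running))
        day += timedelta(days=1)
    return rows


published = {date(2020, 1, 2): 2, date(2020, 1, 4): 1}
rows = cumulative_counts(published, date(2020, 1, 1), date(2020, 1, 5))
```

Each row then holds the number of jobs published on or before that date, which is the quantity the report displays per status.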

<b> count_candidates_by_job Report 2: </b><br>
Returns the number of candidates who applied during the last 30 days, grouped by job id

In [9]:
def count_candidates_by_job():
    threshold = (datetime.datetime.today() - datetime.timedelta(days=30)) # setting threshold (30 days) for the filter below
    # aggregate function - returns one (job id, application time) row per application
    a = collection.aggregate([
      { "$unwind": "$jobs_list" },
      { "$unwind": "$jobs_list.candidates_list" },
      { "$project": {"_id": '$jobs_list.job_id', "time": "$jobs_list.candidates_list.application_time"}},
    ]
    )
    # Preparing the final df to return - # of candidates per job id:
    candidates = pd.DataFrame(list(a))
    if candidates.empty:
        return "No jobs with applications during the last 30 days"
    count_jobs_for_id_df = candidates.loc[candidates['time'] > threshold].groupby(by= '_id').count().reset_index()
    
    if count_jobs_for_id_df.empty:
        return "No jobs with applications during the last 30 days"
    return count_jobs_for_id_df.rename(columns={'_id':"Job ID", "time": "Number of Candidates"})
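
The 30-day filter and the per-job count can be sketched with `collections.Counter`. The example fixes "today" to a constant so that it is reproducible. `count_recent_applications` and the sample applications are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta


def count_recent_applications(applications, now, days=30):
    """applications is a list of (job_id, application_time) pairs."""
    threshold = now - timedelta(days=days)
    return Counter(job_id for job_id, applied_at in applications
                   if applied_at > threshold)


now = datetime(2020, 3, 1)   # fixed "today" so the example is reproducible
applications = [
    ('0', datetime(2020, 2, 20)),
    ('0', datetime(2020, 2, 25)),
    ('1', datetime(2020, 1, 1)),   # older than 30 days, ignored
]
counts = count_recent_applications(applications, now)
```

Job `'0'` ends up with two recent candidates, and job `'1'` does not appear because its only application is older than the threshold.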

<b> Recovery function: </b><br>
This function rebuilds the needed data in Redis from MongoDB, should Redis suddenly fail or shut down.

In [10]:
def recovery():
    # job_id
    recover_id = collection.aggregate([
      { "$unwind": "$jobs_list" },
      { "$project": {"_id": "$jobs_list.job_id"}},
    ]
    )
    # job ids are stored as strings, so compare them numerically to get the latest one in the db
    job_ids = [int(doc["_id"]) for doc in recover_id]
    if not job_ids:
        r.set('stud16:{}'.format('job_id'), 0) 
        return     # Exit the function
    else:
        r.set('stud16:{}'.format('job_id'), max(job_ids) + 1)
        
    # map job_id -> job status
    map_job_id_job_status = collection.aggregate([
      { "$unwind": "$jobs_list" },
      { "$project": {"_id": "$jobs_list.job_id","status":"$jobs_list.status"}}
    ]
    )
    job_id_status_df = pd.DataFrame(list(map_job_id_job_status))
    if not job_id_status_df.empty:
        job_id_status_df['_id'] = job_id_status_df['_id'].apply(lambda r: "stud16:{}".format(r)) # creating the key
        job_id_status_dict = job_id_status_df.set_index('_id').T.to_dict('records')[0]
        r.mset(job_id_status_dict) # insert to Redis
            
    # job_id_e -> set(emails)
    email_set = collection.aggregate([
      { "$unwind": "$jobs_list" },
      { "$unwind": "$jobs_list.candidates_list" },
      { "$project": {"_id": "$jobs_list.job_id","email":"$jobs_list.candidates_list.email"}}
    ]
    )
    # insert to Redis each value at a time (nothing is inserted if no candidate has applied yet):
    for doc in email_set:
        r.sadd('stud16:job{}_e'.format(doc['_id']), doc['email'])
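
The values that recovery needs to write back (the next job id, the status per job, and the set of emails per job) can all be derived from the company documents. The sketch below computes them as plain Python objects instead of writing to Redis. `rebuild_redis_state` is a hypothetical helper, and it assumes job ids are numeric strings.

```python
def rebuild_redis_state(companies):
    """Derive the values that would be written back to Redis."""
    statuses, emails, job_ids = {}, {}, []
    for company in companies:
        for job in company.get('jobs_list', []):
            job_id = job['job_id']
            job_ids.append(int(job_id))   # compare ids numerically, not as text
            statuses['stud16:{}'.format(job_id)] = job['status']
            applied = set(c['email'] for c in job.get('candidates_list', []))
            if applied:
                emails['stud16:job{}_e'.format(job_id)] = applied
    next_id = max(job_ids) + 1 if job_ids else 0
    return next_id, statuses, emails


companies = [{'company_name': 'TAU', 'jobs_list': [
    {'job_id': '9', 'status': 'close', 'candidates_list': []},
    {'job_id': '10', 'status': 'open',
     'candidates_list': [{'email': 'laura@gmail.com'}]}]}]
next_id, statuses, emails = rebuild_redis_state(companies)
```

Converting the ids to integers matters here: compared as text, `'9'` would sort after `'10'` and the next id would be computed from the wrong job.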
            

<b> EXECUTION function: </b><br>
This function calls all of the functions above, in order

In [11]:
def execute():
    recovery()
    add_company({'company_name':'TAU', 'company_description':'University'})
    add_job({'job_title':'bi developer', 'location': 'Tel Aviv',
             'skills':['python','big data','mongodb'],'status':'open',
             'publish_date':'05-02-2020'},'TAU')
    application({'candidate_name':'laura', 'email':'laura@gmail.com',
                 'linkedin':'https://www.linkedin.com/in/laura/', "skills": ['java','sql'],
                'application_date':'05-02-2020 15:00:00'}, '0', 'TAU')
    
    update_job_status('TAU', '0', 'close')
    
    show_number_of_jobs('Tel Aviv', 'bi developer')
    
    print
    print "Candidates' emails for the job (sorted):"
    print show_candidates('TAU', '0')
    print
    print "Report 1 - Jobs status distribution per date:"
    print count_jobs_by_date()
    print
    print "Report 2 - Candidates by Job:"
    print count_candidates_by_job()
               

In [12]:
# list(collection.find())    

In [13]:
# print "Candidates' emails for the job (sorted):"
# show_candidates('TAU', '0')

# Establishing Connections:

In [14]:
r = redis.StrictRedis(host='bdl1.eng.tau.ac.il', port=6379)  # creating Redis connection
client = MongoClient() # creating MongoDB connection
db = client['stud16']
# clean up the collection before start working
db.hm1_stud16.drop()
# creating new collection
collection = db.hm1_stud16

In [15]:
r.flushall()

True

# Execute - run me!:

In [16]:
execute()

TAU was successfully added

Number of open bi developer positions at Tel Aviv is: 0

Candidates' emails for the job (sorted):
0    laura@gmail.com
Name: emails, dtype: object

Report 1 - Jobs status distribution per date:
            # of opened jobs  # of closed jobs
2020-01-01                 0                 0
2020-01-02                 0                 0
2020-01-03                 0                 0
2020-01-04                 0                 0
2020-01-05                 0                 0
2020-01-06                 0                 0
2020-01-07                 0                 0
2020-01-08                 0                 0
2020-01-09                 0                 0
2020-01-10                 0                 0
2020-01-11                 0                 0
2020-01-12                 0                 0
2020-01-13                 0                 0
2020-01-14                 0                 0
2020-01-15                 0                 0
2020-01-16                

# Checks - for our own use & debugging - please ignore:

In [1]:
# #The following are part of the checks and inputs we used to validate our data model:
# company = {'company_name':'TAU', 'company_description':'University'}
# add_company(company)

# company = {'company_name':'BGU', 'company_description':'University'}
# add_company(company)

# job ={'job_title':'analyst', 'location': 'Tel Aviv',
#       'skills':['python','big data','mongodb'],'status':'open','publish_date':'01-15-2020'}
# add_job(job, 'TAU')

# print collection.find_one({"company_name":'TAU'})

# job ={'job_title':'product', 'location': 'Tel Aviv',
#       'skills':['python','big data','mongodb'],'status':'open','publish_date':'01-02-2020'}
# add_job(job, 'TAU')

# job ={'job_title':'product', 'location': 'Tel Aviv',
#       'skills':['python','big data','mongodb'],'status':'open','publish_date':'01-02-2020'}
# add_job(job, 'TAU')


# application({'candidate_name':'yuval', 'email':'yuval@gmail.com',
# 'linkedin':'https://www.linkedin.com/in/laura/', 'skills':['python','sql'],
# 'application_date':'01-05-2019 15:00:00'}, '1','BGU')

# application({'candidate_name':'aaa', 'email':'tamir@gmail.com',
#     'linkedin': 'https://www.linkedin.com/in/laura/', 'skills':['excel','msproject'],
#     'application_date':'30-12-2020 15:00:00'}, '11', 'TAU')

# print collection.find_one({"company_name":'TAU'})

#print show_candidates('TAU', '0')

#  The End :)