In [22]:
#import block
from dask.distributed import Client

import dask.array as da
from dask import delayed
import dask.dataframe as dd
import pandas as pd
import os
import uuid
import datetime
from enum import Enum 
import dateutil
import numpy as np
import itertools

# Create the dask.distributed client used by the delayed computations below,
# asking for 4 workers; it is closed again in the last cell via client.close().
client = Client(n_workers=4)

In [23]:
class glean_type(Enum):
    """Kinds of glean written to the ``glean_type`` output column.

    Every member's value is the same string as its name.
    """

    vendor_not_seen_in_a_while = 'vendor_not_seen_in_a_while'
    accrual_alert = 'accrual_alert'
    large_month_increase_mtd = 'large_month_increase_mtd'
    # Declared for completeness; nothing in this notebook emits it yet.
    no_invoice_received = 'no_invoice_received'

class glean_location(Enum):
    """Where a glean is attached, as written to the ``glean_location`` column."""

    invoice = 'invoice'
    vendor = 'vendor'

class Glean_generator:
    """Build "glean" alerts from ``invoice.csv`` and ``line_item.csv``.

    Three checks are implemented, each producing a frame with the columns in
    ``GLEAN_COLUMNS``:

    * logic 1 - a vendor's invoice arrives more than 90 days after that
      vendor's previous invoice (``vendor_not_seen_in_a_while``);
    * logic 2 - an invoice (part 1) or a line item (part 2) has a
      ``period_end_date`` more than 90 days after the invoice date
      (``accrual_alert``);
    * logic 3 - a vendor's spend in a month is much higher than its average
      over the trailing 12 months (``large_month_increase_mtd``).

    Per-vendor / per-invoice work is wrapped in ``dask.delayed`` and computed
    immediately by the ``initiate_*`` methods, which store their results on
    ``self.result_logic_*``.  ``compute_dask_graph`` then concatenates those
    results into ``self.output``.
    """

    # Column layout shared by every glean frame this class produces.
    GLEAN_COLUMNS = ['glean_id', 'glean_date', 'glean_text', 'glean_type',
                     'glean_location', 'invoice_id', 'canonical_vendor_id']

    def __init__(self):
        """Load both CSV files from the working directory and parse dates.

        Invoice rows containing any missing value are dropped.
        """
        self.df_invoice = pd.read_csv('invoice.csv').dropna()
        self.df_line_item = pd.read_csv('line_item.csv')
        for column in ('invoice_date', 'period_start_date', 'period_end_date'):
            self.df_invoice[column] = pd.to_datetime(self.df_invoice[column], format='%Y-%m-%d')
        self.df_line_item['period_end_date'] = pd.to_datetime(
            self.df_line_item['period_end_date'], format='%Y-%m-%d')

    # ------------------------------------------------------------------
    # shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _glean_frame(texts, kind, location, invoice_ids, vendor_ids):
        """Return a glean frame with one row per entry of ``texts``.

        ``kind`` and ``location`` are the plain string values stored in the
        ``glean_type`` / ``glean_location`` columns.  Every row gets a fresh
        ``uuid1`` id and today's date in ISO format.
        """
        texts = list(texts)
        count = len(texts)
        return pd.DataFrame({
            'glean_id': [str(uuid.uuid1()) for _ in range(count)],
            'glean_date': [datetime.date.today().isoformat() for _ in range(count)],
            'glean_text': texts,
            'glean_type': [kind] * count,
            'glean_location': [location] * count,
            'invoice_id': list(invoice_ids),
            'canonical_vendor_id': list(vendor_ids),
        })

    @staticmethod
    def _combine_gleans(frames):
        """Concatenate glean frames, ignoring ``None`` and empty entries.

        Returns an empty frame with ``GLEAN_COLUMNS`` when nothing is left,
        instead of letting ``pd.concat`` raise.
        """
        kept = [frame for frame in frames if frame is not None and len(frame) > 0]
        if not kept:
            return pd.DataFrame(columns=Glean_generator.GLEAN_COLUMNS)
        return pd.concat(kept)

    @staticmethod
    def _vendor_ids(series):
        """Return the distinct non-missing vendor ids in first-appearance order.

        A ``set`` would iterate in hash order, which differs between runs for
        strings and would make the output row order unstable.
        """
        return list(series.dropna().unique())

    @staticmethod
    def _add_months(month_start, months):
        """Shift a first-of-month ``datetime.date`` by ``months`` (may be negative)."""
        index = month_start.year * 12 + (month_start.month - 1) + months
        return datetime.date(index // 12, index % 12 + 1, 1)

    @staticmethod
    def _accrual_text(vendor_id, period_end):
        """Format the accrual-alert message for a vendor and period end date."""
        return ('Line items from vendor ' + vendor_id
                + ' in this invoice cover future periods (through '
                + str(period_end.strftime('%Y-%m-%d')) + ' )')

    @staticmethod
    def _is_large_increase(monthly_bill, average_bill):
        """Return True when ``monthly_bill`` exceeds the spend-increase thresholds.

        * spend above 10000: flagged when more than 50% above the average;
        * spend strictly between 100 and 10000: flagged when more than 200%
          above the average.

        NOTE(review): the original code had a third tier (spend between 100 and
        1000, more than 500% above average).  Every case it matched is already
        matched by the second tier, so it had no effect and is omitted here.
        If the intent was for the 100-1000 range to require the stricter 500%
        threshold, the second tier's lower bound should become 1000 - confirm
        against the spec.  Spend of exactly 10000 matches no tier.
        """
        increase = monthly_bill - average_bill
        if monthly_bill > 10000:
            return bool(increase > 0.5 * average_bill)
        if 100 < monthly_bill < 10000:
            return bool(increase > 2 * average_bill)
        return False

    def _merged_line_items(self):
        """Left-join line items with their invoices and keep the working columns.

        NOTE(review): the columns are picked by position, so this depends on
        the column order of both CSV files; selecting by name would be safer
        once the schema is confirmed.
        """
        merged = pd.merge(self.df_line_item, self.df_invoice, how='left', on='invoice_id')
        return merged.iloc[:, [0, 1, 2, 3, 4, 5, 6, 7, 11]].copy()

    def _gather_gleans(self, logic, items):
        """Run ``logic(item)`` for every item through dask and combine the results.

        The ``None`` results are filtered inside the delayed combine step: the
        entries of the task list are ``Delayed`` objects, never ``None``, so
        filtering the task list itself (as the original did) removes nothing.
        """
        task = delayed(logic)
        pending = [task(item) for item in items]
        return delayed(self._combine_gleans)(pending).compute()

    # ------------------------------------------------------------------
    # logic 1: vendor not seen in a while
    # ------------------------------------------------------------------
    def logic_1(self, vendor_id):
        """Flag invoices that arrive more than 90 days after the vendor's previous one.

        Returns a glean frame (one row per late invoice) or ``None`` when the
        vendor has at most one invoice or no gap exceeds 90 days.
        """
        invoices = self.df_invoice.loc[self.df_invoice['canonical_vendor_id'] == vendor_id]
        if len(invoices) <= 1:
            return None
        invoices = invoices.sort_values(by=['invoice_date'])
        # Pair every invoice (from the second one on) with its predecessor.
        previous_dates = invoices['invoice_date'].iloc[:-1].reset_index(drop=True)
        current = invoices.iloc[1:].reset_index(drop=True)
        late = (current['invoice_date'] - previous_dates) > datetime.timedelta(days=90)
        if not late.any():
            return None
        start = previous_dates[late]
        end = current.loc[late, 'invoice_date']
        # Calendar-month difference, ignoring the day of the month.
        months = (end.dt.year - start.dt.year) * 12 + (end.dt.month - start.dt.month)
        texts = ['First new bill in ' + str(count) + ' months from vendor ' + vendor_id
                 for count in months]
        gleans = self._glean_frame(
            texts,
            glean_type.vendor_not_seen_in_a_while.value,
            glean_location.invoice.value,
            current.loc[late, 'invoice_id'],
            current.loc[late, 'canonical_vendor_id'],
        )
        # Keep the pair positions as the index, as the original frame did.
        gleans.index = current.index[late.to_numpy()]
        return gleans

    def initiate_logic_1_solution(self):
        """Run logic 1 for every vendor and store the result in ``result_logic_1``."""
        vendors = self._vendor_ids(self.df_invoice['canonical_vendor_id'])
        self.result_logic_1 = self._gather_gleans(self.logic_1, vendors)

    # ------------------------------------------------------------------
    # logic 2: accrual alerts
    # ------------------------------------------------------------------
    def logic_2_part_1(self, row):
        """Flag an invoice row whose period ends more than 90 days after its date.

        ``row`` is one row of ``df_invoice``.  Returns a one-row glean frame,
        or ``None`` when the invoice is not flagged.
        """
        if not (row['period_end_date'] - row['invoice_date']) > datetime.timedelta(days=90):
            return None
        return self._glean_frame(
            [self._accrual_text(row['canonical_vendor_id'], row['period_end_date'])],
            glean_type.accrual_alert.value,
            glean_location.invoice.value,
            [row['invoice_id']],
            [row['canonical_vendor_id']],
        )

    def logic_2_part_2(self, vendor_id):
        """Build an accrual alert from the vendor's last flagged line item.

        Reads ``self.result_filtered`` (set by
        ``initiate_logic_2_solution_part_2``) and uses the last row for
        ``vendor_id``; raises ``IndexError`` if the vendor has no such row.
        """
        flagged = self.result_filtered.loc[self.result_filtered['canonical_vendor_id'] == vendor_id]
        row = flagged.iloc[-1, :]
        return self._glean_frame(
            [self._accrual_text(row['canonical_vendor_id'], row['period_end_date_x'])],
            glean_type.accrual_alert.value,
            glean_location.invoice.value,
            [row['invoice_id']],
            [row['canonical_vendor_id']],
        )

    def initiate_logic_2_solution_part_1(self):
        """Run logic 2 part 1 for every invoice row; store ``result_logic_2_part_1``."""
        rows = [self.df_invoice.iloc[position, :] for position in range(len(self.df_invoice))]
        self.result_logic_2_part_1 = self._gather_gleans(self.logic_2_part_1, rows)

    def initiate_logic_2_solution_part_2(self):
        """Run logic 2 part 2 for every vendor with a flagged line item.

        Sets ``result_filtered`` (line items ending more than 90 days after
        their invoice date) and ``result_logic_2_part_2``.
        """
        result = self._merged_line_items()
        result['days_passed'] = result.period_end_date_x - result.invoice_date
        self.result_filtered = result.loc[result['days_passed'] > datetime.timedelta(days=90)]
        vendors = self._vendor_ids(self.result_filtered['canonical_vendor_id'])
        self.result_logic_2_part_2 = self._gather_gleans(self.logic_2_part_2, vendors)

    # ------------------------------------------------------------------
    # logic 3: large month increase
    # ------------------------------------------------------------------
    def _large_increase_texts(self, vendor_id):
        """Return one message per month in which the vendor's spend is flagged.

        Months are visited in chronological order.  For each month that has at
        least one invoice, the month's total is compared with the mean of
        ``total_amount_x`` over the 12 months ending with that month.

        NOTE(review): the "average" is the mean per merged line-item row, not
        the mean monthly total - kept as in the original; confirm the intent.
        """
        vendor_rows = self.result.loc[self.result['canonical_vendor_id'] == vendor_id]
        dates = vendor_rows['invoice_date']
        month_starts = sorted({datetime.date(stamp.year, stamp.month, 1)
                               for stamp in dates.dropna()})
        texts = []
        for month_start in month_starts:
            month_end = self._add_months(month_start, 1)
            window_start = self._add_months(month_end, -12)
            before_end = dates < pd.Timestamp(month_end)
            in_month = (dates >= pd.Timestamp(month_start)) & before_end
            in_window = (dates >= pd.Timestamp(window_start)) & before_end
            monthly_bill = vendor_rows.loc[in_month, 'total_amount_x'].sum()
            average_bill = vendor_rows.loc[in_window, 'total_amount_x'].mean()
            if not self._is_large_increase(monthly_bill, average_bill):
                continue
            # Report the relative increase - the same quantity the thresholds
            # test - rather than the plain ratio to the average.
            percent_higher = (monthly_bill - average_bill) / average_bill * 100
            texts.append('Monthly spend with ' + vendor_id + ' is $'
                         + str(round(monthly_bill, 2)) + ' ('
                         + str(round(percent_higher, 2)) + '%) higher than average')
        return texts

    def logic_3(self, vendor_id):
        """Flag months in which the vendor's spend is far above its 12-month average.

        Reads ``self.result`` (set by ``initiate_logic_3_solution``).  Returns
        a glean frame with one row per flagged month, or ``None`` when no
        month is flagged.  ``invoice_id`` is NaN because the glean is attached
        to the vendor, not to an invoice.
        """
        texts = self._large_increase_texts(vendor_id)
        if not texts:
            return None
        return self._glean_frame(
            texts,
            glean_type.large_month_increase_mtd.value,
            glean_location.vendor.value,
            [np.nan] * len(texts),
            [vendor_id] * len(texts),
        )

    def initiate_logic_3_solution(self):
        """Run logic 3 for every vendor; sets ``result`` and ``result_logic_3``."""
        self.result = self._merged_line_items()
        vendors = self._vendor_ids(self.result['canonical_vendor_id'])
        self.result_logic_3 = self._gather_gleans(self.logic_3, vendors)

    # ------------------------------------------------------------------
    # orchestration and output
    # ------------------------------------------------------------------
    def build_dask_graph_for_solution(self):
        """Run all implemented logics in order; each one is computed immediately."""
        self.initiate_logic_1_solution()
        self.initiate_logic_2_solution_part_1()
        self.initiate_logic_2_solution_part_2()
        self.initiate_logic_3_solution()

    def compute_dask_graph(self):
        """Concatenate the per-logic results into ``self.output`` with a fresh index."""
        combined = self._combine_gleans([
            self.result_logic_1,
            self.result_logic_2_part_1,
            self.result_logic_2_part_2,
            self.result_logic_3,
        ])
        self.output = combined.reset_index(drop=True)
        return None

    def display_output(self):
        """Return the combined glean frame built by ``compute_dask_graph``."""
        return self.output

    def save_output_as_csv(self):
        """Write the combined glean frame to ``output.csv`` without the index."""
        self.output.to_csv('output.csv', index=False)
        # Report success only after the file has actually been written.
        print('saved the output to disk in CSV format')
        
        

In [24]:
# run this block to start execution of the program
# Constructing the generator reads invoice.csv and line_item.csv.
glean_object = Glean_generator()
# Runs every implemented logic; each result is computed and stored on the object.
glean_object.build_dask_graph_for_solution()
# Concatenates the per-logic results into glean_object.output.
glean_object.compute_dask_graph()
# Writes glean_object.output to output.csv.
glean_object.save_output_as_csv()
# Last expression of the cell: returns the combined frame so the notebook shows it.
glean_object.display_output()

saved the output to disk in CSV format


Unnamed: 0,glean_id,glean_date,glean_text,glean_type,glean_location,invoice_id,canonical_vendor_id
0,288efc0c-4357-11eb-a0e2-2016b9350fa1,2020-12-21,First new bill in 14 months from vendor 638e18...,vendor_not_seen_in_a_while,invoice,73a22fc0-8b17-4fa4-b702-0e850fa199c0,638e18e6-906b-4dc2-af56-7b12107fdf70
1,280fee70-4357-11eb-8768-2016b9350fa1,2020-12-21,First new bill in 5 months from vendor a96c5d6...,vendor_not_seen_in_a_while,invoice,bee78f9f-2da0-4435-b84f-74720824a734,a96c5d60-7df0-4360-b714-02ef221e4bd0
2,280fee71-4357-11eb-a3e0-2016b9350fa1,2020-12-21,First new bill in 3 months from vendor a96c5d6...,vendor_not_seen_in_a_while,invoice,90d81ad1-aa68-4178-b85b-fa49bf1ce7df,a96c5d60-7df0-4360-b714-02ef221e4bd0
3,288210e2-4357-11eb-adf8-2016b9350f9e,2020-12-21,First new bill in 8 months from vendor 0d41b0c...,vendor_not_seen_in_a_while,invoice,e0d0a045-5b38-4874-ab65-1bb521648dea,0d41b0c0-dd27-4c32-909d-a190744e747b
4,27fe3f12-4357-11eb-9de0-2016b9350f9e,2020-12-21,First new bill in 3 months from vendor 2db713d...,vendor_not_seen_in_a_while,invoice,43e791f9-d00a-48f6-ac4f-b61a06ae7448,2db713d9-3a53-493e-94a6-ac99c46692f6
...,...,...,...,...,...,...,...
474,99afb02e-4357-11eb-becd-2016b9350f9e,2020-12-21,Monthly spend with dc885256-e806-4a7a-bd68-37b...,large_month_increase_mtd,vendor,,dc885256-e806-4a7a-bd68-37b1eed72643
475,992bc552-4357-11eb-ad98-2016b9350fa1,2020-12-21,Monthly spend with 27184514-17c5-466d-9f68-e02...,large_month_increase_mtd,vendor,,27184514-17c5-466d-9f68-e02253695bfa
476,9932ec50-4357-11eb-baae-2016b9350fa1,2020-12-21,Monthly spend with 10eaea15-03c3-4d52-9120-9fa...,large_month_increase_mtd,vendor,,10eaea15-03c3-4d52-9120-9fa817ae4fd3
477,99449086-4357-11eb-adc3-2016b9350fa1,2020-12-21,Monthly spend with b21a0540-aaff-4d8d-ad8e-443...,large_month_increase_mtd,vendor,,b21a0540-aaff-4d8d-ad8e-4430809fe0ff


In [25]:
client.close() # close the Dask client created in the first cell (terminates the worker nodes)

## Learning outcomes and Summary

-	I solved this challenge using Dask. I was eager to try it for the first time because I had read that it is quite simple and that any code written in regular Python can easily be tweaked to run on Dask. Dask's design philosophy is advertised as reusing the interfaces of pandas DataFrames and scikit-learn to keep the learning curve to a minimum. Too bad; it's just good advertising. While working, I realized that not all pandas DataFrame functionality is implemented in Dask DataFrames. I prototyped my solution with pandas DataFrames and planned to tweak it to work with Dask. This approach turned out to be inefficient; I should have written my solution for Dask right from the start. I should have stuck with Spark.
-	My solution solves 3 of the 4 logics I was tasked with. I need some more clarification to solve logic 4.
-	Though my current solution works fine, I realize it is suboptimal. It could be tweaked to make it more efficient. I have not implemented those tweaks because I am concerned that, before the deadline, I might introduce more bugs into my solution than improvements.
-	Datasets need to be larger than a certain threshold to really take advantage of, and get a feel for, distributed frameworks. The dataset provided is too small to benefit from a distributed framework.
-	I could have done better if I had had more time to play around with my solution.
-	I believe the dataset we were provided is a sample of what your company works with. I would suggest having a look at Apache Kafka; I have read that it is a good tool for handling data streams and event-driven solutions. Please do have a look at it if you have not already.
-	In conclusion, I enjoyed working on the challenge and look forward to hearing back from your team. I hope to meet you and your team members in round 2, where I hope to have the opportunity to explain how I can add more value to your company.
