## Duplicate Payments Dev Notebook - v1

----

### INV - PAY

### Libraries

In [1]:
# DP Org
import os
import io
import numpy as np
import pandas as pd
from datetime import datetime
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import re
pd.set_option('display.max_rows', 400)
pd.set_option('display.max_columns', 200)
import warnings
warnings.filterwarnings('ignore')
from IPython.display import display, HTML
from joblib import Parallel, delayed
import multiprocessing
import itertools
from collections import Counter


# pyspark 
import pyspark
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id, regexp_replace,
    ltrim, rtrim, trim, split, expr, when, concat, lit, create_map, month, year,
    col,to_date, udf, unix_timestamp, from_unixtime, expr, log, log10, log2, abs,lower,datediff, concat_ws)



In [2]:
# Start (or reuse an already running) Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("DP_INV_PAY")
    .getOrCreate()
)

### Data Import

-------------

----
### Sample Start

In [11]:
# Load the sample extract: a CSV file with a header row; Spark infers column types.
payments_data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('dp_sample_data.csv')
)

# vendor_id is compared as text downstream (ids such as "355456T2" are not numeric),
# so force it to a string whatever type was inferred.
payments_data = payments_data.withColumn("vendor_id", col("vendor_id").cast(StringType()))

-----
### Pre-process

In [12]:
# Strip punctuation and whitespace from every column so the match keys built
# below compare cleanly. Amount columns keep their digits, decimal point and
# sign: stripping those turned 350.75 into 35075 (a double 350.0 into 3500)
# and -350 into 350, producing false duplicate matches.
amount_columns = {"total_invoice_amount"}
for col_iter in payments_data.columns:
    strip_pattern = r'[^0-9.\-]' if col_iter in amount_columns else r'[^a-zA-Z0-9_]'
    payments_data = payments_data.withColumn(col_iter, regexp_replace(col_iter, strip_pattern, ''))
    print("cleaning done for >> {}".format(col_iter))

# Row id. monotonically_increasing_id() is unique and increasing, but only
# consecutive (1, 2, 3, ...) when the data sits in a single partition.
payments_data = payments_data.withColumn("id", monotonically_increasing_id() + 1)

payments_data.show()

print("Part(1/6) - Data Processing Done!")

cleaning done for >> company_code
cleaning done for >> vendor_id
cleaning done for >> invoice_reference_number
cleaning done for >> invoice_date
cleaning done for >> total_invoice_amount
+------------+----------+------------------------+------------+--------------------+---+
|company_code| vendor_id|invoice_reference_number|invoice_date|total_invoice_amount| id|
+------------+----------+------------------------+------------+--------------------+---+
|       erp08|    355186|             IRN_4752_01|    22102019|                 350|  1|
|       erp08|    355186|             IRN_4752_01|    22102019|                 350|  2|
|       erp08|    355186|             IRN_4752_01|    22102019|                 350|  3|
|       erp09|    355185|             IRN_4752_01|    22102019|                 350|  4|
|       erp09|    355185|             IRN_4752_01|    22102019|                 350|  5|
|       erp08|  355456T2|               IRN_98_01|    22102019|                2000|  6|
|       erp0

-----

-----

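### Split into ATH and Test dataframes

ATH holds rows that are exact duplicates on company code, vendor id, invoice date, invoice reference number and amount. Test holds all remaining rows, which are then checked by the partial tests T1–T4.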

In [14]:
# Keep an untouched copy of the cleaned data so the cells below can be re-run.
# A frame without match_key comes straight from pre-processing: snapshot it.
# A frame that already has match_key/count (cells below ran): restore the
# snapshot. The previous version only restored from payments_data_org, whose
# assignment was commented out, so it raised NameError on a fresh kernel.
if "match_key" not in payments_data.columns:
    payments_data_org = payments_data
payments_data = payments_data_org

In [15]:
# Exact-match key over every field that identifies a payment.
payments_data = payments_data.withColumn(
    "match_key",
    concat_ws('-',
              col("company_code"),
              col("vendor_id"),
              col("invoice_date"),
              col("invoice_reference_number"),
              col("total_invoice_amount")))

# keys that occur more than once, with their multiplicity
temp_df = payments_data.groupBy("match_key").count().filter("count>1")
# Change data type
temp_df = temp_df.withColumn("count", temp_df["count"].cast(StringType()))

# attach the multiplicity to every row (null when the key is unique)
payments_data = payments_data.join(temp_df, on=['match_key'], how='left')

# Split on the joined count column, referenced by name on the joined frame
# itself rather than through temp_df (a different DataFrame).
payments_df_ath = payments_data.filter(col("count").isNotNull())
payments_df_test = payments_data.filter(col("count").isNull())

In [16]:
# Preview both halves of the split (first 20 rows each, long values truncated).
for preview_df in (payments_df_ath, payments_df_test):
    preview_df.show(n=20, truncate=True)

+--------------------+------------+---------+------------------------+------------+--------------------+---+-----+
|           match_key|company_code|vendor_id|invoice_reference_number|invoice_date|total_invoice_amount| id|count|
+--------------------+------------+---------+------------------------+------------+--------------------+---+-----+
|erp08-355186-2210...|       erp08|   355186|             IRN_4752_01|    22102019|                 350|  1|    3|
|erp08-355186-2210...|       erp08|   355186|             IRN_4752_01|    22102019|                 350|  2|    3|
|erp08-355186-2210...|       erp08|   355186|             IRN_4752_01|    22102019|                 350|  3|    3|
|erp09-355185-2210...|       erp09|   355185|             IRN_4752_01|    22102019|                 350|  4|    2|
|erp09-355185-2210...|       erp09|   355185|             IRN_4752_01|    22102019|                 350|  5|    2|
+--------------------+------------+---------+------------------------+----------

---
### Payments DF ATH

In [17]:
# count was only needed to split the frames
payments_df_ath = payments_df_ath.drop('count')

# The exact-match key doubles as the cluster id. Read it by name from this
# frame; the previous version pulled it from temp_df (a different DataFrame),
# which only resolves through Spark's join lineage.
payments_df_ath = payments_df_ath.withColumn("cluster_num", col("match_key"))

# exact duplicates match on every field
payments_df_ath = payments_df_ath.withColumn("match_percent", lit(100))

# an exact duplicate is also a hit for every partial test
for test_iter in ["T1", "T2", "T3", "T4"]:
    print(test_iter)
    payments_df_ath = (payments_df_ath
                       .withColumn("{}_%".format(test_iter), lit(100))
                       .withColumn("{}_flag".format(test_iter), lit(1))
                       .withColumn("{}_cluster_num".format(test_iter), col("cluster_num")))

payments_df_ath = payments_df_ath.drop('match_key')

T1
T2
T3
T4


---
### Payments DF TEST

In [18]:
# The split is done; the helper columns are not needed on the remaining rows.
payments_df_test = payments_df_test.drop('match_key').drop('count')

In [None]:
## Test Definition
# "Overall" : ['vendor_id', 'invoice_reference_number', 'invoice_date', 'total_invoice_amount'],
# "T1" : ['vendor_id', 'invoice_date', 'total_invoice_amount'],
# "T2" : ['invoice_date', 'invoice_reference_number'],
# "T3" : ['invoice_date', 'total_invoice_amount'],
# "T4" : ['vendor_id','invoice_reference_number','total_invoice_amount']

Define a helper that builds the duplicate-cluster dataframe for one partial test (T1–T4).

In [19]:
def test_cluster_creator(test_name):

    test_list  = ["T1", "T2", "T3", "T4"]
    
    # condition to define temp payment df for each test
    if(test_name == 'T1'):
        test_list.remove("T1")
        payments_df_test_temp = payments_df_test.withColumn("match_key",concat_ws('-', 
                                                         payments_df_test.company_code,
                                                         payments_df_test.vendor_id,
                                                         payments_df_test.invoice_date,
                                                         payments_df_test.total_invoice_amount))
        
    elif(test_name == 'T2'):
        test_list.remove("T2")
        payments_df_test_temp = payments_df_test.withColumn("match_key",concat_ws('-', 
                                                             payments_df_test.company_code,
                                                             payments_df_test.invoice_date,
                                                             payments_df_test.invoice_reference_number))

    elif(test_name == 'T3'):
        test_list.remove("T3")
        payments_df_test_temp = payments_df_test.withColumn("match_key",concat_ws('-', 
                                                             payments_df_test.company_code,
                                                             payments_df_test.invoice_date,
                                                             payments_df_test.total_invoice_amount))
        
    elif(test_name == 'T4'):
        test_list.remove("T4")
        payments_df_test_temp = payments_df_test.withColumn("match_key",concat_ws('-', 
                                                             payments_df_test.company_code,
                                                             payments_df_test.vendor_id,
                                                             payments_df_test.invoice_reference_number,
                                                             payments_df_test.total_invoice_amount))
        
    
    # duplicated rows
    temp_df = payments_df_test_temp.groupBy("match_key").count().filter("count>1")

    # Change data type
    temp_df = temp_df.withColumn("count", temp_df["count"].cast(StringType()))

    # reform temp df to append later
    temp_df = payments_df_test_temp.join(temp_df, on=['match_key'], how='left')

    # Creating Separate dataframes
    temp_df = temp_df.filter(~temp_df["count"].isNull())

    # cluster number
    temp_df = temp_df.withColumn("cluster_num", temp_df["match_key"])

    # drop count and match key
    temp_df = temp_df.drop('count', 'match_key')

    # default match percent
    temp_df = temp_df.withColumn("match_percent", lit(0))

    # Cluster number and T1 flag
    temp_df = temp_df.withColumn("{}_flag".format(test_name), lit(1))
    temp_df = temp_df.withColumn("{}_%".format(test_name), lit(100))
    temp_df = temp_df.withColumn("{}_cluster_num".format(test_name), temp_df['cluster_num'])


    # for each test
    for test_iter in test_list:
        temp_df = temp_df.withColumn("{}_flag".format(test_iter), lit(0))
        temp_df = temp_df.withColumn("{}_%".format(test_iter), lit(0))
        temp_df = temp_df.withColumn("{}_cluster_num".format(test_iter), lit("-"))

    # aligning columns as per payments_df_ath
    temp_df = temp_df.select(payments_df_ath.columns)
    
    return temp_df

In [20]:
# Build the cluster frame for each partial test and report how many rows it flags.
cluster_frames = {}
for test_label in ("T1", "T2", "T3", "T4"):
    cluster_frames[test_label] = test_cluster_creator(test_label)
    print("{} count => {}".format(test_label, cluster_frames[test_label].count()))

T1_df, T2_df, T3_df, T4_df = (cluster_frames[label] for label in ("T1", "T2", "T3", "T4"))

T1 count => 2
T2 count => 2
T3 count => 2
T4 count => 2


In [22]:
payments_df_test.show(n=20, truncate=True)

+------------+----------+------------------------+------------+--------------------+---+
|company_code| vendor_id|invoice_reference_number|invoice_date|total_invoice_amount| id|
+------------+----------+------------------------+------------+--------------------+---+
|       erp08|  355456T2|               IRN_98_01|    22102019|                2000|  6|
|       erp08|  355456T2|               IRN_98_01|    22102019|               10000|  7|
|       erp08|355457T1T3|                IRN_1002|    24102019|               13000|  8|
|       erp08|355457T1T3|                IRN_1003|    24102019|               13000|  9|
|       erp08|  355458T4|                 IRN_297|    01042020|                3000| 10|
|       erp08|  355458T4|                 IRN_297|    02042020|                3000| 11|
+------------+----------+------------------------+------------+--------------------+---+



In [23]:
def unionAll(*dfs):
    """Stack DataFrames row-wise (SQL UNION ALL).

    Columns are matched by position, not by name, so every input must share
    the same column order. Raises TypeError when called with no frames.
    """
    return reduce(lambda stacked, nxt: stacked.unionAll(nxt), dfs)

# one frame holding the hits of all four partial tests
test_df_append = unionAll(T1_df, T2_df, T3_df, T4_df)

In [29]:
# Final result: exact duplicates (ATH) followed by the partial-test hits.
final_df = payments_df_ath.unionAll(test_df_append)

In [30]:
# First 20 rows of the combined result, rendered as a pandas table.
final_preview = final_df.limit(num=20).toPandas()
final_preview

Unnamed: 0,company_code,vendor_id,invoice_reference_number,invoice_date,total_invoice_amount,id,cluster_num,match_percent,T1_%,T1_flag,T1_cluster_num,T2_%,T2_flag,T2_cluster_num,T3_%,T3_flag,T3_cluster_num,T4_%,T4_flag,T4_cluster_num
0,erp08,355186,IRN_4752_01,22102019,350,1,erp08-355186-22102019-IRN_4752_01-350,100,100,1,erp08-355186-22102019-IRN_4752_01-350,100,1,erp08-355186-22102019-IRN_4752_01-350,100,1,erp08-355186-22102019-IRN_4752_01-350,100,1,erp08-355186-22102019-IRN_4752_01-350
1,erp08,355186,IRN_4752_01,22102019,350,2,erp08-355186-22102019-IRN_4752_01-350,100,100,1,erp08-355186-22102019-IRN_4752_01-350,100,1,erp08-355186-22102019-IRN_4752_01-350,100,1,erp08-355186-22102019-IRN_4752_01-350,100,1,erp08-355186-22102019-IRN_4752_01-350
2,erp08,355186,IRN_4752_01,22102019,350,3,erp08-355186-22102019-IRN_4752_01-350,100,100,1,erp08-355186-22102019-IRN_4752_01-350,100,1,erp08-355186-22102019-IRN_4752_01-350,100,1,erp08-355186-22102019-IRN_4752_01-350,100,1,erp08-355186-22102019-IRN_4752_01-350
3,erp09,355185,IRN_4752_01,22102019,350,4,erp09-355185-22102019-IRN_4752_01-350,100,100,1,erp09-355185-22102019-IRN_4752_01-350,100,1,erp09-355185-22102019-IRN_4752_01-350,100,1,erp09-355185-22102019-IRN_4752_01-350,100,1,erp09-355185-22102019-IRN_4752_01-350
4,erp09,355185,IRN_4752_01,22102019,350,5,erp09-355185-22102019-IRN_4752_01-350,100,100,1,erp09-355185-22102019-IRN_4752_01-350,100,1,erp09-355185-22102019-IRN_4752_01-350,100,1,erp09-355185-22102019-IRN_4752_01-350,100,1,erp09-355185-22102019-IRN_4752_01-350
5,erp08,355457T1T3,IRN_1002,24102019,13000,8,erp08-355457T1T3-24102019-13000,0,100,1,erp08-355457T1T3-24102019-13000,0,0,-,0,0,-,0,0,-
6,erp08,355457T1T3,IRN_1003,24102019,13000,9,erp08-355457T1T3-24102019-13000,0,100,1,erp08-355457T1T3-24102019-13000,0,0,-,0,0,-,0,0,-
7,erp08,355456T2,IRN_98_01,22102019,2000,6,erp08-22102019-IRN_98_01,0,0,0,-,100,1,erp08-22102019-IRN_98_01,0,0,-,0,0,-
8,erp08,355456T2,IRN_98_01,22102019,10000,7,erp08-22102019-IRN_98_01,0,0,0,-,100,1,erp08-22102019-IRN_98_01,0,0,-,0,0,-
9,erp08,355457T1T3,IRN_1002,24102019,13000,8,erp08-24102019-13000,0,0,0,-,0,0,-,100,1,erp08-24102019-13000,0,0,-


In [None]:
print("T1 DF")
# Same pipeline as test_cluster_creator("T1") (key: company_code, vendor_id,
# invoice_date, total_invoice_amount), so reuse the helper instead of an
# inline copy. Unlike the inline version, this does not overwrite
# payments_df_test with an extra match_key column.
temp_df_T1 = test_cluster_creator("T1")

----

In [None]:
print("T2 DF")
# Build via the shared helper (key: company_code, invoice_date,
# invoice_reference_number). The inline copy that lived here ran the groupBy
# twice, wrote "TT_*" columns instead of "T1_*", never added any "*_%" column
# and skipped the column alignment, so it could not be unioned with the other
# test frames. It also overwrote payments_df_test with a match_key column.
temp_df_T2 = test_cluster_creator("T2")

----

In [None]:
print("T3 DF")
# Build via the shared helper (key: company_code, invoice_date,
# total_invoice_amount). The inline copy that lived here ran the groupBy
# twice, set T3_% to 0 where every other test uses 100, wrote "TT_*" columns
# instead of "T1_*", omitted the other "*_%" columns and skipped the column
# alignment, so it could not be unioned with the other test frames.
temp_df_T3 = test_cluster_creator("T3")

----

In [None]:
print("T4 DF")
# Build via the shared helper (key: company_code, vendor_id,
# invoice_reference_number, total_invoice_amount). The inline copy that lived
# here ran the groupBy twice, set T4_% to 0 where every other test uses 100,
# wrote "TT_*" columns instead of "T1_*", omitted the other "*_%" columns and
# skipped the column alignment, so it could not be unioned with the others.
temp_df_T4 = test_cluster_creator("T4")

In [None]:
temp_df_T4.show(n=20, truncate=True)

----

In [None]:
# Merge all four test frames with the unionAll helper defined in the earlier
# cell. Redefining it here would silently shadow that definition.
temp_df_test = unionAll(temp_df_T1, temp_df_T2, temp_df_T3, temp_df_T4)

----

In [None]:
temp_df.show(n=20, truncate=True)

In [None]:
# Duplicated rows within ATH, keyed on the exact-match key.
# payments_df_ath no longer has a match_key column (it is dropped when the ATH
# frame is built), so grouping on it failed on a fresh kernel. cluster_num was
# set from that same key, so restore match_key from it.
ath_keyed = payments_df_ath.withColumn("match_key", col("cluster_num"))
temp_df = ath_keyed.groupBy("match_key").count().filter("count>1")
# Change data type
temp_df = temp_df.withColumn("count", temp_df["count"].cast(StringType()))
# reform temp df to append later
temp_df = ath_keyed.join(temp_df, on=['match_key'], how='left')
# removing Nulls
temp_df = temp_df.filter(col("count").isNotNull())
# drop count
temp_df = temp_df.drop('count')
temp_df = temp_df.withColumn("cluster_num", col("match_key"))

In [None]:
# Match percent defaults to 0. The column name is a plain literal: the
# previous `.format(test_iter)` had no placeholder to fill and raised
# NameError whenever test_iter had not been defined by an earlier cell.
temp_df = temp_df.withColumn("match_percent", lit(0))

In [None]:
temp_df.show(n=20, truncate=True)

In [None]:
# Key columns per test ("Overall" = exact match on all four fields).
test_dict = {
    "Overall": ['vendor_id', 'invoice_reference_number', 'invoice_date', 'total_invoice_amount'],
    "T1": ['invoice_date', 'total_invoice_amount', 'vendor_id'],
    "T2": ['invoice_date', 'invoice_reference_number'],
    "T3": ['invoice_date', 'total_invoice_amount'],
    "T4": ['total_invoice_amount', 'vendor_id', 'invoice_reference_number'],
}

----

**Cluster Numbers**

In [None]:
# Dense cluster ids: StringIndexer maps each distinct match_key to 0, 1, ...
# (most frequent first, its default order). Shift to start at 1 and prefix with
# the company code; the index is a double, so ids look like "erp08_1.0".
# StringIndexer refuses an output column that already exists, and temp_df can
# already carry cluster_num (set in the ATH-duplicates cell), so drop it first.
indexer = StringIndexer(inputCol="match_key", outputCol="cluster_num")
temp_df = temp_df.drop("cluster_num")
temp_df = indexer.fit(temp_df).transform(temp_df)
temp_df = temp_df.withColumn("cluster_num", temp_df["cluster_num"]+1)
temp_df = temp_df.withColumn("cluster_num", temp_df["cluster_num"].cast(StringType()))
temp_df = temp_df.withColumn("cluster_num",concat_ws('_',temp_df.company_code,temp_df.cluster_num))

-----

-----

-----

## Old Code - //remove later//

### Connection

In [None]:
import getpass

# Warehouse connection settings.
# SECURITY: the user name and password used to be hard-coded in this cell.
# They are now read from the environment (DWH_USER / DWH_PASSWORD) or prompted
# for, so they never land in the notebook file or its outputs. The previously
# committed password should be treated as leaked and rotated.
db_url = os.environ.get(
    "DWH_JDBC_URL",
    "jdbc:sqlserver://azrdwhlantern.database.windows.net:1433;databaseName=azrdwhlantern",
)
db_name = os.environ.get("DWH_DB_NAME", 'azrdwhlantern')
db_user = os.environ.get("DWH_USER") or input("Warehouse user: ")
db_pwd = os.environ.get("DWH_PASSWORD") or getpass.getpass("Warehouse password: ")

def read_dwh_query(query):
    """Run ``query`` on the warehouse over JDBC and return a Spark DataFrame.

    The query is wrapped as a derived table, ``(<query>) tmp``, because the
    JDBC ``dbtable`` option takes a table expression rather than a bare
    SELECT. ``query`` is passed through verbatim, so it must come from trusted
    code, never from user input.
    """
    query = "(" + query + ") tmp"
    jdbcDF = (spark.read
              .format("jdbc")
              .option("url", db_url)
              .option("dbtable", query)
              .option("user", db_user)
              .option("password", db_pwd)
              .load())

    return jdbcDF

In [None]:
# Source system whose invoices are pulled from the warehouse.
source_system = "adopt_erp"

In [None]:
payments_data_fields = [
    'Id', 'source_system', 'zone', 'company_code', 'vendor_id', 'invoice_reference_number',
    'invoice_date', 'total_invoice_amount', 'payment_accounting_document_number',
]

# Sample: the first 10,000 rows for one source system. 'Id' is skipped because
# the row id is generated below rather than read from the table.
# NOTE(review): source_system is spliced into the SQL text; fine for a notebook
# constant, but it must never come from user input.
selected_columns = ', '.join(str(field) for field in payments_data_fields[1:])
query = "SELECT TOP 10000 {} from import.invoice_payment_dp where source_system  = '{}'".format(
    selected_columns, source_system)

# For the full table, drop both the TOP clause and the source_system filter.

# reading payments data, then numbering the rows
payments_data = read_dwh_query(query)
payments_data = payments_data.withColumn("id", monotonically_increasing_id() + 1)

In [None]:
# Collect the sampled Spark frame to the driver as a pandas DataFrame.
test_df = payments_data.toPandas()

In [None]:
# (rows, columns) of the collected sample
test_df.shape

In [None]:
# Convert back to Spark. `sqlContext` is never defined in this notebook, so
# build the DataFrame from the active SparkSession instead.
test_df2 = spark.createDataFrame(test_df)

----- 
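Test: load the full extract for `source_system` (no `TOP` limit) and drop intercompany and employee vendors.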

In [None]:
# Source system whose invoices are pulled from the warehouse.
source_system = "adopt_erp"

In [None]:
payments_data_fields = [
    'Id', 'source_system', 'zone', 'company_code', 'vendor_id', 'invoice_reference_number',
    'invoice_date', 'total_invoice_amount', 'payment_accounting_document_number',
    'vendor_is_intercompany', 'vendor_is_employee',
]

# Every row for one source system ('Id' is skipped; it is generated later).
# NOTE(review): source_system is spliced into the SQL text; fine for a notebook
# constant, but it must never come from user input.
selected_columns = ', '.join(str(field) for field in payments_data_fields[1:])
query = "SELECT {} from import.invoice_payment_dp where source_system  = '{}'".format(
    selected_columns, source_system)

# For the full table, drop the source_system filter.

# reading payments data
payments_data = read_dwh_query(query)

In [None]:
# Collect to the driver; from here on `payments_data` is a pandas DataFrame, not a Spark one.
payments_data = payments_data.toPandas()

In [None]:
# Remove intercompany and employee transactions, then expose the row labels of
# the collected frame as an 'Id' column. Ids keep gaps where rows were removed.
# Chained (no inplace=True) so nothing is mutated on a filtered slice.
payments_data = (
    payments_data
    .loc[lambda frame: (frame['vendor_is_intercompany'] != 'Y') & (frame['vendor_is_employee'] != 'Y')]
    .drop(columns=['vendor_is_intercompany', 'vendor_is_employee'])
    .reset_index(drop=False)
    .rename(columns={'index': 'Id'})
)

In [None]:
# Fields kept for duplicate detection, in output order.
payments_data_fields = [
    'Id', 'source_system', 'zone', 'company_code', 'vendor_id', 'invoice_reference_number',
    'invoice_date', 'total_invoice_amount', 'payment_accounting_document_number',
]

#######################################################################################
############################# PRE - PROCESS PART ######################################
#######################################################################################

# Work on a deep copy so payments_data keeps the extract as loaded.
data = payments_data.copy(deep=True)

# Normalise a raw value to text, dropping a purely numeric ".0" part.
def remove_trailing_zeros(x):
    """Return ``x`` as a string, writing integral numbers without a fractional part.

    - Strings are parsed with ``int()``: "00123" -> "123"; anything that is not
      an integer literal ("IRN_1", "350.50", "nan") is returned unchanged.
    - Numbers (int, float, Decimal, numpy scalars) become ``str(int(x))`` only
      when integral, so 350.0 -> "350" while 350.75 stays "350.75". The
      previous version called ``int()`` unconditionally and truncated 350.75
      to "350", creating false duplicate amounts.
    - Anything else (None, NaN, inf, timestamps) falls back to ``str(x)``.
    """
    if isinstance(x, str):
        try:
            return str(int(x))
        except ValueError:
            return x
    try:
        is_integral = float(x).is_integer()
    except (TypeError, ValueError, OverflowError):
        return str(x)
    return str(int(x)) if is_integral else str(x)

# Pass every raw field through remove_trailing_zeros and trim surrounding whitespace.
for field in payments_data_fields:
    data[field] = data[field].apply(lambda raw: remove_trailing_zeros(raw).strip())

# Keep only the matching fields, in a fixed order.
data = data[payments_data_fields]

# Only vendor ids of length 6 or 7 are kept.
data['vendor_id_length'] = data['vendor_id'].str.len()
data = data[data['vendor_id_length'].isin([6, 7])]
if len(data) == 0:
    print('All records are filtered OUT on length of Vendor ID!! Check data source!!')

# Placeholder values in the source are treated as missing.
data = data.replace(['<not provided>', '[no vendor]'], np.nan)
data['total_invoice_amount'] = data['total_invoice_amount'].replace(' ', np.nan).astype(float)

# Remaining missing values become the string '0'; renumber rows from 0.
data = data.fillna('0').reset_index(drop=True)

banner = '###############################################################################'
print(banner)
print("Number of samples in imported dataset: {}".format(len(data)))
print("Number of fields in imported dataset: {}".format(len(data.columns)))
print(banner)

def preProcess(field, df=None):
    """Normalise one text column for matching.

    Each value is converted with ``str`` and then: runs of non-ASCII characters
    become a space, newlines become a space, runs of two or more spaces
    collapse to one, surrounding whitespace and quote characters are stripped,
    and the text is lower-cased. A value that is exactly "nan" (how a missing
    value reads after ``str()``) becomes a single space.

    Parameters
    ----------
    field : str
        Column to clean.
    df : pandas.DataFrame, optional
        Frame to read from; defaults to the notebook-level ``data`` frame,
        which is what existing callers rely on.

    Returns
    -------
    pandas.Series
        The cleaned column. The input frame is not modified.
    """
    source = data if df is None else df

    def _clean(value):
        text = re.sub(r'[^\x00-\x7F]+', ' ', str(value))
        # Replace newlines before collapsing spaces, so the spaces they leave
        # are collapsed too (previously "a \n b" ended up as "a   b").
        text = re.sub('\n', ' ', text)
        text = re.sub('  +', ' ', text)
        text = text.strip().strip('"').strip("'").lower()
        # Blank only a value that IS "nan"; the old substring replacement also
        # mangled real text, e.g. "financial" -> "fi cial".
        return ' ' if text == 'nan' else text

    return source[field].map(_clean)

# Normalise every column with preProcess.
for field in data.columns.tolist():
    print("Cleaning column: {}".format(field))
    data[field] = preProcess(field)
print("Done Cleaning Columns")

# Drop rows whose invoice reference is empty after cleaning.
data = data[data['invoice_reference_number'] != ""]

print("No. of Data Points: {}".format(len(data)))
print("No. of Unique company codes: {}".format(len(data['company_code'].unique())))

# Keep only letters, digits and spaces in the key columns.
for key_field in ['company_code', 'vendor_id', 'invoice_reference_number', 'invoice_date']:
    data[key_field] = data[key_field].replace('[^a-zA-Z0-9 ]', "", regex=True)
# Drop a trailing " 000000" time part from the dates (presumably a midnight
# timestamp such as "2019-10-22 00:00:00" after punctuation removal).
data['invoice_date'] = data['invoice_date'].str.replace(' 000000','')

data = data[['Id', 'source_system', 'zone', 'company_code', 'vendor_id', 'invoice_reference_number',
             'invoice_date', 'total_invoice_amount', 'payment_accounting_document_number']]

# NOTE(review): unlike the Spark pipeline's key, this one omits company_code.
data['match_key'] = data['vendor_id'].str.cat(
    [data['invoice_reference_number'], data['invoice_date'], data['total_invoice_amount']],
    sep='-')

print("Part(1/6) - Data Processing Done!")

---

In [None]:
#######################################################################################
############################# ATH CREATION PART  ######################################
#######################################################################################

## Payment DF ATH

# Start from the cleaned data: id, matching fields and payment document number.
# Every row gets the constant cluster label '1'; the test loop below groups by
# company_code and cluster_num.
payments_df_ath = (
    data.loc[:, ['Id', 'company_code', 'vendor_id', 'invoice_reference_number', 'invoice_date',
                 'total_invoice_amount', 'payment_accounting_document_number']]
    .assign(cluster_num='1')
)

# Key columns per test ("Overall" = exact match on all four fields).
test_dict = {
    "Overall": ['vendor_id', 'invoice_reference_number', 'invoice_date', 'total_invoice_amount'],
    "T1": ['invoice_date', 'total_invoice_amount', 'vendor_id'],
    "T2": ['invoice_date', 'invoice_reference_number'],
    "T3": ['invoice_date', 'total_invoice_amount'],
    "T4": ['total_invoice_amount', 'vendor_id', 'invoice_reference_number'],
}


# create a function to return DF with test field
def testBuilder(df, test_col_list, test_name):

    #Creating temp DF for analysis
    temp_dup_df = df.loc[:,test_col_list].dropna()

    # find duplicated entries and create unique level dataframe
    temp_dup_df = temp_dup_df.loc[temp_dup_df.duplicated(subset=test_col_list, keep=False),test_col_list]
    temp_dup_df.drop_duplicates(inplace = True)
    temp_dup_df.reset_index(drop=True, inplace = True)

    # creating test group for each test -- any value means that the test was hit -- number indicates group number
    temp_dup_df['{}_grp'.format(test_name)] = pd.Series(np.arange(1,len(temp_dup_df)+1,1))

    # merging with parent dataframe using each "duplicated key" formed using columns for each test
    df = df.merge(temp_dup_df, on=test_col_list, how = 'left', validate="many_to_one")    

    # return original dataframe with additional column for test applied
    return df

# apply tests for each tests defined in "test_dict"
for test_iter in np.arange(0,len(test_dict)):

    # define test column list and test name for testbuilder function
    col_list = test_dict[list(test_dict.keys())[test_iter]]
    name = list(test_dict.keys())[test_iter]

    # apply testBuilder function for each test on groupby dataframe object
    payments_df_ath = payments_df_ath.groupby(['company_code', 'cluster_num']).apply(lambda x : testBuilder(x, col_list, name)).reset_index(drop=True)

# post formatting
payments_df_ath.columns = payments_df_ath.columns.str.replace("_grp", "")

# change groups to flags
# payments_df_ath.loc[:,payments_df_ath.columns.str.contains("[T*]")] = (payments_df_ath.loc[:,payments_df_ath.columns.str.contains("[T*]")].notnull()).astype(int)

# transaction level score
# payments_df_ath['transaction_score'] = payments_df_ath.loc[:,payments_df_ath.columns.str.contains("[T*]")].sum(axis=1)
test_weights = [3,2,1,3]
# payments_df_ath['transaction_score'] = payments_df_ath.loc[:,payments_df_ath.columns.str.contains("[T*]")].multiply(test_weights, axis = 'columns').sum(axis='columns')

# new test_score
payments_df_ath['test_score'] = (payments_df_ath.loc[:,payments_df_ath.columns.str.contains("[T*]")].notnull()).astype(int).multiply(test_weights, axis = 'columns').sum(axis='columns')

# payments_df_ath_org = payments_df_ath.copy()

payments_df_ath['sequential_flag'] = 0.0
payments_df_ath['T0'] = 0.0
print("Part(2/6) - Overall 100% match DF Done!")

---

In [None]:
## Forming DataFrames for ATH and Test Based Clustering

# diagnostics (displayed because ast_node_interactivity = "all")
payments_df_ath['test_score'].unique()
payments_df_ath.shape
payments_df_ath.loc[payments_df_ath['test_score'] == 0].shape

# removing 0 test score (transactions not hitting any test); explicit copy because
# new columns are written to this frame below
payments_df_ath = payments_df_ath.loc[payments_df_ath['test_score'] != 0].copy()

payments_df_ath.isna().sum()

# creating test based clusters: "<company_code>_<group number>" per test
# (a row that did not hit a test gets "<company_code>_nan")
for test_name in ['T1', 'T2', 'T3', 'T4']:
    payments_df_ath['{}_cluster_num'.format(test_name)] = (
        payments_df_ath['company_code'] + "_" + payments_df_ath[test_name].astype(str)
    )

# creating test based and ATH dataframes: ATH rows hit every weighted test,
# i.e. their score equals the sum of all test weights (9 for weights [3, 2, 1, 3])
ath_score = sum(test_weights)
payments_df_test = payments_df_ath.loc[payments_df_ath['test_score'] != ath_score].copy()
payments_df_ath = payments_df_ath.loc[payments_df_ath['test_score'] == ath_score].copy()

In [None]:
# row/column counts of the ATH frame, then of the test-based frame
for frame in (payments_df_ath, payments_df_test):
    print(frame.shape)

----

In [None]:
## ATH Clustering

# order rows: cluster number first, then highest score / amount / date on top.
# The row order decides which duplicate survives drop_duplicates further down.
payments_df_ath = payments_df_ath.sort_values(['cluster_num'])
payments_df_ath = payments_df_ath.sort_values(['test_score', 'total_invoice_amount', 'invoice_date'], ascending=False)

# keep only rows that belong to an "Overall" duplicate group
in_overall_group = payments_df_ath['Overall'].notnull()
payments_df_ath = payments_df_ath.loc[in_overall_group, :]
payments_df_ath['Overall'].nunique()

# provisional cluster id from the Overall group number; renumbered per company afterwards
payments_df_ath['cluster_num'] = payments_df_ath['Overall'].astype(str) + "_ath"
payments_df_ath['match_percent'] = 100

# an exact match counts as a 100% hit on every individual test
for name in ['T1', 'T2', 'T3', 'T4']:
    payments_df_ath[name + '_%'] = 100
    payments_df_ath[name + '_flag'] = 1

def cluster_name(df):
    """Renumber the clusters of one company as "<company_code>_<k>".

    ``k`` is 1 + the ``pd.Categorical`` code of the row's ``cluster_num``, i.e. its
    1-based rank among the sorted distinct ``cluster_num`` values in ``df``.
    Meant to be applied per ``company_code`` group via ``groupby().apply``.

    Returns a new frame with an added ``cluster_num_new`` column. ``df`` itself is
    not modified, because pandas does not support mutating the group passed to apply.
    """
    codes = pd.Categorical(df['cluster_num']).codes + 1
    return df.assign(cluster_num_new=df['company_code'] + "_" + codes.astype(str))

# renumber ATH clusters within each company and swap the new ids in
payments_df_ath = payments_df_ath.groupby('company_code').apply(cluster_name)
payments_df_ath = payments_df_ath.drop(columns=['cluster_num']).rename(columns={'cluster_num_new': 'cluster_num'})

# one row per payment accounting document within each cluster
payments_df_ath = payments_df_ath.drop_duplicates(['cluster_num', 'payment_accounting_document_number'])

# reset the sequential / typo (T0) flags to 0.0 for ATH rows
for flag_col in ('sequential_flag', 'T0'):
    payments_df_ath[flag_col] = 0.0


print(payments_df_ath.shape)
print("Part(3/6) - ATH Part Done!")

---

In [None]:
# initiating test based final DF: per-test frames are collected, then concatenated once
single_test_frames = []


def _cluster_name_for_test(df, test_name):
    """Renumber one company's clusters as "<company_code>_<k>_<test_name>".

    ``k`` is 1 + the ``pd.Categorical`` code of ``cluster_num`` (1-based rank among
    the sorted distinct values in ``df``). Returns a new frame with an added
    ``cluster_num_new`` column; ``df`` is not modified.
    """
    codes = pd.Categorical(df['cluster_num']).codes + 1
    return df.assign(cluster_num_new=df['company_code'] + "_" + codes.astype(str) + '_' + test_name)


# iterate over each test to create overall cluster based on test cluster
for name in ['T1', 'T2', 'T3', 'T4']:
    try:
        payments_df_single_test = payments_df_test.loc[payments_df_test[name].notnull()].copy()
        if payments_df_single_test.empty:
            # nothing hit this test, so there are no clusters to build
            continue

        # sort by cluster number, then by score / amount / date (descending); the
        # resulting order decides which row drop_duplicates keeps below
        payments_df_single_test = payments_df_single_test.sort_values(['cluster_num'])
        payments_df_single_test = payments_df_single_test.sort_values(['test_score', 'total_invoice_amount', 'invoice_date'], ascending=False)

        payments_df_single_test['cluster_num'] = payments_df_single_test[name].astype(str) + "_sth"
        payments_df_single_test['match_percent'] = 0

        # placeholder per-test scores: 100 / 1 for the current test, 0 / 0 for the others
        for temp_name in ['T1', 'T2', 'T3', 'T4']:
            is_current = temp_name == name
            payments_df_single_test['{}_%'.format(temp_name)] = 100 if is_current else 0
            payments_df_single_test['{}_flag'.format(temp_name)] = 1 if is_current else 0

        payments_df_single_test = payments_df_single_test.groupby('company_code').apply(_cluster_name_for_test, test_name=name)
        payments_df_single_test = payments_df_single_test.drop(columns=['cluster_num']).rename(columns={'cluster_num_new': 'cluster_num'})

        # drop to get unique payment accounting document number across clusters
        payments_df_single_test = payments_df_single_test.drop_duplicates(['cluster_num', 'payment_accounting_document_number'])

        # remove clusters with a single row or a single distinct payment document.
        # size()/nunique() work on every pandas version; a dict-of-renames agg on a
        # SeriesGroupBy raises SpecificationError from pandas 1.0 on.
        doc_num_by_cluster = payments_df_single_test.groupby('cluster_num')['payment_accounting_document_number']
        accounting_num_count_df = (
            pd.DataFrame({'count': doc_num_by_cluster.size(), 'unique_num': doc_num_by_cluster.nunique()})
            .rename_axis('cluster_num')
            .reset_index()
        )

        # list of clusters to remove
        clusters_to_remove = accounting_num_count_df.loc[(accounting_num_count_df['count'] == 1) | (accounting_num_count_df['unique_num'] == 1), 'cluster_num'].unique()
        payments_df_single_test = payments_df_single_test.loc[~payments_df_single_test['cluster_num'].isin(clusters_to_remove)]

        single_test_frames.append(payments_df_single_test)
        print("{} ||  Transactions  =>  {}  ||  Clusters => {}".format(name, len(payments_df_single_test), payments_df_single_test['cluster_num'].nunique()))

    except Exception as exc:
        # best-effort per test, but report the failure instead of silently dropping the test
        print("{} skipped: {!r}".format(name, exc))

# pd.concat replaces DataFrame.append (removed in pandas 2.0)
payments_df_test_append = pd.concat(single_test_frames) if single_test_frames else pd.DataFrame()

print("Part(4/6) - Appending Test based DF Done!")

In [None]:
# shape before / after building the test-based clusters, then the cluster count
for frame in (payments_df_test, payments_df_test_append):
    print(frame.shape)
print(payments_df_test_append['cluster_num'].nunique())

----

In [None]:
payments_df_test_append.drop(columns=['T1_%', 'T1_flag', 'T2_%', 'T2_flag','T3_%', 'T3_flag', 'T4_%', 'T4_flag'], inplace=True)
payments_df_test_append['sequential_flag'] = 0.0


def fuzz_test_creator(df):
    temp_df = df.copy()
    for test_iter in np.arange(0,len(test_dict)):

        col_list = test_dict[list(test_dict.keys())[test_iter]]
        name = list(test_dict.keys())[test_iter]
        temp_df['match_key'] = temp_df[col_list].apply(lambda x : "-".join(x), axis = 1)
        temp_df.sort_values(by = ['match_percent'], ascending=False, inplace=True)
        key = temp_df.head(1)['match_key'].values[0]

        matching_list = process.extract(key, temp_df['match_key'].unique(), limit=temp_df['match_key'].nunique())
        matching_temp_df = pd.DataFrame(matching_list)
        matching_temp_df = matching_temp_df.iloc[:,:2]

        # rename columns
        matching_temp_df.columns = ['match_key', '{}_%'.format(name)]

        # single point comparison will alwasy be an exact match
        if((len(matching_temp_df) == 1)):
            matching_temp_df.iloc[0,1] = 100

        # relative match percent with the closest match
        temp_100_key = matching_temp_df.loc[matching_temp_df['{}_%'.format(name)] == 100, 'match_key']


        if(temp_df.loc[temp_df['match_key'].isin(temp_100_key)].shape[0] < 2):
            try:
                matching_temp_df.iloc[0,1] = matching_temp_df.iloc[1,1]
            except:
                matching_temp_df = matching_temp_df.copy()

        # test based cluster
        matching_temp_df['{}_flag'.format(name)] = 0
        matching_temp_df.loc[matching_temp_df['{}_%'.format(name)] >= 90, '{}_flag'.format(name)]  = 1

#         display(matching_temp_df)
        temp_df = temp_df.merge(matching_temp_df, on='match_key', validate='m:1', how = 'inner')



    # Sequential Flag -------------->>>>
    # checking integer invoice reference number for which sequence test will work
    try:
        temp_df['invoice_reference_number'] = temp_df['invoice_reference_number'].astype(float)
        temp_df.sort_values('invoice_reference_number', inplace=True)
        temp_df.loc[temp_df['invoice_reference_number'].diff()==1, 'sequential_flag'] = 1
    except:
        temp_df['sequential_flag'] = 0


    # Typo Flag ------------------>>>>
    typo_locs = []
    id_list = list(map(str,temp_df['invoice_reference_number']))
    for x,y in itertools.combinations(id_list, 2):
        if(x == y):
            continue
        elif(Counter(x)==Counter(y)):
            typo_locs.extend([x,y])

    typo_locs = list(set(itertools.chain(typo_locs)))
    temp_typo_flag_df = pd.DataFrame({'invoice_reference_number':typo_locs})
    temp_typo_flag_df['cluster_num'] = temp_df['cluster_num'].unique()[0]

    try:
        # creating temp DF for typo flag and merging with parent
        if(len(temp_typo_flag_df) > 0):
            temp_typo_flag_df['T0']=1.0
            temp_typo_flag_df = temp_typo_flag_df.reset_index(drop=True)
            # merge with groupby parent
            temp_df = temp_df.merge(temp_typo_flag_df,how = 'left', on = ['invoice_reference_number', 'cluster_num'], validate='m:1')
            temp_df['T0'] = temp_df['T0'].replace(np.nan,0)
    except:
        temp_df['T0'] = 0

    # return dataframe 
    return pd.DataFrame(temp_df)


# Apply function to groupby object in parallel
def applyParallel(dfGrouped, func, n_jobs=None):
    """Apply ``func`` to every group of ``dfGrouped`` in parallel and concatenate the results.

    Args:
        dfGrouped: a pandas GroupBy object; ``func`` receives each group's frame.
        func: callable taking a DataFrame and returning a DataFrame.
        n_jobs: number of joblib workers; defaults to ``multiprocessing.cpu_count()``.

    Returns:
        ``pd.concat`` of the per-group results in group order, or an empty DataFrame
        when there are no groups (``pd.concat([])`` would raise ValueError).
    """
    if dfGrouped.ngroups == 0:
        return pd.DataFrame()
    if n_jobs is None:
        n_jobs = multiprocessing.cpu_count()
    results = Parallel(n_jobs=n_jobs)(delayed(func)(group) for _, group in dfGrouped)
    return pd.concat(results)


# score every test-based cluster in parallel
payments_df_test_final = applyParallel(payments_df_test_append.groupby('cluster_num'), fuzz_test_creator)

# T0 / sequential_flag may be missing or NaN; normalise to 0.0. An explicit column
# check is used here, so unrelated errors are no longer hidden by a bare except.
if 'T0' in payments_df_test_final.columns:
    payments_df_test_final['T0'] = payments_df_test_final['T0'].fillna(0.0)
else:
    payments_df_test_final['T0'] = 0.0

payments_df_test_final['sequential_flag'] = payments_df_test_final['sequential_flag'].fillna(0.0)

# clean: the fuzzy "Overall" score becomes the cluster's match_percent
payments_df_test_final = (
    payments_df_test_final
    .reset_index(drop=True)
    .drop(columns=['Overall_flag', 'match_percent'])
    .rename(columns={'Overall_%': 'match_percent'})
)

# aligning columns with the ATH frame so the two can be stacked
payments_df_test_final = payments_df_test_final.loc[:, list(payments_df_ath.columns)]

print("Part(5/6) - Fuzzy Matching Done!")

print(" #####||| payment fuzzy DF ===> {} |||#####".format(payments_df_test.shape[0]))
print(" #####||| payment fuzzy DF after Appending ===> {} |||#####".format(payments_df_test_append.shape[0]))

### Formatting

In [None]:
#######################################################################################
########################## Merging and Final formatting  ###############################
#######################################################################################

# append with 100% match DF (pd.concat: DataFrame.append was removed in pandas 2.0)
final_df = pd.concat([payments_df_ath, payments_df_test_final], ignore_index=True)

# correcting nan cluster numbers: "<company_code>_nan" means the row did not hit that test
for clus in ['T1_cluster_num', 'T2_cluster_num', 'T3_cluster_num', 'T4_cluster_num']:
    final_df.loc[final_df[clus].str.contains('_nan', na=False), clus] = '-'

final_df = final_df[['Id', 'cluster_num', 'match_percent', 'T1_%', 'T1_flag', 'T2_%', 'T2_flag', 'T3_%', 'T3_flag',
                     'T4_%', 'T4_flag', 'test_score', 'T1_cluster_num', 'T2_cluster_num', 'T3_cluster_num',
                     'T4_cluster_num', 'T0', 'sequential_flag']].copy()
final_df['Id'] = final_df['Id'].astype(int)
data['Id'] = data['Id'].astype(int)

# merging with original dataframe
# NOTE(review): data['Id'] is cast just above, but the merge partner is payments_data.
# If payments_data is still the Spark frame built in the pre-process cell (which has a
# lowercase "id" column), this merge cannot work -- confirm whether `data` was intended.
final_df = pd.merge(final_df, payments_data, how='left', on='Id', validate='m:1')

# removing junk clusters: a single row, or every row sharing one payment accounting
# document number. size()/nunique() work on every pandas version; a dict-of-renames
# agg on a SeriesGroupBy raises SpecificationError from pandas 1.0 on.
doc_num_by_cluster = final_df.groupby('cluster_num')['payment_accounting_document_number']
accounting_num_count_df = (
    pd.DataFrame({'count': doc_num_by_cluster.size(), 'unique_num': doc_num_by_cluster.nunique()})
    .rename_axis('cluster_num')
    .reset_index()
)
clusters_to_remove = accounting_num_count_df.loc[(accounting_num_count_df['count'] == 1) | (accounting_num_count_df['unique_num'] == 1), 'cluster_num'].unique()
final_df = final_df.loc[~final_df['cluster_num'].isin(clusters_to_remove)]



# renaming cluster numbers
def cluster_name(df):
    """Renumber the clusters of one company as "<company_code>_<k>".

    ``k`` is 1 + the ``pd.Categorical`` code of the row's ``cluster_num``, i.e. its
    1-based rank among the sorted distinct ``cluster_num`` values in ``df``.
    Meant to be applied per ``company_code`` group via ``groupby().apply``.

    Returns a new frame with an added ``cluster_num_new`` column. ``df`` itself is
    not modified, because pandas does not support mutating the group passed to apply.
    """
    codes = pd.Categorical(df['cluster_num']).codes + 1
    return df.assign(cluster_num_new=df['company_code'] + "_" + codes.astype(str))

# final per-company cluster ids replace the intermediate ones
final_df = final_df.groupby('company_code').apply(cluster_name)
final_df = final_df.drop(columns=['cluster_num']).rename(columns={'cluster_num_new': 'cluster_num'})

# score / flag columns as floats
score_cols = ['match_percent', 'T1_%', 'T1_flag', 'T2_%', 'T2_flag', 'T3_%',
              'T3_flag', 'T4_%', 'T4_flag', 'test_score']
final_df[score_cols] = final_df[score_cols].astype(float)

final_df = final_df.drop(columns=['Id'])

print("Part(6/6) - Final Formatting Done!")

# test group numbers were stored as floats, so cluster ids can look like "<company>_3.0";
# strip only a trailing ".0" (a ".0" elsewhere in the value is left alone)
for test_iter in ['T1', 'T2', 'T3', 'T4']:
    clus_col = '{}_cluster_num'.format(test_iter)
    has_float_suffix = final_df[clus_col].str.endswith('.0', na=False)
    final_df.loc[has_float_suffix, clus_col] = final_df.loc[has_float_suffix, clus_col].str[:-2]

# Final summary
print('###############################################################################')
print('Number of Transaction ==> {}'.format(final_df.shape[0]))
print('Number of Clusters ==> {}'.format(final_df.cluster_num.nunique()))

# by zone: size()/nunique() instead of a dict-of-renames agg, which raises on pandas >= 1.0
clusters_by_zone = final_df.groupby('zone')['cluster_num']
print(pd.DataFrame({'Transactions': clusters_by_zone.size(),
                    'Clusters': clusters_by_zone.nunique()}).rename_axis('zone').reset_index())

print('###############################################################################')

# # save to parquet
# final_df.to_parquet(self.parquet_save_path + f"\\DP_INV_PAY_{self.source_system}.parquet", engine='pyarrow', compression='gzip')
# print(f"{self.source_system} Parqeut Saved!")
# print("\n")

-----