ERP stands for **Enterprise Resource Planning**.

It is business process management software that manages and integrates a company's financials, supply chain, operations, commerce, reporting, manufacturing, and human resource activities. 

One important ERP entity is **Accounts Receivable (AR)**: the money a company's customers owe for goods or services they have received.

 An **AR** document can be one of the following:
 * Invoice
 * Credit Note
 * Debit Note
 * Cancellation
 * Miscellaneous

Each AR is made of several parts: the **header** (general information about the customer or supplier that defines the invoice), the **list of items**, the **list of payments**, details about the **customer**, details about the **shipping**, ...
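
To make the structure above concrete, here is a minimal sketch of one AR document as a nested dictionary. The field names under `items`, `customer` and `shipping`, and all the values, are illustrative assumptions; the rest of this notebook only generates the header and the payments.

```python
# Hypothetical AR document: names and values are made up for illustration.
ar_document = {
    "header": {
        "documentNumber": "2022-00001",
        "fiscalYear": "2022",
        "documentType": "Invoice",
        "customerId": "Customer_001",
        "value": 1200,
    },
    "items": [
        {"description": "Widget", "quantity": 10, "unitPrice": 100},
        {"description": "Delivery", "quantity": 1, "unitPrice": 200},
    ],
    "payments": [
        {"paymentDate": "2022-02-01", "valuePaid": 700},
    ],
    "customer": {"customerId": "Customer_001", "name": "ACME"},
    "shipping": {"address": "1 Example Street"},
}

# Total of the item lines, to compare with the header value.
items_total = sum(i["quantity"] * i["unitPrice"] for i in ar_document["items"])
# Amount still owed after the recorded payments.
outstanding = ar_document["header"]["value"] - sum(
    p["valuePaid"] for p in ar_document["payments"]
)
print(items_total, outstanding)
```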

# Parameters

* N: number of invoices
* M: number of payments
* K: number of customers

In [None]:
N=10000
M=12500
K=150

# AR Header

The Header of an AR document contains some general information like
* Customer ID
* Value
* Due Date
* Posting Date
* Document Number - must be unique per fiscal year
* Fiscal Year
* Document Type

Assumptions:
* "Invoice" is the only document type

In [None]:
from random import randint
from datetime import datetime,timedelta
def headerGenerator(k=5):
  postingDate = datetime(2022,1,1)+timedelta(randint(0,200))
  return {
          "customerId":"Customer_{customerId}".format(customerId=str(randint(1,k)).zfill(3)), # ids from 1 to k, i.e. exactly k customers
          "value":randint(50,10000),
          "documentCurrency":"EUR", 
          "postingDate":postingDate.strftime("%Y-%m-%d"),
          "dueDate":(postingDate+timedelta(randint(0,60))).strftime("%Y-%m-%d"),
          "fiscalYear":postingDate.strftime("%Y"),
          "documentType":"Invoice"
         }


def headerList(k=5,n=1000):
  rawHeaderList = [headerGenerator(k) for _ in range(n)] # "_" avoids shadowing the customer count k
  rawHeaderList.sort(key=lambda row: row.get("postingDate"))
  for pos,val in enumerate(rawHeaderList):
    val["documentNumber"]="2022-{docNum}".format(docNum=str(pos).zfill(5))
  return rawHeaderList
  
myARList = headerList(K,N)

# AR Payments

Each line represents a payment made by a customer against a given AR.
* Document Number
* Payment Date
* Value Paid

In [None]:
def paymentGenerator(InvoiceList):
  # pick a random invoice directly instead of scanning the whole list for its number
  invoice = InvoiceList[randint(0,len(InvoiceList)-1)]
  postingDate = datetime.strptime(invoice.get("postingDate"),"%Y-%m-%d")
  return { 
          "documentNumber":invoice.get("documentNumber"),
          "paymentDate":(postingDate+timedelta(randint(15,90))).strftime("%Y-%m-%d"),
          "valuePaid":randint(1,invoice.get("value")),
          "documentCurrency":invoice.get("documentCurrency")
         }


def paymentList(InvoiceList,m=250):
  return [paymentGenerator(InvoiceList) for k in range(m)]
   
myPaymentList = paymentList(myARList,M)  


# Part 00
* Define the type of each table (Log or Registry): which are the keys of these tables?

Both Header and Payments are logs, because none of their rows can be updated or deleted.
Keys:
* Header: documentNumber and fiscalYear
* Payments: documentNumber and paymentDate (under the assumptions: i) multiple payments can be received for a given AR, ii) no more than one payment per day can be received for a given invoice)
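
One way to check a candidate key is to count how often each key tuple occurs. The sketch below does this in plain Python on a small hand-made sample; `duplicated_keys` and `sample_payments` are hypothetical names introduced only for this illustration.

```python
from collections import Counter

def duplicated_keys(rows, key_fields):
    """Return the key tuples that appear more than once in rows."""
    counts = Counter(tuple(row[f] for f in key_fields) for row in rows)
    return [key for key, n in counts.items() if n > 1]

# Hand-made sample: the first and third rows share invoice and day.
sample_payments = [
    {"documentNumber": "2022-00001", "paymentDate": "2022-03-01", "valuePaid": 10},
    {"documentNumber": "2022-00001", "paymentDate": "2022-03-02", "valuePaid": 20},
    {"documentNumber": "2022-00001", "paymentDate": "2022-03-01", "valuePaid": 30},
]
duplicates = duplicated_keys(sample_payments, ["documentNumber", "paymentDate"])
print(duplicates)
```

An empty list means the fields form a valid key for that data. Run on the randomly generated payments, the check may well report duplicates, since nothing in the generator enforces assumption ii.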

# Part 01
* Create the two RDDs and check that everything is ok!
* Create a single RDD combining information from both header and payments

In [None]:
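import pyspark
# reuse the running SparkContext if there is one, otherwise start a local one
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster("local[*]"))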

In [None]:
headerRDD = sc.parallelize(myARList)
paymentsRDD = sc.parallelize(myPaymentList)
headerRDD.count()==N,paymentsRDD.count()==M

Spark operations are chained with '.': each one consumes the output of the previous one...

In [None]:
def filterRow(row):
  return row.get("customerId")=='Customer_042'
headerRDD.filter(lambda headerDict: filterRow(headerDict)).map(lambda valueRow: valueRow.get("value")).filter(lambda row: row>1000).count()

In [None]:
semiJoinHeaderRDD = headerRDD.map(lambda row:((row.get("documentNumber"), row.get("fiscalYear")),row))
semiJoinHeaderRDD.first()

In [None]:
semiJoinPaymentsRDD = paymentsRDD.map(lambda row:((row.get("documentNumber"), datetime.strptime(row.get("paymentDate"),"%Y-%m-%d").strftime("%Y")),row))
semiJoinPaymentsRDD.first()

In [None]:
semiJoinPaymentsRDD.join(semiJoinHeaderRDD)
# note: semiJoinPaymentsRDD.join(headerRDD) would fail once an action runs, because headerRDD is not a (key, value) RDD

# Join format
The result of a join is a tuple (key, values), where values is itself a tuple (leftValue, rightValue):
* joinResult[0]: key
* joinResult[1][0]: leftValue
* joinResult[1][1]: rightValue
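
The shape described above can be reproduced without Spark. The following is a minimal plain-Python sketch of an inner join over lists of (key, value) pairs; `inner_join` and the sample data are hypothetical and only mimic the output layout, not Spark's distributed execution.

```python
from collections import defaultdict

def inner_join(left, right):
    """Join two lists of (key, value) pairs, yielding (key, (left, right))."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]

payments = [(("2022-00001", "2022"), {"valuePaid": 10}),
            (("2022-00001", "2022"), {"valuePaid": 20}),
            (("2022-00009", "2022"), {"valuePaid": 5})]
headers = [(("2022-00001", "2022"), {"value": 100}),
           (("2022-00002", "2022"), {"value": 50})]

joined = inner_join(payments, headers)
for key, (left_value, right_value) in joined:
    print(key, left_value, right_value)
```

Keys present on only one side are dropped, and a header matched by two payments appears twice, once per payment.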

In [None]:
joinedRdd = semiJoinPaymentsRDD.join(semiJoinHeaderRDD)
joinFirstResult = joinedRdd.first()
print("Keys:{keys}".format(keys=joinFirstResult[0]))
print("Left part of the join:{valueLeft}".format(valueLeft=joinFirstResult[1][0]))
print("Right part of the join:{valueRight}".format(valueRight=joinFirstResult[1][1]))

In [None]:
def formatRow(row):
  basicRow = {"header":row[1][1]} #header
  basicRow["keyTuple"] = row[0]
  basicRow["paymentList"] = [row[1][0]]
  return basicRow
niceJoineRDD = joinedRdd.map(lambda row: formatRow(row))
niceJoineRDD.first()

In [None]:
niceJoineRDD.map(lambda x: (x.get("keyTuple"),1)).reduceByKey(lambda left,right: left+right).first()

In [None]:
result = niceJoineRDD.filter(lambda row: row.get("keyTuple")==('2022-09741', '2022')).collect()
for pos,val in enumerate(result):
  print("Element number {pos}".format(pos=pos))
  print(val)

In [None]:
def mergerFunction(leftDict,rightDict):
  leftDict["paymentList"] +=rightDict["paymentList"]
  return leftDict
  
reducedNiceJoinedRDD = niceJoineRDD.map(lambda row: (row.get("keyTuple"),row)).reduceByKey(lambda left,right: mergerFunction(left,right) ).map(lambda x: x[1])

In [None]:
reducedNiceJoinedRDD.filter(lambda row: row.get("keyTuple")==('2022-09741', '2022')).first()

# Alternative and scalable approach

In [None]:
def quantitativeRepr(row):
  return {"amount":row.get("valuePaid"),"numberOfPayments":1,"lastDate":row.get("paymentDate")}

def combineFun(firstPayment,secondPayment):
  firstPayment["amount"] += secondPayment.get("amount")
  firstPayment["numberOfPayments"] += secondPayment.get("numberOfPayments")
  firstPayment["lastDate"] = secondPayment.get("lastDate") if secondPayment.get("lastDate")>firstPayment.get("lastDate") else firstPayment.get("lastDate")
  return firstPayment
  
lightPaymentRDD = paymentsRDD.map(lambda x: (x.get("documentNumber"),quantitativeRepr(x)))\
                .reduceByKey(lambda firstPayment,secondPayment: combineFun(firstPayment,secondPayment)
                            ).map(lambda x: {"keyTuple":(x[0],'2022'),"paymentStats":x[1]})
lightPaymentRDD.first()

In [None]:
lightlyJoinedRdd = lightPaymentRDD.map(lambda x: (x.get("keyTuple"),x)).join(semiJoinHeaderRDD)
lightlyJoinedRdd.first()

In [None]:
def rowFormatter(row):
  # the payment stats are on the left side of the join, the header on the right
  paymentDict = row[1][0].copy()
  headerDict = row[1][1].copy()
  headerDict.update(paymentDict)
  return headerDict
  
scalableJoinedRdd = lightlyJoinedRdd.map(lambda x: rowFormatter(x))
scalableJoinedRdd.count(),semiJoinHeaderRDD.count() #<== the counts differ when some invoices have no payments: we need an outer join

In [None]:
def rowFormatter2(row):
  '''
  This version considers null rows
  '''
  headerDict = row[1][1].copy()
  paymentDict = row[1][0].copy() if row[1][0] is not None else {'keyTuple': row[0],
   'paymentStats': {'amount': 0,
    'numberOfPayments': 0,
    'lastDate': '1999-01-01'}}
  headerDict.update(paymentDict)
  return headerDict
scalableJoinedRdd2 = lightPaymentRDD.map(lambda x: (x.get("keyTuple"),x)).rightOuterJoin(semiJoinHeaderRDD).map(lambda x: rowFormatter2(x))
scalableJoinedRdd2.count()==semiJoinHeaderRDD.count()

In [None]:
scalableJoinedRdd2.first()#<== we will use it for the rest of the workshop

# Pay attention to Python types

In [None]:
myString = "a1"
myString[0],myString[1]

In [None]:
sc.parallelize(["a1","a2","a1","b1","b2"]).reduceByKey(lambda first,second: first+second).collect()
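
The point of the cell above is presumably that a two-character string unpacks like a (key, value) pair, so no error is raised even though the input is not a list of tuples. A plain-Python sketch of the same effect, with `pairs` and `grouped` as illustrative names:

```python
pairs = ["a1", "a2", "a1", "b1", "b2"]

grouped = {}
for key, value in pairs:  # "a1" unpacks to key="a", value="1"
    # values are strings, so "+" concatenates instead of adding numbers
    grouped[key] = grouped.get(key, "") + value

print(grouped)
```

In Spark the concatenation order may differ because it depends on how the data is partitioned.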

# Part 02
* How many invoices are open (i.e., not completely paid)?
* How many invoices are closed (i.e., completely paid)?
* How many invoices are overdue (i.e., not completely paid and with a due date in the past)?
* How many invoices have not been paid in time (i.e., completely paid, but with the last payment after the due date)?
* Add to the RDD the "closingDate", i.e., the date of the payment that closes the invoice.
* Add to the RDD the boolean "inTime": True if closingDate < dueDate, else False
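
For the "closingDate" item, one possible approach is to sort the payments of an invoice by date and return the first date at which the running total reaches the invoice value. The sketch below works on plain Python data; `closing_date` and the sample values are assumptions made for illustration, not part of the notebook's RDDs.

```python
def closing_date(invoice_value, payments):
    """Return the date of the payment that closes the invoice, or None.

    payments is a list of (paymentDate, valuePaid) with ISO "YYYY-MM-DD"
    dates, which sort correctly as plain strings.
    """
    paid = 0
    for payment_date, value_paid in sorted(payments):
        paid += value_paid
        if paid >= invoice_value:
            return payment_date
    return None  # the invoice is still open

payments = [("2022-03-10", 40), ("2022-02-01", 30), ("2022-04-05", 50)]
closed_on = closing_date(100, payments)
# inTime compares the closing date with a (made-up) due date
in_time = closed_on is not None and closed_on < "2022-04-01"
print(closed_on, in_time)
```

Note that the `lastDate` kept in `paymentStats` coincides with this date only when no further payment arrives after the invoice has been closed.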

## Open invoices

In [None]:
openInvoices = scalableJoinedRdd2.filter(lambda x: x.get("value")>x.get("paymentStats").get("amount"))
openInvoices.first() # use first() to check the logic

In [None]:
openInvoices.count()

## Closed invoices

In [None]:
closedInvoices = scalableJoinedRdd2.filter(lambda x: x.get("value")<=x.get("paymentStats").get("amount"))
closedInvoices.count() 

In [None]:
closedInvoices.count()  + openInvoices.count() == N 

## Overdue invoices

In [None]:
from datetime import datetime
# "%m" is the month; "%M" would parse the month digits as minutes
overduedInvoices = openInvoices.filter(lambda x: datetime.strptime(x.get("dueDate"),"%Y-%m-%d")<datetime.now())
overduedInvoices.first()


In [None]:
overduedInvoices.count()

In [None]:
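# closed invoices whose last payment arrived after the due date ("%m" is the month, "%M" would be minutes)
notInTime = closedInvoices.filter(lambda x: datetime.strptime(x.get("dueDate"),"%Y-%m-%d")<datetime.strptime(x.get("paymentStats").get("lastDate"),"%Y-%m-%d"))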
notInTime.count()

In [None]:
def statDecorator(row):
  newRow = row.copy()
  dueDate = datetime.strptime(row.get("dueDate"),"%Y-%m-%d")
  isOpen = row.get("value")>row.get("paymentStats").get("amount")
  newRow["isOpen"] =  isOpen
  # open invoices are compared with today, closed ones with their last payment
  referenceDate = datetime.now() if isOpen else datetime.strptime(row.get("paymentStats").get("lastDate"),"%Y-%m-%d")
  newRow["isInTime"] = dueDate>referenceDate
  return newRow

def statEnricher(row):  
  newRow = row.copy()
  isOpen = row.get("isOpen")
  isInTime = row.get("isInTime")
  newRow["paidInTime"] = not isOpen and isInTime
  newRow["paidNotInTime"] = not isOpen and not isInTime
  newRow["overdued"] = isOpen and not isInTime
  newRow["openInTime"] = isOpen and isInTime
  return newRow
  
enrichedScalableJoinedRdd2 =scalableJoinedRdd2.map(lambda x: statDecorator(x)).map(lambda x: statEnricher(x))
enrichedScalableJoinedRdd2.first()

In [None]:
enrichedScalableJoinedRdd2.count()

# Part 03 - Debit Note
* How many invoices have been paid for more than their value?
* Add to the Header RDD for each of them a Debit Note with the value to be charged back and the date of today
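
The two ingredients of this part, the amount to charge back and the numbering of the new documents, can be sketched in plain Python. `overpayment` and `next_document_numbers` are hypothetical helpers, and the five-digit zero padding is an assumption taken from the invoice numbers generated earlier.

```python
def overpayment(invoice_value, amount_paid):
    """Amount to charge back; zero when the invoice is not overpaid."""
    return max(amount_paid - invoice_value, 0)

def next_document_numbers(last_document_number, count, width=5):
    """Generate `count` document numbers following last_document_number."""
    fiscal_year, last = last_document_number.split("-")
    return ["{}-{}".format(fiscal_year, str(int(last) + 1 + i).zfill(width))
            for i in range(count)]

charge_back = overpayment(100, 130)
new_numbers = next_document_numbers("2022-09999", 3)
print(charge_back, new_numbers)
```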

In [None]:
overPaidInvoices = enrichedScalableJoinedRdd2.filter(lambda x: (x.get("value")<x.get("paymentStats").get("amount")))
overPaidInvoices.count()

In [None]:
# assuming no one adds more payments in the meantime, i.e. this runs as a transaction
lastDocNumber = enrichedScalableJoinedRdd2.map(lambda x: x.get("documentNumber")).max()
zipperOverPaidInvoices=overPaidInvoices.map(lambda x: x.get("documentNumber")).zipWithIndex()
zipperOverPaidInvoices.first()


In [None]:
def documentNumberCreator(x,lastDocNumber:str):
  lastNum = int(lastDocNumber.split("-")[1])
  newNum = "2022-{num}".format(num=str(lastNum+1+x[1]).zfill(5)) # same zero padding as the invoice numbers
  return (x[0],"2022"),newNum
  
documentMapperRdd = zipperOverPaidInvoices.map(lambda x: documentNumberCreator(x,lastDocNumber) )
documentMapperRdd.first()

In [None]:
from datetime import timedelta
def debitNoteFormatter(x):
  newDoc = x[1][0].copy()  
  newDoc["documentType"] = "Debit Note"
  newDoc["documentNumber"] = x[1][1]
  newDoc["value"] = x[1][0]["paymentStats"]["amount"] - x[1][0]["value"] # overpaid amount to be charged back
  newDoc["paymentStats"] =  {'amount': 0,
    'numberOfPayments': 0,
    'lastDate': '1999-01-01'}
  newDoc["postingDate"] = datetime.now().strftime("%Y-%m-%d") # the debit note is dated today
  newDoc["dueDate"] = (datetime.now()+timedelta(90)).strftime("%Y-%m-%d")
  newDoc["isOpen"] = True
  newDoc["isInTime"] = True
  return newDoc
  
  
debitNoteRdd = enrichedScalableJoinedRdd2.map(lambda x: (x.get("keyTuple"),x)).join(documentMapperRdd).map(lambda x: debitNoteFormatter(x)).map(lambda x: statEnricher(x))
debitNoteRdd.first()

In [None]:
arFullRDD = debitNoteRdd.union(enrichedScalableJoinedRdd2)
arFullRDD.count()

# Part 04 - Payment Frequency
* Add to the payment RDD a computed "expectedPaymentDate". For each customer, it is based on the two previous payments: the date of the last payment plus the gap between that payment and the one before it.
So, in the example below, it cannot be computed for the first two payments, while for the third the expected payment date is 2022-10-15 (the date of the last payment) plus 3 days (the gap between it and the payment of 2022-10-12).

| customerId   | paymentDate | expectedPaymentDate | documentNumber | ... |
|--------------|-------------|---------------------|----------------|-----|
| Customer_001 | 2022-10-12  | N/A                 | 2022-01001     | ... |
| Customer_001 | 2022-10-15  | N/A                 | 2022-01004     | ... |
| Customer_001 | 2022-10-16  | 2022-10-18 (15+3)   | 2022-00904     | ... |
| Customer_001 | 2022-10-20  | 2022-10-17 (16+1)   | 2022-01004     | ... |
| Customer_001 | 2022-10-30  | 2022-10-24 (20+4)   | 2022-01101     | ... |
| Customer_001 | ...         | ...                 | ...            | ... |

* Show, for each customer, the average error of this method
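
The rule can be sketched in plain Python for a single customer before moving it to RDDs. The function names are hypothetical, and reading "average error" as the mean absolute difference in days is an assumption.

```python
from datetime import date

def expected_payment_dates(payment_dates):
    """Pair each payment date with its expected date (None for the first two)."""
    ordered = sorted(payment_dates)
    result = []
    for i, current in enumerate(ordered):
        if i < 2:
            result.append((current, None))
        else:
            last, before_last = ordered[i - 1], ordered[i - 2]
            # last payment date plus the gap between the two previous payments
            result.append((current, last + (last - before_last)))
    return result

def average_error_days(pairs):
    """Mean absolute difference in days between actual and expected dates."""
    errors = [abs((actual - expected).days)
              for actual, expected in pairs if expected is not None]
    return sum(errors) / len(errors) if errors else None

dates = [date(2022, 10, 12), date(2022, 10, 15), date(2022, 10, 16),
         date(2022, 10, 20), date(2022, 10, 30)]
pairs = expected_payment_dates(dates)
average_error = average_error_days(pairs)
print(pairs)
print(average_error)
```

The input dates are the payment dates from the example table. In an RDD version, the same function could be applied per customer after grouping the payment dates by `customerId`.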

# Part 05 - Cosine Similarity
* How many customers does the company have?
* Draw the histogram - without using .hist() - as the number of customers with 1 invoice, the number of customers with 2 invoices, ...
* Define two customers similarity based on the cosine similarity computed on the average payment time per day
    * a day with no invoice posted count as zero
    * for other days, compute the average payment timing using the due date as zero (10 days in advance means -10, 3 days after means +3)
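
As a small-scale reference for the tasks above, the sketch below builds the histogram with two counting passes and computes the cosine similarity of two sparse vectors stored as dictionaries, where a missing day counts as zero. Both function names and the sample data are illustrative assumptions, and returning 0.0 for an all-zero vector is a convention chosen here.

```python
from collections import Counter
from math import sqrt

def invoices_per_customer_histogram(customer_ids):
    """Map 'number of invoices' -> 'number of customers with that many'."""
    per_customer = Counter(customer_ids)
    return dict(Counter(per_customer.values()))

def cosine_similarity(u, v):
    """Cosine similarity of two sparse vectors stored as {day: value}."""
    dot = sum(value * v[day] for day, value in u.items() if day in v)
    norm_u = sqrt(sum(x * x for x in u.values()))
    norm_v = sqrt(sum(x * x for x in v.values()))
    if norm_u == 0 or norm_v == 0:
        return 0.0  # convention chosen here for all-zero vectors
    return dot / (norm_u * norm_v)

histogram = invoices_per_customer_histogram(
    ["Customer_001", "Customer_002", "Customer_001", "Customer_003"])
print(histogram)

# average payment timing per posting day, relative to the due date
a = {"2022-01-03": -10, "2022-01-04": 3}
b = {"2022-01-03": -5, "2022-01-07": 2}
print(cosine_similarity(a, a), cosine_similarity(a, b))
```

Only days present in both vectors contribute to the dot product, which is why a join on the posting date is enough to compute the numerator.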

# Cosine Similarity

In [None]:
#customerRDD = ...  {"CustomerId":"","postingDate":datetime(),"dueDate":datetime(),"paymentDate":datetime()}
def cosineSimilarityMapper(x):
  return {"customerId":x.get("customerId"),"dueDate":datetime.strptime(x.get("dueDate"),"%Y-%m-%d"),"paymentDate":datetime.strptime(x.get("paymentStats").get("lastDate"),"%Y-%m-%d"),"postingDate":datetime.strptime(x.get("postingDate"),"%Y-%m-%d")}
  
customerRDD = enrichedScalableJoinedRdd2.filter(lambda x: x.get("isOpen")==False).map(lambda x: cosineSimilarityMapper(x))
customerRDD.take(5)

In [None]:
#computing the average as the sum divided by the count avoids building a big list and then calling an average function such as np.mean()... the result is in the form (row,col,value)
#the delay is paymentDate-dueDate, so paying early gives a negative value and paying late a positive one
preparedCustomerRDD = customerRDD.map(lambda x: ((x.get("customerId"),x.get("postingDate")),(x.get("paymentDate")-x.get("dueDate"),1))).reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1])).map(lambda x: {"customerId":x[0][0],"postingDate":x[0][1],"avgDelay":x[1][0]/x[1][1]})
preparedCustomerRDD.first()

In [None]:
#coming to a form (column,(row,value))
toBeJoinedPreparedCustomerRDD = preparedCustomerRDD.map(lambda x: (x.get("postingDate"),(x.get("customerId"),(x.get("avgDelay")))))
toBeJoinedPreparedCustomerRDD.first()

In [None]:
#let's take only half of the matrix, comparing the customer ids... here we have (postingDateA,((customer01,avgDay_customer01ATpostingDateA),(customer02,avgDay_customer02ATpostingDateA)))
joinedPreparedCustomerRDD = toBeJoinedPreparedCustomerRDD.join(toBeJoinedPreparedCustomerRDD).filter(lambda x: x[1][0][0]>=x[1][1][0])
joinedPreparedCustomerRDD.first()

In [None]:
#to compute the cosine similarity for each pair of customers, we move the customers into the key: ((customer01,customer02),avgDay_customer01ATpostingDateA*avgDay_customer02ATpostingDateA); the posting date is dropped because it was already used for the join
toBeReducedRdd = joinedPreparedCustomerRDD.map(lambda x: ((x[1][0][0],x[1][1][0]),(x[1][0][1].days*x[1][1][1].days))).reduceByKey(lambda x,y: x+y)
toBeReducedRdd.first()

In [None]:
#computing the norm
normRDD = toBeJoinedPreparedCustomerRDD.map(lambda x: (x[1][0],pow(x[1][1].days,2))).reduceByKey(lambda x,y: x+y).map(lambda x: (x[0],pow(x[1],0.5)))
normMap = normRDD.collectAsMap()


In [None]:
normRDD.filter(lambda x: x[1]==0).count()

In [None]:
# toBeReducedRdd gives, for each pair of customers, the numerator of the cosine similarity: ((customer01,customer02),SUM(avgDay_customer01ATpostingDateI*avgDay_customer02ATpostingDateI))
fullCustomerSimilarity = toBeReducedRdd.flatMap(lambda x: [{"Customer01":x[0][0],"Customer02":x[0][1],"upperScore":x[1]},{"Customer01":x[0][1],"Customer02":x[0][0],"upperScore":x[1]}]) #<== we rebuild the full matrix that we halved before
fullCustomerSimilarity.first()

In [None]:
toBeReducedRdd.first()

In [None]:
def normVec(x,normMap):
  customer01 = x[0][0]
  customer02 = x[0][1]
  score = x[1]
  res = {"customer01":customer01,"customer02":customer02,"partialScore":score}
  norm01 = max([normMap.get(customer01),1])
  res["norm01"] = norm01
  norm02 = max([normMap.get(customer02),1])
  res["norm02"] = norm02
  res["normScore"] = score/(norm01*norm02)
  return res
  
normalisedFullCustomerSimilarity = toBeReducedRdd.map(lambda x: normVec(x,normMap))
normalisedFullCustomerSimilarity.first()

In [None]:
normalisedFullCustomerSimilarity.filter(lambda x: x.get("customer01")==x.get("customer02")).map(lambda x: x.get("normScore")).stats()

In [None]:
normalisedFullCustomerSimilarity.filter(lambda x: x.get("customer01")!=x.get("customer02")).map(lambda x: x.get("normScore")).stats()