# MicroHack SAP & Data AI - Setup Payment Generation
This notebook can be used to generate payments based upon extracted Sales Order Headers using CDS View `ZBD_ISALESDOC_E` and placed upon a Azure Data Lake.

The payments will be generated based upon the CustomerGroup and the custGroupSettingsDict. The following logic is used :

`paymentDate = BillingdocumentDate + PayOffset +/- random(PayOffsetVariance)`

The payments will be written to Azure Data Lake.

Initial Setup
* CustomerGroup Z1 Offset = 30 +/- 5 days
* CustomerGroup Z2 Offset = 70 +/- 10 days
* Other CustomerGroups : defaultOffset = 10 days

Assumptions :
* There is a 1-1 link between SalesOrderHeaders and Payments
* Payment Value = Sales Order Value (We assume the customers pay their bill)

## Extract SalesOrderHeaders- Synapse Pipeline Setup
The Sales Order Headers need to be exported as a CSV File with Header Lines.
In this pipeline json make sure to add paramaters `convertDateToDateTime` and `convertTimeToTimespan` in order to have dates converted to data format.

## Load Settings

In [1]:
%%pyspark

import pandas as pd
import numpy as np
import random
from datetime import date, datetime, timedelta

# Settings
# Payment Settings
OUTPUT_COLS =['PaymentNr', 'SalesOrderNr', 'CustomerNr', 'CustomerName', 'PaymentDate', 'Value']
startPaymentNr = 30000 #Initial nr for the Payment Number range
paymentTermDue = 60    #Offset for Payment Due date

# Payment Offset Settings
custGroupSettingsDict = {'CustomerGroup' : ['Z1', 'Z2'],
                         'PayOffset' : [30, 70], #Nr of days after billing date by when customer pays
                         'PayOffsetVariance' : [5, 10]}
custGroupSettingsDf = pd.DataFrame(custGroupSettingsDict)
custGroupSettingsDf.set_index("CustomerGroup", inplace=True)
defaultOffset = 10 #Paymentset off when CustomerGroup not found in custGroupSettingsDict

#File Settings & Paths
account_name = 'azdatalakegen2bdl' # fill in your datalake account name
container_name = 'microhack' # fill in your container name
relative_path_sales = 'exportSalesOrderHeaders/SalesOrderHeaders.csv' # fill in your relative folder path to the Sales OrderHeader CSV
relative_path_payments = 'importPayments/paymentsForCosmosDB' # fill in your relatuve folder path to the folder where to write the Payments
salesSeparator = "," #CSV separator for Sales OrderHeader csv input
paymentsSeparator = ";" #CSV separator for Payments csv output

adls_path_sales = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path_sales)
print('Path to Sales Orderheader csv file : ' + adls_path_sales)
adls_path_payments = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path_payments)
print('Directory to write Payments : ' + adls_path_payments)

#Print Settings
print(custGroupSettingsDf)


StatementMeta(SparkPool1, 14, 1, Finished, Available)

Path to Sales Orderheader csv file : abfss://microhack@azdatalakegen2bdl.dfs.core.windows.net/exportSalesOrderHeaders/SalesOrderHeaders.csv
Directory to write Payments : abfss://microhack@azdatalakegen2bdl.dfs.core.windows.net/importPayments/paymentsForCosmosDB
               PayOffset  PayOffsetVariance
CustomerGroup                              
Z1                    30                  5
Z2                    70                 10

## Load Sales Orders Headers

In [2]:
#Load the Sales Order Headers
sparkDfSalesOrderHeaders = spark.read.load(adls_path_sales, format='csv'
, delimiter = salesSeparator
, header=True
, inferSchema=True
)

# sparkDfSalesOrderHeaders.write.mode("overwrite").saveAsTable("default.SalesOrderHeaders")
print("Nr of Sales Order Headers : " + str(sparkDfSalesOrderHeaders.count()))
sparkDfSalesOrderHeaders.printSchema()

display(sparkDfSalesOrderHeaders.limit(10))

StatementMeta(SparkPool1, 14, 2, Finished, Available)

Nr of Sales Order Headers : 2987
root
 |-- MANDT: integer (nullable = true)
 |-- SALESDOCUMENT: long (nullable = true)
 |-- SDDOCUMENTCATEGORY: string (nullable = true)
 |-- SALESDOCUMENTTYPE: string (nullable = true)
 |-- SALESDOCUMENTPROCESSINGTYPE: string (nullable = true)
 |-- CREATIONDATE: timestamp (nullable = true)
 |-- CREATIONTIME: string (nullable = true)
 |-- LASTCHANGEDATE: timestamp (nullable = true)
 |-- LASTCHANGEDATETIME: double (nullable = true)
 |-- SALESORGANIZATION: integer (nullable = true)
 |-- DISTRIBUTIONCHANNEL: integer (nullable = true)
 |-- ORGANIZATIONDIVISION: integer (nullable = true)
 |-- SALESGROUP: integer (nullable = true)
 |-- SALESOFFICE: integer (nullable = true)
 |-- SOLDTOPARTY: string (nullable = true)
 |-- CUSTOMERNAME: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- CITYNAME: string (nullable = true)
 |-- POSTALCODE: string (nullable = true)
 |-- CUSTOMERACCOUNTGROUP: string (nullable = true)
 |-- SALESDISTRICT: string (nul

SynapseWidget(Synapse.DataFrame, e5683464-8413-4b87-b771-49f40ef01a55)

## Generate Payments

In [3]:
#Loop over the SalesData and determine the payment data
paymentDf = pd.DataFrame(columns=OUTPUT_COLS)
paymentNr = startPaymentNr - 1
#Convert Spark DataFrame to Pandas - do we need this?
pdDfSalesOrderHeaders = sparkDfSalesOrderHeaders.toPandas()

#Loop over the SalesOrderHeaders to generate the payments
for index, salesHeader in pdDfSalesOrderHeaders.iterrows():
    paymentNr = paymentNr + 1
    salesOrderNr = salesHeader['SALESDOCUMENT']
    customerNr = salesHeader['SOLDTOPARTY']
    customerName = salesHeader['CUSTOMERNAME']
    customerGroup = salesHeader['CUSTOMERGROUP']
    paymentDueDate = salesHeader['BILLINGDOCUMENTDATE'] + timedelta(paymentTermDue)
    paymentValue = salesHeader['TOTALNETAMOUNT']
    
    if(customerGroup in ['Z1', 'Z2']):
        variance = custGroupSettingsDf.loc[customerGroup].PayOffsetVariance
        randomoffset = (random.randint(1, 2 * variance)) - variance
        offsetd = round(custGroupSettingsDf.loc[customerGroup].PayOffset + randomoffset, 0)
        #convert numpy.int64 to int to support timedelta
        offset = int(offsetd)
    else:
        offset = defaultOffset
    
    paymentDate = salesHeader['BILLINGDOCUMENTDATE'] + timedelta(offset)

    paymentDf=paymentDf.append({'PaymentNr' : paymentNr,
                                'SalesOrderNr' : salesOrderNr, 
                                'CustomerNr' : customerNr,
                                'CustomerName' : customerName, 
                                'CustomerGroup' : customerGroup,
                                'PaymentDate' : paymentDate,
                                'PaymentOffSet' : offset,
                                'PaymentDueDate' : paymentDueDate,
                                'Value' : paymentValue,
                                'variance' : variance,
                                'randomoffset' : randomoffset,
                                'offset' : offset,
                                'BillingDate' : salesHeader['BILLINGDOCUMENTDATE']
                               }, ignore_index=True)

print("Nr of Payments : " + str(len(paymentDf)))
paymentDf.head()

StatementMeta(SparkPool1, 14, 3, Finished, Available)

Nr of Payments : 2987
  PaymentNr SalesOrderNr  CustomerNr  ... offset randomoffset  variance
0     30000         2564    USCU_S13  ...   33.0          3.0       5.0
1     30001            4    USCU_L10  ...   78.0          8.0      10.0
2     30002         2565    USCU_L04  ...   80.0         10.0      10.0
3     30003         1859  0017100005  ...   10.0         10.0      10.0
4     30004         1862  0017100005  ...   10.0         10.0      10.0

[5 rows x 13 columns]

In [4]:
#Verify the Payments
display(paymentDf.groupby("CustomerGroup").describe())

StatementMeta(SparkPool1, 14, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 0f7d458a-2940-46a6-ab7e-6275c5f59859)


  'JavaPackage' object is not callable
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.

## Write Payments to csv

In [5]:
#Select only necessary columns, drop the test columns like offset, randomoffset, variance, ...
paymentDf2 = paymentDf[OUTPUT_COLS]
display(paymentDf2.head(10))

StatementMeta(SparkPool1, 14, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, f5d0ffd6-b50e-4e3b-99f2-0a018fc1b6e9)

In [6]:
#Convert to Spark DataFrame
sparkDfPayments = spark.createDataFrame(paymentDf2)

#Write Payments to Spark Table
#sparkDfPayments.write.mode('overwrite').saveAsTable('sparkpayments')

#This will write csv files per partition
#sparkDfPayments.write.mode('overwrite').option("header", "true").option("delimiter", paymentsSeparator).csv('adls_path_payments')

sparkDfPayments.repartition(1).write.mode('overwrite').option("header", "true").option("delimiter", paymentsSeparator).csv(adls_path_payments)

StatementMeta(SparkPool1, 14, 6, Finished, Available)

