## This notebook prepares the raw eInvoice data for the Anomaly Detector 

### Load packages

In [ ]:
%%configure -f
{
"conf": {
    "spark.dynamicAllocation.disableIfMinMaxNotSpecified.enabled": true,
    "spark.dynamicAllocation.enabled": true,
    "spark.dynamicAllocation.minExecutors": 2,
    "spark.dynamicAllocation.maxExecutors": 40,
    "spark.rpc.message.maxSize": 1024
   }
}

In [ ]:
from pyspark.ml.feature import StringIndexerModel
from pyspark.sql.functions import row_number, lit, col, when, count, sum, countDistinct, desc, unix_timestamp, concat, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, FloatType, ShortType
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from scipy import sparse
import numpy as np
import pandas as pd

In [ ]:
# Notebook parameters. Each one defaults to an empty string here and is
# expected to be overridden at run time (presumably by the calling pipeline — TODO confirm).
batch_id = ''  # recorded in the custom dimensions of the start-up log record
invoice_cleaned_path = ''  # Parquet location of the cleaned invoices
taxpayer_profile_cleaned_path = ''  # Parquet location of the cleaned taxpayer profiles
edge_path = ''  # Parquet location of the trimmed issuer/receiver edges
page_rank_path = ''  # Parquet location of the page rank scores
transformed_data_path = ''  # output root; one sub-folder is written per time slice
model_path = ''  # folder that holds the saved issuer_id StringIndexer model
time_slice_list = ''  # comma-separated aggregation_dict keys to compute
depth_of_supply_chain_max_iter = ''  # iteration cap handed to calculate_sc_length

In [ ]:
# Split the comma-separated parameter into aggregation keys. Whitespace around
# each entry is stripped so that 'by_day, by_month' resolves to the same
# aggregation_dict keys as 'by_day,by_month'. Unknown or empty entries are kept
# so that they still fail loudly at the dictionary lookup.
time_slices = [time_slice.strip() for time_slice in time_slice_list.split(",")]

In [ ]:
# Initiate logging
import logging
from opencensus.ext.azure.log_exporter import AzureLogHandler
from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace import config_integration
from opencensus.trace.samplers import AlwaysOnSampler
from opencensus.trace.tracer import Tracer

# Connection string for the telemetry exporters, read through mssparkutils.
instrumentation_connection_string = mssparkutils.credentials.getSecretWithLS(
    "keyvault", "AppInsightsConnectionString"
)
config_integration.trace_integrations(['logging'])

# Module logger: INFO and above is forwarded through the Azure log handler.
logger = logging.getLogger(__name__)
azure_log_handler = AzureLogHandler(connection_string=instrumentation_connection_string)
logger.addHandler(azure_log_handler)
logger.setLevel(logging.INFO)

# Tracer used for the `with tracer.span(...)` sections; every span is sampled.
tracer = Tracer(
    exporter=AzureExporter(connection_string=instrumentation_connection_string),
    sampler=AlwaysOnSampler(),
)

# Run-time parameters attached to the start-up log record.
run_time_parameters = {
    'custom_dimensions': {
        'batch_id': batch_id,
        'invoice_cleaned_path': invoice_cleaned_path,
        'taxpayer_profile_cleaned_path': taxpayer_profile_cleaned_path,
        'edge_path': edge_path,
        'page_rank_path': page_rank_path,
        'transformed_data_path': transformed_data_path,
        'notebook_name': mssparkutils.runtime.context['notebookname'],
    }
}

logger.info(
    f"{mssparkutils.runtime.context['notebookname']}: INITIALISED",
    extra=run_time_parameters,
)

### Load the data from Parquet files in Azure Data Lake Storage into Synapse

In [ ]:
with tracer.span('Load cleaned invoice files'):
    # Parquet files carry their own schema, so the CSV-style
    # inferSchema/header options are not passed.
    df = spark.read.parquet(invoice_cleaned_path)

In [ ]:
with tracer.span('Load page rank file'):
    # Parquet files carry their own schema, so the CSV-style
    # inferSchema/header options are not passed.
    df_pagerank = spark.read.parquet(page_rank_path)

In [ ]:
with tracer.span('Load taxpayer profile file'):
    # Parquet files carry their own schema, so the CSV-style
    # inferSchema/header options are not passed.
    df_profile = spark.read.parquet(taxpayer_profile_cleaned_path)

    # Expose the taxpayer key under the name issuer_id, which is the column
    # the later joins against the invoice-derived frames use.
    df_profile = df_profile.withColumn('issuer_id', col('taxpayer_id')).drop('taxpayer_id')

    # Target type of every profile column. The casts are applied in this
    # order; each one replaces the column in place, so column order is kept.
    profile_column_types = [
        ('issuer_id', StringType()),
        ('taxpayer_type', StringType()),
        ('fiscal_condition', StringType()),
        ('regime_name', StringType()),
        ('taxpayer_size', StringType()),
        ('main_activity', StringType()),
        ('sec1_activity', StringType()),
        ('sec2_activity', StringType()),
        ('employees_number', IntegerType()),
        ('legal_reg_date', DateType()),
        ('tax_reg_date', DateType()),
        ('e_inv_enroll_date', DateType()),
        ('reported_assets', IntegerType()),
        ('total_capital', FloatType()),
        ('social_capital', FloatType()),
        ('total_assets', FloatType()),
        ('total_fixed_assets', FloatType()),
        ('total_liabilities', FloatType()),
        ('gross_income', FloatType()),
        ('net_income', FloatType()),
        ('total_vat_sales', FloatType()),
        ('credited_einvoicing_value', FloatType()),
        ('state', StringType()),
        ('municipality', StringType()),
        ('city', StringType()),
    ]
    for profile_column, profile_type in profile_column_types:
        df_profile = df_profile.withColumn(
            profile_column, df_profile[profile_column].cast(profile_type)
        )

In [ ]:
with tracer.span('Load edges file'):
    # Parquet files carry their own schema, so the CSV-style
    # inferSchema/header options are not passed.
    edges_trimmed_df = spark.read.parquet(edge_path)

In [ ]:
with tracer.span('Load StringIndexer model'):
    # Saved issuer_id indexer, stored directly under model_path.
    indexer_location = model_path + '/' + '_feature_engineering_indexer_issuer_id.pkl'
    model = StringIndexerModel.load(indexer_location)

### Transform data for input to Anomaly Detector

#### Add new columns

In [ ]:
# Switch Spark to the legacy time parser before any date formatting runs.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# One 0/1 indicator column per document type code.
for document_type_code in ['C','D','I','P','X']:
    df = df.withColumn(
        "document_type_equals_{}".format(document_type_code),
        when(col('document_type') == document_type_code, 1).otherwise(0),
    )

# Self-invoicing flag and the voucher value of self-invoices.
issued_to_self = col("issuer_id") == col("receiver_id")
df = df.withColumn("issuer_id_equals_receiver_id", when(issued_to_self, 1).otherwise(0))
df = df.withColumn(
    "total_voucher_to_self",
    when(issued_to_self, col("transformed_total_voucher")).otherwise(0),
)

# The Integer casts of issuer_id / receiver_id are disabled:
#df = df.withColumn('issuer_id',col('issuer_id').cast("Integer"))
#df = df.withColumn('receiver_id',col('receiver_id').cast("Integer"))

# issued_date must be a timestamp before the calendar parts are derived from it.
df = df.withColumn('issued_date', to_timestamp(col('issued_date')))

issued_at = col('issued_date')
calendar_columns = [
    ('day_of_week', F.dayofweek(issued_at)),
    ('day_of_month', F.dayofmonth(issued_at)),
    ('day_of_year', F.dayofyear(issued_at)),
    ('week_of_year', F.weekofyear(issued_at)),
    ('month', F.month(issued_at)),
    ('quarter', F.quarter(issued_at)),
    ('year', F.year(issued_at)),
    ('hour', F.hour(issued_at)),
    # 'W' pattern: presumably week-of-month under the legacy parser — TODO confirm.
    ('week', F.date_format('issued_date', 'W')),
    # Same expression as day_of_month; kept under the shorter name as well.
    ('day', F.dayofmonth(issued_at)),
]
for calendar_column, calendar_expression in calendar_columns:
    df = df.withColumn(calendar_column, calendar_expression)

#### Index issuer_id

In [ ]:
with tracer.span('Index dataset'):
    # Run the loaded indexer model over the invoices, then store the
    # resulting issuer_id_indexed column as an integer.
    indexed_invoices = model.transform(df)
    df = indexed_invoices.withColumn(
        'issuer_id_indexed', col('issuer_id_indexed').cast("Integer")
    )

#### Filter out events with year less than 1950

In [ ]:
# Keep only rows whose issued_date falls in 1950 or later.
issued_in_or_after_1950 = F.year('issued_date') >= 1950
df = df.filter(issued_in_or_after_1950)

#### Dataset wide features

In [ ]:
with tracer.span('Calculate features'):
    # Builds df_additional: one row per issuer_id with dataset-wide features
    # (voucher/tax extremes, issuing time span, average monthly and daily
    # totals, and the week-4 / December totals).

    #First and last issued_date removed so that anomaly detector runs.
    df_additional = df.join(df_profile.select('issuer_id','tax_reg_date','e_inv_enroll_date'),on='issuer_id',how='leftouter')
    df_additional = df_additional.groupby('issuer_id').agg(
        F.min('total_voucher'),
        F.max('total_voucher'),
        F.min('total_tax'),
        F.max('total_tax'),
        F.sum('total_voucher'),
        #F.min('issued_date'),
        #F.max('issued_date'),
        F.datediff(F.max('issued_date'),F.min('issued_date')) + 1,
        F.datediff(F.min('issued_date'),F.first('tax_reg_date')) + 1,
        F.datediff(F.min('issued_date'),F.first('e_inv_enroll_date')) + 1
    )
    # Positional rename: the names below must stay in the same order as the
    # aggregate expressions above.
    df_additional = df_additional.toDF(
        "issuer_id", "min_total_voucher_dataset", "max_total_voucher_dataset", 
        "min_total_tax_dataset","max_total_tax_dataset","total_voucher_dataset",
        #"first_issued_date_dataset","last_issued_date_dataset",
        "num_days_issuing_dataset","time_to_issue_reg", "time_to_issue_enroll")

    # --- Average number of distinct buyers per month (issuer side) ---
    # Pattern shared by all the "average" features below: aggregate per
    # period, build a date for the period, expand each row into a sequence of
    # periods (value on the first element, 0 on the rest), then divide the
    # summed value by the number of expanded rows.
    df_month = df.groupby('issuer_id','month','year').agg(
        F.lit('01'),
        F.countDistinct('receiver_id')
    )
    df_month = df_month.toDF('issuer_id','month','year','day','monthly_total_buyers')
    df_month = df_month.withColumn('date',F.concat_ws('-',col('year'),col('month'),col('day')).cast('date'))
    df_month = df_month.toDF('issuer_id','month','year','day','monthly_total_buyers','date')
    # NOTE(review): this window has no partitionBy, so lead('date') reads the
    # next row of the whole data set rather than the same issuer's next
    # period, and Spark has to order all rows together. The same applies to
    # every `w1` below — confirm whether partitioning by issuer_id was intended.
    w1 = Window.orderBy('date')
    df_month = df_month.withColumn('end_date', F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
    df_month = df_month.toDF('issuer_id','month','year','day','monthly_total_buyers','date','end_date')
    df_month = df_month.selectExpr("issuer_id", """
        inline_outer(
            transform(
                sequence(0,int(months_between(end_date, date)+1)),
                i -> (add_months(date,i) as date, IF(i=0,monthly_total_buyers,0) as monthly_total_buyers)
            )
        )
    """)
    df_month = df_month.groupby('issuer_id').agg(
        F.sum('monthly_total_buyers'),
        F.count('monthly_total_buyers')
    )
    df_month = df_month.toDF("issuer_id","average_monthly_total_buyers","denominator")
    df_month = df_month.withColumn("average_monthly_total_buyers",col('average_monthly_total_buyers')/col('denominator')).drop('denominator')

    # Every issuer_id present in the data; used to restrict receiver-side
    # aggregates to parties that also issue invoices.
    all_issuers = df.groupby('issuer_id').count()

    # --- Average number of distinct suppliers per month (receiver side) ---
    df_month_r = df.groupby('receiver_id','month','year').agg(
        F.lit('01'),
        F.countDistinct('issuer_id')
    )

    # The receiver id is renamed to issuer_id so it can be joined to the
    # issuer-side frames.
    df_month_r = df_month_r.toDF('issuer_id','month','year','day','monthly_total_suppliers')
    df_month_r = df_month_r.withColumn('date',F.concat_ws('-',col('year'),col('month'),col('day')).cast('date'))
    df_month_r = df_month_r.join(all_issuers,on=['issuer_id'],how='inner').drop('count')
    w1 = Window.orderBy('date')
    df_month_r = df_month_r.withColumn('end_date', F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
    df_month_r = df_month_r.toDF('issuer_id','month','year','day','monthly_total_suppliers','date','end_date')
    # NOTE(review): unlike the issuer-side sequences, this upper bound has no
    # "+1" — confirm which of the two forms is intended.
    df_month_r = df_month_r.selectExpr("issuer_id", """
        inline_outer(
            transform(
                sequence(0,int(months_between(end_date, date))),
                i -> (add_months(date,i) as date, IF(i=0,monthly_total_suppliers,0) as monthly_total_suppliers)
            )
        )
    """)
    df_month_r = df_month_r.groupby('issuer_id').agg(
        F.sum('monthly_total_suppliers'),
        F.count('monthly_total_suppliers')
    )
    df_month_r = df_month_r.toDF("issuer_id","average_monthly_total_suppliers","denominator")
    df_month_r = df_month_r.withColumn("average_monthly_total_suppliers",col('average_monthly_total_suppliers')/col('denominator')).drop('denominator')
    df_month = df_month.join(df_month_r,on=['issuer_id'],how='outer').fillna(0)
    df_additional = df_additional.join(df_month,on='issuer_id',how='leftouter')

    # --- Average monthly voucher total ---
    df_month_voucher = df.groupby('issuer_id','month','year').agg(
        F.lit('01'),
        F.sum('transformed_total_voucher')
    )
    df_month_voucher = df_month_voucher.toDF('issuer_id','month','year','day','monthly_total_voucher',)
    df_month_voucher = df_month_voucher.withColumn('date',F.concat_ws('-',col('year'),col('month'),col('day')).cast('date'))
    df_month_voucher = df_month_voucher.toDF('issuer_id','month','year','day','monthly_total_voucher','date')
    w1 = Window.orderBy('date')
    df_month_voucher = df_month_voucher.withColumn('end_date', F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
    df_month_voucher = df_month_voucher.toDF('issuer_id','month','year','day','monthly_total_voucher','date','end_date')
    df_month_voucher = df_month_voucher.selectExpr("issuer_id", """
        inline_outer(
            transform(
                sequence(0,int(months_between(end_date, date)+1)),
                i -> (add_months(date,i) as date, IF(i=0,monthly_total_voucher,0) as monthly_total_voucher)
            )
        )
    """)
    df_month_voucher = df_month_voucher.groupby('issuer_id').agg(
        F.sum('monthly_total_voucher'),
        F.count('monthly_total_voucher')
    )
    df_month_voucher = df_month_voucher.toDF("issuer_id","average_monthly_total_voucher","denominator")
    df_month_voucher = df_month_voucher.withColumn("average_monthly_total_voucher",col('average_monthly_total_voucher')/col('denominator')).drop('denominator')
    df_additional = df_additional.join(df_month_voucher,on='issuer_id',how='leftouter')

    # --- Average monthly tax total ---
    df_month_tax = df.groupby('issuer_id','month','year').agg(
        F.lit('01'),
        F.sum('transformed_total_tax')
    )
    df_month_tax = df_month_tax.toDF('issuer_id','month','year','day','monthly_total_tax',)
    df_month_tax = df_month_tax.withColumn('date',F.concat_ws('-',col('year'),col('month'),col('day')).cast('date'))
    df_month_tax = df_month_tax.toDF('issuer_id','month','year','day','monthly_total_tax','date')
    w1 = Window.orderBy('date')
    df_month_tax = df_month_tax.withColumn('end_date', F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
    df_month_tax = df_month_tax.toDF('issuer_id','month','year','day','monthly_total_tax','date','end_date')
    df_month_tax = df_month_tax.selectExpr("issuer_id", """
        inline_outer(
            transform(
                sequence(0,int(months_between(end_date, date)+1)),
                i -> (add_months(date,i) as date, IF(i=0,monthly_total_tax,0) as monthly_total_tax)
            )
        )
    """)
    df_month_tax = df_month_tax.groupby('issuer_id').agg(
        F.sum('monthly_total_tax'),
        F.count('monthly_total_tax')
    )
    df_month_tax = df_month_tax.toDF("issuer_id","average_monthly_total_tax","denominator")
    df_month_tax = df_month_tax.withColumn("average_monthly_total_tax",col('average_monthly_total_tax')/col('denominator')).drop('denominator')
    df_additional = df_additional.join(df_month_tax,on='issuer_id',how='leftouter')

    # --- Average daily voucher total ---
    df_day_voucher = df.groupby('issuer_id','month','year','day').agg(
        F.sum('transformed_total_voucher')
    )
    df_day_voucher = df_day_voucher.toDF('issuer_id','month','year','day','daily_total_voucher')
    df_day_voucher = df_day_voucher.withColumn('date',F.concat_ws('-',col('year'),col('month'),col('day')).cast('date'))
    df_day_voucher = df_day_voucher.toDF('issuer_id','month','year','day','daily_total_voucher','date')
    w1 = Window.orderBy('date')
    # NOTE(review): end_date is still derived with add_months(..., -1) although
    # this series is daily and the sequence below is built with datediff —
    # confirm this is intended.
    df_day_voucher = df_day_voucher.withColumn('end_date', F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
    df_day_voucher = df_day_voucher.toDF('issuer_id','month','year','day','daily_total_voucher','date','end_date')
    df_day_voucher = df_day_voucher.selectExpr("issuer_id", """
        inline_outer(
            transform(
                sequence(0,int(datediff(end_date, date)+1)),
                i -> (date_add(date,i) as date, IF(i=0,daily_total_voucher,0) as daily_total_voucher)
            )
        )
    """)
    df_day_voucher = df_day_voucher.groupby('issuer_id').agg(
        F.sum('daily_total_voucher'),
        F.count('daily_total_voucher')
    )
    df_day_voucher = df_day_voucher.toDF("issuer_id","average_daily_total_voucher","denominator")
    df_day_voucher = df_day_voucher.withColumn("average_daily_total_voucher",col('average_daily_total_voucher')/col('denominator')).drop('denominator')
    df_additional = df_additional.join(df_day_voucher,on='issuer_id',how='leftouter')

    # --- Average daily tax total ---
    df_day_tax = df.groupby('issuer_id','month','year','day').agg(
        F.sum('transformed_total_tax')
    )
    df_day_tax = df_day_tax.toDF('issuer_id','month','year','day','daily_total_tax')
    df_day_tax = df_day_tax.withColumn('date',F.concat_ws('-',col('year'),col('month'),col('day')).cast('date'))
    df_day_tax = df_day_tax.toDF('issuer_id','month','year','day','daily_total_tax','date')
    w1 = Window.orderBy('date')
    # NOTE(review): same add_months(..., -1) on a daily series as above — confirm.
    df_day_tax = df_day_tax.withColumn('end_date', F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
    df_day_tax = df_day_tax.toDF('issuer_id','month','year','day','daily_total_tax','date','end_date')
    df_day_tax = df_day_tax.selectExpr("issuer_id", """
        inline_outer(
            transform(
                sequence(0,int(datediff(end_date, date)+1)),
                i -> (date_add(date,i) as date, IF(i=0,daily_total_tax,0) as daily_total_tax)
            )
        )
    """)
    df_day_tax = df_day_tax.groupby('issuer_id').agg(
        F.sum('daily_total_tax'),
        F.count('daily_total_tax')
    )
    df_day_tax = df_day_tax.toDF("issuer_id","average_daily_total_tax","denominator")
    df_day_tax = df_day_tax.withColumn("average_daily_total_tax",col('average_daily_total_tax')/col('denominator')).drop('denominator')
    df_additional = df_additional.join(df_day_tax,on='issuer_id',how='leftouter')

    # --- Week-4 / December totals ---
    # Issuer side: voucher value issued in rows with week == 4 and month == 12.
    df_month12_week4 = df.where((df.week==4) & (df.month==12)).groupby('issuer_id').agg(
        F.sum('transformed_total_voucher')
    ).toDF('issuer_id','week4_month12_total_voucher')
    # Receiver side: restricted to the same week-4 / December rows as the
    # issuer-side total, so that week4_month12_total_purchase matches its name
    # instead of summing purchases over the whole data set.
    df_month12_week4_r = df.where((df.week==4) & (df.month==12)).groupby('receiver_id').agg(
        F.sum('transformed_total_voucher')
    ).toDF('issuer_id','week4_month12_total_purchase')
    df_month12_week4_r = df_month12_week4_r.join(all_issuers,on=['issuer_id'],how='inner').drop('count')
    df_month12_week4 = df_month12_week4.join(df_month12_week4_r,on='issuer_id',how='outer').fillna(0)
    df_additional = df_additional.join(df_month12_week4, on='issuer_id',how='leftouter').fillna(0)

In [ ]:
# Draft code for additional activity-variation features. These features are at
# transaction level, so they are not included in the output yet. The code sits
# inside a string literal and is therefore never executed.
'''
df_dd = df.join(df_profile.select("issuer_id","main_activity","sec1_activity","sec2_activity"), on='issuer_id',how="leftouter")
df_r = df_profile.select("issuer_id","main_activity","sec1_activity","sec2_activity").toDF("receiver_id","main_activity_r","sec1_activity_r","sec2_activity_r")
df_dd = df_dd.join(df_r,on="receiver_id",how="leftouter")
df_dd = df_dd.withColumn('DD_code',col('activity_issuer').substr(1,2))
df_dd = df_dd.withColumn('DD_code_main',col('main_activity').substr(1,2))
df_dd = df_dd.withColumn('DD_code_sec1',col('sec1_activity').substr(1,2))
df_dd = df_dd.withColumn('DD_code_sec2',col('sec2_activity').substr(1,2))
df_dd = df_dd.withColumn('DD_code_main_r',col('main_activity_r').substr(1,2))
df_dd = df_dd.withColumn('DD_code_sec1_r',col('sec1_activity_r').substr(1,2))
df_dd = df_dd.withColumn('DD_code_sec2_r',col('sec2_activity_r').substr(1,2))
df_dd = df_dd.withColumn("activity_variation",when((col("DD_code")==col("DD_code_main_r")) | (col("DD_code")==col("DD_code_sec1_r")) | (col("DD_code")==col("DD_code_sec2_r")) ,0).otherwise(1))
df_dd = df_dd.withColumn("self_activity_variation",when((col("DD_code")==col("DD_code_main")) | (col("DD_code")==col("DD_code_sec1")) | (col("DD_code")==col("DD_code_sec2")) ,0).otherwise(1))
df_dd = df_dd.select("issuer_id","receiver_id","issued_date","DD_code","activity_variation","self_activity_variation")
'''

#### Run groupby / aggregate

In [ ]:
# Time aggregations: for each supported time slice, the calendar columns that
# are grouped on in addition to the issuer / receiver id.
aggregation_dict = dict(
    by_hour=['hour', 'day_of_month', 'month', 'year'],
    by_day=['day_of_month', 'month', 'year'],
    by_week=['week_of_year', 'year'],
    by_month=['month', 'year'],
    by_quarter=['quarter', 'year'],
    by_year=['year'],
)

In [ ]:
# One feature frame per requested time slice, keyed by the slice name.
transformed_dfs = {}
for key in time_slices:
    period_columns = aggregation_dict[key]

    # Issuer-side aggregates per issuer and period. Every aggregate is named
    # where it is defined.
    # Document type codes: I = Invoice, D = Debit Note, C = Credit Note,
    # O = Order, P = Purchase, G = Goods certificate, T = Tender,
    # X = Export Invoice.
    transformed_data_issuer = df.groupby('issuer_id', *period_columns).agg(
        F.first('issued_date').alias('issued_date'),
        F.count('document_type').alias('number_of_transactions'),
        F.countDistinct('receiver_id').alias('total_buyers'),
        F.sum('document_type_equals_C').alias('total_credit_notes'),
        F.sum('document_type_equals_D').alias('total_debit_notes'),
        F.sum('document_type_equals_I').alias('total_invoices'),
        F.sum('document_type_equals_P').alias('total_purchase_invoices'),
        F.sum('document_type_equals_X').alias('total_export_invoices'),
        F.sum('issuer_id_equals_receiver_id').alias('number_of_transactions_to_self'),
        F.sum('total_voucher_to_self').alias('total_voucher_to_self'),
        F.sum('transformed_total_taxable_services').alias('total_taxable_services'),
        F.sum('transformed_total_non_taxable_services').alias('total_non_taxable_services'),
        F.sum('transformed_total_taxable_goods').alias('total_taxable_goods'),
        F.sum('transformed_total_non_taxable_goods').alias('total_non_taxable_goods'),
        F.sum('transformed_total_taxable').alias('total_taxable'),
        F.sum('transformed_total_non_taxable').alias('total_non_taxable'),
        F.sum('transformed_total_sales').alias('total_sales'),
        F.sum('transformed_total_discounts').alias('total_discounts'),
        F.sum('transformed_total_voucher').alias('total_voucher'),
        F.sum('transformed_total_tax').alias('total_tax'),
        F.min('transformed_total_voucher').alias('min_total_voucher_period'),
        F.max('transformed_total_voucher').alias('max_total_voucher_period'),
        F.min('transformed_total_tax').alias('min_total_tax_period'),
        F.max('transformed_total_tax').alias('max_total_tax_period'),
        # First / last issued_date of the period are not emitted, only the span.
        (F.datediff(F.max('issued_date'), F.min('issued_date')) + 1).alias('num_days_issuing_period'),
        F.first('issuer_id_indexed').alias('issuer_id_indexed'),
    )

    # Every issuer_id present in the data.
    all_issuers = df.groupby('issuer_id').count()

    # Receiver-side aggregates per receiver and period; the receiver id is
    # exposed as issuer_id so both sides share one key.
    transformed_data_receiver = (
        df.groupby('receiver_id', *period_columns)
        .agg(
            F.first('issued_date').alias('issued_date_r'),
            F.count('document_type').alias('number_of_purchases'),
            F.countDistinct('issuer_id').alias('total_suppliers'),
            F.sum('transformed_total_voucher').alias('total_purchases'),
        )
        .withColumnRenamed('receiver_id', 'issuer_id')
    )

    # Keep only receivers that also issue invoices, then index them with the
    # same model and hold the index under a temporary name.
    transformed_data_receiver_trimmed = (
        transformed_data_receiver
        .join(all_issuers, on=['issuer_id'], how='inner')
        .drop('count')
    )
    transformed_data_receiver_trimmed = (
        model.transform(transformed_data_receiver_trimmed)
        .withColumn('receiver_id_indexed', col('issuer_id_indexed').cast("Integer"))
        .drop("issuer_id_indexed")
    )

    # Full outer join of both sides on id + period; rows that exist only on
    # the receiver side take their index and issued_date from that side.
    transformed_data = transformed_data_issuer.join(
        transformed_data_receiver_trimmed,
        on=['issuer_id'] + period_columns,
        how='outer',
    )
    transformed_data = (
        transformed_data
        .withColumn('issuer_id_indexed', F.coalesce(col('issuer_id_indexed'), col('receiver_id_indexed')))
        .drop('receiver_id_indexed')
        .withColumn('issuer_id', col('issuer_id').cast("String"))
        .withColumn('issued_date', F.coalesce(col('issued_date'), col('issued_date_r')))
        .drop(*period_columns)
        .drop("issued_date_r")
        .fillna(0)
    )

    # Enrich with page rank, the dataset-wide features and the taxpayer profile.
    transformed_data = (
        transformed_data
        .join(df_pagerank, on=['issuer_id_indexed'], how='outer')
        .fillna(0, 'pagerank_score')
        .join(df_additional, on=['issuer_id'], how='leftouter')
    )
    transformed_dfs[key] = (
        transformed_data
        .join(df_profile, on=['issuer_id'], how='leftouter')
        .fillna(0, 'employees_number')
    )


### Calculate Ratios

In [ ]:
# Each entry: (ratio column, numerator column, denominator column, value used
# when the denominator is not greater than zero).
ratio_definitions = [
    ("ratio_sales_purchases", 'total_sales', 'total_purchases', col('total_sales')),
    ("ratio_tax_sales", 'total_tax', 'total_sales', col('total_tax')),
    ("ratio_sales_employees", 'total_sales', 'employees_number', col('total_sales')/0.1),
    ("ratio_buyers_suppliers", 'total_buyers', 'total_suppliers', col('total_buyers')),
    ("ratio_sales_capital", 'total_voucher_dataset', 'social_capital', col('total_voucher_dataset')),
    ("ratio_in_out", 'number_of_transactions', 'number_of_purchases', col('number_of_transactions')),
]
for key in transformed_dfs:
    transformed_data = transformed_dfs[key]
    for ratio_column, numerator, denominator, fallback in ratio_definitions:
        transformed_data = transformed_data.withColumn(
            ratio_column,
            when(col(denominator) > 0, col(numerator) / col(denominator)).otherwise(fallback),
        )
    transformed_dfs[key] = transformed_data

### Economic Activity

In [ ]:
# Voucher total per issuer and economic activity, ranked within each issuer
# from the largest total downwards.
df2 = df.groupby('issuer_id','activity_issuer').agg(F.sum('total_voucher'))
w2 = Window.partitionBy("issuer_id").orderBy(desc("sum(total_voucher)"))
ranked_activities = df2.withColumn("row", row_number().over(w2))

# Output column names for the activities ranked 1 to 5.
top_activity_columns = [
    ("act01", "total_voucher_act01"),
    ("act02", "total_voucher_act02"),
    ("act03", "total_voucher_act03"),
    ("act04", "total_voucher_act04"),
    ("act05", "total_voucher_act05"),
]

df3 = None
for activity_rank, (activity_column, voucher_column) in enumerate(top_activity_columns, start=1):
    activity_at_rank = (
        ranked_activities
        .filter(col("row") == activity_rank)
        .drop('row')
        .toDF("issuer_id", activity_column, voucher_column)
    )
    if df3 is None:
        df3 = activity_at_rank
    else:
        # Issuers with fewer activities get 0 for the missing voucher totals.
        df3 = df3.join(activity_at_rank, on=['issuer_id'], how='outer').fillna(0)

# Missing activity codes (string columns) become '0'.
df3 = df3.na.fill('0')

In [ ]:
# Attach the top-activity columns to every time slice. The join uses the
# default join type, so only issuers present in df3 are kept.
for key in transformed_dfs:
    slice_frame = transformed_dfs[key]
    transformed_dfs[key] = slice_frame.join(df3, on=['issuer_id'])

### In-Memory Depth of Supply Chain Calculation/Network Analysis

In [ ]:
# Collect the receivers of every issuer into one list per issuer and bring
# the resulting adjacency table to the driver as a pandas frame.
receivers_per_issuer = edges_trimmed_df.groupby("issuer_id_indexed").agg(
    F.collect_list("receiver_id_indexed")
)
node_child_df = receivers_per_issuer.toPandas()
node_child_df.columns = ['parent', 'child']
logger.info(f'node_child_df.shape: {node_child_df.shape}')

In [ ]:
#DFS cycle removal algorithm
def find_edges_to_remove(df):
    """Return the edges an iterative depth-first search flags for removal.

    Args:
        df: pandas DataFrame with a ``parent`` column of node ids and a
            ``child`` column that holds, for each parent, an iterable of its
            child node ids.

    Returns:
        List of ``(parent, child)`` tuples of ints, in the order in which the
        search flags them.
    """
    # Build the parent -> children lookup once instead of filtering the whole
    # frame with a boolean mask for every expanded node. setdefault keeps the
    # first row if a parent id is repeated, matching a "first match" lookup.
    children_of = {}
    for parent, children in zip(df.parent, df.child):
        children_of.setdefault(parent, children)

    # Node states: 0 = not reached, 1 = pushed / in progress, 2 = finished.
    edges_to_remove = []
    visited = {}
    for node in df.parent:
        if visited.get(node, 0) != 0:
            continue
        visited[node] = 1
        # Stack entries are (node, op): op 0 = expand the node, op 1 = finish it.
        stack = [(node, 0)]
        while stack:
            t, op = stack.pop()
            if op != 0:
                visited[t] = 2
                continue
            stack.append((t, 1))
            # A node with no entry issues to nobody in the trimmed data
            # (final retailer), so it simply has no children.
            for child in children_of.get(t, ()):
                state = visited.get(child, 0)
                if state == 1:
                    # NOTE(review): a child is marked in progress as soon as it
                    # is pushed, so an edge to a node that is still waiting on
                    # the stack is flagged as well — confirm this is intended.
                    edges_to_remove.append((int(t), int(child)))
                elif state == 0:
                    stack.append((child, 0))
                    visited[child] = 1
    return edges_to_remove

In [ ]:
# Edges flagged by the search; the pandas adjacency table is released afterwards.
edges_to_remove = find_edges_to_remove(node_child_df)
del node_child_df

# Move the flagged edges back into Spark as a two-column integer frame.
schema = StructType([
    StructField(edge_column, IntegerType(), True)
    for edge_column in ("issuer_id_indexed", "receiver_id_indexed")
])
remove_df = spark.createDataFrame(edges_to_remove, schema)
del edges_to_remove

In [ ]:
# Anti-join away the flagged edges, keep the two id columns as I (issuer) and
# R (receiver), and give every remaining edge the starting value V = 1.
edge_key_columns = ["issuer_id_indexed","receiver_id_indexed"]
edges_dag = (
    edges_trimmed_df
    .join(remove_df, on=edge_key_columns, how='leftanti')
    .select(edge_key_columns)
    .toDF("I","R")
    .withColumn("V",F.lit(1))
)

In [ ]:
def calculate_left_sc_length_pyspark_max(df, max_iter=2):
    """Compute, per receiving node, the largest propagated edge value.

    In every round each edge whose issuer ``I`` also appears as a receiver
    takes ``max(V) + 1`` over that issuer's incoming edges; other edges keep
    their original ``V``.

    Args:
        df: Spark DataFrame with columns I, R and V, where V starts out at 1.
        max_iter: upper bound on the number of propagation rounds.

    Returns:
        Spark DataFrame with columns node and sc_left. Nodes that never
        appear in R are left out; they can be recovered via a join and fillna.
    """
    left_matrix = df
    round_number = 1
    while round_number <= max_iter:
        incoming = left_matrix.groupBy('R').agg(F.max('V') + 1).toDF('R_new', 'V_new')
        joined = df.join(incoming, df.I == incoming.R_new, how="leftouter").fillna(0)
        # V_new is 0 only where the join found no incoming edge.
        next_value = F.when(F.col('V_new') == 0, F.col('V')).otherwise(F.col('V_new'))
        left_matrix = joined.withColumn('V', next_value).drop("R_new").drop("V_new")
        round_number += 1
    return left_matrix.groupBy('R').agg(F.max('V')).toDF('node', 'sc_left')

def calculate_right_sc_length_pyspark_max(df, max_iter=2):
    """Compute, per issuing node, the largest propagated edge value.

    In every round each edge whose receiver ``R`` also appears as an issuer
    takes ``max(V) + 1`` over that receiver's outgoing edges; other edges
    keep their original ``V``.

    Args:
        df: Spark DataFrame with columns I, R and V, where V starts out at 1.
        max_iter: upper bound on the number of propagation rounds.

    Returns:
        Spark DataFrame with columns node and sc_right. Nodes that never
        appear in I are left out; they can be recovered via a join and fillna.
    """
    right_matrix = df
    round_number = 1
    while round_number <= max_iter:
        outgoing = right_matrix.groupBy('I').agg(F.max('V') + 1).toDF('I_new', 'V_new')
        joined = df.join(outgoing, df.R == outgoing.I_new, how="leftouter").fillna(0)
        # V_new is 0 only where the join found no outgoing edge.
        next_value = F.when(F.col('V_new') == 0, F.col('V')).otherwise(F.col('V_new'))
        right_matrix = joined.withColumn('V', next_value).drop("I_new").drop("V_new")
        round_number += 1
    return right_matrix.groupBy('I').agg(F.max('V')).toDF('node', 'sc_right')

def calculate_left_sc_length_pyspark_min(df, max_iter=2):
    """Compute, per receiving node, the smallest propagated edge value.

    In every round each edge whose issuer ``I`` also appears as a receiver
    takes ``min(V) + 1`` over that issuer's incoming edges; other edges keep
    their original ``V``.

    Args:
        df: Spark DataFrame with columns I, R and V, where V starts out at 1.
        max_iter: upper bound on the number of propagation rounds.

    Returns:
        Spark DataFrame with columns node and sc_left. Nodes that never
        appear in R are left out; they can be recovered via a join and fillna.
    """
    left_matrix = df
    round_number = 1
    while round_number <= max_iter:
        incoming = left_matrix.groupBy('R').agg(F.min('V') + 1).toDF('R_new', 'V_new')
        joined = df.join(incoming, df.I == incoming.R_new, how="leftouter").fillna(0)
        # V_new is 0 only where the join found no incoming edge.
        next_value = F.when(F.col('V_new') == 0, F.col('V')).otherwise(F.col('V_new'))
        left_matrix = joined.withColumn('V', next_value).drop("R_new").drop("V_new")
        round_number += 1
    return left_matrix.groupBy('R').agg(F.min('V')).toDF('node', 'sc_left')

def calculate_right_sc_length_pyspark_min(df, max_iter=2):
    """Compute, per issuing node, the smallest propagated edge value.

    In every round each edge whose receiver ``R`` also appears as an issuer
    takes ``min(V) + 1`` over that receiver's outgoing edges; other edges
    keep their original ``V``.

    Args:
        df: Spark DataFrame with columns I, R and V, where V starts out at 1.
        max_iter: upper bound on the number of propagation rounds.

    Returns:
        Spark DataFrame with columns node and sc_right. Nodes that never
        appear in I are left out; they can be recovered via a join and fillna.
    """
    right_matrix = df
    round_number = 1
    while round_number <= max_iter:
        outgoing = right_matrix.groupBy('I').agg(F.min('V') + 1).toDF('I_new', 'V_new')
        joined = df.join(outgoing, df.R == outgoing.I_new, how="leftouter").fillna(0)
        # V_new is 0 only where the join found no outgoing edge.
        next_value = F.when(F.col('V_new') == 0, F.col('V')).otherwise(F.col('V_new'))
        right_matrix = joined.withColumn('V', next_value).drop("I_new").drop("V_new")
        round_number += 1
    return right_matrix.groupBy('I').agg(F.min('V')).toDF('node', 'sc_right')

def calculate_sc_length(edges_df, max_iter=-1):
    '''
    Combine the min and max left/right supply chain lengths into one row per node.

    Args:
        edges_df: pyspark dataframe of edges, passed unchanged to the four left/right helpers.
        max_iter: iteration limit forwarded to every helper. The min helpers run no propagation
            rounds for values below 1, which includes the default of -1.

    Returns:
        pyspark dataframe keyed by issuer_id_indexed with, for both the max_ and min_ prefix, the
        columns distance_from_supplier (sc_left), distance_from_customer (sc_right),
        depth_of_supply_chain (their sum) and place_in_supply_chain (sc_left divided by the sum).
    '''
    def _depth_profile(left_fn, right_fn, output_columns):
        # Outer join so a node seen on only one side is kept, with 0 for the missing side.
        left_lengths = left_fn(edges_df, max_iter)
        right_lengths = right_fn(edges_df, max_iter)
        profile = left_lengths.join(right_lengths, 'node', 'outer').fillna(0)
        profile = profile.withColumn('length', F.col('sc_left') + F.col('sc_right'))
        profile = profile.withColumn('frac', F.col("sc_left") / F.col('length'))
        # Positional rename of node, sc_left, sc_right, length, frac.
        return profile.toDF(*output_columns)

    min_profile = _depth_profile(
        calculate_left_sc_length_pyspark_min,
        calculate_right_sc_length_pyspark_min,
        (
            'issuer_id_indexed',
            'min_distance_from_supplier',
            'min_distance_from_customer',
            'min_depth_of_supply_chain',
            'min_place_in_supply_chain',
        ),
    )
    max_profile = _depth_profile(
        calculate_left_sc_length_pyspark_max,
        calculate_right_sc_length_pyspark_max,
        (
            'issuer_id_indexed',
            'max_distance_from_supplier',
            'max_distance_from_customer',
            'max_depth_of_supply_chain',
            'max_place_in_supply_chain',
        ),
    )
    return max_profile.join(min_profile, 'issuer_id_indexed')

In [ ]:
# The parameter cell declares depth_of_supply_chain_max_iter as a string placeholder, while the
# supply chain helpers compare it against an integer counter. Coerce it here so a string-typed
# parameter value does not raise a TypeError; int() leaves an integer value unchanged.
nodes = calculate_sc_length(edges_dag, max_iter=int(depth_of_supply_chain_max_iter))

In [ ]:
# Supply chain columns contributed by `nodes`; rows without a matching node get 0 in each of them.
supply_chain_columns = [
    'min_distance_from_supplier',
    'min_distance_from_customer',
    'min_depth_of_supply_chain',
    'min_place_in_supply_chain',
    'max_distance_from_supplier',
    'max_distance_from_customer',
    'max_depth_of_supply_chain',
    'max_place_in_supply_chain',
]
for key, features in transformed_dfs.items():
    # Left outer join keeps every feature row even when its issuer is missing from `nodes`.
    enriched = features.join(nodes, on=['issuer_id_indexed'], how='leftouter')
    transformed_dfs[key] = enriched.fillna(value=0, subset=supply_chain_columns)

### Run job and save output to container

In [ ]:
# Write each dataframe in transformed_dfs as parquet under <transformed_data_path>/<key>,
# replacing any previous output at that location.
for key, transformed_data in transformed_dfs.items():
    with tracer.span(f'Saving features for {key} aggregation to ADLS'):
        output_path = transformed_data_path + '/' + key
        writer = transformed_data.write.mode("overwrite").option("header", "true")
        writer.save(output_path, format='parquet')

### Create external tables

In [ ]:
# serverless SQL config
import pyodbc
# Target database and ODBC driver used for the serverless SQL connection.
database = 'eiad'
driver = '{ODBC Driver 17 for SQL Server}'

# Fetch the SQL user name, password and serverless endpoint from the "keyvault" linked service,
# in that order.
sql_user_name, sql_user_pwd, serverless_sql_endpoint = (
    mssparkutils.credentials.getSecretWithLS("keyvault", secret_name)
    for secret_name in (
        "SynapseSQLUserName",
        "SynapseSQLPassword",
        "SyanpseServerlessSQLEndpoint",
    )
)

In [ ]:
def generate_schema_string(dataframe):
    """
    Build the column definition list for a T-SQL CREATE EXTERNAL TABLE statement.

    Args:
        dataframe: object exposing a pyspark-style schema, i.e. ``schema.fieldNames()`` and
            ``schema[name].dataType.simpleString()``.

    Returns:
        A string such as ``"[col_a] int, [col_b] float"``; the empty string when the schema has
        no fields.
    """
    # Spark simpleString() type names that are not usable as T-SQL type names as-is.
    # Every other type name is passed through unchanged.
    # NOTE(review): Spark 'float' is passed through and T-SQL reads it as float(53);
    # confirm whether 'real' is intended for 32-bit columns.
    spark_to_tsql = {
        'double': 'float',
        'string': 'nvarchar(MAX)',
        'timestamp': 'datetime2(7)',
        'boolean': 'bit',
    }
    column_definitions = []
    for name in dataframe.schema.fieldNames():
        spark_type = str(dataframe.schema[name].dataType.simpleString())
        column_definitions.append("[" + name + "] " + spark_to_tsql.get(spark_type, spark_type))
    return ", ".join(column_definitions)

In [ ]:
# (Re)create one external table per dataframe in transformed_dfs, pointing at the parquet
# output written under <transformed_data_path>/<key>.
for key in transformed_dfs:
    with tracer.span(f'Creating SQL table for features of agregation: {key} '):
        path = transformed_data_path + '/' + key
        path_parts = path.split('/')
        # NOTE(review): assumes the path has the form <scheme>://<container>@<account>/<dir>/<dir>/...
        # so that index 2 is "<container>@<account>" and indexes 3 and 4 are directories - confirm.
        table_name = path_parts[3] + '_' + path_parts[2].split('@')[0] + '_' + path_parts[4] + '_' + key
        schema_string = generate_schema_string(transformed_dfs[key])
        drop_table_command = f"DROP EXTERNAL TABLE [{table_name}]"
        # Everything after the "<container>@<account>" segment is the location inside the data source.
        location = "/".join(path_parts[3:])
        df_sql_command = f"CREATE EXTERNAL TABLE [{table_name}] ({schema_string}) WITH (LOCATION = '{location}/**', DATA_SOURCE = [output_<<STORAGE_ACCOUNT_NAME>>_dfs_core_windows_net], FILE_FORMAT = [SynapseParquetFormat])"
        conn = pyodbc.connect('DRIVER='+driver+';SERVER=tcp:'+serverless_sql_endpoint+';PORT=1433;DATABASE='+database+';UID='+sql_user_name+';PWD='+ sql_user_pwd)
        try:
            # pyodbc's connection context manager commits on exit but leaves the connection
            # open, so it is closed explicitly in the finally clause below.
            with conn:
                with conn.cursor() as cursor:
                    try:
                        cursor.execute(drop_table_command)
                    except pyodbc.Error as drop_error:
                        # Best effort: the drop fails when there is no such table yet. Only
                        # database errors are tolerated; anything else propagates.
                        logger.info("Could not drop external table %s: %s", table_name, drop_error)
                    cursor.execute(df_sql_command)
        finally:
            conn.close()