## Connect to the mkldsadlsedldev001 ADLS Gen2 storage account

In [2]:
class ADLSGen2Mount:
  """
  Summary:
  Creates a databricks mount point for an ADLS Gen2 storage container (file system),
  authenticating with the OAuth client credentials of an azure active directory app registration.

  Parameters:
  mount_point = "/mnt/adlsgen2" # DBFS location for the new mount point
  scope_name = "KeyVault_ADLSGEN2" # databricks secret scope that holds the app registration credentials
  app_reg_directory_id = "https://login.microsoftonline.com/<directory-id>/oauth2/token" # full OAuth2 token endpoint URL (it embeds the "Directory ID"); used verbatim as the client endpoint
  storage_account = "abfss://<file-system>@<storage-account>.dfs.core.windows.net/" # abfss URI of the file system to mount

  Note:
  The constructor reads both secrets immediately, so creating an instance already requires
  access to the secret scope, even if mount() is never called.
  """
  def __init__(self, mount_point, scope_name, app_reg_directory_id, storage_account):
    self.mount_point = mount_point
    self.scope_name = scope_name
    self.app_reg_directory_id = app_reg_directory_id
    self.storage_account = storage_account

    # Credentials of the AD app registration, stored in the databricks secret scope.
    # "username" is used as the OAuth client id and "password" as the client secret (see mount()).
    self.username = dbutils.secrets.get(scope = self.scope_name, key = "dbx-adls-username-workshop")
    self.password = dbutils.secrets.get(scope = self.scope_name, key = "dbx-adls-password-workshop")

  def mount(self):
    """
    Mounts the storage container at self.mount_point unless that mount point already exists.

    Returns:
    "<mount_point> mounted" when a new mount was created,
    "<mount_point> already mounted" when the mount point was already present.
    """
    # Guard clause: an existing mount point is left untouched
    existing_mount_points = {m.mountPoint for m in dbutils.fs.mounts()}
    if self.mount_point in existing_mount_points:
      return "{} already mounted".format(self.mount_point)

    configs = {
      "fs.azure.account.auth.type": "OAuth",
      "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      "fs.azure.account.oauth2.client.id": self.username,  # Application ID of the app registration
      "fs.azure.account.oauth2.client.secret": self.password,
      "fs.azure.account.oauth2.client.endpoint": self.app_reg_directory_id,  # OAuth2 token endpoint URL
    }

    dbutils.fs.mount(
      source = self.storage_account,
      mount_point = self.mount_point,
      extra_configs = configs)
    return "{} mounted".format(self.mount_point)

In [3]:
# Mount the workshop file system of the mkldsadlsedldev001 storage account under /mnt/dbxworkshop
adls_mount_point = "/mnt/dbxworkshop" # DBFS location for the new mount point
scope_name = "dbx-workshop2" # azure key vault backed databricks secret scope name
app_reg_directory_id = "https://login.microsoftonline.com/28e5a145-70c6-44e3-ba17-7b09d54fe531/oauth2/token" # OAuth2 token endpoint of the azure active directory app registration (the URL embeds the "Directory ID")
storage_account = "abfss://mkldsadlsedlfsdevdbxworkshop@mkldsadlsedldev001.dfs.core.windows.net/" # mkldsadlsedlfsdevdbxworkshop is the File System name on the mkldsadlsedldev001 storage account

# Constructing the object reads the credentials from the secret scope;
# mount() only creates the mount when the mount point does not exist yet and returns a status string
mount = ADLSGen2Mount(adls_mount_point, scope_name, app_reg_directory_id, storage_account)
result = mount.mount()
#print(result)

In [4]:
#dbutils.fs.mounts()

In [5]:
from pyspark.sql.types import *

class Hydrator:
  """
  Summary:
  Reads a cedilla delimited data file into a dataframe (column names come from a .json format file,
  every column is read as a string) and can hydrate an Azure SQL Datawarehouse table from it.

  Parameters:
  data_file_path = "dbfs:/mnt/jasonsadlsgen2/Refined_ADW/Historical_Claim_data_2019_09_12.txt" # location of data file to be hydrated
  format_file_path = "dbfs:/mnt/jasonsadlsgen2/Refined_ADW/Historical_Claim_fmt_2019_09_12.json" # location of format file to be used for hydration
  schema_name = "dbo" # name of the schema you want to hydrate the table to on Azure SQL Datawarehouse
  table_name = "testing_data" # name of the table you want to hydrate on Azure SQL Datawarehouse

  Note:
  schema_name, table_name and the column names/types from the format file are interpolated into
  SQL text without escaping, so they must come from a trusted source.
  """

  # Blob storage location the SQL DW connector uses as its staging area (tempDir)
  STAGING_STORAGE_ACCOUNT = 'mkldsblobsqldwauddev001'
  STAGING_CONTAINER = 'testing'
  STAGING_DIRECTORY = 'tempdir'

  # Target Azure SQL Datawarehouse
  SQLDW_DATABASE = 'mkl-ds-sqldw-edw-dev-003'
  SQLDW_SERVER = 'mkl-ds-sqls-edw-dev-003'

  # Secret scope and secret names used by write_to_sql_dw.
  # NOTE(review): the mount cell of this notebook uses the scope "dbx-workshop2" (dash) while this
  # class has always used "dbx_workshop2" (underscore) - confirm which scope actually exists.
  SECRET_SCOPE = "dbx_workshop2"
  # The storage access key used to be hard-coded in this file. It is now read from the secret scope;
  # the secret below has to be created, and the key that was committed in source should be rotated.
  STORAGE_ACCESS_KEY_SECRET = "blob-storage-access-key"

  def __init__(self, data_file_path, format_file_path, schema_name, table_name):
    self.data_file_path = data_file_path
    self.format_file_path = format_file_path
    self.schema_name = schema_name
    self.table_name = table_name

  def hydrate(self):
    """
    This method hydrates an Azure SQL Datawarehouse table.
    The generated pre-action drops and recreates the target table before the data is written.
    """
    data_file_df = self.create_data_dataframe()
    create_table_statement = self.format_sql_create_statement()

    self.write_to_sql_dw(data_file_df, create_table_statement)

    print("{0}.{1} table has been hydrated".format(self.schema_name, self.table_name))

  def create_data_dataframe(self):
    """
    This method creates a databricks dataframe from a cedilla delimited .txt data file.
    The file has no header row; the schema comes from create_schema_struct_type().
    """
    schema_struct = self.create_schema_struct_type()
    # NOTE(review): columnNameOfCorruptRecord is set, but the schema built by
    # create_schema_struct_type() has no "_corrupt_record" column, so malformed rows are
    # presumably not captured anywhere - confirm whether that column should be added.
    reader = (
      spark.read
      .format("com.databricks.spark.csv")
      .option("charset", "ISO-8859-1")
      .option("header", "false")
      .option("delimiter", "Ç")
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .schema(schema_struct)
    )
    return reader.load(self.data_file_path)

  def create_format_dataframe(self):
    """
    This method creates a databricks dataframe from a .json format file
    """
    return spark.read.json(self.format_file_path)

  def _metadata_columns(self):
    """
    Returns the column descriptors found under "Metadata" in the first row of the format file.
    Each descriptor is expected to expose Column_Name, Data_Type and Is_Nullable attributes.
    """
    metadata_dict = self.create_format_dataframe().toPandas().to_dict()
    return list(metadata_dict["Metadata"][0])

  def format_sql_create_statement(self):
    """
    This method creates a sql string to be used for creating a table on a Azure SQL Datawarehouse.
    The statement creates the schema when missing, drops the table when it exists and recreates it
    with one column per format file entry (NOT NULL when Is_Nullable is "N", NULL otherwise).

    Raises:
    ValueError when the format file defines no columns (the statement would be malformed).
    """
    columns = self._metadata_columns()
    if not columns:
      raise ValueError("format file {} defines no columns".format(self.format_file_path))

    statement_head = "IF (SELECT 1 FROM sys.schemas WHERE name = '{0}') IS NULL BEGIN EXEC('CREATE SCHEMA [{0}]') END; IF OBJECT_ID('{0}.{1}', 'U') IS NOT NULL DROP TABLE {0}.{1}; CREATE TABLE {0}.{1} (".format(self.schema_name, self.table_name)

    column_definitions = ", ".join(
      "[{}] {} {}".format(
        column.Column_Name,
        column.Data_Type,
        "NOT NULL" if column.Is_Nullable == "N" else "NULL")
      for column in columns
    )

    return statement_head + column_definitions + ");"

  def create_schema_struct_type(self):
    """
    This method creates a schema to be used when reading in the data file to dataframe.
    Every column of the format file becomes a nullable string field, in format file order.
    """
    fields = [
      {'metadata': {}, 'name': column.Column_Name, 'nullable': True, 'type': 'string'}
      for column in self._metadata_columns()
    ]
    return StructType.fromJson({'fields': fields, 'type': 'struct'})

  def write_to_sql_dw(self, data_file_df, create_table_statement):
    """
    This method runs sql create table statements and writes data from a dataframe to a table on Azure SQL Datawarehouse 
    helpful website
    https://docs.databricks.com/spark/latest/data-sources/azure/sql-data-warehouse.html

    Parameters:
    data_file_df = dataframe to write
    create_table_statement = sql text passed to the connector as "preActions" (runs before the load)

    Side effects:
    Sets the blob storage account key on the spark session configuration so the connector can
    forward it, then appends the dataframe to <schema_name>.<table_name>.
    """
    # Storage key comes from the secret scope instead of being embedded in the notebook
    storage_access_key = dbutils.secrets.get(scope = self.SECRET_SCOPE, key = self.STORAGE_ACCESS_KEY_SECRET)
    spark.conf.set(
      'fs.azure.account.key.' + self.STAGING_STORAGE_ACCOUNT + '.blob.core.windows.net',
      storage_access_key)

    blob_conn = 'wasbs://{}@{}.blob.core.windows.net/{}'.format(
      self.STAGING_CONTAINER, self.STAGING_STORAGE_ACCOUNT, self.STAGING_DIRECTORY)

    sqldw_user = dbutils.secrets.get(scope = self.SECRET_SCOPE, key = "username")
    sqldw_pass = dbutils.secrets.get(scope = self.SECRET_SCOPE, key = "password")

    # The credentials are embedded in the JDBC url, so this string must never be logged or printed
    sqldw_jdbc = (
      'jdbc:sqlserver://{}.database.windows.net:1433;'.format(self.SQLDW_SERVER)
      + 'database={};'.format(self.SQLDW_DATABASE)
      + 'user={}@{};'.format(sqldw_user, self.SQLDW_SERVER)
      + 'password={};'.format(sqldw_pass)
      + 'encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;'
    )

    (
      data_file_df.write
      .format("com.databricks.spark.sqldw")
      .mode("append")
      .option("url", sqldw_jdbc)
      .option("forwardSparkAzureStorageCredentials", 'true')
      .option("preActions", create_table_statement)
      .option("dbTable", "{0}.{1}".format(self.schema_name, self.table_name))
      .option("tempDir", blob_conn)
      .save()
    )
    

In [6]:
def CreateBaseTable(data_file_path, format_file_path, table_name, schema_name):
  """
  Loads one data file into a dataframe and registers it as a temporary view named table_name.

  Parameters:
  data_file_path = location of the data file
  format_file_path = location of the format file that describes the columns
  table_name = name of the temporary view to create
  schema_name = schema name handed to the Hydrator (not part of the view name)

  Note: table_name comes BEFORE schema_name here, the reverse of the Hydrator constructor order.
  """
  hydrator = Hydrator(data_file_path, format_file_path, schema_name, table_name)
  # Keep the dataframe in its own variable: the view name must stay the table_name string
  data_file_df = hydrator.create_data_dataframe()
  data_file_df.createOrReplaceTempView(table_name)

# data_file_path = 'dbfs:/mnt/dbxworkshop/ADW/Refined/Historical_Claim/Claim_Feature_Payment_Log/20191021/Historical_Claim.Claim_Feature_Payment_Log.txt'
# format_file_path = 'dbfs:/mnt/dbxworkshop/ADW/Refined/Historical_Claim/Claim_Feature_Payment_Log/20191021/Claim_Feature_Payment_Log_20190927.fmt'
# schema_name = 'testing'
# table_name = 'testing_data'



#schema = hydrator.create_schema_struct_type()

#print(schema)





#display(Claim_Feature_Payment_Log)
#create_table_statement = hydrator.format_sql_create_statement()

#hydrator.hydrate()




In [7]:
import os
from pprint import pprint

reportingday = '20191021'
base_tables = ['Claim_Folder','Claim_Feature','Claim_Feature_Payment_Log','Claim_Feature_Reserve_Log','Policy']
top_level_location = '/dbfs/mnt/dbxworkshop/Refined/ADW'


def collect_table_files(top_level_location, reportingday=None, base_tables=None):
  """
  Walks top_level_location and returns {table: {'schema': ..., 'data_file': ..., 'format_file': ...}}.

  Parameters:
  top_level_location = local (/dbfs/...) directory to walk
  reportingday = when given, only directories whose path ends with this value are considered
  base_tables = when given, only files whose table name is in this collection are considered

  'data_file' is set from a .txt file and 'format_file' from a .fmt file; an entry only contains the
  keys for the files that were actually found. Every table gets its own details dictionary, so two
  tables that live in the same directory no longer share (and overwrite) each other's paths.
  If the same table is found in several directories, the last directory walked wins.
  """
  found = {}
  for path, _dirs, files in os.walk(top_level_location):
    if reportingday is not None and not path.endswith(reportingday):
      continue

    details_by_table = {}  # reset per directory
    for f in files:
      # drops the last 13 characters; assumes names end in "_YYYYMMDD.ext" - TODO confirm
      table = f[:-13]
      if base_tables is not None and table not in base_tables:
        continue

      full_path = '{}/{}'.format(path, f)
      # drop the first five characters (the "/dbfs" prefix of the local mount) and address via dbfs:
      working_path = 'dbfs:' + full_path[5:]

      details = details_by_table.setdefault(table, {})
      # schema is the folder three levels above the file: <schema>/<table>/<day>/<file>
      details['schema'] = working_path.split('/')[-4]

      file_type = f[-4:]
      if file_type == '.fmt':
        details['format_file'] = working_path
      elif file_type == '.txt':
        details['data_file'] = working_path

      found[table] = details
  return found


tables_dict = collect_table_files(top_level_location, reportingday, base_tables)

pprint(tables_dict)



In [8]:
# Register every discovered table as a temp view named after the table,
# so the SQL cells can query it by name. Nothing is written to the datawarehouse here.
for table, details in tables_dict.items():
  table_name, schema_name = table, details['schema']
  data_file_path, format_file_path = details['data_file'], details['format_file']

  hydrator = Hydrator(data_file_path, format_file_path, schema_name, table_name)
  df = hydrator.create_data_dataframe()
  df.createOrReplaceTempView("{0}".format(table_name))

In [9]:
%sql
-- 4518912
select count(*) from Claim_Feature


count(1)
4518912


In [10]:
%sql
--1248454
select count(*) from Claim_Folder


count(1)
1248454


In [11]:
%sql
--7,474,251
select count(*) from Claim_Feature_Reserve_Log



count(1)
7474251


In [12]:
import uuid
import datetime
from multiprocessing.pool import ThreadPool
numberOfThreads = 10
#-----------------------------------------------------------------------------------------
# Fan the discovered tables out over a thread pool.
# This is a dry run: each task only prints the arguments it received. To load the tables for real,
# call CreateBaseTable(data_file, format_file, table_name, schema) - table name BEFORE schema.
#-----------------------------------------------------------------------------------------
# The with-block shuts the pool down once the (blocking) starmap call has returned
with ThreadPool(numberOfThreads) as pool:
  startTime = datetime.datetime.now()
  pool.starmap(
    lambda table_name, arguments: print(arguments['data_file'], arguments['format_file'], arguments['schema'], table_name),
    # starmap needs (table_name, arguments) pairs, so iterate the items rather than the keys
    tables_dict.items()
  )
# elapsed minutes; total_seconds() also covers runs longer than a day, unlike .seconds
print((datetime.datetime.now() - startTime).total_seconds()/60.0)


In [13]:
import os
from pprint import pprint

top_level_location = "/dbfs/mnt/jasonsadlsgen2/Refined/SANDBOX_DS_DL_SOLUTIONS"


def collect_table_files(top_level_location, reportingday=None, base_tables=None):
  """
  Walks top_level_location and returns {table: {'schema': ..., 'data_file': ..., 'format_file': ...}}.

  Parameters:
  top_level_location = local (/dbfs/...) directory to walk
  reportingday = when given, only directories whose path ends with this value are considered
  base_tables = when given, only files whose table name is in this collection are considered

  'data_file' is set from a .txt file and 'format_file' from a .fmt file; an entry only contains the
  keys for the files that were actually found. Every table gets its own details dictionary, so two
  tables that live in the same directory no longer share (and overwrite) each other's paths.
  If the same table is found in several directories, the last directory walked wins.
  """
  found = {}
  for path, _dirs, files in os.walk(top_level_location):
    if reportingday is not None and not path.endswith(reportingday):
      continue

    details_by_table = {}  # reset per directory
    for f in files:
      # drops the last 13 characters; assumes names end in "_YYYYMMDD.ext" - TODO confirm
      table = f[:-13]
      if base_tables is not None and table not in base_tables:
        continue

      full_path = "{}/{}".format(path, f)
      # drop the first five characters (the "/dbfs" prefix of the local mount) and address via dbfs:
      working_path = "dbfs:" + full_path[5:]

      details = details_by_table.setdefault(table, {})
      # schema is the folder three levels above the file: <schema>/<table>/<day>/<file>
      details['schema'] = working_path.split('/')[-4]

      file_type = f[-4:]
      if file_type == '.fmt':
        details['format_file'] = working_path
      elif file_type == '.txt':
        details['data_file'] = working_path

      found[table] = details
  return found


# No day or table filter here: every file below the sandbox location is picked up.
# Note that this replaces the tables_dict built by any earlier cell.
tables_dict = collect_table_files(top_level_location)

pprint(tables_dict)



In [16]:
# Payment log rows joined to their claim feature, restricted to the listed originating system codes
# and to rows whose Valid_to_Date is 9999-12-31 on both sides (presumably the open-ended/current
# version marker - confirm).
# When the feature has an Extended_Claim_Feature_Key:
#   - the row is re-keyed to that extended key,
#   - the insured obligation paid amounts are added into the recovered deductible amounts,
#   - the insured obligation paid columns themselves are emitted as 0.
# NOTE(review): a NULL in either operand of the "+" makes the whole sum NULL, which the aggregation
# cell then turns into 0 - confirm that dropping the non-NULL operand is intended.
# NOTE(review): the source views are loaded with every column typed as string, so the arithmetic
# presumably runs on implicitly cast doubles (the aggregate output shows values such as
# 106225.19999999998) - consider explicit DECIMAL casts for monetary amounts.
cte_payment_log=spark.sql("""
SELECT 
		coalesce(cf.Extended_Claim_Feature_Key, pl.Claim_Feature_Key) AS Claim_Feature_Key
		,pl.Paid_Loss_Amount
		,pl.Paid_ALAE_Amount
		,CASE WHEN cf.Extended_Claim_Feature_Key IS NOT NULL 
			THEN pl.Recovered_ALAE_Deductible_Amount + pl.Insured_Obligation_Paid_ALAE_Amount
			ELSE pl.Recovered_ALAE_Deductible_Amount END AS Recovered_ALAE_Deductible_Amount
		,pl.Recovered_ALAE_Salvage_Amount
		,pl.Recovered_ALAE_Subrogation_Amount
		,CASE WHEN cf.Extended_Claim_Feature_Key IS NOT NULL 
			THEN pl.Recovered_Loss_Deductible_Amount + pl.Insured_Obligation_Paid_Loss_Amount
			ELSE pl.Recovered_Loss_Deductible_Amount END AS Recovered_Loss_Deductible_Amount
		,pl.Recovered_Loss_Salvage_Amount 
		,pl.Recovered_Loss_Subrogation_Amount
		,CASE WHEN cf.Extended_Claim_Feature_Key IS NOT NULL 
			THEN 0 
			ELSE pl.Insured_Obligation_Paid_Loss_Amount END AS Insured_Obligation_Loss_Paid_Amount
		,CASE WHEN cf.Extended_Claim_Feature_Key IS NOT NULL 
			THEN 0 
			ELSE pl.Insured_Obligation_Paid_ALAE_Amount END AS Insured_Obligation_ALAE_Paid_Amount
		,pl.Insured_Obligation_Recovered_Loss_Amount 
		,pl.Insured_Obligation_Recovered_ALAE_Amount
	FROM  Claim_Feature_Payment_Log  pl
	inner join Claim_Feature cf on
	           pl.Claim_Feature_Key = cf.Claim_Feature_Key
	WHERE pl.Originating_System_Code in ('01','03','04','05','07','28')
    AND CAST(pl.Valid_to_Date AS date) =  CAST('9999-12-31' AS date)
    AND CAST(cf.Valid_to_Date AS date)= CAST('9999-12-31' AS date)
    """)

In [17]:
# Make the dataframe queryable from SQL under the name "cte_payment_log" (replaces an existing view of that name)
cte_payment_log.createOrReplaceTempView("cte_payment_log")

In [18]:
%sql 
select count(*) from cte_payment_log

count(1)
2680668


In [19]:
# Collapse cte_payment_log to one row per Claim_Feature_Key: every amount column is summed, with NULL
# amounts counted as 0. The paid columns are renamed on the way out
# (Paid_Loss_Amount -> Loss_Paid_Amount, Paid_ALAE_Amount -> ALAE_Paid_Amount).
cte_payment_log_agg=spark.sql("""
SELECT 
		pl.Claim_Feature_Key AS Claim_Feature_Key
		,SUM(coalesce(pl.Paid_Loss_Amount,0)) Loss_Paid_Amount
		,SUM(coalesce(pl.Paid_ALAE_Amount,0)) ALAE_Paid_Amount
		,SUM(coalesce(pl.Recovered_ALAE_Deductible_Amount,0)) Recovered_ALAE_Deductible_Amount
		,SUM(coalesce(pl.Recovered_ALAE_Salvage_Amount,0)) Recovered_ALAE_Salvage_Amount
		,SUM(coalesce(pl.Recovered_ALAE_Subrogation_Amount,0)) Recovered_ALAE_Subrogation_Amount
		,SUM(coalesce(pl.Recovered_Loss_Deductible_Amount,0)) Recovered_Loss_Deductible_Amount
		,SUM(coalesce(pl.Recovered_Loss_Salvage_Amount,0)) Recovered_Loss_Salvage_Amount
		,SUM(coalesce(pl.Recovered_Loss_Subrogation_Amount,0)) Recovered_Loss_Subrogation_Amount
		,SUM(coalesce(pl.Insured_Obligation_Loss_Paid_Amount,0)) Insured_Obligation_Loss_Paid_Amount
		,SUM(coalesce(pl.Insured_Obligation_ALAE_Paid_Amount,0)) Insured_Obligation_ALAE_Paid_Amount
		,SUM(coalesce(pl.Insured_Obligation_Recovered_Loss_Amount,0)) Insured_Obligation_Recovered_Loss_Amount
		,SUM(coalesce(pl.Insured_Obligation_Recovered_ALAE_Amount,0)) Insured_Obligation_Recovered_ALAE_Amount
	FROM cte_payment_log pl
	GROUP BY pl.Claim_Feature_Key
    """)

In [20]:
# Make the aggregate queryable from SQL under the name "cte_payment_log_agg"
cte_payment_log_agg.createOrReplaceTempView("cte_payment_log_agg")

In [21]:
%sql
select * from cte_payment_log_agg

Claim_Feature_Key,Loss_Paid_Amount,ALAE_Paid_Amount,Recovered_ALAE_Deductible_Amount,Recovered_ALAE_Salvage_Amount,Recovered_ALAE_Subrogation_Amount,Recovered_Loss_Deductible_Amount,Recovered_Loss_Salvage_Amount,Recovered_Loss_Subrogation_Amount,Insured_Obligation_Loss_Paid_Amount,Insured_Obligation_ALAE_Paid_Amount,Insured_Obligation_Recovered_Loss_Amount,Insured_Obligation_Recovered_ALAE_Amount
0000000254||1||04,0.0,4090.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
0000000300||1||04,98962.62,15750.72,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
0000000610||1||04,0.0,3316.02,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
0000256||1||01,0.0,2.273736754432321e-13,3871.92,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
0000460||1||01,180000.0,16033.95,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
00005403||1||05,-1.091393642127514e-11,272037.1800000001,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
0000930||1||01,106225.19999999998,11963.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
0000949||1||01,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
0001732||1||01,316.20000000000005,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
0001926||1||01,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [22]:
# Reserve log rows joined to their claim feature (both sides filtered to Valid_to_Date = 9999-12-31)
# and summed per feature key, using the Extended_Claim_Feature_Key when the feature has one.
# Rows whose feature has Originating_System_Code '04' contribute 0 to every reserve amount;
# NULL amounts are counted as 0.
# Unlike the payment log query, there is no filter on the list of originating system codes here.
cte_reserve_log=spark.sql("""
	SELECT
		coalesce(cf.Extended_Claim_Feature_Key, rl.Claim_Feature_Key) AS Claim_Feature_Key	
		,SUM(CASE WHEN cf.Originating_System_Code = '04' then 0 ELSE coalesce(rl.Case_Outstanding_Loss_Reserve_Amount,0) END) Case_Outstanding_Loss_Reserve_Amount
		,SUM(CASE WHEN cf.Originating_System_Code = '04' then 0 ELSE coalesce(rl.Case_Outstanding_ALAE_Reserve_Amount,0) END) Case_Outstanding_ALAE_Reserve_Amount
		,SUM(CASE WHEN cf.Originating_System_Code = '04' then 0 ELSE coalesce(rl.Case_Outstanding_Insured_Obligation_Loss_Reserve_Amount,0) END) Case_Outstanding_Insured_Obligation_Loss_Reserve_Amount
		,SUM(CASE WHEN cf.Originating_System_Code = '04' then 0 ELSE coalesce(rl.Case_Outstanding_Insured_Obligation_ALAE_Reserve_Amount,0) END) Case_Outstanding_Insured_Obligation_ALAE_Reserve_Amount
	FROM Claim_Feature_Reserve_Log rl
	inner join Claim_Feature cf on
	           rl.Claim_Feature_Key  = cf.Claim_Feature_Key                
    WHERE  CAST(rl.Valid_to_Date AS date)= CAST('9999-12-31' AS date)
    AND  CAST(cf.Valid_to_Date AS date)= CAST('9999-12-31' AS date)       
	GROUP BY coalesce(cf.Extended_Claim_Feature_Key, rl.Claim_Feature_Key)
""")

In [23]:
# Make the reserve aggregate queryable from SQL under the name "cte_reserve_log"
cte_reserve_log.createOrReplaceTempView("cte_reserve_log")

In [24]:
# One row per claim feature that has no Extended_Claim_Feature_Key, joined to its policy version and
# claim folder (all three filtered to Valid_to_Date = 9999-12-31, policy also to the listed originating
# system codes), with the aggregated payment (cte_payment_log_agg) and reserve (cte_reserve_log)
# amounts attached. Features without payment or reserve rows are kept by the LEFT OUTER JOINs and
# report 0 through coalesce.
# Every derived amount is written out in full instead of reusing an earlier alias, so a change to
# one formula has to be repeated in each total that embeds it.
# NOTE(review): the insured obligation "..._Net_Of_Recoveries" columns ADD the recovered amounts,
# whereas the other net-of-recoveries columns SUBTRACT recoveries - confirm the sign convention
# of the Insured_Obligation_Recovered_* source columns.
summary=spark.sql(
"""
SELECT
	 pol.Policy_Version_Key
	,fldr.Claim_Folder_Key
	,ftr.Claim_Feature_Key
	,ftr.Policy_Version_Line_Of_Business_Insured_Object_Key
	--	Loss Run Standard Financial Data Elements
	--	Loss Elements
	,coalesce(reslog.Case_Outstanding_Loss_Reserve_Amount, 0) AS Case_Loss_Reserve_Amount --1
	,coalesce(paylog.Loss_Paid_Amount, 0) AS Loss_Paid_Amount 
	,coalesce(paylog.Recovered_Loss_Deductible_Amount, 0) AS Loss_Deductible_Recovered_Amount 
	,coalesce(reslog.Case_Outstanding_Loss_Reserve_Amount , 0)
		+ coalesce(paylog.Loss_Paid_Amount, 0)
		- coalesce(paylog.Recovered_Loss_Deductible_Amount, 0)
		AS Case_Loss_Incurred_Before_Recoveries_Amount 
	,coalesce(paylog.Recovered_Loss_Salvage_Amount, 0) AS Salvage_Loss_Recovered_Amount 
	,coalesce(paylog.Recovered_Loss_Subrogation_Amount, 0) AS Subrogation_Loss_Recovered_Amount 
	,coalesce(reslog.Case_Outstanding_Loss_Reserve_Amount,0) 
		+ coalesce(paylog.Loss_Paid_Amount,0) 
		- coalesce(paylog.Recovered_Loss_Deductible_Amount,0) 
		- coalesce(paylog.Recovered_Loss_Salvage_Amount,0) 
		- coalesce(paylog.Recovered_Loss_Subrogation_Amount,0) 
		AS Case_Loss_Incurred_Net_Of_Recoveries_Amount
	--	ALAE Elements
	,coalesce(reslog.Case_Outstanding_ALAE_Reserve_Amount, 0)  AS Case_ALAE_Reserve_Amount --1
	,coalesce(paylog.ALAE_Paid_Amount, 0)  AS ALAE_Paid_Amount 
	,coalesce(paylog.Recovered_ALAE_Deductible_Amount, 0)  AS ALAE_Deductible_Recovered_Amount 
	,coalesce(reslog.Case_Outstanding_ALAE_Reserve_Amount,0) 
		+ coalesce(paylog.ALAE_Paid_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Deductible_Amount,0) 
		AS Case_ALAE_Incurred_Before_Recoveries_Amount
	,coalesce(paylog.Recovered_ALAE_Salvage_Amount, 0) AS Salvage_ALAE_Recovered_Amount 
	,coalesce(paylog.Recovered_ALAE_Subrogation_Amount, 0) AS Subrogation_ALAE_Recovered_Amount 
	,coalesce(reslog.Case_Outstanding_ALAE_Reserve_Amount,0) 
		+ coalesce(paylog.ALAE_Paid_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Deductible_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Salvage_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Subrogation_Amount,0) 
		AS Case_ALAE_Incurred_Net_Of_Recoveries_Amount
	--	Total
	,coalesce(reslog.Case_Outstanding_Loss_Reserve_Amount,0) 
		+ coalesce(reslog.Case_Outstanding_ALAE_Reserve_Amount,0) 
		AS Total_Case_Reserve_Amount --1
	,coalesce(paylog.Loss_Paid_Amount,0) 
		+ coalesce(paylog.ALAE_Paid_Amount,0) 
		AS Total_Paid_Amount
	,coalesce(paylog.Recovered_Loss_Deductible_Amount,0) 
		+ coalesce(paylog.Recovered_ALAE_Deductible_Amount,0) 
		AS Total_Deductible_Recovered_Amount
	,coalesce(reslog.Case_Outstanding_Loss_Reserve_Amount,0) 
		+ coalesce(paylog.Loss_Paid_Amount,0) 
		- coalesce(paylog.Recovered_Loss_Deductible_Amount,0) 
		+ coalesce(reslog.Case_Outstanding_ALAE_Reserve_Amount,0) 
		+ coalesce(paylog.ALAE_Paid_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Deductible_Amount,0) 
		AS Total_Case_Incurred_Before_Recoveries_Amount
	,coalesce(reslog.Case_Outstanding_Loss_Reserve_Amount,0) 
		+ coalesce(paylog.Loss_Paid_Amount,0) 
		- coalesce(paylog.Recovered_Loss_Deductible_Amount,0) 
		- coalesce(paylog.Recovered_Loss_Salvage_Amount,0) 
		- coalesce(paylog.Recovered_Loss_Subrogation_Amount,0) 
		+ coalesce(reslog.Case_Outstanding_ALAE_Reserve_Amount,0) 
		+ coalesce(paylog.ALAE_Paid_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Deductible_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Salvage_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Subrogation_Amount,0) 
		AS Total_Case_Incurred_Net_Of_Recoveries_Amount
	------ Insured Obligation Amounts
	-- Reserve
	,coalesce(reslog.Case_Outstanding_Insured_Obligation_Loss_Reserve_Amount, 0) AS Case_Insured_Obligation_Loss_Reserve_Amount 
	,coalesce(reslog.Case_Outstanding_Insured_Obligation_ALAE_Reserve_Amount, 0) AS Case_Insured_Obligation_ALAE_Reserve_Amount 
	-- Payment
	,coalesce(paylog.Insured_Obligation_Loss_Paid_Amount, 0) AS Insured_Obligation_Loss_Paid_Amount 
	,coalesce(paylog.Insured_Obligation_ALAE_Paid_Amount, 0) AS Insured_Obligation_ALAE_Paid_Amount 
	,coalesce(paylog.Insured_Obligation_Recovered_Loss_Amount, 0) AS Insured_Obligation_Loss_Recovered_Amount
	,coalesce(paylog.Insured_Obligation_Recovered_ALAE_Amount, 0) AS Insured_Obligation_ALAE_Recovered_Amount
	-- Insured Obligation Aggregate Amounts
	,coalesce(reslog.Case_Outstanding_Insured_Obligation_ALAE_Reserve_Amount, 0)
		+ coalesce(reslog.Case_Outstanding_Insured_Obligation_Loss_Reserve_Amount, 0) AS  Total_Case_Insured_Obligation_Reserve_Amount
	,coalesce(paylog.Insured_Obligation_Loss_Paid_Amount, 0)
		+ coalesce(paylog.Insured_Obligation_ALAE_Paid_Amount, 0) AS Total_Insured_Obligation_Paid_Amount
	,coalesce(paylog.Insured_Obligation_Recovered_Loss_Amount, 0)
		+ coalesce(paylog.Insured_Obligation_Recovered_ALAE_Amount, 0) AS Total_Insured_Obligation_Recovered_Amount
	,coalesce(reslog.Case_Outstanding_Insured_Obligation_Loss_Reserve_Amount, 0)  
		+ coalesce(paylog.Insured_Obligation_Loss_Paid_Amount, 0) 
		+ coalesce(paylog.Insured_Obligation_Recovered_Loss_Amount, 0) AS Case_Loss_Insured_Obligation_Incurred_Net_Of_Recoveries_Amount
	-- Case_Loss_Incurred_Net_of_Insured_Obligation_Amount
	,coalesce(reslog.Case_Outstanding_Insured_Obligation_Loss_Reserve_Amount, 0)  
		+ coalesce(paylog.Insured_Obligation_Loss_Paid_Amount, 0) 
		+ coalesce(paylog.Insured_Obligation_Recovered_Loss_Amount, 0)
		+ (
			-- Case Loss Incurred Net of Recoveries Amount
			coalesce(reslog.Case_Outstanding_Loss_Reserve_Amount,0) 
			+ coalesce(paylog.Loss_Paid_Amount,0) 
			- coalesce(paylog.Recovered_Loss_Deductible_Amount,0) 
			- coalesce(paylog.Recovered_Loss_Salvage_Amount,0) 
			- coalesce(paylog.Recovered_Loss_Subrogation_Amount,0) 
		) AS Ground_Up_Case_Loss_Incurred_Net_Of_Recoveries_Amount
	,coalesce(reslog.Case_Outstanding_Insured_Obligation_ALAE_Reserve_Amount, 0)  
		+ coalesce(paylog.Insured_Obligation_ALAE_Paid_Amount, 0) 
		+ coalesce(paylog.Insured_Obligation_Recovered_ALAE_Amount, 0) AS Case_ALAE_Insured_Obligation_Incurred_Net_Of_Recoveries_Amount
	-- Case_ALAE_Incurred_Net_of_Insured_Obligation_Amount
	,coalesce(reslog.Case_Outstanding_Insured_Obligation_ALAE_Reserve_Amount, 0)  
		+ coalesce(paylog.Insured_Obligation_ALAE_Paid_Amount, 0) 
		+ coalesce(paylog.Insured_Obligation_Recovered_ALAE_Amount, 0)
		+ (
			-- Case ALAE Incurred Net of Recoveries Amount
			coalesce(reslog.Case_Outstanding_ALAE_Reserve_Amount,0) 
			+ coalesce(paylog.ALAE_Paid_Amount,0) 
			- coalesce(paylog.Recovered_ALAE_Deductible_Amount,0) 
			- coalesce(paylog.Recovered_ALAE_Salvage_Amount,0) 
			- coalesce(paylog.Recovered_ALAE_Subrogation_Amount,0) 
		)  AS Ground_Up_Case_ALAE_Incurred_Net_Of_Recoveries_Amount
	-- Total_Case_Incurred_Net_of_Insured_Obligation_Amount
	, (	-- Total_Case_Insured_Obligation_Amount
		coalesce(reslog.Case_Outstanding_Insured_Obligation_ALAE_Reserve_Amount, 0)
		+ coalesce(reslog.Case_Outstanding_Insured_Obligation_Loss_Reserve_Amount, 0)
		) 
	+ (	-- Total_Insured_Obligation_Paid_Amount
		coalesce(paylog.Insured_Obligation_Loss_Paid_Amount, 0)
		+ coalesce(paylog.Insured_Obligation_ALAE_Paid_Amount, 0)
		)
	+ ( -- Total_Insured_Obligation_Recovered_Amount
		coalesce(paylog.Insured_Obligation_Recovered_Loss_Amount, 0)
		+ coalesce(paylog.Insured_Obligation_Recovered_ALAE_Amount, 0)
		) AS Total_Case_Insured_Obligation_Incurred_Net_of_Recoveries_Amount
	,coalesce(reslog.Case_Outstanding_Loss_Reserve_Amount,0) 
		+ coalesce(paylog.Loss_Paid_Amount,0) 
		- coalesce(paylog.Recovered_Loss_Deductible_Amount,0) 
		- coalesce(paylog.Recovered_Loss_Salvage_Amount,0) 
		- coalesce(paylog.Recovered_Loss_Subrogation_Amount,0) 
		+ coalesce(reslog.Case_Outstanding_ALAE_Reserve_Amount,0) 
		+ coalesce(paylog.ALAE_Paid_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Deductible_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Salvage_Amount,0) 
		- coalesce(paylog.Recovered_ALAE_Subrogation_Amount,0) 
		+ (	-- Total_Case_Insured_Obligation_Amount
			coalesce(reslog.Case_Outstanding_Insured_Obligation_ALAE_Reserve_Amount, 0)
			+ coalesce(reslog.Case_Outstanding_Insured_Obligation_Loss_Reserve_Amount, 0)
		) 
		+ (	-- Total_Insured_Obligation_Paid_Amount
			coalesce(paylog.Insured_Obligation_Loss_Paid_Amount, 0)
			+ coalesce(paylog.Insured_Obligation_ALAE_Paid_Amount, 0)
		)
		+ ( -- Total_Insured_Obligation_Recovered_Amount
			coalesce(paylog.Insured_Obligation_Recovered_Loss_Amount, 0)
			+ coalesce(paylog.Insured_Obligation_Recovered_ALAE_Amount, 0)
		) AS Total_Case_Ground_Up_Incurred_Net_of_Recoveries_Amount
	,ftr.Originating_System_Code
FROM Policy pol
INNER JOIN Claim_Feature ftr
	ON pol.Policy_Version_Key  = ftr.Policy_Version_Key 
INNER JOIN Claim_Folder fldr
	ON ftr.Claim_Folder_Key  = fldr.Claim_Folder_Key
-- Changing these to LEFT OUTER JOINS to improve performance
LEFT OUTER JOIN cte_payment_log_agg paylog
ON paylog.Claim_Feature_Key  = ftr.Claim_Feature_Key
LEFT OUTER JOIN cte_reserve_log reslog
ON reslog.Claim_Feature_Key  = ftr.Claim_Feature_Key
WHERE ftr.Extended_Claim_Feature_Key IS NULL
AND pol.Originating_System_Code in ('01','03','04','05','07','28')
AND CAST(ftr.Valid_to_Date AS date)= CAST('9999-12-31' AS date)
AND  CAST(pol.Valid_to_Date AS date)= CAST('9999-12-31' AS date)    
AND  CAST(fldr.Valid_to_Date AS date)= CAST('9999-12-31' AS date)    
""")

In [25]:
# Make the summary queryable from SQL under the name "summary"
summary.createOrReplaceTempView("summary")

In [26]:
%sql 
select count(*) from summary

count(1)
1129809


In [27]:
%sql 
select * from summary

count(1)
14763646
