# Databricks notebook source
# configs = {"fs.azure.account.auth.type": "OAuth",
#        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
#        "fs.azure.account.oauth2.client.id": "9a216f35-7fd8-4ed1-b9a4-fb22c14343c7",
#        "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="datalake-akv-scope",key="dev-databrick-appreg--client-secret"),
#        "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/781802be-916f-42df-a204-78a2b3144934/oauth2/token",
#        "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

#  dbutils.fs.mount(
#  source = "abfss://raw@devusscdata.dfs.core.windows.net/",
#  mount_point = "/mnt/raw/products/spapp",
#  extra_configs = configs)

# COMMAND ----------

import pyspark.sql.functions as F
import re


def _clean_column_name(name):
    """Return a SQL-friendly, lower-case version of a raw CSV header.

    Steps, applied in order:
      1. every run of characters other than ASCII letters, digits, ``%``,
         ``+`` and ``$`` becomes ``_``;
      2. every run of ``+`` becomes ``Plus``;
      3. every run of ``%`` becomes ``Percent``;
      4. the result is lower-cased.

    Examples: ``"Delinq 60+"`` -> ``"delinq_60plus"``,
    ``"Delinq Stop Advance %"`` -> ``"delinq_stop_advance_percent"``.
    """
    name = re.sub(r"[^0-9a-zA-Z%+$]+", "_", name)
    name = re.sub(r"\++", "Plus", name)
    name = re.sub(r"%+", "Percent", name)
    return name.lower()


# Use the previously established DBFS mount point to read the raw Bloomberg extract.
BBGRawDF = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/raw/products/spapp/Bloomberg_Raw_Data.csv")

# Rename every column in one pass. toDF() renames positionally, so raw headers
# containing characters that F.col() would parse (e.g. '.' as struct access)
# are handled safely.
BBGCleanDF = BBGRawDF.toDF(*[_clean_column_name(c) for c in BBGRawDF.columns])

# Create a temporary view using the dataframe with cleaned column headers.
BBGCleanDF.createOrReplaceTempView("BBG_Clean")

# COMMAND ----------

# MAGIC %sql
# MAGIC select * from BBG_Clean

# COMMAND ----------

# Save the cleaned data as a single CSV file in a run-specific folder of the
# curated zone. `timestamp` is also used by later cells to locate this output.

import datetime
now = datetime.datetime.now()
# Driver-local wall-clock time, formatted like 20211112_22-02-14.
timestamp = now.strftime("%Y%m%d_%H-%M-%S")

# "csv" is Spark's built-in CSV source; "com.databricks.spark.csv" is only a
# legacy alias for it.
BBGCleanDF.coalesce(1).write.format("csv").option("header", "true").save(
    "/mnt/curated/products/spapp/bloomberg_csv_data_extract/" + timestamp
)

# COMMAND ----------

# Convert this run's curated CSV extract into Delta format. Both folders are
# keyed by the same run `timestamp`.
csv_extract_path = "/mnt/curated/products/spapp/bloomberg_csv_data_extract/" + timestamp
delta_output_path = "/mnt/curated/products/spapp/bloomberg_data_delta_files/" + timestamp

BBGNewDataDF = (
    spark.read.format('csv')
    .options(header='true', inferschema='true')
    .load(csv_extract_path)
)
BBGNewDataDF.write.format("delta").save(delta_output_path)


# COMMAND ----------

# Create (or re-create) spapp.BLOOMBERG_DATA over the Delta files written by this
# run. The location is built from `timestamp`, which is set when the curated CSV
# was written. It replaces a hard-coded folder that kept the table pointing at
# the 20211112_22-02-14 extract however many times the notebook ran.
bloomberg_delta_location = (
    "/mnt/curated/products/spapp/bloomberg_data_delta_files/" + timestamp
)
spark.sql("DROP TABLE IF EXISTS spapp.BLOOMBERG_DATA")
spark.sql(
    "CREATE TABLE spapp.BLOOMBERG_DATA USING DELTA LOCATION '{}'".format(
        bloomberg_delta_location
    )
)

# COMMAND ----------

# MAGIC %sql
# MAGIC select * from spapp.bloomberg_data

# COMMAND ----------

# Load the registered Bloomberg table as a DataFrame for the JDBC export.
BBG_Data_DF = spark.table("spapp.bloomberg_data")

# COMMAND ----------

# Connection settings for the dev Azure SQL database that receives the exports.
jdbcHostname = "dev-ussc-spapp-dbsvr.database.windows.net"
jdbcPort = "1433"
jdbcDatabase = "dev-ussc-spapp-db"

# The password is read from a Databricks secret scope instead of being committed
# in plain text. The previously hard-coded value remains in version history and
# should be rotated.
# TODO(review): confirm the scope/key names; this secret must be created first.
properties = {
    "user": "svc_spapp",
    "password": dbutils.secrets.get(scope="datalake-akv-scope", key="dev-spapp-svc-password"),
}

url = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

# COMMAND ----------

from pyspark.sql import *
import pandas as pd

# Export the Bloomberg table to the SQL Server table "bloomberg_data" using the
# connection settings defined above, replacing the target's existing contents
# (mode="overwrite").
BBG_Data_DF.write.jdbc(url=url, table="bloomberg_data", mode="overwrite", properties=properties)

# COMMAND ----------

# Build the "Level 1" dataset by joining Intex data (i) with the Bloomberg
# extract (b) on CUSIP.
# - Inner query: the value '-' (presumably a missing-value marker) is replaced
#   with 0, and some columns fall back to an alternate source column first
#   (3mo/6mo/12mo_CRR -> *_CRR_in_Full, *_CDR -> *_CDR_Reported_,
#   *_Severity_Reported_ -> *_Severity, Group_Collateral_Type ->
#   Deal_Subcategory). Several columns are rounded to 2 decimal places.
# - Outer query: adds Bankrupt into Delinq_90Plus and derives Base_CRR
#   (12mo_CRR, or BBG_12mo_CRR when 12mo_CRR <= 0).
# - Insurance is computed in the inner query but not selected by the outer one.
# NOTE(review): this is an inner join, so Intex rows without a Bloomberg match
#   are dropped. An earlier commented-out pandas merge here used how='left' on
#   'cusip'. Confirm which behaviour is intended.
# NOTE(review): this reads spapp.intex_data, while the preview query later in
#   the notebook reads spapp.intex_new_data. Confirm the correct source table.

Level1_Cols_DF= spark.sql("""select x.CUSIP
, x.Delinq_30_59
, x.Delinq_60_89
, x.Delinq_60Plus
, round((x.Delinq_90Plus + x.Bankrupt),2) as Delinq_90Plus
, x.Bankrupt
, x.Percent_Always_Performing_last_60_mos_
, x.Percent_Re_Performing_last_60_mos_
, x.Foreclosure
, x.REO
, x.Delinq_60Plus_FC_REO_incl_BK_
, x.Group_Collateral_Type
, x.WAVG_Rem_Term
, x.3mo_CRR
, x.6mo_CRR
, x.12mo_CRR
, x.BBG_3mo_CRR
, x.BBG_6mo_CRR
, x.BBG_12mo_CRR
, case when x.12mo_CRR <=0 then x.BBG_12mo_CRR else 12mo_CRR end as Base_CRR
, x.3mo_CDR
, x.6mo_CDR
, x.12mo_CDR
, x.3mo_Severity_Reported_
, x.6mo_Severity_Reported_
, x.12mo_Severity_Reported_
, x.BBG_3mo_Severity
, x.BBG_6mo_Severity
, x.BBG_12mo_Severity
, x.Number_of_Assets
, x.Delinq_Stop_Advance_Percent
, x.Loan_Servicer

from 
(select i.CUSIP
, round(case when i.Delinq_30_59 ='-' then 0 else i.Delinq_30_59 end,2) as Delinq_30_59 
, round(case when i.Delinq_60_89 ='-' then 0 else i.Delinq_60_89 end,2) as Delinq_60_89
, round(case when i.Delinq_60Plus ='-' then 0 else i.Delinq_60Plus end,2) as Delinq_60Plus
, round(case when i.Delinq_90Plus ='-' then 0 else i.Delinq_90Plus end,2) as Delinq_90Plus
, case when i.Bankrupt ='-' then 0 else i.Bankrupt end as Bankrupt
, case when i.Percent_Always_Performing_last_60_mos_ = '-' then 0 else i.Percent_Always_Performing_last_60_mos_ end as Percent_Always_Performing_last_60_mos_
, case when i.Percent_Re_Performing_last_60_mos_ = '-' then 0 else i.Percent_Re_Performing_last_60_mos_ end as Percent_Re_Performing_last_60_mos_
, case when i.Foreclosure ='-' then 0 else i.Foreclosure end as Foreclosure
, case when i.REO ='-' then 0 else i.REO end as REO
, case when i.Delinq_60Plus_FC_REO_incl_BK_ ='-' then 0 else i.Delinq_60Plus_FC_REO_incl_BK_ end as Delinq_60Plus_FC_REO_incl_BK_
, case when i.Group_Collateral_Type ='-' then case when i.Deal_Subcategory ='-' then 0 else i.Deal_Subcategory end else i.Group_Collateral_Type end as Group_Collateral_Type
, i.WAVG_Rem_Term as WAVG_Rem_Term
, round(case when i.3mo_CRR ='-' then case when i.3mo_CRR_in_Full ='-' then 0 else i.3mo_CRR_in_Full end else i.3mo_CRR end,2) as 3mo_CRR
, round(case when i.6mo_CRR ='-' then case when i.6mo_CRR_in_Full ='-' then 0 else i.6mo_CRR_in_Full end else i.6mo_CRR end,2) as 6mo_CRR
, round(case when i.12mo_CRR ='-' then case when i.12mo_CRR_in_Full ='-' then 0 else i.12mo_CRR_in_Full end else i.12mo_CRR end,2) as 12mo_CRR
, round(case when b.BBG_3mo_CRR ='-' then 0 else b.BBG_3mo_CRR end,2) as BBG_3mo_CRR
, round(case when b.BBG_6mo_CRR ='-' then 0 else b.BBG_6mo_CRR end,2) as BBG_6mo_CRR
, round(case when b.BBG_12mo_CRR ='-' then 0 else b.BBG_12mo_CRR end,2) as BBG_12mo_CRR
, round(case when i.3mo_CDR ='-' then case when i.3mo_CDR_Reported_ ='-' then 0 else i.3mo_CDR_Reported_ end else i.3mo_CDR end,2) as 3mo_CDR
, round(case when i.6mo_CDR ='-' then case when i.6mo_CDR_Reported_ ='-' then 0 else i.6mo_CDR_Reported_ end else i.6mo_CDR end,2) as 6mo_CDR
, round(case when i.12mo_CDR ='-' then case when i.12mo_CDR_Reported_ ='-' then 0 else i.12mo_CDR_Reported_ end else i.12mo_CDR end,2) as 12mo_CDR
, round(case when i.3mo_Severity_Reported_ ='-' then case when i.3mo_Severity ='-' then 0 else i.3mo_Severity end else i.3mo_Severity_Reported_ end,2) as 3mo_Severity_Reported_
, round(case when i.6mo_Severity_Reported_ ='-' then case when i.6mo_Severity ='-' then 0 else i.6mo_Severity end else i.6mo_Severity_Reported_ end,2) as 6mo_Severity_Reported_
, round(case when i.12mo_Severity_Reported_ ='-' then case when i.12mo_Severity ='-' then 0 else i.12mo_Severity end else i.12mo_Severity_Reported_ end,2) as 12mo_Severity_Reported_
, round(case when b.BBG_3mo_Severity ='-' then 0 else b.BBG_3mo_Severity end,2) as BBG_3mo_Severity
, round(case when b.BBG_6mo_Severity ='-' then 0 else b.BBG_6mo_Severity end,2) as BBG_6mo_Severity
, round(case when b.BBG_12mo_Severity ='-' then 0 else b.BBG_12mo_Severity end,2) as BBG_12mo_Severity
, case when i.Number_of_Assets ='-' then 0 else i.Number_of_Assets end as Number_of_Assets
, case when i.Delinq_Stop_Advance_Percent ='-' then 0 else i.Delinq_Stop_Advance_Percent end as Delinq_Stop_Advance_Percent
, case when i.Insurance ='-' then 0 else i.Insurance end as Insurance
, case when i.Loan_Servicer ='-' then 0 else i.Loan_Servicer end as Loan_Servicer 

from spapp.intex_data i 
join spapp.bloomberg_data b 
on i.CUSIP = b.CUSIP)x;""")

# COMMAND ----------

# Show the (column name, type) pairs of the Level 1 result; notebook display only.
Level1_Cols_DF.dtypes

# COMMAND ----------

# MAGIC %sql
# MAGIC select * from spapp.intex_new_data i ;
# MAGIC select * from spapp.bloomberg_data b ;

# COMMAND ----------

# MAGIC %sql
# MAGIC select i.Percent_Re_Performing_last_60_mos_ from spapp.intex_new_data i  

# COMMAND ----------

# Ad-hoc preview of the Level 1 query, sourced from spapp.intex_new_data rather
# than spapp.intex_data. The result is only printed with .show() and is not
# assigned to a variable.
# NOTE(review): this SQL duplicates the Level1_Cols_DF query apart from the Intex
#   table name. Keep the two in sync, or build both from one shared query string.
spark.sql("""select x.CUSIP
, x.Delinq_30_59
, x.Delinq_60_89
, x.Delinq_60Plus
, round((x.Delinq_90Plus + x.Bankrupt),2) as Delinq_90Plus
, x.Bankrupt
, x.Percent_Always_Performing_last_60_mos_
, x.Percent_Re_Performing_last_60_mos_
, x.Foreclosure
, x.REO
, x.Delinq_60Plus_FC_REO_incl_BK_
, x.Group_Collateral_Type
, x.WAVG_Rem_Term
, x.3mo_CRR
, x.6mo_CRR
, x.12mo_CRR
, x.BBG_3mo_CRR
, x.BBG_6mo_CRR
, x.BBG_12mo_CRR
, case when x.12mo_CRR <=0 then x.BBG_12mo_CRR else 12mo_CRR end as Base_CRR
, x.3mo_CDR
, x.6mo_CDR
, x.12mo_CDR
, x.3mo_Severity_Reported_
, x.6mo_Severity_Reported_
, x.12mo_Severity_Reported_
, x.BBG_3mo_Severity
, x.BBG_6mo_Severity
, x.BBG_12mo_Severity
, x.Number_of_Assets
, x.Delinq_Stop_Advance_Percent
, x.Loan_Servicer

from 
(select i.CUSIP
, round(case when i.Delinq_30_59 ='-' then 0 else i.Delinq_30_59 end,2) as Delinq_30_59 
, round(case when i.Delinq_60_89 ='-' then 0 else i.Delinq_60_89 end,2) as Delinq_60_89
, round(case when i.Delinq_60Plus ='-' then 0 else i.Delinq_60Plus end,2) as Delinq_60Plus
, round(case when i.Delinq_90Plus ='-' then 0 else i.Delinq_90Plus end,2) as Delinq_90Plus
, case when i.Bankrupt ='-' then 0 else i.Bankrupt end as Bankrupt
, case when i.Percent_Always_Performing_last_60_mos_ = '-' then 0 else i.Percent_Always_Performing_last_60_mos_ end as Percent_Always_Performing_last_60_mos_
, case when i.Percent_Re_Performing_last_60_mos_ = '-' then 0 else i.Percent_Re_Performing_last_60_mos_ end as Percent_Re_Performing_last_60_mos_
, case when i.Foreclosure ='-' then 0 else i.Foreclosure end as Foreclosure
, case when i.REO ='-' then 0 else i.REO end as REO
, case when i.Delinq_60Plus_FC_REO_incl_BK_ ='-' then 0 else i.Delinq_60Plus_FC_REO_incl_BK_ end as Delinq_60Plus_FC_REO_incl_BK_
, case when i.Group_Collateral_Type ='-' then Case when i.Deal_Subcategory ='-' then 0 else i.Deal_Subcategory end else i.Group_Collateral_Type end as Group_Collateral_Type
, i.WAVG_Rem_Term as WAVG_Rem_Term
, round(Case when i.3mo_CRR ='-' then Case when i.3mo_CRR_in_Full ='-' then 0 else i.3mo_CRR_in_Full end else i.3mo_CRR end,2) as 3mo_CRR
, round(Case when i.6mo_CRR ='-' then Case when i.6mo_CRR_in_Full ='-' then 0 else i.6mo_CRR_in_Full end else i.6mo_CRR end,2) as 6mo_CRR
, round(Case when i.12mo_CRR ='-' then Case when i.12mo_CRR_in_Full ='-' then 0 else i.12mo_CRR_in_Full end else i.12mo_CRR end,2) as 12mo_CRR
, round(Case when b.BBG_3mo_CRR ='-' then 0 else b.BBG_3mo_CRR end,2) as BBG_3mo_CRR
, round(Case when b.BBG_6mo_CRR ='-' then 0 else b.BBG_6mo_CRR end,2) as BBG_6mo_CRR
, round(Case when b.BBG_12mo_CRR ='-' then 0 else b.BBG_12mo_CRR end,2) as BBG_12mo_CRR
, round(Case when i.3mo_CDR ='-' then Case when i.3mo_CDR_Reported_ ='-' then 0 else i.3mo_CDR_Reported_ end else i.3mo_CDR end,2) as 3mo_CDR
, round(Case when i.6mo_CDR ='-' then Case when i.6mo_CDR_Reported_ ='-' then 0 else i.6mo_CDR_Reported_ end else i.6mo_CDR end,2) as 6mo_CDR
, round(Case when i.12mo_CDR ='-' then Case when i.12mo_CDR_Reported_ ='-' then 0 else i.12mo_CDR_Reported_ end else i.12mo_CDR end,2) as 12mo_CDR
, round(Case when i.3mo_Severity_Reported_ ='-' then Case when i.3mo_Severity ='-' then 0 else i.3mo_Severity end else i.3mo_Severity_Reported_ end,2) as 3mo_Severity_Reported_
, round(Case when i.6mo_Severity_Reported_ ='-' then Case when i.6mo_Severity ='-' then 0 else i.6mo_Severity end else i.6mo_Severity_Reported_ end,2) as 6mo_Severity_Reported_
, round(Case when i.12mo_Severity_Reported_ ='-' then Case when i.12mo_Severity ='-' then 0 else i.12mo_Severity end else i.12mo_Severity_Reported_ end,2) as 12mo_Severity_Reported_
, round(Case when b.BBG_3mo_Severity ='-' then 0 else b.BBG_3mo_Severity end,2) as BBG_3mo_Severity
, round(Case when b.BBG_6mo_Severity ='-' then 0 else b.BBG_6mo_Severity end,2) as BBG_6mo_Severity
, round(Case when b.BBG_12mo_Severity ='-' then 0 else b.BBG_12mo_Severity end,2) as BBG_12mo_Severity
, case when i.Number_of_Assets ='-' then 0 else i.Number_of_Assets end as Number_of_Assets
, case when i.Delinq_Stop_Advance_Percent ='-' then 0 else i.Delinq_Stop_Advance_Percent end as Delinq_Stop_Advance_Percent
, case when i.Insurance ='-' then 0 else i.Insurance end as Insurance
, case when i.Loan_Servicer ='-' then 0 else i.Loan_Servicer end as Loan_Servicer 

from spapp.intex_new_data i 
join spapp.bloomberg_data b 
on i.CUSIP = b.CUSIP)x;""").show()

# COMMAND ----------

# Render the Level 1 result with the notebook's display() viewer.
display(Level1_Cols_DF)

# COMMAND ----------

# Connection settings for the dev Azure SQL database that receives the Level 1 export.
jdbcHostname = "dev-ussc-spapp-dbsvr.database.windows.net"
jdbcPort = "1433"
jdbcDatabase = "dev-ussc-spapp-db"

# The password is read from a Databricks secret scope instead of being committed
# in plain text. The previously hard-coded value remains in version history and
# should be rotated.
# TODO(review): confirm the scope/key names; this secret must be created first.
properties = {
    "user": "svc_spapp",
    "password": dbutils.secrets.get(scope="datalake-akv-scope", key="dev-spapp-svc-password"),
}

url = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)


# COMMAND ----------

from pyspark.sql import *
import pandas as pd

# Export the Level 1 result to the SQL Server table "Liq_Lvl1_Data" using the
# connection settings defined above, replacing the target's existing contents
# (mode="overwrite").
Level1_Cols_DF.write.jdbc(url=url, table="Liq_Lvl1_Data", mode="overwrite", properties=properties)

# COMMAND ----------

# MAGIC %sql
# MAGIC 
# MAGIC --select count(*) from spapp.intex_new_data;--2085
# MAGIC 
# MAGIC --select count(*) from spapp.bloomberg_data;--2085

# COMMAND ----------

# Number of rows produced by the Level 1 join; notebook display only.
Level1_Cols_DF.count()

# Hand-recorded result from an earlier run: 2000 rows (not checked by code).

# COMMAND ----------

