In [None]:
import dbldatagen as dg
import pyspark.sql.functions as F
from datetime import timedelta, datetime
from pyspark.sql.types import DateType
import pyspark.sql.functions as F
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType,LongType

# Intended key relationships between the generated data sets:
#   customer.id            -> unique per customer
#   policy.customerid      -> customer.id
#   option.policyid        -> policy.id
#   policy.servingagentid  -> life_agent.agent_no

# --- customers ---
UNIQUE_CUSTOMERS = 2_000_000
CUSTOMER_ID_MIN_VALUE = 20_000_000
CUSTOMER_TRANSACTIONS = UNIQUE_CUSTOMERS * 2

# --- policies ---
UNIQUE_POLICY = 500_000
POLICY_OPTIONS = UNIQUE_POLICY * 100
POLICY_ID_MIN_VALUE = 10_000_000
POLICY_CUSTOMER_ID_MIN_VAL = CUSTOMER_ID_MIN_VALUE  # policy customer ids start where customer ids start
POLICY_SERVING_AGENT_ID_MIN_VALUE = 30_000_000

# --- agents ---
AGENT_CLIENT_NO_MIN = 20_000_000
SERVING_AGENT_ID_MIN = 12345
UNIQUE_AGENTS = 1000
AGENT_ROWS = 50_000
AGENT_NUM_MIN_VALUE = POLICY_SERVING_AGENT_ID_MIN_VALUE

# --- product component options ---
OPTIONS_ROWS = 300_000
OPTIONS_ID_MIN_VALUE = 100_000_000_000_000

# --- date range used for the generated date columns ---
interval = timedelta(hours=1, days=1)
start = datetime(1980, 1, 1)
end = datetime(2022, 1, 1)

# Drop cached data first so that repeated runs made to check performance
# are not served from the cache.
spark.catalog.clearCache()

# --- Spark session settings ---
shuffle_partitions_requested = 8
partitions_requested = 8
data_rows = UNIQUE_CUSTOMERS

for conf_key, conf_value in (
    ("spark.sql.shuffle.partitions", shuffle_partitions_requested),
    ("spark.sql.execution.arrow.pyspark.enabled", "true"),
    ("spark.sql.execution.arrow.maxRecordsPerBatch", 20_000),
):
    spark.conf.set(conf_key, conf_value)

spark.catalog.clearCache()

In [None]:
# Connection settings for the target Azure Cosmos DB account.
# NOTE(review): the account key is written directly in the notebook source;
# consider reading it from a secret store instead of committing it.
cosmosEndpoint = "https://test-cosmos.documents.azure.com:443/"
cosmosMasterKey = "secret_key"
cosmosDatabaseName = "database_name"

# Register the Cosmos catalog with the Spark session so that the
# cosmosCatalog.<database>.<container> names used below can be resolved.
for catalog_conf_key, catalog_conf_value in (
    ("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog"),
    ("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint),
    ("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey),
):
    spark.conf.set(catalog_conf_key, catalog_conf_value)

# Create the Cosmos DB database through the catalog API if it is not there yet.
create_database_sql = "CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName)
spark.sql(create_database_sql)

def load_data_into_cosmos(container_name, parition_key_col_name, dataframe, partition_id_col_name="id", manual_throughput=10000):
    """Create the Cosmos DB container if it is missing, then append ``dataframe`` to it.

    Reads the notebook-level ``spark``, ``cosmosEndpoint``, ``cosmosMasterKey``
    and ``cosmosDatabaseName`` values.

    Args:
        container_name: Target container; created as
            ``cosmosCatalog.<cosmosDatabaseName>.<container_name>`` when absent.
        parition_key_col_name: Column used for the container's partition key
            path (``/<column>``). The misspelled name is kept because callers
            pass it by keyword.
        dataframe: Spark DataFrame whose rows are written to the container.
        partition_id_col_name: Not used by this function; kept so existing
            calls remain valid.
        manual_throughput: Value for the ``manualThroughput`` table property
            used when the container is created. Defaults to 10000, the value
            that was previously hard-coded.

    Returns:
        None.
    """
    # Build the connector options in one go instead of filling in a
    # placeholder container name afterwards.
    write_options = {
        "spark.cosmos.accountEndpoint": cosmosEndpoint,
        "spark.cosmos.accountKey": cosmosMasterKey,
        "spark.cosmos.database": cosmosDatabaseName,
        "spark.cosmos.container": container_name,
    }

    create_container_sql = (
        f"CREATE TABLE IF NOT EXISTS cosmosCatalog.{cosmosDatabaseName}.{container_name} "
        f"using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/{parition_key_col_name}', "
        f"manualThroughput = '{manual_throughput}')"
    )
    # The container has to exist before the write below can target it.
    spark.sql(create_container_sql)

    (
        dataframe.write
        .format("cosmos.oltp")
        .options(**write_options)
        .mode("APPEND")
        .save()
    )

In [None]:
# NOTE(review): load_data_into_cosmos is also defined in the preceding cell;
# this later definition is the one in effect once both cells have run.
# Consider keeping only one of them.
def load_data_into_cosmos(container_name, parition_key_col_name, dataframe, partition_id_col_name="id", manual_throughput=10000):
    """Ensure the target Cosmos DB container exists and append ``dataframe`` to it.

    Uses the notebook-level ``spark``, ``cosmosEndpoint``, ``cosmosMasterKey``
    and ``cosmosDatabaseName`` values.

    Args:
        container_name: Container to write to; created as
            ``cosmosCatalog.<cosmosDatabaseName>.<container_name>`` if absent.
        parition_key_col_name: Column that becomes the partition key path
            (``/<column>``). The spelling is kept because callers pass this
            argument by keyword.
        dataframe: Spark DataFrame to write.
        partition_id_col_name: Unused here; retained for compatibility with
            existing calls.
        manual_throughput: Value of the ``manualThroughput`` table property
            applied when the container is created. Defaults to 10000, the
            previously hard-coded value.

    Returns:
        None.
    """
    # All connector options are known up front, so no placeholder entry is needed.
    write_options = {
        "spark.cosmos.accountEndpoint": cosmosEndpoint,
        "spark.cosmos.accountKey": cosmosMasterKey,
        "spark.cosmos.database": cosmosDatabaseName,
        "spark.cosmos.container": container_name,
    }

    create_container_sql = (
        f"CREATE TABLE IF NOT EXISTS cosmosCatalog.{cosmosDatabaseName}.{container_name} "
        f"using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/{parition_key_col_name}', "
        f"manualThroughput = '{manual_throughput}')"
    )
    # Create the container (if needed) before writing into it.
    spark.sql(create_container_sql)

    (
        dataframe.write
        .format("cosmos.oltp")
        .options(**write_options)
        .mode("APPEND")
        .save()
    )

In [None]:
def read_csv(path):
    """Return the schema of the CSV file at ``path``.

    The file is opened with its first line treated as a header and with schema
    inference switched off. Only the schema of the resulting DataFrame is
    returned, not the DataFrame itself.
    """
    csv_reader = spark.read.option("header", True)
    csv_reader = csv_reader.option("inferSchema", False)
    header_only_frame = csv_reader.csv(path)
    return header_only_frame.schema

In [None]:
# Column layouts for the generated data sets, taken from sample CSV files
# (read_csv returns only the schema of each file).
customer_schema = read_csv(path="dbfs:/FileStore/PACS/customer.csv")
policy_schema = read_csv(path="dbfs:/FileStore/PACS/policy.csv")
agent_schema = read_csv(path="dbfs:/FileStore/PACS/life_agent.csv")
options_schema = read_csv(path="dbfs:/FileStore/PACS/productcomponentoption.csv")
# The transactions schema is currently not loaded:
# transactions_schema = read_csv("dbfs:/FileStore/PACS/Transactions-1.csv")

In [None]:
# Policy data set: UNIQUE_POLICY generated rows using the column layout in policy_schema.
# id            = "LA-SG_" + policyno (added after generation, see policy_df below)
# partition_key = customerid
#
# tmp_num is a helper column (omit=True, baseColumnType="hash"); policyno, customerid,
# servingagentid and policyownerid all use it as their base column.
# tmp_date is a helper random date (omit=True); every date column configured below is
# based on it, so in the displayed sample all date columns of a row carry the same date.
# NOTE(review): "policyno" is added with withColumn rather than withColumnSpec, and the
# displayed result contains a second column shown as "policyno.1" - this suggests the
# schema already has a policyno column and it ends up in the output twice; confirm.
policy_dataspec = (dg.DataGenerator(spark, rows=UNIQUE_POLICY, partitions=partitions_requested)
        .withSchema(policy_schema)
        .withColumn("tmp_num", LongType(), minValue=0x1000000000000,uniqueValues=UNIQUE_POLICY, omit=True, baseColumnType="hash")
        .withColumn("policyno", IntegerType(),minValue=POLICY_ID_MIN_VALUE, maxValue=POLICY_ID_MIN_VALUE*100,baseColumn="tmp_num")
        .withColumnSpec("customerid",prefix='LA-SG', minValue=POLICY_CUSTOMER_ID_MIN_VAL, uniqueValues=UNIQUE_CUSTOMERS, baseColumn='tmp_num' )
        .withColumn("tmp_date","date",data_range=dg.DateRange(start, end, timedelta(days=1, hours=1),datetime_format="%Y-%m-%d"),random=True,omit=True)
        .withColumnSpecs("last_updated_date",baseColumn='tmp_date')
        .withColumnSpecs("contractdate",baseColumn='tmp_date')
        .withColumnSpecs("submitteddate",baseColumn='tmp_date')
        .withColumnSpecs("inceptiondate",baseColumn='tmp_date')
        .withColumnSpecs("nonshieldinceptiondate",baseColumn='tmp_date')
        .withColumnSpecs("firstissuedate",baseColumn='tmp_date')
        .withColumnSpecs("enddate",baseColumn='tmp_date')
        .withColumnSpecs("status",values=["ACTIVE", "EXPIRED", "UNKNOWN"], random=True)
        .withColumnSpec("servingagentid",prefix='A', minValue=SERVING_AGENT_ID_MIN, uniqueValues=UNIQUE_AGENTS, baseColumn='tmp_num')
        .withColumnSpec("policyownerid", minValue=SERVING_AGENT_ID_MIN, uniqueValues=UNIQUE_AGENTS, baseColumn='tmp_num')
      )
# Replace any generated "id" column with "LA-SG_" followed by the policy number.
policy_df = policy_dataspec.build().drop("id").withColumn("id",F.concat(F.lit("LA-SG_"),F.col("policyno")))
display(policy_df)

last_updated_date,datasource,policyno,customerid,totalpremium,outstandingpremium,term,sumassured,contractdate,submitteddate,inceptiondate,nonshieldinceptiondate,firstissuedate,enddate,receiveddate,deliverydate,renewalDate,canceldate,cancelreason,epolicyoption,status,sourceofbusiness,premiumstatus,branch,ispremiumholiday,nextpremiumdue,nextbillingdue,parentpolicy,scope,dispatchmethod,witnessesid,payerid,policyownerid,servingagentid,statuscode,sellingagentid,appointeeid,assigneeid,trusteeid,tpaid,dispatchdestination,dispatchaddress,validflag,proposalid,createtime,updatetime,createdby,updatedby,mandate_reference_number,mode_of_payment,surrender_value,surrender_value_date,estimated_nav,estimated_nav_date,guaranteed_value_at_maturity,guaranteed_value_at_maturity_effective_date,policy_class,policy_type,traditonal_ilp_flag,total_cashback_accumulated,total_cashback_accumulated_interest,accumulated_amount_and_interest_effective_date,is_prushield,policy_maturity_date,shield_maturity_date,exclusion_indicator,total_premium_paid,total_product_discount,secondary_payment_method,suspendedaccountbalance,uwdecision,policyno.1,id
1994-03-28,0,166502143,LA-SG_20786461,0,0,0,0,1994-03-28,1994-03-28,1994-03-28,1994-03-28,1994-03-28,1994-03-28,0,0,0,0,0,0,EXPIRED,0,0,0,0,0,0,0,0,0,0,0,12806,A_12806,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,166502143,LA-SG_166502143
1994-08-01,1,166607007,LA-SG_20891325,1,1,1,1,1994-08-01,1994-08-01,1994-08-01,1994-08-01,1994-08-01,1994-08-01,1,1,1,1,1,1,ACTIVE,1,1,1,1,1,1,1,1,1,1,1,12670,A_12670,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,166607007,LA-SG_166607007
1998-06-23,2,166499066,LA-SG_20783384,2,2,2,2,1998-06-23,1998-06-23,1998-06-23,1998-06-23,1998-06-23,1998-06-23,2,2,2,2,2,2,EXPIRED,2,2,2,2,2,2,2,2,2,2,2,12729,A_12729,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,166499066,LA-SG_166499066
2018-10-20,3,166647045,LA-SG_20931363,3,3,3,3,2018-10-20,2018-10-20,2018-10-20,2018-10-20,2018-10-20,2018-10-20,3,3,3,3,3,3,EXPIRED,3,3,3,3,3,3,3,3,3,3,3,12708,A_12708,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,166647045,LA-SG_166647045
2010-03-24,4,166740278,LA-SG_21024596,4,4,4,4,2010-03-24,2010-03-24,2010-03-24,2010-03-24,2010-03-24,2010-03-24,4,4,4,4,4,4,EXPIRED,4,4,4,4,4,4,4,4,4,4,4,12941,A_12941,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,166740278,LA-SG_166740278
2017-04-10,5,166810606,LA-SG_21094924,5,5,5,5,2017-04-10,2017-04-10,2017-04-10,2017-04-10,2017-04-10,2017-04-10,5,5,5,5,5,5,UNKNOWN,5,5,5,5,5,5,5,5,5,5,5,13269,A_13269,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,166810606,LA-SG_166810606
2008-11-26,6,166571783,LA-SG_20856101,6,6,6,6,2008-11-26,2008-11-26,2008-11-26,2008-11-26,2008-11-26,2008-11-26,6,6,6,6,6,6,ACTIVE,6,6,6,6,6,6,6,6,6,6,6,12446,A_12446,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,166571783,LA-SG_166571783
2021-10-22,7,166543149,LA-SG_20827467,7,7,7,7,2021-10-22,2021-10-22,2021-10-22,2021-10-22,2021-10-22,2021-10-22,7,7,7,7,7,7,UNKNOWN,7,7,7,7,7,7,7,7,7,7,7,12812,A_12812,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,166543149,LA-SG_166543149
2019-06-01,8,166742254,LA-SG_21026572,8,8,8,8,2019-06-01,2019-06-01,2019-06-01,2019-06-01,2019-06-01,2019-06-01,8,8,8,8,8,8,ACTIVE,8,8,8,8,8,8,8,8,8,8,8,12917,A_12917,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,166742254,LA-SG_166742254
1996-09-11,9,166769987,LA-SG_21054305,9,9,9,9,1996-09-11,1996-09-11,1996-09-11,1996-09-11,1996-09-11,1996-09-11,9,9,9,9,9,9,EXPIRED,9,9,9,9,9,9,9,9,9,9,9,12650,A_12650,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,166769987,LA-SG_166769987


In [None]:
# Customer data set: UNIQUE_CUSTOMERS generated rows using the column layout in customer_schema.
# id            = generated customerid column, renamed to "id" after generation
# partition_key = id
#
# tmp_num / tmp_date are helper columns (omit=True) that the key and date columns are based on.
# NOTE(review): tmp_num is limited to uniqueValues=UNIQUE_POLICY although this data set has
# UNIQUE_CUSTOMERS rows, and customerid is derived from tmp_num - the resulting ids may
# therefore repeat across rows; confirm whether that is intended.
# In the displayed sample the text templates yield either two words or
# "word <letter>. word" values.
customer_dataspec = (dg.DataGenerator(spark, rows=UNIQUE_CUSTOMERS, partitions=partitions_requested)
        .withSchema(customer_schema)
        .withColumn("tmp_num", LongType(), minValue=0x1000000000000,uniqueValues=UNIQUE_POLICY, omit=True, baseColumnType="hash")
        .withColumn("customerid",prefix='LA-SG', minValue=POLICY_CUSTOMER_ID_MIN_VAL, uniqueValues=UNIQUE_CUSTOMERS, baseColumn='tmp_num' )
        .withColumn("tmp_date","date",data_range=dg.DateRange(start, end, timedelta(days=1, hours=1),datetime_format="%Y-%m-%d"),random=True,omit=True)
        .withColumnSpecs("last_updated_date",baseColumn='tmp_date')
        .withColumnSpec("dateofincorporation", baseColumn='tmp_date')
        .withColumnSpec("type", values=["PERSONAL", "CORPORATE", "UNKNOWN"], random=True)
        .withColumnSpec("status", values=["ACTIVE", "EXPIRED", "UNKNOWN"], random=True)
        .withColumnSpec("sex", values=["MALE", "FEMALE"], random=True)
        .withColumnSpec("firstname", template=r'\\w \\w|\\w a. \\w')
        .withColumnSpec("surname", template=r'\\w \\w|\\w a. \\w')
        .withColumnSpec("middlename", template=r'\\w \\w|\\w a. \\w')
        .withColumnSpec("idtype", values=["NRIC", "FIN","PASSPORT","OTHER"], random=True)
        .withColumnSpec("deleteflag", values=["Y", "N"], random=True)
      )
# Drop any generated "id" column and use the generated customerid as the item id instead.
customer_df = customer_dataspec.build().drop("id").withColumnRenamed("customerid","id")
display(customer_df)

last_updated_date,datasource,name,clientid,dateofincorporation,type,status,password,attributes,firstname,surname,middlename,alternatename,dob,dateofdeath,startdate,sex,maritalstatus,salutation,language,race,role,placeofbirth,countryofbirth,countryofbirthcode,placeofdeath,countryofdeath,nationality,nationalitycode,nationalid,encryptednric,idtype,occupationcode,occupationdescription,occupationclass,customerriskscore,customersegment,upgradershortfall,eligibleforopus,taxexemptflag,cka,vip,staffflag,amlflag,createtime,updatetime,createdby,updatedby,deleteflag,id
1994-03-28,0,0,0,1994-03-28,PERSONAL,EXPIRED,0,0,velit veniam,ipsum eiusmod,proident in,0,0,0,0,FEMALE,0,0,0,0,0,0,0,0,0,0,0,0,0,0,OTHER,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,N,LA-SG_20786461
1994-08-01,1,1,1,1994-08-01,UNKNOWN,ACTIVE,1,1,deserunt non,in tempor,ea u. reprehenderit,1,1,1,1,MALE,1,1,1,1,1,1,1,1,1,1,1,1,1,1,PASSPORT,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,N,LA-SG_20891325
1998-06-23,2,2,2,1998-06-23,CORPORATE,EXPIRED,2,2,deserunt mollit,in excepteur,velit k. ipsum,2,2,2,2,FEMALE,2,2,2,2,2,2,2,2,2,2,2,2,2,2,FIN,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,N,LA-SG_20783384
2018-10-20,3,3,3,2018-10-20,CORPORATE,EXPIRED,3,3,ea sunt,veniam e. pariatur,aliqua c. quis,3,3,3,3,MALE,3,3,3,3,3,3,3,3,3,3,3,3,3,3,FIN,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,N,LA-SG_20931363
2010-03-24,4,4,4,2010-03-24,CORPORATE,EXPIRED,4,4,laboris y. sed,commodo r. dolor,dolore qui,4,4,4,4,MALE,4,4,4,4,4,4,4,4,4,4,4,4,4,4,NRIC,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,N,LA-SG_21024596
2017-04-10,5,5,5,2017-04-10,PERSONAL,UNKNOWN,5,5,anim s. magna,qui l. laborum,officia commodo,5,5,5,5,FEMALE,5,5,5,5,5,5,5,5,5,5,5,5,5,5,FIN,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,N,LA-SG_21094924
2008-11-26,6,6,6,2008-11-26,CORPORATE,ACTIVE,6,6,exercitation y. in,elit m. dolore,occaecat s. nostrud,6,6,6,6,MALE,6,6,6,6,6,6,6,6,6,6,6,6,6,6,PASSPORT,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,6,Y,LA-SG_20856101
2021-10-22,7,7,7,2021-10-22,CORPORATE,UNKNOWN,7,7,velit ullamco,pariatur h. cupidatat,voluptate duis,7,7,7,7,MALE,7,7,7,7,7,7,7,7,7,7,7,7,7,7,NRIC,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,Y,LA-SG_20827467
2019-06-01,8,8,8,2019-06-01,PERSONAL,ACTIVE,8,8,ut occaecat,duis aute,ullamco g. magna,8,8,8,8,FEMALE,8,8,8,8,8,8,8,8,8,8,8,8,8,8,FIN,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,8,N,LA-SG_21026572
1996-09-11,9,9,9,1996-09-11,CORPORATE,EXPIRED,9,9,aliquip v. eiusmod,mollit fugiat,fugiat dolor,9,9,9,9,MALE,9,9,9,9,9,9,9,9,9,9,9,9,9,9,FIN,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,N,LA-SG_21054305


In [None]:
# Agent data set: AGENT_ROWS generated rows using the column layout in agent_schema.
# id            = generated agentid column, renamed to "id" after generation
# partition_key = agent_no
#
# tmp_num / tmp_date are helper columns (omit=True) that the key and date columns are based on.
# NOTE(review): agentid is generated with POLICY_CUSTOMER_ID_MIN_VAL and UNIQUE_CUSTOMERS,
# so agent ids fall in the same range as customer ids (the displayed sample shows the same
# "LA-SG_2..." values as the customer data set); confirm this is intended.
agent_dataspec = (dg.DataGenerator(spark, rows=AGENT_ROWS, partitions=partitions_requested)
        .withSchema(agent_schema)
        .withColumn("tmp_num", LongType(), minValue=0x1000000000000,uniqueValues=UNIQUE_POLICY, omit=True, baseColumnType="hash")
        .withColumn("agentid",prefix='LA-SG', minValue=POLICY_CUSTOMER_ID_MIN_VAL, uniqueValues=UNIQUE_CUSTOMERS, baseColumn='tmp_num' )
        .withColumnSpec("agent_no",prefix='A', minValue=SERVING_AGENT_ID_MIN, uniqueValues=UNIQUE_AGENTS, baseColumn='tmp_num')
        .withColumnSpec("agent_client_no", minValue=AGENT_CLIENT_NO_MIN, uniqueValues=UNIQUE_AGENTS, baseColumn='tmp_num')
        .withColumn("tmp_date","date",data_range=dg.DateRange(start, end, timedelta(days=1, hours=1),datetime_format="%Y-%m-%d"),random=True,omit=True)
        .withColumnSpecs("last_updated_date",baseColumn='tmp_date')
        .withColumnSpecs("appointment_date",baseColumn="tmp_date")
        .withColumnSpec("sales_agency_unit", template=r'\\w \\w|\\w a. \\w')
        .withColumnSpec("reporting_to_agent_name", template=r'\\w \\w|\\w a. \\w')
        .withColumnSpec("agency_unit_code", template=r'\\w')
        .withColumnSpec("agent_status", values=["ACTIVE", "EXPIRED", "UNKNOWN"], random=True)
      )
# Drop any generated "id" column and use the generated agentid as the item id instead.
agent_df = agent_dataspec.build().drop("id").withColumnRenamed("agentid","id")
display(agent_df)

last_updated_date,datasource,agent_no,agent_client_no,agent_status,appointment_date,termination_date,agency_unit_code,agent_type,business_name,suspension_ind,sales_channel,sales_agency_unit,agent_licence_no,reporting_to_agent_no,reporting_to_agent_name,branch,commission_class,deleteflag,agenttype,salesunit,id
1994-03-28,0,A_12806,20000461,UNKNOWN,1994-03-28,0,minim,0,0,0,0,sit f. exercitation,0,0,ipsum ullamco,0,0,0,0,0,LA-SG_20786461
1994-08-01,1,A_12670,20000325,ACTIVE,1994-08-01,1,sed,1,1,1,1,officia u. laborum,1,1,cillum m. labore,1,1,1,1,1,LA-SG_20891325
1998-06-23,2,A_12729,20000384,UNKNOWN,1998-06-23,2,aute,2,2,2,2,velit i. cupidatat,2,2,quis non,2,2,2,2,2,LA-SG_20783384
2018-10-20,3,A_12708,20000363,EXPIRED,2018-10-20,3,consectetur,3,3,3,3,laboris j. quis,3,3,cupidatat ut,3,3,3,3,3,LA-SG_20931363
2010-03-24,4,A_12941,20000596,EXPIRED,2010-03-24,4,cillum,4,4,4,4,minim z. commodo,4,4,occaecat f. laborum,4,4,4,4,4,LA-SG_21024596
2017-04-10,5,A_13269,20000924,EXPIRED,2017-04-10,5,culpa,5,5,5,5,velit reprehenderit,5,5,exercitation voluptate,5,5,5,5,5,LA-SG_21094924
2008-11-26,6,A_12446,20000101,UNKNOWN,2008-11-26,6,sit,6,6,6,6,commodo aliquip,6,6,in fugiat,6,6,6,6,6,LA-SG_20856101
2021-10-22,7,A_12812,20000467,UNKNOWN,2021-10-22,7,culpa,7,7,7,7,laborum s. adipiscing,7,7,ex s. elit,7,7,7,7,7,LA-SG_20827467
2019-06-01,8,A_12917,20000572,EXPIRED,2019-06-01,8,excepteur,8,8,8,8,ullamco consectetur,8,8,laborum x. non,8,8,8,8,8,LA-SG_21026572
1996-09-11,9,A_12650,20000305,ACTIVE,1996-09-11,9,in,9,9,9,9,labore e. aute,9,9,enim commodo,9,9,9,9,9,LA-SG_21054305


In [None]:
# Product component option data set: OPTIONS_ROWS generated rows using the column layout
# in options_schema.
# id            = generated _optionsid column, renamed to "id" after generation
# partition_key = policyid (generated as _policyid, renamed after generation)
#
# The key columns are generated under underscore-prefixed names; after build() the
# "policyid" and "id" columns are dropped and the generated columns renamed into their place.
# NOTE(review): _optionsid is configured with a range of 100_000_000_000_000 to
# 100_900_000_000_000, but the displayed sample shows ids such as "LA-SG_943367909",
# which lie outside that range; confirm the generated values are what is wanted.
# NOTE(review): _policyid is generated with POLICY_CUSTOMER_ID_MIN_VAL / UNIQUE_CUSTOMERS;
# the displayed values (e.g. "LA-SG_20786461") match the customer ids rather than the
# policy ids (e.g. "LA-SG_166502143"); confirm which data set policyid should reference.
options_dataspec = (dg.DataGenerator(spark, rows=OPTIONS_ROWS, partitions=partitions_requested)
            .withSchema(options_schema)
            .withColumn("tmp_num", LongType(), minValue=0x1000000000000,uniqueValues=UNIQUE_POLICY, omit=True, baseColumnType="hash")
            .withColumn("_optionsid",prefix='LA-SG', minValue=100_000_000_000_000,maxValue=100_900_000_000_000, baseColumn='tmp_num' )
            .withColumn("_policyid",prefix='LA-SG', minValue=POLICY_CUSTOMER_ID_MIN_VAL, uniqueValues=UNIQUE_CUSTOMERS, baseColumn='tmp_num' )
            .withColumnSpec("productcomponentid", template=r'\\w-\\w-\\w')
            .withColumn("tmp_date","date",data_range=dg.DateRange(start, end, timedelta(days=1, hours=1),datetime_format="%Y-%m-%d"),random=True,omit=True)
            .withColumnSpecs("last_updated_date",baseColumn='tmp_date')
            .withColumnSpecs("premiumcessdate",baseColumn='tmp_date')
            .withColumnSpecs("riskcessdate",baseColumn='tmp_date')
            .withColumnSpecs("startdate",baseColumn='tmp_date')
            .withColumnSpecs("enddate",baseColumn='tmp_date')
            .withColumnSpecs("coverage_rerate_date",baseColumn='tmp_date')
            .withColumnSpecs("coverage_rerate_from_date",baseColumn='tmp_date')
      )
# Swap the generated key columns in for the "policyid" and "id" columns.
options_df = options_dataspec.build().drop("policyid").drop("id").withColumnRenamed("_policyid","policyid").withColumnRenamed("_optionsid","id")
display(options_df)

datasource,life,jlife,coverage,rider,productoptionid,productcomponentid,componentcode,componentfulldescription,term,benefitterm,premiumterm,sumassured,premium,premiumcessterm,premiumcessdate,riskcessdate,commencementdate,nonshieldcommencementdate,riskcessterm,installmentpremium,mortalityclass,loadingpremium,options,status,statuscode,startdate,enddate,premiumstatus,is_multiplier_benefit,date_first_income_payout,date_last_income_payout,last_income_amount,payout_frequency,payout_reinvest_indicator,coverage_rerate_date,coverage_rerate_from_date,sum_assured_category,surrendervalue,surrenderadjustment,riderflag,createtime,updatetime,createdby,updatedby,last_updated_date,id,policyid
0,0,0,0,0,0,excepteur-duis-ullamco,0,0,0,0,0,0,0,0,1994-03-28,1994-03-28,0,0,0,0,0,0,0,0,0,1994-03-28,1994-03-28,0,0,0,0,0,0,0,1994-03-28,1994-03-28,0,0,0,0,0,0,0,0,1994-03-28,LA-SG_943367909,LA-SG_20786461
1,1,1,1,1,1,cupidatat-dolor-laborum,1,1,1,1,1,1,1,1,1994-08-01,1994-08-01,1,1,1,1,1,1,1,1,1,1994-08-01,1994-08-01,1,1,1,1,1,1,1,1994-08-01,1994-08-01,1,1,1,1,1,1,1,1,1994-08-01,LA-SG_943472773,LA-SG_20891325
2,2,2,2,2,2,sint-amet-occaecat,2,2,2,2,2,2,2,2,1998-06-23,1998-06-23,2,2,2,2,2,2,2,2,2,1998-06-23,1998-06-23,2,2,2,2,2,2,2,1998-06-23,1998-06-23,2,2,2,2,2,2,2,2,1998-06-23,LA-SG_943364832,LA-SG_20783384
3,3,3,3,3,3,minim-ut-tempor,3,3,3,3,3,3,3,3,2018-10-20,2018-10-20,3,3,3,3,3,3,3,3,3,2018-10-20,2018-10-20,3,3,3,3,3,3,3,2018-10-20,2018-10-20,3,3,3,3,3,3,3,3,2018-10-20,LA-SG_943512811,LA-SG_20931363
4,4,4,4,4,4,quis-ad-excepteur,4,4,4,4,4,4,4,4,2010-03-24,2010-03-24,4,4,4,4,4,4,4,4,4,2010-03-24,2010-03-24,4,4,4,4,4,4,4,2010-03-24,2010-03-24,4,4,4,4,4,4,4,4,2010-03-24,LA-SG_943606044,LA-SG_21024596
5,5,5,5,5,5,magna-ad-cillum,5,5,5,5,5,5,5,5,2017-04-10,2017-04-10,5,5,5,5,5,5,5,5,5,2017-04-10,2017-04-10,5,5,5,5,5,5,5,2017-04-10,2017-04-10,5,5,5,5,5,5,5,5,2017-04-10,LA-SG_943676372,LA-SG_21094924
6,6,6,6,6,6,dolore-nostrud-id,6,6,6,6,6,6,6,6,2008-11-26,2008-11-26,6,6,6,6,6,6,6,6,6,2008-11-26,2008-11-26,6,6,6,6,6,6,6,2008-11-26,2008-11-26,6,6,6,6,6,6,6,6,2008-11-26,LA-SG_943437549,LA-SG_20856101
7,7,7,7,7,7,sunt-in-culpa,7,7,7,7,7,7,7,7,2021-10-22,2021-10-22,7,7,7,7,7,7,7,7,7,2021-10-22,2021-10-22,7,7,7,7,7,7,7,2021-10-22,2021-10-22,7,7,7,7,7,7,7,7,2021-10-22,LA-SG_943408915,LA-SG_20827467
8,8,8,8,8,8,laborum-elit-voluptate,8,8,8,8,8,8,8,8,2019-06-01,2019-06-01,8,8,8,8,8,8,8,8,8,2019-06-01,2019-06-01,8,8,8,8,8,8,8,2019-06-01,2019-06-01,8,8,8,8,8,8,8,8,2019-06-01,LA-SG_943608020,LA-SG_21026572
9,9,9,9,9,9,sed-commodo-mollit,9,9,9,9,9,9,9,9,1996-09-11,1996-09-11,9,9,9,9,9,9,9,9,9,1996-09-11,1996-09-11,9,9,9,9,9,9,9,1996-09-11,1996-09-11,9,9,9,9,9,9,9,9,1996-09-11,LA-SG_943635753,LA-SG_21054305


In [None]:
# Row counts of the four generated data sets, evaluated in the order they are printed.
policy_row_count = policy_df.count()
customer_row_count = customer_df.count()
agent_row_count = agent_df.count()
options_row_count = options_df.count()
print(f"\n Policy Data: {policy_row_count} \n Customer Data: {customer_row_count} \n Agent Data: {agent_row_count} \n Options Data: {options_row_count}")


 Policy Data: 500000 
 Customer Data: 2000000 
 Agent Data: 50000 
 Options Data: 300000


In [None]:
# One entry per data set to upload: (container name, partition key column, DataFrame).
cosmos_uploads = (
    ("policy", "customerid", policy_df),
    ("customer", "id", customer_df),
    ("agent", "agent_no", agent_df),
    ("options", "policyid", options_df),
)
for upload_container, upload_partition_key, upload_df in cosmos_uploads:
    load_data_into_cosmos(
        container_name=upload_container,
        parition_key_col_name=upload_partition_key,
        dataframe=upload_df,
    )
