In [25]:
import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as f 
import requests
import json
from pyspark.sql import SparkSession

spark=SparkSession.\
    builder.\
    appName("Transactions").\
    getOrCreate()


claim_schema=StructType([
    StructField("SOURCE_SYSTEM", StringType()),
    StructField("CLAIM_ID", StringType()),
    StructField("CONTRACT_SOURCE_SYSTEM", StringType()),
    StructField("CONTRACT_ID", IntegerType()),
    StructField("CLAIM_TYPE", StringType()),
    StructField("DATE_OF_LOSS", StringType()),
    StructField("AMOUNT", DecimalType(16,5)),
    StructField("CREATION_DATE", StringType()),
])

contract_schema=StructType([
    StructField("SOURCE_SYSTEM", StringType()),
    StructField("CONTRACT_ID", StringType()),
    StructField("CONTRACT_TYPE", StringType()),
    StructField("INSURED_PERIOD_FROM", StringType()),
    StructField("INSURED_PERIOD_TO", StringType()),
    StructField("CREATION_DATE", StringType())
])

def read_csv(file_name,schema):
    df=(spark.read.option("header",True).csv(file_name,schema=schema))
    return(df)

def convert_timestamp(df,col_ts):
    df=(df.withColumn(col_ts,f.to_timestamp(f.trim(f.col(col_ts)),"dd.MM.yyyy HH:mm")))
    return(df)
def convert_date(df,col_date):
    df=df.withColumn(col_date,f.to_date(f.trim(f.col(col_date)),"dd.MM.yyyy"))
    return(df)

def get_nse_id(claim_id):
    try:
        response=requests.get(f"https://api.hashify.net/hash/md4/hex?value={claim_id}")
        data = json.loads(response.text)
        return(data['Digest'])
    except Exception as e:
        return e


3.5.0


In [2]:
claim_df=read_csv("claim.csv",claim_schema)

contract_df=read_csv("contract.csv",contract_schema)

In [3]:
col_ts="CREATION_DATE"
col_date="DATE_OF_LOSS"

claim_df=convert_timestamp(claim_df,"CREATION_DATE")
claim_df=convert_date(claim_df,"DATE_OF_LOSS")
claim_df.show()

+-----------------+-----------+----------------------+-----------+----------+------------+-----------+-------------------+
|    SOURCE_SYSTEM|   CLAIM_ID|CONTRACT_SOURCE_SYSTEM|CONTRACT_ID|CLAIM_TYPE|DATE_OF_LOSS|     AMOUNT|      CREATION_DATE|
+-----------------+-----------+----------------------+-----------+----------+------------+-----------+-------------------+
|Claim_SR_Europa_3|CL_68545123|  Contract_SR_Europa_3|   97563756|         2|  2021-02-14|  523.21000|2022-01-17 14:45:00|
|Claim_SR_Europa_3|  CL_962234|  Contract_SR_Europa_4|  408124123|         1|  2021-01-30|52369.00000|2022-01-17 14:46:00|
|Claim_SR_Europa_3|  CL_895168|  Contract_SR_Europa_3|   13767503|      NULL|  2020-09-02|98465.00000|2022-01-17 14:45:00|
|Claim_SR_Europa_3|CX_12066501|  Contract_SR_Europa_3|  656948536|         2|  2022-01-04| 9000.00000|2022-01-17 14:45:00|
|Claim_SR_Europa_3| RX_9845163|  Contract_SR_Europa_3|  656948536|         2|  2015-06-04|11000.00000|2022-01-17 14:45:00|
|Claim_SR_Europa

In [6]:
contract_df=convert_timestamp(contract_df,"CREATION_DATE")
contract_df=convert_date(contract_df,"INSURED_PERIOD_FROM")
contract_df=convert_date(contract_df,"INSURED_PERIOD_TO")
contract_df=contract_df.withColumn("CONTRACT_ID",f.col("CONTRACT_ID").cast(IntegerType()))
contract_df.show()

+--------------------+-----------+-------------+-------------------+-----------------+-------------------+
|       SOURCE_SYSTEM|CONTRACT_ID|CONTRACT_TYPE|INSURED_PERIOD_FROM|INSURED_PERIOD_TO|      CREATION_DATE|
+--------------------+-----------+-------------+-------------------+-----------------+-------------------+
|Contract_SR_Europa_3|  408124123|       Direct|         2015-01-01|       2099-01-01|2022-01-17 13:42:00|
|Contract_SR_Europa_3|   46784575|       Direct|         2015-01-01|       2099-01-01|2022-01-17 13:42:00|
|Contract_SR_Europa_3|   97563756|             |         2015-01-01|       2099-01-01|2022-01-17 13:42:00|
|Contract_SR_Europa_3|   13767503|  Reinsurance|         2015-01-01|       2099-01-01|2022-01-17 13:42:00|
|Contract_SR_Europa_3|  656948536|             |         2015-01-01|       2099-01-01|2022-01-17 13:42:00|
+--------------------+-----------+-------------+-------------------+-----------------+-------------------+



In [15]:

get_nse_id_udf=f.udf(get_nse_id, StringType())

final_df=(claim_df.alias('c')
.join(contract_df.alias('co'),on=['CONTRACT_ID'],how='inner')
.selectExpr("'Europe 3' as CONTRACT_SOURCE_SYSTEM",
    "co.CONTRACT_ID as CONTRACT_SOURCE_SYSTEM_ID",
    "CLAIM_ID",
    "split(CLAIM_ID,'_')[1] as SOURCE_SYSTEM_ID",
    """case 
    when CLAIM_TYPE = 2 then 'Corporate'
    when CLAIM_TYPE = 1 then 'Private'
    else 'Unknown' end as TRANSACTION_TYPE""",
    """case
    when CLAIM_ID like 'CL%' then 'COINSURANCE'
    when CLAIM_ID like 'RX%' then 'REINSURANCE'
    end as TRANSACTION_DIRECTION""",
    "c.AMOUNT as CONFORMED_VALUE",
    "c.DATE_OF_LOSS as BUSINESS_DATE",
    "c.CREATION_DATE as CREATION_DATE",
    "current_date() as SYSTEM_TIMESTAMP"
    )
.withColumn("NSE_ID",get_nse_id_udf(f.col('CLAIM_ID')))
)

In [16]:
final_df.show()

+----------------------+-------------------------+-----------+----------------+----------------+---------------------+---------------+-------------+-------------------+----------------+
|CONTRACT_SOURCE_SYSTEM|CONTRACT_SOURCE_SYSTEM_ID|   CLAIM_ID|SOURCE_SYSTEM_ID|TRANSACTION_TYPE|TRANSACTION_DIRECTION|CONFORMED_VALUE|BUSINESS_DATE|      CREATION_DATE|SYSTEM_TIMESTAMP|
+----------------------+-------------------------+-----------+----------------+----------------+---------------------+---------------+-------------+-------------------+----------------+
|              Europe 3|                 97563756|CL_68545123|        68545123|       Corporate|          COINSURANCE|      523.21000|   2021-02-14|2022-01-17 14:45:00|      2024-06-19|
|              Europe 3|                408124123|  CL_962234|          962234|         Private|          COINSURANCE|    52369.00000|   2021-01-30|2022-01-17 14:46:00|      2024-06-19|
|              Europe 3|                 13767503|  CL_895168|        