<a href="https://colab.research.google.com/github/jharna-dohotia/my_Repo/blob/master/Swiss_RE_Data_engineer_Task.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Swiss RE Data Engineer Task

In [2]:
import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Obtain the Spark session for this notebook (an existing session is reused).
spark = (
    SparkSession.builder
    .appName("Our First Spark Example")
    .getOrCreate()
)

# Bare expression so the notebook renders the session summary.
spark

# Importing necessary libraries

In [98]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType, DateType, DecimalType
from datetime import datetime
import requests
import json
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.sql import Row

# Data Ingestion

Assumptions:
Assuming the data is stored in a database named **Europa** with the following
tables:
*   Contract
*   Claim

We would perform the following spark operations:

contract = spark.sql("select * from europa.contract")

claims = spark.sql("select * from europa.claim")

However, to illustrate the transformations, I have chosen to read the supplied sample data into manually created DataFrames.



Reading Data for **Contract**

In [212]:
# Raw contract layout as (column name, Spark type, nullable).
# Date-like columns are loaded as text here and converted in the clean-up cell.
contract_fields = [
    ("SOURCE_SYSTEM", StringType(), False),
    ("CONTRACT_ID", IntegerType(), False),
    ("CONTRACT_TYPE", StringType(), True),
    ("INSURED_PERIOD_FROM", StringType(), True),
    ("INSURED_PERIOD_TO", StringType(), True),
    ("CREATION_DATE", StringType(), True),
]
contract_schema = StructType([StructField(*field_spec) for field_spec in contract_fields])

# Sample rows standing in for the europa.contract table.
contract_data = [
    ("Contract_SR_Europa_3", 408124123, "Direct", "01.01.2015", "01.01.2099", "17.01.2022 13:42"),
    ("Contract_SR_Europa_3", 46784575, "Direct", "01.01.2015", "01.01.2099", "17.01.2022 13:42"),
    ("Contract_SR_Europa_3", 97563756, "", "01.01.2015", "01.01.2099", "17.01.2022 13:42"),
    ("Contract_SR_Europa_3", 13767503, "Reinsurance", "01.01.2015", "01.01.2099", "17.01.2022 13:42"),
    ("Contract_SR_Europa_3", 656948536, "", "01.01.2015", "01.01.2099", "17.01.2022 13:42"),
]

contract = spark.createDataFrame(data=contract_data, schema=contract_schema)
contract.show()

+--------------------+-----------+-------------+-------------------+-----------------+----------------+
|       SOURCE_SYSTEM|CONTRACT_ID|CONTRACT_TYPE|INSURED_PERIOD_FROM|INSURED_PERIOD_TO|   CREATION_DATE|
+--------------------+-----------+-------------+-------------------+-----------------+----------------+
|Contract_SR_Europa_3|  408124123|       Direct|         01.01.2015|       01.01.2099|17.01.2022 13:42|
|Contract_SR_Europa_3|   46784575|       Direct|         01.01.2015|       01.01.2099|17.01.2022 13:42|
|Contract_SR_Europa_3|   97563756|             |         01.01.2015|       01.01.2099|17.01.2022 13:42|
|Contract_SR_Europa_3|   13767503|  Reinsurance|         01.01.2015|       01.01.2099|17.01.2022 13:42|
|Contract_SR_Europa_3|  656948536|             |         01.01.2015|       01.01.2099|17.01.2022 13:42|
+--------------------+-----------+-------------+-------------------+-----------------+----------------+



Cleaning up the **Contract** data and casting to correct data types

In [213]:
# Text formats used by the raw contract columns.
contract_date_format = "dd.MM.yyyy"
contract_timestamp_format = "dd.MM.yyyy HH:mm"

# Parse the text columns into date / timestamp types, keeping the column names.
contract = (
    contract
    .withColumn(
        "INSURED_PERIOD_FROM",
        to_date(col("INSURED_PERIOD_FROM"), contract_date_format).cast("date"),
    )
    .withColumn(
        "INSURED_PERIOD_TO",
        to_date(col("INSURED_PERIOD_TO"), contract_date_format).cast("date"),
    )
    .withColumn(
        "CREATION_DATE",
        to_timestamp(col("CREATION_DATE"), contract_timestamp_format).cast("timestamp"),
    )
)
contract.show()

+--------------------+-----------+-------------+-------------------+-----------------+-------------------+
|       SOURCE_SYSTEM|CONTRACT_ID|CONTRACT_TYPE|INSURED_PERIOD_FROM|INSURED_PERIOD_TO|      CREATION_DATE|
+--------------------+-----------+-------------+-------------------+-----------------+-------------------+
|Contract_SR_Europa_3|  408124123|       Direct|         2015-01-01|       2099-01-01|2022-01-17 13:42:00|
|Contract_SR_Europa_3|   46784575|       Direct|         2015-01-01|       2099-01-01|2022-01-17 13:42:00|
|Contract_SR_Europa_3|   97563756|             |         2015-01-01|       2099-01-01|2022-01-17 13:42:00|
|Contract_SR_Europa_3|   13767503|  Reinsurance|         2015-01-01|       2099-01-01|2022-01-17 13:42:00|
|Contract_SR_Europa_3|  656948536|             |         2015-01-01|       2099-01-01|2022-01-17 13:42:00|
+--------------------+-----------+-------------+-------------------+-----------------+-------------------+



In [214]:
# List every field of the cleaned contract schema, one per line.
print(*contract.schema.fields, sep="\n")

StructField('SOURCE_SYSTEM', StringType(), False)
StructField('CONTRACT_ID', IntegerType(), False)
StructField('CONTRACT_TYPE', StringType(), True)
StructField('INSURED_PERIOD_FROM', DateType(), True)
StructField('INSURED_PERIOD_TO', DateType(), True)
StructField('CREATION_DATE', TimestampType(), True)


Reading Data for **Claims**

In [215]:
# Raw claim layout as (column name, Spark type, nullable).
# DATE_OF_LOSS, AMOUNT and CREATION_DATE are loaded as text and converted in the clean-up cell.
claim_fields = [
    ("SOURCE_SYSTEM", StringType(), False),
    ("CLAIM_ID", StringType(), False),
    ("CONTRACT_SOURCE_SYSTEM", StringType(), False),
    ("CONTRACT_ID", IntegerType(), False),
    ("CLAIM_TYPE", StringType(), True),
    ("DATE_OF_LOSS", StringType(), True),
    ("AMOUNT", StringType(), True),
    ("CREATION_DATE", StringType(), True),
]
claim_schema = StructType([StructField(*field_spec) for field_spec in claim_fields])

# Sample rows standing in for the europa.claim table.
# AMOUNT values are Python numbers although the column is declared as a string;
# the recorded output of this cell shows them rendered as text.
claim_data = [
    ("Claim_SR_Europa_3", "CL_68545123", "Contract_SR_Europa_3", 97563756, "2", "14.02.2021", 523.21, "17.01.2022 14:45"),
    ("Claim_SR_Europa_3", "CL_962234", "Contract_SR_Europa_4", 408124123, "1", "30.01.2021", 52369.0, "17.01.2022 14:46"),
    ("Claim_SR_Europa_3", "CL_895168", "Contract_SR_Europa_3", 13767503, "", "02.09.2020", 98465, "17.01.2022 14:45"),
    ("Claim_SR_Europa_3", "CX_12066501", "Contract_SR_Europa_3", 656948536, "2", "04.01.2022", 9000, "17.01.2022 14:45"),
    ("Claim_SR_Europa_3", "RX_9845163", "Contract_SR_Europa_3", 656948536, "2", "04.06.2015", 11000, "17.01.2022 14:45"),
    ("Claim_SR_Europa_3", "CL_39904634", "Contract_SR_Europa_3", 656948536, "2", "04.11.2020", 11000, "17.01.2022 14:46"),
    ("Claim_SR_Europa_3", "U_7065313", "Contract_SR_Europa_3", 46589516, "1", "29.09.2021", 11000, "17.01.2022 14:46"),
]

claim = spark.createDataFrame(data=claim_data, schema=claim_schema)
claim.show()

+-----------------+-----------+----------------------+-----------+----------+------------+-------+----------------+
|    SOURCE_SYSTEM|   CLAIM_ID|CONTRACT_SOURCE_SYSTEM|CONTRACT_ID|CLAIM_TYPE|DATE_OF_LOSS| AMOUNT|   CREATION_DATE|
+-----------------+-----------+----------------------+-----------+----------+------------+-------+----------------+
|Claim_SR_Europa_3|CL_68545123|  Contract_SR_Europa_3|   97563756|         2|  14.02.2021| 523.21|17.01.2022 14:45|
|Claim_SR_Europa_3|  CL_962234|  Contract_SR_Europa_4|  408124123|         1|  30.01.2021|52369.0|17.01.2022 14:46|
|Claim_SR_Europa_3|  CL_895168|  Contract_SR_Europa_3|   13767503|          |  02.09.2020|  98465|17.01.2022 14:45|
|Claim_SR_Europa_3|CX_12066501|  Contract_SR_Europa_3|  656948536|         2|  04.01.2022|   9000|17.01.2022 14:45|
|Claim_SR_Europa_3| RX_9845163|  Contract_SR_Europa_3|  656948536|         2|  04.06.2015|  11000|17.01.2022 14:45|
|Claim_SR_Europa_3|CL_39904634|  Contract_SR_Europa_3|  656948536|      

Cleaning up the **Claim** data and casting to correct data types

In [216]:
# Text formats used by the raw claim columns.
claim_date_format = "dd.MM.yyyy"
claim_timestamp_format = "dd.MM.yyyy HH:mm"

# Convert the text columns to date, decimal and timestamp types.
claim = (
    claim
    .withColumn("DATE_OF_LOSS", to_date(col("DATE_OF_LOSS"), claim_date_format).cast("date"))
    .withColumn("AMOUNT", col("AMOUNT").cast("decimal(16,5)"))
    .withColumn(
        "CREATION_DATE",
        to_timestamp(col("CREATION_DATE"), claim_timestamp_format).cast("timestamp"),
    )
)
claim.show()

+-----------------+-----------+----------------------+-----------+----------+------------+-----------+-------------------+
|    SOURCE_SYSTEM|   CLAIM_ID|CONTRACT_SOURCE_SYSTEM|CONTRACT_ID|CLAIM_TYPE|DATE_OF_LOSS|     AMOUNT|      CREATION_DATE|
+-----------------+-----------+----------------------+-----------+----------+------------+-----------+-------------------+
|Claim_SR_Europa_3|CL_68545123|  Contract_SR_Europa_3|   97563756|         2|  2021-02-14|  523.21000|2022-01-17 14:45:00|
|Claim_SR_Europa_3|  CL_962234|  Contract_SR_Europa_4|  408124123|         1|  2021-01-30|52369.00000|2022-01-17 14:46:00|
|Claim_SR_Europa_3|  CL_895168|  Contract_SR_Europa_3|   13767503|          |  2020-09-02|98465.00000|2022-01-17 14:45:00|
|Claim_SR_Europa_3|CX_12066501|  Contract_SR_Europa_3|  656948536|         2|  2022-01-04| 9000.00000|2022-01-17 14:45:00|
|Claim_SR_Europa_3| RX_9845163|  Contract_SR_Europa_3|  656948536|         2|  2015-06-04|11000.00000|2022-01-17 14:45:00|
|Claim_SR_Europa

Renaming CLAIM.CREATION_DATE to avoid ambiguity

In [217]:
# Give the claim's creation timestamp a distinct name before joining with contract,
# which has its own CREATION_DATE column.
claim = claim.withColumnRenamed(existing="CREATION_DATE", new="CLAIM_CREATION_DATE")

In [218]:
# List every field of the cleaned claim schema, one per line.
print(*claim.schema.fields, sep="\n")

StructField('SOURCE_SYSTEM', StringType(), False)
StructField('CLAIM_ID', StringType(), False)
StructField('CONTRACT_SOURCE_SYSTEM', StringType(), False)
StructField('CONTRACT_ID', IntegerType(), False)
StructField('CLAIM_TYPE', StringType(), True)
StructField('DATE_OF_LOSS', DateType(), True)
StructField('AMOUNT', DecimalType(16,5), True)
StructField('CLAIM_CREATION_DATE', TimestampType(), True)


# Transformations

Assuming:
1. Joining condition is on CONTRACT_ID
2. Performing an outer join to get all available contracts and claims unless specified otherwise



In [219]:
# Full outer join on CONTRACT_ID; the claim-side CONTRACT_ID column is dropped
# so only the contract-side key remains in the result.
contract_id_matches = contract["CONTRACT_ID"] == claim["CONTRACT_ID"]
transformations = (
    contract
    .join(claim, on=contract_id_matches, how="outer")
    .drop(claim["CONTRACT_ID"])
)

Mappings

In [220]:
# SOURCE_SYSTEM_ID = CLAIM_ID without prefix (e.g. A_123 --> 123).
# Assumption: the prefix is a string followed by an underscore '_', so the part
# after the first underscore is taken as the id.
claim_id_suffix = split(col("CLAIM_ID"), '_')[1]

# TRANSACTION_TYPE: "Corporate" if CLAIM_TYPE = 2, "Private" if CLAIM_TYPE = 1,
# "Unknown" for everything else (including an empty CLAIM_TYPE).
transaction_type = (
    when(col("CLAIM_TYPE") == 2, "Corporate")
    .when(col("CLAIM_TYPE") == 1, "Private")
    .otherwise("Unknown")
)

transformations = (
    transformations
    # CONTRACT_SOURCE_SYSTEM = Default to “Europe 3”
    .withColumn("CONTRACT_SOURCE_SYSTEM", lit('Europe 3'))
    # CONTRACT_SOURCE_SYSTEM_ID = CONTRACT.CONTRACT_ID
    .withColumn("CONTRACT_SOURCE_SYSTEM_ID", col("CONTRACT_ID").cast("long"))
    .withColumn("SOURCE_SYSTEM_ID", claim_id_suffix.cast("integer"))
    .withColumn("TRANSACTION_TYPE", transaction_type)
)

In [221]:
# TRANSACTION_DIRECTION - "COINSURANCE" when CLAIM_ID has the "CL" prefix,
# "REINSURANCE" when CLAIM_ID has the "RX" prefix.
# Any other prefix (or a missing CLAIM_ID) yields a real SQL NULL instead of the
# literal text 'NULL', so isNull()/isNotNull() checks and downstream consumers
# see a genuinely missing value.
claim_prefix = split(col("CLAIM_ID"), '_')[0]
transformations = transformations.withColumn(
    "TRANSACTION_DIRECTION",
    when(claim_prefix == "CL", "COINSURANCE")
    .when(claim_prefix == "RX", "REINSURANCE")
    .otherwise(lit(None).cast("string")),
)

In [222]:
transformations = (
    transformations
    # CONFORMED_VALUE - CLAIM.AMOUNT
    .withColumn("CONFORMED_VALUE", col("AMOUNT"))
    # BUSINESS_DATE - CLAIM.DATE_OF_LOSS in format YYYY-MM-DD.
    # DATE_OF_LOSS was already converted to a date during the claim clean-up,
    # so it is copied as-is.
    .withColumn("BUSINESS_DATE", col('DATE_OF_LOSS'))
)

In [223]:
 #CREATION_DATE = CLAIM.CREATION_DATE mapped to format YYYY-MM-DD HH:mm:ss
 #Already mapped to correct format during datatype cleanup
 # Writes the claim's CLAIM_CREATION_DATE into a column named CREATION_DATE, then
 # issues two drops by column reference: contract's CREATION_DATE and claim's
 # CLAIM_CREATION_DATE.
 # NOTE(review): the column list printed by the later transformations.show() still
 # contains CREATION_DATE and no CLAIM_CREATION_DATE, so the first drop appears not
 # to remove the column written here -- confirm on the Spark version in use.
 transformations = transformations.withColumn("CREATION_DATE",claim["CLAIM_CREATION_DATE"]).drop(contract["CREATION_DATE"]).drop(claim["CLAIM_CREATION_DATE"])

 #SYSTEM_TIMESTAMP System timestamp of creating the transaction
 # current_timestamp() is formatted to a 'yyyy-MM-dd HH:mm:ss' string and cast back
 # to a timestamp, so the stored value carries no sub-second part.
 transformations = transformations.withColumn("SYSTEM_TIMESTAMP",date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))

In [224]:
#NSE_ID Map -- CLAIM_ID to a unique id via rest API call for every input row: https://api.hashify.net/hash/md4/hex?value=CLAIM_ID
## (Returns a JSON, take field "Digest" as result)

#Creating function to execute restAPI for each CLAIM_ID
def executeRestApi(CLAIM_ID, timeout=10):
  """Return the MD4 hex digest of ``CLAIM_ID`` from the hashify.net REST API.

  Args:
    CLAIM_ID: Claim identifier to hash. It is converted with ``str()`` before
      being sent, so a missing (None) id is hashed as the text "None", exactly
      as the previous string-formatted URL did.
    timeout: Seconds to wait for the HTTP call before giving up, so a stalled
      connection cannot hang a Spark task indefinitely.

  Returns:
    The "Digest" field of the JSON response, or None when the request fails
    or the service does not answer with HTTP 200.
  """
  try:
    # Let requests build the query string so the id is URL-encoded.
    res = requests.get(
        "https://api.hashify.net/hash/md4/hex",
        params={"value": str(CLAIM_ID)},
        timeout=timeout,
    )
  except requests.RequestException:
    # Return None (same as a non-200 reply) rather than the exception object,
    # which is not a valid value for the string-typed UDF result.
    return None
  if res.status_code != 200:
    return None
  return json.loads(res.text)['Digest']

# Wrap executeRestApi as a Spark UDF returning a string.
udf_executeRestApi = udf(executeRestApi, StringType())

# NSE_ID is the digest returned by the REST API for each row's CLAIM_ID.
nse_id = udf_executeRestApi(col('CLAIM_ID'))
transformations = transformations.withColumn("NSE_ID", nse_id)

In [225]:
# Preview the joined frame with all mapped columns (first 20 rows, long values truncated).
transformations.show(n=20, truncate=True)

+--------------------+-----------+-------------+-------------------+-----------------+-------------------+-----------------+-----------+----------------------+----------+------------+-----------+-------------------------+----------------+----------------+---------------------+---------------+-------------+-------------------+--------------------+
|       SOURCE_SYSTEM|CONTRACT_ID|CONTRACT_TYPE|INSURED_PERIOD_FROM|INSURED_PERIOD_TO|      CREATION_DATE|    SOURCE_SYSTEM|   CLAIM_ID|CONTRACT_SOURCE_SYSTEM|CLAIM_TYPE|DATE_OF_LOSS|     AMOUNT|CONTRACT_SOURCE_SYSTEM_ID|SOURCE_SYSTEM_ID|TRANSACTION_TYPE|TRANSACTION_DIRECTION|CONFORMED_VALUE|BUSINESS_DATE|   SYSTEM_TIMESTAMP|              NSE_ID|
+--------------------+-----------+-------------+-------------------+-----------------+-------------------+-----------------+-----------+----------------------+----------+------------+-----------+-------------------------+----------------+----------------+---------------------+---------------+---------

Generating Output as per specified schema

In [226]:
# Columns of the target schema, in output order.
output_columns = [
    'CONTRACT_SOURCE_SYSTEM',
    'CONTRACT_SOURCE_SYSTEM_ID',
    'SOURCE_SYSTEM_ID',
    'TRANSACTION_TYPE',
    'TRANSACTION_DIRECTION',
    'CONFORMED_VALUE',
    'BUSINESS_DATE',
    'CREATION_DATE',
    'SYSTEM_TIMESTAMP',
    'NSE_ID',
]
transformations = transformations.select(*output_columns)

In [227]:
#Adding metadata as specified in output schema
transformations = (
    transformations
    .withMetadata('CONTRACT_SOURCE_SYSTEM', {'primary_key': 'true'})
    .withMetadata('CONTRACT_SOURCE_SYSTEM_ID', {'primary_key': 'true'})
    .withMetadata('NSE_ID', {'primary_key': 'true'})
)

#Changing nullable state of output columns as per output schema:
# NSE_ID becomes non-nullable, every other column nullable.
# Each field's metadata is passed through explicitly; building StructField
# without it would discard the primary_key metadata attached above.
fields = [
    StructField(field.name, field.dataType, field.name != 'NSE_ID', field.metadata)
    for field in transformations.schema.fields
]

# Re-create the DataFrame from its rows with the adjusted schema.
new_schema = StructType(fields)
transformations = spark.createDataFrame(transformations.rdd, new_schema)
transformations.printSchema()

root
 |-- CONTRACT_SOURCE_SYSTEM: string (nullable = true)
 |-- CONTRACT_SOURCE_SYSTEM_ID: long (nullable = true)
 |-- SOURCE_SYSTEM_ID: integer (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_DIRECTION: string (nullable = true)
 |-- CONFORMED_VALUE: decimal(16,5) (nullable = true)
 |-- BUSINESS_DATE: date (nullable = true)
 |-- CREATION_DATE: timestamp (nullable = true)
 |-- SYSTEM_TIMESTAMP: timestamp (nullable = true)
 |-- NSE_ID: string (nullable = false)



In [228]:
# Preview the frame in its final output layout (first 20 rows, long values truncated).
transformations.show(n=20, truncate=True)

+----------------------+-------------------------+----------------+----------------+---------------------+---------------+-------------+-------------------+-------------------+--------------------+
|CONTRACT_SOURCE_SYSTEM|CONTRACT_SOURCE_SYSTEM_ID|SOURCE_SYSTEM_ID|TRANSACTION_TYPE|TRANSACTION_DIRECTION|CONFORMED_VALUE|BUSINESS_DATE|      CREATION_DATE|   SYSTEM_TIMESTAMP|              NSE_ID|
+----------------------+-------------------------+----------------+----------------+---------------------+---------------+-------------+-------------------+-------------------+--------------------+
|              Europe 3|                 13767503|          895168|         Unknown|          COINSURANCE|    98465.00000|   2020-09-02|2022-01-17 14:45:00|2024-06-16 14:07:46|01ffa3820d8f2d942...|
|              Europe 3|                     NULL|         7065313|         Private|                 NULL|    11000.00000|   2021-09-29|2022-01-17 14:46:00|2024-06-16 14:07:46|22e72db452e973fec...|
|         

# Output File

Choosing Parquet as the output file format to store the data and metadata.

In [None]:
# Overwrite any previous export so the notebook can be re-run end to end;
# the default save mode raises an error when the target path already exists.
transformations.write.mode("overwrite").parquet("TRANSFORMATIONS.parquet")

In [232]:
# Read the exported Parquet data back into a DataFrame.
output = spark.read.format("parquet").load("TRANSFORMATIONS.parquet")

In [234]:
# Preview the data read back from Parquet (first 20 rows, long values truncated).
output.show(n=20, truncate=True)

+----------------------+-------------------------+----------------+----------------+---------------------+---------------+-------------+-------------------+-------------------+--------------------+
|CONTRACT_SOURCE_SYSTEM|CONTRACT_SOURCE_SYSTEM_ID|SOURCE_SYSTEM_ID|TRANSACTION_TYPE|TRANSACTION_DIRECTION|CONFORMED_VALUE|BUSINESS_DATE|      CREATION_DATE|   SYSTEM_TIMESTAMP|              NSE_ID|
+----------------------+-------------------------+----------------+----------------+---------------------+---------------+-------------+-------------------+-------------------+--------------------+
|              Europe 3|                 13767503|          895168|         Unknown|          COINSURANCE|    98465.00000|   2020-09-02|2022-01-17 14:45:00|2024-06-16 14:07:46|01ffa3820d8f2d942...|
|              Europe 3|                     NULL|         7065313|         Private|                 NULL|    11000.00000|   2021-09-29|2022-01-17 14:46:00|2024-06-16 14:07:46|22e72db452e973fec...|
|         