In [0]:
from pyspark.sql import DataFrameWriter
import pandas as pd
from pyspark.sql.functions import *

In [0]:
# Enumerate the secret scopes visible to this workspace, to confirm the scope
# used for the storage-account key below is available.
dbutils.secrets.listScopes()

[SecretScope(name='scope31dec')]

In [0]:
# Show the secret keys (metadata only, never the values) stored in the scope.
secret_scope = 'scope31dec'
dbutils.secrets.list(scope=secret_scope)

[SecretMetadata(key='blobkeysecret31dec')]

In [0]:
# Register the storage-account access key in the Spark session configuration.
# The key is fetched from the secret scope at run time, so it never appears in
# the notebook source.
account_key_conf = "fs.azure.account.key.healthcareblob31dec.dfs.core.windows.net"
spark.conf.set(
    account_key_conf,
    dbutils.secrets.get(scope="scope31dec", key="blobkeysecret31dec"),
)

In [0]:
# List the files in the `rawdata` container to confirm the inputs are present.
raw_container_listing = dbutils.fs.ls("abfss://rawdata@healthcareblob31dec.dfs.core.windows.net")
display(raw_container_listing)

path,name,size,modificationTime
abfss://rawdata@healthcareblob31dec.dfs.core.windows.net/Patient_records.csv,Patient_records.csv,5110,1704010961000
abfss://rawdata@healthcareblob31dec.dfs.core.windows.net/claims.json,claims.json,16385,1704027744000
abfss://rawdata@healthcareblob31dec.dfs.core.windows.net/disease.csv,disease.csv,1489,1704010960000
abfss://rawdata@healthcareblob31dec.dfs.core.windows.net/group.csv,group.csv,4390,1704010960000
abfss://rawdata@healthcareblob31dec.dfs.core.windows.net/hospital.csv,hospital.csv,1328,1704010960000
abfss://rawdata@healthcareblob31dec.dfs.core.windows.net/subgroup.csv,subgroup.csv,561,1704010960000
abfss://rawdata@healthcareblob31dec.dfs.core.windows.net/subscriber.csv,subscriber.csv,12061,1704010960000


In [0]:
# Load the raw claims file; no schema is supplied, so Spark infers column types
# from the JSON content.
claims_data = (
    spark.read
    .format("json")
    .load("abfss://rawdata@healthcareblob31dec.dfs.core.windows.net/claims.json")
)

In [0]:
# Preview of the raw claims before the `_id` column is dropped (call left commented out).
#claims_data.show(5, False)

+-----------------+----------+--------------------------+------------+----------+--------+----------------+--------------+----------+
|Claim_Or_Rejected|SUB_ID    |_id                       |claim_amount|claim_date|claim_id|claim_type      |disease_name  |patient_id|
+-----------------+----------+--------------------------+------------+----------+--------+----------------+--------------+----------+
|N                |SUBID1000 |{65912332935ad3481f59f5e6}|79874       |1949-03-14|0       |claims of value |Galactosemia  |187158    |
|NaN              |SUBID10001|{65912332935ad3481f59f5e7}|151142      |1970-03-16|1       |claims of policy|Bladder cancer|112766    |
|NaN              |SUBID10002|{65912332935ad3481f59f5e8}|59924       |2008-02-03|2       |claims of value |Kidney cancer |199252    |
|NaN              |SUBID10003|{65912332935ad3481f59f5e9}|143120      |1995-02-08|3       |claims of fact  |Suicide       |133424    |
|Y                |SUBID10004|{65912332935ad3481f59f5ea}|16863

In [0]:
# Discard the `_id` struct column; it is not carried into the staged output.
claims_data = claims_data.drop('_id')

In [0]:
# Preview the first five rows without truncating long cell values.
claims_data.show(n=5, truncate=False)

+-----------------+----------+------------+----------+--------+----------------+--------------+----------+
|Claim_Or_Rejected|SUB_ID    |claim_amount|claim_date|claim_id|claim_type      |disease_name  |patient_id|
+-----------------+----------+------------+----------+--------+----------------+--------------+----------+
|N                |SUBID1000 |79874       |1949-03-14|0       |claims of value |Galactosemia  |187158    |
|NaN              |SUBID10001|151142      |1970-03-16|1       |claims of policy|Bladder cancer|112766    |
|NaN              |SUBID10002|59924       |2008-02-03|2       |claims of value |Kidney cancer |199252    |
|NaN              |SUBID10003|143120      |1995-02-08|3       |claims of fact  |Suicide       |133424    |
|Y                |SUBID10004|168634      |1967-05-23|4       |claims of value |Food allergy  |172579    |
+-----------------+----------+------------+----------+--------+----------------+--------------+----------+
only showing top 5 rows



In [0]:
# Turn the literal string 'NaN' into a real null so the null-handling steps
# that follow can see it.
claims_data = claims_data.na.replace('NaN', None)

In [0]:
# Default a missing Claim_Or_Rejected value to "N".
claims_data = claims_data.na.fill({"Claim_Or_Rejected" : "N"})

In [0]:
# Data-quality check: for every column, count the rows whose value is null or NaN.
# `when` without an `otherwise` yields null for non-matching rows, and `count`
# skips nulls, so only the missing values are tallied.
missing_value_counts = [
    count(when(col(column_name).isNull() | isnan(column_name), column_name)).alias(column_name)
    for column_name in claims_data.columns
]
claims_data.select(missing_value_counts).show()

+-----------------+------+------------+----------+--------+----------+------------+----------+
|Claim_Or_Rejected|SUB_ID|claim_amount|claim_date|claim_id|claim_type|disease_name|patient_id|
+-----------------+------+------------+----------+--------+----------+------------+----------+
|                0|     0|           0|         0|       0|         0|           0|         0|
+-----------------+------+------------+----------+--------+----------+------------+----------+



In [0]:
# Data-quality check: group on every column and show any row that occurs more
# than once (an empty result means there are no exact duplicates).
row_occurrences = claims_data.groupBy(claims_data.columns).count()
row_occurrences.filter("count > 1").show()

+-----------------+------+------------+----------+--------+----------+------------+----------+-----+
|Claim_Or_Rejected|SUB_ID|claim_amount|claim_date|claim_id|claim_type|disease_name|patient_id|count|
+-----------------+------+------------+----------+--------+----------+------------+----------+-----+
+-----------------+------+------------+----------+--------+----------+------------+----------+-----+



In [0]:
# Remove exact duplicate rows before export.
# Spark DataFrames are immutable: dropDuplicates() returns a NEW DataFrame, so
# the result has to be assigned back. Calling it without assignment leaves
# `claims_data` unchanged and the de-duplication is silently lost.
claims_data = claims_data.dropDuplicates()
# Keep the DataFrame as the cell's last expression so its schema is still displayed.
claims_data

DataFrame[Claim_Or_Rejected: string, SUB_ID: string, claim_amount: string, claim_date: string, claim_id: bigint, claim_type: string, disease_name: string, patient_id: bigint]

In [0]:
# Export the cleaned claims as ONE named CSV file in the staging container.
# Spark writes a directory of part files rather than a single file, so the
# frame is coalesced to one partition, written to a scratch folder, and the
# single part file is then moved to its final name.
output_container_path = "abfss://stagingdata@healthcareblob31dec.dfs.core.windows.net"
# Scratch folder for the Spark write. The path has no scheme, so it resolves
# against the default filesystem (presumably the DBFS root - confirm).
output_blob_folder = "stagingdata/"

(
    claims_data.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    # Built-in CSV source; "com.databricks.spark.csv" is only a legacy alias for it.
    .format("csv")
    .save(output_blob_folder)
)

files = dbutils.fs.ls(output_blob_folder)
outputfile = [x for x in files if x.name.startswith("part-")]
if not outputfile:
    # Fail with a clear message instead of an IndexError on outputfile[0].
    raise FileNotFoundError(
        "No part- file found in %s after the CSV write" % output_blob_folder
    )

moved = dbutils.fs.mv(outputfile[0].path, "%s/claims_data_stage.csv" % output_container_path)

# Remove the scratch folder (recursively) so Spark's marker files do not
# accumulate between runs.
dbutils.fs.rm(output_blob_folder, True)

# Surface the result of the move as the cell output, as before.
moved

True