In [26]:

# findspark.init() has to run before pyspark is imported, so the import
# order in this cell is intentional.
import findspark

findspark.init()

import pyspark  # only run after findspark.init()
from pyspark.sql import SparkSession

# Obtain the Spark session used by every cell below.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [27]:

# Where the patient extract lives and which reader format to use.
file_location = "C:/Users/Dinesh_2/Desktop/Patient_Dashboard_Active.csv"
file_type = "csv"

# CSV options
# NOTE(review): infer_schema is defined but is not passed to the reader below,
# so schema inference stays at the reader's default -- confirm this is intended.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_vw = (
    spark.read.format(file_type)
    .options(header=first_row_is_header, sep=delimiter, encoding="UTF-8")
    .load(file_location)
)


In [28]:
# Source and target both start from the same de-duplicated projection of the
# loaded file: patient name, date of birth, SSN and address.
df_src = df_vw.select(['Patient', 'DOB', 'SSN', 'Address']).distinct()
df_tgt = df_vw.select(['Patient', 'DOB', 'SSN', 'Address']).distinct()
df_tgt.show()

+--------------------+----------+-----------+--------------------+
|             Patient|       DOB|        SSN|             Address|
+--------------------+----------+-----------+--------------------+
|     BARRIENTOS, EVA|05/22/1933|619-12-5310|8203 TELEGRAPH RD...|
|  BOWIE, REGINALD D.|10/06/1955|270-38-5318|7716 PICKERING AV...|
|  CANDELARIA, VICTOR|10/29/1961|       null|11500 DOLAN AVENU...|
|  CANDELAS, REYNALDO|11/12/1953|       null|12120 CHANDLER BL...|
|      CANTA, ARMINDA|08/31/1934|       null|3401 LEMON ST. RI...|
|        CANTU, ELISA|10/07/1945|       null|1717 S WINERY AVE...|
|  CASTRO, JENNIE (I)|06/20/1931|       null|12111 CHANDLER BL...|
|       CHAO, FIDELIA|12/16/1940|549-58-9561|9925 LA ALAMEDA A...|
|    COPE, LETCHER H.|08/18/1924|       null|6817 N. ROWELL AV...|
|  COTTER, JACQUELINE|04/18/1963|564-17-8110|5400 STINE RD. BA...|
|      CROCHET, CHASE|02/25/1991|       null|35 VERNON NEWPORT...|
|   DAMICO, DENNIS B.|06/23/1945|564-64-2141|413 E. CYPRESS GL

In [29]:

# Give the target frame its own names for the date-of-birth and address columns.
df_tgt = (
    df_tgt
    .withColumnRenamed('Address', 'ADDR')
    .withColumnRenamed('DOB', 'Date of Birth')
)
df_tgt

DataFrame[Patient: string, Date of Birth: string, SSN: string, ADDR: string]

In [30]:

# Replace missing SSN and address values with the literal 'NA' on both sides.
# The address column is called 'Address' in the source and 'ADDR' in the target.
df_src = df_src.fillna({'SSN': 'NA', 'Address': 'NA'})
df_tgt = df_tgt.fillna({'SSN': 'NA', 'ADDR': 'NA'})
df_tgt.show()

+--------------------+-------------+-----------+--------------------+
|             Patient|Date of Birth|        SSN|                ADDR|
+--------------------+-------------+-----------+--------------------+
|     BARRIENTOS, EVA|   05/22/1933|619-12-5310|8203 TELEGRAPH RD...|
|  BOWIE, REGINALD D.|   10/06/1955|270-38-5318|7716 PICKERING AV...|
|  CANDELARIA, VICTOR|   10/29/1961|         NA|11500 DOLAN AVENU...|
|  CANDELAS, REYNALDO|   11/12/1953|         NA|12120 CHANDLER BL...|
|      CANTA, ARMINDA|   08/31/1934|         NA|3401 LEMON ST. RI...|
|        CANTU, ELISA|   10/07/1945|         NA|1717 S WINERY AVE...|
|  CASTRO, JENNIE (I)|   06/20/1931|         NA|12111 CHANDLER BL...|
|       CHAO, FIDELIA|   12/16/1940|549-58-9561|9925 LA ALAMEDA A...|
|    COPE, LETCHER H.|   08/18/1924|         NA|6817 N. ROWELL AV...|
|  COTTER, JACQUELINE|   04/18/1963|564-17-8110|5400 STINE RD. BA...|
|      CROCHET, CHASE|   02/25/1991|         NA|35 VERNON NEWPORT...|
|   DAMICO, DENNIS B

In [31]:
from pyspark.sql import functions as F

# Audit columns for the target frame. The creation stamp is the current
# timestamp; the "never updated" marker is the plain string literal
# '1900-01-01 00:00', so DW_UPDATED_DATE is a string column, not a timestamp.
df_tgt = (
    df_tgt
    .withColumn('DW_CREATED_DATE', F.current_timestamp())
    .withColumn('DW_UPDATED_DATE', F.lit('1900-01-01 00:00'))
)
df_tgt.show()

+--------------------+-------------+-----------+--------------------+--------------------+----------------+
|             Patient|Date of Birth|        SSN|                ADDR|     DW_CREATED_DATE| DW_UPDATED_DATE|
+--------------------+-------------+-----------+--------------------+--------------------+----------------+
|     BARRIENTOS, EVA|   05/22/1933|619-12-5310|8203 TELEGRAPH RD...|2020-06-29 11:41:...|1900-01-01 00:00|
|  BOWIE, REGINALD D.|   10/06/1955|270-38-5318|7716 PICKERING AV...|2020-06-29 11:41:...|1900-01-01 00:00|
|  CANDELARIA, VICTOR|   10/29/1961|         NA|11500 DOLAN AVENU...|2020-06-29 11:41:...|1900-01-01 00:00|
|  CANDELAS, REYNALDO|   11/12/1953|         NA|12120 CHANDLER BL...|2020-06-29 11:41:...|1900-01-01 00:00|
|      CANTA, ARMINDA|   08/31/1934|         NA|3401 LEMON ST. RI...|2020-06-29 11:41:...|1900-01-01 00:00|
|        CANTU, ELISA|   10/07/1945|         NA|1717 S WINERY AVE...|2020-06-29 11:41:...|1900-01-01 00:00|
|  CASTRO, JENNIE (I)|   06/

In [32]:
# Simulate a change arriving from the source: overwrite the address of one
# patient with 'ABC' and leave every other row's address untouched.
df_src = df_src.withColumn(
    'Address',
    F.when(F.col('Patient') == 'ABLE, SHARI', F.lit('ABC'))
    .otherwise(F.col('Address')),
)

# Show the modified row to confirm the change took effect.
df_src.filter(F.col('Patient') == 'ABLE, SHARI').select('Patient', 'Address').show()


+-----------+-------+
|    Patient|Address|
+-----------+-------+
|ABLE, SHARI|    ABC|
+-----------+-------+



In [33]:
# Suffix the column names with _tgt / _src so the two sides stay
# distinguishable once the frames are joined.
for _old, _new in (('Patient', 'Patient_tgt'), ('ADDR', 'ADDR_tgt'), ('SSN', 'SSN_tgt')):
    df_tgt = df_tgt.withColumnRenamed(_old, _new)

for _old, _new in (
    ('Patient', 'Patient_src'),
    ('Address', 'ADDR_src'),
    ('SSN', 'SSN_src'),
    ('DOB', 'DOB_src'),
):
    df_src = df_src.withColumnRenamed(_old, _new)

df_tgt

DataFrame[Patient_tgt: string, Date of Birth: string, SSN_tgt: string, ADDR_tgt: string, DW_CREATED_DATE: timestamp, DW_UPDATED_DATE: string]

In [34]:

# Line up every target row with its source counterpart by patient name.
# The full outer join keeps rows that exist on only one side; the columns of
# the missing side come back as NULL for those rows.
df1 = df_tgt.join(
    df_src.select('Patient_src', 'DOB_src', 'SSN_src', 'ADDR_src'),
    df_tgt.Patient_tgt == df_src.Patient_src,
    how='fullouter',
)

# Classify each joined row:
#   Update    - present on both sides and the address differs
#   Insert    - present in the source only (there is no target row yet)
#   No Action - everything else, including target rows with no source match
# Only source-only rows may be tagged 'Insert'. A target-only row has NULL in
# every *_src column, so tagging it 'Insert' would make the insert handling
# emit an all-NULL row and lose the existing target record.
df1 = df1.withColumn(
    'Action',
    F.when(
        df1.Patient_src.isNotNull()
        & df1.Patient_tgt.isNotNull()
        & (df1.ADDR_src != df1.ADDR_tgt),
        'Update',
    )
    .when(df1.Patient_tgt.isNull(), 'Insert')
    .otherwise('No Action'),
)
df1.select('Patient_tgt', 'Date of Birth', 'SSN_tgt', 'Addr_tgt', 'ADDR_src', 'Action').show()
                             


+--------------------+-------------+-----------+--------------------+--------------------+---------+
|         Patient_tgt|Date of Birth|    SSN_tgt|            Addr_tgt|            ADDR_src|   Action|
+--------------------+-------------+-----------+--------------------+--------------------+---------+
|  ALCALA, MICHAEL A.|   03/06/1965|554-57-9553|20902 DENKER AVE....|20902 DENKER AVE....|No Action|
| BAUTISTA, ANDREA M.|   02/04/1936|         NA|11500 DOLAN AVE D...|11500 DOLAN AVE D...|No Action|
|      BROWN, DEBORAH|   02/04/1952|         NA|1046 E. LANCASTER...|1046 E. LANCASTER...|No Action|
|        CITI, SHARON|   12/27/1961|548-45-4769|240 EAST 6TH STRE...|240 EAST 6TH STRE...|No Action|
|   EBIHARA, MITCHELL|   05/06/1958|         NA|7242 WESTMINSTER ...|7242 WESTMINSTER ...|No Action|
|   ENGLANDER, BINNIE|   07/03/1944|         NA|12229 CHANDLER BL...|12229 CHANDLER BL...|No Action|
|   ENGLANDER, BINNIE|   07/03/1944|         NA|12229 CHANDLER BL...|12229 CHANDLER BL...| 

In [None]:
# SCD Type 1
# Split the classified rows by their Action label.

# Unchanged rows keep the target values, including both audit columns.
df_noaction = (
    df1
    .filter(df1['Action'] == 'No Action')
    .select('Patient_tgt', 'Date of Birth', 'SSN_tgt', 'ADDR_tgt', 'DW_CREATED_DATE', 'DW_UPDATED_DATE')
)

# Changed rows keep the target values except the address, which is taken
# from the source; the original creation stamp is carried along.
df_update = (
    df1
    .filter(df1['Action'] == 'Update')
    .select('Patient_tgt', 'Date of Birth', 'SSN_tgt', 'ADDR_src', 'DW_CREATED_DATE')
)

# New rows take every value from the source side.
df_insert = (
    df1
    .filter(df1['Action'] == 'Insert')
    .select('Patient_src', 'DOB_src', 'SSN_src', 'ADDR_src')
)

#df_noaction.show()
#df_update.show()

In [None]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql import functions as F

# Stamp the audit columns on the insert and update frames:
#   - changed rows get the current timestamp as their update time
#   - new rows get the current timestamp as their creation time and the
#     '1900-01-01 00:00' string as their "never updated" marker
df_update = df_update.withColumn('DW_UPDATED_DATE', F.current_timestamp())
df_insert = (
    df_insert
    .withColumn('DW_CREATED_DATE', F.current_timestamp())
    .withColumn('DW_UPDATED_DATE', F.lit('1900-01-01 00:00'))
)


In [None]:
# Stack the three partitions back into one frame. union() lines columns up by
# position, so the frames must list their columns in the same order; the
# result carries the column names of df_noaction.
df_final = (
    df_noaction
    .union(df_insert)
    .union(df_update)
)
df_final.show()

In [35]:
# Spot-check the patient whose source address was changed earlier.
(
    df_final
    .filter(F.col('Patient_tgt') == 'ABLE, SHARI')
    .select('Patient_tgt', 'ADDR_tgt', 'DW_UPDATED_DATE')
    .show()
)

+-----------+--------+--------------------+
|Patient_tgt|ADDR_tgt|     DW_UPDATED_DATE|
+-----------+--------+--------------------+
|ABLE, SHARI|     ABC|2020-06-29 11:47:...|
+-----------+--------+--------------------+



In [36]:
#scd type 2
# Flag every row currently in the target with Active = 1.
df_tgt = df_tgt.withColumn(
    'Active',
    F.lit(1),
)
df_tgt.show()

+--------------------+-------------+-----------+--------------------+--------------------+----------------+------+
|         Patient_tgt|Date of Birth|    SSN_tgt|            ADDR_tgt|     DW_CREATED_DATE| DW_UPDATED_DATE|Active|
+--------------------+-------------+-----------+--------------------+--------------------+----------------+------+
|     BARRIENTOS, EVA|   05/22/1933|619-12-5310|8203 TELEGRAPH RD...|2020-06-29 11:52:...|1900-01-01 00:00|     1|
|  BOWIE, REGINALD D.|   10/06/1955|270-38-5318|7716 PICKERING AV...|2020-06-29 11:52:...|1900-01-01 00:00|     1|
|  CANDELARIA, VICTOR|   10/29/1961|         NA|11500 DOLAN AVENU...|2020-06-29 11:52:...|1900-01-01 00:00|     1|
|  CANDELAS, REYNALDO|   11/12/1953|         NA|12120 CHANDLER BL...|2020-06-29 11:52:...|1900-01-01 00:00|     1|
|      CANTA, ARMINDA|   08/31/1934|         NA|3401 LEMON ST. RI...|2020-06-29 11:52:...|1900-01-01 00:00|     1|
|        CANTU, ELISA|   10/07/1945|         NA|1717 S WINERY AVE...|2020-06-29 

In [37]:
# Line up every target row (now carrying the Active flag) with its source
# counterpart by patient name. The full outer join keeps rows that exist on
# only one side; the columns of the missing side come back as NULL.
df2 = df_tgt.join(
    df_src.select('Patient_src', 'DOB_src', 'SSN_src', 'ADDR_src'),
    df_tgt.Patient_tgt == df_src.Patient_src,
    how='fullouter',
)

# Classify each joined row (first matching branch wins):
#   Upsert    - present on both sides and the address differs
#   Insert    - present in the source only (there is no target row yet)
#   Inactive  - remaining rows whose target Active flag is 0
#   No Action - everything else, including active target rows with no source match
# Only source-only rows may be tagged 'Insert'. A target-only row has NULL in
# every *_src column, so tagging it 'Insert' would make the insert handling
# append an all-NULL row flagged as active.
df2 = df2.withColumn(
    'Action',
    F.when(
        df2.Patient_src.isNotNull()
        & df2.Patient_tgt.isNotNull()
        & (df2.ADDR_src != df2.ADDR_tgt),
        'Upsert',
    )
    .when(df2.Patient_tgt.isNull(), 'Insert')
    .when(df2.Active == 0, 'Inactive')
    .otherwise('No Action'),
)
df2.select('Patient_tgt', 'Date of Birth', 'SSN_tgt', 'Addr_tgt', 'ADDR_src', 'Active', 'Action').show()

+--------------------+-------------+-----------+--------------------+--------------------+------+---------+
|         Patient_tgt|Date of Birth|    SSN_tgt|            Addr_tgt|            ADDR_src|Active|   Action|
+--------------------+-------------+-----------+--------------------+--------------------+------+---------+
|  ALCALA, MICHAEL A.|   03/06/1965|554-57-9553|20902 DENKER AVE....|20902 DENKER AVE....|     1|No Action|
| BAUTISTA, ANDREA M.|   02/04/1936|         NA|11500 DOLAN AVE D...|11500 DOLAN AVE D...|     1|No Action|
|      BROWN, DEBORAH|   02/04/1952|         NA|1046 E. LANCASTER...|1046 E. LANCASTER...|     1|No Action|
|        CITI, SHARON|   12/27/1961|548-45-4769|240 EAST 6TH STRE...|240 EAST 6TH STRE...|     1|No Action|
|   EBIHARA, MITCHELL|   05/06/1958|         NA|7242 WESTMINSTER ...|7242 WESTMINSTER ...|     1|No Action|
|   ENGLANDER, BINNIE|   07/03/1944|         NA|12229 CHANDLER BL...|12229 CHANDLER BL...|     1|No Action|
|   ENGLANDER, BINNIE|   07/

In [38]:

# Rows that will become new active versions.

# Changed patients: target identity columns plus the new address from the source.
df2_upsert = (
    df2
    .filter(df2['Action'] == 'Upsert')
    .select('Patient_tgt', 'Date of Birth', 'SSN_tgt', 'ADDR_src')
)

# Brand-new patients: every value comes from the source side.
df2_insert = (
    df2
    .filter(df2['Action'] == 'Insert')
    .select('Patient_src', 'DOB_src', 'SSN_src', 'ADDR_src')
)



In [39]:
# Left-join the target rows to the classified rows in df2 so that each target
# row picks up the source-side columns and the Action label. Target rows with
# no match keep NULL in those columns.
# NOTE(review): the join condition uses df2.Active although 'Active' is not among
# the columns selected from df2; it presumably resolves to the target side's
# Active column because df2 was built from df_tgt -- confirm.
# NOTE(review): the match is on patient name only, so a name that occurs more
# than once on either side will multiply rows here -- confirm this is acceptable.
df_final2=df_tgt.join(df2.select('Patient_src','DOB_src','SSN_src','ADDR_src','Action'), (df_tgt.Patient_tgt == df2.Patient_src) & (df2.Active==1), how = 'left_outer')
df_final2.show()

+-------------------+-------------+-----------+--------------------+--------------------+----------------+------+-------------------+----------+-----------+--------------------+---------+
|        Patient_tgt|Date of Birth|    SSN_tgt|            ADDR_tgt|     DW_CREATED_DATE| DW_UPDATED_DATE|Active|        Patient_src|   DOB_src|    SSN_src|            ADDR_src|   Action|
+-------------------+-------------+-----------+--------------------+--------------------+----------------+------+-------------------+----------+-----------+--------------------+---------+
| ALCALA, MICHAEL A.|   03/06/1965|554-57-9553|20902 DENKER AVE....|2020-06-29 11:55:...|1900-01-01 00:00|     1| ALCALA, MICHAEL A.|03/06/1965|554-57-9553|20902 DENKER AVE....|No Action|
|BAUTISTA, ANDREA M.|   02/04/1936|         NA|11500 DOLAN AVE D...|2020-06-29 11:55:...|1900-01-01 00:00|     1|BAUTISTA, ANDREA M.|02/04/1936|         NA|11500 DOLAN AVE D...|No Action|
|     BROWN, DEBORAH|   02/04/1952|         NA|1046 E. LANCA

In [40]:
# Spot-check the patient whose source address was changed: show the target
# address next to the source address together with the Active flag and Action.
(
    df_final2
    .filter(F.col('Patient_tgt') == 'ABLE, SHARI')
    .select('Patient_tgt', 'Date of Birth', 'SSN_tgt', 'ADDR_tgt', 'ADDR_src', 'Active', 'Action')
    .show()
)

+-----------+-------------+---------+--------------------+--------+------+------+
|Patient_tgt|Date of Birth|  SSN_tgt|            ADDR_tgt|ADDR_src|Active|Action|
+-----------+-------------+---------+--------------------+--------+------+------+
|ABLE, SHARI|   06/06/1939|566488725|225 North Crescen...|     ABC|     1|Upsert|
+-----------+-------------+---------+--------------------+--------+------+------+



In [41]:
# Close out superseded versions: rows labelled 'Upsert' or 'Inactive' get
# Active = 0 and the current timestamp as their update time; every other row
# keeps its existing Active flag and DW_UPDATED_DATE.
_is_superseded = (df_final2.Action == 'Upsert') | (df_final2.Action == 'Inactive')
_active_flag = F.when(_is_superseded, 0).otherwise(df_final2.Active)
_updated_at = F.when(_is_superseded, F.current_timestamp()).otherwise(df_final2.DW_UPDATED_DATE)

# Apply both columns, then drop the helper columns brought in by the join.
df_final2 = (
    df_final2
    .withColumn('Active', _active_flag)
    .withColumn('DW_UPDATED_DATE', _updated_at)
    .select('Patient_tgt', 'Date of Birth', 'SSN_tgt', 'ADDR_tgt', 'Active', 'DW_CREATED_DATE', 'DW_UPDATED_DATE')
)
df_final2.show()

+-------------------+-------------+-----------+--------------------+------+--------------------+--------------------+
|        Patient_tgt|Date of Birth|    SSN_tgt|            ADDR_tgt|Active|     DW_CREATED_DATE|     DW_UPDATED_DATE|
+-------------------+-------------+-----------+--------------------+------+--------------------+--------------------+
| ALCALA, MICHAEL A.|   03/06/1965|554-57-9553|20902 DENKER AVE....|     1|2020-06-29 11:56:...|    1900-01-01 00:00|
|BAUTISTA, ANDREA M.|   02/04/1936|         NA|11500 DOLAN AVE D...|     1|2020-06-29 11:56:...|    1900-01-01 00:00|
|     BROWN, DEBORAH|   02/04/1952|         NA|1046 E. LANCASTER...|     1|2020-06-29 11:56:...|    1900-01-01 00:00|
|       CITI, SHARON|   12/27/1961|548-45-4769|240 EAST 6TH STRE...|     1|2020-06-29 11:56:...|    1900-01-01 00:00|
|  EBIHARA, MITCHELL|   05/06/1958|         NA|7242 WESTMINSTER ...|     1|2020-06-29 11:56:...|    1900-01-01 00:00|
|  ENGLANDER, BINNIE|   07/03/1944|         NA|12229 CHA

In [42]:

# Build the rows to append: the brand-new source rows plus the replacement
# rows for changed patients (stacked by column position). Each row is flagged
# active, stamped with the current timestamp as its creation time, and given
# the '1900-01-01 00:00' string as its "never updated" marker.
df2_insert = (
    df2_insert
    .union(df2_upsert)
    .withColumn('ACTIVE', F.lit(1))
    .withColumn('DW_CREATED_DATE', F.current_timestamp())
    .withColumn('DW_UPDATED_DATE', F.lit('1900-01-01 00:00'))
)

df2_insert.show()

+--------------------+----------+-----------+--------------------+------+--------------------+----------------+
|         Patient_src|   DOB_src|    SSN_src|            ADDR_src|ACTIVE|     DW_CREATED_DATE| DW_UPDATED_DATE|
+--------------------+----------+-----------+--------------------+------+--------------------+----------------+
|   ENGLANDER, BINNIE|07/03/1944|         NA|12229 CHANDLER BL...|     1|2020-06-29 11:58:...|1900-01-01 00:00|
|   ENGLANDER, BINNIE|07/03/1944|559-60-2149|12229 CHANDLER BL...|     1|2020-06-29 11:58:...|1900-01-01 00:00|
|          KIM, HELEN|09/15/1939|563-33-6990|2441 W Orangethro...|     1|2020-06-29 11:58:...|1900-01-01 00:00|
|          KIM, HELEN|09/15/1939|         NA|2441 ORANGETHROPE...|     1|2020-06-29 11:58:...|1900-01-01 00:00|
|  CROUCH, MICHAEL D.|03/10/1973|572-49-1949|6025 PINE AVE. MA...|     1|2020-06-29 11:58:...|1900-01-01 00:00|
|  CROUCH, MICHAEL D.|03/10/1973|572-49-1949|630 W SEPULVEDA B...|     1|2020-06-29 11:58:...|1900-01-01

In [43]:
# Append the new active versions to the existing target rows. union() lines
# columns up by position, and the result keeps df_final2's column names.
df_final2 = (
    df_final2
    .union(df2_insert)
)
df_final2.show()

+-------------------+-------------+-----------+--------------------+------+--------------------+--------------------+
|        Patient_tgt|Date of Birth|    SSN_tgt|            ADDR_tgt|Active|     DW_CREATED_DATE|     DW_UPDATED_DATE|
+-------------------+-------------+-----------+--------------------+------+--------------------+--------------------+
| ALCALA, MICHAEL A.|   03/06/1965|554-57-9553|20902 DENKER AVE....|     1|2020-06-29 11:59:...|    1900-01-01 00:00|
|BAUTISTA, ANDREA M.|   02/04/1936|         NA|11500 DOLAN AVE D...|     1|2020-06-29 11:59:...|    1900-01-01 00:00|
|     BROWN, DEBORAH|   02/04/1952|         NA|1046 E. LANCASTER...|     1|2020-06-29 11:59:...|    1900-01-01 00:00|
|       CITI, SHARON|   12/27/1961|548-45-4769|240 EAST 6TH STRE...|     1|2020-06-29 11:59:...|    1900-01-01 00:00|
|  EBIHARA, MITCHELL|   05/06/1958|         NA|7242 WESTMINSTER ...|     1|2020-06-29 11:59:...|    1900-01-01 00:00|
|  ENGLANDER, BINNIE|   07/03/1944|         NA|12229 CHA

In [44]:
# Final check: list every stored version of the patient whose address changed.
(
    df_final2
    .filter(F.col('Patient_tgt') == 'ABLE, SHARI')
    .show()
)

+-----------+-------------+---------+--------------------+------+--------------------+--------------------+
|Patient_tgt|Date of Birth|  SSN_tgt|            ADDR_tgt|Active|     DW_CREATED_DATE|     DW_UPDATED_DATE|
+-----------+-------------+---------+--------------------+------+--------------------+--------------------+
|ABLE, SHARI|   06/06/1939|566488725|225 North Crescen...|     0|2020-06-29 12:00:...|2020-06-29 12:00:...|
|ABLE, SHARI|   06/06/1939|566488725|                 ABC|     1|2020-06-29 12:00:...|    1900-01-01 00:00|
+-----------+-------------+---------+--------------------+------+--------------------+--------------------+

