#SCD-Type2_Implementation

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.window import Window

In [0]:
dbutils.fs.mkdirs("/FileStore/SCD")

Out[4]: True

In [0]:
dbutils.fs.mkdirs("/FileStore/SCD/source/")
dbutils.fs.mkdirs("/FileStore/SCD/target/")


Out[5]: True

In [0]:
DATE_FORMAT = "yyyy-MM-dd"
future_date = "9999-12-31"  #some future date
source_url = "/FileStore/SCD/source/customers1.csv"
destination_url = "/FileStore/SCD/target"
primary_key = ["customerid"]
slowly_changing_cols = [ "email","phone","address", "city", "state", "zipcode"]
implementation_cols = ["effective_date","end_date","active_flag"]


#end_date -> might be NULL, to show as current record (or) -> future date.
#active_flag -> true: current_record and -> false: not current_record

In [0]:
customers_source_schema = "customerid long,firstname string, lastname string, email string, phone string, address string, city string, state string, zipcode long"

In [0]:
customers_target_schema = "customerid long,firstname string, lastname string, email string, phone string, address string, city string, state string, zipcode long, customer_skey long, effective_date date, end_date date, active_flag boolean"

#there are 4 more columns added:
"""
1.customer_skey -> for surrogate key
2.effective_date
3.end_date
4.active_flag
"""

Out[11]: '\n1.customer_skey -> for surrogate key\n2.effective_date\n3.end_date\n4.active_flag\n'

In [0]:
customers_source_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customers_source_schema) \
.load(source_url)

###customers1.csv
#####these records coming from relational database

In [0]:
customers_source_df.show(truncate=False)

+----------+-----------+--------+---------------------------+--------+-------------+---------+-----+-------+
|customerid|firstname  |lastname|email                      |phone   |address      |city     |state|zipcode|
+----------+-----------+--------+---------------------------+--------+-------------+---------+-----+-------+
|1         |John       |Doe     |johndoe@email.com          |555-1234|123 Main St  |Anytown  |CA   |12345  |
|2         |Jane       |Smith   |janesmith@email.com        |555-5678|456 Oak Ave  |Sometown |NY   |67890  |
|3         |Robert     |Johnson |robertjohnson@email.com    |555-8765|789 Pine Ln  |Othercity|TX   |34567  |
|4         |Alice      |Williams|alicewilliams@email.com    |555-4321|234 Cedar Dr |Yourtown |FL   |89012  |
|5         |Michael    |Brown   |michaelbrown@email.com     |555-9876|567 Elm Blvd |Theirtown|IL   |45678  |
|6         |Emily      |Miller  |emilymiller@email.com      |555-6543|890-Birch Rd |Newcity  |WA   |23456  |
|7         |David  

-- to move this data coming from relational-databse(source) to datawarehouse(target), \
-- we need to create Customer Dimension for which we need to have surrogate key \
-- here taking row_number() as surrogat key, which would be acting as primary key also.

In [0]:
window_def = Window.orderBy("customerid")


In [0]:
enhanced_customers_source_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customers_source_schema) \
.load(source_url) \
.withColumn("customer_skey",row_number().over(window_def)) \
.withColumn("effective_date",date_format(current_date(), DATE_FORMAT)) \
.withColumn("end_date",date_format(lit(future_date), DATE_FORMAT)) \
.withColumn("active_flag", lit(True))

#lit(): to create literal values for DataFrame operations.
#It helps in adding constant values as new columns or in expressions within transformations.

In [0]:
enhanced_customers_source_df.show(truncate=False)

+----------+-----------+--------+---------------------------+--------+-------------+---------+-----+-------+-------------+--------------+----------+-----------+
|customerid|firstname  |lastname|email                      |phone   |address      |city     |state|zipcode|customer_skey|effective_date|end_date  |active_flag|
+----------+-----------+--------+---------------------------+--------+-------------+---------+-----+-------+-------------+--------------+----------+-----------+
|1         |John       |Doe     |johndoe@email.com          |555-1234|123 Main St  |Anytown  |CA   |12345  |1            |2024-10-21    |9999-12-31|true       |
|2         |Jane       |Smith   |janesmith@email.com        |555-5678|456 Oak Ave  |Sometown |NY   |67890  |2            |2024-10-21    |9999-12-31|true       |
|3         |Robert     |Johnson |robertjohnson@email.com    |555-8765|789 Pine Ln  |Othercity|TX   |34567  |3            |2024-10-21    |9999-12-31|true       |
|4         |Alice      |Williams|a

In [0]:
#writing enhanced_customers_df to target location

enhanced_customers_source_df.write.mode('overwrite') \
.option("header",True) \
.option("delimiter",",") \
.csv(destination_url)

In [0]:
#reading target 

customers_target_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customers_target_schema) \
.load(destination_url)

In [0]:
customers_target_df.show()

+----------+-----------+--------+--------------------+--------+-------------+---------+-----+-------+-------------+--------------+----------+-----------+
|customerid|  firstname|lastname|               email|   phone|      address|     city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+-----------+--------+--------------------+--------+-------------+---------+-----+-------+-------------+--------------+----------+-----------+
|         1|       John|     Doe|   johndoe@email.com|555-1234|  123 Main St|  Anytown|   CA|  12345|            1|    2024-10-21|9999-12-31|       true|
|         2|       Jane|   Smith| janesmith@email.com|555-5678|  456 Oak Ave| Sometown|   NY|  67890|            2|    2024-10-21|9999-12-31|       true|
|         3|     Robert| Johnson|robertjohnson@ema...|555-8765|  789 Pine Ln|Othercity|   TX|  34567|            3|    2024-10-21|9999-12-31|       true|
|         4|      Alice|Williams|alicewilliams@ema...|555-4321| 234 Cedar Dr

In [0]:
customers_source_df.show()

+----------+-----------+--------+--------------------+--------+-------------+---------+-----+-------+
|customerid|  firstname|lastname|               email|   phone|      address|     city|state|zipcode|
+----------+-----------+--------+--------------------+--------+-------------+---------+-----+-------+
|         1|       John|     Doe|   johndoe@email.com|555-1234|  123 Main St|  Anytown|   CA|  12345|
|         2|       Jane|   Smith| janesmith@email.com|555-5678|  456 Oak Ave| Sometown|   NY|  67890|
|         3|     Robert| Johnson|robertjohnson@ema...|555-8765|  789 Pine Ln|Othercity|   TX|  34567|
|         4|      Alice|Williams|alicewilliams@ema...|555-4321| 234 Cedar Dr| Yourtown|   FL|  89012|
|         5|    Michael|   Brown|michaelbrown@emai...|555-9876| 567 Elm Blvd|Theirtown|   IL|  45678|
|         6|      Emily|  Miller|emilymiller@email...|555-6543| 890-Birch Rd|  Newcity|   WA|  23456|
|         7|      David|   Jones|davidjones@email.com|555-2345|678 Maple Ave| Your

In [0]:
#every time we insert new records, we would have to take maximum of surrogate key and add to it.

max_sk = customers_target_df.agg({"customer_skey": "max"}).collect()[0][0]

In [0]:
print(max_sk)

10


###customers2.csv
####the data getting changed over the time as below:
1. In 1st row, the Email address updated from email.com to gmail.com
2. In 2nd row, we have changed Phone number from "555-5678" to "555-5679"
3. In 3rd row, we have updated  city,state,zipcode
4. Inserted two new records ie. row 11 and row12
5. Deleted 10th record, ie. row 10

-updates(row 1,2,3) \
-insert(row 11,12) \
-delete(row 10) \
-unchanged(all other records)


In [0]:
#from target(DWH):had 10 records with 4 extra columns
# Now, we are dividing into 2 dataframes based on active_flag as:
"""
active_flag -> true: means this is a current record, which is going on 
=> active_customers_target_df

active_flag -> false: means this is a past record 
=> inactive_customers_target_df

"""

Out[24]: '\nactive_flag -> true: means this is a current record, which is going on \n=> active_customers_target_df\n\nactive_flag -> false: means this is a past record \n=> inactive_customers_target_df\n\n'

In [0]:
active_customers_target_df = customers_target_df.where(col("active_flag")==True)

In [0]:
inactive_customers_target_df = customers_target_df.where(col("active_flag")==False)

In [0]:
#initially now, all 10 records are active in the target - as we got those records and we uploaded them
#therefore, no results for inactive_customers_target_df.show() for now.

active_customers_target_df.show()

+----------+-----------+--------+--------------------+--------+-------------+---------+-----+-------+-------------+--------------+----------+-----------+
|customerid|  firstname|lastname|               email|   phone|      address|     city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+-----------+--------+--------------------+--------+-------------+---------+-----+-------+-------------+--------------+----------+-----------+
|         1|       John|     Doe|   johndoe@email.com|555-1234|  123 Main St|  Anytown|   CA|  12345|            1|    2024-10-21|9999-12-31|       true|
|         2|       Jane|   Smith| janesmith@email.com|555-5678|  456 Oak Ave| Sometown|   NY|  67890|            2|    2024-10-21|9999-12-31|       true|
|         3|     Robert| Johnson|robertjohnson@ema...|555-8765|  789 Pine Ln|Othercity|   TX|  34567|            3|    2024-10-21|9999-12-31|       true|
|         4|      Alice|Williams|alicewilliams@ema...|555-4321| 234 Cedar Dr

In [0]:
inactive_customers_target_df.show()

+----------+---------+--------+-----+-----+-------+----+-----+-------+-------------+--------------+--------+-----------+
|customerid|firstname|lastname|email|phone|address|city|state|zipcode|customer_skey|effective_date|end_date|active_flag|
+----------+---------+--------+-----+-----+-------+----+-----+-------+-------------+--------------+--------+-----------+
+----------+---------+--------+-----+-----+-------+----+-----+-------+-------------+--------------+--------+-----------+



####BUT, now new records (customers2.csv) come as file has changed in database.

so, now we need to get these changes done in target (for any subsequent runs, instead of writing as "overwrite"): \
-updates(row 1,2,3) \
-insert(row 11,12) \
-delete(row 10) \
-unchanged(all other records)

In [0]:
source_url = "/FileStore/SCD/source/customers2.csv"
customers_source_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customers_source_schema) \
.load(source_url)

customers_source_df.show()

+----------+-----------+--------+--------------------+--------+-------------+----------+-----+-------+
|customerid|  firstname|lastname|               email|   phone|      address|      city|state|zipcode|
+----------+-----------+--------+--------------------+--------+-------------+----------+-----+-------+
|         1|       John|     Doe|   johndoe@gmail.com|555-1234|  123 Main St|   Anytown|   CA|  12345|
|         2|       Jane|   Smith| janesmith@email.com|555-5679|  456 Oak Ave|  Sometown|   NY|  67890|
|         3|     Robert| Johnson|robertjohnson@ema...|555-8765|   123 Elm Ln|Harborcity|   FL|  87654|
|         4|      Alice|Williams|alicewilliams@ema...|555-4321| 234 Cedar Dr|  Yourtown|   FL|  89012|
|         5|    Michael|   Brown|michaelbrown@emai...|555-9876| 567 Elm Blvd| Theirtown|   IL|  45678|
|         6|      Emily|  Miller|emilymiller@email...|555-6543| 890 Birch Rd|   Newcity|   WA|  23456|
|         7|      David|   Jones|davidjones@email.com|555-2345|678 Maple 

####join active_customers_df (target dwh only active) and customers_source_df (complete source dataframe)

as whichever are deleted records, we mark as inactive - which we shouldn't consider. therefore, consider only about active records as these are the ones which has to be upodated or deleted.

####Observations:
1.If NULL's in target and not NULL in source -> insert should happen in target \
2.If not NULL in target and NULL in source -> record to be deleted in target \
3.For update records -> we have to match the columns (6 keys,ie. slowly_changing_cols) to check if there is any change
=> we can take a hash of the 6 keys (we get single big string) to identify anything changed or not.
=> If the hash key is different : we have to update
=> If the hash key in both source and target is same : then no change

In [0]:
active_customers_target_df.join(customers_source_df, "customerid" , "full_outer").show()

+----------+-----------+--------+--------------------+--------+-------------+---------+-----+-------+-------------+--------------+----------+-----------+-----------+--------+--------------------+--------+-------------+----------+-----+-------+
|customerid|  firstname|lastname|               email|   phone|      address|     city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|  firstname|lastname|               email|   phone|      address|      city|state|zipcode|
+----------+-----------+--------+--------------------+--------+-------------+---------+-----+-------+-------------+--------------+----------+-----------+-----------+--------+--------------------+--------+-------------+----------+-----+-------+
|         1|       John|     Doe|   johndoe@email.com|555-1234|  123 Main St|  Anytown|   CA|  12345|            1|    2024-10-21|9999-12-31|       true|       John|     Doe|   johndoe@gmail.com|555-1234|  123 Main St|   Anytown|   CA|  12345|
|         2|       Jane|

renaming columns of target with suffix --> example: firstname_target \
renaming columns of source with suffix --> example: firstname_source


In [0]:
def column_renamer(df, suffix, append):
   
    if append:
        new_column_names = list(map(lambda x: x+suffix, df.columns)) #to add suffix to column names, if append=True
        
    else:
        new_column_names = list(map(lambda x: x.replace(suffix,""), df.columns)) #to remove suffix from column names, if append=False
        
    return df.toDF(*new_column_names)


#hash function based on 6 columns that can change
def get_hash(df, keys_list): 
 
    columns = [col(column) for column in keys_list]
    
    if columns:
        return df.withColumn("hash_md5", md5(concat_ws("", *columns))) #returns another dataframe with one extra column "hash_md5"
    else:
        return df.withColumn("hash_md5", md5(lit(1))) 


In [0]:
active_customers_target_df_hash = column_renamer(get_hash(active_customers_target_df, slowly_changing_cols), suffix="_target", append=True)
customers_source_df_hash = column_renamer(get_hash(customers_source_df, slowly_changing_cols), suffix="_source", append=True)


In [0]:
active_customers_target_df_hash.show(truncate=False)

+-----------------+----------------+---------------+---------------------------+------------+--------------+-----------+------------+--------------+--------------------+---------------------+---------------+------------------+--------------------------------+
|customerid_target|firstname_target|lastname_target|email_target               |phone_target|address_target|city_target|state_target|zipcode_target|customer_skey_target|effective_date_target|end_date_target|active_flag_target|hash_md5_target                 |
+-----------------+----------------+---------------+---------------------------+------------+--------------+-----------+------------+--------------+--------------------+---------------------+---------------+------------------+--------------------------------+
|1                |John            |Doe            |johndoe@email.com          |555-1234    |123 Main St   |Anytown    |CA          |12345         |1                   |2024-10-21           |9999-12-31     |true         

In [0]:
customers_source_df_hash.show(truncate=False)

+-----------------+----------------+---------------+---------------------------+------------+--------------+-----------+------------+--------------+--------------------------------+
|customerid_source|firstname_source|lastname_source|email_source               |phone_source|address_source|city_source|state_source|zipcode_source|hash_md5_source                 |
+-----------------+----------------+---------------+---------------------------+------------+--------------+-----------+------------+--------------+--------------------------------+
|1                |John            |Doe            |johndoe@gmail.com          |555-1234    |123 Main St   |Anytown    |CA          |12345         |8db85f2fdc7c5c1e4d3df64278be364e|
|2                |Jane            |Smith          |janesmith@email.com        |555-5679    |456 Oak Ave   |Sometown   |NY          |67890         |4b77ad952717438becad8ab634f274b5|
|3                |Robert          |Johnson        |robertjohnson@email.com    |555-8765  

In [0]:
merged_df = active_customers_target_df_hash.join(customers_source_df_hash, col("customerid_source") ==  col("customerid_target") , "full_outer") \
.withColumn("Action", when(col("hash_md5_source") == col("hash_md5_target")  , 'NOCHANGE')\
.when(col("customerid_source").isNull(), 'DELETE')\
.when(col("customerid_target").isNull(), 'INSERT')\
.otherwise('UPDATE'))

In [0]:
merged_df.show()

+-----------------+----------------+---------------+--------------------+------------+--------------+-----------+------------+--------------+--------------------+---------------------+---------------+------------------+--------------------+-----------------+----------------+---------------+--------------------+------------+--------------+-----------+------------+--------------+--------------------+--------+
|customerid_target|firstname_target|lastname_target|        email_target|phone_target|address_target|city_target|state_target|zipcode_target|customer_skey_target|effective_date_target|end_date_target|active_flag_target|     hash_md5_target|customerid_source|firstname_source|lastname_source|        email_source|phone_source|address_source|city_source|state_source|zipcode_source|     hash_md5_source|  Action|
+-----------------+----------------+---------------+--------------------+------------+--------------+-----------+------------+--------------+--------------------+----------------

In [0]:
unchanged_records = column_renamer(merged_df.filter(col("action") == 'NOCHANGE'), suffix="_target", append=False).select(active_customers_target_df.columns) #selecting columns from abov merged_df which are in target table.

In [0]:
unchanged_records.show()

+----------+-----------+--------+--------------------+--------+-------------+---------+-----+-------+-------------+--------------+----------+-----------+
|customerid|  firstname|lastname|               email|   phone|      address|     city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+-----------+--------+--------------------+--------+-------------+---------+-----+-------+-------------+--------------+----------+-----------+
|         4|      Alice|Williams|alicewilliams@ema...|555-4321| 234 Cedar Dr| Yourtown|   FL|  89012|            4|    2024-10-21|9999-12-31|       true|
|         5|    Michael|   Brown|michaelbrown@emai...|555-9876| 567 Elm Blvd|Theirtown|   IL|  45678|            5|    2024-10-21|9999-12-31|       true|
|         7|      David|   Jones|davidjones@email.com|555-2345|678 Maple Ave| Yourcity|   GA|  78901|            7|    2024-10-21|9999-12-31|       true|
|         8|      Sarah|Anderson|sarahanderson@ema...|555-5432|  901 Pine St

In [0]:
insert_records = column_renamer(merged_df.filter(col("action") == 'INSERT'), suffix="_source", append=False) \
                .select(customers_source_df.columns) \
                .withColumn("row_number",row_number().over(window_def)) \
                .withColumn("customer_skey",col("row_number")+ max_sk) \
                .withColumn("effective_date",date_format(current_date(),DATE_FORMAT)) \
                .withColumn("end_date",date_format(lit(future_date),DATE_FORMAT)) \
                .withColumn("active_flag", lit(True)) \
                .drop("row_number") #deleted as was for internal purpose

insert_records.show()

#.withColumn("customer_skey",col("row_number")+ max_sk)  -> max row_number would be our surrogate key
#.drop("row_number") -> deleted as was for internal purpose#

+----------+---------+--------+--------------------+--------+------------+----------+-----+-------+-------------+--------------+----------+-----------+
|customerid|firstname|lastname|               email|   phone|     address|      city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+---------+--------+--------------------+--------+------------+----------+-----+-------+-------------+--------------+----------+-----------+
|        11|    Grace|  Turner|graceturner@email...|555-1122|  567 Oak St|  Cityview|   CA|  98765|           11|    2024-10-21|9999-12-31|       true|
|        12|   Connor|   Evans|connorevans@email...|555-2233|890 Pine Ave|Townsville|   TX|  54321|           12|    2024-10-21|9999-12-31|       true|
+----------+---------+--------+--------------------+--------+------------+----------+-----+-------+-------------+--------------+----------+-----------+



In [0]:
#recalculagte max surrogate key
max_sk = insert_records.agg({"customer_skey": "max"}).collect()[0][0]

In [0]:
print(max_sk)

12


In [0]:
update_records = column_renamer(merged_df.filter(col("action") == 'UPDATE'), suffix="_target", append=False)\
                .select(active_customers_target_df.columns)\
                .withColumn("end_date", date_format(current_date(),DATE_FORMAT))\
                .withColumn("active_flag", lit(False))\
            .unionByName(
            column_renamer(merged_df.filter(col("action") == 'UPDATE'), suffix="_source", append=False)\
                .select(customers_source_df.columns)\
                .withColumn("effective_date",date_format(current_date(),DATE_FORMAT))\
                .withColumn("end_date",date_format(lit(future_date),DATE_FORMAT))\
                .withColumn("row_number",row_number().over(window_def))\
                .withColumn("customer_skey",col("row_number")+ max_sk)\
                .withColumn("active_flag", lit(True))\
                .drop("row_number")
                )
            
#when insert:
# end_date the previous record -> make end_date as current_date for previous record
# insert a new record -> where start_date will be current date and  end_date will be future date

#now as to insert new record - unionByName
"""
unionByName() method is used to combine two DataFrames while aligning their columns by name. This is particularly useful when the DataFrames have different column orders or when one DataFrame has additional columns that the other does not.
"""



Out[44]: '\nunionByName() method is used to combine two DataFrames while aligning their columns by name. This is particularly useful when the DataFrames have different column orders or when one DataFrame has additional columns that the other does not.\n'

In [0]:
update_records.show()

+----------+---------+--------+--------------------+--------+------------+----------+-----+-------+-------------+--------------+----------+-----------+
|customerid|firstname|lastname|               email|   phone|     address|      city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+---------+--------+--------------------+--------+------------+----------+-----+-------+-------------+--------------+----------+-----------+
|         1|     John|     Doe|   johndoe@email.com|555-1234| 123 Main St|   Anytown|   CA|  12345|            1|    2024-10-21|2024-10-21|      false|
|         2|     Jane|   Smith| janesmith@email.com|555-5678| 456 Oak Ave|  Sometown|   NY|  67890|            2|    2024-10-21|2024-10-21|      false|
|         3|   Robert| Johnson|robertjohnson@ema...|555-8765| 789 Pine Ln| Othercity|   TX|  34567|            3|    2024-10-21|2024-10-21|      false|
|         6|    Emily|  Miller|emilymiller@email...|555-6543|890-Birch Rd|   Newcity|   

In [0]:
max_sk = update_records.agg({"customer_skey": "max"}).collect()[0][0]

In [0]:
print(max_sk)

16


In [0]:
delete_records = column_renamer(merged_df.filter(col("action") == 'DELETE'), suffix="_target", append=False)\
                .select(active_customers_target_df.columns)\
                .withColumn("end_date", date_format(current_date(),DATE_FORMAT))\
                .withColumn("active_flag", lit(False))

delete_records.show()

+----------+---------+--------+--------------------+--------+-------------+--------+-----+-------+-------------+--------------+----------+-----------+
|customerid|firstname|lastname|               email|   phone|      address|    city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+---------+--------+--------------------+--------+-------------+--------+-----+-------+-------------+--------------+----------+-----------+
|        10|   Olivia|   Clark|oliviaclark@email...|555-3456|567-Cedar-Ave|Thatcity|   TN|  67890|           10|    2024-10-21|2024-10-21|      false|
+----------+---------+--------+--------------------+--------+-------------+--------+-----+-------+-------------+--------------+----------+-----------+



In [0]:
resultant_df = inactive_customers_target_df \
            .unionByName(unchanged_records)\
            .unionByName(insert_records)\
            .unionByName(update_records)\
            .unionByName(delete_records)

resultant_df.show()

#keeping all inactive_customers_target and those 4 things we calculated (unchanged_records,insert_records,update_records,delete_records)

+----------+-----------+--------+--------------------+--------+-------------+----------+-----+-------+-------------+--------------+----------+-----------+
|customerid|  firstname|lastname|               email|   phone|      address|      city|state|zipcode|customer_skey|effective_date|  end_date|active_flag|
+----------+-----------+--------+--------------------+--------+-------------+----------+-----+-------+-------------+--------------+----------+-----------+
|         4|      Alice|Williams|alicewilliams@ema...|555-4321| 234 Cedar Dr|  Yourtown|   FL|  89012|            4|    2024-10-21|9999-12-31|       true|
|         5|    Michael|   Brown|michaelbrown@emai...|555-9876| 567 Elm Blvd| Theirtown|   IL|  45678|            5|    2024-10-21|9999-12-31|       true|
|         7|      David|   Jones|davidjones@email.com|555-2345|678 Maple Ave|  Yourcity|   GA|  78901|            7|    2024-10-21|9999-12-31|       true|
|         8|      Sarah|Anderson|sarahanderson@ema...|555-5432|  901 P