# Change Data Capture




#### This notebook implements the change data capture (CDC) process for the Libraries table. The steps are:

1.   Make a copy of the CSV file (`library_copy.csv`) so we can revert if needed.

2.   Simulate changes (inserts, updates, and deletes) by manually altering `library.csv`:

**Inserts**: Added 3 records to the file:<br>
`{9216,"124 AMBER ST. ALICE, WY 82590-7841"}` <br>
`{9217,"734 COAST PARK THOMAS, WY 82364-2134"}` <br>
`{9218,"820 BLOSSOM ROAD MOORE, WY 82003-5235"}` <br>

**Updates**: replaced the value `M` in the `zip4` column with NA <br>

**Deletes**: removed the twenty-four records where county was `DEKALB`:<br>
`{177,"DEKALB"}` <br>
`{178,"DEKALB"}` <br>
`{179,"DEKALB"}` <br>
`{180,"DEKALB"}` <br>
`{263,"DEKALB"}` <br>
`{269,"DEKALB"}` <br>
`{296,"DEKALB"}` <br>
`{1090,"DEKALB"}` <br>
`{1912,"DEKALB"}` <br>
`{1980,"DEKALB"}` <br>
`{2040,"DEKALB"}` <br>
`{2084,"DEKALB"}` <br>
`{2242,"DEKALB"}` <br>
`{2250,"DEKALB"}` <br>
`{2261,"DEKALB"}` <br>
`{2281,"DEKALB"}` <br>
`{2323,"DEKALB"}` <br>
`{2381,"DEKALB"}` <br>
`{2394,"DEKALB"}` <br>
`{2477,"DEKALB"}` <br>
`{2478,"DEKALB"}` <br>
`{2479,"DEKALB"}` <br>
`{2480,"DEKALB"}` <br>
`{7664,"DEKALB"}` <br>
<br>

4.   Rename the modified `library.csv` to `libraries_032329.csv` to indicate the date on which the changes arrived.

5.   Make a new folder in the GCS bucket called `incrementals`.

6.   Copy `libraries_032329.csv` into our `incrementals` folder in GCS.

7.   Make a copy of the raw table (`books_raw.libraries_copy`) in case we need to revert to it.

8.   Make a new dataset in BigQuery to hold loading tables (`books_ldg`).

9.   Load `libraries_032329.csv` into the loading area (`books_ldg.libraries_032329`).

10.   Detect the deltas between `books_ldg.libraries_032329` and `books_raw.libraries` and apply them to the raw table:
- If a new record is identical to the old one, ignore it: the record is unchanged.
- If a new record has no corresponding old record in the raw table, insert it into the raw table.
- If a new record differs from the old record, update the old record in the raw table so that it matches the new record in the loading table.
- If an old record in the raw table has no corresponding new record in the loading table, delete it from the raw table.

11.   Make a copy of the staging table (`books_stg.Libraries_copy`) so we can use it for comparison.

12.   Re-create the staging table `books_stg.Libraries` by applying the same cleansing, modeling, and validation logic as before.

13.   Make a copy of the target table (`books_csp.Libraries_copy`) so we can use it for comparison.

14.  Once the staging table is ready, merge it into the target table (`books_csp.Libraries`) as follows:

- If the record in staging is the same as the record in the target table, ignore it.
- If the record in staging is different from the record in the target table, close out the existing target record by setting its `discontinue_time` to the current timestamp minus 1 second and its `status_flag` to false, then insert the staging record as a new version whose `effective_time` is the current timestamp.
- If a record in staging does not exist in the target table, insert it with its `effective_time` set to the current timestamp.
- If a record in the target table does not have a record in staging, set its `discontinue_time` to the current timestamp and its `status_flag` to false.


The rest of this notebook implements steps 7-14; steps 1-6 were done by hand.

### Create backup of the raw table (step 7)

### **Don't re-run this cell after mutating the raw table!**

In [1]:
%%bigquery
create or replace table books_raw.libraries_copy as
  select * from books_raw.libraries


### Create the loading dataset (step 8)

In [2]:
%%bigquery
create schema if not exists books_ldg
  options(location="us")


### Create and populate the loading table (step 9)

In [3]:
from google.cloud import storage
from google.cloud import bigquery

project_id = "aaron-nayoung"
bucket_name = "book_data_p0"
folder_name = "incrementals"
dataset_name = "books_ldg"
region = "us"
file_name = 'libraries_032329.csv'
table_name = 'libraries_032329'

storage_client = storage.Client()
bq_client = bigquery.Client()

schema = [
    bigquery.SchemaField("library_id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("stabr", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("libname", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("address", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("city", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("zip", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("zip4", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("cnty", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("full_address", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

uri = "gs://{}/{}/{}".format(bucket_name, folder_name, file_name)
table_id = "{}.{}.{}".format(project_id, dataset_name, table_name)

table = bigquery.Table(table_id, schema=schema)
table = bq_client.create_table(table, exists_ok=True)
print("Created table {}".format(table.table_id))

# remove the load_time field from the schema before loading the data,
# the load_time value will be auto-generated
del schema[-1]

job_config = bigquery.LoadJobConfig(
      schema=schema,
      skip_leading_rows=1,
      source_format=bigquery.SourceFormat.CSV,
      write_disposition="WRITE_TRUNCATE",
      field_delimiter=","
    )

load_job = bq_client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()

destination_table = bq_client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))


Created table libraries_032329
Loaded 9194 rows.


### Detect the deltas and refresh the raw table (step 10)

#### Identify the deltas

In [9]:
%%bigquery
SELECT t.library_id, r.cnty AS old_county, t.cnty AS new_county
FROM books_ldg.libraries_032329 t
FULL JOIN books_raw.libraries r ON t.library_id = r.library_id
WHERE r.cnty != t.cnty
OR t.cnty IS NULL OR r.cnty IS NULL
ORDER BY t.library_id



|   | library_id | old_county | new_county |
|---|---|---|---|
| 0 | | DEKALB | |
| 1 | | DEKALB | |
| 2 | | DEKALB | |
| 3 | | DEKALB | |
| 4 | | DEKALB | |
| 5 | | DEKALB | |
| 6 | | DEKALB | |
| 7 | | DEKALB | |
| 8 | | DEKALB | |
| 9 | | DEKALB | |
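
The full outer join above is what makes step 10 possible: every `library_id` falls into exactly one of four buckets (insert, update, delete, or unchanged). As an illustration only, here is a minimal in-memory sketch of that classification, assuming both tables fit in Python dicts keyed by `library_id`; the function `classify_deltas` and the toy rows are hypothetical and not part of the pipeline.

```python
# Minimal sketch: classify rows the way step 10 does, assuming both tables
# fit in memory as dicts keyed by library_id. Values are (cnty, zip4) tuples;
# the rows below are toy data, not the real library records.


def classify_deltas(old, new):
    """Return sorted lists of inserted, updated, deleted and unchanged keys."""
    inserts, updates, deletes, unchanged = [], [], [], []
    # The union of the key sets plays the role of the FULL OUTER JOIN.
    for key in sorted(old.keys() | new.keys()):
        if key not in old:
            inserts.append(key)    # only in the loading table
        elif key not in new:
            deletes.append(key)    # only in the raw table
        elif old[key] != new[key]:
            updates.append(key)    # in both tables, values differ
        else:
            unchanged.append(key)  # in both tables, identical: ignore
    return inserts, updates, deletes, unchanged


old = {1: ("DEKALB", "1328"), 2: ("WYOMING", "M"), 3: ("SITKA", "7553")}
new = {2: ("WYOMING", "NA"), 3: ("SITKA", "7553"), 4: ("ALBANY", "7841")}

inserts, updates, deletes, unchanged = classify_deltas(old, new)
print(inserts, updates, deletes, unchanged)  # [4] [2] [1] [3]
```

In BigQuery the same split is done with set-based SQL rather than a Python loop; the sketch only makes the four rules explicit.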


#### Process new records (inserts)

In [10]:
%%bigquery
select (select count(*) from books_ldg.libraries_032329) as ldg_count,
(select count(*) from books_raw.libraries) as raw_count


|   | ldg_count | raw_count |
|---|---|---|
| 0 | 9194 | 9215 |


In [11]:
%%bigquery
INSERT INTO books_raw.libraries (library_id, stabr, libname, address, city, zip, zip4, cnty, full_address, load_time)
SELECT *
FROM books_ldg.libraries_032329
WHERE library_id NOT IN (SELECT library_id FROM books_raw.libraries)


In [12]:
%%bigquery
select (select count(*) from books_ldg.libraries_032329) as ldg_count,
(select count(*) from books_raw.libraries) as raw_count


|   | ldg_count | raw_count |
|---|---|---|
| 0 | 9194 | 9218 |
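
These counts reconcile with the edits made by hand in step 2: three records were added and twenty-four `DEKALB` records were removed. A quick arithmetic check in Python, using only the counts reported by the queries in this notebook:

```python
# Row counts reported by the queries in this notebook.
raw_before = 9215  # books_raw.libraries before the refresh
ldg_count = 9194   # books_ldg.libraries_032329
inserted = 3       # records added to the CSV in step 2
deleted = 24       # DEKALB records removed from the CSV in step 2

raw_after_inserts = raw_before + inserted
raw_after_deletes = raw_after_inserts - deleted

print(raw_after_inserts)               # 9218, matching the count after the INSERT
print(raw_after_deletes == ldg_count)  # True
```

Once the deletes are applied, the raw table should therefore be back to 9194 rows, the same as the loading table.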


#### Process the changed records (updates)

In [13]:
%%bigquery
SELECT t.library_id, t.zip4 AS new_zip4,
       r.zip4 AS old_zip4
FROM books_ldg.libraries_032329 t
FULL JOIN books_raw.libraries r ON t.library_id = r.library_id
WHERE r.zip4 != t.zip4
   OR r.zip4 IS NULL OR t.zip4 IS NULL
ORDER BY t.library_id;



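|   | library_id | new_zip4 | old_zip4 |
|---|---|---|---|
| 0 | | | 1328 |
| 1 | | | 2304 |
| 2 | | | 9527 |
| 3 | | | 1440 |
| 4 | | | 4070 |
| ... | ... | ... | ... |
| 1287 | 9171 | | M |
| 1288 | 9177 | | M |
| 1289 | 9192 | | M |
| 1290 | 9205 | | M |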
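
The `IS NULL` conditions in the query above are needed because SQL comparisons use three-valued logic: `r.zip4 != t.zip4` evaluates to NULL (unknown) rather than TRUE when either side is NULL, and `WHERE` keeps only rows whose condition is TRUE. After the full join, deleted rows have NULL in every `t` column, so without those conditions they would silently drop out. A small Python emulation, with `None` standing in for NULL (the helper names and the `1234` value are hypothetical):

```python
# None stands in for SQL NULL; the helper names are hypothetical.


def sql_ne(a, b):
    """Emulate SQL `a != b`: UNKNOWN (None) if either operand is NULL."""
    if a is None or b is None:
        return None
    return a != b


def sql_or(*values):
    """Emulate SQL OR under three-valued logic (TRUE, FALSE, UNKNOWN)."""
    if any(v is True for v in values):
        return True
    if any(v is None for v in values):
        return None
    return False


def where_keeps(condition):
    """WHERE keeps a row only when its condition is TRUE (not UNKNOWN)."""
    return condition is True


# (old_zip4, new_zip4) pairs: changed value, deleted row, unchanged value.
pairs = [("M", "1234"), ("1328", None), ("7553", "7553")]

plain = [where_keeps(sql_ne(old, new)) for old, new in pairs]
guarded = [
    where_keeps(sql_or(sql_ne(old, new), old is None, new is None))
    for old, new in pairs
]
print(plain)    # [True, False, False]
print(guarded)  # [True, True, False]
```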


In [14]:
%%bigquery
-- copy zip4 and load_time from the loading table for every library whose zip4 changed
update books_raw.libraries r
  set zip4 = l.zip4,
      load_time = l.load_time
  from books_ldg.libraries_032329 l
  where r.library_id = l.library_id
    and r.zip4 != l.zip4


In [15]:
%%bigquery
select l.library_id, l.zip4 as new_zip4, r.zip4 as old_zip4
from books_ldg.libraries_032329 l join books_raw.libraries r on l.library_id = r.library_id
where l.zip4 != r.zip4
order by l.library_id


|   | library_id | new_zip4 | old_zip4 |
|---|---|---|---|
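
The `UPDATE` in step 10 only touches rows whose `library_id` exists in both tables and whose `zip4` differs, copying over both `zip4` and `load_time`. A minimal in-memory sketch of the same rule, assuming rows are dicts keyed by `library_id` (`apply_zip4_updates` and the toy rows are hypothetical):

```python
# Minimal sketch of the zip4 UPDATE, assuming rows are dicts keyed by
# library_id. apply_zip4_updates and the toy rows are hypothetical.


def apply_zip4_updates(raw, ldg):
    """Copy zip4 and load_time from ldg into raw for ids present in both
    tables whose zip4 differs; return the ids that were updated."""
    updated = []
    for library_id, new_row in ldg.items():
        old_row = raw.get(library_id)
        if old_row is None:
            continue  # not in raw: handled by the INSERT step, not here
        # Note: Python's != is True when exactly one side is None, whereas
        # SQL's != yields NULL in that case and the row is not updated.
        if old_row["zip4"] != new_row["zip4"]:
            old_row["zip4"] = new_row["zip4"]
            old_row["load_time"] = new_row["load_time"]
            updated.append(library_id)
    return updated


raw = {
    1: {"zip4": "M", "load_time": "2024-01-29"},
    2: {"zip4": "7553", "load_time": "2024-01-29"},
}
ldg = {
    1: {"zip4": "1234", "load_time": "2024-04-01"},
    2: {"zip4": "7553", "load_time": "2024-04-01"},
    3: {"zip4": "9800", "load_time": "2024-04-01"},
}

print(apply_zip4_updates(raw, ldg))  # [1]
print(raw[1])  # {'zip4': '1234', 'load_time': '2024-04-01'}
```

Running it a second time returns an empty list, which mirrors the empty result of the verification query above.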


#### Process the deleted records (deletes)

In [16]:
%%bigquery
select r.library_id as old_library_id, r.cnty as old_cnty, l.library_id as new_library_id, l.cnty as new_cnty
from books_ldg.libraries_032329 l right join books_raw.libraries r on l.library_id = r.library_id
where l.library_id is null
order by r.library_id


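|   | old_library_id | old_cnty | new_library_id | new_cnty |
|---|---|---|---|---|
| 0 | 177 | DEKALB | | |
| 1 | 178 | DEKALB | | |
| 2 | 179 | DEKALB | | |
| 3 | 180 | DEKALB | | |
| 4 | 263 | DEKALB | | |
| 5 | 269 | DEKALB | | |
| 6 | 296 | DEKALB | | |
| 7 | 1090 | DEKALB | | |
| 8 | 1912 | DEKALB | | |
| 9 | 1980 | DEKALB | | |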


In [17]:
%%bigquery
delete from books_raw.libraries r
where r.library_id not in (select l.library_id from books_ldg.libraries_032329 l)


In [18]:
%%bigquery
select r.library_id as old_library_id, r.cnty as old_cnty, l.library_id as new_library_id, l.cnty as new_cnty
from books_ldg.libraries_032329 l right join books_raw.libraries r on l.library_id = r.library_id
where l.library_id is null
order by r.library_id


|   | old_library_id | old_cnty | new_library_id | new_cnty |
|---|---|---|---|---|
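
The delete is an anti-join: keep only the raw rows whose `library_id` still appears in the loading table. `NOT IN` is safe here because `library_id` is declared `REQUIRED` in the loading schema; if the subquery could return a NULL, `NOT IN` would evaluate to NULL for every row and nothing would be deleted. A minimal Python sketch of the anti-join as a set difference (`delete_missing` is a hypothetical helper, and the rows are a small subset modeled on the output above):

```python
# Minimal sketch of the DELETE as an anti-join; delete_missing is a
# hypothetical helper and raw maps library_id -> cnty.


def delete_missing(raw, ldg_ids):
    """Delete raw rows whose library_id is absent from the loading table;
    return the deleted ids in sorted order."""
    to_delete = sorted(set(raw) - set(ldg_ids))
    for library_id in to_delete:
        del raw[library_id]
    return to_delete


raw = {177: "DEKALB", 178: "DEKALB", 9171: "WYOMING"}
ldg_ids = [9171, 9216]  # 9216 is one of the inserted records

print(delete_missing(raw, ldg_ids))  # [177, 178]
print(raw)                           # {9171: 'WYOMING'}
```

An empty result from the verification query above corresponds to the set difference being empty after the delete.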


In [19]:
%%bigquery
select * from books_raw.libraries
order by load_time desc


|   | library_id | stabr | libname | address | city | zip | zip4 | cnty | full_address | load_time |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 44 | AK | SITKA PUBLIC LIBRARY | 320 HARBOR DRIVE | SITKA | 99835 | 7553 | SITKA | 320 HARBOR DRIVE SITKA, AK 99835-7553 | 2024-04-01 21:11:11.146744+00:00 |
| 1 | 2 | AK | ANCHORAGE PUBLIC LIBRARY | 3600 DENALI STREET | ANCHORAGE | 99503 | 6055 | ANCHORAGE | 3600 DENALI STREET ANCHORAGE, AK 99503-6055 | 2024-04-01 21:11:11.146744+00:00 |
| 2 | 29 | AK | KODIAK PUBLIC LIBRARY | 612 EGAN WAY | KODIAK | 99615 | 6487 | KODIAK ISLAND | 612 EGAN WAY KODIAK, AK 99615-6487 | 2024-04-01 21:11:11.146744+00:00 |
| 3 | 72 | AK | LAKE MINCHUMINA COMMUNITY LIBRARY | 123 AIRPORT WAY | LAKE MINCHUMINA | 99757 | 9800 | YUKON-KOYUKUK | 123 AIRPORT WAY LAKE MINCHUMINA, AK 99757-9800 | 2024-04-01 21:11:11.146744+00:00 |
| 4 | 46 | AK | JOYCE K. CARVER MEMORIAL SOLDOTNA PUBLIC LIBRARY | 235 NORTH BINKLEY ST. | SOLDOTNA | 99669 | 7523 | KENAI PENINSULA | 235 NORTH BINKLEY ST. SOLDOTNA, AK 99669-7523 | 2024-04-01 21:11:11.146744+00:00 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 9189 | 9127 | WV | RUSSELL MEMORIAL PUBLIC LIBRARY | 10038 SENECA TRAIL | MILL CREEK | 26280 | | RANDOLPH | 10038 SENECA TRAIL MILL CREEK, WV 26280-M | 2024-04-01 21:04:16.825918+00:00 |
| 9190 | 9149 | WV | SOUTH JEFFERSON PUBLIC LIBRARY | 49 CHURCH STREET | SUMMIT POINT | 25446 | | JEFFERSON | 49 CHURCH STREET SUMMIT POINT, WV 25446-M | 2024-04-01 21:04:16.825918+00:00 |
| 9191 | 9177 | WV | RUPERT PUBLIC LIBRARY | 124 GREENBRIER STREET | RUPERT | 25984 | | GREENBRIER | 124 GREENBRIER STREET RUPERT, WV 25984-M | 2024-04-01 21:04:16.825918+00:00 |
| 9192 | 9205 | WY | CROOK COUNTY PUBLIC LIBRARY SYSTEM | 414 MAIN STREET | SUNDANCE | 82729 | | CROOK | 414 MAIN STREET SUNDANCE, WY 82729-M | 2024-04-01 21:04:16.825918+00:00 |


### Create a copy of the staging table (step 11)

**Don't re-run this cell after mutating the staging table.**

In [20]:
%%bigquery
create or replace table books_stg.Libraries_copy as
  select * from books_stg.Libraries


### Re-create the staging table (step 12)

#### **Note: This logic is the same as before (and was copied from previous notebooks).**

---



In [22]:
%%bigquery
create or replace table books_stg.Libraries as
  select distinct * except(load_time), 'librarydb' as data_source, load_time
  from books_raw.libraries


In [23]:
%%bigquery
select (select count(*) from books_raw.libraries) as raw_count,
(select count(*) from books_stg.Libraries) as staging_count


|   | raw_count | staging_count |
|---|---|---|
| 0 | 9194 | 9194 |


In [24]:
%%bigquery
alter table books_stg.Libraries
  add primary key (library_id) not enforced;


In [25]:
%%bigquery
select library_id, count(*) duplicate_records
from books_stg.Libraries
group by library_id
having count(*) > 1
order by count(*) desc


|   | library_id | duplicate_records |
|---|---|---|
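
`SELECT DISTINCT` only removes rows that are identical in every column, and BigQuery does not validate a primary key declared `NOT ENFORCED`, so the `GROUP BY ... HAVING count(*) > 1` query is what actually confirms that `library_id` is unique. A small Python sketch of both operations (the helpers `distinct_rows` and `duplicate_keys` and the toy rows are hypothetical):

```python
from collections import Counter

# Toy rows (hypothetical): an exact duplicate of id 1, plus two different
# rows that share id 2.
rows = [
    {"library_id": 1, "zip4": "M"},
    {"library_id": 1, "zip4": "M"},
    {"library_id": 2, "zip4": "7553"},
    {"library_id": 2, "zip4": "1234"},
]


def distinct_rows(rows):
    """Drop rows identical in every column, like SELECT DISTINCT."""
    seen, result = set(), []
    for row in rows:
        fingerprint = tuple(sorted(row.items()))
        if fingerprint not in seen:
            seen.add(fingerprint)
            result.append(row)
    return result


def duplicate_keys(rows, key="library_id"):
    """Like GROUP BY key HAVING COUNT(*) > 1: map each repeated key to its count."""
    counts = Counter(row[key] for row in rows)
    return {k: n for k, n in counts.items() if n > 1}


deduped = distinct_rows(rows)
print(len(deduped))             # 3
print(duplicate_keys(deduped))  # {2: 2}
```

The second print shows why the duplicate check still matters after `DISTINCT`: two rows that differ in any column both survive and still share a key.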


### Create a copy of the target table (step 13)

**Don't re-run this cell after mutating the target table.**

In [26]:
%%bigquery
create or replace table books_csp.Libraries_copy as
  select * from books_csp.Libraries


### Merge staging into the target table (step 14)

In [27]:
%%bigquery
select count(*) as num_records from books_csp.Libraries


|   | num_records |
|---|---|
| 0 | 9215 |


In [28]:
%%bigquery
select distinct effective_time, discontinue_time, status_flag
from books_csp.Libraries


|   | effective_time | discontinue_time | status_flag |
|---|---|---|---|
| 0 | 2024-02-19 23:48:40.429751+00:00 | NaT | True |


#### Merge in the new records and the deleted records:

In [29]:
%%bigquery
merge books_csp.Libraries t
using books_stg.Libraries s
on t.library_id = s.library_id
-- handle deleted records (only close out versions that are still current)
when not matched by source and t.status_flag then
  update set discontinue_time = current_timestamp(), status_flag = false
-- handle new records
when not matched by target then
  insert (library_id, libname, stabr, address, city, zip, zip4, cnty, full_address, data_source, load_time, effective_time, status_flag)
    values (s.library_id, s.libname, s.stabr, s.address, s.city, s.zip, s.zip4, s.cnty, s.full_address, s.data_source, s.load_time, current_timestamp(), true)


In [30]:
%%bigquery
select distinct effective_time, discontinue_time, status_flag
from books_csp.Libraries


|   | effective_time | discontinue_time | status_flag |
|---|---|---|---|
| 0 | 2024-02-19 23:48:40.429751+00:00 | NaT | True |
| 1 | 2024-02-19 23:48:40.429751+00:00 | 2024-04-01 21:14:43.903017+00:00 | False |
| 2 | 2024-04-01 21:14:43.903017+00:00 | NaT | True |
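
The `MERGE` above applies two Type 2 slowly changing dimension rules: a current target record with no match in staging is closed out, and a staging record with no match in the target is inserted as a new current version. A minimal in-memory sketch, assuming the target is a list of version dicts and staging is a dict keyed by `library_id`; the helper `merge_inserts_and_deletes` and the sample rows are hypothetical, and the timestamps are truncated versions of the ones shown above. The sketch closes only versions whose `status_flag` is still true, so re-running it never overwrites the `discontinue_time` of an already-closed version.

```python
from datetime import datetime, timezone


def merge_inserts_and_deletes(target, staging, now):
    """Apply the two MERGE branches to an in-memory SCD Type 2 table.

    target:  list of version dicts (library_id, effective_time,
             discontinue_time, status_flag, plus attributes)
    staging: dict mapping library_id -> attribute dict (current snapshot)
    """
    target_ids = {row["library_id"] for row in target}
    # NOT MATCHED BY SOURCE: close current versions whose id disappeared.
    for row in target:
        if row["library_id"] not in staging and row["status_flag"]:
            row["discontinue_time"] = now
            row["status_flag"] = False
    # NOT MATCHED BY TARGET: open a first version for brand-new ids.
    for library_id, attrs in staging.items():
        if library_id not in target_ids:
            target.append({"library_id": library_id, **attrs,
                           "effective_time": now,
                           "discontinue_time": None,
                           "status_flag": True})


loaded = datetime(2024, 2, 19, 23, 48, 40, tzinfo=timezone.utc)
now = datetime(2024, 4, 1, 21, 14, 43, tzinfo=timezone.utc)
target = [
    {"library_id": 177, "effective_time": loaded,
     "discontinue_time": None, "status_flag": True},
    {"library_id": 9171, "effective_time": loaded,
     "discontinue_time": None, "status_flag": True},
]
staging = {
    9171: {},
    9216: {"full_address": "124 AMBER ST. ALICE, WY 82590-7841"},
}

merge_inserts_and_deletes(target, staging, now)
for row in target:
    print(row["library_id"], row["status_flag"], row["discontinue_time"])
```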


#### Now handle the updated records:

In [32]:
%%bigquery
select s.*
from books_csp.Libraries t join books_stg.Libraries s
on t.library_id = s.library_id
where s.full_address != t.full_address OR s.zip4 != t.zip4 OR s.cnty != t.cnty;


|   | library_id | stabr | libname | address | city | zip | zip4 | cnty | full_address | data_source | load_time |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 36 | AK | KEGOAYAH KOZGA LIBRARY | 100 WEST 7TH AVENUE | NOME | 99762 | | NOME | 100 WEST 7TH AVENUE NOME, AK 99762-M | librarydb | 2024-04-01 21:04:16.825918+00:00 |
| 1 | 67 | AK | ANIAK PUBLIC LIBRARY | 270 RIVERFRONT DRIVE | ANIAK | 99557 | | BETHEL | 270 RIVERFRONT DRIVE ANIAK, AK 99557-M | librarydb | 2024-04-01 21:04:16.825918+00:00 |
| 2 | 4 | AK | KUSKOKWIM CONSORTIUM LIBRARY | 420 CHIEF EDDIE HOFFMAN HIGHWAY | BETHEL | 99559 | | BETHEL | 420 CHIEF EDDIE HOFFMAN HIGHWAY BETHEL, AK 995... | librarydb | 2024-04-01 21:04:16.825918+00:00 |
| 3 | 6 | AK | CANTWELL COMMUNITY LIBRARY | 1 SCHOOL ROAD | CANTWELL | 99729 | | DENALI | 1 SCHOOL ROAD CANTWELL, AK 99729-M | librarydb | 2024-04-01 21:04:16.825918+00:00 |
| 4 | 21 | AK | TRI-VALLEY COMMUNITY LIBRARY | 400 SUNTRANA STREET | HEALY | 99743 | | DENALI | 400 SUNTRANA STREET HEALY, AK 99743-M | librarydb | 2024-04-01 21:04:16.825918+00:00 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1263 | 9125 | WV | HELVETIA PUBLIC LIBRARY | 4901 PICKENS ROAD | HELVETIA | 26224 | | RANDOLPH | 4901 PICKENS ROAD HELVETIA, WV 26224-M | librarydb | 2024-04-01 21:04:16.825918+00:00 |
| 1264 | 9149 | WV | SOUTH JEFFERSON PUBLIC LIBRARY | 49 CHURCH STREET | SUMMIT POINT | 25446 | | JEFFERSON | 49 CHURCH STREET SUMMIT POINT, WV 25446-M | librarydb | 2024-04-01 21:04:16.825918+00:00 |
| 1265 | 9177 | WV | RUPERT PUBLIC LIBRARY | 124 GREENBRIER STREET | RUPERT | 25984 | | GREENBRIER | 124 GREENBRIER STREET RUPERT, WV 25984-M | librarydb | 2024-04-01 21:04:16.825918+00:00 |
| 1266 | 9205 | WY | CROOK COUNTY PUBLIC LIBRARY SYSTEM | 414 MAIN STREET | SUNDANCE | 82729 | | CROOK | 414 MAIN STREET SUNDANCE, WY 82729-M | librarydb | 2024-04-01 21:04:16.825918+00:00 |


In [34]:
%%bigquery
declare current_ts TIMESTAMP;
set current_ts = current_timestamp();

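create temp table updates as
  select s.*
  from books_csp.Libraries t join books_stg.Libraries s
  on t.library_id = s.library_id
  -- compare staging only against the current version of each record
  where t.status_flag = true
    and (s.full_address != t.full_address OR s.zip4 != t.zip4 OR s.cnty != t.cnty);

-- close out the current version one second before the new version takes effect
update books_csp.Libraries
set discontinue_time = timestamp_sub(current_ts, interval 1 second), status_flag = false
where library_id in (select library_id from updates)
  and status_flag = true;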

insert into books_csp.Libraries
  (library_id, libname, stabr, address, city, zip, zip4, cnty, full_address, data_source, load_time, effective_time, status_flag)
    (select library_id, libname, stabr, address, city, zip, zip4, cnty, full_address, data_source, load_time, current_ts, true
    from updates);


In [35]:
%%bigquery
select distinct effective_time, discontinue_time, status_flag
from books_csp.Libraries


|   | effective_time | discontinue_time | status_flag |
|---|---|---|---|
| 0 | 2024-02-19 23:48:40.429751+00:00 | NaT | True |
| 1 | 2024-02-19 23:48:40.429751+00:00 | 2024-04-01 21:16:22.143220+00:00 | False |
| 2 | 2024-02-19 23:48:40.429751+00:00 | 2024-04-01 21:14:43.903017+00:00 | False |
| 3 | 2024-04-01 21:14:43.903017+00:00 | NaT | True |
| 4 | 2024-04-01 21:16:23.143220+00:00 | NaT | True |
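
The output shows the pattern step 14 calls for: for the updated records, the old version's `discontinue_time` (21:16:22.143220) is exactly one second before the new version's `effective_time` (21:16:23.143220). A minimal Python sketch of that close-then-insert step, assuming the target is a list of version dicts and the changed records are a dict keyed by `library_id`; `apply_scd2_updates` is a hypothetical helper, the sample row is modeled on library 9171 (queried in the next cell), and only the current version (`status_flag` true) of each changed id is closed:

```python
from datetime import datetime, timedelta, timezone


def apply_scd2_updates(target, changed, now):
    """Close the current version of each changed id one second before `now`,
    then append a new current version that takes effect at `now`."""
    for row in target:
        if row["library_id"] in changed and row["status_flag"]:
            row["discontinue_time"] = now - timedelta(seconds=1)
            row["status_flag"] = False
    for library_id, attrs in changed.items():
        target.append({"library_id": library_id, **attrs,
                       "effective_time": now,
                       "discontinue_time": None,
                       "status_flag": True})


target = [{
    "library_id": 9171, "zip4": "M",
    "effective_time": datetime(2024, 2, 19, 23, 48, 40, tzinfo=timezone.utc),
    "discontinue_time": None, "status_flag": True,
}]
changed = {9171: {"zip4": None}}  # None stands in for the missing zip4
now = datetime(2024, 4, 1, 21, 16, 23, tzinfo=timezone.utc)

apply_scd2_updates(target, changed, now)
old, new = target
print(new["effective_time"] - old["discontinue_time"])  # 0:00:01
```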


In [38]:
%%bigquery
select * from books_csp.Libraries
where library_id = 9171


|   | library_id | stabr | libname | address | city | zip | zip4 | cnty | full_address | data_source | load_time | effective_time | discontinue_time | status_flag |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 9171 | WV | WYOMING COUNTY PUBLIC LIBRARY | 155 PARK STREET | PINEVILLE | 24874 | M | WYOMING | 155 PARK STREET PINEVILLE, WV 24874-M | librarydb | 2024-01-29 23:59:06.732594+00:00 | 2024-02-19 23:48:40.429751+00:00 | 2024-04-01 21:16:22.143220+00:00 | False |
| 1 | 9171 | WV | WYOMING COUNTY PUBLIC LIBRARY | 155 PARK STREET | PINEVILLE | 24874 | | WYOMING | 155 PARK STREET PINEVILLE, WV 24874-M | librarydb | 2024-04-01 21:04:16.825918+00:00 | 2024-04-01 21:16:23.143220+00:00 | NaT | True |
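
With both versions of library 9171 in the target table, the table can answer "which version was current at time T?" by finding the version whose effective/discontinue interval contains T. A minimal Python sketch using the two timestamps shown above; `version_as_of` is a hypothetical helper, and `None` stands for the `NaT` discontinue time and for the missing `zip4`:

```python
from datetime import datetime, timezone

utc = timezone.utc
# The two versions of library 9171 shown above; None stands for NaT
# (no discontinue_time) and for the missing zip4.
versions = [
    {"zip4": "M", "status_flag": False,
     "effective_time": datetime(2024, 2, 19, 23, 48, 40, 429751, tzinfo=utc),
     "discontinue_time": datetime(2024, 4, 1, 21, 16, 22, 143220, tzinfo=utc)},
    {"zip4": None, "status_flag": True,
     "effective_time": datetime(2024, 4, 1, 21, 16, 23, 143220, tzinfo=utc),
     "discontinue_time": None},
]


def version_as_of(versions, ts):
    """Return the version whose [effective_time, discontinue_time] interval
    contains ts, or None if no version was in effect at that instant."""
    for v in versions:
        ended = v["discontinue_time"]
        if v["effective_time"] <= ts and (ended is None or ts <= ended):
            return v
    return None


print(version_as_of(versions, datetime(2024, 3, 1, tzinfo=utc))["zip4"])  # M
print(version_as_of(versions, datetime(2024, 4, 2, tzinfo=utc))["status_flag"])  # True
print(version_as_of(versions, datetime(2024, 4, 1, 21, 16, 22, 500000, tzinfo=utc)))  # None
```

Because the old version closes one second before the new one opens, an instant inside that one-second gap matches neither version, which is what the last call shows.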
