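# Change Data Capture

This notebook applies the 03/29/2024 incremental journalist file (`raw_faker_recipe_journalists_03292024.csv`) end to end. It loads the file into a loading table and applies the detected inserts, updates and deletes to the raw table `magazine_recipes_raw.faker_journalists`. It then rebuilds the staging table `magazine_recipes_stg.Journalists` and merges the changes into the versioned target table `magazine_recipes_csp.Journalists`, which tracks versions with `effective_time`, `discontinue_time` and `status_flag`.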



### Create backup of the raw table (step 7)

### **Don't re-run this cell after mutating the raw table!**

In [1]:
%%bigquery
-- Snapshot the raw journalists table before any CDC changes are applied.
-- CREATE OR REPLACE overwrites an existing copy, so re-running this after the
-- raw table has been modified replaces the backup with the modified data.
CREATE OR REPLACE TABLE magazine_recipes_raw.faker_journalists_copy AS
SELECT *
FROM magazine_recipes_raw.faker_journalists

Query is running:   0%|          |

### Create the loading dataset (step 8)

In [2]:
%%bigquery
-- Dataset for the loading tables; a no-op if it already exists.
CREATE SCHEMA IF NOT EXISTS magazine_recipes_ldg
OPTIONS (location = "us")

Query is running:   0%|          |

### Create and populate the loading table (step 9)

In [9]:
def create_load_table(file_name, table_name, schema, delimiter=","):
  """Create a loading table and fill it from a CSV file in Cloud Storage.

  The table is created (if missing) with the full `schema`. The CSV is then
  loaded with WRITE_TRUNCATE using every field except the last one, which is
  expected to be an auto-populated column such as `load_time` whose value is
  generated by BigQuery rather than read from the file.

  Args:
    file_name: CSV object name under gs://<bucket_name>/<folder_name>/.
    table_name: table to create inside <project_id>.<dataset_name>.
    schema: list of bigquery.SchemaField. The LAST field is left out of the
      load job. The list passed in is not modified.
    delimiter: CSV field delimiter (default ",").

  Reads the notebook globals bucket_name, folder_name, project_id,
  dataset_name and bq_client.
  """
  uri = "gs://{}/{}/{}".format(bucket_name, folder_name, file_name)
  table_id = "{}.{}.{}".format(project_id, dataset_name, table_name)

  table = bigquery.Table(table_id, schema=schema)
  table = bq_client.create_table(table, exists_ok=True)
  print("Created table {}".format(table.table_id))

  # Load every column except the trailing auto-generated one (load_time).
  # Slice into a new list rather than `del schema[-1]`: deleting in place
  # mutated the caller's list, so a second call with the same list would
  # silently drop a real data column.
  load_schema = schema[:-1]

  job_config = bigquery.LoadJobConfig(
        schema=load_schema,
        skip_leading_rows=1,
        source_format=bigquery.SourceFormat.CSV,
        write_disposition="WRITE_TRUNCATE",
        field_delimiter=delimiter,
        allow_jagged_rows = True,
        allow_quoted_newlines = True,
        ignore_unknown_values = True,
        quote_character='"'
      )

  load_job = bq_client.load_table_from_uri(uri, table_id, job_config=job_config)
  load_job.result()  # block until the load job finishes (raises on failure)

  destination_table = bq_client.get_table(table_id)
  print("Loaded {} rows.".format(destination_table.num_rows))

In [13]:
from google.cloud import storage
from google.cloud import bigquery

project_id = "shidcs329e"
bucket_name = "cookbook_data113"
folder_name = "incrementals"
dataset_name = "magazine_recipes_ldg"
region = "us"
file_name = 'raw_faker_recipe_journalists_03292024.csv'
table_name = 'faker_journalists_032924'

# create_load_table() reads a global `bq_client`; create it here so this cell
# works on a fresh kernel instead of relying on a client left over from elsewhere.
bq_client = bigquery.Client(project=project_id, location=region)

# load_time must stay the LAST field: create_load_table() leaves the last field
# out of the load job and relies on its default value expression.
schema = [
  bigquery.SchemaField("author_id", "INTEGER", mode="REQUIRED"),
  bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
  bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
  bigquery.SchemaField("phone_number", "STRING", mode="NULLABLE"),
  bigquery.SchemaField("state", "STRING", mode="NULLABLE"),
  bigquery.SchemaField("load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]
create_load_table(file_name, table_name, schema)

Created table faker_journalists_032924
Loaded 91 rows.


In [14]:
%%bigquery
-- Inspect what the load job wrote into the loading table.
SELECT *
FROM magazine_recipes_ldg.faker_journalists_032924

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,author_id,name,age,phone_number,state,load_time
0,92,Jimmy Falon,22,214-417-1738,HI,2024-03-29 20:34:11.191045+00:00
1,13,Heather Roberts,25,(499)524-6610x935,IN,2024-03-29 20:34:11.191045+00:00
2,22,Christina Walker,25,(701)568-8477x9361,KS,2024-03-29 20:34:11.191045+00:00
3,40,David Chen,25,+1-380-466-0657x3547,WY,2024-03-29 20:34:11.191045+00:00
4,15,Joseph Freeman,26,1-890-507-5470,OH,2024-03-29 20:34:11.191045+00:00
...,...,...,...,...,...,...
86,53,Joshua Barry,56,608-603-5138x533,ME,2024-03-29 20:34:11.191045+00:00
87,7,Samantha Barnes,58,577-251-5917x953,VA,2024-03-29 20:34:11.191045+00:00
88,21,Robin Martinez,59,876-304-5229,CA,2024-03-29 20:34:11.191045+00:00
89,67,Chris Young,60,(926)254-7604x848,WI,2024-03-29 20:34:11.191045+00:00


### Detect the deltas and refresh the raw table (step 10)

#### Identify the deltas

In [23]:
%%bigquery
-- All deltas between the new increment (t) and the current raw table (r):
--   only new_author_id set  -> inserted record
--   only old_author_id set  -> deleted record
--   both set                -> at least one attribute changed
-- IS DISTINCT FROM is NULL-safe: true when exactly one side is NULL, false when
-- both are NULL. Unchanged NULL attributes are therefore not reported as changes,
-- which the previous per-column `is null` checks did.
select t.author_id as new_author_id, t.name as new_name, t.age as new_age, t.phone_number as new_phone_number, t.state as new_state,
r.author_id as old_author_id, r.name as old_name, r.age as old_age, r.phone_number as old_phone_number, r.state as old_state
from magazine_recipes_ldg.faker_journalists_032924 t full join magazine_recipes_raw.faker_journalists r on t.author_id = r.author_id
where t.author_id is null or r.author_id is null
or t.name is distinct from r.name
or t.age is distinct from r.age
or t.phone_number is distinct from r.phone_number
or t.state is distinct from r.state

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,new_author_id,new_name,new_age,new_phone_number,new_state,old_author_id,old_name,old_age,old_phone_number,old_state
0,91.0,Taylor Swift,64.0,682-270-4005,NY,,,,,
1,92.0,Jimmy Falon,22.0,214-417-1738,HI,,,,,
2,,,,,,90.0,Matthew Church,38.0,(763)276-3637x69144,PA
3,,,,,,88.0,Jasmine Townsend,39.0,(290)262-2005,AR
4,3.0,Beyonce Carter,41.0,1-701-360-9753,TX,3.0,Sean Green,41.0,+1-701-360-9753,ID
5,15.0,Joseph Freeman,26.0,1-890-507-5470,OH,15.0,Joseph Freeman,26.0,+1-890-507-5470,OH
6,93.0,Matthew McConaughey,33.0,972-732-6574,WI,,,,,
7,,,,,,89.0,Ashley Carr,33.0,001-826-306-6953x25838,DC
8,73.0,Michelle Shah DDS,55.0,1-855-736-2082,NH,73.0,Michelle Shah DDS,55.0,+1-855-736-2082,NH
9,94.0,Trisha Paytas,40.0,817-370-7063,CA,,,,,


#### Process new records (inserts)

In [24]:
%%bigquery
-- Row counts of the loading table and the raw table before inserts are applied.
SELECT
  (SELECT COUNT(*) FROM magazine_recipes_ldg.faker_journalists_032924) AS ldg_count,
  (SELECT COUNT(*) FROM magazine_recipes_raw.faker_journalists)        AS raw_count

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,ldg_count,raw_count
0,91,90


In [25]:
%%bigquery
-- Append loading-table rows whose author_id is not yet in the raw table.
-- Columns are listed explicitly instead of `select *`, so the insert does not
-- depend on the loading table's column order.
-- NOT EXISTS is used instead of NOT IN: NOT IN returns no rows at all when the
-- subquery yields a NULL author_id.
insert into magazine_recipes_raw.faker_journalists(author_id, name, age, phone_number, state, load_time)
  select l.author_id, l.name, l.age, l.phone_number, l.state, l.load_time
  from magazine_recipes_ldg.faker_journalists_032924 l
  where not exists (
    select 1
    from magazine_recipes_raw.faker_journalists r
    where r.author_id = l.author_id)

Query is running:   0%|          |

In [26]:
%%bigquery
-- Row counts of the loading table and the raw table after inserts are applied.
SELECT
  (SELECT COUNT(*) FROM magazine_recipes_ldg.faker_journalists_032924) AS ldg_count,
  (SELECT COUNT(*) FROM magazine_recipes_raw.faker_journalists)        AS raw_count

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,ldg_count,raw_count
0,91,94


#### Process the changed records (updates)

In [27]:
%%bigquery
-- Records present in both tables whose attributes differ (updates).
-- An inner join is enough: the previous full join's unmatched rows were always
-- filtered out by the `!=` tests. IS DISTINCT FROM also reports changes to or
-- from NULL, which `!=` silently misses.
select t.author_id as new_author_id, t.name as new_name, t.age as new_age, t.phone_number as new_phone_number, t.state as new_state,
r.author_id as old_author_id, r.name as old_name, r.age as old_age, r.phone_number as old_phone_number, r.state as old_state
from magazine_recipes_ldg.faker_journalists_032924 t join magazine_recipes_raw.faker_journalists r on t.author_id = r.author_id
where t.name is distinct from r.name
or t.age is distinct from r.age
or t.phone_number is distinct from r.phone_number
or t.state is distinct from r.state

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,new_author_id,new_name,new_age,new_phone_number,new_state,old_author_id,old_name,old_age,old_phone_number,old_state
0,3,Beyonce Carter,41,1-701-360-9753,TX,3,Sean Green,41,+1-701-360-9753,ID
1,73,Michelle Shah DDS,55,1-855-736-2082,NH,73,Michelle Shah DDS,55,+1-855-736-2082,NH
2,15,Joseph Freeman,26,1-890-507-5470,OH,15,Joseph Freeman,26,+1-890-507-5470,OH


In [32]:
%%bigquery
-- Apply attribute changes from the loading table to the raw table in a single
-- statement. For every raw row whose attributes differ from its loading-table
-- counterpart, all attributes are copied over and load_time is refreshed.
-- This replaces four per-column UPDATEs whose `!=` predicates skipped changes
-- to or from NULL; IS DISTINCT FROM handles those.
update magazine_recipes_raw.faker_journalists r
set name = l.name,
    age = l.age,
    phone_number = l.phone_number,
    state = l.state,
    load_time = l.load_time
from magazine_recipes_ldg.faker_journalists_032924 l
where r.author_id = l.author_id
and (r.name is distinct from l.name
  or r.age is distinct from l.age
  or r.phone_number is distinct from l.phone_number
  or r.state is distinct from l.state);



Query is running:   0%|          |

In [33]:
%%bigquery
-- Re-check for changed records; after the updates this should return no rows.
-- Inner join plus NULL-safe IS DISTINCT FROM, so changes to or from NULL that
-- were not applied would also show up here.
select t.author_id as new_author_id, t.name as new_name, t.age as new_age, t.phone_number as new_phone_number, t.state as new_state,
r.author_id as old_author_id, r.name as old_name, r.age as old_age, r.phone_number as old_phone_number, r.state as old_state
from magazine_recipes_ldg.faker_journalists_032924 t join magazine_recipes_raw.faker_journalists r on t.author_id = r.author_id
where t.name is distinct from r.name
or t.age is distinct from r.age
or t.phone_number is distinct from r.phone_number
or t.state is distinct from r.state

Query is running:   0%|          |

Downloading: |          |

Unnamed: 0,new_author_id,new_name,new_age,new_phone_number,new_state,old_author_id,old_name,old_age,old_phone_number,old_state


#### Process the deleted records (deletes)

In [34]:
%%bigquery
-- Raw records with no matching author_id in the loading table (deletes).
SELECT
  t.author_id    AS new_author_id,
  t.name         AS new_name,
  t.age          AS new_age,
  t.phone_number AS new_phone_number,
  t.state        AS new_state,
  r.author_id    AS old_author_id,
  r.name         AS old_name,
  r.age          AS old_age,
  r.phone_number AS old_phone_number,
  r.state        AS old_state
FROM magazine_recipes_ldg.faker_journalists_032924 AS t
FULL OUTER JOIN magazine_recipes_raw.faker_journalists AS r
  ON t.author_id = r.author_id
WHERE t.author_id IS NULL

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,new_author_id,new_name,new_age,new_phone_number,new_state,old_author_id,old_name,old_age,old_phone_number,old_state
0,,,,,,88,Jasmine Townsend,39,(290)262-2005,AR
1,,,,,,89,Ashley Carr,33,001-826-306-6953x25838,DC
2,,,,,,90,Matthew Church,38,(763)276-3637x69144,PA


In [35]:
%%bigquery
-- Delete raw records whose author_id no longer appears in the loading table.
-- NOT EXISTS makes the delete agree with the full-join detection query
-- (where t.author_id is null). NOT IN would delete nothing if the subquery
-- contained a NULL, and it never removes raw rows whose author_id is NULL.
delete from magazine_recipes_raw.faker_journalists r
where not exists (
  select 1
  from magazine_recipes_ldg.faker_journalists_032924 l
  where l.author_id = r.author_id)

Query is running:   0%|          |

In [36]:
%%bigquery
-- Re-check for raw records missing from the loading table; expect no rows
-- once the delete has run.
SELECT
  t.author_id    AS new_author_id,
  t.name         AS new_name,
  t.age          AS new_age,
  t.phone_number AS new_phone_number,
  t.state        AS new_state,
  r.author_id    AS old_author_id,
  r.name         AS old_name,
  r.age          AS old_age,
  r.phone_number AS old_phone_number,
  r.state        AS old_state
FROM magazine_recipes_ldg.faker_journalists_032924 AS t
FULL OUTER JOIN magazine_recipes_raw.faker_journalists AS r
  ON t.author_id = r.author_id
WHERE t.author_id IS NULL

Query is running:   0%|          |

Downloading: |          |

Unnamed: 0,new_author_id,new_name,new_age,new_phone_number,new_state,old_author_id,old_name,old_age,old_phone_number,old_state


In [37]:
%%bigquery
-- Refreshed raw table, most recently loaded rows first.
SELECT *
FROM magazine_recipes_raw.faker_journalists
ORDER BY load_time DESC

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,author_id,name,age,phone_number,state,load_time
0,92,Jimmy Falon,22,214-417-1738,HI,2024-03-29 20:34:11.191045+00:00
1,91,Taylor Swift,64,682-270-4005,NY,2024-03-29 20:34:11.191045+00:00
2,93,Matthew McConaughey,33,972-732-6574,WI,2024-03-29 20:34:11.191045+00:00
3,94,Trisha Paytas,40,817-370-7063,CA,2024-03-29 20:34:11.191045+00:00
4,15,Joseph Freeman,26,1-890-507-5470,OH,2024-03-29 20:34:11.191045+00:00
...,...,...,...,...,...,...
86,81,Joyce Allen,55,282.352.7808x185,CT,2024-01-27 00:25:41.566545+00:00
87,53,Joshua Barry,56,608-603-5138x533,ME,2024-01-27 00:25:41.566545+00:00
88,7,Samantha Barnes,58,577-251-5917x953,VA,2024-01-27 00:25:41.566545+00:00
89,21,Robin Martinez,59,876-304-5229,CA,2024-01-27 00:25:41.566545+00:00


## Create copy of staging table (step 11)

**Don't re-run this cell after mutating the staging table.**

In [38]:
%%bigquery
-- Backup of the staging table before it is rebuilt.
-- CREATE OR REPLACE overwrites an existing copy, so re-running this after the
-- staging table has been rebuilt replaces the backup with the new contents.
CREATE OR REPLACE TABLE magazine_recipes_stg.Journalists_copy AS
SELECT *
FROM magazine_recipes_stg.Journalists

Query is running:   0%|          |

## Re-create the staging table (step 12)

#### **Note: This logic is the same as before (and was copied from previous notebooks).**

---



In [39]:
%%bigquery
-- Rebuild the staging table from the refreshed raw table.
-- name is split on single spaces: the first token becomes f_name and the second
-- becomes l_name; any further tokens (e.g. a trailing "DDS") are dropped.
-- SAFE_OFFSET yields NULL instead of failing the whole statement when a name has
-- fewer than two tokens. Plain [1] raises an out-of-bounds error.
create or replace table magazine_recipes_stg.Journalists as
  select journalist_id, name_array[safe_offset(0)] as f_name, name_array[safe_offset(1)] as l_name, age, phone, state, 'faker' as data_source, load_time
  from
  (select author_id as journalist_id, age, phone_number as phone, state, split(name, ' ') as name_array, load_time
  from magazine_recipes_raw.faker_journalists)

Query is running:   0%|          |

In [40]:
%%bigquery
-- The rebuilt staging table should have exactly one row per raw row.
SELECT
  (SELECT COUNT(*) FROM magazine_recipes_raw.faker_journalists) AS raw_count,
  (SELECT COUNT(*) FROM magazine_recipes_stg.Journalists)       AS staging_count

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,raw_count,staging_count
0,91,91


In [41]:
%%bigquery
-- Declare journalist_id as the (not enforced) primary key of the rebuilt table.
ALTER TABLE magazine_recipes_stg.Journalists
ADD PRIMARY KEY (journalist_id) NOT ENFORCED;

Query is running:   0%|          |

In [42]:
%%bigquery
-- The primary key is NOT ENFORCED, so check uniqueness explicitly; expect no rows.
SELECT
  journalist_id,
  COUNT(*) AS duplicate_records
FROM magazine_recipes_stg.Journalists
GROUP BY journalist_id
HAVING COUNT(*) > 1
ORDER BY duplicate_records DESC

Query is running:   0%|          |

Downloading: |          |

Unnamed: 0,journalist_id,duplicate_records


## Create copy of target table (step 13)

**Don't re-run this cell after mutating the target table.**

In [43]:
%%bigquery
-- Backup of the target table before the merge.
-- CREATE OR REPLACE overwrites an existing copy, so re-running this after the
-- merge replaces the backup with the merged contents.
CREATE OR REPLACE TABLE magazine_recipes_csp.Journalists_copy AS
SELECT *
FROM magazine_recipes_csp.Journalists

Query is running:   0%|          |

## Merge staging into target table (step 14)

In [44]:
%%bigquery
-- Size of the target table before the merge.
SELECT COUNT(*) AS num_records
FROM magazine_recipes_csp.Journalists

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,num_records
0,90


In [45]:
%%bigquery
-- Distinct version metadata present in the target table before the merge.
SELECT DISTINCT
  effective_time,
  discontinue_time,
  status_flag
FROM magazine_recipes_csp.Journalists

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,effective_time,discontinue_time,status_flag
0,2024-02-18 22:30:52.049073+00:00,NaT,True


#### Merge in the new records and the deleted records:

In [50]:
%%bigquery
-- Apply inserts and deletes from staging to the versioned target table.
-- Only current versions (status_flag = true) participate:
--   * ON matches a staged row against the current version only. A journalist
--     whose rows are all discontinued is therefore inserted again if they
--     reappear in staging.
--   * The NOT MATCHED BY SOURCE branch only closes current versions. Rows that
--     were already discontinued keep their original discontinue_time instead
--     of having it overwritten on every run.
merge magazine_recipes_csp.Journalists t
using magazine_recipes_stg.Journalists s
on t.journalist_id = s.journalist_id and t.status_flag = true
-- handle deleted records
when not matched by source and t.status_flag = true then
  update set discontinue_time = current_timestamp(), status_flag = false
-- handle new records
when not matched by target then
  insert (journalist_id, f_name, l_name, age, phone, state, data_source, load_time, effective_time, status_flag)
  values (s.journalist_id, s.f_name, s.l_name, s.age, s.phone, s.state, s.data_source, s.load_time, current_timestamp(), true)

Query is running:   0%|          |

In [51]:
%%bigquery
-- Distinct version metadata in the target table after inserts and deletes.
SELECT DISTINCT
  effective_time,
  discontinue_time,
  status_flag
FROM magazine_recipes_csp.Journalists

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,effective_time,discontinue_time,status_flag
0,2024-03-29 21:03:37.083133+00:00,NaT,True
1,2024-02-18 22:30:52.049073+00:00,2024-03-29 21:03:37.083133+00:00,False
2,2024-02-18 22:30:52.049073+00:00,NaT,True


#### Now handle the updated records:

In [52]:
%%bigquery
-- Staged records that differ from the CURRENT version in the target table.
-- The status_flag filter keeps discontinued history rows out of the comparison.
-- Without it, every old version of a journalist would be reported as a change
-- again on later runs. IS DISTINCT FROM also catches changes to or from NULL.
select s.*
from magazine_recipes_csp.Journalists t join magazine_recipes_stg.Journalists s
on t.journalist_id = s.journalist_id
where t.status_flag = true
and (t.f_name is distinct from s.f_name
  or t.l_name is distinct from s.l_name
  or t.age is distinct from s.age
  or t.phone is distinct from s.phone
  or t.state is distinct from s.state)

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,journalist_id,f_name,l_name,age,phone,state,data_source,load_time
0,15,Joseph,Freeman,26,1-890-507-5470,OH,faker,2024-03-29 20:34:11.191045+00:00
1,3,Beyonce,Carter,41,1-701-360-9753,TX,faker,2024-03-29 20:34:11.191045+00:00
2,73,Michelle,Shah,55,1-855-736-2082,NH,faker,2024-03-29 20:34:11.191045+00:00


In [53]:
%%bigquery
-- Versioned update: close the current row of every changed journalist and
-- insert the staged values as a new current row.
declare current_ts TIMESTAMP;
set current_ts = current_timestamp();

-- Staged rows that differ from the current (status_flag = true) target version.
-- Comparing only against the current version prevents history rows from
-- producing spurious or duplicate updates.
create temp table updates as
  select s.*
  from magazine_recipes_csp.Journalists t join magazine_recipes_stg.Journalists s
  on t.journalist_id = s.journalist_id
  where t.status_flag = true
  and (t.f_name is distinct from s.f_name
    or t.l_name is distinct from s.l_name
    or t.age is distinct from s.age
    or t.phone is distinct from s.phone
    or t.state is distinct from s.state);

-- Close only the current version. Previously discontinued rows for the same
-- journalist keep their original discontinue_time.
-- The old row ends 1 second before the new row's effective_time.
update magazine_recipes_csp.Journalists
set discontinue_time = timestamp_sub(current_ts, interval 1 second), status_flag = false
where status_flag = true
and journalist_id in (select journalist_id from updates);

insert into magazine_recipes_csp.Journalists
  (journalist_id, f_name, l_name, age, phone, state, data_source, load_time, effective_time, status_flag)
    (select journalist_id, f_name, l_name, age, phone, state, data_source, load_time, current_ts, true
    from updates);


Query is running:   0%|          |

In [54]:
%%bigquery
-- Distinct version metadata in the target table after the versioned updates.
SELECT DISTINCT
  effective_time,
  discontinue_time,
  status_flag
FROM magazine_recipes_csp.Journalists

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,effective_time,discontinue_time,status_flag
0,2024-03-29 21:03:37.083133+00:00,NaT,True
1,2024-02-18 22:30:52.049073+00:00,NaT,True
2,2024-02-18 22:30:52.049073+00:00,2024-03-29 21:06:17.425539+00:00,False
3,2024-02-18 22:30:52.049073+00:00,2024-03-29 21:03:37.083133+00:00,False
4,2024-03-29 21:06:18.425539+00:00,NaT,True


In [55]:
%%bigquery
-- Spot-check a journalist that this increment newly inserted.
SELECT *
FROM magazine_recipes_csp.Journalists
WHERE journalist_id = 93

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,journalist_id,f_name,l_name,age,phone,state,data_source,load_time,effective_time,discontinue_time,status_flag
0,93,Matthew,McConaughey,33,972-732-6574,WI,faker,2024-03-29 20:34:11.191045+00:00,2024-03-29 21:03:37.083133+00:00,NaT,True


In [None]:
%%bigquery
-- Clean up the loading table once the increment has been applied.
-- IF EXISTS makes the cell safe to re-run; a plain DROP fails once the table is gone.
drop table if exists magazine_recipes_ldg.faker_journalists_032924;

Query is running:   0%|          |