# insert_dws_sale_customer_prd_d_inc

- Name: insert_dws_sale_customer_prd_d_inc
- Description: Insert daily data into dws_sale_customer_prd_d_inc, a complete sales table that enriches each sale with its product, customer, store and city details.
- Data modeling reference: https://docs.google.com/spreadsheets/d/1FlbiZEBue2SCAUo1WOkdtX5WbEDyDrZ9B2uiZUSiRio/edit?gid=596791655#gid=596791655
- Target Table: dws_sale_customer_prd_d_inc
- Source Table: dwd_sale_d_inc, dim_product_full, dim_store_full, dim_city_full, dim_customer_full
- Created by: alvinxyzhang
- Created Date: 2025-04-29
- Version: v1.0

In [None]:
from google.cloud import bigquery
import datetime
import pandas as pd

## Scheduled Task & Partition Table

In [None]:
# Construct a BigQuery client object.
client = bigquery.Client()

# Schema of the target table, used only when `check_table` has to create it.
# Field names and order mirror the column aliases emitted by `insert_query`
# below (customer_long/customer_lat, store_address, store_lat/store_long, ...),
# so a freshly created table matches what the partition overwrite writes.
# NOTE(review): confirm against the schema of the already-existing table.
table_schema = [
    bigquery.SchemaField("date", "DATE"),
    bigquery.SchemaField("product_id", "INTEGER"),
    bigquery.SchemaField("purchase_type", "STRING"),
    bigquery.SchemaField("product_name", "STRING"),
    bigquery.SchemaField("category", "STRING"),
    bigquery.SchemaField("brand", "STRING"),
    bigquery.SchemaField("order_id", "INTEGER"),
    bigquery.SchemaField("number_of_items", "INTEGER"),
    bigquery.SchemaField("retail_price", "FLOAT"),
    bigquery.SchemaField("retail_cost", "FLOAT"),
    bigquery.SchemaField("total_price", "FLOAT"),
    bigquery.SchemaField("total_cost", "FLOAT"),
    bigquery.SchemaField("customer_id", "INTEGER"),
    bigquery.SchemaField("home_address", "STRING"),
    bigquery.SchemaField("customer_long", "FLOAT"),
    bigquery.SchemaField("customer_lat", "FLOAT"),
    bigquery.SchemaField("email", "STRING"),
    bigquery.SchemaField("age", "INTEGER"),
    bigquery.SchemaField("gender", "STRING"),
    bigquery.SchemaField("account_created_date", "DATE"),
    bigquery.SchemaField("is_vip_membership", "INTEGER"),
    bigquery.SchemaField("family_size", "INTEGER"),
    bigquery.SchemaField("occupation", "STRING"),
    bigquery.SchemaField("annual_salary_estimate", "INTEGER"),
    bigquery.SchemaField("ad_group", "INTEGER"),
    bigquery.SchemaField("store_id", "INTEGER"),
    bigquery.SchemaField("store_name", "STRING"),
    bigquery.SchemaField("store_address", "STRING"),
    bigquery.SchemaField("city", "STRING"),
    bigquery.SchemaField("province", "STRING"),
    bigquery.SchemaField("store_lat", "FLOAT"),
    bigquery.SchemaField("store_long", "FLOAT"),
    bigquery.SchemaField("city_population", "INTEGER"),
    bigquery.SchemaField("city_lat", "FLOAT"),
    bigquery.SchemaField("city_long", "FLOAT"),
]

# Daily load query, bound to the `@date_` DATE parameter.
# The inner query aggregates one day of dwd_sale_d_inc per
# (date, product, customer, store, order, purchase_type). It then LEFT JOINs
# the product, customer, store and city dimensions and re-groups on the same
# keys plus the dimension attributes.
# NOTE(review): the SUM()s assume each dimension table has one row per join key;
# duplicate keys would multiply the sums. TODO confirm.
# NOTE(review): city_population is read from dim_store_full.population, not
# from dim_city_full. Confirm this is intended.
# (Plain string: the query has no Python placeholders, so no f-string needed.)
insert_query = """
    SELECT `date`
        , sale_tbl.product_id
        , purchase_type
        , product_name
        , category
        , brand
        , order_id
        , SUM(number_of_items) number_of_items
        , AVG(retail_price) retail_price
        , AVG(cost) retail_cost
        , SUM(number_of_items * retail_price) total_price
        , SUM(number_of_items * cost) total_cost
        , sale_tbl.customer_id
        , home_address
        , customer_tbl.longitude AS customer_long
        , customer_tbl.latitude AS customer_lat
        , email
        , age
        , gender
        , account_created_date
        , is_vip_membership
        , family_size
        , occupation
        , annual_salary_estimate
        , ad_group
        , sale_tbl.store_id
        , store_name
        , store_tbl.address AS store_address
        , store_tbl.city
        , province
        , store_tbl.lat AS store_lat
        , store_tbl.long AS store_long
        , store_tbl.population AS city_population
        , city_tbl.lat AS city_lat
        , city_tbl.long AS city_long
    FROM
    (
      SELECT sale_date AS date
          , product_id
          , customer_id
          , store_id
          , order_id
          , purchase_type
          , SUM(number_of_items) number_of_items
      FROM `positive-karma-457703-i3.retail_dashboard.dwd_sale_d_inc`
      WHERE sale_date = @date_
      GROUP BY sale_date
          , product_id
          , customer_id
          , store_id
          , order_id
          , purchase_type
    ) AS sale_tbl
    LEFT JOIN `positive-karma-457703-i3.retail_dashboard.dim_product_full` AS prd_tbl
      ON sale_tbl.product_id = prd_tbl.product_id
    LEFT JOIN `positive-karma-457703-i3.retail_dashboard.dim_customer_full` AS customer_tbl
      ON sale_tbl.customer_id = customer_tbl.customer_id
    LEFT JOIN `positive-karma-457703-i3.retail_dashboard.dim_store_full` AS store_tbl
      ON sale_tbl.store_id = store_tbl.store_id
    LEFT JOIN `positive-karma-457703-i3.retail_dashboard.dim_city_full` AS city_tbl
      ON store_tbl.city = city_tbl.city
    GROUP BY `date`
          , sale_tbl.product_id
          , purchase_type
          , product_name
          , category
          , brand
          , order_id
          , sale_tbl.customer_id
          , home_address
          , customer_tbl.longitude
          , customer_tbl.latitude
          , email
          , age
          , gender
          , account_created_date
          , is_vip_membership
          , family_size
          , occupation
          , annual_salary_estimate
          , ad_group
          , sale_tbl.store_id
          , store_name
          , store_tbl.address
          , store_tbl.city
          , province
          , store_tbl.lat
          , store_tbl.long
          , store_tbl.population
          , city_tbl.lat
          , city_tbl.long
"""

# Step 1: Check if table exists
def check_table(table_id_, full_table_id_, schema_, partition_column_):
    """Ensure the day-partitioned target table exists, creating it if missing.

    Args:
        table_id_: Short table name, used only in log messages.
        full_table_id_: Fully qualified ``project.dataset.table`` id.
        schema_: List of ``bigquery.SchemaField`` used when creating the table.
        partition_column_: Column the new table is DAY-partitioned on.

    Returns:
        The existing or newly created ``bigquery.Table``.

    Raises:
        Any error from ``client.get_table`` other than ``NotFound`` (for example
        auth or permission failures) propagates instead of triggering a create.
    """
    from google.api_core.exceptions import NotFound

    try:
        table = client.get_table(full_table_id_)
    except NotFound:
        # Only a genuinely missing table should lead to creation.
        print(f"    Creating partitioned table {table_id_}...")

        table = bigquery.Table(full_table_id_, schema=schema_)
        table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            # Use the argument, not the module-level `partition_col` global.
            field=partition_column_,
        )

        table = client.create_table(table)
        print(f"Created partitioned table {full_table_id_}")
    else:
        print(f"    Table {table_id_} already exists.")
    return table

# Step 2: Insert Query
def insert_table(insert_query_, full_table_id_partition_, query_parameters_):
    """Run ``insert_query_`` and write its result into the destination partition.

    Args:
        insert_query_: GoogleSQL query text; may reference named parameters.
        full_table_id_partition_: Destination id, here ``project.dataset.table$YYYYMMDD``.
        query_parameters_: Query parameters bound to ``insert_query_``.

    Returns:
        The ``bigquery.QueryJob`` after it has finished.
    """
    config = bigquery.QueryJobConfig(
        destination=full_table_id_partition_,
        # NOTE: combined with the `$YYYYMMDD` partition decorator this is meant to
        # overwrite just that day's partition.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        use_legacy_sql=False,
        query_parameters=query_parameters_,
    )
    job = client.query(insert_query_, job_config=config)  # Make an API request.

    # result() waits for the job to complete before anything is reported.
    finished_rows = job.result()
    print('   Result:', finished_rows)
    print('   Errors: ', job.errors)

    return job

# Step 3: Check Insert Rows
def check_insert_rows(partition_col_, full_table_id_, job_date_):
    """Count the rows stored in one date partition of the target table.

    Args:
        partition_col_: Name of the DATE partition column.
        full_table_id_: Fully qualified ``project.dataset.table`` id.
        job_date_: ``datetime.date`` of the partition to count.

    Returns:
        DataFrame with columns ``[partition_col_, 'row_cnt']``. It has one row
        when the partition holds data and zero rows when it is empty.
    """
    # Back-quote identifiers: the project id contains dashes and the partition
    # column (`date`) shares its name with a SQL type.
    query = f"""
        SELECT `{partition_col_}`, COUNT(1) AS row_cnt
        FROM `{full_table_id_}`
        WHERE `{partition_col_}` = @date_
        GROUP BY `{partition_col_}`
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("date_", "DATE", job_date_)
        ]
    )
    rows = client.query_and_wait(query, job_config=job_config)

    return rows.to_dataframe()

# Main execution: load a single day end to end.
dataset_id = 'retail_dashboard'
target_table = 'dws_sale_customer_prd_d_inc'
partition_col = 'date'
job_date = datetime.date(2024, 1, 1)

# Full table IDs; the `$YYYYMMDD` suffix addresses a single partition.
partition = job_date.strftime("%Y%m%d")
full_table_id = f"{client.project}.{dataset_id}.{target_table}"
full_table_id_partition = f"{client.project}.{dataset_id}.{target_table}${partition}"
query_parameters = [
    bigquery.ScalarQueryParameter("date_", "DATE", job_date)
]

print('- Execution: ', job_date, ' Partition: ', partition)
check_table(table_id_=target_table, full_table_id_=full_table_id, schema_=table_schema, partition_column_=partition_col)
query_job = insert_table(insert_query_=insert_query, full_table_id_partition_=full_table_id_partition, query_parameters_=query_parameters)
print('- Finish writing to Partition ', partition)
row_cnt_df = check_insert_rows(partition_col_=partition_col, full_table_id_=full_table_id, job_date_=job_date)
# The count query returns no rows for an empty partition; guard before indexing
# (same handling as the date-range loop).
inserted_rows = 0 if row_cnt_df.empty else row_cnt_df['row_cnt'].values[0]
print('- Check: insert', str(inserted_rows), 'rows.')

- Execution:  2024-01-01  Partition:  20240101
    Table dws_sale_customer_prd_d_inc already exists.
   Result: <google.cloud.bigquery.table.RowIterator object at 0x7f8588b1e7a0>
   Errors:  None
- Finish writing to Partition  20240101
- Check: insert 3171 rows.


## Iterate over Date


In [7]:
# Load every day in the inclusive range [start_date, end_date], one partition each.
start_date = datetime.date(2024, 1, 1)
end_date = datetime.date(2025, 4, 25)
dataset_id = 'retail_dashboard'
target_table = 'dws_sale_customer_prd_d_inc'
partition_col = 'date'
full_table_id = f"{client.project}.{dataset_id}.{target_table}"

# The target table is the same for every day, so check/create it once
# instead of issuing one get_table request per date.
check_table(table_id_=target_table, full_table_id_=full_table_id, schema_=table_schema, partition_column_=partition_col)

# Per-day count frames, concatenated once at the end rather than re-copying
# the accumulated frame on every iteration.
row_cnt_frames = []

job_date = start_date
try:
    while job_date <= end_date:
        # Full table IDs
        partition = job_date.strftime("%Y%m%d")
        full_table_id_partition = f"{client.project}.{dataset_id}.{target_table}${partition}"
        query_parameters = [
            bigquery.ScalarQueryParameter("date_", "DATE", job_date)
        ]

        print('- Execution: ', job_date, ' Partition: ', partition)
        query_job = insert_table(insert_query_=insert_query, full_table_id_partition_=full_table_id_partition, query_parameters_=query_parameters)
        print('- Finish writing to Partition ', partition)
        row_cnt_df = check_insert_rows(partition_col_=partition_col, full_table_id_=full_table_id, job_date_=job_date)
        # An empty partition yields a zero-row frame; report 0 instead of indexing.
        inserted_rows = 0 if row_cnt_df.empty else row_cnt_df['row_cnt'].values[0]
        print('- Check: insert', str(inserted_rows), 'rows.')

        row_cnt_frames.append(row_cnt_df)

        job_date += datetime.timedelta(days=1)
finally:
    # Build the summary even if a day fails mid-run, so partial results survive.
    total_row_cnt_df = (
        pd.concat(row_cnt_frames, axis=0, join='outer', ignore_index=True)
        if row_cnt_frames
        else pd.DataFrame()
    )


- Execution:  2024-01-01  Partition:  20240101
    Table dws_sale_customer_prd_d_inc already exists.
   Result: <google.cloud.bigquery.table.RowIterator object at 0x7f8588908040>
   Errors:  None
- Finish writing to Partition  20240101
- Check: insert 3171 rows.
- Execution:  2024-01-02  Partition:  20240102
    Table dws_sale_customer_prd_d_inc already exists.
   Result: <google.cloud.bigquery.table.RowIterator object at 0x7f8588909d50>
   Errors:  None
- Finish writing to Partition  20240102
- Check: insert 3035 rows.
- Execution:  2024-01-03  Partition:  20240103
    Table dws_sale_customer_prd_d_inc already exists.
   Result: <google.cloud.bigquery.table.RowIterator object at 0x7f858a1583a0>
   Errors:  None
- Finish writing to Partition  20240103
- Check: insert 1817 rows.
- Execution:  2024-01-04  Partition:  20240104
    Table dws_sale_customer_prd_d_inc already exists.
   Result: <google.cloud.bigquery.table.RowIterator object at 0x7f8588909cf0>
   Errors:  None
- Finish writin

### Insert Summary

In [8]:
# Show the per-day row counts collected by the date loop; as the cell's last
# expression it is rendered by the notebook.
total_row_cnt_df

Unnamed: 0,date,row_cnt
0,2024-01-01,3171
1,2024-01-02,3035
2,2024-01-03,1817
3,2024-01-04,3636
4,2024-01-05,3811
...,...,...
476,2025-04-21,3405
477,2025-04-22,2932
478,2025-04-23,4449
479,2025-04-24,4412
