# Data Transfer

In [1]:
from google.cloud import bigquery_datatransfer
from datetime import datetime, timedelta

In [2]:
# Deployment environment; it is interpolated into the project and
# service-account names below.
env = "val"

# Destination of the scheduled query's results. The query job runs in this
# same project.
project_id = f"lt-dia-analytics-{env}-cfs"
dataset_id = "ppl_analytics_output_cfs"
table_id = "loyalty_lcl"

# Identity used to execute the scheduled query. Leaving it out of the create
# request would run the query as the user whose credentials back the client.
service_account_name = (
    f"sa-lt-dia-{env}-cfs-lylty@{project_id}.iam.gserviceaccount.com"
)

In [3]:
# Query for PCO Reporting (optimized)
#
# The SQL text is an f-string: every `{env}` placeholder is filled from the
# notebook's `env` variable so the same query targets the matching lake and
# analytics projects.
#
# Shape of the query:
#   * `up`   - joins the transaction summary to the scancode table (only scan
#              codes '19277' and '8727') and to the current loyalty identity,
#              keeps rows whose trans_dt lies between 7 days ago and today in
#              both transaction tables, and sums bskt_earn_pnt_qty per
#              wallet / household wallet / identity / scan code / timestamp.
#   * `down` - pairs each (emple_id, pco_card_num) from the HCM snapshot with
#              the loyalty identity whose identity_num ends in that card
#              number (last 13 characters).
#   * `up` is LEFT OUTER JOINed to `down` on cons_wallt_id, so transactions
#     without a matching employee are kept with a NULL Workday_ID.
#   * The outer GROUP BY has no aggregates; it collapses duplicate rows.
#
# NOTE(review): ROW_NUMBER() OVER() has no ORDER BY, so the ROW_ID assigned to
# a given row is presumably arbitrary between runs - confirm that is acceptable.
query_string = f"""
SELECT
  ROW_NUMBER() OVER() AS ROW_ID,
  down.emple_id AS Workday_ID,
  NULL AS Empty_Col_1,
  up.trans_date_time AS Trans_Date_Time,
  up.PLU_Number,
  up.PLU_Number AS One_Time_Payment_Plan,
  NULL AS Empty_Col_2,
  NULL AS Empty_Col_3,
  NULL AS Empty_Col_4,
  NULL AS Empty_Col_5,
  up.points_award AS Points_Award,
  up.cons_wallt_id AS Wallet_ID,
  up.hse_hld_wallt_id,
  up.identity_num AS Full_PCO_Num,

FROM (
  SELECT
    C.cons_wallt_id,
    C.hse_hld_wallt_id,
    C.identity_num,
    B.scan_cd AS PLU_Number,
    CONCAT(A.trans_dt," ",A.drvd_trans_tm) AS trans_date_time,
    SUM(A.bskt_earn_pnt_qty) AS points_award
  FROM
    `lt-dia-lake-{env}-consume.transactional_loyalty.loyalty_lcl_transaction_summary` AS A
  INNER JOIN
    `lt-dia-lake-{env}-consume.transactional_loyalty.loyalty_lcl_transaction_scancode` AS B
  ON
    A.pos_trans_id = B.pos_trans_id
    AND B.scan_cd IN ('19277', '8727')
  INNER JOIN
    `lt-dia-lake-{env}-consume.entity_b2c.loyalty_identity_curr` AS C
  ON
    A.identity_id = C.identity_id
  WHERE
    A.trans_dt BETWEEN DATE_ADD(CURRENT_DATE(), INTERVAL -7 DAY) AND CURRENT_DATE()
    AND B.trans_dt BETWEEN DATE_ADD(CURRENT_DATE(), INTERVAL -7 DAY) AND CURRENT_DATE()
  GROUP BY
    1,
    2,
    3,
    4,
    5) AS up

LEFT OUTER JOIN (
  SELECT
    cons_wallt_id,
    hse_hld_wallt_id,
    identity_num,
    emple_id,
    pco_card_num
  FROM (
    SELECT
      emple_id,
      pco_card_num
    FROM
      `lt-dia-analytics-{env}-cfs.pco_tax_compliance.human_capital_management_snpsht`
    GROUP BY
      1,
      2) AS A
  INNER JOIN (
    SELECT
      cons_wallt_id,
      hse_hld_wallt_id,
      identity_num,
      RIGHT(identity_num,13) AS pco_to_match
    FROM
      `lt-dia-lake-{env}-consume.entity_b2c.loyalty_identity_curr`
    GROUP BY
      1,
      2,
      3) AS B
  ON
    A.pco_card_num = B.pco_to_match
  GROUP BY
    1,
    2,
    3,
    4,
    5) AS down
ON
  up.cons_wallt_id = down.cons_wallt_id

GROUP BY
  up.cons_wallt_id,
  up.trans_date_time,
  up.PLU_Number,
  up.hse_hld_wallt_id,
  up.identity_num,
  down.emple_id,
  down.pco_card_num,
  up.points_award;
"""

In [4]:
# Client for the BigQuery Data Transfer Service; the scheduled query is
# created under the project-level resource path.
transfer_client = bigquery_datatransfer.DataTransferServiceClient()
parent = transfer_client.common_project_path(project_id)

# Scheduled-query parameters: the SQL to execute and the table that receives
# the result. WRITE_TRUNCATE replaces the table's contents on every run.
# No partitioning field and no schedule end time are configured.
scheduled_query_params = {
    "query": query_string,
    "destination_table_name_template": table_id,
    "write_disposition": "WRITE_TRUNCATE",
}

create_request = bigquery_datatransfer.CreateTransferConfigRequest(
    parent=parent,
    transfer_config=bigquery_datatransfer.TransferConfig(
        destination_dataset_id=dataset_id,
        display_name="Ingesting Loyalty LCL for PCO Reporting",
        data_source_id="scheduled_query",
        params=scheduled_query_params,
        schedule="every 24 hours",
    ),
    service_account_name=service_account_name,
)

# Submitting the request registers the scheduled query; the response is the
# stored config, whose `name` is the full resource path of the new config.
transfer_config = transfer_client.create_transfer_config(create_request)

print(f"Created {table_id} scheduled query '{transfer_config.name}'")
    

Created loyalty_lcl scheduled query 'projects/756139447418/locations/northamerica-northeast1/transferConfigs/656606f1-0000-27ec-826f-089e0825bd38'
