In [4]:
import sys
import pandas as pd
import json
import os
import logging
import numpy as np
import datetime

############### Initialize BigQuery Client ###############

from google.cloud import bigquery, bigquery_storage
from google.oauth2 import service_account


# Working directory that contains the service-account key file.
# The default is the Mesos sandbox this notebook was last run in. It embeds
# executor/run IDs and so appears to be run-specific; set NOTEBOOK_WD to point
# at the current sandbox/workspace instead of editing this path.
WD = os.environ.get(
    "NOTEBOOK_WD",
    "/var/lib/mesos/slaves/1773e61b-66cc-4c76-9f4e-6a16e4569303-S6739/frameworks/201205082337-0000000003-0000/executors/thermos-ads-prediction-devel-zhejianp-spark-notebook-0-28b245fb-90b5-4c0c-a70a-445d37a1cd2f/runs/0754985d-601f-4748-a7de-aaf52c448cb5/sandbox/workspace/",
)
# Path to your service account JSON key file
key_path = os.path.join(WD, "twttr/twttr-rev-core-data-prod-d3dac275bdaf.txt")

# Load credentials (fails with FileNotFoundError if key_path does not exist).
credentials = service_account.Credentials.from_service_account_file(
    key_path,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Initialize BigQuery client with the service account
client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,  # Or specify your project ID: "your-project-id"
    location="US"  # Match the dataset's location (e.g., US, EU)
)

# BigQuery Storage Read API client sharing the same credentials.
bqstorage_client = bigquery_storage.BigQueryReadClient(credentials=credentials)
########################################################



In [35]:
start_date = datetime.date(2025, 9, 18)
end_date = datetime.date(2025, 9, 18)

# Every calendar day from start_date through end_date (inclusive), formatted
# as "YYYY-MM-DD". Yields an empty list if end_date precedes start_date.
_n_days = (end_date - start_date).days + 1
date_list = [
    day.strftime("%Y-%m-%d")
    for day in (start_date + datetime.timedelta(days=k) for k in range(_n_days))
]



In [36]:
from string import Template

# SQL script template, rendered once per date by the loop below.
# Placeholders (string.Template syntax):
#   ${date}        -> 'YYYY-MM-DD', bound to the script variable utc_date
#   ${date_nodash} -> 'YYYYMMDD', suffix of the output table name
# Any literal "$" added to the SQL must be written as "$$"; otherwise
# Template.substitute treats it as a placeholder.
#
# The script does CREATE OR REPLACE on dpa.dpa_attributed_conversion_30d_<YYYYMMDD>,
# built from three CTEs:
#   ad_impression  - impression callbacks from partitions in
#                    [utc_date - 30 days, utc_date], deduplicated to the latest
#                    callback per impressionId (ROW_NUMBER ... DESC, rn = 1).
#   ad_conversion  - attributed conversion records with
#                    appTrackingTransparencyStatus = 1 and one of the listed
#                    conversion types, restricted to datehour on utc_date - 1,
#                    collapsed to one row per impressionId (GROUP BY).
#   dpa_click_data - DPA engagements with engagementType = 42 (marked "Click"
#                    in the SQL) from the last 30 days. Each row gets the
#                    clicked product key, looked up via mediaIndex, plus a
#                    status label that explains why the key is NULL when it
#                    cannot be resolved.
# The final SELECT keeps only impressions that have a conversion (INNER JOIN)
# and attaches click data when present (LEFT JOIN).
#
# NOTE(review): dpa_click_data is not deduplicated per impressionId, so an
# impression with several click events yields several output rows (with the
# same conversion repeated). Confirm this fan-out is intended.
# NOTE(review): conversions are read for a single day (utc_date - 1), while
# impressions/clicks use a 30-day lookback. Presumably "_30d" in the table
# name refers to that lookback window; confirm.
query_template = Template("""
DECLARE utc_date DATE DEFAULT '${date}';
CREATE OR REPLACE TABLE `twttr-rev-core-data-prod.dpa.dpa_attributed_conversion_30d_${date_nodash}`
AS
WITH ad_impression AS 
( SELECT * FROM ( SELECT impressionData.impressionId AS impressionId, clientInfo.userId64 AS userId64, clientInfo.deviceId AS deviceId, IF(impressionData.dpaData IS NOT NULL, 1, 0) AS isDpa, impressionData.promotedTweetId, impressionData.requestId, impressionData.advertiserId, impressionData.accountId, impressionData.campaignId, impressionData.lineItemId, impressionData.mediaCreativeDetails.mediaId, impressionData.mediaCreativeDetails.creativeId64, impressionData.promotedAppId, impressionData.dpaData.productIds, impressionData.dpaData.productSetId, impressionData.dpaData.productKeys, impressionCallbackEpochTimeMilliSec, ROW_NUMBER() OVER ( PARTITION BY impressionData.impressionId ORDER BY impressionCallbackEpochTimeMilliSec DESC ) AS rn FROM `twttr-rev-core-data-prod.logs.ads_impression_callback_events` WHERE DATE(_PARTITIONTIME) BETWEEN DATE_SUB(utc_date , INTERVAL 30 DAY) AND utc_date AND impressionData.impressionId IS NOT NULL ) WHERE rn = 1 ), 
ad_conversion AS ( SELECT impressionId, ANY_VALUE(octOrMact) AS octOrMact, ARRAY_AGG(DISTINCT conversionType IGNORE NULLS) AS uniqueConversionTypes, ANY_VALUE(advertiserAccountId) AS advertiserAccountId, MAX(conversion_time_ms) AS conversion_time_ms, ANY_VALUE(datehour) as datehour FROM ( SELECT COALESCE( engagementCounterKey.dimensions.octDimensions.impressionId, engagementCounterKey.dimensions.mactDimensions.impressionId ) AS impressionId, IF( engagementCounterKey.dimensions.octDimensions.impressionId IS NOT NULL, 'octDimensions', 'mactDimensions' ) AS octOrMact, conversionId.conversionType AS conversionType, COALESCE( processedConversionRecord.mactProcessedConversionRecord.appConversionEvent.conversion_time_ms, lifeTimeValueParameters.attributedInstallEvent.conversion_time_ms ) AS conversion_time_ms, conversionId.advertiserId AS advertiserId, COALESCE( processedConversionRecord.octProcessedConversionRecord.anonymousConversionRecord.advertiserAccountId, processedConversionRecord.mactProcessedConversionRecord.advertiserAccountId ) AS advertiserAccountId, datehour FROM `twttr-bq-ads-attribution-prod.rdu_dataset.attributed_conversion_record` WHERE interactionPrivacyStatus.appTrackingTransparencyStatus = 1 AND conversionId.conversionType IN (2, 3, 4, 6, 7, 12, 13, 32) AND DATE(datehour)= DATE_SUB(utc_date , INTERVAL 1 DAY)) GROUP BY impressionId ), 
dpa_click_data AS ( SELECT engagementEvent.impressionData.impressionId, engagementEvent.impressionData.userId64 AS userId64, engagementEvent.engagementEpochTimeMilliSec AS engagementEpochTimeMilliSec, engagementEvent.engagementDetails.ucEventMetadata.mediaIndex AS mediaIndex, engagementEvent.impressionData.dpaData.productKeys AS productKeys, engagementEvent.engagementType AS engagementType, CASE WHEN engagementEvent.engagementDetails.ucEventMetadata.mediaIndex IS NOT NULL AND ARRAY_LENGTH(engagementEvent.impressionData.dpaData.productKeys) > engagementEvent.engagementDetails.ucEventMetadata.mediaIndex AND engagementEvent.engagementDetails.ucEventMetadata.mediaIndex >= 0 THEN engagementEvent.impressionData.dpaData.productKeys[ OFFSET(engagementEvent.engagementDetails.ucEventMetadata.mediaIndex) ] ELSE NULL END AS clickedProductKey, CASE WHEN engagementEvent.engagementDetails.ucEventMetadata.mediaIndex IS NULL THEN 'media_index_null' WHEN engagementEvent.engagementDetails.ucEventMetadata.mediaIndex < 0 THEN 'media_index_negative' WHEN ARRAY_LENGTH(engagementEvent.impressionData.dpaData.productKeys) <= engagementEvent.engagementDetails.ucEventMetadata.mediaIndex THEN 'mediaIndex_out_of_bounds' ELSE 'product_key_available' END AS status 
FROM `twttr-rev-core-data-prod.logs.ads_spend_event` WHERE DATE(_PARTITIONTIME) BETWEEN DATE(DATE_SUB(utc_date , INTERVAL 30 DAY)) AND DATE(utc_date ) AND engagementEvent.impressionData.dpaData IS NOT NULL AND engagementEvent.engagementType = 42 -- Click 
) 

SELECT ad_impression.impressionId, ad_impression.userId64, ad_impression.deviceId, ad_impression.isDpa, ad_impression.promotedTweetId, ad_impression.requestId, ad_impression.advertiserId, ad_impression.accountId, ad_impression.campaignId, ad_impression.lineItemId, ad_impression.mediaId, ad_impression.creativeId64, ad_impression.promotedAppId, ad_impression.productIds, ad_impression.productSetId, ad_impression.impressionCallbackEpochTimeMilliSec, ad_conversion.octOrMact, ad_conversion.uniqueConversionTypes, ad_conversion.conversion_time_ms, dpa_click_data.clickedProductKey, dpa_click_data.engagementType, COALESCE(dpa_click_data.productKeys, ad_impression.productKeys) AS productKeys, dpa_click_data.status, utc_date as _date FROM ad_impression INNER JOIN ad_conversion ON ad_impression.impressionId = ad_conversion.impressionId LEFT JOIN dpa_click_data ON ad_impression.impressionId = dpa_click_data.impressionId


""")

# Render and run the per-date CREATE OR REPLACE script for every date in
# date_list. A failure on one date is reported and the remaining dates still
# run. All failures are then raised together at the end so the cell does not
# finish silently.
failed_dates = []
for date in date_list:
    date_nodash = datetime.datetime.strptime(date, "%Y-%m-%d").strftime("%Y%m%d")
    print(f"Running {date}, and save to {date_nodash} ")
    query = query_template.substitute(date=date, date_nodash=date_nodash)

    # Run BQ
    query_job = client.query(query)
    job_id = query_job.job_id
    print(f"BigQuery Job ID for {date}: {job_id}")

    # QueryJob.result() blocks until the job finishes and *raises* if the job
    # failed, so the failure must be caught here. Checking query_job.errors
    # only after result() returned could never observe a failed job.
    try:
        query_job.result()
    except Exception as exc:  # reported, recorded, and re-raised below
        print(f"Query failed for {date}: {query_job.errors or exc}")
        failed_dates.append(date)
    else:
        print(f"Query succeeded for {date} and table created: dpa_attributed_conversion_30d_{date_nodash}")

if failed_dates:
    raise RuntimeError(f"BigQuery job(s) failed for date(s): {failed_dates}")


Running 2025-09-18, and save to 20250918 
BigQuery Job ID for 2025-09-18: 019fb8d4-1195-4127-bdbd-51bc1ee55d60
Query succeeded for 2025-09-18 and table created: dpa_attributed_conversion_30d_20250918
