In [None]:
from dotenv import load_dotenv
import os
from google.cloud import bigquery
from google.oauth2 import service_account
from pandas_gbq import to_gbq
import pandas as pd
import datetime
import warnings
import json
import subprocess
import numpy as np
import requests

warnings.filterwarnings('ignore')

load_dotenv(override=True)


True

In [159]:
#expiry_time = 0

In [160]:


# build your token-URL once
TOKEN_URL = (
    f"https://{os.environ['ATSCALE_URL']}"
    "/auth/realms/atscale/protocol/openid-connect/token"
)

def fetch_tokens(grant_type="password", **extra_fields):
    """
    grant_type: "password" or "refresh_token"
    extra_fields: e.g. username, password, refresh_token
    """
    cmd = [
        "curl", "--location", TOKEN_URL,
        "--insecure",
        "--header", "Content-Type: application/x-www-form-urlencoded",
        "--data-urlencode", f"client_id=atscale-modeler",
        "--data-urlencode", f"client_secret={os.environ['ATSCALE_CLIENT_SECRET']}",
        "--data-urlencode", f"grant_type={grant_type}"
    ]
    # append the extra form fields
    for key, val in extra_fields.items():
        cmd += ["--data-urlencode", f"{key}={val}"]
    raw = subprocess.check_output(cmd)
    return json.loads(raw)

# 1) Initial fetch with user/pass
token_payload = fetch_tokens(
    grant_type="password",
    username=os.environ["ATSCALE_USER"],
    password=os.environ["ATSCALE_USER_PASSWORD"]
)

# compute absolute expiry timestamp
now_utc = datetime.datetime.now(datetime.timezone.utc)
expires_in = token_payload["expires_in"]  # seconds
expiry_time = now_utc + datetime.timedelta(seconds=expires_in)

access_token  = token_payload["access_token"]
refresh_token = token_payload["refresh_token"]


# --- later, before any API call, check & refresh if needed ---
def ensure_token():
    global access_token, refresh_token, expiry_time, token_payload

    # use timezone-aware now()
    now_utc = datetime.datetime.now(datetime.timezone.utc)

    if now_utc >= expiry_time:
        # time to refresh
        token_payload = fetch_tokens(
            grant_type="refresh_token",
            refresh_token=refresh_token
        )
        expires_in   = token_payload["expires_in"]
        expiry_time  = now_utc + datetime.timedelta(seconds=expires_in)
        access_token = token_payload["access_token"]
        refresh_token= token_payload["refresh_token"]
        print("🔄 Refreshed access token")

    return access_token

# Usage:
# headers = {"Authorization": f"Bearer {ensure_token()}"}


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  3510  100  3386  100   124   7626    279 --:--:-- --:--:-- --:--:--  7905


In [161]:
# === Your BigQuery setup ===
project_id    = 'glb-semanticlayerpoc-dev'
dataset_id    = 'atscale_aggregates'
table_name = 'atsclae_query_log'
full_table_id = f"{dataset_id}.{table_name}"

full_load_hours = 160 # Fallback to current time minus this many hours if query fails. Max limit from atscale <168

In [162]:

# Define the scope (can be more specific if needed)
SCOPES = ['https://www.googleapis.com/auth/cloud-platform']


# Authenticate using the service account file
credentials = service_account.Credentials.from_service_account_file(
    os.getenv("SERVICE_ACCOUNT_FILE"),
    scopes=SCOPES
)

# Example: Initialize a client (e.g., Google Cloud Storage)
client = bigquery.Client(credentials=credentials)



In [163]:
# Get the most recent startTime

query = f"""
SELECT startTime
FROM `{full_table_id}`
ORDER BY startTime DESC
LIMIT 1
"""

try:
    query_job = client.query(query)
    result = query_job.result()
    # Grab the first row if available
    row = next(iter(result), None)
    if row and hasattr(row, 'startTime'):
        last_load = row.startTime.strftime("%Y-%m-%d %H:%M:%S.%f")
    else:
        raise ValueError("No results returned for startTime")
except Exception as e:
    print("Failed to retrieve last load timestamp: %s", e)
    # Fallback: current time minus specified hours
    fallback_time = datetime.datetime.now() - datetime.timedelta(hours=full_load_hours)
    last_load = fallback_time.strftime("%Y-%m-%d %H:%M:%S.%f")

print("Last load timestamp:", last_load)

Failed to retrieve last load timestamp: %s 404 Not found: Table glb-semanticlayerpoc-dev:atscale_aggregates.atsclae_query_log was not found in location EU; reason: notFound, message: Not found: Table glb-semanticlayerpoc-dev:atscale_aggregates.atsclae_query_log was not found in location EU

Location: EU
Job ID: 12db982f-0beb-4bae-b980-72d08b78f43e

Last load timestamp: 2025-06-03 00:56:57.901752


In [None]:
# === Helper to flatten result + events + subqueries ===
def extract_rows(result):
    """
    For a single 'result' dict, this will produce one row per (event, subquery). 
    If an event has no 'subqueries' or an empty list, we still emit exactly one row
    with the subquery-related columns set to None.
    """
    # ‣ Collect all result-level fields first (timestamps still in ms)
    base_info = {
        "status":        result.get("status"),
        "modelName":     result.get("modelName"),
        "catalogName":   result.get("catalogName"),
        "user":          result.get("user"),
        "queryId":       result.get("queryId"),
        "startTime":     result.get("startTime"),   # ms
        "duration":      result.get("duration"),    # numeric
        "attributes":    ", ".join(result.get("attributes", [])),
        "measures":      ", ".join(result.get("measures", [])),
        "aggregates":    ", ".join(result.get("aggregates", [])),
        "optimization":  ", ".join(result.get("optimization", [])),
        "failedMessage": result.get("failedMessage", None),
    }

    rows = []

    #  Loop over each event in result["events"]
    for event in result.get("events", []):
        # Capture event-level fields (still in ms)
        event_info = {
            "event_name":      event.get("name"),
            "event_startTime": event.get("startTime"),   # ms
            "event_duration":  event.get("duration"),    # numeric
        }

        # Now check subqueries for this event
        subqueries = event.get("subqueries", [])

        if subqueries and isinstance(subqueries, list):
            # If there are one or more subqueries, loop through them
            for subq in subqueries:
                # Start with the base_info + event_info
                row = {}
                row.update(base_info)
                row.update(event_info)

                # Flatten every key in the subquery under "subquery_<key>"
                # For your JSON, subq typically has: name, startTime, duration, subqueryId
                for key, val in subq.items():
                    col_name = f"subquery_{key}"
                    row[col_name] = val

                rows.append(row)
        else:
            # No subqueries for this event → emit a single row with subquery_* = None
            row = {}
            row.update(base_info)
            row.update(event_info)

            # We know the JSON's subquery objects have keys: name, startTime, duration, subqueryId
            # So we explicitly add those columns as None
            row["subquery_name"]       = None
            row["subquery_startTime"]  = None
            row["subquery_duration"]   = None
            row["subquery_subqueryId"] = None

            rows.append(row)

    return rows

In [None]:


# === API Setup ===
url = f"https://{os.environ['ATSCALE_URL']}/api/queries?showCanaries=false&startDate={last_load}"  

headers = {
    "Authorization": "Bearer " + ensure_token(),  
    "Accept": "application/json"
}
url

'https://34.79.201.83/api/queries?showCanaries=false&startDate=2025-06-03 00:56:57.901752'

In [None]:
page = 1
page_size = 20  
has_next_page = True
all_rows = []

# === Pagination loop ===
while has_next_page:
    params = {
        "page": page,
        "pageSize": page_size
    }

    response = requests.get(url, headers=headers, verify=False, params=params)
    response.raise_for_status()
    data = response.json()

    # Extract results and append rows
    for result in data.get("results", []):
        all_rows.extend(extract_rows(result))

    # Determine whether to continue
    has_next_page = data.get("hasNextPage", False)
    page += 1

# === Create final DataFrame ===
combined_df = pd.DataFrame(all_rows)

# Example: show results
combined_df.head()


Unnamed: 0,status,modelName,catalogName,user,queryId,startTime,duration,attributes,measures,aggregates,optimization,failedMessage,event_name,event_startTime,event_duration,subquery_name,subquery_startTime,subquery_duration,subquery_subqueryId
0,successful,,,System,d7def5b3-5b90-4ba5-9640-7dc675710014,1749480880689,368,,,,,,Inbound Query,1749480880689,2.3,,,,
1,successful,,,System,d7def5b3-5b90-4ba5-9640-7dc675710014,1749480880689,368,,,,,,Planning,1749480880692,8.841,,,,
2,successful,,,System,d7def5b3-5b90-4ba5-9640-7dc675710014,1749480880689,368,,,,,,Outbound,1749480880700,353.178,Query 1,1749481000000.0,348.0,d7def5b3-5b90-4ba5-9640-7dc675710014
3,successful,,,System,d7def5b3-5b90-4ba5-9640-7dc675710014,1749480880689,368,,,,,,Result Processing,1749480881054,3.803,,,,
4,successful,,,System,543abd7e-268b-426b-a109-9a079fefafd8,1749480880317,328,,,,,,Inbound Query,1749480880317,2.241,,,,


In [None]:

# === Do some cleansing ===

def clean_dataframe_for_bigquery(df):
    cleaned_df = df.copy()
    
    # (A1) Suppose your timestamp columns are:
    datetime_cols = ["startTime", "event_startTime", "subquery_startTime"]

    # (A2) And your purely numeric columns are (durations, etc.):
    numeric_cols = ["duration", "event_duration", "subquery_duration"]

    for col in cleaned_df.columns:
        if col in datetime_cols:
            # Ensure datetime is clean

            cleaned_df[col] = pd.to_datetime(cleaned_df[col], unit='ms' , errors="coerce").values.astype("datetime64[us]")

            continue

        # If any of these still show up as object, explicitly cast them now:
        if col in numeric_cols:
            cleaned_df[col] = pd.to_numeric(cleaned_df[col], errors="coerce")
            continue

        # If values are dict or list, convert to JSON strings
        if cleaned_df[col].apply(lambda x: isinstance(x, (dict, list))).any():
            cleaned_df[col] = cleaned_df[col].apply(json.dumps)

        # Replace None/NaN with empty string, cast to string
        #cleaned_df[col] = cleaned_df[col].apply(lambda x: "" if x is None or pd.isna(x) else str(x))
        cleaned_df[col] = cleaned_df[col].fillna("").astype(str)
 
        # Finally cast to a PyArrow‐safe string dtype
        cleaned_df[col] = cleaned_df[col].astype(str)

    return cleaned_df

# Use this before uploading
cleaned_df = clean_dataframe_for_bigquery(combined_df)

cleaned_df.head()


Unnamed: 0,status,modelName,catalogName,user,queryId,startTime,duration,attributes,measures,aggregates,optimization,failedMessage,event_name,event_startTime,event_duration,subquery_name,subquery_startTime,subquery_duration,subquery_subqueryId
0,successful,,,System,d7def5b3-5b90-4ba5-9640-7dc675710014,2025-06-09 14:54:40.689,368,,,,,,Inbound Query,2025-06-09 14:54:40.689,2.3,,NaT,,
1,successful,,,System,d7def5b3-5b90-4ba5-9640-7dc675710014,2025-06-09 14:54:40.689,368,,,,,,Planning,2025-06-09 14:54:40.692,8.841,,NaT,,
2,successful,,,System,d7def5b3-5b90-4ba5-9640-7dc675710014,2025-06-09 14:54:40.689,368,,,,,,Outbound,2025-06-09 14:54:40.700,353.178,Query 1,2025-06-09 14:54:40.706,348.0,d7def5b3-5b90-4ba5-9640-7dc675710014
3,successful,,,System,d7def5b3-5b90-4ba5-9640-7dc675710014,2025-06-09 14:54:40.689,368,,,,,,Result Processing,2025-06-09 14:54:41.054,3.803,,NaT,,
4,successful,,,System,543abd7e-268b-426b-a109-9a079fefafd8,2025-06-09 14:54:40.317,328,,,,,,Inbound Query,2025-06-09 14:54:40.317,2.241,,NaT,,


In [168]:

# === Append the DataFrame ===
to_gbq(
    cleaned_df,
    destination_table=full_table_id,
    project_id=project_id,
    credentials=credentials,
    location= 'EU',
    if_exists="replace"                    # ✅ Append data if table exists
)


100%|██████████| 1/1 [00:00<00:00, 11244.78it/s]
