### Import and config

In [1]:
# Imports
import os
import time
import random
import json
import logging

from dotenv import load_dotenv
from tqdm import tqdm

import pandas as pd
import requests

from sqlalchemy import create_engine, text, inspect, Text, Integer, BigInteger, Float, Boolean
from sqlalchemy.dialects.postgresql import JSONB

# Configuration
load_dotenv()

# API urls
AUTH_URL = os.getenv('AUTH_URL')
ACTIVITIES_URL = os.getenv('ACTIVITIES_URL')
ACTIVITY_DETAIL_URL_TMPL = os.getenv('ACTIVITY_DETAIL_URL_TMPL')
ACTIVITY_KUDOS_URL_TMPL = os.getenv('ACTIVITY_KUDOS_URL_TMPL')

# API keys
CLIENT_ID = os.getenv('CLIENT_ID')
CLIENT_SECRET = os.getenv('CLIENT_SECRET')
REFRESH_TOKEN = os.getenv('REFRESH_TOKEN')

# DB
DB_URI = os.getenv('DB_URI')

# Bronze tables
TARGET_B_SCHEMA = os.getenv('TARGET_B_SCHEMA')
ACTIVITIES_B_TABLE = os.getenv('ACTIVITIES_B_TABLE')
DETAILS_B_TABLE = os.getenv('DETAILS_B_TABLE')
KUDOS_B_TABLE = os.getenv('KUDOS_B_TABLE')

# Pagination
PER_PAGE = int(os.getenv('PER_PAGE'))
MAX_PAGES = int(os.getenv('MAX_PAGES'))

# Timeouts and retries
REQUEST_TIMEOUT = int(os.getenv('REQUEST_TIMEOUT'))
MAX_RETRIES = int(os.getenv('MAX_RETRIES'))
BASE_SLEEP = float(os.getenv('BASE_SLEEP'))

REFRESH_THRESHOLD_DAYS = int(os.getenv('REFRESH_THRESHOLD_DAYS'))

# Other
LOG_LEVEL = os.getenv('LOG_LEVEL')

logging.basicConfig(
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    format="%(asctime)s | %(levelname)s | %(message)s"
)

### API keys validation

In [2]:
REQUIRED_API_ENV = ['CLIENT_ID', 'CLIENT_SECRET', 'REFRESH_TOKEN']
missing_api_env = [env for env in REQUIRED_API_ENV if not os.getenv(env)]
if missing_api_env:
  raise RuntimeError(f"Missing env variables: {', '.join(missing_api_env)}.")

### DB names validation

In [3]:
REQUIRED_DB_ENV = ['DB_URI', 'TARGET_B_SCHEMA', 'ACTIVITIES_B_TABLE', 'DETAILS_B_TABLE']
missing_db_env = [env for env in REQUIRED_DB_ENV if not os.getenv(env)]
if missing_db_env:
  raise RuntimeError(f"Missing env variables: {', '.join(missing_db_env)}.")

### Connecting with PostgreSQL

In [4]:
engine = create_engine(
  DB_URI, 
  pool_pre_ping=True, 
  pool_size=5, 
  max_overflow=10
)
with engine.connect() as conn:
  conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {TARGET_B_SCHEMA};"))
logging.info(f"Connection established and {TARGET_B_SCHEMA} schema exists.")

2025-09-23 12:28:44,657 | INFO | Connection established and bronze schema exists.


### Token authorization

In [5]:
def get_access_token(auth_url: str, client_id: str, client_secret: str, refresh_token: str, timeout: int = 30) -> str:
  """
  Request a new access token from an OAuth2 authentication endpoint.

  The function uses a refresh token to obtain a short-lived access token. 
  If the response does not contain an ``access_token``, a RuntimeError is raised.

  Parameters
  ----------
  auth_url : str
      URL of the OAuth2 token endpoint.
  client_id : str
      OAuth2 client identifier.
  client_secret : str
      OAuth2 client secret.
  refresh_token : str
      Refresh token used to request a new access token.
  timeout : int, default=30
      Timeout in seconds for the HTTP request.

  Returns
  -------
  str
      The access token string retrieved from the authentication server.

  Raises
  ------
  RuntimeError
      If the response does not include an ``access_token``.
  requests.exceptions.RequestException
      If the HTTP request fails (e.g., network error, timeout).

  Notes
  -----
  Logs an informational message when the token is successfully retrieved.
  """

  payload = {
    'client_id': client_id,
    'client_secret': client_secret,
    'refresh_token': refresh_token,
    'grant_type': 'refresh_token',
  }
  res = requests.post(auth_url, data=payload, timeout=timeout)
  data = res.json()
  token = data.get('access_token')
  if not token:
    raise RuntimeError(f"No access token in response: {data}")
  logging.info('Access token retrived.')
  return token

access_token = get_access_token(AUTH_URL, CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN)

2025-09-23 12:28:45,065 | INFO | Access token retrived.


### HTTP session

In [6]:
session = requests.Session()
session.headers.update({"Authorization": f"Bearer {access_token}"})

def get_json_with_retry(url: str, params=None, max_retries: int = MAX_RETRIES, timeout: int = REQUEST_TIMEOUT, base_sleep: float = BASE_SLEEP):
  """
  Send a GET request with automatic retries and return the parsed JSON response.

  The function retries on:
    * HTTP 429 (rate limiting) ‚Äî respects the ``Retry-After`` header if present,
      otherwise waits an increasing backoff time.
    * HTTP 5xx errors ‚Äî retries with exponential backoff and jitter.
    * Network/connection errors ‚Äî retries with exponential backoff and jitter.

  For each attempt, the backoff time increases by ``base_sleep * attempt`` seconds
  plus a small random jitter. On the final attempt, any error is raised.

  Parameters
  ----------
  url : str
      Endpoint URL to send the GET request to.
  params : dict, optional
      Query string parameters to include in the request.
  max_retries : int, default=MAX_RETRIES
      Maximum number of retry attempts before failing.
  timeout : int, default=REQUEST_TIMEOUT
      Timeout in seconds for each HTTP request.
  base_sleep : float, default=BASE_SLEEP
      Base number of seconds used for exponential backoff between retries.

  Returns
  -------
  dict
      Parsed JSON response from the server.

  Raises
  ------
  requests.exceptions.RequestException
      If the request fails after all retry attempts.
  requests.exceptions.HTTPError
      If the server returns a 4xx/5xx response on the last attempt.
  json.JSONDecodeError
      If the response cannot be parsed as JSON.

  Notes
  -----
  * HTTP 429 triggers a wait using ``Retry-After`` if available, otherwise a fallback.
  * Logs warnings before each retry and errors if JSON parsing fails.
  * Uses a global ``requests.Session`` (`session`) for connection pooling.
  """

  for attempt in range(1, max_retries + 1):
    try:
      resp = session.get(url, params=params, timeout=timeout)
      
      if resp.status_code == 429:
        retry_after = resp.headers.get('Retry-After')
        if retry_after and retry_after.isdigit():
          sleep_for = int(retry_after)
        else:
          sleep_for = max(base_sleep * attempt, 15)
        logging.warning(f"HTTP 429 - wait {sleep_for}s (attempt {attempt}/{max_retries})")
        time.sleep(sleep_for)
        continue

      if 500 <= resp.status_code < 600:
        if attempt == max_retries:
          resp.raise_for_status()
        sleep_for = base_sleep * attempt + random.uniform(0, 1.0)
        logging.warning(f"HTTP {resp.status_code} ‚Äî retry in {sleep_for:.1f}s (attempt {attempt}/{max_retries})")
        time.sleep(sleep_for)
        continue

      resp.raise_for_status()

      try:
        return resp.json()
      except json.JSONDecodeError:
        logging.error('JSON parsing error')
        raise
    
    except requests.exceptions.RequestException as e:
      if attempt == max_retries:
        logging.exception('Request error (last attempt)')
        raise
      sleep_for = base_sleep * attempt + random.uniform(0, 1.0)
      logging.warning(f"{e} ‚Äî retry in {sleep_for:.1f}s (attempt {attempt}/{max_retries})")
      time.sleep(sleep_for)

### Getting activities list

In [7]:
def fetch_all_activities(activites_url: str, per_page: int = PER_PAGE, max_pages: int = MAX_PAGES):
  """
  Fetch all activity records from a paginated API endpoint.

  The function iterates through API pages until either:
    * the maximum number of pages is reached (``max_pages``), or
    * the API returns an empty list (end of results).

  Each page is retrieved using ``get_json_with_retry`` to ensure resilience
  against transient errors (rate limits, timeouts, 5xx responses).

  Parameters
  ----------
  activites_url : str
      The base URL of the activities endpoint (must support ``per_page`` and ``page`` query params).
  per_page : int, default=PER_PAGE
      Number of activity records to request per page.
  max_pages : int, default=MAX_PAGES
      Maximum number of pages to fetch before stopping.

  Returns
  -------
  list of dict
      Combined list of activity objects returned by the API.

  Raises
  ------
  RuntimeError
      If the API response is not a list (unexpected schema).
  requests.exceptions.RequestException
      If the underlying HTTP requests fail after retries.

  Notes
  -----
  * Logs the number of records downloaded per page and the running total.
  * Stops early if the API returns an empty list before reaching ``max_pages``.
  * The total number of records is ``per_page * n_pages`` at most.
  """

  all_items = []
  page = 1
  while page <= max_pages:
    params = {'per_page': per_page, 'page': page}
    data = get_json_with_retry(activites_url, params=params)

    if not isinstance(data, list):
      raise RuntimeError(f"Unexpected response type for page {page}: {type(data)} ‚Äî expected list")
    
    if not data:
      break
    
    all_items.extend(data)
    logging.info(f"Page {page}: downloaded {len(data)} records (total: {len(all_items)})")
    page += 1
  return all_items

activities_raw = fetch_all_activities(ACTIVITIES_URL, per_page=PER_PAGE, max_pages=MAX_PAGES)
logging.info(f"Total activities downloaded: {len(activities_raw)}")
activities_df = pd.json_normalize(activities_raw, sep='_')
if activities_df.empty:
    raise RuntimeError('No activities to save.')
activities_df.head()

2025-09-23 12:28:50,086 | INFO | Page 1: downloaded 200 records (total: 200)
2025-09-23 12:28:56,495 | INFO | Page 2: downloaded 200 records (total: 400)
2025-09-23 12:29:03,494 | INFO | Page 3: downloaded 200 records (total: 600)
2025-09-23 12:29:10,084 | INFO | Page 4: downloaded 200 records (total: 800)
2025-09-23 12:29:14,891 | INFO | Page 5: downloaded 200 records (total: 1000)
2025-09-23 12:29:15,998 | INFO | Page 6: downloaded 99 records (total: 1099)
2025-09-23 12:29:16,228 | INFO | Total activities downloaded: 1099


Unnamed: 0,resource_state,name,distance,moving_time,elapsed_time,total_elevation_gain,type,sport_type,workout_type,id,...,has_kudoed,suffer_score,athlete_id,athlete_resource_state,map_id,map_summary_polyline,map_resource_state,average_cadence,max_watts,weighted_average_watts
0,2,Afternoon Ride,23312.2,5551,8140,61.0,Ride,Ride,,15891685827,...,False,11.0,81055898,1,a15891685827,ki}vHuomgBcDnFgCtEsAdBi@z@aInNWh@KvAKZQP_@B{@f...,2,,,
1,2,23km Long Runü•µ,20049.3,7499,8179,51.0,Run,Run,2.0,15885021588,...,False,98.0,81055898,1,a15885021588,wf}vHismgBkArBaAvAWh@g@x@q@bA{AzCi@|@gApAgEpHe...,2,83.7,499.0,323.0
2,2,Evening Ride,19347.2,4273,4375,75.0,Ride,Ride,10.0,15879687027,...,False,8.0,81055898,1,a15879687027,oh|vHsuogBVn@XNDNAFiBjDy@hAi@f@qAxBcAvAgAdBSN[...,2,,,
3,2,Afternoon Weight Training,0.0,3673,3673,0.0,Workout,WeightTraining,,15865360447,...,False,9.0,81055898,1,a15865360447,,2,,,
4,2,K200süèéÔ∏è,9915.6,3272,3314,10.0,Run,Run,3.0,15855640218,...,False,60.0,81055898,1,a15855640218,or{vHq{ngBHZ^l@Vv@|@rB|@nCd@rCf@rDZtApAjEdBjF^...,2,82.2,546.0,382.0


### Set `bronze.activities` types map

In [8]:
activities_dtype_map = {
    "resource_state": Integer,
    "name": Text,
    "distance": Float,
    "moving_time": Integer,
    "elapsed_time": Integer,
    "total_elevation_gain": Float,
    "type": Text,
    "sport_type": Text,
    "workout_type": Float,
    "id": BigInteger,
    "start_date": Text,
    "start_date_local": Text,
    "timezone": Text,
    "utc_offset": Float,
    "location_city": Text,
    "location_state": Text,
    "location_country": Text,
    "achievement_count": Integer,
    "kudos_count": Integer,
    "comment_count": Integer,
    "athlete_count": Integer,
    "photo_count": Integer,
    "trainer": Boolean,
    "commute": Boolean,
    "manual": Boolean,
    "private": Boolean,
    "visibility": Text,
    "flagged": Boolean,
    "gear_id": Text,
    "start_latlng": JSONB,
    "end_latlng": JSONB,
    "average_speed": Float,
    "max_speed": Float,
    "average_cadence": Float,
    "average_watts": Float,
    "max_watts": Float,
    "weighted_average_watts": Float,
    "device_watts": Boolean,
    "kilojoules": Float,
    "has_heartrate": Boolean,
    "average_heartrate": Float,
    "max_heartrate": Float,
    "heartrate_opt_out": Boolean,
    "display_hide_heartrate_option": Boolean,
    "elev_high": Float,
    "elev_low": Float,
    "upload_id": BigInteger,
    "upload_id_str": Text,
    "external_id": Text,
    "from_accepted_tag": Boolean,
    "pr_count": Integer,
    "total_photo_count": Integer,
    "has_kudoed": Boolean,
    "suffer_score": Float,
    "athlete_id": BigInteger,
    "athlete_resource_state": Integer,
    "map_id": Text,
    "map_summary_polyline": Text,
    "map_resource_state": Integer,
    "average_temp": Float,
}

### Save `bronze.activities` to database

In [9]:
table_full_name = f"{TARGET_B_SCHEMA}.{ACTIVITIES_B_TABLE}"
logging.warning(f"Whole table {table_full_name} will be overwritten.")
activities_df.to_sql(
  ACTIVITIES_B_TABLE, 
  engine, 
  schema=TARGET_B_SCHEMA, 
  if_exists="replace", 
  index=False, 
  method="multi", 
  chunksize=1000, 
  dtype=activities_dtype_map
)



-2

### Identify missing details ‚Äî only `id` that are not present in `bronze.activities_details`

In [10]:
with engine.begin() as conn:
  conn.execute(text(f"""
    CREATE TABLE IF NOT EXISTS {TARGET_B_SCHEMA}.{ACTIVITIES_B_TABLE} (id BIGINT PRIMARY KEY)
"""))
  existing_ids = pd.read_sql(text(f"SELECT id FROM {TARGET_B_SCHEMA}.{DETAILS_B_TABLE}"), conn)
  existing_ids_set = set(existing_ids['id'].astype('Int64').dropna().to_list())

all_ids_set = set(activities_df['id'].astype('Int64').dropna().to_list())
missing_ids = sorted(all_ids_set - existing_ids_set)
logging.info(f"Missing details of {len(missing_ids)} IDs")
pd.DataFrame({'id': missing_ids}).head()

2025-09-23 12:29:17,285 | INFO | Missing details of 4 IDs


Unnamed: 0,id
0,15865360447
1,15879687027
2,15885021588
3,15891685827


### Identify recent activities ‚Äî update and download kudos of `id` from the last 30 days

In [11]:
with engine.begin() as conn:
  conn.execute(text(f"""
    CREATE TABLE IF NOT EXISTS {TARGET_B_SCHEMA}.{ACTIVITIES_B_TABLE} (id BIGINT PRIMARY KEY)
"""))
  bronze_activities_df = pd.read_sql(text(f"SELECT * FROM {TARGET_B_SCHEMA}.{DETAILS_B_TABLE}"), conn)

In [12]:
bronze_activities_df['date'] =  pd.to_datetime(bronze_activities_df['start_date_local']).dt.tz_localize(None)
today = pd.Timestamp('today').normalize()
bronze_activities_df['activity_age_days'] = (today - bronze_activities_df['date'].dt.normalize()).dt.days

In [14]:
recent_activities_df = bronze_activities_df[bronze_activities_df['activity_age_days'] <= REFRESH_THRESHOLD_DAYS]
recent_activities_ids = set(recent_activities_df['id'].astype('Int64').dropna().to_list())
recent_activities_ids = sorted(recent_activities_ids)
logging.info(f"Refreshing details of {len(recent_activities_ids)} IDs")
pd.DataFrame({'id': recent_activities_ids}).head()

2025-09-23 12:30:08,202 | INFO | Refreshing details of 29 IDs


Unnamed: 0,id
0,15581915542
1,15592776280
2,15606539244
3,15626291263
4,15626580466


### Download activities details

In [16]:
def fetch_activity_details(activity_id: int):
  """
  Fetch detailed information for a single activity.

  Builds the activity detail endpoint URL using the given activity ID
  and retrieves the JSON payload with retry logic.

  Parameters
  ----------
  activity_id : int
      Unique identifier of the activity.

  Returns
  -------
  dict
      JSON object containing the activity details as returned by the API.

  Raises
  ------
  requests.exceptions.RequestException
      If the request fails after retries (e.g., network error, rate limit, 5xx).
  requests.exceptions.HTTPError
      If the server returns an error response on the last attempt.
  json.JSONDecodeError
      If the response cannot be parsed as JSON.

  Notes
  -----
  * Uses the global template ``ACTIVITY_DETAIL_URL_TMPL`` to construct the URL.
  * Under the hood calls ``get_json_with_retry`` for resiliency.
  """
  url = ACTIVITY_DETAIL_URL_TMPL.format(id=activity_id)
  return get_json_with_retry(url, params=None)

ids_to_be_downloaded = sorted(set(missing_ids) | set(recent_activities_ids))
details_records = []
for i, act_id in tqdm(enumerate(ids_to_be_downloaded, start=1), total=len(ids_to_be_downloaded)):
  try:
    resp = fetch_activity_details(act_id)
    
    if not isinstance(resp, dict):
      logging.warning(f"id={act_id}: unexpected response type ({type(resp)}), skip")
      continue
    details_records.append(resp)
    
  except Exception as e:
    logging.error(f"Error downloading details for id={act_id}: {e}")
  
  time.sleep(random.randint(7, 9))

logging.info(f"Details downloaded: {len(details_records)} / {len(ids_to_be_downloaded)}")
details_df_new = pd.json_normalize(details_records, sep='_')

100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 33/33 [04:47<00:00,  8.71s/it]
2025-09-23 12:35:48,037 | INFO | Details downloaded: 33 / 33


In [17]:
if not details_df_new.empty:
  details_df_new = details_df_new.drop(['message', 'errors'], axis=1, errors='ignore')
  details_df_new = details_df_new.dropna(how='all')
  
details_df_new.head()

Unnamed: 0,resource_state,name,distance,moving_time,elapsed_time,total_elevation_gain,type,sport_type,workout_type,id,...,similar_activities_trend_direction,similar_activities_resource_state,average_temp,photos_primary_unique_id,photos_primary_urls_600,photos_primary_urls_100,photos_primary_source,photos_primary_media_type,photos_use_primary_photo,private_note
0,3,Progressive Runü™¶,9762.5,3112,3309,17.0,Run,Run,3.0,15581915542,...,1.0,2.0,,,,,,,,
1,3,Afternoon Weight Training,0.0,3524,3524,0.0,WeightTraining,WeightTraining,30.0,15592776280,...,,,,,,,,,,
2,3,Rolling 300sü™¶,9061.8,3078,3116,9.0,Run,Run,3.0,15606539244,...,0.0,2.0,,,,,,,,
3,3,Afternoon Weight Training,0.0,3582,3582,0.0,WeightTraining,WeightTraining,30.0,15626291263,...,,,,,,,,,,
4,3,Afternoon Ride,12140.8,1749,6223,38.0,Ride,Ride,10.0,15626580466,...,,,25.0,,,,,,,


### Set `bronze.activities_details` types map

In [18]:
activities_details_dtype_map = {
    "resource_state": Integer,
    "name": Text,
    "distance": Float,
    "moving_time": Integer,
    "elapsed_time": Integer,
    "total_elevation_gain": Float,
    "type": Text,
    "sport_type": Text,
    "workout_type": Float,
    "id": BigInteger,
    "start_date": Text,
    "start_date_local": Text,
    "timezone": Text,
    "utc_offset": Float,
    "location_city": Text,
    "location_state": Text,
    "location_country": Text,
    "achievement_count": Integer,
    "kudos_count": Integer,
    "comment_count": Integer,
    "athlete_count": Integer,
    "photo_count": Integer,
    "trainer": Boolean,
    "commute": Boolean,
    "manual": Boolean,
    "private": Boolean,
    "visibility": Text,
    "flagged": Boolean,
    "gear_id": Text,
    "start_latlng": JSONB,
    "end_latlng": JSONB,
    "average_speed": Float,
    "max_speed": Float,
    "average_cadence": Float,
    "average_watts": Float,
    "max_watts": Float,
    "weighted_average_watts": Float,
    "device_watts": Boolean,
    "kilojoules": Float,
    "has_heartrate": Boolean,
    "average_heartrate": Float,
    "max_heartrate": Float,
    "heartrate_opt_out": Boolean,
    "display_hide_heartrate_option": Boolean,
    "elev_high": Float,
    "elev_low": Float,
    "upload_id": BigInteger,
    "upload_id_str": Text,
    "external_id": Text,
    "from_accepted_tag": Boolean,
    "pr_count": Integer,
    "total_photo_count": Integer,
    "has_kudoed": Boolean,
    "suffer_score": Float,
    "description": Text,
    "calories": Float,
    "perceived_exertion": Text,
    "prefer_perceived_exertion": Text,
    "segment_efforts": JSONB,
    "splits_metric": JSONB,
    "splits_standard": JSONB,
    "laps": JSONB,
    "best_efforts": JSONB,
    "stats_visibility": JSONB,
    "hide_from_home": Boolean,
    "device_name": Text,
    "embed_token": Text,
    "available_zones": JSONB,
    "athlete_id": BigInteger,
    "athlete_resource_state": Integer,
    "map_id": Text,
    "map_polyline": Text,
    "map_resource_state": Integer,
    "map_summary_polyline": Text,
    "gear_primary": Boolean,
    "gear_name": Text,
    "gear_nickname": Text,
    "gear_resource_state": Float,
    "gear_retired": Boolean,
    "gear_distance": Float,
    "gear_converted_distance": Float,
    "photos_primary": JSONB,
    "photos_count": Integer,
    "similar_activities_effort_count": Float,
    "similar_activities_average_speed": Float,
    "similar_activities_min_average_speed": Float,
    "similar_activities_mid_average_speed": Float,
    "similar_activities_max_average_speed": Float,
    "similar_activities_pr_rank": Float,
    "similar_activities_frequency_milestone": Float,
    "similar_activities_trend_speeds": JSONB,
    "similar_activities_trend_current_activity_index": Float,
    "similar_activities_trend_min_speed": Float,
    "similar_activities_trend_mid_speed": Float,
    "similar_activities_trend_max_speed": Float,
    "similar_activities_trend_direction": Float,
    "similar_activities_resource_state": Float,
    "average_temp": Float,
    "photos_primary_unique_id": Text,
    "photos_primary_urls_600": Text,
    "photos_primary_urls_100": Text,
    "photos_primary_source": Integer,
    "photos_primary_media_type": Integer,
    "photos_use_primary_photo": Boolean,
    "private_note": Text
}

### Save `bronze.activities_details` to database

In [19]:
if details_df_new.empty:
  logging.info('No new details to be saved.')
else:
  insp = inspect(engine)

  # 1) Check if table exists in PostgreSQL
  if not insp.has_table(table_name=DETAILS_B_TABLE, schema=TARGET_B_SCHEMA):
    details_df_new.head(0).to_sql(
      DETAILS_B_TABLE, 
      engine, 
      schema=TARGET_B_SCHEMA,
      if_exists="append", 
      index=False, 
      dtype=activities_details_dtype_map
    )

    # 2) Check if primary key exists
  insp = inspect(engine)
  pk = insp.get_pk_constraint(table_name=DETAILS_B_TABLE, schema=TARGET_B_SCHEMA)
  if not pk.get("constrained_columns"):
    with engine.begin() as conn:
      conn.execute(text(f'''
        ALTER TABLE {TARGET_B_SCHEMA}.{DETAILS_B_TABLE}
        ALTER COLUMN "id" SET NOT NULL;
        ALTER TABLE {TARGET_B_SCHEMA}.{DETAILS_B_TABLE}
        ADD PRIMARY KEY ("id");
      '''
      ))
    
    # 3) Staging
  with engine.begin() as conn:

    #Temp table
    conn.execute(text(f'''
        CREATE TEMP TABLE details_stg
        AS TABLE {TARGET_B_SCHEMA}.{DETAILS_B_TABLE} WITH NO DATA
      '''
      ))
    
    # Load data to temp table
    details_df_new.to_sql(
      "details_stg", 
      conn, 
      if_exists="append",
      index=False, 
      method="multi", 
      chunksize=5000, 
      dtype=activities_details_dtype_map
      )
    
    # Prepare upsert
    stg_cols = details_df_new.columns.to_list()
    
    if not 'id' in stg_cols:
      raise RuntimeError('Column "id" is required in details_df_new to perform UPSERT.')
    
    cols_csv = ', '.join(f'"{c}"' for c in stg_cols)
    set_csv  = ', '.join(f'"{c}" = EXCLUDED."{c}"' for c in stg_cols if c != 'id')

    upsert_sql = f'''
        INSERT INTO {TARGET_B_SCHEMA}.{DETAILS_B_TABLE} ({cols_csv})
        SELECT {cols_csv} FROM details_stg
        ON CONFLICT ("id") DO UPDATE
        SET {set_csv};
      '''
    conn.execute(text(upsert_sql))

    # Clean up
    conn.execute(text('DROP TABLE IF EXISTS details_stg;'))
  
    logging.info('Activities details saved to PostgreSQL.')

2025-09-23 12:49:06,726 | INFO | Activities details saved to PostgreSQL.


### Sanity check

In [20]:
with engine.begin() as conn:
    total_acts = pd.read_sql(text(f"SELECT COUNT(*) AS n FROM {TARGET_B_SCHEMA}.{ACTIVITIES_B_TABLE}"), conn)
    total_det = pd.read_sql(text(f"SELECT COUNT(*) AS n FROM {TARGET_B_SCHEMA}.{DETAILS_B_TABLE}"), conn)

logging.info(f"Total number of records in {TARGET_B_SCHEMA}.{ACTIVITIES_B_TABLE}: {int(total_acts['n'][0])}")
logging.info(f"Total number of records in {TARGET_B_SCHEMA}.{DETAILS_B_TABLE}: {int(total_det['n'][0])}")

2025-09-23 12:49:10,298 | INFO | Total number of records in bronze.activities: 1099
2025-09-23 12:49:10,299 | INFO | Total number of records in bronze.activities_details: 1099


### Identify missing kudos ‚Äî only `activity_id` that are not present in `bronze.kudos`

In [21]:
with engine.begin() as conn:
  conn.execute(text(f"""
    CREATE TABLE IF NOT EXISTS {TARGET_B_SCHEMA}.{KUDOS_B_TABLE} (id TEXT PRIMARY KEY, activity_id BIGINT)
"""))
  existing_ids_kudos = pd.read_sql(text(f"SELECT activity_id FROM {TARGET_B_SCHEMA}.{KUDOS_B_TABLE}"), conn)
  existing_ids_kudos_set = set(existing_ids_kudos['activity_id'].astype('Int64').dropna().to_list())

all_ids_kudos_set = set(activities_df['id'].astype('Int64').dropna().to_list())
missing_ids_kudos = sorted(all_ids_kudos_set - existing_ids_kudos_set, reverse=True)
logging.info(f"Missing kudos of {len(missing_ids_kudos)} activities")
pd.DataFrame({'activity_id': missing_ids_kudos}).head()

2025-09-23 12:49:12,958 | INFO | Missing kudos of 154 activities


Unnamed: 0,activity_id
0,15891685827
1,15885021588
2,15879687027
3,15865360447
4,14489256849


### Download activities kudos

In [22]:
def fetch_activity_kudos(activity_id: int):
  """
  """
  url = ACTIVITY_KUDOS_URL_TMPL.format(id=activity_id)
  return get_json_with_retry(url, params=None)

kudos_to_be_downloaded = sorted(set(missing_ids_kudos) | set(recent_activities_ids))

kudos_records = []
for i, act_id in tqdm(enumerate(kudos_to_be_downloaded, start=1), total=len(kudos_to_be_downloaded)):
  try:
    resp = fetch_activity_kudos(act_id)
    
    if not isinstance(resp, list):
      logging.warning(f"id={act_id}: unexpected response type ({type(resp)}), skip")
      continue

    for i, kudos in enumerate(resp):
      kudos["activity_id"] = act_id
      kudos["kudos_id"] = i
    kudos_records.extend(resp)
    
  except Exception as e:
    logging.error(f"Error downloading kudos for id={act_id}: {e}")
  
  time.sleep(random.randint(7, 9))

logging.info(f"Kudos downloaded: {len(kudos_records)} / {len(kudos_to_be_downloaded)}")


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 183/183 [26:59<00:00,  8.85s/it]
2025-09-23 13:16:25,915 | INFO | Kudos downloaded: 332 / 183


In [18]:

kudos_df_new = pd.json_normalize(kudos_records, sep='_')
if not kudos_df_new.empty:
  kudos_df_new = kudos_df_new.drop(['message', 'errors'], axis=1, errors='ignore')
  kudos_df_new = kudos_df_new.dropna(how='all')
  
kudos_df_new['id'] = kudos_df_new[['activity_id', 'kudos_id']].astype("string").agg("-".join, axis=1)

In [None]:
kudos_df_new.head()

### Set `bronze.kudos` types map

In [20]:
activities_kudos_dtype_map = {
    "resource_state": Integer,
    "first_name": Text,
    "last_name": Float,
    "activity_id": BigInteger,
    "id": Text
}

### Save `bronze.kudos` to database

In [None]:
if kudos_df_new.empty:
  logging.info('No new kudos to be saved.')
else:
  insp = inspect(engine)

  # 1) Check if table exists in PostgreSQL
  if not insp.has_table(table_name=KUDOS_B_TABLE, schema=TARGET_B_SCHEMA):
    kudos_df_new.head(0).to_sql(
      KUDOS_B_TABLE, 
      engine, 
      schema=TARGET_B_SCHEMA,
      if_exists="append", 
      index=False, 
      dtype=activities_kudos_dtype_map
    )

    # 2) Check if primary key exists
  insp = inspect(engine)
  pk = insp.get_pk_constraint(table_name=KUDOS_B_TABLE, schema=TARGET_B_SCHEMA)
  if not pk.get("constrained_columns"):
    with engine.begin() as conn:
      conn.execute(text(f'''
        ALTER TABLE {TARGET_B_SCHEMA}.{KUDOS_B_TABLE}
        ALTER COLUMN "id" SET NOT NULL;
        ALTER TABLE {TARGET_B_SCHEMA}.{KUDOS_B_TABLE}
        ADD PRIMARY KEY ("id");
      '''
      ))
    
    # 3) Staging
  with engine.begin() as conn:

    #Temp table
    conn.execute(text(f'''
        CREATE TEMP TABLE kudos_stg
        AS TABLE {TARGET_B_SCHEMA}.{KUDOS_B_TABLE} WITH NO DATA
      '''
      ))
    
    # Load data to temp table
    kudos_df_new.to_sql(
      "kudos_stg", 
      conn, 
      if_exists="append",
      index=False, 
      method="multi", 
      chunksize=5000, 
      dtype=activities_kudos_dtype_map
      )
    
    # Prepare upsert
    stg_cols = kudos_df_new.columns.to_list()
    
    if not 'id' in stg_cols:
      raise RuntimeError('Column "id" is required in kudos_df_new to perform UPSERT.')
    
    cols_csv = ', '.join(f'"{c}"' for c in stg_cols)
    set_csv  = ', '.join(f'"{c}" = EXCLUDED."{c}"' for c in stg_cols if c != 'id')

    upsert_sql = f'''
        INSERT INTO {TARGET_B_SCHEMA}.{KUDOS_B_TABLE} ({cols_csv})
        SELECT {cols_csv} FROM kudos_stg
        ON CONFLICT ("id") DO UPDATE
        SET {set_csv};
      '''
    conn.execute(text(upsert_sql))

    # Clean up
    conn.execute(text('DROP TABLE IF EXISTS kudos_stg;'))
  
    logging.info('Activities kudos saved to PostgreSQL.')