
2.27.0.0-b423

IshanChhangani tagged this on 07 Aug at 11:59
Summary:
**Background**:

YugabyteDB has added the pss_mem_bytes field (proportional set size, or PSS, in bytes) to pg_stat_activity.
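
Proportional set size charges each process for its private pages plus an equal share of every page it shares with other processes, so summing PSS across backends does not double-count shared memory the way RSS does. The sketch below only illustrates that arithmetic on a made-up page map; the Page type and the sizes are hypothetical, and this is not how YugabyteDB obtains the value (on Linux the kernel reports a per-process Pss figure, for example in /proc/<pid>/smaps_rollup).

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable


@dataclass(frozen=True)
class Page:
    """Hypothetical memory page and the set of PIDs that map it."""
    size_bytes: int
    sharers: FrozenSet[int]


def rss_bytes(pid: int, pages: Iterable[Page]) -> int:
    """Resident set size: every page the process maps counts in full."""
    return sum(p.size_bytes for p in pages if pid in p.sharers)


def pss_bytes(pid: int, pages: Iterable[Page]) -> float:
    """Proportional set size: each page's size is split evenly among its sharers."""
    return sum(p.size_bytes / len(p.sharers) for p in pages if pid in p.sharers)


pages = [
    Page(4096, frozenset({101})),                 # private to backend 101
    Page(4096, frozenset({101, 102})),            # shared by two backends
    Page(4096, frozenset({101, 102, 103, 104})),  # shared by four backends
]
pids = (101, 102, 103, 104)
print("RSS total:", sum(rss_bytes(pid, pages) for pid in pids))  # 28672
print("PSS total:", sum(pss_bytes(pid, pages) for pid in pids))  # 12288.0
```

The three pages occupy 12288 bytes in total. The per-process PSS values add up to exactly that, while the RSS values add up to 28672 because each shared page is counted once per process that maps it.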

The yb_active_session_history (ASH) view is populated by sampling pg_stat_activity, but currently does not include any process memory usage information.
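
Conceptually, the collector wakes up periodically, records one entry per active session from the activity snapshot, and keeps the entries in a fixed-size buffer that overwrites its oldest samples once full. The Python sketch below models only that idea; ActivityRow, AshSample, AshSampler and the fields shown are hypothetical stand-ins, not the actual yb_ash.c structures or the real column list of yb_active_session_history.

```python
from collections import deque
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class ActivityRow:
    """Hypothetical subset of one pg_stat_activity row."""
    pid: int
    state: str
    wait_event: Optional[str]


@dataclass(frozen=True)
class AshSample:
    """Hypothetical ASH record: one active session seen at one sample time."""
    sample_time: float
    pid: int
    wait_event: Optional[str]


class AshSampler:
    """Toy collector that keeps only the most recent `capacity` samples."""

    def __init__(self, capacity: int):
        # A deque with maxlen discards its oldest entry on overflow,
        # which mimics a fixed-size circular buffer.
        self.buffer = deque(maxlen=capacity)

    def take_sample(self, sample_time: float, activity: Iterable[ActivityRow]) -> None:
        """Record every currently active session from one activity snapshot."""
        for row in activity:
            if row.state == "active":  # idle sessions are not sampled in this toy model
                self.buffer.append(AshSample(sample_time, row.pid, row.wait_event))


sampler = AshSampler(capacity=3)
snapshot = [
    ActivityRow(pid=1, state="active", wait_event=None),
    ActivityRow(pid=2, state="idle", wait_event=None),
    ActivityRow(pid=3, state="active", wait_event="WALWrite"),
]
sampler.take_sample(0.0, snapshot)
sampler.take_sample(1.0, snapshot)
# The oldest sample (pid 1 at t=0.0) has been overwritten by newer ones.
print([(s.sample_time, s.pid) for s in sampler.buffer])  # [(0.0, 3), (1.0, 1), (1.0, 3)]
```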

**Problem**:

Memory-related issues are notoriously difficult to diagnose, especially in complex systems where memory pressure may build gradually or spike suddenly.

Without capturing per-process memory usage at the session level, it is challenging to correlate high memory consumption with specific queries, connections, or application patterns.

Today, if a system runs out of memory or exhibits degraded performance, there is no lightweight way to trace back which sessions were using the most memory at the time.

**Proposal**:

Extend yb_active_session_history to include the pss_mem_bytes value from pg_stat_activity.

Since pss_mem_bytes is already collected in pg_stat_activity, it should be straightforward to sample and insert it into the ASH stream alongside existing fields.
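
A minimal sketch of what the proposal enables, assuming the sampler copies pss_mem_bytes from each activity row into the ASH record next to the fields it already captures, and that a reader then asks which sessions peaked in memory during a time window. The record layout, the dict-shaped activity rows, and the helper names (to_ash_sample, top_memory_sessions) are hypothetical; this is not the committed C implementation.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


@dataclass(frozen=True)
class AshSample:
    """Hypothetical ASH record extended with the backend's PSS."""
    sample_time: float             # seconds since an arbitrary epoch
    pid: int
    query_id: int
    pss_mem_bytes: Optional[int]   # None if no value was reported


def to_ash_sample(sample_time: float, activity_row: dict) -> AshSample:
    """Copy pss_mem_bytes from an activity row alongside the existing fields."""
    return AshSample(
        sample_time=sample_time,
        pid=activity_row["pid"],
        query_id=activity_row["query_id"],
        pss_mem_bytes=activity_row.get("pss_mem_bytes"),
    )


def top_memory_sessions(samples: Iterable[AshSample], start: float,
                        end: float, n: int) -> List[AshSample]:
    """Return each session's peak-PSS sample in [start, end], largest first, at most n."""
    peak: Dict[int, AshSample] = {}
    for s in samples:
        if start <= s.sample_time <= end and s.pss_mem_bytes is not None:
            best = peak.get(s.pid)
            if best is None or s.pss_mem_bytes > best.pss_mem_bytes:
                peak[s.pid] = s
    return sorted(peak.values(), key=lambda s: s.pss_mem_bytes, reverse=True)[:n]
```

Keeping the query identifier on the same record is what lets a memory spike be traced back to the statement that was running at that moment.
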
Jira: DB-17018

Test Plan:
Manual testing was conducted to measure the performance impact of these changes on the ASH collector under a load of 100 concurrent PostgreSQL connections.

**Baseline (Before Changes)**: Average execution time per collector iteration was ~7 µs.

**With Changes (After Diff)**: Average execution time per collector iteration was ~12 µs.
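
To put the two averages in perspective: the relative increase looks large, but the absolute cost is about 5 µs per iteration. The sketch below does that arithmetic and scales it by the sampling interval. It assumes one collector iteration per sampling interval and uses 1000 ms purely as an illustrative interval (substitute the cluster's configured value); the inputs are the rounded averages reported above, not new measurements.

```python
def collector_overhead(baseline_us: float, with_change_us: float, interval_ms: float):
    """Return (absolute delta in µs, relative increase, fraction of one CPU core busy)."""
    delta_us = with_change_us - baseline_us
    relative = delta_us / baseline_us
    # Assumes one iteration per sampling interval: busy time divided by wall time.
    core_fraction = with_change_us / (interval_ms * 1000.0)
    return delta_us, relative, core_fraction


delta, rel, frac = collector_overhead(7.0, 12.0, 1000.0)
print(f"+{delta:.0f} µs per iteration ({rel:.0%} slower); "
      f"collector busy {frac:.4%} of one core")
# prints: +5 µs per iteration (71% slower); collector busy 0.0012% of one core
```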

The following changes were made to record the timings:

```
diff --git a/src/postgres/src/backend/utils/misc/yb_ash.c b/src/postgres/src/backend/utils/misc/yb_ash.c
index 86ab81cd38..b743c15003 100644
--- a/src/postgres/src/backend/utils/misc/yb_ash.c
+++ b/src/postgres/src/backend/utils/misc/yb_ash.c
@@ -779,6 +779,10 @@ YbAshMain(Datum main_arg)

 		HandleMainLoopInterrupts();

+		instr_time start_time, end_time, counter;
+		INSTR_TIME_SET_ZERO(counter);
+		INSTR_TIME_SET_CURRENT(start_time);
+
 		if (yb_enable_ash && yb_ash_sample_size > 0)
 		{
 			sample_time = GetCurrentTimestamp();
@@ -795,6 +799,21 @@ YbAshMain(Datum main_arg)
 			YbStorePgAshSamples(sample_time);
 			YbAshReleaseBufferLock();
 		}
+
+		INSTR_TIME_SET_CURRENT(end_time);
+		INSTR_TIME_ACCUM_DIFF(counter, end_time, start_time);
+
+		FILE *fptr = fopen("/Users/ishanchhangani/test.txt", "a");
+		if (fptr != NULL)
+		{
+			fprintf(fptr, "yb_ash collector took %f ms\n",
+					INSTR_TIME_GET_MILLISEC(counter));
+			fclose(fptr);
+		}
+
+		INSTR_TIME_SET_ZERO(start_time);
+		INSTR_TIME_SET_ZERO(end_time);
+		INSTR_TIME_SET_ZERO(counter);
 	}
 	proc_exit(0);
 }
```
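
The reported averages can be derived from the file the instrumented loop appends to. The helper below is a sketch, assuming one line of the form "yb_ash collector took <value> <unit>" per iteration, as written by the fprintf above; the function name, the regular expression, and the warm-up skip option are illustrative. Because INSTR_TIME_GET_DOUBLE() returns seconds, a log written with that macro should be parsed with unit="s" whatever label the format string prints.

```python
import re
from statistics import mean
from typing import Iterable, Optional

# Matches lines such as "yb_ash collector took 0.007000 ms".
LINE_RE = re.compile(r"yb_ash collector took ([0-9.eE+-]+) (s|ms|us)\b")
TO_US = {"s": 1e6, "ms": 1e3, "us": 1.0}


def average_collector_time_us(lines: Iterable[str], unit: Optional[str] = None,
                              skip: int = 0) -> float:
    """Average iteration time in microseconds over all matching log lines.

    unit: force the unit of every value ("s", "ms" or "us"); None trusts each line's label.
    skip: number of leading samples to drop, e.g. warm-up iterations.
    """
    values = []
    for line in lines:
        m = LINE_RE.search(line)
        if m:
            value, label = float(m.group(1)), m.group(2)
            values.append(value * TO_US[unit or label])
    values = values[skip:]
    if not values:
        raise ValueError("no matching timing lines")
    return mean(values)
```

For example, passing an open file object for the log path together with unit="s" returns the mean iteration time in microseconds, which can be compared directly with the baseline and post-change figures.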

The Python script used to generate the traffic:
```
import os
import time
import psycopg2
from psycopg2 import sql
from multiprocessing import Process

# ==============================================================================
# Python PostgreSQL Parallel Workload Generator
#
# This script uses Python's multiprocessing library to launch concurrent
# processes that generate a workload against a PostgreSQL database.
#
# Each process will establish its own database connection and then:
# 1. Create a unique table.
# 2. Insert 100 rows of data using a parameterized query.
# 3. Perform a SELECT query to verify the data.
# 4. Pause execution using pg_sleep().
# 5. Clean up by dropping its table, even if errors occur.
#
# Requirements:
# pip install psycopg2-binary
# ==============================================================================

# --- Configuration ---
# Desc: Set your PostgreSQL connection details here.
# Note: It's recommended to use environment variables or a secrets management
#       system for production credentials instead of hardcoding them.
DB_CONFIG = {
    "host": "localhost",
    "port": "5433",
    "dbname": "yugabyte",
    "user": "yugabyte",
    "password": "yugabyte" # Replace with your password
}

# Number of parallel processes to launch.
NUM_PROCESSES = 100

# Duration for pg_sleep in seconds.
SLEEP_DURATION = 30

def run_workload(process_id):
    """
    The main function executed by each child process.
    Connects to the database, performs SQL operations, and cleans up.
    """
    conn = None
    # Generate a unique table name for this process
    # Using process_id and the OS-level process ID (PID) ensures uniqueness
    table_name = f"workload_table_{process_id}_{os.getpid()}"
    print(f"[Process {process_id}] Starting workload (PID: {os.getpid()})...")

    try:
        # Each process must create its own connection.
        # Do not share connection objects across processes.
        conn = psycopg2.connect(**DB_CONFIG)
        conn.autocommit = True  # Autocommit to avoid managing transactions for simple DDL/DML
        cur = conn.cursor()

        # 1. Create a unique table for this process.
        # We use the 'sql' module for safe dynamic composition of identifiers like table names.
        create_table_query = sql.SQL("""
            CREATE TABLE {table} (
                id SERIAL PRIMARY KEY,
                entry_text VARCHAR(255),
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            );
        """).format(table=sql.Identifier(table_name))
        cur.execute(create_table_query)
        print(f"[Process {process_id}] Created table: {table_name}")

        # 2. Insert 100 rows.
        # Use parameterized queries to prevent SQL injection.
        insert_query = sql.SQL("INSERT INTO {table} (entry_text) VALUES (%s);").format(
            table=sql.Identifier(table_name)
        )
        for i in range(1, 101):
            cur.execute(insert_query, (f"Process {process_id} - Row {i}",))
        print(f"[Process {process_id}] Inserted 100 rows.")

        # 3. Select data to verify insertion.
        select_query = sql.SQL("SELECT COUNT(*) AS total_rows FROM {table};").format(
            table=sql.Identifier(table_name)
        )
        cur.execute(select_query)
        count = cur.fetchone()[0]
        print(f"[Process {process_id}] Verification select: Found {count} rows.")

        # 4. Simulate work with pg_sleep().
        print(f"[Process {process_id}] Sleeping for {SLEEP_DURATION} seconds...")
        cur.execute("SELECT pg_sleep(%s);", (SLEEP_DURATION,))

        cur.close()

    except psycopg2.Error as e:
        print(f"[Process {process_id}] A database error occurred: {e}")
    except Exception as e:
        print(f"[Process {process_id}] An unexpected error occurred: {e}")
    finally:
        # 5. Clean up the created table.
        # This block runs whether an error occurred or not, ensuring cleanup.
        if conn:
            try:
                # We need a new cursor if the previous one was closed or failed.
                cur = conn.cursor()
                drop_table_query = sql.SQL("DROP TABLE IF EXISTS {table};").format(
                    table=sql.Identifier(table_name)
                )
                cur.execute(drop_table_query)
                print(f"[Process {process_id}] Dropped table: {table_name}. Workload finished.")
                cur.close()
            except psycopg2.Error as e:
                print(f"[Process {process_id}] Cleanup failed: {e}")
            finally:
                conn.close()

if __name__ == "__main__":
    # The __name__ == "__main__" block is crucial for multiprocessing.

    print(f"Starting {NUM_PROCESSES} parallel PostgreSQL processes...")
    print("--------------------------------------------------------")

    processes = []
    # Create and start all the processes
    for i in range(1, NUM_PROCESSES + 1):
        process = Process(target=run_workload, args=(i,))
        processes.append(process)
        process.start()

    # Wait for all processes to complete their execution
    print("All processes launched. Waiting for completion...")
    for process in processes:
        process.join()

    print("--------------------------------------------------------")
    print("All processes have completed their workloads.")
```

Reviewers: asaha, telgersma

Reviewed By: asaha, telgersma

Subscribers: svc_phabricator, smishra, jason, yql

Differential Revision: https://phorge.dev.yugabyte.com/D44443