In [87]:
%pip install psycopg2-binary

Note: you may need to restart the kernel to use updated packages.


In [88]:
import logging
import os
from collections import defaultdict

import psycopg2

In [89]:
# Configure logging BEFORE the first log call. Calling logging.info() on an
# unconfigured root logger installs a default WARNING-level handler, which would
# hide the message below and turn any later logging.basicConfig(...) into a no-op.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Connection settings come from the standard libpq environment variable names and
# fall back to the local development values, so credentials need not live in the notebook.
try:
  conn = psycopg2.connect(
      dbname=os.environ.get("PGDATABASE", "tpc"),
      user=os.environ.get("PGUSER", "postgres"),
      password=os.environ.get("PGPASSWORD", "password"),
      host=os.environ.get("PGHOST", "localhost"),
      port=os.environ.get("PGPORT", "5432")
  )
  logging.info("Database connection established.")
except Exception as e:
  print(f"An error occurred while connecting to the database: {e}")
  # Re-raise: without a connection every later cell would fail with a confusing
  # NameError on `conn`, so stop here with the real cause.
  raise

2024-04-19 14:17:33,211 - INFO - Database connection established.


In [90]:
def execute_query(conn, query):
  """
  Execute a SQL statement and return all rows of its result set.

  Parameters:
  conn: Open psycopg2 connection (anything exposing cursor() and rollback()).
  query (str): SQL text to execute.

  Returns:
  list: The fetched rows; an empty list when the statement produces no result
        set (e.g. ANALYZE or a DO block).
  None: When psycopg2 reports an error; the error is printed.
  """
  try:
    # The context manager closes the cursor even if execute/fetch raises.
    with conn.cursor() as cur:
      cur.execute(query)

      # description is None for statements that do not return rows;
      # calling fetchall() on those would raise "no results to fetch".
      if cur.description is None:
        return []

      return cur.fetchall()
  except psycopg2.Error as e:
    print("Error executing query:", e)
    # A failed statement leaves the transaction aborted; roll back so that the
    # next query on this connection is not rejected.
    try:
      conn.rollback()
    except psycopg2.Error:
      # The connection itself is unusable (e.g. closed); nothing left to reset.
      pass
    return None

In [91]:
# Relations whose statistics are collected and tracked by this notebook.
tableSet = {
    'lineitem',
    'orders',
    'customer',
    'partsupp',
    'supplier',
    'part',
    'nation',
    'region',
}

# Relation name -> property dict; a fresh empty dict is created on first access.
tableToProperties = defaultdict(lambda: {})

In [92]:
# """
#     Refresh the table statistics (via ANALYZE), since they are only updated lazily.
# """

# UPDATE_TABLE_STATS_QUERY = """
# DO $$
# DECLARE
#     table_name TEXT;
# BEGIN
#     FOR table_name IN
#         SELECT tablename
#         FROM pg_catalog.pg_tables
#         WHERE schemaname = 'public' -- Specify the schema if needed
#     LOOP
#         EXECUTE format('ANALYZE %I', table_name);
#     END LOOP;
# END $$;
# """
# execute_query(conn, UPDATE_TABLE_STATS_QUERY)

In [93]:
"""
  Get and store table metadata 
"""

# Reads name, tuple estimate and page count of every pg_class row with relkind 'r'.
RELATION_PROPERTIES_QUERY = (
    "\n"
    "SELECT relname, reltuples, relpages \n"
    "FROM pg_class \n"
    "WHERE relkind IN ('r');\n"
)

result = execute_query(conn, RELATION_PROPERTIES_QUERY)
for name, tuple_count, page_count in result:
  # Only the relations listed in tableSet are tracked.
  if name not in tableSet:
    continue
  tableToProperties[name].update(tuple_count=tuple_count, page_count=page_count)

print(tableToProperties)

defaultdict(<function <lambda> at 0x7fd7c022af70>, {'region': {'tuple_count': 5.0, 'page_count': 1}, 'nation': {'tuple_count': 25.0, 'page_count': 1}, 'supplier': {'tuple_count': 10000.0, 'page_count': 223}, 'part': {'tuple_count': 200000.0, 'page_count': 4128}, 'partsupp': {'tuple_count': 800000.0, 'page_count': 17552}, 'customer': {'tuple_count': 150000.0, 'page_count': 3600}, 'orders': {'tuple_count': 1500000.0, 'page_count': 26136}, 'lineitem': {'tuple_count': 6001340.0, 'page_count': 112600}})


In [94]:

import logging
import json
from collections import defaultdict

# Root logger: INFO and above, rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Reads name, tuple estimate and page count of every pg_class row with relkind 'r'.
RELATION_PROPERTIES_QUERY = (
    "\n"
    "SELECT relname, reltuples, relpages \n"
    "FROM pg_class \n"
    "WHERE relkind IN ('r');\n"
)

class Explainer:
    """
    Runs EXPLAIN on SQL queries and pairs the planner's reported cost of each plan
    node with the cost recomputed by CostEstimator.

    Attributes:
    tableSet (set): Names of the relations whose statistics are collected.
    tableToProperties (defaultdict): Relation name -> {'tuple_count', 'page_count'},
        filled per instance from RELATION_PROPERTIES_QUERY.
    cost_estimator (CostEstimator): Estimator built on tableToProperties.
    """
    tableSet = {'lineitem', 'orders','customer','partsupp','supplier','part','nation','region'}

    def __init__(self, conn):
        """
        Stores the connection and loads the statistics of the tracked relations.

        Parameters:
        conn (psycopg2.connection): Active database connection object.
        """
        self.conn = conn

        # Per-instance mapping: a class-level dict would be shared by every
        # Explainer, letting one instance overwrite another's statistics.
        self.tableToProperties = defaultdict(dict)

        for name, tuple_count, page_count in self.run(RELATION_PROPERTIES_QUERY):
            if name in self.tableSet:
                self.tableToProperties[name]['tuple_count'] = tuple_count
                self.tableToProperties[name]['page_count'] = page_count

        self.cost_estimator = CostEstimator(self.tableToProperties)

    def run(self, query):
        """
        Executes a query on the stored connection and returns every result row.

        Parameters:
        query (str): SQL query to execute.

        Returns:
        list: All rows fetched from the cursor.
        """
        # Context manager closes the cursor even when execute/fetch raises,
        # matching how run_explain handles its cursor.
        with self.conn.cursor() as cur:
            cur.execute(query)
            return cur.fetchall()

    def run_explain(self, query):
        """
        Executes the EXPLAIN command on a given SQL query using the stored PostgreSQL
        connection and returns the JSON-formatted plan.

        Note: the ANALYZE option makes PostgreSQL actually execute the statement.
        BUFFERS is not requested, so plan nodes carry no 'Shared ... Blocks' fields.

        See 'EXPLAIN' documentation:
        https://www.postgresql.org/docs/current/sql-explain.html

        See psycopg2 documentation:
        https://www.psycopg.org/docs/cursor.html#cursor.execute

        Parameters:
        query (str): SQL query to be explained. It is interpolated verbatim into the
            EXPLAIN statement, so it must come from a trusted source.

        Returns:
        list: A list of dictionaries representing the JSON formatted execution plan returned by PostgreSQL.
        """
        with self.conn.cursor() as cur:
            cur.execute(f"EXPLAIN (ANALYZE true, FORMAT json) {query}")
            explain_output = cur.fetchone()[0]
            logging.info("EXPLAIN command executed successfully.")
            # psycopg2 implicitly converts the JSON output to a list of dictionaries (python)
            return explain_output

    def analyze_node(self, node):
        """
        Analyze a single node within the execution plan: pair the planner's reported
        total cost with the cost recomputed by the cost estimator, then recursively
        process any sub-plans.

        Parameters:
        node (dict): A single node from the JSON execution plan.

        Returns:
        dict: Node and sub-nodes analysis including both estimated and computed costs.

        Raises:
        Whatever the cost estimator raises for a node it cannot handle.
        """
        estimated_cost, explanation = self.cost_estimator.estimate(node)

        # 'Actual Estimated Cost' is the planner's figure; 'Estimated Cost' is ours.
        node_analysis = {
            'Node Type': node.get('Node Type'),
            'Relation Name': node.get('Relation Name', 'N/A'),
            'Cost Analysis': {
                'Actual Estimated Cost': node.get('Total Cost'),
                'Estimated Cost': estimated_cost,
                'Explanation': explanation
            }
        }

        # Recursively analyze any sub-plans and include their analysis
        if 'Plans' in node:
            node_analysis['Sub-plans'] = [self.analyze_node(sub_node) for sub_node in node['Plans']]

        return node_analysis


    def analyze_execution_plan(self, explain_output):
        """
        Initiates the recursive analysis of the entire execution plan from the top-level node.

        Parameters:
        explain_output (list): The JSON execution plan as a list from PostgreSQL.

        Returns:
        dict: A dictionary representing the analyzed execution plan including nested sub-plans;
              an empty dict (after logging an error) when explain_output is empty.
        """
        if explain_output:
            # The execution plan is enclosed in a list -> start with the first item
            return self.analyze_node(explain_output[0]['Plan'])
        else:
            logging.error("No execution plan found.")
            return {}


    def get_plan_properties(self, node):
        """
        Utility function to extract cost-related metrics from a node in the execution plan.
        Keys missing from the node fall back to the defaults given below.

        See 'JSON Format Explain Plan' section in:
        https://www.postgresonline.com/journal/archives/171-Explain-Plans-PostgreSQL-9.0-Text,-JSON,-XML,-YAML-Part-1-You-Choose.html

        Parameters:
        node (dict): Node of the execution plan.

        Returns:
        dict: Extracted cost metrics.
        """
        props = {
            'Node Type': node.get('Node Type'),
            'Startup Cost': node.get('Startup Cost', 0.0),
            'Total Cost': node.get('Total Cost', 0.0),
            'Plan Rows': node.get('Plan Rows', 0),
            'Plan Width': node.get('Plan Width', 0),
            'Actual Startup Time': node.get('Actual Startup Time', 0.0),  # might not be useful for us
            'Actual Total Time': node.get('Actual Total Time', 0.0),  # might not be useful for us
            'Actual Rows': node.get('Actual Rows', 0),
            'Actual Loops': node.get('Actual Loops', 1),
            'Shared Hit Blocks': node.get('Shared Hit Blocks', 0),  # needed for scan formula
            'Shared Read Blocks': node.get('Shared Read Blocks', 0),
            'Shared Dirtied Blocks': node.get('Shared Dirtied Blocks', 0),
            'Shared Written Blocks': node.get('Shared Written Blocks', 0),
        }

        return props


    # TODO: Function to explain the computation of various cost in the QEP, explaining differences if any

    def generate_report(self, analysis_results):
        """
        Generates a formatted JSON report from the analysis results.

        Parameters:
        analysis_results (dict): Analysis results of the execution plan.

        Returns:
        str: A string representation of the JSON-formatted analysis report.
        """
        report = json.dumps(analysis_results, indent=4)
        logging.info("Report generated.")
        return report
        
class CostEstimator:
    """
    Recomputes the cost of execution-plan nodes from stored table statistics.

    Only the 'Seq Scan' operator is supported.

    Attributes:
    table_properties (dict): Relation name -> dict holding at least 'page_count'.
    """
    # NOTE(review): these presumably mirror the server's seq_page_cost,
    # cpu_tuple_cost and cpu_operator_cost settings - confirm against the database.
    SEQ_PAGE_COST = 1
    CPU_TUPLE_COST = 0.01
    CPU_OPERATOR_COST = 0.0025

    def __init__(self, table_properties):
        self.table_properties = table_properties

    def scan_cost_function(self, node) -> list:
        """
        Computes the sequential-scan cost of a plan node.

        Parameters:
        node (dict): Plan node providing 'Plan Rows' and 'Relation Name'.

        Returns:
        list: [cost, explanation] where cost is
              (page_count * SEQ_PAGE_COST) + (Plan Rows * CPU_TUPLE_COST)
              and explanation is a human-readable description of the formula.

        Raises:
        KeyError: If the node lacks a required key or no page count is known
                  for the relation.
        """
        rows = node['Plan Rows']
        relation = node['Relation Name']

        # .get() avoids inserting an empty entry for an unknown relation when
        # table_properties is a defaultdict.
        table_props = self.table_properties.get(relation)
        if not table_props or 'page_count' not in table_props:
            raise KeyError(f"No page_count statistics available for relation {relation!r}")

        seq_pages_accessed = table_props['page_count']
        explanation = f"Cost function: (seq_pages_accessed * {self.SEQ_PAGE_COST}) + (rows * {self.CPU_TUPLE_COST})\nwhere seq_pages_accessed= {seq_pages_accessed} and rows= {rows}"
        return [(seq_pages_accessed * self.SEQ_PAGE_COST) + (rows * self.CPU_TUPLE_COST), explanation]

    def estimate(self, node):
        """
        Dispatches a plan node to the cost function of its operator.

        Parameters:
        node (dict): Plan node; 'Node Type' selects the cost function.

        Returns:
        list: [cost, explanation] as produced by the operator's cost function.

        Raises:
        NotImplementedError: If no cost function exists for the operator.
        """
        operator = node['Node Type']
        if operator == 'Seq Scan':
            return self.scan_cost_function(node)
        raise NotImplementedError(f"Cost function is undefined for operator {operator}")


In [95]:
"""
  Test Selection
"""
# Query under test: project the listed columns of public.part.
SELECT_PART_QUERY = """
SELECT p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment
FROM public.part;
"""

# Explainer on the notebook-wide connection.
explainer = Explainer(conn)

# EXPLAIN the query, analyse the returned plan, and print the JSON report.
out = explainer.run_explain(SELECT_PART_QUERY)
result = explainer.analyze_execution_plan(out)
print(explainer.generate_report(result))




2024-04-19 14:17:33,369 - INFO - EXPLAIN command executed successfully.
2024-04-19 14:17:33,370 - INFO - Report generated.


test 4128 200000
{
    "Node Type": "Seq Scan",
    "Relation Name": "part",
    "Cost Analysis": {
        "Actual Estimated Cost": 6128.0,
        "Estimated Cost": 6128.0,
        "Explanation": "Cost function: (seq_pages_accessed * 1) + (rows * 0.01)\nwhere seq_pages_accessed= 4128 and rows= 200000"
    }
}


In [96]:
# class Explainer:
  # EXPLAIN_REGEX = r"(\w+\s+\w+)\s+on\s+(\w+)\s+\(cost=(\d+\.\d+)\.\.(\d+\.\d+) rows=(\d+) width=(\d+)\) \(actual time=(\d+\.\d+)\.\.(\d+\.\d+) rows=(\d+) loops=(\d+)\)"
  # def __init__(self, query:str):
  #   self.props = {}
  #   self.exec(query)

  # def toExplainResult(self, explanation: str):
  #   print(f"explanation {explanation}")
  #   match = re.match(self.EXPLAIN_REGEX, explanation)
  #   if match:
  #     self.props['operator'] = match.group(1)
  #     self.props['table_name'] = match.group(2)
  #     self.props['starting_cost'] = float(match.group(3))
  #     self.props['total_cost'] = float(match.group(4))
  #     self.props['rows'] = int(match.group(5))
  #     self.props['width'] = int(match.group(6))
  #     self.props['actual_time_start'] = float(match.group(7)) 
  #     self.props['actual_time_end'] = float(match.group(8)) 
  #     self.props['actual_rows'] = int(match.group(9)) 
  #     self.props['loops'] = int(match.group(10)) 
  #   else:
  #     print(f"No match found for {explanation}")
  
  # def exec(self, query: str):
  #   query_result = execute_query(conn, "EXPLAIN (ANALYZE true, BUFFERS true, FORMAT json)" + query)
  #   print(query_result)
  #   self.props = json.loads(query_result)
  #   # for index, res in enumerate(query_result):
  #   #   if index == 0:
  #   #     self.toExplainResult(res[0])
  #   #   else:
  #   #     key, value = res[0].split(': ')
  #   #     self.props[key.strip()] = value.strip()
  #   return self.props
  
  # def explain(self):
  #   if self.props['operator'] not in operator_to_cost_function:
  #     raise Exception(f"Cost function not defined for {self.props['operator']}")
    
  #   tableName = self.props['table_name']
  #   operator = self.props['operator']
  #   pagesAccessed = tableToProperties[tableName]['page_count'] # change to use block/buffer wtv Alex said
  #   expected_cost = operator_to_cost_function[operator](pagesAccessed, self.props['rows'])
  #   print(f"Actual total cost: {self.props['total_cost']}; Expected total cost: {expected_cost}")
  
#   def getProps(self):
#     return self.props

# select_part_explainer = Explainer(SELECT_PART_QUERY)
# select_part_explainer.explain()
# print(select_part_explainer.props)