# Cấu hình chung

In [None]:
'''
_DATABASE = flags.DEFINE_string("database", None, "Database name.")
flags.mark_flag_as_required("database")
_USER = flags.DEFINE_string("user", None, "Database username.")
_PASSWORD = flags.DEFINE_string("password", None, "Database password.")
_HOST = flags.DEFINE_string("host", "localhost", "Database host.")
_SEED = flags.DEFINE_float("seed", 0, "Database random number seed.")

_TEMPLATE_FILE = flags.DEFINE_string("template_file", None,
                                     "Parameterized query template file.")
flags.mark_flag_as_required("template_file")
'''

In [1]:
_DATABASE = 'kepler_stack'
_USER = 'kepler_user'
_PASSWORD = '12345'
_HOST = 'localhost'
_SEED = 0
_TEMPLATE_FILE = 'inputs/stack_query_templates.json'

_ROOT_OUTPUT_DIR = 'outputs'

_QUERY = 'q1_0'

_MAX_WORKERS = 1 # Giảm số worker để tránh quá tải CPU

In [5]:
import os

os.makedirs(_ROOT_OUTPUT_DIR, exist_ok= True)

# I. Gen các giá trị parameter cho query templates

In [None]:
'''
_PARAMETER_COUNT = flags.DEFINE_integer(
    "count", 1000000, "The max number of parameters to generate per query.")

_COUNTS_OUTPUT_FILE = flags.DEFINE_string(
    "counts_output_file", None,
    "Output file to store the parameter counts per query.")
flags.mark_flag_as_required("counts_output_file")
_PARAMETERS_OUTPUT_DIR = flags.DEFINE_string(
    "parameters_output_dir", None,
    "Directory to store parameter values per query.")
flags.mark_flag_as_required("parameters_output_dir")

_DRY_RUN = flags.DEFINE_bool(
    "dry_run", False,
    "If true, verify that the parameter generation process works correctly "
    "using a single, non-random parameter value. This involves a) verifying "
    "that the parameter generation query can be composed from the template "
    "query and b) ensuring that the template query executes successfully with "
    "the generated parameter value.")
'''

In [2]:
_PARAMETER_COUNT = 1

_COUNTS_OUTPUT_DIR = f'{_ROOT_OUTPUT_DIR}/parameter_counts'
_COUNTS_OUTPUT_FILE = f'{_COUNTS_OUTPUT_DIR}/parameter_counts.json'

_PARAMETERS_OUTPUT_DIR = f'{_ROOT_OUTPUT_DIR}/parameters/'

_DRY_RUN = False

In [14]:
import os

os.makedirs(_COUNTS_OUTPUT_DIR, exist_ok= True)
os.makedirs(_PARAMETERS_OUTPUT_DIR, exist_ok= True)

In [15]:
import collections
from concurrent import futures
import functools
import json
import logging
import os
import time

import parameter_generator
import query_utils


with open(_TEMPLATE_FILE) as f:
  templates = json.load(f)

work_list = []
# Query templates that failed hint verification using the parameters from the
# original Stack benchmark. That is, at least one provided hint was ignored by
# the PG optimizer for at least one parameter binding.
skip_list = ["q3_0", "q3_1", "q3_2"]
for query_id, template in templates.items():
  if query_id not in skip_list and query_id == _QUERY:
    work_list.append(
        parameter_generator.TemplateItem(
            query_id=query_id, template=template))

database_configuration = query_utils.DatabaseConfiguration(
    dbname=_DATABASE,
    user=_USER,
    password=_PASSWORD,
    host=_HOST,
    seed=_SEED)
generator = parameter_generator.ParameterGenerator(database_configuration)
parameter_generation_function = functools.partial(
    generator.generate_parameters, _PARAMETER_COUNT, dry_run=_DRY_RUN)

output_counts = collections.defaultdict(lambda: {})
# The high-latency work occurs remotely via the database executing queries to
# generate parameters. The number of max workers is limited empirically to
# avoid memory issues on the database side.
with futures.ThreadPoolExecutor(max_workers=_MAX_WORKERS) as executor:
  for result in executor.map(parameter_generation_function, work_list):
    query_id = next(iter(result))
    logging.info("Finished generating for %s", query_id)
    with open(
        os.path.join(_PARAMETERS_OUTPUT_DIR,
                      f"{query_id}-{len(result[query_id]['params'])}.json"),
        "w") as f:
      json.dump(result, f)

    output_counts[query_id] = len(result[query_id]["params"])

    if _DRY_RUN:
      # Ensure that the template query executes successfully with the
      # generated parameter value.
      query_manager = query_utils.QueryManager(database_configuration)
      start_ms = int(time.time() * 1e3)
      query = result[query_id]["query"]
      params = result[query_id]["params"][0]
      results = query_manager.execute(query, params)
      end_ms = int(time.time() * 1e3)

      print(f"Query {query_id} approximate latency: {end_ms-start_ms} ms")
      print(f"Template: {query}")
      print(f"Params: {params}")

      # The query should return at least one result.
      assert results

with open(_COUNTS_OUTPUT_FILE, "w") as f:
  json.dump(output_counts, f)

SELECT site.site_name,tag.name
from
 tag, site, question, tag_question
where
tag.site_id = site.site_id and
question.site_id = site.site_id and
tag_question.site_id = site.site_id and
tag_question.question_id = question.id and
tag_question.tag_id = tag.id
AND site.site_name IS NOT NULL
AND tag.name IS NOT NULL
GROUP BY site.site_name,tag.name 
ORDER BY random()
LIMIT 1;


# II. Gen Plan Candidates 

In [3]:
import enum
import json
import os

import main_utils
import pg_generate_plan_candidates
import pg_plan_hint_extractor
import query_utils

In [4]:
class GenerationFunction(enum.Enum):
  PG_CONFIGS = "pg_configs"
  ROW_NUM_EVOLUTION = "row_num_evolution"
  EXHAUSTIVE_CARDINALITY_PERTURBATIONS = "exhaustive_cardinality_perturbations"


_GENERATION_FUNCTION_MAP = {
    GenerationFunction.PG_CONFIGS:
        pg_generate_plan_candidates.get_query_plans,
    GenerationFunction.ROW_NUM_EVOLUTION:
        pg_generate_plan_candidates.generate_by_row_num_evolution,
    GenerationFunction.EXHAUSTIVE_CARDINALITY_PERTURBATIONS:
        pg_generate_plan_candidates
        .generate_by_exhaustive_cardinality_perturbations
}

In [5]:
def _supports_distributed_execution(
    generation_function: GenerationFunction) -> bool:
  return generation_function != GenerationFunction.EXHAUSTIVE_CARDINALITY_PERTURBATIONS

In [None]:
'''
_QUERY_PARAMS_FILE = flags.DEFINE_string(
    "query_params_file", None,
    "File containing parameterized queries with list of parameter values.")
flags.mark_flag_as_required("query_params_file")
_PARAMS_LIMIT = flags.DEFINE_integer(
    "params_limit", None,
    "The number of parameter values to use when generating plans.")
_OUTPUT_DIR = flags.DEFINE_string(
    "output_dir", None,
    "Directory in which to store query plan hints and configs.")
flags.mark_flag_as_required("output_dir")

_PLANS_OUTPUT_FILE = flags.DEFINE_string(
    "plans_output_file", None,
    "File to store distinct plans per query. The file name is expected to end in .json"
)
flags.mark_flag_as_required("plans_output_file")
_PLAN_INDEX_SUFFIX = flags.DEFINE_string(
    "plan_index_suffix", "_plan_index.json",
    "Suffix of files to store plan indices in.")
_VERIFICATION_FAILURES_FILE = flags.DEFINE_string(
    "verification_failures_file", "verification_failures.json",
    "Filename of file to save verification failures.")
_CHUNKSIZE = flags.DEFINE_integer(
    "chunksize", 100, "How many params to include in each subprocess chunk.")
_KEYS_TO_REMOVE = flags.DEFINE_list(
    "keys_to_remove", [],
    ("List of keys to filter from EXPLAIN plan JSON. Good candidates include "
     "\"Parallel Aware\", \"Relation Name\", \"Parent Relationship\""))

_GENERATION_FUNCTION = flags.DEFINE_enum_class(
    "generation_function", GenerationFunction.PG_CONFIGS.value,
    GenerationFunction, "Which plan generation function to use.")
_SOFT_TOTAL_PLANS_LIMIT = flags.DEFINE_integer(
    "soft_total_plans_limit", None,
    "Soft limit on total number of plans to produce."
)
# Pg configs flags.
_CONFIG_STR = flags.DEFINE_string(
    "configs", "",
    "Comma-separated string of Postgres optimizer configuration parameters to toggle off."
)
# Row number evolution flags.
_MAX_PLANS_PER_PARAM = flags.DEFINE_integer(
    "max_plans_per_param", None,
    "Stop evolution after this number of plans is exceeded.")
_NUM_GENERATIONS = flags.DEFINE_integer(
    "num_generations", 3, "Number of generations of row number evolution.")
_NUM_MUTATIONS_PER_PLAN = flags.DEFINE_integer(
    "num_mutations_per_plan", 25, "Number of random mutations for each plan.")
_EXPONENT_BASE = flags.DEFINE_integer(
    "exponent_base", 10, "Base of exponential row number perturbations.")
_EXPONENT_RANGE = flags.DEFINE_integer(
    "exponent_range", 3, "One-sided range of exponent of perturbations.")
_MAX_PLANS_PER_GENERATION = flags.DEFINE_integer(
    "max_plans_per_generation", 20,
    "Max number of plans to mutate per generation.")
_PERTURB_UNIT_ONLY = flags.DEFINE_bool(
    "perturb_unit_only", True,
    "Whether to perturb only row counts exactly equal to one."
)
_MAX_PERTURBS_PER_JOIN = flags.DEFINE_integer(
    "max_perturbs_per_join", 1,
    "Limit on how many times a specific join can be perturbed."
)
# Exhaustive cardinality perturbation flags.
_CARDINALITY_MULTIPLIERS = flags.DEFINE_list(
    "cardinality_multipliers", None,
    "List of cardinality multipliers to apply when generating plans.")
'''

In [9]:
_QUERY_PARAMS_FILE = f'{_PARAMETERS_OUTPUT_DIR}/q1_0-1.json' # Thêm đường dẫn tới file json kết quả của phần I.
_PARAMS_LIMIT = None
_PLANS_OUTPUT_DIR = f'{_ROOT_OUTPUT_DIR}/plans'

_PLANS_OUTPUT_FILE =  'pg_hints.json'

_PLAN_INDEX_SUFFIX = '_plan_index.json'

_VERIFICATION_FAILURES_FILE = 'verification_failures.json'

_CHUNKSIZE = 100
_KEYS_TO_REMOVE = []

_GENERATION_FUNCTION = GenerationFunction.ROW_NUM_EVOLUTION

_SOFT_TOTAL_PLANS_LIMIT = None
# Pg configs flags.
_CONFIG_STR = None

# Row number evolution flags.
_MAX_PLANS_PER_PARAM = 10

_NUM_GENERATIONS = 3

_NUM_MUTATIONS_PER_PLAN = 25

_EXPONENT_BASE = 10
_EXPONENT_RANGE = 3

_MAX_PLANS_PER_GENERATION = 20

_PERTURB_UNIT_ONLY = True

_MAX_PERTURBS_PER_JOIN = 1

# Exhaustive cardinality perturbation flags.
_CARDINALITY_MULTIPLIERS = None

In [10]:
configs = _CONFIG_STR.split(",") if _CONFIG_STR else []

with open(_QUERY_PARAMS_FILE) as json_file:
    info = json.load(json_file)

hints_output_dir = os.path.join(_PLANS_OUTPUT_DIR, _DATABASE)
os.makedirs(hints_output_dir, exist_ok=True)

database_configuration = query_utils.DatabaseConfiguration(
    dbname=_DATABASE,
    user=_USER,
    password=_PASSWORD,
    host=_HOST)
query_manager = query_utils.QueryManager(database_configuration)
query_utils.save_postgres_config_info(query_manager, _PLANS_OUTPUT_DIR)

hint_accumulator = main_utils.HintAccumulator()
for query_id, query_metadata in info.items():
    print("Start: {query_id}")

    output = {}
    output["output"] = {}

    function_kwargs = {
        "database_configuration": database_configuration,
        "query": query_metadata["query"],
        "keys_to_remove": _KEYS_TO_REMOVE
    }

    # Augment kwargs depending on generation function.
    if _GENERATION_FUNCTION == GenerationFunction.PG_CONFIGS:
        function_kwargs["configs"] = configs
    elif _GENERATION_FUNCTION == GenerationFunction.ROW_NUM_EVOLUTION:
        function_kwargs.update({
            "max_plans": _MAX_PLANS_PER_PARAM,
            "num_generations": _NUM_GENERATIONS,
            "num_mutations_per_plan": _NUM_MUTATIONS_PER_PLAN,
            "exponent_base": _EXPONENT_BASE,
            "exponent_range": _EXPONENT_RANGE,
            "max_plans_per_generation": _MAX_PLANS_PER_GENERATION,
            "perturb_unit_only": _PERTURB_UNIT_ONLY,
            "max_perturbs_per_join": _MAX_PERTURBS_PER_JOIN
        })
    elif _GENERATION_FUNCTION == GenerationFunction.EXHAUSTIVE_CARDINALITY_PERTURBATIONS:
        cardinality_multipliers = [
            float(multiplier) for multiplier in _CARDINALITY_MULTIPLIERS
        ]

        function_kwargs.update(
            {"cardinality_multipliers": cardinality_multipliers})

    if _PARAMS_LIMIT:
        query_metadata["params"] = query_metadata["params"][:_PARAMS_LIMIT]

    plan_hint_extractor = pg_plan_hint_extractor.PlanHintExtractor()
    pg_generate_plan_candidates.execute_plan_generation(
        _GENERATION_FUNCTION_MAP[_GENERATION_FUNCTION],
        function_kwargs,
        query_metadata["params"],
        plan_hint_extractor=plan_hint_extractor,
        chunksize=_CHUNKSIZE,
        distributed=_supports_distributed_execution(_GENERATION_FUNCTION),
        soft_total_plans_limit=_SOFT_TOTAL_PLANS_LIMIT)
    counts, plan_hints, params_plan_indices, debug_infos = (
        plan_hint_extractor.get_consolidated_plan_hints())

    hint_accumulator.query_id_to_counts[query_id] = counts
    hint_accumulator.query_id_to_plan_hints[query_id] = plan_hints
    hint_accumulator.query_id_to_params_plan_indices[
        query_id] = params_plan_indices
    hint_accumulator.query_id_to_debug_infos[query_id] = debug_infos

    failure_counts = pg_plan_hint_extractor.verify_hints(
        query_id=query_id,
        query=query_metadata["query"],
        plan_hints=plan_hints,
        params_plan_indices=params_plan_indices,
        database_configuration=database_configuration)
    hint_accumulator.combined_failure_counts.update(failure_counts)

main_utils.print_failure_counts(hint_accumulator.combined_failure_counts)
main_utils.print_hint_counts_by_source(hint_accumulator.query_id_to_counts)

hint_accumulator.save(
    output_dir=hints_output_dir,
    plans_output_file=_PLANS_OUTPUT_FILE,
    verification_failures_file=_VERIFICATION_FAILURES_FILE,
    plan_index_suffix=_PLAN_INDEX_SUFFIX)

SHOW seq_page_cost;
SHOW random_page_cost;
SHOW cpu_tuple_cost;
SHOW cpu_index_tuple_cost;
SHOW cpu_operator_cost;
SHOW parallel_setup_cost;
SHOW parallel_tuple_cost;
SHOW min_parallel_table_scan_size;
SHOW min_parallel_index_scan_size;
SHOW effective_cache_size;
SHOW jit_above_cost;
SHOW jit_inline_above_cost;
SHOW jit_optimize_above_cost;
SHOW shared_buffers;
SHOW huge_pages;
SHOW temp_buffers;
SHOW max_prepared_transactions;
SHOW work_mem;
SHOW hash_mem_multiplier;
SHOW maintenance_work_mem;
SHOW autovacuum_work_mem;
SHOW max_stack_depth;
SHOW shared_memory_type;
SHOW dynamic_shared_memory_type;
SHOW temp_file_limit;
SHOW max_files_per_process;
Start: {query_id}
EXPLAIN (FORMAT JSON) select count(*) 
from
 tag, site, question, tag_question
where
site.site_name='stackoverflow' and
tag.name='connect-compose' and
tag.site_id = site.site_id and
question.site_id = site.site_id and
tag_question.site_id = site.site_id and
tag_question.question_id = question.id and
tag_question.tag_id = tag