In [1]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import psycopg2
import matplotlib.pyplot as plt
import itertools
from collections import defaultdict
import time
import re
import math
import copy

In [2]:
engine = create_engine('postgresql://postgres:1234@localhost:5432/census_income')

In [3]:
try:
    result = engine.execute("SELECT 1")
    print("Connection successful!")
except Exception as e:
    print("Connection failed:", e)

  result = engine.execute("SELECT 1")


Connection successful!


In [4]:
def kl_divergence(p, q):
    epsilon = 1e-10  # 避免数学错误的小常数
    p = np.where(p == 0, epsilon, p)
    q = np.where(q == 0, epsilon, q)
    return np.sum(p * np.log(p / q))

In [5]:
# Sharing opt
# The following functions can generated the combined queries

import re
from collections import defaultdict

function_lst = ['SUM', 'COUNT', 'AVG', 'MAX', 'MIN']

select_pattern = r"SELECT\s+(.*?)\s+FROM"
from_pattern = r"FROM\s+(.*?)(?:\s+WHERE|\s+GROUP\s+BY|$)"
where_pattern = r"WHERE\s+(.*?)(?:\s+GROUP\s+BY|$)"
group_by_pattern = r"GROUP\s+BY\s+(.*)"


def parseQuery(query):
    select_match = re.search(select_pattern, query, re.IGNORECASE)
    from_match = re.search(from_pattern, query, re.IGNORECASE)
    where_match = re.search(where_pattern, query, re.IGNORECASE)
    group_by_match = re.search(group_by_pattern, query, re.IGNORECASE)

    select_items = select_match.group(1) if select_match else ""
    from_items = from_match.group(1) if from_match else ""
    where_items = where_match.group(1) if where_match else ""
    group_by_items = group_by_match.group(1) if group_by_match else ""

    return select_items, from_items, where_items, group_by_items


def parseSelectItems(select_items):
    components = select_items.split(',')
    attribute = None
    measures, functions = [], []

    for component in components:
        component = component.strip()
        component_upper = component.upper()
        for func in function_lst:
            if func.upper() in component_upper:
                measure_match = re.search(rf"{func}\((.*?)\)", component, re.IGNORECASE)
                if measure_match:
                    measure = measure_match.group(1)
                    measures.append(measure)
                    functions.append(func)
                break
        else:
            attribute = component

    return attribute, measures, functions

def combineAggregates(queries, partition=None):
    # Group queries by FROM and WHERE clauses for potential combination
    query_groups = defaultdict(list)
    for query in queries:
        select_items, from_clause, where_clause, group_by_items = parseQuery(query.rstrip(';'))
        attribute, measures, functions = parseSelectItems(select_items)
        key = (from_clause, where_clause)
        query_groups[key].append((group_by_items, measures, functions, attribute))

    combined_queries = []
    for (from_clause, where_clause), group in query_groups.items():
        combined_by_group = defaultdict(list)
        for group_by_items, measures, functions, attribute in group:
            combined_by_group[group_by_items].append((measures, functions, attribute))

        for group_by_items, details in combined_by_group.items():
            select_parts = []
            for measures, functions, attribute in details:
                for measure, function in zip(measures, functions):
                    select_parts.append(f"{function}({measure}) AS {function}_{measure}")
            select_clause = ', '.join(select_parts)
            if partition is not None:
                partition_clause = f"partition_id = {partition}"
                where_clause = f"{where_clause} AND {partition_clause}" if where_clause else partition_clause
            where_clause = f"WHERE {where_clause}" if where_clause else ""
            group_by_clause = f"GROUP BY {group_by_items}" if group_by_items else ""
            query = f"SELECT {group_by_items}, {select_clause} FROM {from_clause} {where_clause} {group_by_clause};"
            combined_queries.append(query)

    return combined_queries


def decomposeAggTable(dimensions_name, combined_dataframe, individual_list):
    measurements = set(col.split('_')[1] for col in combined_dataframe.columns if '_' in col)

    # Create a dictionary to store each separate DataFrame
    separated_tables = {}

    for column in combined_dataframe.columns:
        if column != dimensions_name:
            # Properly split the column name
            parts = column.split('_')
            func = parts[0]  # The function is always the first part
            measure = '_'.join(parts[1:])  # The rest is the measurement name

            table_name = f"{func}_{measure}"  # Create a unique table name

            # Create a new DataFrame for this specific measurement and function
            separated_tables[table_name] = combined_dataframe[[dimensions_name, column]].copy()
            separated_tables[table_name].rename(columns={column: table_name}, inplace=True)

    # Printing or exporting the separated tables
    for table_name, df in separated_tables.items():
        individual_list.append(df)


In [6]:
# 定义维度、度量和聚合函数
import itertools

dimensions = ['workclass', 'education', 'occupation', 'relationship', 'race', 'sex', 'native_country', 'income']
measurements = ['age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']
aggregate_functions = ['count', 'max', 'min', 'sum', 'avg']

# 初始化视图
views = {}

for dimension, measurement, function in itertools.product(dimensions, measurements, aggregate_functions):
    vid = f"{dimension}_{measurement}_{function}"
    views[vid] = (dimension, measurement, function)

## Generate the query

In [7]:
def generate_queries(candidate_views):
    """
    This function is used to generate queries based off the current candidate views.

    Parameters
    ----------
    candidate_views
        The modified dictionary of views after pruning views in the end of every phase.
    """
    married_query_list = []
    unmarried_query_list = []
    for key, (attribute, measurement, function) in candidate_views.items():
        married_query_list.append(f"SELECT {attribute}, {function}({measurement}) AS {function}_{measurement} FROM married_data WHERE {attribute} IS NOT NULL GROUP BY {attribute};")
        unmarried_query_list.append(f"SELECT {attribute}, {function}({measurement}) AS {function}_{measurement} FROM unmarried_data WHERE {attribute} IS NOT NULL GROUP BY {attribute};")
    return married_query_list, unmarried_query_list

## Generate and Decompose the aggregation table

In [8]:
# Hyperparameters
delta = 0.0001


def epsilon_m(m: int, N: int):
    """
    Calculate the epsilon value for the confidence interval pruning method.

    Parameters
    ----------
    m : int
        The number of items in the sample.
    N : int
        The number of items in the population.
    """
    upper_left = 1 - (m - 1) / N
    # Using log log m is problematic when m=1. So we change it to log m for now.
    upper_right = 2 * np.log(m) + np.log((np.pi ** 2) / (3 * delta))
    bottom = 2 * m
    return np.sqrt(upper_left * upper_right / bottom)

In [9]:
num_phases = 5
k = 5


def pruning_based_optimization():
    # Make a copy of the views dictionary so that it won't change the original data.
    candidate_views = copy.deepcopy(views)

    # Storing scores across phases, where key = a_m_f and value = KL.
    view_scores = defaultdict(list)
    
    for phase in range(num_phases):
        # Get raw queries (separate statements).
        raw_queries_married, raw_queries_unmarried = generate_queries(candidate_views)

        # Combine statements together (Section 4.1).
        combined_queries_married = combineAggregates(raw_queries_married, partition=phase)
        combined_queries_unmarried = combineAggregates(raw_queries_unmarried, partition=phase)

        # Run combined queries for both married and unmarried data.
        agg_opt_res = []
        for i in range(len(combined_queries_married)):
            df_married_agg_opt = pd.read_sql_query(combined_queries_married[i], engine)
            df_unmarried_agg_opt = pd.read_sql_query(combined_queries_unmarried[i], engine)
            agg_opt_res.append((df_married_agg_opt, df_unmarried_agg_opt))

        # Decompose combined queries back into original views.
        de_married_list = []
        de_unmarried_list = []
        for i in range(len(agg_opt_res)):
            dimensions_name = agg_opt_res[i][0].columns[0]
            married_combined_df = agg_opt_res[i][0]
            unmarried_combined_df = agg_opt_res[i][1]
            decomposeAggTable(dimensions_name, married_combined_df, de_married_list)
            decomposeAggTable(dimensions_name, unmarried_combined_df, de_unmarried_list)

        print('len:', len(de_married_list), len(candidate_views))

        upper_bound_map = dict()
        lower_bound_map = dict()
    
        # Normalize aggregate data and compute utility score.
        for query_idx, (attribute, measurement, function) in enumerate(candidate_views.values()):
            vid = f"{attribute}_{measurement}_{function}"
            
            df_married = de_married_list[query_idx]
            df_unmarried = de_unmarried_list[query_idx]
        
            all_attribute_values = set(df_married[attribute].unique()).union(set(df_unmarried[attribute].unique()))
    
            grouped_married = df_married.groupby(attribute).agg({f'{function}_{measurement}': 'sum'}).reindex(all_attribute_values, fill_value=0)
            grouped_unmarried = df_unmarried.groupby(attribute).agg({f'{function}_{measurement}': 'sum'}).reindex(all_attribute_values, fill_value=0)
    
            total_married = grouped_married[f'{function}_{measurement}'].sum()
            total_unmarried = grouped_unmarried[f'{function}_{measurement}'].sum()
    
            # Normalization
            p = (grouped_married / total_married).fillna(0).values.flatten()
            q = (grouped_unmarried / total_unmarried).fillna(0).values.flatten()
    
            # Compute the KL score for current partition.
            curr_score = kl_divergence(p, q)
            view_scores[vid] = view_scores[vid] + [curr_score]

            # Computing epsilon_m.
            N = num_phases
            m = len(view_scores[vid])
            estimated_mean = np.mean(view_scores[vid])
            error_range = epsilon_m(m, N)

            upper_bound_map[vid] = estimated_mean + error_range
            lower_bound_map[vid] = estimated_mean - error_range

        kth_lower_bound = list(sorted(lower_bound_map.items(), key=lambda x: x[1], reverse=True))[k]
        # Prune views that has an upper bound lower than the k-th highest lower bound.
        for vid, upper_bound in upper_bound_map.items():
            if upper_bound < kth_lower_bound[1]:
                del candidate_views[vid]
                del view_scores[vid]
                # print('Dropped:', vid)

        print('End of phase', phase, 'having candidate views', len(candidate_views))

    final_view_scores = dict()
    for vid in candidate_views.keys():
        final_view_scores[vid] = np.mean(view_scores[vid])

    print(view_scores['relationship_age_sum'])
    top_scores = sorted(final_view_scores.items(), key=lambda x: x[1], reverse=True)[:5]
    for key, score in top_scores:
        print(f"View: {key}, Score: {score}")

# Baseline non-opt

In [10]:
def baseline():
    # 这一部分可能结果有问题，逻辑没问题(就是把结果里面每一个view查到的结果，求归一化后求K-L偏差，最后排序找到前5个，输出分数)
    results = []
    raw_queries_married, raw_queries_unmarried = generate_queries(views)
    for married_query, unmarried_query in zip(raw_queries_married, raw_queries_unmarried):
        # 执行已婚数据查询并直接读取到 DataFrame
        df_married = pd.read_sql_query(married_query, engine)

        # 执行未婚数据查询并直接读取到 DataFrame
        df_unmarried = pd.read_sql_query(unmarried_query, engine)

        # 将结果存储为元组
        results.append((df_married, df_unmarried))


    view_scores = {}  # 初始化空字典，将用来存储三元组和对应的K-L散度

    # 修正并重新计算分数
    for query_idx, (attribute, measurement, function) in enumerate(views.values()):
        df_married = results[query_idx][0]
        df_unmarried = results[query_idx][1]

        all_attribute_values = set(df_married[attribute].unique()).union(set(df_unmarried[attribute].unique()))

        grouped_married = df_married.groupby(attribute).agg({f'{function}_{measurement}': 'sum'}).reindex(all_attribute_values, fill_value=0)
        grouped_unmarried = df_unmarried.groupby(attribute).agg({f'{function}_{measurement}': 'sum'}).reindex(all_attribute_values, fill_value=0)

        total_married = grouped_married[f'{function}_{measurement}'].sum()
        total_unmarried = grouped_unmarried[f'{function}_{measurement}'].sum()

        # 归一化以获得概率分布
        p = (grouped_married / total_married).fillna(0).values.flatten()
        q = (grouped_unmarried / total_unmarried).fillna(0).values.flatten()

        # 计算 K-L 散度并保存
        score = kl_divergence(p, q)
        view_scores[(attribute, measurement, function)] = score

    print(view_scores['relationship', 'age', 'sum'])
    # 排序并打印top 5
    top_scores = sorted(view_scores.items(), key=lambda x: x[1], reverse=True)[:5]
    for key, score in top_scores:
        print(f"View: {key}, Score: {score}")



In [11]:
import time


# Measure execution time of approach_one
# start_time = time.time()
# result_one = baseline()
# end_time = time.time()
# duration_one = end_time - start_time
# print(f"Baseline Time: {duration_one} seconds")

# Measure execution time of approach_two
start_time = time.time()
result_two = pruning_based_optimization()
end_time = time.time()
duration_two = end_time - start_time
print(f"Approach Two Execution Time: {duration_two} seconds")


len: 240 240
End of phase 0 having candidate views 17
len: 17 17
End of phase 1 having candidate views 13
len: 13 13
End of phase 2 having candidate views 13
len: 13 13
End of phase 3 having candidate views 13
len: 13 13
End of phase 4 having candidate views 13
[18.62180469148511, 18.56626478937287, 18.522641288417436, 18.47121699308135, 18.89536108456243]
View: relationship_capital_gain_sum, Score: 21.23768404937885
View: relationship_capital_loss_sum, Score: 20.439943294515913
View: relationship_hours_per_week_sum, Score: 19.312089411774245
View: relationship_education_num_sum, Score: 19.163364340487533
View: relationship_age_count, Score: 18.81313103895464
Approach Two Execution Time: 5.547677755355835 seconds
