In [None]:
!python -m pip install ipython-sql psycopg2

In [2]:
%load_ext sql

In [3]:
%sql postgresql://postgres:navya@localhost:5432/mydb

### Insertion into hstore

In [4]:
%sql DROP TABLE IF EXISTS data_hstore CASCADE;

 * postgresql://postgres:***@localhost:5432/mydb
Done.


[]

In [5]:
import psycopg2
import time
import csv
from psycopg2.extras import execute_values, register_hstore

conn = psycopg2.connect(
    dbname="mydb",
    user="postgres",
    password="navya",
    host="localhost",
    port="5432"
)
cur = conn.cursor()
cur.execute("CREATE EXTENSION IF NOT EXISTS hstore;")
register_hstore(conn)

cur.execute("DROP TABLE IF EXISTS data_hstore;")
cur.execute("CREATE TABLE data_hstore (id SERIAL PRIMARY KEY, attributes hstore);")
conn.commit()

def load_hstore_from_csv(csv_file_path, max_rows=None):
    data = []
    with open(csv_file_path, newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            data.append((row,))
            if max_rows and len(data) >= max_rows:
                break
    return data

def test_latency(row_counts, csv_path):
    results = []
    for n in row_counts:
        batch = load_hstore_from_csv(csv_path, max_rows=n)
        start = time.time()
        execute_values(cur, "INSERT INTO data_hstore (attributes) VALUES %s", batch)
        conn.commit()
        latency = (time.time() - start) * 1000
        print(f"Inserted {n} rows in {latency:.2f} ms")
        results.append((n, latency))
    return results

#csv_path = "D:/semesters/SEM7/COL868/benchmark/archive/Food_Supply_kcal_Data.csv"
csv_path = "datasets\yelp_business.csv"
row_counts = [10000, 50000, 100000,150346]

results = test_latency(row_counts, csv_path)
print("\nLatency Results (HSTORE):")
for n, t in results:
    print(f"{n:6d} rows → {t:8.2f} ms")

cur.close()
conn.close()


Inserted 10000 rows in 4840.22 ms
Inserted 50000 rows in 25669.60 ms
Inserted 100000 rows in 47852.47 ms
Inserted 150346 rows in 76198.58 ms

Latency Results (HSTORE):
 10000 rows →  4840.22 ms
 50000 rows → 25669.60 ms
100000 rows → 47852.47 ms
150346 rows → 76198.58 ms


### Insertion into JSONB

In [6]:
%sql DROP TABLE IF EXISTS data_jsonb CASCADE;

 * postgresql://postgres:***@localhost:5432/mydb
Done.


[]

In [None]:
import psycopg2
import time
import csv
import json
from psycopg2.extras import execute_values

conn = psycopg2.connect(
    dbname="mydb",
    user="postgres",
    password="navya",
    host="localhost",
    port="5432"
)
cur = conn.cursor()

cur.execute("DROP TABLE IF EXISTS data_jsonb;")
cur.execute("CREATE TABLE data_jsonb (id SERIAL PRIMARY KEY, attributes JSONB);")
conn.commit()

def load_jsonb_from_csv(csv_file_path, max_rows=None):
    data = []
    with open(csv_file_path, newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            data.append((json.dumps(row),))
            if max_rows and len(data) >= max_rows:
                break
    return data

def test_latency(row_counts, csv_path):
    results = []
    for n in row_counts:
        batch = load_jsonb_from_csv(csv_path, max_rows=n)
        start = time.time()
        execute_values(cur, "INSERT INTO data_jsonb (attributes) VALUES %s", batch)
        conn.commit()
        latency = (time.time() - start) * 1000
        print(f"Inserted {n} rows in {latency:.2f} ms")
        results.append((n, latency))
    return results

#csv_path = "D:/semesters/SEM7/COL868/benchmark/archive/Food_Supply_kcal_Data.csv"
csv_path = "datasets\yelp_business.csv"
row_counts = [10000, 50000, 100000,150346]

results = test_latency(row_counts, csv_path)
print("\nLatency Results (JSONB):")
for n, t in results:
    print(f"{n:6d} rows → {t:8.2f} ms")

cur.close()
conn.close()


Inserted 10000 rows in 733.94 ms
Inserted 50000 rows in 4147.46 ms
Inserted 100000 rows in 9341.43 ms
Inserted 150346 rows in 14529.61 ms

Latency Results (JSONB):
 10000 rows →   733.94 ms
 50000 rows →  4147.46 ms
100000 rows →  9341.43 ms
150346 rows → 14529.61 ms


### Insertion into postgresql-vanilla

In [7]:
%sql DROP TABLE IF EXISTS data_vanilla CASCADE;

 * postgresql://postgres:***@localhost:5432/mydb
Done.


[]

In [None]:
import psycopg2
import time
import csv
from psycopg2.extras import execute_values

# Connect to PostgreSQL
conn = psycopg2.connect(
    dbname="mydb",
    user="postgres",
    password="navya",
    host="localhost",
    port="5432"
)
cur = conn.cursor()

cur.execute("DROP TABLE IF EXISTS data_vanilla;")
cur.execute("CREATE TABLE data_vanilla (id SERIAL PRIMARY KEY);")
conn.commit()

# Define proper column types for your dataset
COLUMN_TYPES = {
    'business_id': 'TEXT',
    'name': 'TEXT',
    'address': 'TEXT',
    'city': 'TEXT',
    'state': 'TEXT',
    'postal_code': 'TEXT',
    'latitude': 'NUMERIC',
    'longitude': 'NUMERIC',
    'stars': 'NUMERIC',
    'review_count': 'INTEGER',
    'is_open': 'INTEGER',
    'categories': 'TEXT',
    'hours': 'TEXT',
    # Attributes - keep as TEXT to avoid conversion issues
    'attributes.ByAppointmentOnly': 'TEXT',
    'attributes.BusinessAcceptsCreditCards': 'TEXT',
    'RestaurantsTakeOut': 'TEXT',
    'RestaurantsPriceRange2': 'TEXT',
}

def infer_type(value):
    """Infer PostgreSQL type from sample value"""
    if value is None or value == '' or value.lower() == 'none':
        return 'TEXT'
    
    # Try integer
    try:
        int(value)
        return 'INTEGER'
    except (ValueError, TypeError):
        pass
    
    # Try numeric
    try:
        float(value)
        return 'NUMERIC'
    except (ValueError, TypeError):
        pass
    
    return 'TEXT'

def create_columns_from_csv(csv_file_path, sample_size=1000):
    """Create columns with inferred types"""
    with open(csv_file_path, newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        cols = reader.fieldnames
        
        # Sample rows to infer types
        samples = {col: [] for col in cols}
        for i, row in enumerate(reader):
            if i >= sample_size:
                break
            for col in cols:
                val = row[col]
                if val and val.lower() != 'none':  # Only valid values
                    samples[col].append(val)
        
        # Create columns with inferred types
        for col in cols:
            # Use predefined type if available
            if col in COLUMN_TYPES:
                col_type = COLUMN_TYPES[col]
            # Otherwise infer from samples
            elif samples[col]:
                col_type = infer_type(samples[col][0])
            else:
                col_type = 'TEXT'
            
            cur.execute(f'ALTER TABLE data_vanilla ADD COLUMN "{col}" {col_type};')
        
        conn.commit()
        return cols

def convert_value(value, col_name):
    """Convert value to appropriate type, return None for NULL values"""
    # Handle NULL cases
    if value is None or value == '' or value.lower() in ('none', 'null'):
        return None
    
    col_type = COLUMN_TYPES.get(col_name, 'TEXT')
    
    try:
        if col_type == 'INTEGER':
            return int(float(value))  # Handle "1.0" -> 1
        elif col_type == 'NUMERIC':
            return float(value)
        else:
            return value
    except (ValueError, TypeError):
        return None  # Return NULL for conversion failures

def load_rows(csv_file_path, max_rows=None):
    data = []
    with open(csv_file_path, newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        cols = reader.fieldnames
        
        for i, row in enumerate(reader):
            # Convert values to proper types
            converted_row = tuple(convert_value(row[col], col) for col in cols)
            data.append(converted_row)
            
            if max_rows and len(data) >= max_rows:
                break
    return data

def test_latency(row_counts, csv_path):
    cols = create_columns_from_csv(csv_path)
    results = []
    
    for n in row_counts:
        # Clear table before each test
        cur.execute("TRUNCATE TABLE data_vanilla RESTART IDENTITY;")
        conn.commit()
        
        rows = load_rows(csv_path, n)
        start = time.time()
        cols_quoted = ','.join(f'"{c}"' for c in cols)
        
        # Use execute_values with proper NULL handling
        execute_values(
            cur,
            f'INSERT INTO data_vanilla ({cols_quoted}) VALUES %s',
            rows,
            template=None,
            page_size=1000  # Batch inserts for better performance
        )
        conn.commit()
        latency = (time.time() - start) * 1000
        print(f"Inserted {n} rows in {latency:.2f} ms")
        results.append((n, latency))
    
    return results

csv_path = r"yelp_business.csv"
row_counts = [10000, 50000, 100000, 150346]

try:
    results = test_latency(row_counts, csv_path)
    print("\nLatency Results (Vanilla):")
    for n, t in results:
        print(f"{n:6d} rows → {t:8.2f} ms")

    # Verify column types
    cur.execute("""
        SELECT column_name, data_type 
        FROM information_schema.columns 
        WHERE table_name = 'data_vanilla' 
        AND column_name NOT IN ('id')
        ORDER BY ordinal_position
        LIMIT 20;
    """)
    print("\n\nSample Column Types Created:")
    for col, dtype in cur.fetchall():
        print(f"  {col}: {dtype}")
        
except Exception as e:
    print(f"Error: {e}")
    conn.rollback()
finally:
    cur.close()
    conn.close()

Inserted 10000 rows in 633.82 ms
Inserted 50000 rows in 3902.62 ms
Inserted 100000 rows in 7704.27 ms
Inserted 150346 rows in 11972.24 ms

Latency Results (Vanilla):
 10000 rows →   633.82 ms
 50000 rows →  3902.62 ms
100000 rows →  7704.27 ms
150346 rows → 11972.24 ms


Sample Column Types Created:
  business_id: text
  name: text
  address: text
  city: text
  state: text
  postal_code: text
  latitude: numeric
  longitude: numeric
  stars: numeric
  review_count: integer
  is_open: integer
  categories: text
  hours: text
  attributes.ByAppointmentOnly: text
  attributes.BusinessAcceptsCreditCards: text
  hours.Monday: text
  hours.Tuesday: text
  hours.Wednesday: text
  hours.Thursday: text
  hours.Friday: text


### Aggregate Queries

In [13]:
import time
import psycopg2
import sys

# Database connection
conn = psycopg2.connect(
    dbname="mydb",
    user="postgres",
    password="navya",
    host="localhost",
    port="5432"
)
conn.autocommit = False

def get_cursor():
    """Get a fresh cursor and rollback any failed transactions"""
    try:
        conn.rollback()
    except:
        pass
    return conn.cursor()

cur = get_cursor()

def time_query(label, query, iterations=5, show_result=True):
    """Run query multiple times and return average time"""
    times = []
    result_data = None
    
    for i in range(iterations):
        cur = get_cursor()
        start = time.time()
        try:
            cur.execute(query)
            if show_result and i == 0:
                result_data = cur.fetchall()
            else:
                cur.fetchall()
            conn.commit()
        except Exception as e:
            print(f"  ERROR: {e}")
            conn.rollback()
            cur.close()
            return None
        end = time.time()
        elapsed = (end - start) * 1000
        times.append(elapsed)
        cur.close()
    
    avg_time = sum(times) / len(times)
    min_time = min(times)
    max_time = max(times)
    
    if show_result and result_data is not None:
        if len(result_data) == 1:
            print(f"  Result: {result_data[0]}")
        elif len(result_data) <= 5:
            print(f"  Results ({len(result_data)} rows):")
            for row in result_data:
                print(f"    {row}")
        else:
            print(f"  Results: {len(result_data)} rows returned")
    
    print(f"{label:<20} → Avg: {avg_time:>8.2f} ms | Min: {min_time:>8.2f} ms | Max: {max_time:>8.2f} ms")
    return avg_time

print("\n" + "="*80)
print(" POSTGRESQL HSTORE vs JSONB vs VANILLA - AGGREGATION BENCHMARKS")
print("="*80)
print(f" Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" Dataset: Yelp Business")
print(f" Iterations per query: 5")
print("="*80)

# Store results for summary
results = {
    'vanilla': [],
    'hstore': [],
    'jsonb': []
}

# ============================================================================
# TC25: COUNT WITH FILTERS
# ============================================================================
print("\n" + "█"*80)
print("█ TC25: COUNT WITH FILTERS")
print("█"*80)

print("\n--- TEST 1: Simple boolean filter (is_open = 1) ---")
results['vanilla'].append(time_query("VANILLA", """
SELECT COUNT(*) FROM data_vanilla WHERE is_open = 1;
"""))
results['hstore'].append(time_query("HSTORE", """
SELECT COUNT(*) FROM data_hstore 
WHERE attributes->'is_open' = '1';
"""))
results['jsonb'].append(time_query("JSONB", """
SELECT COUNT(*) FROM data_jsonb 
WHERE attributes->>'is_open' = '1';
"""))

print("\n--- TEST 2: Sparse attribute filter (RestaurantsTakeOut) ---")
results['vanilla'].append(time_query("VANILLA", """
SELECT COUNT(*) FROM data_vanilla WHERE "attributes.RestaurantsTakeOut" = 'True';
"""))
results['hstore'].append(time_query("HSTORE", """
SELECT COUNT(*) FROM data_hstore 
WHERE attributes->'attributes.RestaurantsTakeOut' = 'True';
"""))
results['jsonb'].append(time_query("JSONB", """
SELECT COUNT(*) FROM data_jsonb 
WHERE attributes->>'attributes.RestaurantsTakeOut' = 'True';
"""))

print("\n--- TEST 3: Multiple conditions ---")
results['vanilla'].append(time_query("VANILLA", """
SELECT COUNT(*) FROM data_vanilla WHERE is_open = 1 AND state = 'CA';
"""))
results['hstore'].append(time_query("HSTORE", """
SELECT COUNT(*) FROM data_hstore 
WHERE attributes->'is_open' = '1' AND attributes->'state' = 'CA';
"""))
results['jsonb'].append(time_query("JSONB", """
SELECT COUNT(*) FROM data_jsonb 
WHERE attributes->>'is_open' = '1' AND attributes->>'state' = 'CA';
"""))

# ============================================================================
# TC26: GROUP BY
# ============================================================================
print("\n" + "█"*80)
print("█ TC26: GROUP BY EXTRACTED VALUES")
print("█"*80)

print("\n--- TEST 1: High cardinality (state) ---")
results['vanilla'].append(time_query("VANILLA", """
SELECT state, COUNT(*) FROM data_vanilla 
WHERE state IS NOT NULL
GROUP BY state ORDER BY COUNT(*) DESC LIMIT 10;
""", show_result=False))
results['hstore'].append(time_query("HSTORE", """
SELECT attributes->'state', COUNT(*) FROM data_hstore 
WHERE attributes->'state' IS NOT NULL
GROUP BY attributes->'state' ORDER BY COUNT(*) DESC LIMIT 10;
""", show_result=False))
results['jsonb'].append(time_query("JSONB", """
SELECT attributes->>'state', COUNT(*) FROM data_jsonb 
WHERE attributes->>'state' IS NOT NULL
GROUP BY attributes->>'state' ORDER BY COUNT(*) DESC LIMIT 10;
""", show_result=False))

print("\n--- TEST 2: Medium cardinality (price range) ---")
results['vanilla'].append(time_query("VANILLA", """
SELECT "attributes.RestaurantsPriceRange2", COUNT(*) FROM data_vanilla 
WHERE "attributes.RestaurantsPriceRange2" IS NOT NULL
GROUP BY "attributes.RestaurantsPriceRange2" ORDER BY COUNT(*) DESC;
""", show_result=False))
results['hstore'].append(time_query("HSTORE", """
SELECT attributes->'attributes.RestaurantsPriceRange2', COUNT(*) FROM data_hstore 
WHERE attributes->'attributes.RestaurantsPriceRange2' IS NOT NULL
GROUP BY attributes->'attributes.RestaurantsPriceRange2' ORDER BY COUNT(*) DESC;
""", show_result=False))
results['jsonb'].append(time_query("JSONB", """
SELECT attributes->>'attributes.RestaurantsPriceRange2', COUNT(*) 
FROM data_jsonb 
WHERE attributes->>'attributes.RestaurantsPriceRange2' IS NOT NULL
GROUP BY attributes->>'attributes.RestaurantsPriceRange2' 
ORDER BY COUNT(*) DESC;
""", show_result=False))

print("\n--- TEST 3: Low cardinality (is_open) ---")
results['vanilla'].append(time_query("VANILLA", """
SELECT is_open, COUNT(*) FROM data_vanilla GROUP BY is_open;
"""))
results['hstore'].append(time_query("HSTORE", """
SELECT attributes->'is_open', COUNT(*) FROM data_hstore 
GROUP BY attributes->'is_open';
"""))
results['jsonb'].append(time_query("JSONB", """
SELECT attributes->>'is_open', COUNT(*) FROM data_jsonb 
GROUP BY attributes->>'is_open';
"""))

# ============================================================================
# TC27: NUMERIC AGGREGATIONS
# ============================================================================
print("\n" + "█"*80)
print("█ TC27: AVG/SUM NUMERIC AGGREGATIONS")
print("█"*80)

print("\n--- TEST 1: Simple aggregations ---")
results['vanilla'].append(time_query("VANILLA", """
SELECT AVG(stars), AVG(review_count), SUM(review_count) 
FROM data_vanilla WHERE is_open = 1;
"""))
results['hstore'].append(time_query("HSTORE", """
SELECT AVG((attributes->'stars')::numeric), 
       AVG((attributes->'review_count')::numeric),
       SUM((attributes->'review_count')::numeric)
FROM data_hstore WHERE attributes->'is_open' = '1';
"""))
results['jsonb'].append(time_query("JSONB", """
SELECT AVG((attributes->>'stars')::numeric),
       AVG((attributes->>'review_count')::numeric),
       SUM((attributes->>'review_count')::numeric)
FROM data_jsonb WHERE attributes->>'is_open' = '1';
"""))

print("\n--- TEST 2: Grouped aggregation (avg by state) ---")
results['vanilla'].append(time_query("VANILLA", """
SELECT state, AVG(stars), COUNT(*) FROM data_vanilla 
WHERE state IS NOT NULL
GROUP BY state HAVING COUNT(*) > 10 
ORDER BY AVG(stars) DESC LIMIT 10;
""", show_result=False))
results['hstore'].append(time_query("HSTORE", """
SELECT attributes->'state', AVG((attributes->'stars')::numeric), COUNT(*) 
FROM data_hstore 
WHERE attributes->'state' IS NOT NULL AND attributes->'stars' IS NOT NULL
GROUP BY attributes->'state' HAVING COUNT(*) > 10 
ORDER BY AVG((attributes->'stars')::numeric) DESC LIMIT 10;
""", show_result=False))
results['jsonb'].append(time_query("JSONB", """
SELECT attributes->>'state', AVG((attributes->>'stars')::numeric), COUNT(*) 
FROM data_jsonb 
WHERE attributes->>'state' IS NOT NULL AND attributes->>'stars' IS NOT NULL
GROUP BY attributes->>'state' HAVING COUNT(*) > 10 
ORDER BY AVG((attributes->>'stars')::numeric) DESC LIMIT 10;
""", show_result=False))

print("\n--- TEST 3: Multiple aggregations ---")
results['vanilla'].append(time_query("VANILLA", """
SELECT COUNT(*), AVG(stars), MIN(stars), MAX(stars), 
       SUM(review_count), AVG(review_count)
FROM data_vanilla WHERE is_open = 1;
"""))
results['hstore'].append(time_query("HSTORE", """
SELECT COUNT(*), 
       AVG((attributes->'stars')::numeric),
       MIN((attributes->'stars')::numeric),
       MAX((attributes->'stars')::numeric),
       SUM((attributes->'review_count')::numeric),
       AVG((attributes->'review_count')::numeric)
FROM data_hstore WHERE attributes->'is_open' = '1';
"""))
results['jsonb'].append(time_query("JSONB", """
SELECT COUNT(*),
       AVG((attributes->>'stars')::numeric),
       MIN((attributes->>'stars')::numeric),
       MAX((attributes->>'stars')::numeric),
       SUM((attributes->>'review_count')::numeric),
       AVG((attributes->>'review_count')::numeric)
FROM data_jsonb WHERE attributes->>'is_open' = '1';
"""))

# ============================================================================
# SUMMARY
# ============================================================================
print("\n" + "="*80)
print(" PERFORMANCE SUMMARY")
print("="*80)

vanilla_valid = [t for t in results['vanilla'] if t is not None]
hstore_valid = [t for t in results['hstore'] if t is not None]
jsonb_valid = [t for t in results['jsonb'] if t is not None]

if vanilla_valid and hstore_valid and jsonb_valid:
    vanilla_avg = sum(vanilla_valid) / len(vanilla_valid)
    hstore_avg = sum(hstore_valid) / len(hstore_valid)
    jsonb_avg = sum(jsonb_valid) / len(jsonb_valid)

    print(f"\nAverage across all tests:")
    print(f"  VANILLA: {vanilla_avg:>8.2f} ms  (1.00x) ← BASELINE")
    print(f"  HSTORE:  {hstore_avg:>8.2f} ms  ({hstore_avg/vanilla_avg:.2f}x)")
    print(f"  JSONB:   {jsonb_avg:>8.2f} ms  ({jsonb_avg/vanilla_avg:.2f}x)")
    
    print(f"\nTest Success Rate:")
    print(f"  VANILLA: {len(vanilla_valid)}/{len(results['vanilla'])} tests passed")
    print(f"  HSTORE:  {len(hstore_valid)}/{len(results['hstore'])} tests passed")
    print(f"  JSONB:   {len(jsonb_valid)}/{len(results['jsonb'])} tests passed")
else:
    print("\nWARNING: Some queries failed. Check errors above.")


conn.close()


 POSTGRESQL HSTORE vs JSONB vs VANILLA - AGGREGATION BENCHMARKS
 Timestamp: 2025-11-07 13:23:46
 Dataset: Yelp Business
 Iterations per query: 5

████████████████████████████████████████████████████████████████████████████████
█ TC25: COUNT WITH FILTERS
████████████████████████████████████████████████████████████████████████████████

--- TEST 1: Simple boolean filter (is_open = 1) ---
  Result: (119698,)
VANILLA              → Avg:    69.13 ms | Min:    47.99 ms | Max:   133.54 ms
  Result: (247203,)
HSTORE               → Avg:   544.79 ms | Min:   454.38 ms | Max:   756.95 ms
  Result: (247203,)
JSONB                → Avg:   494.76 ms | Min:   406.34 ms | Max:   757.03 ms

--- TEST 2: Sparse attribute filter (RestaurantsTakeOut) ---
  Result: (52943,)
VANILLA              → Avg:    66.62 ms | Min:    57.79 ms | Max:    86.33 ms
  Result: (109310,)
HSTORE               → Avg:   503.32 ms | Min:   433.33 ms | Max:   670.11 ms
  Result: (109310,)
JSONB                → Avg:   477.83 ms 