In [50]:
import psycopg2
import psycopg2.errors
from pymongo import MongoClient
from cassandra.cluster import Cluster
import mysql.connector
import yaml
import pandas as pd
import os
import time
import sys
from pathlib import Path
# Connection settings are taken from docker-compose.yaml so the notebook uses the
# same credentials and ports as the containers. Every service is reached on
# localhost through the first component of its first "ports" entry.
with open('docker-compose.yaml', 'r') as file:
    docker_config = yaml.safe_load(file)

postgres_config = docker_config['services']['postgres']
postgres_client = psycopg2.connect(
    host='localhost',
    database=postgres_config['environment']['POSTGRES_DB'],
    user=postgres_config['environment']['POSTGRES_USER'],
    password=postgres_config['environment']['POSTGRES_PASSWORD'],
    port=postgres_config['ports'][0].split(':')[0]
)

mariadb_config = docker_config['services']['mariadb']
mariadb_client = mysql.connector.connect(
    host='localhost',
    database=mariadb_config['environment']['MYSQL_DATABASE'],
    user=mariadb_config['environment']['MYSQL_USER'],
    password=mariadb_config['environment']['MYSQL_PASSWORD'],
    port=mariadb_config['ports'][0].split(':')[0],
    allow_local_infile=True
)

mysql_config = docker_config['services']['mysql']
mysql_client = mysql.connector.connect(
    host='localhost',
    database=mysql_config['environment']['MYSQL_DATABASE'],
    user=mysql_config['environment']['MYSQL_USER'],
    password=mysql_config['environment']['MYSQL_PASSWORD'],
    port=mysql_config['ports'][0].split(':')[0],
    allow_local_infile=True
)

mongo_8_service = docker_config['services']['mongo-8']
mongo_8_client = MongoClient(
    host='localhost',
    port=int(mongo_8_service['ports'][0].split(':')[0])
)
# `mongo_8_config` used to be rebound from the compose dict to the client itself.
# The name stays bound to the client for any cell that still refers to it.
mongo_8_config = mongo_8_client

mongo_7_service = docker_config['services']['mongo-7']
mongo_7_client = MongoClient(
    host='localhost',
    port=int(mongo_7_service['ports'][0].split(':')[0])
)
# Same legacy alias as for mongo-8.
mongo_7_config = mongo_7_client

cassandra_config = docker_config['services']['cassandra']
cassandra_client = Cluster(['localhost'], port=int(cassandra_config['ports'][0].split(':')[0]))
cassandra_session = cassandra_client.connect()

# Smoke test: run one trivial statement against every backend.
try:
    postgres_client.cursor().execute("SELECT 0")
    mariadb_client.cursor(buffered=True).execute("SELECT 1")
    mysql_client.cursor(buffered=True).execute("SELECT 1")
    cassandra_session.execute("SELECT release_version FROM system.local")
    mongo_8_client.admin.command('ping')
    mongo_7_client.admin.command('ping')
    print("connection has been established")
except Exception as e:
    # Surface the cause; a bare "failed" message gives no hint which backend broke.
    print(f"connection test failed: {type(e).__name__}: {e}")

connection has been established


In [51]:
def initialize_postgres_schema(conn, schema):
    """
    Execute a schema (DDL) script on a PostgreSQL connection and commit it.

    Parameters:
    -----------
    conn : psycopg2 connection
        Must provide cursor(), commit() and rollback().
    schema : str
        SQL text to execute. A falsy value (None, "") makes the call a no-op.

    Returns:
    --------
    None. Errors are not propagated: the transaction is rolled back and the
    error is printed together with its message.
    """
    if not schema:
        return
    try:
        with conn.cursor() as cur:
            cur.execute(schema)
        conn.commit()
        print("PostgreSQL schema initialization complete.")
    except Exception as e:
        conn.rollback()
        # Include the reason; without it a failed schema load cannot be diagnosed.
        print(f"Error initializing PostgreSQL schema: {e}")


def verify_postgres_tables(conn, expected_tables):
    """
    Check that every expected table exists in the `public` schema.

    Parameters:
    -----------
    conn : psycopg2 connection
    expected_tables : iterable of str
        Table names to look for. Any iterable is accepted; it is converted to
        a list before being sent to the server.

    Returns:
    --------
    bool: True when all tables exist, False when some are missing or the
    check itself failed (the error is printed, not raised).
    """
    try:
        # psycopg2 adapts a Python list to a SQL array, which is what ANY(%s)
        # needs; a tuple is adapted differently, so normalise the input.
        wanted = list(expected_tables)
        with conn.cursor() as cur:
            cur.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public' AND table_name = ANY(%s);
            """, (wanted,))
            existing_tables = {row[0] for row in cur.fetchall()}

        # Report missing names in the caller's order so the message is deterministic.
        missing_tables = [name for name in wanted if name not in existing_tables]
        if not missing_tables:
            print(f"INFO: All PostgreSQL tables exist: {', '.join(wanted)}")
            return True
        print(f"WARNING: Missing PostgreSQL tables: {', '.join(missing_tables)}")
        return False
    except Exception as e:
        print(f"ERROR: Error verifying PostgreSQL tables: {e}")
        # A failed statement leaves the transaction aborted; roll back so the
        # connection stays usable. Best effort: a broken connection may refuse
        # the rollback, and the function still has to return False.
        try:
            conn.rollback()
        except Exception as rollback_error:
            print(f"ERROR: Rollback after failed verification also failed: {rollback_error}")
        return False



In [52]:
# Read the PostgreSQL DDL script from disk.
sql_schema = Path('db/postgres/schema.sql').read_text()

# Tables that the schema script is expected to create.
postgres_tables = [
    'titles',
    'aka_titles',
    'ratings',
    'people',
    'principals',
    'episodes',
    'title_genres',
]

# Apply the schema; the verification result is the cell's displayed value.
initialize_postgres_schema(postgres_client, sql_schema)
verify_postgres_tables(postgres_client, postgres_tables)

PostgreSQL schema initialization complete.
INFO: All PostgreSQL tables exist: titles, aka_titles, ratings, people, principals, episodes, title_genres


True

In [53]:
from pathlib import Path
import csv
import re
def preprocess_imdb(input_dir: str = "./data", output_dir: str = "./data/processed",
                    max_principals: int = 15_000_000):
    """
    Prepare TSV files that match the project's ERD from the raw IMDb TSV files.

    Files written to `output_dir` (each only if its source file exists):
    - titles.tsv and title_genres.tsv   (from title.basics.tsv)
    - people.tsv                        (from name.basics.tsv)
    - ratings.tsv                       (from title.ratings.tsv)
    - principals.tsv                    (from title.principals.tsv)
    - aka_titles.tsv                    (from title.akas.tsv)
    - episodes.tsv                      (from title.episode.tsv)

    Rows of the dependent files (ratings, principals, aka_titles, episodes) are
    kept only when the ids they reference were seen in titles / people.

    Parameters:
    -----------
    input_dir : str
        Directory holding the raw IMDb TSV files.
    output_dir : str
        Directory for the processed files; created if missing.
    max_principals : int or None
        Maximum number of rows written to principals.tsv. None disables the
        limit. Defaults to the previously hard-coded 15 million.

    Returns:
    --------
    None. Progress is reported with print().
    """
    import sys

    null = "\\N"  # NULL marker used throughout the input and output files

    # Raise the csv field size limit as far as the platform accepts, backing
    # off by a factor of ten whenever the value is rejected with OverflowError.
    max_field_size = sys.maxsize
    while True:
        try:
            csv.field_size_limit(max_field_size)
            break
        except OverflowError:
            max_field_size = max_field_size // 10

    in_path = Path(input_dir)
    out_path = Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)

    # tab + '"' + text without '"' or tabs, up to the next tab or end of line
    quote_fix = re.compile(r'\t"([^"\t]*)(?=\t|$)')
    # text up to the first tab + tab + \N  (used for people: missing name)
    people_fix = re.compile(r'^([^\t]*\t)\\N')

    def cleaned_lines(f, clean_people=False):
        """Yield lines with a dangling leading quote removed: \\t"text -> \\ttext."""
        for line in f:
            line = quote_fix.sub(r'\t\1', line)
            if clean_people:
                # Replace a \N in the second column with a placeholder name.
                line = people_fix.sub(r'\1JakisTyp', line)
            yield line

    # NOTE(review): reader and writer use the csv module's default quote
    # handling, so a field containing '"' may be re-quoted on output. Confirm
    # the downstream loader tolerates that before changing it here.
    def tsv_reader(lines):
        # restval makes columns missing from a short row read as the NULL
        # marker; DictReader's default of None would be written as an empty
        # field instead of \N.
        return csv.DictReader(lines, delimiter="\t", restval=null)

    def tsv_writer(handle, header):
        """Create a tab-separated writer and emit the schema-aligned header."""
        writer = csv.writer(handle, delimiter="\t", lineterminator="\n")
        writer.writerow(header)
        return writer

    def pick(row, columns):
        """Return the row's values for `columns`, NULL marker for absent ones."""
        return [row.get(column, null) for column in columns]

    def is_real(value):
        """True for a non-empty value that is not the NULL marker."""
        return bool(value) and value != null

    def filtered_copy(source_name, target_name, label, header, columns, keep, limit=None):
        """Copy the rows accepted by `keep` from one TSV into a re-headed TSV."""
        source = in_path / source_name
        if not source.exists():
            print(f"[preprocess] WARNING: {source_name} not found")
            return
        with source.open("r", encoding="utf-8") as fin, \
             (out_path / target_name).open("w", encoding="utf-8", newline="") as fout:
            writer = tsv_writer(fout, header)
            written = 0
            for row in tsv_reader(fin):
                if not keep(row):
                    continue
                if limit is not None and written >= limit:
                    break
                writer.writerow(pick(row, columns))
                written += 1
        print(f"[preprocess] {label} OK")

    # ---------- T I T L E S  +  T I T L E _ G E N R E S ----------
    valid_tconsts = set()
    basics_file = in_path / "title.basics.tsv"
    if basics_file.exists():
        title_columns = [
            "tconst", "titleType", "primaryTitle", "originalTitle",
            "isAdult", "startYear", "endYear", "runtimeMinutes",
        ]
        with basics_file.open("r", encoding="utf-8") as fin, \
             (out_path / "titles.tsv").open("w", encoding="utf-8", newline="") as ftitles, \
             (out_path / "title_genres.tsv").open("w", encoding="utf-8", newline="") as fgenres:

            wtitles = tsv_writer(ftitles, [
                "tconst",
                "title_type",
                "primary_title",
                "original_title",
                "is_adult",
                "start_year",
                "end_year",
                "runtime_minutes",
            ])
            wgenres = tsv_writer(fgenres, ["title_id", "genre"])

            for row in tsv_reader(cleaned_lines(fin)):
                tconst = row.get("tconst", null)
                if is_real(tconst):
                    valid_tconsts.add(tconst)

                wtitles.writerow(pick(row, title_columns))

                # The comma-separated genres become one (title_id, genre) row each.
                genres = row.get("genres", null)
                if is_real(genres):
                    for genre in genres.split(","):
                        genre = genre.strip()
                        if genre:
                            wgenres.writerow([tconst, genre])
        print("[preprocess] titles + title_genres OK")
    else:
        print("[preprocess] WARNING: title.basics.tsv not found")

    # ---------- P E O P L E ----------
    valid_nconsts = set()
    name_file = in_path / "name.basics.tsv"
    if name_file.exists():
        people_columns = ["nconst", "primaryName", "birthYear", "deathYear", "primaryProfession"]
        with name_file.open("r", encoding="utf-8") as fin, \
             (out_path / "people.tsv").open("w", encoding="utf-8", newline="") as fout:

            writer = tsv_writer(
                fout, ["nconst", "primary_name", "birth_year", "death_year", "primary_profession"]
            )
            for row in tsv_reader(cleaned_lines(fin, clean_people=True)):
                nconst = row.get("nconst", null)
                if is_real(nconst):
                    valid_nconsts.add(nconst)
                writer.writerow(pick(row, people_columns))
        print("[preprocess] people OK")
    else:
        print("[preprocess] WARNING: name.basics.tsv not found")

    # ---------- R A T I N G S ----------
    filtered_copy(
        "title.ratings.tsv", "ratings.tsv", "ratings",
        header=["title_id", "average_rating", "num_votes"],
        columns=["tconst", "averageRating", "numVotes"],
        keep=lambda row: row.get("tconst", null) in valid_tconsts,
    )

    # ---------- P R I N C I P A L S ----------
    filtered_copy(
        "title.principals.tsv", "principals.tsv", "principals",
        header=["title_id", "ordering", "person_id", "category", "job", "characters"],
        columns=["tconst", "ordering", "nconst", "category", "job", "characters"],
        keep=lambda row: (row.get("nconst", null) in valid_nconsts
                          and row.get("tconst", null) in valid_tconsts),
        limit=max_principals,
    )

    # ---------- A K A _ T I T L E S ----------
    filtered_copy(
        "title.akas.tsv", "aka_titles.tsv", "aka_titles",
        header=[
            "title_id", "ordering", "aka_title",
            "region", "language", "types", "attributes", "is_original_title",
        ],
        columns=[
            "titleId", "ordering", "title",
            "region", "language", "types", "attributes", "isOriginalTitle",
        ],
        keep=lambda row: row.get("titleId", null) in valid_tconsts,
    )

    # ---------- E P I S O D E S ----------
    filtered_copy(
        "title.episode.tsv", "episodes.tsv", "episodes",
        header=["episode_id", "parent_id", "season_number", "episode_number"],
        columns=["tconst", "parentTconst", "seasonNumber", "episodeNumber"],
        keep=lambda row: (row.get("tconst", null) in valid_tconsts
                          and row.get("parentTconst", null) in valid_tconsts),
    )

    print("[preprocess] DONE. Pliki w:", out_path)


In [20]:
# Build the schema-aligned TSV files in ./data/processed from the raw files in ./data.
preprocess_imdb("./data", "./data/processed")

[preprocess] titles + title_genres OK
[preprocess] people OK
[preprocess] ratings OK
[preprocess] principals OK
[preprocess] aka_titles OK
[preprocess] episodes OK
[preprocess] DONE. Pliki w: data\processed


In [54]:
def postgres_operation(conn, query, fetch=False, params=None):
    """
    Execute a PostgreSQL query and optionally fetch results.

    Parameters:
    -----------
    conn : psycopg2 connection
        Database connection
    query : str
        SQL query to execute. May contain %s placeholders when `params` is given.
    fetch : bool
        Whether to fetch and return results
    params : sequence or mapping, optional
        Values bound to the placeholders in `query` by the driver. Prefer this
        over formatting values into the SQL string: the driver handles quoting
        and NULLs. With the default None the query is executed exactly as
        written.

    Returns:
    --------
    Result set if fetch=True, None otherwise. None is also returned when the
    query fails: the error is printed and the transaction rolled back, not
    raised.

    Notes:
    ------
    The transaction is committed only when fetch=False; a fetching call leaves
    the transaction open, as before.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            if fetch:
                return cur.fetchall()
        # Only reached for fetch=False; the fetch path returned above.
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"Błąd operacji PostgreSQL: {e}")
    return None


def load_insert_data_from_tsv(data_dir, max_rows=100):
    """
    Build in-memory test records for the INSERT benchmarks from processed TSVs.

    At most `max_rows` rows are read from the start of each file. Every id is
    prefixed with "NEW". Records that reference a title or person are kept
    only if that "NEW..." id was loaded from titles.tsv / people.tsv in this
    same call.

    Parameters:
    -----------
    data_dir : str or Path
        Directory containing the processed TSV files. Missing files simply
        yield an empty table.
    max_rows : int
        Maximum number of rows read from each file.

    Returns:
    --------
    dict: table name -> {key -> record dict}, for the tables titles, people,
    ratings, principals, aka_titles, episodes and title_genres. Text values
    have single quotes doubled; the \\N marker becomes None.
    """
    from pathlib import Path
    import csv

    null_marker = '\\N'
    data_path = Path(data_dir)

    def head_rows(file_name):
        """Yield the first `max_rows` parsed rows of a file, or nothing if absent."""
        tsv_path = data_path / file_name
        if not tsv_path.exists():
            return
        with open(tsv_path, 'r', encoding='utf-8') as handle:
            for index, record in enumerate(csv.DictReader(handle, delimiter='\t')):
                if index >= max_rows:
                    break
                yield record

    def sql_text(record, column):
        """Text column: None for a missing value or \\N, else quotes doubled."""
        raw = record.get(column, '')
        if raw is None or raw == null_marker:
            return None
        return str(raw).replace("'", "''")

    def raw_or_none(record, column):
        """Unescaped column value, with the \\N marker mapped to None."""
        if record.get(column) == null_marker:
            return None
        return record.get(column, '')

    def flag(record, column):
        """Boolean column: True only for the literal '1'."""
        return record.get(column, '0') == '1'

    # ----- titles -----
    titles = {}
    for record in head_rows('titles.tsv'):
        tconst = record.get('tconst', '')
        if not tconst:
            continue
        titles[f"NEW{tconst}"] = {
            'title_type': sql_text(record, 'title_type'),
            'primary_title': sql_text(record, 'primary_title'),
            'original_title': sql_text(record, 'original_title'),
            'is_adult': flag(record, 'is_adult'),
            'start_year': raw_or_none(record, 'start_year'),
            'end_year': raw_or_none(record, 'end_year'),
            'runtime_minutes': raw_or_none(record, 'runtime_minutes'),
        }

    # ----- people -----
    people = {}
    for record in head_rows('people.tsv'):
        nconst = record.get('nconst', '')
        if not nconst:
            continue
        people[f"NEW{nconst}"] = {
            'primary_name': sql_text(record, 'primary_name'),
            'birth_year': raw_or_none(record, 'birth_year'),
            'death_year': raw_or_none(record, 'death_year'),
            'primary_profession': sql_text(record, 'primary_profession'),
        }

    # ----- ratings (only for loaded titles) -----
    ratings = {}
    for record in head_rows('ratings.tsv'):
        tconst = record.get('title_id', '')
        if not tconst:
            continue
        new_title_id = f"NEW{tconst}"
        if new_title_id in titles:
            ratings[new_title_id] = {
                'average_rating': record.get('average_rating', '0'),
                'num_votes': record.get('num_votes', '0'),
            }

    # ----- principals (only when both title and person were loaded) -----
    principals = {}
    for record in head_rows('principals.tsv'):
        new_title_id = f"NEW{record.get('title_id', '')}"
        new_person_id = f"NEW{record.get('person_id', '')}"
        if new_title_id not in titles or new_person_id not in people:
            continue
        principals[f"{new_title_id}_{record.get('ordering', '')}"] = {
            'title_id': new_title_id,
            'ordering': record.get('ordering', '0'),
            'person_id': new_person_id,
            'category': sql_text(record, 'category'),
            'job': sql_text(record, 'job'),
            'characters': sql_text(record, 'characters'),
        }

    # ----- aka_titles (only for loaded titles) -----
    aka_titles = {}
    for record in head_rows('aka_titles.tsv'):
        new_title_id = f"NEW{record.get('title_id', '')}"
        if new_title_id not in titles:
            continue
        aka_titles[f"{new_title_id}_{record.get('ordering', '')}"] = {
            'title_id': new_title_id,
            'ordering': record.get('ordering', '0'),
            'aka_title': sql_text(record, 'aka_title'),
            'region': sql_text(record, 'region'),
            'language': sql_text(record, 'language'),
            'types': sql_text(record, 'types'),
            'attributes': sql_text(record, 'attributes'),
            'is_original_title': flag(record, 'is_original_title'),
        }

    # ----- episodes (episode and parent must both be loaded titles) -----
    episodes = {}
    for record in head_rows('episodes.tsv'):
        new_episode_id = f"NEW{record.get('episode_id', '')}"
        new_parent_id = f"NEW{record.get('parent_id', '')}"
        if new_episode_id in titles and new_parent_id in titles:
            episodes[new_episode_id] = {
                'parent_id': new_parent_id,
                'season_number': raw_or_none(record, 'season_number'),
                'episode_number': raw_or_none(record, 'episode_number'),
            }

    # ----- title_genres (only for loaded titles) -----
    title_genres = {}
    for record in head_rows('title_genres.tsv'):
        title_id = record.get('title_id', '')
        genre_value = record.get('genre', '')
        new_title_id = f"NEW{title_id}"
        if title_id and genre_value and new_title_id in titles:
            title_genres[f"{new_title_id}_{genre_value}"] = {
                'title_id': new_title_id,
                'genre': genre_value,
            }

    return {
        'titles': titles,
        'people': people,
        'ratings': ratings,
        'principals': principals,
        'aka_titles': aka_titles,
        'episodes': episodes,
        'title_genres': title_genres,
    }


In [55]:
# Path is relative to the project directory.
data_dir = "data/processed"
test_data = load_insert_data_from_tsv(data_dir, max_rows=100_000)

# Show how many test records were prepared for each table.
for table, rows in test_data.items():
    print(f"{table} -> {len(rows)}")

titles -> 100000
people -> 100000
ratings -> 77464
principals -> 14288
aka_titles -> 100000
episodes -> 443
title_genres -> 100000


In [56]:
import time
import psutil
import statistics
from typing import List, Tuple, Callable

class BenchmarkTime:
    """
    Runs benchmark scenarios and records timing and resource metrics.

    A scenario is a `(name, operations)` pair; `operations` is a list of
    `(operation_name, callable)` pairs executed in order. One result dict per
    scenario is appended to `self.results`.

    CPU and memory are sampled from `psutil.Process()`, i.e. the Python process
    running the benchmark. Disk figures come from `psutil.disk_io_counters()`.
    """

    def __init__(self, db_type: str, data_dir: str):
        """
        Parameters:
        -----------
        db_type : str
            Label stored in the 'database' column of every result.
        data_dir : str
            Label stored in the 'data_dir' column of every result.
        """
        self.data_dir = data_dir
        self.db_type = db_type
        self.process = psutil.Process()
        self.results = []

    def get_results_df(self):
        """Return the collected results as a DataFrame, one row per scenario."""
        import pandas as pd
        return pd.DataFrame(self.results)

    @staticmethod
    def _disk_delta_mb(before, after):
        """Return (read_mb, write_mb) between two disk counter snapshots.

        Either snapshot may be None when disk counters are unavailable; the
        delta is then reported as 0.0 instead of failing the benchmark.
        """
        if before is None or after is None:
            return 0.0, 0.0
        megabyte = 1024 * 1024
        return (
            (after.read_bytes - before.read_bytes) / megabyte,
            (after.write_bytes - before.write_bytes) / megabyte,
        )

    def run_benchmark(self, scenarios: List[Tuple[str, List[Tuple[str, Callable]]]],
                    setup_method: Callable = None,
                    cleanup_method: Callable = None):
        """
        Execute every scenario, print a summary and store the metrics.

        Parameters:
        -----------
        scenarios : list of (scenario_name, [(operation_name, callable), ...])
        setup_method : callable, optional
            Called once before the first scenario. Not included in any
            scenario's time or disk figures.
        cleanup_method : callable, optional
            Called once after the last scenario, also when an operation raises.

        Raises:
        -------
        Whatever setup, an operation or cleanup raises; results of scenarios
        completed before the failure remain in `self.results`.
        """
        if setup_method:
            setup_method()

        try:
            # Baseline taken after setup so setup's I/O is not charged to the
            # first scenario.
            io_counters_start = psutil.disk_io_counters()

            # NOTE(review): psutil documents the first cpu_percent() call as
            # returning a meaningless 0.0; the first sample of the first
            # scenario is therefore low. Confirm whether that matters here.
            for scenario_name, operations in scenarios:
                # perf_counter is monotonic and high-resolution, unlike the
                # wall clock, which can be coarse or adjusted mid-run.
                start_time = time.perf_counter()
                cpu_samples = []
                memory_samples = []
                durations = []

                for op_name, func in operations:
                    cpu_samples.append(self.process.cpu_percent())
                    memory_samples.append(self.process.memory_info().rss)

                    op_start = time.perf_counter()
                    func()
                    durations.append(time.perf_counter() - op_start)

                total_time = time.perf_counter() - start_time

                avg_cpu = statistics.mean(cpu_samples) if cpu_samples else 0
                avg_memory = statistics.mean(memory_samples) / (1024 * 1024) if memory_samples else 0

                io_counters_end = psutil.disk_io_counters()
                read_mb, write_mb = self._disk_delta_mb(io_counters_start, io_counters_end)

                avg_op_time = statistics.mean(durations) if durations else 0
                throughput = len(operations) / total_time if total_time > 0 else 0

                scenario_result = {
                    'database': self.db_type,
                    'data_dir': self.data_dir,
                    'scenario': scenario_name,
                    'total_time': total_time,
                    'operations': len(operations),
                    'avg_operation_time': avg_op_time,
                    'throughput': throughput,
                    'cpu_avg': avg_cpu,
                    'memory_avg': avg_memory,
                    'disk_read_mb': read_mb,
                    'disk_write_mb': write_mb
                }
                self.results.append(scenario_result)

                print(f"--- {scenario_name} ({self.db_type}) ---")
                print(f"Total time: {total_time:.4f} seconds")
                print(f"Operations: {len(operations)}")
                print(f"Avg operation time: {avg_op_time:.4f} seconds")
                print(f"Throughput: {throughput:.2f} ops/sec")
                print(f"CPU avg: {avg_cpu:.2f}%")
                print(f"Memory avg: {avg_memory:.2f} MB")
                print(f"Disk read: {read_mb:.2f} MB")
                print(f"Disk write: {write_mb:.2f} MB")
                print()

                # The next scenario measures from where this one ended.
                io_counters_start = io_counters_end
        finally:
            # Always restore the database state, even after a failed operation.
            if cleanup_method:
                cleanup_method()

In [57]:
# Scenario labels used by the benchmarks. The "?" stands for a record key;
# operation names are built by appending the key, e.g. INSERT_TITLE + str(k).

# --- INSERT scenarios ---
INSERT_TITLE = "INSERT Title ?"
INSERT_PERSON = "INSERT Person ?"
INSERT_RATING = "INSERT Rating ?"
INSERT_PRINCIPAL = "INSERT Principal ?"
INSERT_AKA_TITLE = "INSERT AKA Title ?"
INSERT_EPISODE = "INSERT Episode ?"
INSERT_TITLE_GENRE = "INSERT Title Genre ?"

# --- SELECT scenarios ---
SELECT_TITLE = "SELECT title ?"
SELECT_PERSON = "SELECT person ?"
SELECT_ALL_PEOPLE_IN_TITLE = "SELECT all people that are in the title ?"
SELECT_ALL_EPISODES_FOR_SERIES = "SELECT all episodes for the series ?"
SELECT_ALL_RATINGS_WITH_TITLE_INFO = (
    "SELECT all ratings with title info for all titles in the genre ?"
)

# --- UPDATE scenarios ---
UPDATE_TITLE_PRIMARY_TITLE = "UPDATE Title ? Primary Title"
UPDATE_ALL_RATINGS_FOR_TITLE = "UPDATE all Ratings for Title ?"
UPDATE_PERSON_PRIMARY_NAME = "UPDATE Person ? Primary Name"
UPDATE_TITLE_START_YEAR = "UPDATE Title ? Start Year"
UPDATE_PERSON_BIRTH_YEAR_FOR_ALL_PEOPLE_IN_TITLE = (
    "UPDATE Person Birth Year for all people that are in the title ?"
)

# --- DELETE scenarios ---
DELETE_TITLE = "DELETE Title ?"
DELETE_PERSON = "DELETE Person ?"
DELETE_GENRES_THAT_ARE_IN_THE_TITLE = "DELETE Genres that are in the title ?"
DELETE_PEOPLE_WHO_ARE_IN_TITLE = "DELETE People who are in title ?"

In [58]:
def load_postgres_data(conn, data_dir: str):
    """
    Bulk-load the processed TSV files into PostgreSQL using COPY.

    For each table, `<data_dir>/<table>.tsv` is loaded if it exists; a missing
    file is reported with a warning and skipped. Tables are processed in the
    order: titles, people, ratings, principals, aka_titles, episodes,
    title_genres.

    Parameters:
    -----------
    conn : psycopg2 connection
    data_dir : str
        Directory containing the processed TSV files.

    Raises:
    -------
    Re-raises any error from a load after rolling back and printing it; the
    tables that follow are then not attempted.
    """
    data_path = Path(data_dir)

    def copy_from_tsv(conn, table_name, file_path, columns=None):
        """COPY one TSV file (header skipped) into a table and commit.

        Returns (start, copy_start, end) timestamps from time.time(), where
        copy_start is taken after the file was opened and the header consumed.
        """
        started = time.time()
        copy_started = time.time()

        try:
            with conn.cursor() as cur, open(file_path, "r", encoding="utf-8") as handle:
                next(handle)  # the first line holds column names, not data
                copy_started = time.time()

                copy_options = {"null": "\\N", "sep": "\t"}
                if columns:
                    copy_options["columns"] = columns
                cur.copy_from(handle, table_name, **copy_options)
            conn.commit()
        except Exception as e:
            conn.rollback()
            print(f"ERROR loading {table_name} from {file_path}: {e}")
            raise

        return started, copy_started, time.time()

    # Table name -> target columns, in load order. Every table is read from
    # "<table name>.tsv".
    table_columns = (
        ("titles", (
            "tconst",
            "title_type",
            "primary_title",
            "original_title",
            "is_adult",
            "start_year",
            "end_year",
            "runtime_minutes",
        )),
        ("people", (
            "nconst",
            "primary_name",
            "birth_year",
            "death_year",
            "primary_profession",
        )),
        ("ratings", ("title_id", "average_rating", "num_votes")),
        ("principals", ("title_id", "ordering", "person_id", "category", "job", "characters")),
        ("aka_titles", (
            "title_id",
            "ordering",
            "aka_title",
            "region",
            "language",
            "types",
            "attributes",
            "is_original_title",
        )),
        ("episodes", ("episode_id", "parent_id", "season_number", "episode_number")),
        ("title_genres", ("title_id", "genre")),
    )

    for table_name, columns in table_columns:
        tsv_file = data_path / f"{table_name}.tsv"
        if not tsv_file.exists():
            print(f"WARNING: {tsv_file} not found, skipping {table_name}")
            continue

        started, copy_started, finished = copy_from_tsv(
            conn, table_name, tsv_file, columns=columns
        )
        print(
            f"INFO: Inserted {table_name} in {finished - started:.2f} s "
            f"(COPY phase: {finished - copy_started:.2f} s)"
        )

In [59]:
def _pg_quote(value) -> str:
    """Render ``value`` as a single-quoted SQL string literal.

    Embedded single quotes are doubled (the SQL-standard escape), so text such
    as ``Don't Look Up`` no longer terminates the literal early.
    NOTE(review): relies on PostgreSQL's ``standard_conforming_strings`` being
    on (the server default), so backslashes need no extra escaping — confirm
    for the benchmarked server.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _pg_quote_or_null(value) -> str:
    """Return a quoted literal for truthy values and ``NULL`` for None/empty ones."""
    return _pg_quote(value) if value else "NULL"


def _pg_raw_or_null(value) -> str:
    """Return the unquoted rendering of ``value``, or ``NULL`` when it is None."""
    return "NULL" if value is None else f"{value}"


def _pg_insert_title_sql(k, v) -> str:
    """Build the INSERT statement for one ``titles`` row keyed by ``k``."""
    return (
        "INSERT INTO titles (tconst, title_type, primary_title, original_title, "
        "is_adult, start_year, end_year, runtime_minutes) VALUES ("
        f"{_pg_quote(k)}, {_pg_quote(v['title_type'])}, {_pg_quote(v['primary_title'])}, "
        f"{_pg_quote(v['original_title'])}, {v['is_adult']}, "
        f"{_pg_raw_or_null(v.get('start_year'))}, {_pg_raw_or_null(v.get('end_year'))}, "
        f"{_pg_raw_or_null(v.get('runtime_minutes'))})"
    )


def _pg_insert_person_sql(k, v) -> str:
    """Build the INSERT statement for one ``people`` row keyed by ``k``."""
    return (
        "INSERT INTO people (nconst, primary_name, birth_year, death_year, "
        "primary_profession) VALUES ("
        f"{_pg_quote(k)}, {_pg_quote(v['primary_name'])}, "
        f"{_pg_raw_or_null(v.get('birth_year'))}, {_pg_raw_or_null(v.get('death_year'))}, "
        f"{_pg_quote(v['primary_profession'])})"
    )


def _pg_insert_rating_sql(k, v) -> str:
    """Build the INSERT statement for one ``ratings`` row keyed by ``k``."""
    return (
        "INSERT INTO ratings (title_id, average_rating, num_votes) VALUES ("
        f"{_pg_quote(k)}, {v['average_rating']}, {v['num_votes']})"
    )


def _pg_insert_principal_sql(k, v) -> str:
    """Build the INSERT statement for one ``principals`` row (``k`` is only a label key)."""
    return (
        "INSERT INTO principals (title_id, ordering, person_id, category, job, "
        "characters) VALUES ("
        f"{_pg_quote(v['title_id'])}, {v['ordering']}, {_pg_quote(v['person_id'])}, "
        f"{_pg_quote(v['category'])}, {_pg_quote_or_null(v.get('job'))}, "
        f"{_pg_quote_or_null(v.get('characters'))})"
    )


def _pg_insert_aka_title_sql(k, v) -> str:
    """Build the INSERT statement for one ``aka_titles`` row (``k`` is only a label key)."""
    return (
        "INSERT INTO aka_titles (title_id, ordering, aka_title, region, language, "
        "types, attributes, is_original_title) VALUES ("
        f"{_pg_quote(v['title_id'])}, {v['ordering']}, {_pg_quote(v['aka_title'])}, "
        f"{_pg_quote_or_null(v.get('region'))}, {_pg_quote_or_null(v.get('language'))}, "
        f"{_pg_quote_or_null(v.get('types'))}, {_pg_quote_or_null(v.get('attributes'))}, "
        f"{v['is_original_title']})"
    )


def _pg_insert_episode_sql(k, v) -> str:
    """Build the INSERT statement for one ``episodes`` row keyed by ``k``."""
    return (
        "INSERT INTO episodes (episode_id, parent_id, season_number, episode_number) "
        "VALUES ("
        f"{_pg_quote(k)}, {_pg_quote(v['parent_id'])}, "
        f"{_pg_raw_or_null(v.get('season_number'))}, "
        f"{_pg_raw_or_null(v.get('episode_number'))})"
    )


def _pg_insert_title_genre_sql(k, v) -> str:
    """Build the INSERT statement for one ``title_genres`` row (``k`` is only a label key)."""
    return (
        "INSERT INTO title_genres (title_id, genre) VALUES ("
        f"{_pg_quote(v['title_id'])}, {_pg_quote(v['genre'])})"
    )


def postgres_benchmark(data_dir, random_numbers_list) -> pd.DataFrame:
    """Run the insert/select/update/delete benchmark scenarios against PostgreSQL.

    Each scenario is a ``(name, [(label, callable), ...])`` pair; every callable
    issues exactly one statement through ``postgres_operation`` on the
    module-level ``postgres_client``. The four scenario groups are handed to
    ``BenchmarkTime.run_benchmark`` in the order insert, select, update, delete.

    Args:
        data_dir: Directory passed to ``BenchmarkTime``, ``load_postgres_data``
            and ``load_insert_data_from_tsv``.
        random_numbers_list: Keys substituted into the WHERE clauses of the
            select/update/delete statements. It is iterated once per scenario,
            so it must be re-iterable (e.g. a list).

    Returns:
        Whatever ``benchmark.get_results_df()`` returns (annotated as a
        ``pd.DataFrame``).
    """
    benchmark = BenchmarkTime("postgres", data_dir)

    def setup_for_insert():
        # Passed as ``setup_method`` for the insert run: re-applies the schema
        # and bulk-loads the base data set.
        with open('db/postgres/schema.sql', 'r') as f:
            sql_schema = f.read()

        initialize_postgres_schema(postgres_client, sql_schema)
        load_postgres_data(postgres_client, data_dir)

    def insert_scenario(label, rows, build_sql):
        # ``k``/``v`` are bound as lambda defaults so each callable keeps its
        # own row; the SQL text is still built lazily, when the callable runs.
        return (
            label,
            [
                (label + str(k),
                 lambda k=k, v=v: postgres_operation(postgres_client, build_sql(k, v)))
                for k, v in rows.items()
            ],
        )

    def keyed_scenario(label, sql_template, **op_kwargs):
        # One callable per key; ``op_kwargs`` forwards ``fetch=True`` for the
        # SELECT scenarios and stays empty otherwise, matching the original calls.
        return (
            label,
            [
                (label + str(k),
                 lambda k=k: postgres_operation(
                     postgres_client, sql_template.format(k=k), **op_kwargs))
                for k in random_numbers_list
            ],
        )

    test_data = load_insert_data_from_tsv(data_dir, max_rows=100000)

    insert_scenarios = [
        insert_scenario(INSERT_TITLE, test_data['titles'], _pg_insert_title_sql),
        insert_scenario(INSERT_PERSON, test_data['people'], _pg_insert_person_sql),
        insert_scenario(INSERT_RATING, test_data['ratings'], _pg_insert_rating_sql),
        insert_scenario(INSERT_PRINCIPAL, test_data['principals'], _pg_insert_principal_sql),
        insert_scenario(INSERT_AKA_TITLE, test_data['aka_titles'], _pg_insert_aka_title_sql),
        insert_scenario(INSERT_EPISODE, test_data['episodes'], _pg_insert_episode_sql),
        insert_scenario(INSERT_TITLE_GENRE, test_data['title_genres'], _pg_insert_title_genre_sql),
    ]

    select_scenarios = [
        keyed_scenario(
            SELECT_TITLE,
            "SELECT * FROM titles WHERE tconst = '{k}'",
            fetch=True),
        keyed_scenario(
            SELECT_PERSON,
            "SELECT * FROM people WHERE nconst = '{k}'",
            fetch=True),
        keyed_scenario(
            SELECT_ALL_PEOPLE_IN_TITLE,
            "SELECT * FROM people p JOIN principals pr ON p.nconst = pr.person_id WHERE pr.title_id = '{k}'",
            fetch=True),
        keyed_scenario(
            SELECT_ALL_EPISODES_FOR_SERIES,
            "SELECT * FROM episodes e JOIN titles t ON e.episode_id = t.tconst WHERE e.parent_id = '{k}'",
            fetch=True),
        keyed_scenario(
            SELECT_ALL_RATINGS_WITH_TITLE_INFO,
            "SELECT * FROM ratings r JOIN titles t ON r.title_id = t.tconst JOIN title_genres tg ON t.tconst = tg.title_id WHERE tg.genre = '{k}'",
            fetch=True),
    ]

    update_scenarios = [
        keyed_scenario(
            UPDATE_TITLE_PRIMARY_TITLE,
            "UPDATE titles SET primary_title = 'UPDATED' WHERE tconst = '{k}'"),
        keyed_scenario(
            UPDATE_ALL_RATINGS_FOR_TITLE,
            "UPDATE ratings SET average_rating = 10.0 WHERE title_id = '{k}'"),
        keyed_scenario(
            UPDATE_PERSON_PRIMARY_NAME,
            "UPDATE people SET primary_name = 'UPDATED' WHERE nconst = '{k}'"),
        keyed_scenario(
            UPDATE_TITLE_START_YEAR,
            "UPDATE titles SET start_year = 2024 WHERE tconst = '{k}'"),
        keyed_scenario(
            UPDATE_PERSON_BIRTH_YEAR_FOR_ALL_PEOPLE_IN_TITLE,
            "UPDATE people SET birth_year = 2000 WHERE nconst IN (SELECT person_id FROM principals WHERE title_id = '{k}')"),
    ]

    delete_scenarios = [
        keyed_scenario(
            DELETE_TITLE,
            "DELETE FROM titles WHERE tconst = '{k}'"),
        keyed_scenario(
            DELETE_PERSON,
            "DELETE FROM people WHERE nconst = '{k}'"),
        keyed_scenario(
            DELETE_GENRES_THAT_ARE_IN_THE_TITLE,
            "DELETE FROM title_genres WHERE title_id = '{k}'"),
        keyed_scenario(
            DELETE_PEOPLE_WHO_ARE_IN_TITLE,
            "DELETE FROM principals WHERE title_id = '{k}'"),
    ]

    benchmark.run_benchmark(insert_scenarios, setup_method=setup_for_insert)
    benchmark.run_benchmark(select_scenarios)
    benchmark.run_benchmark(update_scenarios)
    benchmark.run_benchmark(delete_scenarios)
    return benchmark.get_results_df()


In [60]:
DEBUG = False
from random import sample


def run_all_benchmarks(scale, ignore_cassandra=False):
    """Run the database benchmarks and write the combined results to CSV.

    Draws up to 100000 distinct keys from ``1..scale``, passes them to
    ``postgres_benchmark`` and saves the concatenated result frames to
    ``./data/processed/merged_results.csv``.

    Args:
        scale: Upper bound (inclusive) of the key range to sample from.
        ignore_cassandra: Currently unused; kept so existing callers that pass
            it keep working.
    """
    # The original per-scale path ('./data/scale_<scale>') was immediately
    # overwritten, so this directory is the only one that was ever used.
    data_path = './data/processed'

    # Clamp the sample size to the population: random.sample raises ValueError
    # when asked for more items than exist, which broke any scale < 100000.
    population = range(1, scale + 1)
    rand_list = sample(population, min(len(population), 100000))

    results = []
    postgres_results_df = postgres_benchmark(data_path, rand_list)
    results.append(postgres_results_df)

    merged_df = pd.concat(results, ignore_index=True)
    merged_df.to_csv(f"{data_path}/merged_results.csv", index=False)



In [None]:
# Run the benchmarks with scale=100000 (keys are sampled from 1..100000).
run_all_benchmarks(100000)

PostgreSQL schema initialization complete.
INFO: Inserted titles in 27.17 s (COPY phase: 27.17 s)
