# Cinema Data Generation and Fixes

This notebook generates synthetic data for a cinema management system, backfills missing online/offline records for existing transactions, and repairs transactions that have no ticket associations. All data is stored in a PostgreSQL database.

---

## Table of Contents
1. [Database Configuration](#database-configuration)
2. [Data Generation](#data-generation)
   - [Populate Static Dimensions](#populate-static-dimensions)
   - [Generate Customers](#generate-customers)
   - [Generate Time Dimension](#generate-time-dimension)
   - [Generate Movies and Relations](#generate-movies-and-relations)
   - [Generate Showings and Tickets](#generate-showings-and-tickets)
   - [Generate Transactions](#generate-transactions)
3. [Backfill Missing Transactions](#backfill-missing-transactions)
4. [Fix Missing Tickets](#fix-missing-tickets)


---
## Database Configuration

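The database connection details are read from environment variables, loaded from a `.env` file with `python-dotenv`: `DB_NAME`, `DB_USER`, `DB_PASSWORD`, `DB_HOST`, `DB_PORT`, and `DB_SCHEMA` (prepended to the connection's `search_path`). Set these to your actual database credentials before running the notebook.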

In [2]:
pip install -r requirements.txt

Collecting psycopg2-binary==2.9.9 (from -r requirements.txt (line 1))
  Downloading psycopg2_binary-2.9.9-cp310-cp310-macosx_11_0_arm64.whl.metadata (4.4 kB)
Collecting Faker==19.13.0 (from -r requirements.txt (line 2))
  Downloading Faker-19.13.0-py3-none-any.whl.metadata (15 kB)
Collecting python-dotenv==1.0.0 (from -r requirements.txt (line 3))
  Downloading python_dotenv-1.0.0-py3-none-any.whl.metadata (21 kB)
Downloading psycopg2_binary-2.9.9-cp310-cp310-macosx_11_0_arm64.whl (2.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.6/2.6 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0mm
[?25hDownloading Faker-19.13.0-py3-none-any.whl (1.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m27.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading python_dotenv-1.0.0-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv, psycopg2-binary, Faker
Successfully installed Faker-19.13.0 psycopg2

In [3]:
from psycopg2.extras import execute_values
import random
from faker import Faker
import psycopg2
from datetime import datetime, timedelta
import sys
import os
from dotenv import load_dotenv
from multiprocessing import Pool, cpu_count
from io import StringIO
import time

In [6]:


# Load environment variables from .env file
load_dotenv()

DB_SCHEMA = os.getenv("DB_SCHEMA")

# Database configuration (unset variables come back from os.getenv as None)
DB_CONFIG = {
    "dbname": os.getenv("DB_NAME"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST"),
    "port": os.getenv("DB_PORT"),
}
# Only pin the search_path when a schema is configured; formatting an unset
# variable would silently produce "search_path=None,public".
if DB_SCHEMA:
    DB_CONFIG["options"] = f"-c search_path={DB_SCHEMA},public"


def get_db_connection():
    """Return a new connection to the PostgreSQL database described by DB_CONFIG."""
    return psycopg2.connect(**DB_CONFIG)


NUM_CUSTOMERS = 10_000
NUM_TRANSACTIONS = 1_000_000
BATCH_SIZE = 10_000

# Optional reproducibility: set DATA_SEED (an integer) in .env to make both
# `random` and Faker deterministic. When unset, every run produces fresh data,
# exactly as before.
DATA_SEED = os.getenv("DATA_SEED")
if DATA_SEED is not None:
    random.seed(int(DATA_SEED))
    Faker.seed(int(DATA_SEED))

fake = Faker()

## Data Generation

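This section generates synthetic data for the tables in the `al_sinama` schema. `dim_hall`, `dim_star`, and `dim_cinema` are not generated here. They must already be populated, because showings, movie–star relations, and offline transactions draw their IDs from them.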

In [8]:
def create_tables():
    """Placeholder for table creation; the DDL is omitted for brevity.

    This does nothing: the schema is assumed to have been created beforehand.
    """
    return None



def populate_static_dimensions():
    """Seed the static dimension tables: dim_genre and dim_promotion.

    Each table is filled only when it is currently empty, so re-running this
    does not insert duplicates. Promotions get a random 5-30 discount and a
    date window whose end is never before its start.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            # Genres
            cursor.execute("SELECT COUNT(*) FROM dim_genre")
            if cursor.fetchone()[0] == 0:
                genres = ['Action', 'Drama', 'Comedy', 'Horror', 'Sci-Fi']
                execute_values(
                    cursor,
                    "INSERT INTO dim_genre (name) VALUES %s",
                    [(g,) for g in genres]
                )

            # Promotions
            cursor.execute("SELECT COUNT(*) FROM dim_promotion")
            if cursor.fetchone()[0] == 0:
                promotions = []
                for i in range(10):
                    discount = random.randint(5, 30)
                    start = fake.date_between('-2y')
                    # Draw the end from [start, today]. Two independent draws
                    # (previously -2y..today and -1y..today) could produce a
                    # promotion that ends before it starts.
                    end = fake.date_between(start_date=start)
                    promotions.append((f"Promo {i}", discount, start, end))
                execute_values(
                    cursor,
                    """INSERT INTO dim_promotion 
                       (description, discount, start_date, end_date)
                       VALUES %s""",
                    promotions
                )
            conn.commit()

def generate_customers():
    """Populate dim_customer with NUM_CUSTOMERS fake customers (only if empty).

    Each customer gets a name, a date of birth (age 12-90 at generation time),
    a gender from 'M'/'F'/'Other', and an address. Rows are bulk-loaded with
    COPY in text format.
    """
    def _copy_text(value):
        # COPY's text format treats backslash, tab, CR and LF specially. Faker
        # addresses contain embedded newlines, which would otherwise split one
        # customer across two COPY rows. Escaping preserves the original text.
        return (str(value)
                .replace("\\", "\\\\")
                .replace("\t", "\\t")
                .replace("\r", "\\r")
                .replace("\n", "\\n"))

    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM dim_customer")
            if cursor.fetchone()[0] == 0:
                print("Generating customers...")
                customers = [
                    (fake.name(), 
                     fake.date_of_birth(minimum_age=12, maximum_age=90),
                     random.choice(['M', 'F', 'Other']),
                     fake.address())
                    for _ in range(NUM_CUSTOMERS)
                ]
                
                # Use COPY for bulk insert. The row terminator is written
                # separately; joining it in with the fields would emit a
                # trailing tab, i.e. an extra empty column that COPY rejects.
                buffer = StringIO()
                for customer in customers:
                    buffer.write("\t".join(_copy_text(field) for field in customer))
                    buffer.write("\n")
                buffer.seek(0)
                
                cursor.copy_from(
                    buffer,
                    "dim_customer",
                    columns=("name", "dob", "gender", "address"),
                    null=""
                )
                conn.commit()

def generate_time_dimension():
    """Populate dim_time with one row per day from 2014-01-01 to 2024-12-31 (only if empty).

    Each row holds the date, year, month, day, quarter (1-4) and ISO week
    number. Rows are bulk-loaded with COPY in text format.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM dim_time")
            if cursor.fetchone()[0] == 0:
                print("Generating time dimension...")
                start_date = datetime(2014, 1, 1)
                end_date = datetime(2024, 12, 31)
                delta = timedelta(days=1)
                
                buffer = StringIO()
                current_date = start_date
                while current_date <= end_date:
                    fields = (
                        str(current_date.date()),
                        str(current_date.year),
                        str(current_date.month),
                        str(current_date.day),
                        str((current_date.month - 1) // 3 + 1),
                        str(current_date.isocalendar()[1]),
                    )
                    # Write the terminator separately: joining "\n" in with the
                    # fields left a trailing tab, i.e. a 7th (empty) column that
                    # COPY rejects for this 6-column load.
                    buffer.write("\t".join(fields))
                    buffer.write("\n")
                    current_date += delta
                
                buffer.seek(0)
                cursor.copy_from(
                    buffer,
                    "dim_time",
                    columns=("date", "year", "month", "day", "quarter", "week_number")
                )
                conn.commit()

def generate_movies_and_relations():
    """Populate dim_movie and dim_movie_star, each only when currently empty.

    dim_movie receives 500 random movies, each assigned a genre read from
    dim_genre. dim_movie_star receives up to 2000 distinct (movie_id, star_id)
    pairs, drawn from dim_movie and dim_star. This notebook does not populate
    dim_star itself; when it is empty, relations are skipped with a message.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            # Generate movies
            cursor.execute("SELECT COUNT(*) FROM dim_movie")
            if cursor.fetchone()[0] == 0:
                print("Generating movies...")
                cursor.execute("SELECT genre_id FROM dim_genre")
                genre_ids = [row[0] for row in cursor.fetchall()]
                
                movies = [
                    (fake.catch_phrase(),
                     fake.date_between('-10y'),
                     random.choice(['English', 'Arabic', 'French']),
                     round(random.uniform(1e6, 100e6), 2),
                     fake.country(),
                     random.choice(genre_ids))
                    for _ in range(500)
                ]
                execute_values(
                    cursor,
                    """INSERT INTO dim_movie 
                       (title, release_date, language, cost, country, genre_id)
                       VALUES %s""",
                    movies
                )
                conn.commit()

            # Generate movie-star relations
            cursor.execute("SELECT COUNT(*) FROM dim_movie_star")
            if cursor.fetchone()[0] == 0:
                print("Generating movie-star relations...")
                cursor.execute("SELECT movie_id FROM dim_movie")
                movie_ids = [row[0] for row in cursor.fetchall()]
                cursor.execute("SELECT star_id FROM dim_star")
                star_ids = [row[0] for row in cursor.fetchall()]

                if not movie_ids or not star_ids:
                    # random.choice would raise IndexError on an empty list.
                    print("Skipping movie-star relations: dim_movie or dim_star is empty")
                else:
                    # Cap at the number of distinct pairs that can exist.
                    # Otherwise the loop never terminates when fewer than 2000
                    # unique pairs are possible.
                    target = min(2000, len(movie_ids) * len(star_ids))
                    relations = set()
                    while len(relations) < target:
                        relations.add((
                            random.choice(movie_ids),
                            random.choice(star_ids)
                        ))

                    execute_values(
                        cursor,
                        "INSERT INTO dim_movie_star (movie_id, star_id) VALUES %s",
                        list(relations)
                    )
                    conn.commit()

def generate_showings_tickets(args):
    """Build the full set of tickets for a single showing.

    Args:
        args: a ``(show_id, hall_size)`` tuple (packed into one argument so the
            function can be used directly with ``map``-style APIs).

    Returns:
        A list of ``(show_id, row_num, seat_num, price)`` tuples, one per seat
        numbered 1..hall_size. Every seat is placed in row 1, and each price is
        drawn uniformly from [5, 25] and rounded to two decimals.
    """
    show_id, hall_size = args
    tickets = []
    seat = 1
    while seat <= hall_size:
        price = round(random.uniform(5, 25), 2)
        tickets.append((show_id, 1, seat, price))
        seat += 1
    return tickets

def generate_showings_and_tickets():
    """Populate dim_showing and dim_ticket (only when dim_showing is empty).

    Creates 20,000 showings, each with a random movie (from dim_movie), a random
    hall (from dim_hall), a date between 2014-01-01 and 2024-12-31, a random
    time, a weekend flag and a Morning/Afternoon/Evening label. It then creates
    one ticket per seat of each showing's hall via generate_showings_tickets,
    bulk-loaded with COPY.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            # Generate showings
            cursor.execute("SELECT COUNT(*) FROM dim_showing")
            if cursor.fetchone()[0] == 0:
                print("Generating showings...")
                cursor.execute("SELECT movie_id FROM dim_movie")
                movie_ids = [row[0] for row in cursor.fetchall()]
                cursor.execute("SELECT hall_id, size FROM dim_hall")
                hall_data = cursor.fetchall()
                # O(1) size lookup per showing instead of scanning hall_data.
                hall_size_by_id = dict(hall_data)
                
                showings = []
                for _ in range(20000):
                    show_date = fake.date_between_dates(
                        datetime(2014, 1, 1), datetime(2024, 12, 31))
                    show_time = fake.time_object()
                    hall_id, hall_size = random.choice(hall_data)
                    showings.append((
                        random.choice(movie_ids),
                        hall_id,
                        show_date,
                        show_time.strftime("%H:%M:%S"),
                        show_date.weekday() >= 5,
                        'Morning' if show_time.hour < 12 else 
                        'Afternoon' if show_time.hour < 17 else 'Evening'
                    ))
                
                # With page_size < len(showings), execute_values issues several
                # INSERTs. A later cursor.fetchall() would only see the RETURNING
                # rows of the *last* page, so most showings got no tickets.
                # fetch=True collects the rows of every page and returns them.
                show_hall = execute_values(
                    cursor,
                    """INSERT INTO dim_showing 
                       (movie_id, hall_id, date, time, is_weekend, time_of_day)
                       VALUES %s RETURNING show_id, hall_id""",
                    showings,
                    page_size=1000,
                    fetch=True
                )
                conn.commit()

                # Tickets are generated serially. Each ticket costs one
                # random.uniform call, so a process pool only adds pickling
                # overhead. Also, under the "spawn" start method (the macOS
                # default) pool workers cannot look up a function defined in a
                # notebook's __main__.
                print("Generating tickets...")
                buffer = StringIO()
                for show_id, hall_id in show_hall:
                    for ticket in generate_showings_tickets((show_id, hall_size_by_id[hall_id])):
                        buffer.write("\t".join(map(str, ticket)))
                        buffer.write("\n")
                buffer.seek(0)
                
                cursor.copy_from(
                    buffer,
                    "dim_ticket",
                    columns=("show_id", "row_num", "seat_num", "price")
                )
                conn.commit()

def generate_transactions():
    """Generate and insert NUM_TRANSACTIONS synthetic purchases (only if none exist).

    For every transaction this inserts:
      * one fact_transaction row: customer, optional promotion (or None), total
        price of its tickets, payment method, timestamp between 2014 and 2024,
        the matching dim_time key (None if the date is missing), and the
        customer's age in whole years at that moment;
      * one fact_transaction_ticket row per ticket bought (1-5 distinct tickets);
      * one channel row: fact_online_transaction (~30%) or
        fact_offline_transaction (~70%, with a random cinema).
    Work is committed every BATCH_SIZE transactions.

    Raises:
        RuntimeError: if dim_customer, dim_ticket or dim_cinema is empty, or if
            the INSERT does not return one ID per transaction.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            # Check if transactions already exist
            cursor.execute("SELECT COUNT(*) FROM fact_transaction")
            if cursor.fetchone()[0] != 0:
                return

            print("Generating transactions...")

            # Pre-load lookup data
            print("Loading lookup data...")
            cursor.execute("SELECT date, time_id FROM dim_time")
            date_to_time = dict(cursor.fetchall())

            cursor.execute("SELECT customer_id, dob FROM dim_customer")
            customers = cursor.fetchall()

            cursor.execute("SELECT ticket_id, price FROM dim_ticket")
            ticket_map = dict(cursor.fetchall())
            ticket_ids = list(ticket_map)

            # None means "no promotion". Built once rather than per transaction.
            cursor.execute("SELECT promotion_id FROM dim_promotion")
            promotion_choices = [None] + [row[0] for row in cursor.fetchall()]

            cursor.execute("SELECT cinema_id FROM dim_cinema")
            cinema_ids = [row[0] for row in cursor.fetchall()]

            if not customers or not ticket_ids or not cinema_ids:
                raise RuntimeError(
                    "dim_customer, dim_ticket and dim_cinema must be populated "
                    "before generating transactions"
                )
            # random.sample raises if asked for more items than exist.
            max_tickets = min(5, len(ticket_ids))

            # Generate in batches
            for batch_start in range(0, NUM_TRANSACTIONS, BATCH_SIZE):
                start_time = time.time()
                batch_size = min(BATCH_SIZE, NUM_TRANSACTIONS - batch_start)

                transactions = []
                # tickets_per_txn[i] holds the tickets bought in transactions[i].
                # Kept per transaction, not flattened, so each ticket list can be
                # paired with its transaction's returned ID.
                tickets_per_txn = []

                for _ in range(batch_size):
                    customer_id, dob = random.choice(customers)

                    trans_date = fake.date_time_between_dates(
                        datetime(2014, 1, 1), datetime(2024, 12, 31))
                    time_id = date_to_time.get(trans_date.date())

                    selected_tickets = random.sample(
                        ticket_ids, random.randint(1, max_tickets))
                    total_price = sum(ticket_map[t] for t in selected_tickets)

                    # Age in whole years: subtract one if the birthday has not
                    # yet occurred in the transaction's year.
                    age = (trans_date.year - dob.year
                           - ((trans_date.month, trans_date.day) < (dob.month, dob.day)))

                    transactions.append((
                        customer_id,
                        random.choice(promotion_choices),
                        total_price,
                        random.choice(['Credit', 'Debit', 'Cash', 'Online']),
                        trans_date,
                        time_id,
                        age
                    ))
                    tickets_per_txn.append(selected_tickets)

                # execute_values(fetch=True) itself returns the RETURNING rows of
                # every page and consumes the cursor. A later cursor.fetchall()
                # yields [], which is why no ticket/channel rows were inserted before.
                returned = execute_values(
                    cursor,
                    """INSERT INTO fact_transaction 
                    (customer_id, promotion_id, total_price, pay_method,
                        transaction_date, time_id, age_at_transaction)
                    VALUES %s RETURNING transaction_id""",
                    transactions,
                    page_size=1000,
                    fetch=True
                )
                trans_ids = [row[0] for row in returned]
                if len(trans_ids) != len(transactions):
                    raise RuntimeError(
                        f"Expected {len(transactions)} transaction IDs, "
                        f"got {len(trans_ids)}"
                    )

                # NOTE(review): pairing assumes RETURNING yields rows in VALUES
                # order, which PostgreSQL does in practice for INSERT ... VALUES
                # (the original code made the same assumption).
                ticket_data = [
                    (trans_id, ticket_id)
                    for trans_id, tickets in zip(trans_ids, tickets_per_txn)
                    for ticket_id in tickets
                ]
                execute_values(
                    cursor,
                    """INSERT INTO fact_transaction_ticket
                    (transaction_id, ticket_id) VALUES %s""",
                    ticket_data,
                    page_size=1000
                )

                # Split online/offline (~30% / ~70%)
                online_trans = []
                offline_trans = []
                for trans_id in trans_ids:
                    if random.random() < 0.3:
                        online_trans.append((
                            trans_id,
                            random.choice(['Windows', 'MacOS', 'Linux']),
                            random.choice(['Chrome', 'Firefox', 'Safari'])
                        ))
                    else:
                        offline_trans.append((
                            trans_id,
                            random.choice(cinema_ids)
                        ))

                if online_trans:
                    execute_values(
                        cursor,
                        """INSERT INTO fact_online_transaction
                        (transaction_id, system_used, browser) VALUES %s""",
                        online_trans,
                        page_size=1000
                    )
                if offline_trans:
                    execute_values(
                        cursor,
                        """INSERT INTO fact_offline_transaction
                        (transaction_id, cinema_id) VALUES %s""",
                        offline_trans,
                        page_size=1000
                    )

                conn.commit()
                elapsed = time.time() - start_time
                print(f"Processed batch {batch_start//BATCH_SIZE + 1} "
                      f"({batch_size} records) in {elapsed:.2f}s")
              

if __name__ == "__main__":
    print("Starting data generation...")

    # Run in dependency order: tables, then dimensions, then facts. Later steps
    # read IDs produced by earlier ones.
    pipeline = (
        create_tables,
        populate_static_dimensions,
        generate_time_dimension,
        generate_customers,
        generate_movies_and_relations,
        generate_showings_and_tickets,
        generate_transactions,
    )
    for step in pipeline:
        step()

    print("Data generation complete!")

Starting data generation...
Data generation complete!


In [9]:
def backfill_missing_transactions(batch_size=10000):
    """Give every transaction lacking a channel row an online or offline record.

    A transaction is "missing" when it has no row in either
    fact_online_transaction or fact_offline_transaction. Each missing
    transaction is assigned online with probability 0.3 (random OS and browser);
    otherwise it is assigned offline (random cinema from dim_cinema).

    Args:
        batch_size: number of transactions fetched, inserted and committed per
            round (default 10000, the previously hard-coded value).

    Raises:
        RuntimeError: if offline rows are needed but dim_cinema is empty.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT cinema_id FROM dim_cinema")
            cinema_ids = [row[0] for row in cursor.fetchall()]

            # Keyset pagination: resume after the last ID already handled.
            # LIMIT/OFFSET skipped rows here, because every committed batch
            # removes rows from the very "missing" set the offset counts against.
            last_id = None
            batch_num = 0
            while True:
                cursor.execute("""
                    SELECT ft.transaction_id
                    FROM fact_transaction ft
                    WHERE (%(last)s IS NULL OR ft.transaction_id > %(last)s)
                      AND NOT EXISTS (SELECT 1 FROM fact_online_transaction o
                                      WHERE o.transaction_id = ft.transaction_id)
                      AND NOT EXISTS (SELECT 1 FROM fact_offline_transaction f
                                      WHERE f.transaction_id = ft.transaction_id)
                    ORDER BY ft.transaction_id
                    LIMIT %(limit)s
                """, {"last": last_id, "limit": batch_size})
                batch = [row[0] for row in cursor.fetchall()]

                if not batch:
                    break  # No more missing transactions

                # Split into online/offline (30%/70%)
                online = []
                offline = []
                for trans_id in batch:
                    if random.random() < 0.3:
                        online.append((
                            trans_id,
                            random.choice(['Windows', 'MacOS', 'Linux']),
                            random.choice(['Chrome', 'Firefox', 'Safari'])
                        ))
                    else:
                        if not cinema_ids:
                            raise RuntimeError(
                                "dim_cinema is empty; cannot assign offline transactions")
                        offline.append((
                            trans_id,
                            random.choice(cinema_ids)
                        ))

                if online:
                    execute_values(
                        cursor,
                        """INSERT INTO fact_online_transaction 
                           (transaction_id, system_used, browser) VALUES %s""",
                        online,
                        page_size=1000
                    )

                if offline:
                    execute_values(
                        cursor,
                        """INSERT INTO fact_offline_transaction 
                           (transaction_id, cinema_id) VALUES %s""",
                        offline,
                        page_size=1000
                    )

                conn.commit()
                batch_num += 1
                print(f"Processed batch {batch_num}")
                last_id = batch[-1]

if __name__ == "__main__":
    # Attach an online/offline record to every transaction that lacks one.
    backfill_missing_transactions()
    print("Backfill complete!")

Backfill complete!


In [10]:

def fix_missing_tickets_batched(batch_size=10000):
    """Attach 1-5 random tickets to every transaction that has none.

    A transaction is "missing" when it has no row in fact_transaction_ticket.
    Tickets are sampled without replacement from dim_ticket for each such
    transaction and inserted with ON CONFLICT DO NOTHING, one committed batch
    at a time.

    Args:
        batch_size: number of transactions processed and committed per round.

    Raises:
        RuntimeError: if there are transactions to fix but dim_ticket is empty.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            # Step 1: Get all ticket IDs once
            cursor.execute("SELECT ticket_id FROM dim_ticket")
            ticket_ids = [row[0] for row in cursor.fetchall()]
            print(f"Loaded {len(ticket_ids)} ticket IDs")

            # Count up front so progress shows a real total, not a guess.
            cursor.execute("""
                SELECT COUNT(*)
                FROM fact_transaction ft
                WHERE NOT EXISTS (SELECT 1 FROM fact_transaction_ticket ftt
                                  WHERE ftt.transaction_id = ft.transaction_id)
            """)
            total_missing = cursor.fetchone()[0]
            if total_missing and not ticket_ids:
                raise RuntimeError("dim_ticket is empty; cannot assign tickets")
            # random.sample raises if asked for more items than exist.
            max_tickets = min(5, len(ticket_ids))

            # Step 2: Process in batches using keyset pagination. LIMIT/OFFSET
            # skipped rows, because each committed batch shrinks the result set
            # the offset is applied to. Advancing past the last ID also
            # guarantees termination even if ON CONFLICT drops some inserts.
            last_id = None
            batch_num = 0
            total_fixed = 0

            while True:
                cursor.execute("""
                    SELECT ft.transaction_id
                    FROM fact_transaction ft
                    WHERE (%(last)s IS NULL OR ft.transaction_id > %(last)s)
                      AND NOT EXISTS (SELECT 1 FROM fact_transaction_ticket ftt
                                      WHERE ftt.transaction_id = ft.transaction_id)
                    ORDER BY ft.transaction_id
                    LIMIT %(limit)s
                """, {"last": last_id, "limit": batch_size})

                batch = [row[0] for row in cursor.fetchall()]
                if not batch:
                    break  # Exit loop when no more results

                # Step 3: Generate ticket associations for this batch
                ticket_data = []
                for trans_id in batch:
                    selected_tickets = random.sample(
                        ticket_ids, random.randint(1, max_tickets))
                    ticket_data.extend((trans_id, tid) for tid in selected_tickets)

                # Step 4: Bulk insert for this batch
                execute_values(
                    cursor,
                    """INSERT INTO fact_transaction_ticket 
                       (transaction_id, ticket_id) VALUES %s
                       ON CONFLICT DO NOTHING""",
                    ticket_data,
                    page_size=1000
                )
                conn.commit()

                # Step 5: Update progress
                batch_num += 1
                total_fixed += len(batch)
                print(f"Processed batch {batch_num}: "
                      f"Fixed {len(batch)} transactions "
                      f"(Total: {total_fixed}/{total_missing})")

                last_id = batch[-1]

if __name__ == "__main__":
    # Time the full repair run (wall clock).
    t_start = time.time()
    fix_missing_tickets_batched(batch_size=10000)
    elapsed = time.time() - t_start
    print(f"Total time: {elapsed:.2f} seconds")

Loaded 3578599 ticket IDs
Total time: 3.72 seconds
