### Create DuckDB

In [8]:
import duckdb

# Schema of the `live_queries` table. The SQL is kept in a variable so the
# DROP + CREATE pair below reads as one "reset the table" step.
create_table_query = """
CREATE TABLE live_queries (
    instance_id VARCHAR,
    cluster_size INTEGER,
    user_id VARCHAR,
    database_id VARCHAR,
    query_id VARCHAR,
    arrival_timestamp TIMESTAMP,
    compile_duration_ms INTEGER,
    queue_duration_ms INTEGER,
    execution_duration_ms INTEGER,
    feature_fingerprint VARCHAR,
    was_aborted BOOLEAN,
    was_cached BOOLEAN,
    cache_source_query_id VARCHAR,
    query_type VARCHAR,
    num_permanent_tables_accessed INTEGER,
    num_external_tables_accessed INTEGER,
    num_system_tables_accessed INTEGER,
    read_table_ids VARCHAR,
    write_table_ids VARCHAR,
    mbytes_scanned INTEGER,
    mbytes_spilled INTEGER,
    num_joins INTEGER,
    num_scans INTEGER,
    num_aggregations INTEGER,
    dataset_type VARCHAR,
);
"""

# Create a new DuckDB database file (or connect to an existing one).
conn = duckdb.connect('redshift_queries.duckdb')
try:
    # Drop first so that re-running this cell always yields a fresh, empty table.
    conn.execute("DROP TABLE IF EXISTS live_queries")
    conn.execute(create_table_query)
finally:
    # Close the connection even if the DDL fails, so no handle to the
    # database file is left open in the kernel.
    conn.close()

print("DuckDB database and table created successfully.")


DuckDB database and table created successfully.


In [9]:
# Append every row of combined_sorted_redset_datasets.parquet to live_queries.
# NOTE: `INSERT INTO ... SELECT *` pairs columns by position, so the Parquet
# column order has to match the table definition. Re-running this cell appends
# the same rows again.
insert_query = """
INSERT INTO live_queries
SELECT * FROM parquet_scan('combined_sorted_redset_datasets.parquet')
"""

conn = duckdb.connect('redshift_queries.duckdb')
try:
    # Execute the query to insert the data
    conn.execute(insert_query)
finally:
    # Close the connection even when the insert fails.
    conn.close()

In [None]:
import pandas as pd
import duckdb

def load_and_clean_data(csv_file):
    """Read a CSV of query records and normalise the values of known columns.

    Every column named in ``column_conversions`` is passed value by value
    through its converter. Values that cannot be converted become ``None``
    (or ``NaT`` for timestamps) instead of raising. Columns named in the
    mapping but absent from the CSV are reported with a printed warning;
    columns not named in the mapping are left exactly as pandas read them.

    Args:
        csv_file: Path or buffer accepted by ``pd.read_csv``.

    Returns:
        The loaded DataFrame with the converted columns.
    """
    df = pd.read_csv(csv_file)

    # Formats are tried in order; the first one that parses wins.
    datetime_formats = (
        '%Y-%m-%d %H:%M:%S.%f',  # Format in your data
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%d',
        '%d-%m-%Y',
        '%m/%d/%Y',
        '%Y/%m/%d',
        '%d-%b-%Y',
    )
    true_tokens = ('true', '1', 'yes')
    false_tokens = ('false', '0', 'no')

    def convert_to_string(s):
        """Return the stripped string form of ``s``, or None when missing."""
        try:
            if pd.isna(s) or s == '':
                return None
            return str(s).strip()
        except Exception:
            return None

    def convert_to_integer(s):
        """Return ``s`` as an int (truncating any fraction), or None."""
        try:
            if pd.isna(s) or s == '':
                return None
            # Going through float lets strings such as '4.0' convert cleanly.
            return int(float(s))
        except Exception:
            return None

    def convert_to_datetime(s):
        """Parse ``s`` with the known formats, falling back to inference."""
        for fmt in datetime_formats:
            try:
                return pd.to_datetime(s, format=fmt)
            except (ValueError, TypeError):
                continue
        # Nothing matched: let pandas infer, yielding NaT when it cannot.
        return pd.to_datetime(s, errors='coerce')

    def convert_to_boolean(s):
        """Map true/false style values to bool, or None when unrecognised."""
        if pd.isna(s) or s == '':
            return None
        text = str(s).strip().lower()
        if text in true_tokens:
            return True
        if text in false_tokens:
            return False
        # A 0/1 column containing blanks is read by pandas as floats, so the
        # values arrive here as 1.0 / 0.0 and their text form ('1.0') matches
        # no token above. Compare numerically so they are not lost.
        try:
            number = float(text)
        except ValueError:
            return None
        if number == 1:
            return True
        if number == 0:
            return False
        return None

    # Define column conversions
    column_conversions = {
        'instance_id': convert_to_string,
        'cluster_size': convert_to_integer,
        'user_id': convert_to_string,
        'database_id': convert_to_string,
        'query_id': convert_to_string,
        'arrival_timestamp': convert_to_datetime,
        'compile_duration_ms': convert_to_integer,
        'queue_duration_ms': convert_to_integer,
        'execution_duration_ms': convert_to_integer,
        'feature_fingerprint': convert_to_string,
        'was_aborted': convert_to_boolean,
        'was_cached': convert_to_boolean,
        'cache_source_query_id': convert_to_string,
        'query_type': convert_to_string,
        'num_permanent_tables_accessed': convert_to_integer,
        'num_external_tables_accessed': convert_to_integer,
        'num_system_tables_accessed': convert_to_integer,
        'read_table_ids': convert_to_string,
        'write_table_ids': convert_to_string,
        'mbytes_scanned': convert_to_integer,
        'mbytes_spilled': convert_to_integer,
        'num_joins': convert_to_integer,
        'num_scans': convert_to_integer,
        'num_aggregations': convert_to_integer,
        'dataset_type': convert_to_string,
    }

    # Apply conversions
    for col, func in column_conversions.items():
        if col in df.columns:
            df[col] = df[col].apply(func)
        else:
            print(f"Warning: Column '{col}' not found in the DataFrame.")

    # Optionally, drop rows with missing essential values (e.g., 'instance_id', 'query_id')
    # df.dropna(subset=['instance_id', 'query_id'], inplace=True)

    return df

def insert_into_duckdb(df, duckdb_file='cleaned_redshift_data_001.duckdb'):
    """Append ``df`` to the ``queries`` table of a DuckDB database file.

    The table is created first when it does not exist yet, then all rows of
    ``df`` are inserted. The connection is always closed, including when the
    insert fails.

    Args:
        df: DataFrame whose columns line up, by position, with the ``queries``
            table.
        duckdb_file: Path of the DuckDB database file to open or create.

    Raises:
        duckdb.Error: If the table cannot be created or the rows cannot be
            inserted (for example a column-count or type mismatch).
    """
    # NOTE(review): unlike `live_queries` and the `column_conversions` mapping
    # in load_and_clean_data, this schema has no `dataset_type` column. A
    # DataFrame that carries it will be rejected by the positional insert
    # below; confirm which set of columns is intended.
    create_table_query = """
        CREATE TABLE IF NOT EXISTS queries (
            instance_id VARCHAR,
            cluster_size INTEGER,
            user_id VARCHAR,
            database_id VARCHAR,
            query_id VARCHAR,
            arrival_timestamp TIMESTAMP,
            compile_duration_ms INTEGER,
            queue_duration_ms INTEGER,
            execution_duration_ms INTEGER,
            feature_fingerprint VARCHAR,
            was_aborted BOOLEAN,
            was_cached BOOLEAN,
            cache_source_query_id VARCHAR,
            query_type VARCHAR,
            num_permanent_tables_accessed INTEGER,
            num_external_tables_accessed INTEGER,
            num_system_tables_accessed INTEGER,
            read_table_ids VARCHAR,
            write_table_ids VARCHAR,
            mbytes_scanned INTEGER,
            mbytes_spilled INTEGER,
            num_joins INTEGER,
            num_scans INTEGER,
            num_aggregations INTEGER
        );
        """

    # Connect to DuckDB
    conn = duckdb.connect(duckdb_file)
    try:
        # Create the table up front instead of waiting for the insert to fail:
        # a failed insert can have causes other than a missing table, and those
        # errors should reach the caller unchanged.
        conn.execute(create_table_query)
        # `df` inside the SQL refers to the DataFrame parameter of this
        # function; DuckDB resolves the name to the in-scope Python variable.
        conn.execute("INSERT INTO queries SELECT * FROM df")
    finally:
        # Close the connection
        conn.close()

def main(csv_file='sample_0.01.csv'):
    """Load a CSV, clean it, and append the result to DuckDB.

    Args:
        csv_file: Path of the CSV to process. Defaults to the sample file the
            notebook was written against.
    """
    df = load_and_clean_data(csv_file)
    insert_into_duckdb(df)
    print("Data successfully loaded, cleaned, and inserted into DuckDB.")

if __name__ == "__main__":
    main()


In [None]:
from confluent_kafka import Consumer
import duckdb
import pandas as pd
import json
import random

def _quote_identifier(name):
    """Return ``name`` as a double-quoted SQL identifier."""
    return '"' + str(name).replace('"', '""') + '"'


def _parse_row(raw_value):
    """Decode one Kafka payload into a dict, or return None if it is unusable."""
    if raw_value is None:
        # A message without a payload carries no row to store.
        return None
    try:
        message = raw_value.decode("utf-8")
        print(f"Received message: {message}")
        row = json.loads(message)  # Convert JSON string to dictionary
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        print(f"Skipping message that could not be decoded: {exc}")
        return None
    if not isinstance(row, dict):
        print("Skipping message that is not a JSON object.")
        return None
    return row


def _flush_rows(conn, rows):
    """Append ``rows`` (a list of dicts) to the ``queries`` table in DuckDB."""
    batch_df = pd.DataFrame(rows)
    # Column names come from the message keys, so quote them before they are
    # placed into the SQL text.
    columns = ", ".join(_quote_identifier(col) for col in batch_df.columns)
    conn.register('kafka_batch', batch_df)
    try:
        # First batch only: create the table with the batch's column layout.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS queries AS SELECT * FROM kafka_batch LIMIT 0"
        )
        # Name the columns explicitly so values land in the matching table
        # columns regardless of the key order inside each message.
        conn.execute(
            f"INSERT INTO queries ({columns}) SELECT {columns} FROM kafka_batch"
        )
    finally:
        conn.unregister('kafka_batch')


def run_consumer(batch_size=10):
    """Consume JSON rows from Kafka and append them to DuckDB in batches.

    Polls the topic until interrupted with Ctrl+C / kernel interrupt. Rows
    containing any ``None`` value are discarded; the rest are buffered and
    written to the ``queries`` table of ``my_database.duckdb`` once
    ``batch_size`` rows have accumulated. Rows still buffered at shutdown are
    written before the consumer and the database connection are closed.

    Args:
        batch_size: Number of valid rows to buffer before writing to DuckDB.
    """
    # Kafka Configuration
    kafka_config = {
        'bootstrap.servers': 'dep-eng-data-s-heimgarten.hosts.utn.de:9092',
        # A random group id per run, combined with 'latest', means each run
        # starts from new messages only.
        'group.id': f'hello_group_{random.randint(1, 1000000)}',
        'auto.offset.reset': 'latest'
    }

    # Create Consumer
    consumer = Consumer(kafka_config)
    conn = None

    # Create a list to store valid rows
    valid_rows = []

    try:
        # Subscribe to Topic
        topic = 'chache-me-if-you-can'
        consumer.subscribe([topic])

        # Connect to DuckDB
        conn = duckdb.connect('my_database.duckdb')

        while True:
            # Wait for messages (timeout 1 second)
            msg = consumer.poll(1.0)

            if msg is None:
                continue

            if msg.error():
                print(f'Consumer error: {msg.error()}')
                continue

            row = _parse_row(msg.value())
            if row is None:
                continue

            # Keep only non-empty rows without missing values
            if row and all(value is not None for value in row.values()):
                valid_rows.append(row)

            # If we have a batch of valid rows, save them to DuckDB
            if len(valid_rows) >= batch_size:
                _flush_rows(conn, valid_rows)
                valid_rows = []  # Reset the list after saving
                print("Batch of rows saved to DuckDB.")

    except KeyboardInterrupt:
        print("Consumer is shutting down...")
    finally:
        # Nested so that a failing flush or a failing consumer shutdown cannot
        # prevent the remaining resources from being released.
        try:
            # Save any remaining valid rows
            if valid_rows and conn is not None:
                _flush_rows(conn, valid_rows)
        finally:
            try:
                consumer.close()
            finally:
                if conn is not None:
                    conn.close()
                print("Consumer and DuckDB connection closed.")

if __name__ == '__main__':
    run_consumer()
