Import the `sqlite3` module, then create a database by giving it a name and connecting to it.

In [None]:
import sqlite3
# Connection to the database file 'signals.db'; the cells below share it through the global name `conn`.
conn = sqlite3.connect('signals.db')

Now we can create the schema of the database using SQL commands, followed by a call to `commit()` to make the change take effect.

The idea of using two separate tables is that the first table, `Signals`, keeps a unique list of uuids, one for each signal created, so that even when a signal's name changes over time, there is still a way to track its true identity in this table. The second table, `SignalVersions`, keeps track of the transactions that fill in or modify the various fields of the signals across different software versions. Each such transaction is tagged with a `software_version` so we can track how the contents of a signal change from one software version to the next.

In [None]:
# Create the two tables of the schema using SQLite syntax.
# IF NOT EXISTS keeps this cell re-runnable against an already initialised
# 'signals.db' instead of failing with "table ... already exists".
c = conn.cursor()

# Signals table: one row per signal identity (uuid), with its latest name and
# the creation info / creation date of the signal.
c.execute('''
    CREATE TABLE IF NOT EXISTS Signals(
        uuid TEXT PRIMARY KEY,
        latest_signal_name TEXT,
        creation_info TEXT,
        creation_date TEXT
    )
''')

# SignalVersions table: one row per (signal, software version) holding the
# contents of the signal for that version.
# NOTE: SQLite only enforces the FOREIGN KEY clause on connections where
# `PRAGMA foreign_keys = ON` has been issued; this cell does not issue it.
c.execute('''
    CREATE TABLE IF NOT EXISTS SignalVersions(
        id INTEGER PRIMARY KEY,
        signal_uuid TEXT,
        signal_name TEXT NOT NULL,
        unit TEXT,
        default_value REAL,
        size TEXT,
        software_version TEXT,
        FOREIGN KEY(signal_uuid) REFERENCES Signals(uuid)
    )
''')

conn.commit()

The main use-case of this tool is to import signals from external sources (e.g., csv files) and keep track of their versions. To import a csv file into a brand new database, we create the following function to:
* Create an auto-generated uuid for each signal
* Mark the creation date and creation info for future tracking purposes
* Import the specified signal fields into the database
* Tag all signals with a software version

In [None]:
# Define a function to import data into the database using a CSV file
# The CSV file should have the following columns:
#   - signal name
#   - unit
#   - default value
#   - size
# The CSV file should have a header row
# The software version will be specified as a separate argument
# The uuid will be generated automatically

import csv
import uuid
import datetime

def import_data(csv_file, software_version):
    """Import every row of a CSV file as a brand-new signal.

    For each CSV row a fresh uuid is generated, a row is added to the
    Signals table and a matching row is added to the SignalVersions table.
    The whole file is imported in one transaction: if any row fails, nothing
    from this call is stored.

    Args:
        csv_file (str): path of a CSV file with a header row containing the
            columns 'signal name', 'unit', 'default value' and 'size'.
        software_version (str): version tag stored with every imported row.

    Returns:
        None

    Raises:
        KeyError: if one of the required columns is missing from the file.
        sqlite3.Error: if a row cannot be inserted.
    """
    # Store the timestamp as explicit text rather than relying on sqlite3's
    # implicit datetime adapter (deprecated since Python 3.12). The format is
    # the one that adapter produced: 'YYYY-MM-DD HH:MM:SS.ffffff'.
    creation_date = datetime.datetime.now().isoformat(sep=' ')
    creation_info = 'Imported from CSV file ' + csv_file + ' with version as ' + software_version

    with open(csv_file, newline='') as f:
        reader = csv.DictReader(f)

        # `with conn` commits when the block succeeds and rolls back when an
        # exception escapes, so a broken file never leaves a partial import.
        with conn:
            c = conn.cursor()
            for row in reader:
                signal_uuid = str(uuid.uuid4())

                # Insert the parent row first so the foreign key of the
                # SignalVersions row always refers to an existing signal.
                c.execute('''
                    INSERT INTO Signals (uuid, latest_signal_name, creation_info, creation_date)
                    VALUES (?, ?, ?, ?)
                    ''', (signal_uuid, row['signal name'], creation_info, creation_date))
                c.execute('''
                    INSERT INTO SignalVersions (signal_uuid, signal_name, unit, default_value, size, software_version)
                    VALUES (?, ?, ?, ?, ?, ?)
                    ''', (signal_uuid, row['signal name'], row['unit'], row['default value'], row['size'], software_version))

    print("Data imported.")

# Import data from a CSV file
import_data('signals_from_csv.csv', '1.0.0')

In addition to importing from external files, which adds signals in batch, one can also add signals manually by first giving each signal an initial name and assigning an auto-generated uuid to it. Signals created this way will have the `creation_info` set to 'Manually added'.

In [None]:
# Define a function to add an auto-generated uuid to the Signals table
import uuid

def add_uuid(signal_name):
    """Add a new signal with an auto-generated uuid to the Signals table.

    The uuid is generated with uuid4 from the uuid module. The row is stored
    with the given name as latest_signal_name, 'Manually added' as
    creation_info and the current local time as creation_date, and is
    committed immediately.

    Args:
        signal_name (str): name of the signal

    Returns:
        None
    """
    # Imported here so this cell does not depend on another cell having
    # imported datetime first.
    import datetime

    # Use a cursor owned by this call instead of a module-level cursor
    # created in a different cell.
    cursor = conn.cursor()
    cursor.execute('''
        INSERT INTO Signals(uuid, latest_signal_name, creation_info, creation_date)
        VALUES (?, ?, ?, ?)
    ''', (
        str(uuid.uuid4()),
        signal_name,
        'Manually added',
        # Explicit text in the format sqlite3's implicit datetime adapter
        # (deprecated since Python 3.12) produced.
        datetime.datetime.now().isoformat(sep=' '),
    ))
    conn.commit()

# Add five signals named signal_0 ... signal_4 to the Signals table,
# one add_uuid() call per name
for i in range(5):
    add_uuid(f'signal_{i}')

Once the barebone uuids are manually added, we need to infuse content for these signals. Each addition of signal content requires also the information of `software_version` for version control purposes. If you are merely testing, you can set `software_version` simply to a string such as 'testing'.

By design, the `add_signal_version` function below refuses to add a duplicate transaction to the `SignalVersions` table, i.e., one where both the `signal_uuid` and the `software_version` fields match an existing entry. This is enforced to ensure that, for each unique signal, only one entry represents its content for a specific `software_version`.

In [None]:
# Define a function to add a signal version entry to the SignalVersions table.
# It refuses to add a second entry for the same (signal_uuid, software_version) pair.

def add_signal_version(signal_uuid, signal_name, unit, default_value, size, software_version):
    """Add the contents of a signal for one software version.

    Args:
        signal_uuid (str): uuid of the signal the entry belongs to.
        signal_name (str): name of the signal in this software version.
        unit (str): unit of the signal.
        default_value (float): default value of the signal.
        size (str): size of the signal, e.g. '[1]'.
        software_version (str): software version the entry is tagged with.

    Returns:
        str: ``signal_uuid``, both when a new entry was added and when an
        entry for this uuid and software version already existed (in which
        case a warning is printed and nothing is changed).

    Raises:
        ValueError: if ``signal_uuid`` is None.
    """
    # A NULL uuid never compares equal in SQL, so it would slip past the
    # duplicate check below and create entries that belong to no signal.
    if signal_uuid is None:
        raise ValueError("signal_uuid must not be None; look it up or create the signal first.")

    c = conn.cursor()

    # Check if the same uuid and software_version combination already exists in SignalVersions
    c.execute('''
        SELECT 1
        FROM SignalVersions
        WHERE signal_uuid = ? AND software_version = ?
        LIMIT 1
        ''', (signal_uuid, software_version))

    if c.fetchone() is not None:
        print("Warning: Entry with the same UUID and software version already exists.")
        return signal_uuid  # Return the UUID of the existing signal

    # Add the signal version to the SignalVersions table
    c.execute('''
        INSERT INTO SignalVersions (signal_uuid, signal_name, unit, default_value, size, software_version)
        VALUES (?, ?, ?, ?, ?, ?)
        ''', (signal_uuid, signal_name, unit, default_value, size, software_version))

    conn.commit()
    return signal_uuid  # Return the UUID of the new signal


In most cases, a signal version entry is added manually by a human, who often knows the signal of interest by its name rather than by its uuid. To make it easy to find the uuid of a signal given its name, the following helper function is provided:

In [None]:
# Look up the uuid of a signal by its latest_signal_name
def get_uuid(signal_name):
    """Look up the uuid of a signal by its latest_signal_name.

    Args:
        signal_name (str): value to match against Signals.latest_signal_name.

    Returns:
        The uuid of the first matching row, or None when no row matches.
    """
    match = conn.execute('''
        SELECT uuid
        FROM Signals
        WHERE latest_signal_name = ?
        ''', (signal_name,)).fetchone()

    # fetchone() yields None when the name is unknown.
    return match[0] if match else None

With the above helper function, we can now easily add a signal version entry manually by looking up the signal's uuid by name and then adding the contents of the signal.

In [None]:
# Manually insert a few signal version entries into the database.
# get_uuid() returns None when no signal has the given latest_signal_name.
uuid_0 = get_uuid('signal_0')
uuid_1 = get_uuid('signal_1')

# software version 1.0.0
add_signal_version(uuid_0, 'signal_0_name_v1p0', 'kph', 1, '[1]', '1.0.0')
add_signal_version(uuid_1, 'signal_1_name_v1p0', 'ampere', 0, '[1]', '1.0.0')

# software version 1.1.0 (same signals, new names and default values)
add_signal_version(uuid_0, 'signal_0_name_v1p1', 'kph', 100, '[1]', '1.1.0')
add_signal_version(uuid_1, 'signal_1_name_v1p1', 'ampere', 10, '[1]', '1.1.0')

Now we show a few scripts that display what is inside the database.

In [None]:
# Retrieve all signals from the Signals table
c.execute('SELECT * FROM Signals')
c.fetchall()


In [None]:
# Retrieve every row (all signals, all software versions) from the SignalVersions table
c.execute('SELECT * FROM SignalVersions')
c.fetchall()

In [None]:
# Retrieve all signal versions from the database for a specific signal given its uuid
# (the uuid is looked up from the signal's latest name first)
uuid_0 = get_uuid('signal_0')
c.execute('SELECT * FROM SignalVersions WHERE signal_uuid = ?', (uuid_0,))
c.fetchall()


In [None]:
# Retrieve the latest version of a signal from the database
# NOTE: software_version is a TEXT column, so ORDER BY compares the version
# strings as text (e.g. '1.9.0' sorts above '1.10.0'), not numerically.
uuid_0 = get_uuid('signal_0')
c.execute('''
    SELECT * FROM SignalVersions
    WHERE signal_uuid = ?
    ORDER BY software_version DESC
    LIMIT 1
    ''', (uuid_0,))
c.fetchall()


In [None]:
# Define a function to retrieve the latest version of a signal from the database
def get_latest_signal(uuid):
    """Return the row of the highest software version recorded for a signal.

    Versions are compared component by component as numbers, so '1.10.0' is
    newer than '1.9.0'. Ordering the TEXT column in SQL would compare the
    strings character by character and get that case wrong.

    Args:
        uuid (str): uuid of the signal.

    Returns:
        tuple or None: the SignalVersions row (all columns, in table order)
        with the highest software version, or None when the signal has no
        entries.
    """
    def version_key(version):
        # A missing version sorts before every real version.
        if version is None:
            return ()
        key = []
        for piece in str(version).split('.'):
            if piece.isdecimal():
                key.append((0, int(piece), ''))
            elif piece:
                # Non-numeric pieces (e.g. 'testing') sort after numeric ones,
                # as they do in a plain text comparison.
                key.append((1, 0, piece))
        return tuple(key)

    c = conn.cursor()

    # software_version is selected in front of `*` so the sort key does not
    # depend on the position of that column inside the table.
    c.execute('''
        SELECT software_version, *
        FROM SignalVersions
        WHERE signal_uuid = ?
        ''', (uuid,))
    rows = c.fetchall()

    if not rows:
        return None

    latest = max(rows, key=lambda row: version_key(row[0]))
    return tuple(latest[1:])

get_latest_signal(uuid_0)

In [None]:
# Define a function to retrieve the latest version of all signals from the database
def get_all_latest_signals():
    """Return, for every signal, the row of its highest software version.

    "Latest" means the highest software version (components compared as
    numbers, so '1.10.0' is newer than '1.9.0'), not the most recently
    inserted row. When a signal has several rows with the same version,
    the one with the highest id is used.

    Returns:
        list of tuple: one SignalVersions row (all columns, in table order)
        per distinct signal_uuid, ordered by id.
    """
    def version_key(version):
        # A missing version sorts before every real version.
        if version is None:
            return ()
        key = []
        for piece in str(version).split('.'):
            if piece.isdecimal():
                key.append((0, int(piece), ''))
            elif piece:
                # Non-numeric pieces sort after numeric ones.
                key.append((1, 0, piece))
        return tuple(key)

    c = conn.cursor()

    # The columns needed for grouping and ordering are selected in front of
    # `*` so they can be read without assuming their position in the table.
    c.execute('''
        SELECT id, signal_uuid, software_version, *
        FROM SignalVersions
        ORDER BY id
        ''')

    latest = {}
    for row_id, signal_uuid, version, *columns in c.fetchall():
        key = version_key(version)
        current = latest.get(signal_uuid)
        # `>=` lets the later row win a tie, since rows arrive in id order.
        if current is None or key >= current[0]:
            latest[signal_uuid] = (key, row_id, tuple(columns))

    ordered = sorted(latest.values(), key=lambda item: item[1])
    return [columns for _, _, columns in ordered]

get_all_latest_signals()

In [None]:
# Define a function to retrieve a specific version of a signal from the database
def get_signal_version(uuid, software_version):
    """Fetch the SignalVersions row of one signal for one software version.

    Args:
        uuid (str): uuid of the signal.
        software_version (str): software version tag to look up.

    Returns:
        tuple or None: the first matching row, or None when nothing matches.
    """
    query = '''
        SELECT * 
        FROM SignalVersions 
        WHERE signal_uuid = ? AND software_version = ?
        '''
    return conn.execute(query, (uuid, software_version)).fetchone()

get_signal_version(uuid_0, '1.0.0')

In [None]:
# Define a function to retrieve all versions of a signal from the database
def get_all_signal_versions(uuid):
    """Fetch every SignalVersions row recorded for one signal.

    Args:
        uuid (str): uuid of the signal.

    Returns:
        list of tuple: all matching rows; empty when the signal has none.
    """
    query = '''
        SELECT * 
        FROM SignalVersions 
        WHERE signal_uuid = ?
        '''
    return conn.execute(query, (uuid,)).fetchall()

get_all_signal_versions(uuid_0)

In [None]:
# Define a function to retrieve all signals of a specific version from the database
def get_all_signals_of_version(software_version):
    """Fetch every SignalVersions row tagged with one software version.

    Args:
        software_version (str): software version tag to look up.

    Returns:
        list of tuple: all matching rows; empty when the version is unknown.
    """
    query = '''
        SELECT * 
        FROM SignalVersions 
        WHERE software_version = ?
        '''
    return conn.execute(query, (software_version,)).fetchall()

get_all_signals_of_version('1.1.0')

In [None]:
# Define a function to clear the SignalVersions table
def clear_signal_versions_table():
    """Delete every row of the SignalVersions table and commit the change.

    The table itself and the Signals table are left in place.

    Returns:
        None
    """
    conn.execute('DELETE FROM SignalVersions')
    conn.commit()

    print("SignalVersions table cleared.")

# Example usage
# NOTE: this deletes every row of SignalVersions, so any cell executed afterwards
# that reads SignalVersions sees an empty table until signal versions are added again.
clear_signal_versions_table()


In [None]:
# Export the schema of the database to a SQL file

def export_schema(db_file, output_file):
    """Write the CREATE TABLE statements of a database to a text file.

    One statement is written per table, each followed by a newline and
    without a terminating semicolon. SQLite's internal tables (names starting
    with 'sqlite_', e.g. sqlite_sequence) are skipped because they cannot be
    re-created from a schema file.

    Args:
        db_file (str): path of the SQLite database to read.
        output_file (str): path of the file to write; it is overwritten.

    Returns:
        None
    """
    conn = sqlite3.connect(db_file)
    try:
        c = conn.cursor()

        # Get the schema using a SQL query. `_` is a wildcard in LIKE, so it
        # is escaped to match only names that really start with 'sqlite_'.
        c.execute(
            "SELECT sql FROM sqlite_master "
            "WHERE type='table' AND sql IS NOT NULL "
            "AND name NOT LIKE 'sqlite\\_%' ESCAPE '\\';"
        )
        schema = c.fetchall()
    finally:
        # Release the database handle even when the query fails.
        conn.close()

    # Write the schema to the output file
    with open(output_file, 'w') as file:
        for (table_sql,) in schema:
            file.write(table_sql + "\n")

    print(f"Schema exported to {output_file}")

export_schema('signals.db', 'schema.sql')

In [None]:
import yaml

# Export the signals of a specific software version to a YAML file
def export_signals(database, software_version):
    """Export the signals of one software version to a YAML file.

    The file is written to the current working directory as
    ``signals_<software_version>.yaml``. Each signal becomes a list of
    single-key mappings: signal_name (the latest name from the Signals
    table), default_value, unit, size and uuid.

    Args:
        database (str): path of the SQLite database to read.
        software_version (str): software version whose signals are exported.

    Returns:
        None
    """
    conn = sqlite3.connect(database)
    try:
        c = conn.cursor()

        # Retrieve all signals of the specified software version
        c.execute('''
            SELECT s.latest_signal_name, sv.default_value, sv.unit, sv.size, s.uuid
            FROM Signals s
            JOIN SignalVersions sv ON s.uuid = sv.signal_uuid
            WHERE sv.software_version = ?
            ''', (software_version,))
        signals = c.fetchall()
    finally:
        # Release the database handle even when the query fails.
        conn.close()

    # Convert the signals into a list of lists for the YAML export
    signals_list = [
        [{"signal_name": signal_name}, {"default_value": default_value}, {"unit": unit}, {"size": size}, {"uuid": signal_uuid}]
        for signal_name, default_value, unit, size, signal_uuid in signals
    ]

    # Export the signals to a YAML file
    with open(f'signals_{software_version}.yaml', 'w') as f:
        yaml.dump(signals_list, f)


export_signals('signals.db','1.0.0')

In [None]:
import csv

# Export the signals of a specific software version to a CSV file
def export_signals_to_csv(database, software_version):
    """Export the signals of one software version to a CSV file.

    The file is written to the current working directory as
    ``signals_<software_version>.csv`` with the header
    signal_name, default_value, unit, size, uuid. The signal_name column
    holds the latest name from the Signals table.

    Args:
        database (str): path of the SQLite database to read.
        software_version (str): software version whose signals are exported.

    Returns:
        None
    """
    conn = sqlite3.connect(database)
    try:
        c = conn.cursor()

        # Retrieve all signals of the specified software version
        c.execute('''
            SELECT s.latest_signal_name, sv.default_value, sv.unit, sv.size, s.uuid
            FROM Signals s
            JOIN SignalVersions sv ON s.uuid = sv.signal_uuid
            WHERE sv.software_version = ?
            ''', (software_version,))
        signals = c.fetchall()
    finally:
        # Release the database handle even when the query fails.
        conn.close()

    # newline='' lets the csv module control line endings itself; without it
    # every row is followed by a blank line on platforms that translate '\n'.
    with open(f'signals_{software_version}.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['signal_name', 'default_value', 'unit', 'size', 'uuid'])
        writer.writerows(signals)

export_signals_to_csv('signals.db','1.0.0')

In [None]:
# Create a database from a schema file
def create_tables_from_schema(db_file, schema_file):
    """Create the tables described in a schema file inside a database.

    The schema file is expected to contain CREATE statements, each starting
    at the beginning of a line. Statements may or may not be terminated by a
    semicolon; a terminator is added where it is missing.

    Args:
        db_file (str): path of the SQLite database to create the tables in.
        schema_file (str): path of the schema file to read.

    Returns:
        None

    Raises:
        sqlite3.Error: if a statement fails, e.g. because a table already
            exists.
    """
    # Read the schema file before touching the database
    with open(schema_file, 'r') as file:
        schema_sql = file.read()

    # Split in front of every line that starts a new CREATE statement, then
    # give each statement exactly one terminating semicolon. This accepts
    # files with and without semicolons and never produces empty statements.
    pieces = schema_sql.replace('\nCREATE', '\n\x00CREATE').split('\x00')
    statements = [piece.strip().rstrip(';').rstrip() for piece in pieces]
    script = ''.join(statement + ';\n' for statement in statements if statement)

    conn = sqlite3.connect(db_file)
    try:
        conn.executescript(script)
        conn.commit()
    finally:
        # Release the database handle even when a statement fails.
        conn.close()

    print("Tables created successfully.")

create_tables_from_schema('signals_3.db', 'schema.sql')

In [None]:
# A function to add a column to an existing table

def add_column(db_file, table_name, column_name, column_type):
    """Add a column to an existing table.

    The names are inserted into the SQL text as given (identifiers cannot be
    passed as SQL parameters), so they must come from a trusted source.

    Args:
        db_file (str): path of the SQLite database.
        table_name (str): table to alter.
        column_name (str): name of the new column.
        column_type (str): declared type of the new column, e.g. 'INTEGER'.

    Returns:
        None

    Raises:
        sqlite3.OperationalError: if the statement fails, e.g. because the
            table does not exist or the column already exists.
    """
    conn = sqlite3.connect(db_file)
    try:
        c = conn.cursor()

        # Add a new column to the table
        c.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type};")

        conn.commit()
    finally:
        # Release the database handle even when the statement fails.
        conn.close()

    print(f"Column '{column_name}' added to table '{table_name}' successfully.")

# Example usage
db_file = 'signals_3.db'
table_name = 'SignalVersions'
column_name = 'is_constant'
column_type = 'INTEGER'
add_column(db_file, table_name, column_name, column_type)


In [None]:
# A function to rename a column in an existing table

def rename_column(db_file, table_name, old_column_name, new_column_name):
    """Rename a column of an existing table.

    The names are inserted into the SQL text as given (identifiers cannot be
    passed as SQL parameters), so they must come from a trusted source.

    Args:
        db_file (str): path of the SQLite database.
        table_name (str): table to alter.
        old_column_name (str): current name of the column.
        new_column_name (str): new name of the column.

    Returns:
        None

    Raises:
        sqlite3.OperationalError: if the statement fails, e.g. because the
            table or the column does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        c = conn.cursor()

        # Rename the column
        c.execute(f"ALTER TABLE {table_name} RENAME COLUMN {old_column_name} TO {new_column_name};")

        conn.commit()
    finally:
        # Release the database handle even when the statement fails.
        conn.close()

    print(f"Column '{old_column_name}' renamed to '{new_column_name}' in table '{table_name}' successfully.")

# Example usage
db_file = 'signals_3.db'
table_name = 'SignalVersions'
old_column_name = 'is_constant'
new_column_name = 'is_constant_signal'
rename_column(db_file, table_name, old_column_name, new_column_name)

In [None]:
# A function to delete a column from an existing table

def delete_column(db_file, table_name, column_name):
    """Delete a column from an existing table.

    With SQLite 3.35.0 or newer the column is removed with
    ``ALTER TABLE ... DROP COLUMN``, which keeps the remaining table
    definition (keys, constraints) intact. With older SQLite libraries the
    table is rebuilt from the remaining columns; that rebuild keeps the data
    but not the constraints of the original table.

    When the column does not exist, a message is printed and nothing is
    changed.

    The names are inserted into the SQL text as given (identifiers cannot be
    passed as SQL parameters), so they must come from a trusted source.

    Args:
        db_file (str): path of the SQLite database.
        table_name (str): table to alter.
        column_name (str): name of the column to remove.

    Returns:
        None

    Raises:
        sqlite3.OperationalError: if SQLite refuses to drop the column, e.g.
            because it is part of the primary key or is the only column.
    """
    conn = sqlite3.connect(db_file)
    try:
        c = conn.cursor()

        # Check if the column exists in the table; the second field of every
        # table_info row is the column name.
        c.execute(f"PRAGMA table_info({table_name})")
        existing_columns = [column[1] for column in c.fetchall()]

        if column_name not in existing_columns:
            print(f"Column '{column_name}' does not exist in table '{table_name}'.")
            return

        if sqlite3.sqlite_version_info >= (3, 35, 0):
            c.execute(f"ALTER TABLE {table_name} DROP COLUMN {column_name}")
        else:
            # Rebuild the table from every column except the one to delete.
            kept_columns = ", ".join(
                '"' + name.replace('"', '""') + '"'
                for name in existing_columns
                if name != column_name
            )
            c.execute(f"CREATE TABLE temp_table AS SELECT {kept_columns} FROM {table_name}")
            c.execute(f"DROP TABLE {table_name}")

            # Rename the temporary table to the original table name
            c.execute(f"ALTER TABLE temp_table RENAME TO {table_name}")

        conn.commit()
        print(f"Column '{column_name}' deleted from table '{table_name}' successfully.")
    finally:
        # Release the database handle on every path, including the
        # "column does not exist" early return.
        conn.close()

# Example usage
db_file = 'signals_3.db'
table_name = 'SignalVersions'
column_name = 'is_constant_signal'
delete_column(db_file, table_name, column_name)


In [None]:
# Close the shared connection `conn` to the database; cells that use `conn`
# cannot be run after this without connecting again
conn.close()