### Function definitions (Algorithms)

In [17]:
from itertools import combinations, chain
import random
import string

def compute_closure(attributes, fds) -> set:
    """
    Return the attribute closure X+ of `attributes` under `fds`.
    ------------------------------------------------------------
    attributes: an iterable of attributes (copied into a new set, never mutated)
    fds: a list of functional dependencies; each is a pair of sets (LHS, RHS)
         meaning LHS -> RHS
    """
    result = set(attributes)
    while True:
        grew = False
        for dependency in fds:
            determinant, dependent = dependency[0], dependency[1]
            # The FD only fires once its whole LHS is already derivable.
            if not determinant.issubset(result):
                continue
            # Nothing new to learn from this FD.
            if dependent.issubset(result):
                continue
            result.update(dependent)
            grew = True
        # A full pass without growth means the fixed point is reached.
        if not grew:
            return result

def compute_all_closures(attributes, fds) -> dict:
    """
    Map every non-empty subset of `attributes` to its closure under `fds`.
    ----------------------------------------------------------------------
    attributes: a set of attributes
    fds: a list of functional dependencies (pairs of sets: LHS implies RHS)

    Keys are tuples as produced by itertools.combinations, ordered by
    increasing subset size. The result has 2**n - 1 entries for n attributes.
    """
    sizes = range(1, len(attributes) + 1)
    return {
        subset: compute_closure(set(subset), fds)
        for size in sizes
        for subset in combinations(attributes, size)
    }

def compute_candidate_keys(closure_set, attributes) -> list:
    """
    Return the candidate keys found in a closure table.
    ---------------------------------------------------
    closure_set: a dictionary mapping attribute subsets to their closures
    attributes: the full set of attributes of the relation

    A subset is a super key when its closure equals `attributes`; it is a
    candidate key when no other super key is a proper subset of it.
    """
    universe = set(attributes)
    super_keys = [
        subset for subset in closure_set
        if set(closure_set[subset]) == universe
    ]

    def _is_minimal(key):
        # `<` on sets is the proper-subset test.
        key_attrs = set(key)
        return not any(set(other) < key_attrs for other in super_keys)

    return [key for key in super_keys if _is_minimal(key)]

def find_prime_attributes(candidate_keys) -> set:
    """
    Collect the prime attributes, i.e. those appearing in any candidate key.
    ------------------------------------------------------------------------
    candidate_keys: a list of candidate keys (each an iterable of attributes)
    """
    # Union of all keys; an empty key list yields an empty set.
    return set().union(*candidate_keys)

def compute_single_covers(attributes, fds) -> dict:
    """
    Compute the closure of each individual attribute.
    -------------------------------------------------
    attributes: a set of attributes
    fds: a list of functional dependencies (pairs of sets: LHS implies RHS)

    Returns a dictionary mapping each attribute to the closure of the
    singleton set containing it.
    """
    # Wrap the attribute in a singleton set: compute_closure calls
    # set(attributes), and set("A1") would split a multi-character name
    # into {'A', '1'} instead of treating it as one attribute.
    return {attr: compute_closure({attr}, fds) for attr in attributes}

def project_dependency(fds, R_hat) -> list:
    """
    Project a set of functional dependencies on a set of attributes
    ---------------------------------------------------------------
    fds: a list of functional dependencies (pairs of sets: LHS implies RHS)
    R_hat: a set of attributes

    An FD is kept when its whole LHS lies inside R_hat; its RHS is cut down
    to the attributes of R_hat. FDs whose RHS becomes empty, or whose reduced
    RHS equals the LHS, are dropped.

    NOTE: this is a syntactic restriction of the given FDs only; no closure
    is computed, so dependencies that are merely implied are not derived.
    """
    projected = []
    for lhs, rhs in fds:
        if not lhs.issubset(R_hat):
            continue
        kept_rhs = rhs.intersection(R_hat)
        # Filter while building instead of removing from the list during
        # iteration, which skipped the element following each removal.
        if kept_rhs and lhs != kept_rhs:
            projected.append((lhs, kept_rhs))
    return projected

## Minimal cover computation

def decompose_fds(fds) -> list:
    """Split every FD into FDs with a single-attribute RHS.
    E.g. {A} -> {B, C} becomes {A} -> {B} and {A} -> {C}.
    ------------------------------------------------------
    fds: a list of functional dependencies (pairs of sets: LHS implies RHS)

    The LHS set object is shared between the pieces of one original FD.
    """
    return [
        (determinant, {attribute})
        for determinant, dependents in fds
        for attribute in dependents
    ]

def remove_trivial_dependencies(fds) -> list:
    """Remove trivial FDs, i.e. those whose RHS is contained in the LHS.
    -------------------------------------------------------------------
    fds: a list of functional dependencies (pairs of sets: LHS implies RHS)

    This covers A -> A as well as cases such as {A, B} -> {A}, which an
    equality test between LHS and RHS would let through.
    """
    return [(lhs, rhs) for lhs, rhs in fds if not rhs.issubset(lhs)]

def remove_redundant_dependencies(fds) -> list:
    """Remove redundant FDs, i.e. FDs that can be inferred from the others.
    ----------------------------------------------------------------------
    fds: a list of functional dependencies (pairs of sets: LHS implies RHS)

    Returns a new list; the input list is not modified. FDs are examined
    front to back, and an FD is dropped when its RHS is already contained in
    the closure of its LHS under the FDs still retained.
    """
    remaining = list(fds)
    index = 0
    while index < len(remaining):
        lhs, rhs = remaining[index]
        others = remaining[:index] + remaining[index + 1:]
        if rhs.issubset(compute_closure(lhs, others)):
            # Delete by position and do NOT advance: the next candidate has
            # just slid into this slot. Removing while enumerating would
            # skip it.
            del remaining[index]
        else:
            index += 1
    # One pass is enough: dropping FDs can only shrink closures, so an FD
    # found necessary earlier cannot become redundant later.
    return remaining

def merge_fds(fds) -> list:
    """Merge FDs with the same LHS back together.
    --------------------------------------------
    fds: a list of functional dependencies (pairs of sets: LHS implies RHS)

    Returns a list of (LHS, RHS) pairs with one entry per distinct LHS, in
    order of first appearance; each RHS is the union of the merged RHSs.
    The input sets are not modified.
    """
    merged = {}
    for lhs, rhs in fds:
        # frozenset compares by content. tuple(lhs) depends on the set's
        # iteration order, which can differ between two equal sets, so
        # identical LHSs could end up under different keys.
        merged.setdefault(frozenset(lhs), set()).update(rhs)

    return [(set(lhs), rhs) for lhs, rhs in merged.items()]

def powerset(iterable):
    """Return all non-empty subsets of `iterable` as a list of tuples.

    The full set itself is included. Subsets are ordered by increasing size.
    """
    items = list(iterable)
    by_size = (combinations(items, size) for size in range(1, len(items) + 1))
    return list(chain.from_iterable(by_size))

def remove_superfluous_lhs(fds, p):
    """
    Shrink each LHS to a smallest subset that still implies the RHS.
    ----------------------------------------------------------------
    fds: a list of functional dependencies (pairs of sets: LHS implies RHS)
    p: probability (0..1) of picking a random candidate instead of the first
       one when several equally small LHS subsets qualify

    Returns a new list of (LHS, RHS) pairs, in the same order as `fds`.
    Closures are computed against the full, unmodified `fds` list.
    """
    minimal_fds = []
    for lhs, rhs in fds:
        # Try subset sizes in increasing order and stop at the first size
        # that yields at least one subset implying the RHS; those are the
        # minimum-cardinality candidates.
        candidates = []
        for size in range(1, len(lhs) + 1):
            candidates = [
                set(subset)
                for subset in combinations(lhs, size)
                if rhs.issubset(compute_closure(set(subset), fds))
            ]
            if candidates:
                break

        if not candidates:
            # Empty LHS (or nothing derivable): keep the FD unchanged
            # rather than failing on an empty candidate list.
            minimal_lhs = set(lhs)
        elif len(candidates) > 1 and random.random() < p:
            # random.random() is uniform on [0, 1), so p=0 never randomises
            # and p=1 always does. randint(0, 10) <= p*10 has 11 outcomes
            # and skews the probability.
            minimal_lhs = random.choice(candidates)
        else:
            minimal_lhs = candidates[0]

        minimal_fds.append((minimal_lhs, rhs))
    return minimal_fds

def minimal_cover(fds, p = 0.5) -> list:
    """Compute a minimal cover of a set of FDs.
    ------------------------------------------
    fds: a list of functional dependencies (pairs of sets: LHS implies RHS)
    p: forwarded to remove_superfluous_lhs; probability of choosing a random
       LHS among equally small candidates

    Pipeline: split RHSs into single attributes, shrink LHSs, drop trivial
    FDs, drop redundant FDs, then regroup FDs that share an LHS.
    """
    single_rhs = decompose_fds(fds)
    reduced_lhs = remove_superfluous_lhs(single_rhs, p)
    non_trivial = remove_trivial_dependencies(reduced_lhs)
    essential = remove_redundant_dependencies(non_trivial)
    return merge_fds(essential)

### Example usage

In [18]:
# Example relation schema with eight attributes.
attributes = {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H'}
# Each entry is a pair (LHS, RHS), read as "LHS -> RHS".
fds = [
    ({'A'}, {'C', 'E'}),
    ({'A', 'B'}, {'D'}),
    ({'F'}, {'H'}),
    ({'C', 'E'}, {'A'}),
    ({'A', 'B', 'F'}, {'D', 'G'}),
    ({'B', 'C', 'E', 'F'}, {'G'})
]

In [19]:
# Closure table for every non-empty subset of the attributes.
all_closures = compute_all_closures(attributes, fds)

In [20]:
# Keys are tuples; show them as sets so both sides print alike.
for subset, closure in all_closures.items():
    print('{k}+ = {v}'.format(k=set(subset), v=closure))

{'D'}+ = {'D'}
{'H'}+ = {'H'}
{'G'}+ = {'G'}
{'F'}+ = {'F', 'H'}
{'C'}+ = {'C'}
{'B'}+ = {'B'}
{'A'}+ = {'A', 'E', 'C'}
{'E'}+ = {'E'}
{'D', 'H'}+ = {'D', 'H'}
{'G', 'D'}+ = {'G', 'D'}
{'D', 'F'}+ = {'D', 'F', 'H'}
{'D', 'C'}+ = {'D', 'C'}
{'D', 'B'}+ = {'D', 'B'}
{'D', 'A'}+ = {'D', 'E', 'C', 'A'}
{'D', 'E'}+ = {'D', 'E'}
{'G', 'H'}+ = {'G', 'H'}
{'F', 'H'}+ = {'F', 'H'}
{'C', 'H'}+ = {'C', 'H'}
{'B', 'H'}+ = {'B', 'H'}
{'A', 'H'}+ = {'A', 'H', 'C', 'E'}
{'H', 'E'}+ = {'H', 'E'}
{'G', 'F'}+ = {'G', 'F', 'H'}
{'G', 'C'}+ = {'G', 'C'}
{'G', 'B'}+ = {'G', 'B'}
{'G', 'A'}+ = {'G', 'A', 'E', 'C'}
{'G', 'E'}+ = {'G', 'E'}
{'C', 'F'}+ = {'C', 'F', 'H'}
{'B', 'F'}+ = {'B', 'F', 'H'}
{'A', 'F'}+ = {'C', 'E', 'H', 'A', 'F'}
{'F', 'E'}+ = {'F', 'H', 'E'}
{'C', 'B'}+ = {'C', 'B'}
{'C', 'A'}+ = {'C', 'E', 'A'}
{'C', 'E'}+ = {'C', 'E', 'A'}
{'A', 'B'}+ = {'C', 'D', 'B', 'A', 'E'}
{'B', 'E'}+ = {'B', 'E'}
{'A', 'E'}+ = {'A', 'E', 'C'}
{'G', 'D', 'H'}+ = {'G', 'D', 'H'}
{'D', 'F', 'H'}+ = {'D', 'F', 

In [22]:
# Candidate keys: subsets whose closure is the full attribute set and that
# contain no smaller such subset.
compute_candidate_keys(all_closures, attributes)

[('F', 'B', 'A'), ('F', 'C', 'B', 'E')]

In [23]:
# Prime attributes: every attribute that occurs in at least one candidate key.
find_prime_attributes(compute_candidate_keys(all_closures, attributes))

{'A', 'B', 'C', 'E', 'F'}

In [24]:
# With p > 0 the LHS reduction may pick randomly among equally small
# candidates, so the printed cover can vary between runs.
minimal_fds = minimal_cover(fds, p = 0.5)
for lhs, rhs in minimal_fds:
    print(f"{lhs} -> {rhs}")

{'A'} -> {'C', 'E'}
{'F'} -> {'H'}
{'C', 'E'} -> {'A'}
{'A', 'B'} -> {'D'}
{'C', 'F', 'B', 'E'} -> {'G'}


### Dependency projection example

In [28]:
# Restrict the FDs to the sub-schema R_hat.
R_hat = {'A', 'B', 'C', 'D', 'E'}
project_dependency(fds, R_hat)

[({'A'}, {'C', 'E'}), ({'A', 'B'}, {'D'}), ({'C', 'E'}, {'A'})]

### Random FDs

In [32]:
def make_attribute_names(n: int) -> list[str]:
    """
    Build n attribute names from UPPER- then lower-case letters.

    The first 52 names are 'A'..'Z', 'a'..'z'. After that the alphabet is
    reused with a numeric suffix: 'A1'..'z1', 'A2'.., and so on. A
    non-positive n yields an empty list.
    """
    alphabet = string.ascii_uppercase + string.ascii_lowercase
    names = []
    for position in range(n):
        # cycle counts how many times the 52-letter alphabet was exhausted.
        cycle, offset = divmod(position, len(alphabet))
        suffix = str(cycle) if cycle else ""
        names.append(f"{alphabet[offset]}{suffix}")
    return names

def generate_random_fds(
    n_vars: int,
    num_fds: int = None,
    max_lhs_size: int = 3,
    max_rhs_size: int = 3,
    allow_trivial: bool = False,
    seed: int = None) -> tuple[set[str], list[tuple[set[str], set[str]]]]:
    """
    Generate random functional dependencies over n_vars attributes.

    Parameters
    ----------
    n_vars : number of attributes (>= 1)
    num_fds : number of FDs to generate; if None (default), a random integer
        in [n_vars, 2*n_vars] is drawn
    max_lhs_size : maximum size of LHS for any FD (>=1)
    max_rhs_size : maximum size of RHS for any FD (>=1)
    allow_trivial : if False (default), ensure RHS has at least one attribute not in LHS
    seed : optional random seed for reproducibility

    Returns
    -------
    (attributes, fds)
      attributes : set of attribute names (strings)
      fds        : list of (set(lhs), set(rhs)) tuples, without duplicates

    Fewer than num_fds FDs may be returned: generation stops after
    num_fds * 20 attempts so that small attribute sets cannot loop forever.

    Raises
    ------
    ValueError : if n_vars, max_lhs_size or max_rhs_size is below 1
    """
    if n_vars < 1:
        raise ValueError("n_vars must be >= 1")
    if max_lhs_size < 1 or max_rhs_size < 1:
        raise ValueError("max_lhs_size and max_rhs_size must be >= 1")

    rng = random.Random(seed)

    attr_list = make_attribute_names(n_vars)
    attributes = set(attr_list)

    if num_fds is None:
        num_fds = rng.randint(n_vars, 2 * n_vars)

    fds = []
    seen = set()  # for deduplication
    attempts = 0
    max_attempts = num_fds * 20  # generous cap to avoid infinite loops

    while len(fds) < num_fds and attempts < max_attempts:
        attempts += 1

        lhs_size = rng.randint(1, min(max_lhs_size, n_vars))
        rhs_size = rng.randint(1, min(max_rhs_size, n_vars))

        lhs = set(rng.sample(attr_list, lhs_size))

        # Start with a random RHS
        rhs = set(rng.sample(attr_list, rhs_size))

        # Enforce non-triviality if requested. An RHS that only partially
        # overlaps the LHS (e.g. {A} -> {A, B}) is kept as is; only an RHS
        # fully inside the LHS is redrawn.
        if not allow_trivial and rhs.issubset(lhs):
            # Build the pool from the ordered attr_list, not from a set
            # difference: iterating a set of strings depends on hash
            # randomisation, which would make a fixed seed irreproducible
            # across interpreter runs.
            outside = [attr for attr in attr_list if attr not in lhs]
            if not outside:
                # Can't make a non-trivial FD if all attributes are in LHS
                continue
            # choose at least 1 attribute from outside lhs
            k = rng.randint(1, min(rhs_size, len(outside)))
            rhs = set(rng.sample(outside, k))

        # Disallow empty RHS (shouldn't happen with the above, but be safe)
        if not rhs:
            continue

        # Normalize for deduplication
        key = (tuple(sorted(lhs)), tuple(sorted(rhs)))
        if key in seen:
            continue

        seen.add(key)
        fds.append((set(key[0]), set(key[1])))

    return attributes, fds

In [37]:
# Draw a small random FD set; with the seed left commented out, the draw
# varies between runs.
attributes, fds = generate_random_fds(
    n_vars=8,
    num_fds=5,
    max_lhs_size=4,
    max_rhs_size=3,
    allow_trivial=False,
    # seed=32,  # remove or change for different draws
)

print("Attributes:", attributes)
print("FDs:")
for lhs, rhs in fds:
    print(f"{lhs} -> {rhs}")

Attributes: {'D', 'H', 'G', 'F', 'C', 'B', 'A', 'E'}
FDs:
{'E', 'H'} -> {'D'}
{'E'} -> {'G', 'F', 'H'}
{'A', 'B', 'C'} -> {'D', 'F'}
{'G', 'C'} -> {'E'}
{'A', 'F', 'E', 'C'} -> {'A', 'F', 'D'}
