In [3]:
from itertools import chain, combinations, permutations, product

## Q1a. Determine the closure of a given set of attributes S under the schema R and functional dependencies F
def closure(R, F, S):
    """
    Compute the attribute closure S+ of the attribute set S under the FDs in F.

    R is the schema (an iterable of attributes), F is a list of FDs of the
    form [lhs, rhs] where lhs and rhs are iterables of attributes, and S is
    the starting attribute set. The closure is returned as a list in no
    particular order.

    The search stops as soon as every attribute of R is in the closure. The
    check is done on set containment rather than on sizes, so FDs mentioning
    attributes outside R cannot cut the search short. If F does mention such
    attributes, they may appear in the result, but only those reached before
    R was covered.
    """
    schema = set(R)
    closed = set(S)
    pending = list(F)
    progressed = True
    # Iterate to a fixpoint, or stop early once the whole schema is covered.
    while progressed and not closed.issuperset(schema):
        progressed = False
        still_pending = []
        for fd in pending:
            if closed.issuperset(fd[0]):
                # lhs is already determined, so rhs is determined too; the FD
                # has nothing more to contribute and is dropped from pending.
                closed.update(fd[1])
                progressed = True
            else:
                still_pending.append(fd)
        pending = still_pending

    return list(closed)

## Q1b. Determine all attribute closures, excluding superkeys that are not candidate keys, given the schema R and functional dependencies F
def all_closures(R, F):
    """
    Compute the closure of every subset of the schema R, skipping super keys
    that are not candidate keys.

    Subsets are visited in the order produced by get_all_subsets. Once a
    subset is found whose closure equals R, every later subset containing it
    is skipped. Subsets with an empty closure are left out as well.
    Returns a list of [attribute_list, closure_list] pairs.
    """
    schema = set(R)
    keys_found = []
    closures = []

    for candidate in get_all_subsets(R):
        candidate_attrs = set(candidate)
        # A superset of an already discovered key is a non-minimal super key.
        if any(candidate_attrs.issuperset(key) for key in keys_found):
            continue
        reached = closure(R, F, candidate)
        if len(reached) == 0:
            continue
        if set(reached) == schema:
            keys_found.append(candidate_attrs)
        closures.append([list(candidate), reached])

    return closures
    
## Q2a. Return a minimal cover of the functional dependencies of a given schema R and functional dependencies F.
def min_cover(R, FD):
    """
    Compute one minimal cover of FD over the schema R.

    The three stages run in this order:
    1. split every rhs into single attributes,
    2. shrink every lhs,
    3. drop FDs that the remaining ones already imply.
    """
    single_rhs_fds = minimize_rhs_all_fds(FD)
    reduced_lhs_fds = minimize_lhs_all_fds(R, single_rhs_fds)
    return remove_redundant_fds(R, reduced_lhs_fds)

## Q2b. Return all minimal covers reachable from the functional dependencies of a given schema R and functional dependencies F.
def min_covers(R, FD):
    """
    Compute all minimal covers reachable from the given FD set.

    Steps:
    1. Minimize the rhs of all FDs and drop exact duplicates.
    2. For each FD, collect every left-reduced lhs: each subset of the lhs
       that still implies the rhs while none of its proper subsets does.
    3. For each combination of left-reduced FDs, remove redundant FDs under
       every ordering and keep the distinct results.

    Returns a list of covers, each a list of [lhs, rhs] pairs, in no
    particular order. Step 3 enumerates every permutation, so the cost grows
    factorially with the number of FDs.
    """
    # Step 1: single-attribute rhs. Duplicates are dropped so that the list of
    # lhs options built below stays aligned one-to-one with the FDs.
    unique_fds = []
    seen = set()
    for lhs, rhs in minimize_rhs_all_fds(FD):
        key = (tuple(lhs), tuple(rhs))
        if key not in seen:
            seen.add(key)
            unique_fds.append([lhs, rhs])

    # Step 2: subsets are visited by increasing size, so a subset is
    # left-reduced exactly when it implies rhs and contains no earlier hit.
    lhs_options = []
    for lhs, rhs in unique_fds:
        reduced = []
        for size in range(len(lhs) + 1):
            for subset in combinations(lhs, size):
                attrs = set(subset)
                if any(found <= attrs for found in reduced):
                    continue
                if implies(R, list(subset), rhs, unique_fds):
                    reduced.append(attrs)
        lhs_options.append(sorted({tuple(sorted(attrs)) for attrs in reduced}))

    # Step 3: the outcome of redundancy removal depends on the order in which
    # FDs are examined, so every ordering of every combination is tried.
    all_minimal_covers = set()
    for choice in product(*lhs_options):
        candidate = [
            [list(new_lhs), list(fd[1])]
            for new_lhs, fd in zip(choice, unique_fds)
        ]
        for ordering in permutations(candidate):
            minimum_cover = remove_redundant_fds(R, list(ordering))
            all_minimal_covers.add(get_hashable_fd_set(minimum_cover))

    return [
        [[list(lhs), list(rhs)] for lhs, rhs in cover]
        for cover in all_minimal_covers
    ]

## Q2c. Return all minimal covers of a given schema R and functional dependencies F.
def all_min_covers(R, FD):
    """
    Get all minimal covers for a given FD set.

    The set closure of FD is computed first, trivial FDs (rhs contained in
    lhs) are dropped, and min_covers is applied to what remains. The covers
    are passed through their hashable form so that duplicates are removed.
    """
    nontrivial_fds = [
        fd for fd in set_closure(R, FD)
        if not set(fd[1]).issubset(set(fd[0]))
    ]
    distinct_covers = {
        get_hashable_fd_set(cover) for cover in min_covers(R, nontrivial_fds)
    }
    return [
        [[list(lhs), list(rhs)] for lhs, rhs in cover]
        for cover in distinct_covers
    ]

## You can add additional functions below

def get_all_subsets(lst):
    """
    Return every subset of lst as a tuple, smallest subsets first
    (starting with the empty tuple).
    """
    subsets = []
    for size in range(len(lst) + 1):
        subsets.extend(combinations(lst, size))
    return subsets

def implies(schema, lhs, rhs, fd_set):
    """
    Tell whether fd_set implies the FD lhs -> rhs, i.e. whether every
    attribute of rhs lies in the closure of lhs under fd_set.
    """
    reachable = set(closure(schema, fd_set, lhs))
    return set(rhs).issubset(reachable)

def set_closure(schema, fd_set):
    """
    Get the set closure of a given fd_set.

    For every subset X of the schema and every attribute A in the closure of
    X, the FD [X, [A]] is emitted. Trivial FDs are included. Each rhs is a
    one-element list holding the attribute itself, so attribute names longer
    than one character are kept intact.
    """
    fd_plus = []

    for attr_set in get_all_subsets(schema):
        for attr in closure(schema, fd_set, attr_set):
            # [attr], not list(attr): list() would split a multi-character
            # attribute name into its individual characters.
            fd_plus.append([list(attr_set), [attr]])

    return fd_plus

def get_hashable_fd_set(fd_set):
    """
    Convert fd_set into a sorted tuple of (lhs_tuple, rhs_tuple) pairs so it
    can be stored in a set and compared regardless of FD order.
    """
    frozen_fds = [(tuple(lhs), tuple(rhs)) for lhs, rhs in fd_set]
    frozen_fds.sort()
    return tuple(frozen_fds)

def minimize_lhs_current_fd(schema, lhs, rhs, fd_set):
    """
    Shrink the lhs of the FD [lhs, rhs] by trying to drop its attributes one
    at a time, in the order they appear in lhs.

    An attribute stays dropped when the remaining attributes still imply rhs;
    otherwise it is put back at the end of the working list. The input lhs is
    not modified.
    """
    kept = list(lhs)
    for attr in lhs:
        kept.remove(attr)
        if implies(schema, kept, rhs, fd_set):
            continue
        # attr is needed: restore it (at the tail of the working list).
        kept.append(attr)
    return kept

def minimize_rhs_all_fds(fd_set):
    """
    Minimize the rhs to a single attribute for every fd in the fd_set.

    Each FD [lhs, [a1, a2, ...]] is split into [lhs, [a1]], [lhs, [a2]], ...
    An FD that was already produced (same lhs in the same order, same
    attribute) is emitted only once. The order of first appearance is kept.
    """
    fd_set_rminimized = []
    # Keys of the FDs emitted so far, used for the duplicate check.
    seen = set()

    for lhs, rhs in fd_set:
        for attr in rhs:
            key = (tuple(lhs), attr)
            if key in seen:
                continue
            seen.add(key)
            fd_set_rminimized.append([lhs, [attr]])

    return fd_set_rminimized

def minimize_lhs_all_fds(schema, fd_set):
    """
    Return a new FD list in which the lhs of every FD has been shrunk with
    minimize_lhs_current_fd. Each FD is shrunk against the unmodified fd_set.
    """
    reduced_fds = []
    for lhs, rhs in fd_set:
        shrunk = minimize_lhs_current_fd(schema, lhs, rhs, fd_set)
        # Reuse the original lhs object when shrinking changed nothing.
        reduced_fds.append([lhs if shrunk == lhs else shrunk, rhs])
    return reduced_fds

def remove_redundant_fds(schema, fd_set):
    """
    Drop every FD that the other remaining FDs already imply.

    FDs are examined in the order they appear in fd_set. An FD that turns out
    to be necessary is put back at the end of the working list, so the result
    may be ordered differently from the input. fd_set is not modified.
    """
    remaining = fd_set.copy()
    for fd in fd_set:
        remaining.remove(fd)
        lhs, rhs = fd[0], fd[1]
        if implies(schema, lhs, rhs, remaining):
            continue
        # Not derivable from the rest: the FD has to stay.
        remaining.append(fd)

    return remaining

def find_all_candidate_keys(R, F):
    """
    Return all candidate keys of the schema R under the FDs F.

    Subsets of R are visited in the order produced by get_all_subsets. A
    subset whose closure equals R is recorded as a key, and every later
    subset containing a recorded key is skipped.
    """
    schema = set(R)
    keys = []

    for candidate in get_all_subsets(R):
        candidate_attrs = set(candidate)
        # Supersets of a known key are super keys, not candidate keys.
        if any(candidate_attrs.issuperset(key) for key in keys):
            continue
        reached = closure(R, F, list(candidate))
        if len(reached) == 0:
            continue
        if set(reached) == schema:
            keys.append(list(candidate))

    return keys

def get_prime_attributes(candidate_keys):
    """
    Return the attributes that occur in at least one candidate key, without
    duplicates and in no particular order.
    """
    prime = set()
    for ckey in candidate_keys:
        prime.update(ckey)
    return list(prime)

def is_proper_subset(set1, set2):
    """
    Return True when set1 is contained in set2 and the two sets differ.
    """
    if not set1.issubset(set2):
        return False
    # set1 is inside set2; it is proper unless set2 is inside set1 as well.
    return not set2.issubset(set1)

def is_3NF(R, F):
    """
    Check whether the schema R with FD set F is in third normal form.

    The check runs over a minimal cover of F, where every FD has a single
    rhs attribute. An FD violates 3NF when its lhs is not a super key (it
    contains no candidate key) and its rhs attribute is not prime.

    Prints the minimal cover, the candidate keys, the prime attributes and a
    short report for each violating FD.
    Returns (is_3nf, violating_fds).
    """
    minimal_cover = min_cover(R, F)
    print("Minimal cover: ", minimal_cover)
    candidate_keys = find_all_candidate_keys(R, F)
    print("Candidate keys: ", candidate_keys)
    prime_attributes = get_prime_attributes(candidate_keys)
    print("Prime attributes: ", prime_attributes, end="\n")

    key_sets = [set(ckey) for ckey in candidate_keys]
    prime = set(prime_attributes)
    violating_fds = []

    for fd in minimal_cover:
        lhs = fd[0]
        rhs = fd[1][0]
        lhs_attrs = set(lhs)
        # The lhs is a super key as soon as it contains ANY candidate key;
        # failing to contain one particular key proves nothing.
        if any(lhs_attrs.issuperset(key) for key in key_sets):
            continue
        if rhs in prime:
            continue
        print("-"*30)
        print("Current fd: ", fd)
        print(f"{lhs} is not a super key")
        print(f"{rhs} is not a prime attribute")
        if fd not in violating_fds:
            violating_fds.append(fd)

    return len(violating_fds) == 0, violating_fds

def compact_min_cover(minimal_cover):
    """
    Merge the FDs of a cover that share the same lhs (compared as a sorted
    attribute tuple) into a single FD whose rhs collects all their rhs
    attributes.

    Returns a list of [sorted_lhs_list, rhs_list] pairs in order of first
    appearance of each lhs; the order inside each rhs list is unspecified.
    """
    rhs_by_lhs = {}
    for fd in minimal_cover:
        lhs_key = tuple(sorted(fd[0]))
        rhs_by_lhs.setdefault(lhs_key, set()).update(set(fd[1]))

    compacted = []
    for lhs_key, rhs_attrs in rhs_by_lhs.items():
        compacted.append([list(lhs_key), list(rhs_attrs)])
    return compacted
    
def is_BCNF(R, F, verbose=True):
    """
    Check whether the schema R with FD set F is in Boyce-Codd normal form.

    The check runs over the compacted minimal cover of F: an FD violates
    BCNF when its lhs contains none of the candidate keys, i.e. when the lhs
    is not a super key. With verbose set, the input, the compacted minimal
    cover and the candidate keys are printed.
    Returns (is_bcnf, violating_fds).
    """
    minimal_cover = compact_min_cover(min_cover(R, F))
    candidate_keys = find_all_candidate_keys(R, F)
    if verbose:
        print(f"Checking Relation {R} with FD set {F}")
        print("Minimal cover: ", minimal_cover)
        print("Candidate keys: ", candidate_keys)

    violating_fds = []
    for fd in minimal_cover:
        lhs_attrs = set(fd[0])
        # A lhs containing some candidate key is a super key: no violation.
        if any(lhs_attrs.issuperset(set(ckey)) for ckey in candidate_keys):
            continue
        if fd not in violating_fds:
            violating_fds.append(fd)

    return len(violating_fds) == 0, violating_fds

def is_2NF(R, F):
    """
    Check the FDs of a minimal cover of F for second-normal-form violations.

    An FD is reported when its lhs is a proper subset of some candidate key
    and its single rhs attribute is not prime. A report is printed for every
    such (FD, candidate key) pair, while each FD is recorded only once.
    Returns (is_2nf, violating_fds).
    """
    minimal_cover = min_cover(R, F)
    print("Minimal cover: ", minimal_cover)
    candidate_keys = find_all_candidate_keys(R, F)
    print("Candidate keys: ", candidate_keys)
    prime_attributes = get_prime_attributes(candidate_keys)
    print("Prime attributes: ", prime_attributes, end="\n")

    violating_fds = []
    for fd in minimal_cover:
        lhs, rhs = fd[0], fd[1][0]
        for ckey in candidate_keys:
            if not is_proper_subset(set(lhs), set(ckey)):
                continue
            if rhs in prime_attributes:
                continue
            # Partial dependency of a non-prime attribute on this key.
            print("-"*30)
            print("Current fd: ", fd)
            print(f"{lhs} is a proper subset of candidate key {ckey}")
            print(f"{rhs} is not a prime attribute")
            if fd not in violating_fds:
                violating_fds.append(fd)

    return len(violating_fds) == 0, violating_fds

def project(parent_R, decomposed_R, parent_F):
    """
    Project the FDs parent_F of the schema parent_R onto the fragment
    decomposed_R and return a minimal cover of the projected FDs.

    For every non-empty subset X of decomposed_R, the closure of X is taken
    under parent_F over the parent schema, and an FD [X, [A]] is emitted for
    each closure attribute A that belongs to the fragment.
    """
    fragment_attrs = set(decomposed_R)
    decomposed_F = []

    for size in range(1, len(decomposed_R) + 1):
        for attr_set in combinations(decomposed_R, size):
            # The closure is computed against the parent schema because
            # parent_F refers to attributes outside the fragment.
            for attr in closure(parent_R, parent_F, list(attr_set)):
                if attr in fragment_attrs:
                    # [attr], not list(attr): keep multi-character
                    # attribute names in one piece.
                    decomposed_F.append([list(attr_set), [attr]])

    return min_cover(decomposed_R, decomposed_F)

def decomposision(R, F, accum):
    """
    Recursively decompose the schema R with FD set F into BCNF fragments.

    When (R, F) is already in BCNF it is appended to accum. Otherwise the
    first violating FD X -> Y splits R into R1 = closure of X and
    R2 = (R - R1) plus X, F is projected onto both fragments, and each
    fragment is decomposed in turn. Progress is printed along the way.
    Returns accum, the list of (schema, fds) pairs collected so far.
    """
    is_bcnf, violating_fds = is_BCNF(R, F, verbose=True)
    if is_bcnf:
        accum.append((R, F))
        return accum

    determinant = violating_fds[0][0]
    R1 = closure(R, F, determinant)
    R2 = list((set(R) - set(R1)) | set(determinant))
    F1 = project(R, R1, F)
    F2 = project(R, R2, F)
    print(f"Violating Relation: {R}")
    print(f"Violating fds: {violating_fds}")
    print(f"R1: {R1} - F1: {F1}")
    print(f"R2: {R2} - F2: {F2}")
    print("-" * 30)
    for fragment, fragment_fds in ((R1, F1), (R2, F2)):
        decomposision(fragment, fragment_fds, accum)

    return accum

def remove_subsumed_fragments(lst):
    """
    Drop every fragment that is contained in another fragment.

    Fragments are compared as sets and processed from largest to smallest.
    A fragment is kept only if no fragment kept before it contains it, which
    also removes repeated fragments. Returns the kept fragments as lists.
    """
    kept = []
    by_size_desc = sorted((set(fragment) for fragment in lst), key=len, reverse=True)
    for fragment in by_size_desc:
        if any(fragment <= bigger for bigger in kept):
            continue
        kept.append(fragment)

    return [list(fragment) for fragment in kept]

def make_fragments(compact_minimal_cover, candidate_keys):
    """
    Build the fragments for the synthesis of a schema from its compacted
    minimal cover.

    Each FD contributes the fragment lhs + rhs. Fragments contained in
    another fragment are removed. If no remaining fragment contains a
    candidate key, the first candidate key is added as an extra fragment.
    Every step is printed. Returns the list of fragments.
    """
    fragments = [lhs + rhs for lhs, rhs in compact_minimal_cover]
    print(f"Original Fragments {fragments}")
    # remove subsumed fragments (any fragment which is a subset of another one)
    no_subsumed_fragments = remove_subsumed_fragments(fragments)
    print(f"After remove subsumed fragments {no_subsumed_fragments}")

    # check if any fragment contains a candidate key
    has_ckey = any(
        set(ckey).issubset(set(fragment))
        for fragment in no_subsumed_fragments
        for ckey in candidate_keys
    )
    print(f"Current fragments have candidate key? {has_ckey}")
    if not has_ckey and candidate_keys:
        # Report the extra fragment only when it is really added.
        print(f"Append {candidate_keys[0]} since no fragment contains a candidate key")
        # Append a copy so the caller's candidate key list is not aliased.
        no_subsumed_fragments.append(list(candidate_keys[0]))

    return no_subsumed_fragments

def synthesis(R, F):
    """
    Print the steps of the synthesis of the schema R with FD set F: the
    input, the candidate keys, the compacted minimal cover and the fragments
    produced by make_fragments. Nothing is returned.
    """
    print(f"Relation {R} with FD set {F}")

    candidate_keys = find_all_candidate_keys(R, F)
    print(f"Candidate keys {candidate_keys}")

    compact_minimal_cover = compact_min_cover(min_cover(R, F))
    print(f"Compact min cover: {compact_minimal_cover}")

    fragments = make_fragments(compact_minimal_cover, candidate_keys)
    print(f"Fragments {fragments}")

In [4]:
# Sample input for the normal-form checks. R lists the attributes of the
# schema; every entry of F is one FD written as [lhs_attributes, rhs_attributes].
# Alternative example, kept for reference:
# R = ['A', 'B', 'C', 'D', 'E']
# F = [[['A', 'B'], ['C', 'D', 'E']], [['A', 'C'], ['B', 'D', 'E']], [['B'], ['C']], [['C'], ['B']], [['C'], ['D']], [['B'], ['E']], [['C'], ['E']]]

R = ['A', 'B', 'C', 'D', 'E']
F = [[['A'], ['A', 'B', 'C']], [['A', 'B'], ['A']], [['B', 'C'], ['A', 'D']], [['B'], ['A', 'B']], [['C'], ['D']]]

# Other checks that can be run on the same input:
# min_cover(R, F)
# is_2NF(R, F)
# is_3NF(R, F)
# BCNF check; verbose=True also prints the minimal cover and candidate keys.
is_BCNF(R, F, verbose=True)

Checking Relation ['A', 'B', 'C', 'D', 'E'] with FD set [[['A'], ['A', 'B', 'C']], [['A', 'B'], ['A']], [['B', 'C'], ['A', 'D']], [['B'], ['A', 'B']], [['C'], ['D']]]
Minimal cover:  [[['A'], ['C', 'B']], [['B'], ['A']], [['C'], ['D']]]
Candidate keys:  [['A', 'E'], ['B', 'E']]


(False, [[['A'], ['C', 'B']], [['B'], ['A']], [['C'], ['D']]])

In [5]:
# Decompose (R, F) into BCNF fragments, collecting them in a fresh list.
decomposision(R, F, [])

Checking Relation ['A', 'B', 'C', 'D', 'E'] with FD set [[['A'], ['A', 'B', 'C']], [['A', 'B'], ['A']], [['B', 'C'], ['A', 'D']], [['B'], ['A', 'B']], [['C'], ['D']]]
Minimal cover:  [[['A'], ['C', 'B']], [['B'], ['A']], [['C'], ['D']]]
Candidate keys:  [['A', 'E'], ['B', 'E']]
Violating Relation: ['A', 'B', 'C', 'D', 'E']
Violating fds: [[['A'], ['C', 'B']], [['B'], ['A']], [['C'], ['D']]]
R1: ['D', 'C', 'A', 'B'] - F1: [[['C'], ['D']], [['A'], ['B']], [['B'], ['C']], [['B'], ['A']]]
R2: ['A', 'E'] - F2: []
------------------------------
Checking Relation ['D', 'C', 'A', 'B'] with FD set [[['C'], ['D']], [['A'], ['B']], [['B'], ['C']], [['B'], ['A']]]
Minimal cover:  [[['C'], ['D']], [['A'], ['B']], [['B'], ['C', 'A']]]
Candidate keys:  [['A'], ['B']]
Violating Relation: ['D', 'C', 'A', 'B']
Violating fds: [[['C'], ['D']]]
R1: ['D', 'C'] - F1: [[['C'], ['D']]]
R2: ['C', 'B', 'A'] - F2: [[['B'], ['A']], [['A'], ['C']], [['A'], ['B']]]
------------------------------
Checking Relation ['

[(['D', 'C'], [[['C'], ['D']]]),
 (['C', 'B', 'A'], [[['B'], ['A']], [['A'], ['C']], [['A'], ['B']]]),
 (['A', 'E'], [])]

In [6]:
# Print the synthesis steps (candidate keys, compact cover, fragments) for (R, F).
synthesis(R, F)

Relation ['A', 'B', 'C', 'D', 'E'] with FD set [[['A'], ['A', 'B', 'C']], [['A', 'B'], ['A']], [['B', 'C'], ['A', 'D']], [['B'], ['A', 'B']], [['C'], ['D']]]
Candidate keys [['A', 'E'], ['B', 'E']]
Compact min cover: [[['A'], ['C', 'B']], [['B'], ['A']], [['C'], ['D']]]
Original Fragments [['A', 'C', 'B'], ['B', 'A'], ['C', 'D']]
After remove subsumed fragments [['C', 'B', 'A'], ['D', 'C']]
Current fragments have candidate key? False
Append ['A', 'E'] since no fragment contains a candidate key
Fragments [['C', 'B', 'A'], ['D', 'C'], ['A', 'E']]
