
## Part 2 Data Discovery

You will be given a set of datasets and you are tasked to perform data discovery on the data sets.

<b>The datasets are provided in the group lockers on brightspace. Let me know if you are having trouble accessing the datasets</b>

The process is to have the goal of finding datasets that are related to each other, finding relationships between the datasets.

The relationships that we are primarily working with are Join and Union relationships.

So please implement two methods for allowing us to find those pesky Join and Union relationships.

Try to do this with the datasets as is and no processing.



In [1]:
import csv
from pathlib import Path
import os
import statistics

import pandas as pd
import numpy as np
from random import shuffle
from time import time
import pickle

import itertools
from collections import Counter
from difflib import SequenceMatcher
import unicodedata

In [2]:

INFO = True






### CSV Reading



In [3]:

########
# This function returns a dict of all likely delimiters it is able to find in a line-string
# A likely delimiter is any character that is not a-z, A-Z, 0-9 or ., ', "
########
def find_candidate_delimiters( line ):
    candidates = {}
    line = line.rstrip("\r\n")
    in_substring = False
    for character in line:
        if character == '"' or character == "'":
            in_substring ^= True
        if in_substring:
            continue
        if character.isalnum() or character == '.':
            continue
        if character not in candidates:
            candidates[character] = np.array([])

    return candidates


In [4]:


########
# Given a list of strings named "file" and a list of candidate delimiters this function counts
# how many times each delimiter appears in each line (ie. list element)
########
def delimiter_consistency( file, candidates ):
    for line in file:
        line = line.rstrip("\r\n")

        in_substring = False
        appearances = { char: 0 for char in candidates }
        for character in line:
            if character == '"' or character == "'":
                in_substring ^= True
            if in_substring:
                continue
            if character in candidates:
                appearances[character] += 1

        for char in appearances:
            if appearances[char] > 0:
                candidates[char] = np.append ( candidates[char], appearances[char] )

    return candidates


In [5]:

########
#
# This function attempts to figure out which delimiter is the most likely for a given csv
# To do this it calculates the SNR of each likely delimiter per line.
# As specific files are highly incorrect a high epsilon value of 0.5 was chosen to combat
# high variances of delimiters with very low means
#
# Example Issue:
# Table 7 has two likely delimiters, '/', ','.
# Without making any assumptions the best SNR is given for '/' due to 0 variance.
# This character however has a mean of 1 which is a lot less than ','.
# ',' has significant variance but a vastly more sensical mean.
# We increase epsilon to ensure that this delimiter is correctly found.
#
########
def find_delimiter( file_path, max_rows=1000 ):
    delimiter = ''

    with open( file_path, mode='r', encoding='utf-8' ) as file:
        # Only use a single line for candidates
        # a random line near the middle is unlikely to have specific formatting
        file = file.readlines()
        candidates_line = file[ len(file)//2 ]
        candidates = find_candidate_delimiters( candidates_line )

        if len( candidates ) == 0:
            return delimiter

        step = len(file)//max_rows
        delimiters = delimiter_consistency( file=file[::step], candidates=candidates )

    # A delimiter must appear on all lines of the csv
    # Drop those that do not
    keys = list(delimiters.keys())
    for key in keys:
        if delimiters[key].shape[0] < 0.99*max_rows:
            delimiters.pop( key, None )
            continue

    keys = list(delimiters.keys())
    epsilon = 0.5

    first_key = keys[0]
    delimiter = first_key
    max_snr = delimiters[first_key].mean()/(delimiters[first_key].std()+epsilon)

    for key in keys[1:]:
        snr = delimiters[key].mean()/(delimiters[key].std()+epsilon)
        if snr > max_snr:
            delimiter = key
            max_snr = snr

    return delimiter


In [6]:

########
# Reads a csv with as few assumptions about it as possible.
# This produces a very unclean dataframe to be used in part 1
# Part two will perform extra cleaning on this same csv
########

def read_csv_dirty( filepath ):
    delimiter = find_delimiter( filepath )

    result = []
    with open( filepath, 'r' ) as file:
        reader = csv.reader( file, delimiter=delimiter )
        for line in reader:
            result.append( line )

    return result



### Set Similarity Measures


In [7]:

########
# The well-known levenshtein similarity measure.
# Can be used in place of jaccard if needed.
########
def levenshtein(x, y):
    if len(x) == 0 or len(y) == 0:
        return len(x) + len(y)

    if x[0] == y[0]:
        return levenshtein( x[1:], y[1:] )
    return 1+levenshtein( x[1:], y[1:] )


In [8]:

########
# The well-known jaccard set similarity measure.
# We return zero when both sets are empty because even though it's normally undefined
# we want to avoid over-filling our pairs of similar sets with useless pairs
# as such if we set their similarity to zero they will never be treated as similar
########
def jaccard(x,y):
    if len(x) == 0 and len(y)==0:
        return 0
    return len( x & y ) / len( x | y )



### Union Finding

Possible **JOIN** relationship candidates are calculated later within our *Lazo* and *SilkMoth* implementations. This section implements methods that can use the aforementioned **JOIN** candidates to see if they are numerous enough to accomodate a **UNION** between our tables. *exists_mapping* will tell us whether we should treat a pair of tables as **UNION** candidates.

All relationships that are not find to be **UNIONS** are classified as **JOIN**.

In [9]:

########
# Given a dict that represents a graph adjacency matrix this method returns:
# True - if there's a subgraph of our graph that is bipartite
# False - if there's no bipartite subgraph
# This will be helpful to discover unions
########
def bipartite_match( node, seen, match, adjacency ):
    for node2 in adjacency[node]:
        if node2 in seen:
            continue
        seen.add( node2 )
        if node2 not in match or bipartite_match( match[node2], seen, match, adjacency ):
            match[node2] = node
            return True
    return False


In [10]:

########
# This function uses the bipartite_match function to see if the graph
# created by joining table columns depending on their similarity can
# match all columns in both table one to one.
# A small threshold is allowed in case a pair of columns was mistakenly
# not found to be similar earlier ( a non-zero probability phenomenon )
# If a graph matching exists we have a union candidate pair
########

def exists_mapping( pairs, threshold=0.9 ):
    adjacency = {}
    cols1 = set()
    cols2 = set()
    for p in pairs:
        c1, c2 = p['col1'], p['col2']
        if not c1 in adjacency:
            adjacency[c1] = []
        adjacency[c1].append(c2)
        cols1.add(c1)
        cols2.add(c2)

    matches = 0
    for node in adjacency:
        if bipartite_match( node, set(), {}, adjacency ):
            matches += 1

    return matches >= threshold*len( cols1 )



### Lazo


In [11]:

########
# Shingles a string to a list of k-shingles
# Needed to later generate Lazo signatures
########

def k_shingle( doc, k=2 ):
    if len( doc ) < k:
        return set( [ doc ] )
    shingles = set()
    for i in range( len(doc) + 1 - k ):
        shingles.add( doc[i:i+k] )
    return shingles


In [12]:

########
# Produces shingles for an entier dataframe, column by column
########

def dframe_shingles( dframe, k=2 ):
    shingles = {}
    for column in dframe.columns:
        shingles[column] = set([])
        for datum in dframe[column]:
            shingles[column].update( k_shingle( str(datum), k=k ) )
    return shingles


In [13]:

########
# Produces shingles for all dataframes
# Utilizes caching to avoid re-shingling on later executions
# since it is very time consuming
########
def shingle_dframes( dframes, shingle_size=2 ):
    if not os.path.exists( Path("./cache") ):
        os.mkdir( Path("./cache") )

    vocab_file = f"./cache/vocab.{shingle_size}.pickle"
    vocab_loaded = False
    if os.path.exists( vocab_file ):
        with open( vocab_file, 'rb' ) as file:
            vocab = pickle.load( file )
        vocab_loaded = True
    else:
        vocab = set([])

    shingles = {}
    for title, dframe in dframes.items():
        cache_file = f"./cache/{title}.{shingle_size}.pickle"
        if os.path.exists( cache_file ):
            with open( cache_file, 'rb' ) as file:
                shingles[title] = pickle.load( file )
        else:
            shingles[title] = dframe_shingles( dframe, k=shingle_size )
            with open( cache_file, 'wb' ) as file:
                pickle.dump( shingles[title], file, pickle.HIGHEST_PROTOCOL )

        if not vocab_loaded:
            for _, col_shingles in shingles[title].items():
                vocab.update( col_shingles )

    if not vocab_loaded:
        with open( vocab_file, 'wb' ) as file:
            pickle.dump( vocab, file, pickle.HIGHEST_PROTOCOL )

    return shingles, vocab


In [14]:

########
# Produces a one-hot list of all shingles given a vocabulary
# To late be used for creating hashes and buckets
########
def onehot_shingles( shingles, vocab ):

    onehot = { title: { col: [] for col in shingles[title] } for title in shingles }

    for shingle in vocab:
        for title in shingles:
            for col in shingles[title]:
                if shingle in shingles[title][col]:
                    onehot[title][col].append( 1 )
                else:
                    onehot[title][col].append( 0 )

    return onehot


In [15]:

########
# One-permutation hashing, the defining feature of Lazo
# and a major speed increase over Min-Hash
# Produces a signature for a given one-hot encoding of a document
# in this case, a column
# Also bands the signature to later be used for bucketing
########
def one_perm_hashing( onehot, vocab_length, num_hashes=100, n_bands=10 ):
    assert num_hashes % n_bands == 0
    assert num_hashes > n_bands
    rows_per_band = num_hashes // n_bands
    bin_size = vocab_length // num_hashes

    idx = list( range( vocab_length ) )
    shuffle( idx )

    banded_sig = [ [ -1 for _ in range( rows_per_band ) ] for _ in range( n_bands ) ]
    for b in range( num_hashes ):
        start = b * bin_size
        end = (b + 1) * bin_size
        if b == num_hashes - 1:
            end = vocab_length
        bin_indices = idx[start:end]

        band_id = b // rows_per_band
        for i in bin_indices:
            if onehot[i] == 1:
                j = b % rows_per_band
                banded_sig[band_id][j] = i
                break

    return banded_sig


In [16]:

########
# Creates a set of buckets using the signatures of table columns
# Columns with similar signatures are placed in the same bucket
########
def hash_dframes( onehot, vocab_length, signature_length=100, num_bands=10 ):
    buckets = [ {} for _ in range(num_bands) ]
    for title in onehot:
        for column in onehot[title]:
            bands = one_perm_hashing( onehot[title][column], vocab_length, num_hashes=signature_length, n_bands=num_bands )

            for band_idx, band in enumerate( bands ):
                band_hash = hash( tuple(band) )
                if band_hash not in buckets[band_idx]:
                    buckets[band_idx][band_hash] = [ { 'title': title, 'column': column } ]
                else:
                    buckets[band_idx][band_hash].append( { 'title': title, 'column': column } )

    return buckets


In [17]:

########
# Full implementation of Lazo
# After buckets are created the full similarities of
# columns in the same bucket are calculated for greater
# certainty. Minimum similarity thresholds apply.
########
def Lazo( documents, shingle_size=2, signature_length=100, num_bands=10, similarity_threshold = 0.7, union_threshold=0.9 ):
    t_start_shingling = time()

    shingles, vocab = shingle_dframes( documents, shingle_size=shingle_size  )
    vocab_length = len(vocab)

    t_end_shingles = time()
    if INFO:
        print( f"Finished shingling ({1000*(t_end_shingles-t_start_shingling):.4f}ms)" )

    onehot = onehot_shingles( shingles, vocab )

    t_end_onehot = time()
    if INFO:
        print( f"Finished onehot ({1000*(t_end_onehot-t_end_shingles):.4f}ms)" )

    buckets = hash_dframes( onehot, vocab_length )

    t_end_hashing = time()
    if INFO:
        print( f"Finished hashing ({1000*(t_end_hashing-t_end_onehot):.4f}ms)" )


    similarities = {}
    for band_idx, band_buckets in enumerate(buckets):
        for key in band_buckets:
            bucket = band_buckets[key]
            if len( bucket ) < 2:
                continue

            for i in range( len(bucket) ):

                doc1 = bucket[i]
                title1 = doc1['title']
                col1 = doc1['column']
                for j in range( i ):
                    doc2 = bucket[j]
                    title2 = doc2['title']
                    col2 = doc2['column']
                    similarity = jaccard( shingles[title1][col1], shingles[title2][col2] )

                    if similarity < similarity_threshold:
                        continue

                    if title1 not in similarities and title2 not in similarities:
                        similarities[title1] = {}
                        similarities[title1][title2] = [ { 'col1': col1, 'col2': col2, 'similarity': similarity } ]
                    elif title1 in similarities and title2 in similarities[title1]:
                        similarities[title1][title2].append( { 'col1': col1, 'col2': col2, 'similarity': similarity } )
                    elif title2 in similarities and title1 in similarities[title2]:
                        similarities[title2][title1].append( { 'col1': col2, 'col2': col1, 'similarity': similarity } )
                    elif title1 in similarities:
                        similarities[title1][title2] = [ { 'col1': col1, 'col2': col2, 'similarity': similarity } ]
                    else:
                        similarities[title2][title1] = [ { 'col1': col2, 'col2': col1, 'similarity': similarity } ]

    candidates = { 'union': [], 'join': [] }
    for table1 in similarities:
        for table2 in similarities[table1]:
            if table2==table1:
                continue

            if documents[table1].shape[1] != documents[table2].shape[1]:
                candidates['join'].append( [table1, table2] )
                continue

            if exists_mapping( similarities[table1][table2] ):
                candidates['union'].append( [table1, table2] )
            else:
                candidates['join'].append( [table1, table2] )

    t_end_bucketing = time()
    if INFO:
        print( f"Finished bucketing ({1000*(t_end_bucketing-t_end_hashing):.4f}ms)" )

    return candidates



### SilkMoth


In [18]:

def normalize_string(s: str) -> str:
    return unicodedata.normalize("NFKC", str(s)).lower().strip()


In [19]:

########
# This function returns the most likely datatype for a column
# Subsamples rows to avoid long computation times
########
def detect_column_dtype( column, samples=1000 ):
    vals = set( column )
    step = len( vals ) // samples

    if len( vals ) == 2:
        return 'bool'

    likely_dtypes = {
        'int': 0,
        'float': 0,
        'datetime': 0,
        'string': 1
    }

    count = 0
    for val in vals:
        if count ==samples:
            break
        count += 1

        try:
            tmp = pd.to_datetime( val )
            likely_dtypes['datetime'] += 1
            continue
        except (ValueError, TypeError):
            pass

        try:
            tmp = int( val )
            likely_dtypes['int'] += 1
            continue
        except (ValueError, TypeError):
            pass

        try:
            tmp = float( val )
            likely_dtypes['float'] += 1
            continue
        except (ValueError, TypeError):
            pass

        likely_dtypes['string'] += 1

    if likely_dtypes['float'] > 0.1*likely_dtypes['int']:
        likely_dtypes['float'] += likely_dtypes['int']

    dtype = 'string'
    most_likely = likely_dtypes['string']
    if likely_dtypes['float'] >= most_likely:
        most_likely = likely_dtypes['float']
        dtype = 'float'
    if likely_dtypes['int'] >= most_likely:
        most_likely = likely_dtypes['int']
        dtype = 'int'
    if likely_dtypes['datetime'] >= most_likely:
        dtype = 'datetime'

    return dtype


In [20]:

########
# Creates a signature for the table given the detected data types
# of its columns
########
def table_signature(df: pd.DataFrame):
    sig = []
    for col in df.columns:
        sig.append(detect_column_dtype(df[col]))
    return sig


In [21]:

def column_similarity(col1, col2, sample_size=10):
    vals1 = col1.dropna().astype(str).map(normalize_string)
    vals2 = col2.dropna().astype(str).map(normalize_string)

    if sample_size and len(vals1) > sample_size:
        vals1 = vals1.sample(sample_size, random_state=42)
    if sample_size and len(vals2) > sample_size:
        vals2 = vals2.sample(sample_size, random_state=42)

    set1, set2 = set(vals1), set(vals2)
    jaccard_sim = jaccard( set1, set2 )

    name_sim = SequenceMatcher(None, normalize_string(col1.name), normalize_string(col2.name)).ratio()
    return 0.5 * jaccard_sim + 0.5 * name_sim


In [22]:

# Union check
def unionable_by_type(sig1, sig2, threshold=0.8):
    if not sig1 or not sig2:
        return False
    counter1, counter2 = Counter(sig1), Counter(sig2)
    common_count = sum(min(counter1[t], counter2[t]) for t in counter1)
    overlap_ratio = common_count / max(len(sig1), len(sig2))
    return overlap_ratio >= threshold


In [23]:

# Relationship finder
def find_relationships(datasets, join_threshold=0.6, union_threshold=0.8):
    relationships = {"join": [], "union": []}

    signatures = { name: table_signature(df) for name, df in datasets.items() }
    for (i, (name1, df1)), (j, (name2, df2)) in itertools.combinations(enumerate(datasets.items()), 2):
        sig1 = signatures[name1]
        sig2 = signatures[name2]

        if unionable_by_type(sig1, sig2, threshold=union_threshold):
            #print(f"[UNION] {name1} <--> {name2} (overlap ≥ {union_threshold})")
            relationships["union"].append({
                "table1": name1,
                "table2": name2,
                "signature1": sig1,
                "signature2": sig2
            })

        joinable_cols = []
        found = False
        # takes a lot of time, so I only picked a few columns for speed, accuracy should
        # theoretically increase for a higher threshold and more columns
        for i, col1 in enumerate( df1.columns[:10] ):
            if found:
                break
            for j, col2 in enumerate( df2.columns[:10] ):
                if found:
                    break

                dtype1 = sig1[i]
                dtype2 = sig2[j]

                if dtype1 != dtype2:
                    continue

                sim = column_similarity(df1[col1], df2[col2])
                if sim >= join_threshold:
                    #print(f"[JOIN] {name1}.{col1} <--> {name2}.{col2} (dtype={dtype1}, sim={sim:.2f})")
                    joinable_cols.append({
                        "col1": col1,
                        "col2": col2,
                        "similarity": sim,
                        "dtype": dtype1
                    })
                    found = True
                    break

        if joinable_cols:
            relationships["join"].append({
                "table1": name1,
                "table2": name2,
                "joinable_columns": joinable_cols
            })

    return relationships





### Testing on Unclean Data


In [24]:
########
# Dirty data is loaded and tested against our algorithms
########
file_names = [ f[:-4] for f in os.listdir( Path('./lake49') ) if os.path.isfile( os.path.join( './lake49', f ) ) ]

swamps = {}
for swamp_name in file_names:
    file_path = f"./lake49/{swamp_name}.csv"
    data = read_csv_dirty( file_path )
    swamps[swamp_name] = pd.DataFrame( data )

candidates = Lazo( swamps, shingle_size=5, signature_length=100, num_bands=25, similarity_threshold=0.6, union_threshold=0.8 )

print( "\n\n====== Lazo ======" )
print( f"Join Candidates: {len(candidates['join'])}" )
for candidate in candidates['join']:
    print( f"{candidate[0]}, {candidate[1]}, JOIN" )

print( f"\nUnion Candidates: {len(candidates['union'])}" )
for candidate in candidates['union']:
    print( f"{candidate[0]}, {candidate[1]}, UNION" )

print( "\n====== SilkMoth ======" )
candidates = find_relationships( swamps, join_threshold=0.7, union_threshold=0.8 )
print( f"Join Candidates: {len(candidates['join'])}" )
for candidate in candidates['join']:
    print( f"{candidate['table1']}, {candidate['table2']}, JOIN" )

print( f"\nUnion Candidates: {len(candidates['union'])}" )
for candidate in candidates['union']:
    print( f"{candidate['table1']}, {candidate['table2']}, UNION" )


Finished shingling (143.8913ms)
Finished onehot (30315.9292ms)
Finished hashing (67785.0680ms)
Finished bucketing (32.2101ms)


Join Candidates: 10
table_8, table_9, JOIN
table_8, table_1, JOIN
table_8, table_14, JOIN
table_8, table_0, JOIN
table_9, table_1, JOIN
table_9, table_14, JOIN
table_9, table_0, JOIN
table_1, table_14, JOIN
table_0, table_14, JOIN
table_6, table_7, JOIN

Union Candidates: 1
table_1, table_0, UNION

Join Candidates: 32
table_17, table_0, JOIN
table_17, table_8, JOIN
table_17, table_7, JOIN
table_17, table_1, JOIN
table_17, table_16, JOIN
table_17, table_4, JOIN
table_17, table_5, JOIN
table_17, table_6, JOIN
table_0, table_9, JOIN
table_0, table_1, JOIN
table_0, table_16, JOIN
table_0, table_5, JOIN
table_8, table_7, JOIN
table_8, table_16, JOIN
table_8, table_6, JOIN
table_13, table_12, JOIN
table_7, table_1, JOIN
table_7, table_16, JOIN
table_7, table_6, JOIN
table_10, table_9, JOIN
table_10, table_15, JOIN
table_9, table_1, JOIN
table_9, table_16, JOIN
table

### Sanity Check

In [32]:

## Sanity check using adult dataset
data = pd.read_csv( Path( "./adult.csv" ), header=0, na_values='?' )

print( data.shape )
print( data.columns )

# Check for overly empty feature columns
for column in data.columns:
    nans = data[column].isna()
    if nans.sum() >= 0.6*data.shape[0]:
        print( f"Dropping {column}" )
        data.drop( column )

# Drop any rows with NaN
# Try imputation
data.dropna( inplace=True )


# Capital loss and capital gain represent essentially the exact same information
# Capital delta can represent both at the same time without increasing dimensionality
data['capital-delta'] = data['capital-gain'] - data['capital-loss']


# Education is the same as education-num if it were ordinally encoded.
# Capital loss/gain already encoded with capital delta
data.drop( columns=[ 'education', 'capital-gain', 'capital-loss' ], inplace=True )

slicepoint = data.shape[0]//2
train, test = data[:slicepoint], data[slicepoint:]

documents = { 'train': train, 'test': test }

candidates = Lazo( documents, shingle_size=5, signature_length=100, num_bands=25, similarity_threshold=0.5, union_threshold=0.8 )

print( "\n\n====== Lazo ======" )
print( f"Join Candidates: {len(candidates['join'])}" )
for candidate in candidates['join']:
    print( f"{candidate[0]}, {candidate[1]}, JOIN" )

print( f"\nUnion Candidates: {len(candidates['union'])}" )
for candidate in candidates['union']:
    print( f"{candidate[0]}, {candidate[1]}, UNION" )


print( "\n====== SilkMoth ======" )
candidates = find_relationships( documents, join_threshold=0.6, union_threshold=0.8 )
print( f"Join Candidates: {len(candidates['join'])}" )
for candidate in candidates['join']:
    print( f"{candidate['table1']}, {candidate['table2']}, JOIN" )

print( f"\nUnion Candidates: {len(candidates['union'])}" )
for candidate in candidates['union']:
    print( f"{candidate['table1']}, {candidate['table2']}, UNION" )

(48842, 15)
Index(['age', 'workclass', 'fnlwgt', 'education', 'education-num',
       'marital-status', 'occupation', 'relationship', 'race', 'sex',
       'capital-gain', 'capital-loss', 'hours-per-week', 'native-country',
       'salary'],
      dtype='object')
Finished shingling (31.3239ms)
Finished onehot (759.9332ms)
Finished hashing (2975.6005ms)
Finished bucketing (0.4699ms)


Join Candidates: 0

Union Candidates: 1
test, train, UNION

Join Candidates: 1
train, test, JOIN

Union Candidates: 1
train, test, UNION


You would have noticed that the data has some issues in them.
So perhaps those issues have been troublesome to deal with.

Please try to do some cleaning on the data.

After performing cleaning see if the results of the data discovery has changed?

Please try to explain this in your report, and try to match up the error with the observation.

### Data Cleaning

In [26]:

########
# Attempts to find a header, if any, in our table.
# Multiline assumed possible.
# Returns the amount of lines in the header.
########
def find_header( column, dtype ):
    counter = 0
    for val in column:
        if dtype == 'int':
            try:
                tmp = int( val )
                break
            except (ValueError, TypeError):
                counter += 1

        if dtype == 'float':
            try:
                tmp = float( val )
                break
            except (ValueError, TypeError):
                counter += 1

        if dtype == 'datetime':
            try:
                tmp = pd.to_datetime( val )
                break
            except (ValueError, TypeError):
                counter += 1

    return counter


In [27]:

########
# Coerces dataframe column to a given data type
########
def change_column_type( column, dtype ):
    if dtype in ('int', 'float'):
        column = pd.to_numeric( column, errors='coerce' )
        return column
    if dtype in ('datetime'):
        column = pd.to_datetime( column, errors='coerce' )
        return column
    if dtype in ('bool'):
        column = column.map( lambda x:
                             True if str(x).lower() in ('true','1') else
                             False if str(x).lower() in ('false','0') else
                             pd.NA
                           )
        return column
    return column.astype(str)


In [28]:

########
# Creates a clean dataframe with a reduced amount of null and duplicate entries
########
def make_dframe( file_path, na_row_thresh=0.4, na_col_thresh=0.4 ):
    data = read_csv_dirty( file_path )
    lake = pd.DataFrame( data )
    counter = 0

    header_lengths = set()
    for c_name, column in lake.items():
        dtype = detect_column_dtype( column )
        if dtype not in ( 'string', 'bool' ):
            header_lengths.add( find_header( column, dtype ) )
    if len( header_lengths ) == 0:
        header_len = 0
    else:
        header_len = min( header_lengths )

    if header_len == 0:
        return lake

    header = pd.MultiIndex.from_arrays( lake.iloc[:header_len].values )
    header = [ ' '.join( map(str, col)).strip() for col in header.values ]

    lake = lake.iloc[header_len:]
    lake.columns = header

    for c_name, column in lake.items():
        dtype = detect_column_dtype( column )
        lake[c_name] = change_column_type( column, dtype )

    na_thresh = int( ( 1 - na_row_thresh ) * lake.shape[1] )
    lake.dropna( thresh=na_thresh, inplace=True )
    lake.drop_duplicates( inplace = True )

    lake_T = lake.T

    na_thresh = int( ( 1 - na_col_thresh ) * lake_T.shape[1] )
    lake_T.dropna( thresh=na_thresh, inplace=True )
    lake_T.drop_duplicates( inplace = True )

    lake = lake_T.T

    return lake


In [29]:

########
# Creates a set of clean dataframes
# caches them to avoid recomputations on later executions
########
def cleaningData( datasets ):
    if not os.path.exists( Path('./clean') ):
        os.mkdir( Path('./clean') )

    lakes = {}
    for lake_name in file_names:
        file_path = f"./lake49/{lake_name}.csv"
        lake_name = f"{lake_name}_clean"
        cache_path = f"./clean/{lake_name}.csv"
        if os.path.exists( cache_path ):
            with open( cache_path, 'r' ) as file:
                lakes[lake_name] = pd.read_csv( cache_path, header=0 )
        else:
            lakes[lake_name] = make_dframe( file_path )
            with open( cache_path, 'w' ) as file:
                lakes[lake_name].to_csv( cache_path, header=True, index=False )

    return lakes


### Testing on Clean Data

In [34]:
########
# Testing our newly cleaned data against our relationship detecting
# algorithms
########
file_names = [ f[:-4] for f in os.listdir('./lake49') if os.path.isfile( os.path.join('./lake49', f ) ) ]

lakes = cleaningData( file_names )

candidates = Lazo( lakes, shingle_size=5, signature_length=100, num_bands=25, similarity_threshold=0.5, union_threshold=0.8 )

print( "\n\n====== Lazo ======" )
print( f"Join Candidates: {len(candidates['join'])}" )
for candidate in candidates['join']:
    print( f"{candidate[0]}, {candidate[1]}, JOIN" )

print( f"\nUnion Candidates: {len(candidates['union'])}" )
for candidate in candidates['union']:
    print( f"{candidate[0]}, {candidate[1]}, UNION" )

print( "\n====== SilkMoth ======" )
candidates = find_relationships( lakes, join_threshold=0.6, union_threshold=0.8 )
print( f"Join Candidates: {len(candidates['join'])}" )
for candidate in candidates['join']:
    print( f"{candidate['table1']}, {candidate['table2']}, JOIN" )

print( f"\nUnion Candidates: {len(candidates['union'])}" )
for candidate in candidates['union']:
    print( f"{candidate['table1']}, {candidate['table2']}, UNION" )


  lakes[lake_name] = pd.read_csv( cache_path, header=0 )


Finished shingling (104.5892ms)
Finished onehot (29866.2550ms)
Finished hashing (91783.6456ms)
Finished bucketing (125.9313ms)


Join Candidates: 77
table_17_clean, table_0_clean, JOIN
table_17_clean, table_8_clean, JOIN
table_17_clean, table_7_clean, JOIN
table_17_clean, table_1_clean, JOIN
table_17_clean, table_16_clean, JOIN
table_17_clean, table_4_clean, JOIN
table_17_clean, table_5_clean, JOIN
table_17_clean, table_6_clean, JOIN
table_0_clean, table_8_clean, JOIN
table_0_clean, table_7_clean, JOIN
table_0_clean, table_16_clean, JOIN
table_0_clean, table_4_clean, JOIN
table_0_clean, table_5_clean, JOIN
table_0_clean, table_6_clean, JOIN
table_8_clean, table_7_clean, JOIN
table_8_clean, table_10_clean, JOIN
table_8_clean, table_9_clean, JOIN
table_8_clean, table_1_clean, JOIN
table_8_clean, table_16_clean, JOIN
table_8_clean, table_4_clean, JOIN
table_8_clean, table_6_clean, JOIN
table_7_clean, table_1_clean, JOIN
table_7_clean, table_16_clean, JOIN
table_7_clean, table_4_clean, JOI

  data = data.astype(DT64NS_DTYPE).view("i8")


Join Candidates: 22
table_17_clean, table_7_clean, JOIN
table_17_clean, table_16_clean, JOIN
table_17_clean, table_4_clean, JOIN
table_17_clean, table_5_clean, JOIN
table_17_clean, table_6_clean, JOIN
table_0_clean, table_1_clean, JOIN
table_0_clean, table_16_clean, JOIN
table_0_clean, table_5_clean, JOIN
table_0_clean, table_2_clean, JOIN
table_13_clean, table_12_clean, JOIN
table_7_clean, table_6_clean, JOIN
table_10_clean, table_11_clean, JOIN
table_10_clean, table_2_clean, JOIN
table_1_clean, table_16_clean, JOIN
table_1_clean, table_4_clean, JOIN
table_1_clean, table_5_clean, JOIN
table_1_clean, table_2_clean, JOIN
table_16_clean, table_4_clean, JOIN
table_16_clean, table_5_clean, JOIN
table_4_clean, table_5_clean, JOIN
table_3_clean, table_2_clean, JOIN
table_11_clean, table_2_clean, JOIN

Union Candidates: 12
table_17_clean, table_3_clean, UNION
table_0_clean, table_1_clean, UNION
table_8_clean, table_7_clean, UNION
table_8_clean, table_5_clean, UNION
table_8_clean, table_6_clea

### Pandas Sanity Check

In [35]:

########
# Sanity check where tables are simply read with pandas
# and cleaning is left to default pandas behavior
# (column dtype detection, single-line header detection, etc.)
########
file_names = [ f[:-4] for f in os.listdir('./lake49') if os.path.isfile( os.path.join('./lake49', f ) ) ]

lakes = {}
for lake_name in file_names:
    file_path = f"./lake49/{lake_name}.csv"
    delimiter = find_delimiter( file_path )
    lake_name = lake_name + '_pdclean'
    lakes[lake_name] = pd.read_csv( file_path, delimiter=delimiter )

candidates = Lazo( lakes, shingle_size=5, signature_length=100, num_bands=25, similarity_threshold=0.5, union_threshold=0.8 )

print( "\n\n====== Lazo ======" )
print( f"Join Candidates: {len(candidates['join'])}" )
for candidate in candidates['join']:
    print( f"{candidate[0]}, {candidate[1]}, JOIN" )

print( f"\nUnion Candidates: {len(candidates['union'])}" )
for candidate in candidates['union']:
    print( f"{candidate[0]}, {candidate[1]}, UNION" )


print( "\n====== SilkMoth ======" )
candidates = find_relationships( lakes, join_threshold=0.6, union_threshold=0.8 )
print( f"Join Candidates: {len(candidates['join'])}" )
for candidate in candidates['join']:
    print( f"{candidate['table1']}, {candidate['table2']}, JOIN" )

print( f"\nUnion Candidates: {len(candidates['union'])}" )
for candidate in candidates['union']:
    print( f"{candidate['table1']}, {candidate['table2']}, UNION" )


  lakes[lake_name] = pd.read_csv( file_path, delimiter=delimiter )


Finished shingling (142.6606ms)
Finished onehot (40837.1022ms)
Finished hashing (114798.7959ms)
Finished bucketing (146.9545ms)


Join Candidates: 94
table_17_pdclean, table_0_pdclean, JOIN
table_17_pdclean, table_8_pdclean, JOIN
table_17_pdclean, table_7_pdclean, JOIN
table_17_pdclean, table_10_pdclean, JOIN
table_17_pdclean, table_9_pdclean, JOIN
table_17_pdclean, table_1_pdclean, JOIN
table_17_pdclean, table_16_pdclean, JOIN
table_17_pdclean, table_4_pdclean, JOIN
table_17_pdclean, table_5_pdclean, JOIN
table_17_pdclean, table_6_pdclean, JOIN
table_0_pdclean, table_8_pdclean, JOIN
table_0_pdclean, table_7_pdclean, JOIN
table_0_pdclean, table_10_pdclean, JOIN
table_0_pdclean, table_9_pdclean, JOIN
table_0_pdclean, table_16_pdclean, JOIN
table_0_pdclean, table_4_pdclean, JOIN
table_0_pdclean, table_5_pdclean, JOIN
table_0_pdclean, table_6_pdclean, JOIN
table_8_pdclean, table_7_pdclean, JOIN
table_8_pdclean, table_10_pdclean, JOIN
table_8_pdclean, table_9_pdclean, JOIN
table_8_pdclean,

## Discussions

1)  Different aspects of the data can effect the data discovery process. Write a short report on your findings. Such as which data quality issues had the largest effect on data discovery. Which data quality problem was repairable and how you choose to do the repair.

<!-- For the set of considerations that you have outlined for the choice of data discovery methods, choose one and identify under this new constraint, how would you identify and resolve this problem? -->

Max 400 words

The aspects of the data that most influenced our data discovery process are defined below:

1. different deliminators
The CSV files in our data lake used different delimiters. We fixed this by writing a delimiter discovery function. This function analyzes both the number of non-alphanumeric characters in each row of the table and the variance of these counts across rows. The character with the highest ratio of frequency to variance (+ a small constant 𝜖) emerges as the most likely delimiter. This approach successfully fixed this issue.

2. Data tables with different length rows
When some rows in a dataset have different lengths than others, comparison of datasets becomes difficlt. To address this, NaN values are added to the shorter rows to fill in the missing data and match the length of the longest row.

3. columns of all zeroes
When rows contain only zeros, any comparison function treats them as identical, even though they likely represent different data. Therefore, these rows are ignored.
nulls nans

4. Large size of data
Another problem we encountered was the large size of out datasets, especially table 14 was large. To still be able to do data discovery we used the computationally efficient Lazo as well as Silkmoth.
