In [1]:
import pymongo
import pandas as pd
from tqdm import tqdm

In [2]:
MIN_UNIQUE_VALUE = 2
MIN_UNIQUENESS_SCORE = 0.6

In [3]:
mongo_client = pymongo.MongoClient('mongodb://127.0.0.1:27017/')
db = mongo_client['opencanada']
setoverlapresults_collection = db["setoverlapresults"]
inferredstats_collection = db["inferredstats"]
inferredcolumnstats_collection = db["inferredcolumnstats"]
metadata_collection = db["metadata"]

In [4]:
cols = list(inferredcolumnstats_collection.find({
    "is_all_null": False,
    "unique_values_count": {"$gte": MIN_UNIQUE_VALUE},
    "uniqueness_score": {"$gte": MIN_UNIQUENESS_SCORE},
}))

In [5]:
metadata_dict = {}
for c in tqdm(cols):
    if c['uuid'] not in metadata_dict:
        metadata_dict[c['uuid']] = metadata_collection.find_one({"resources.id": c['uuid']})

100%|██████████| 78870/78870 [00:15<00:00, 4985.52it/s]


In [7]:
candidate_pairs = []
for c in tqdm(cols):
    for r in metadata_dict[c['uuid']]['resources']:
        if r['id'] == c['uuid']:
            continue
        if r['format'].lower() != 'csv':
            continue
        candidate_cols = inferredcolumnstats_collection.find({
            "uuid": r["id"], 
            "is_all_null": False,  
            "unique_values_count": {"$gte": MIN_UNIQUE_VALUE}
        })
        for cc in candidate_cols:
            candidate_pairs.append({
                "source": {
                    "uuid": c["uuid"],
                    "index": c["index"]
                },
                "target": {
                    "uuid": cc["uuid"],
                    "index": cc["index"]
                }
            })

  6%|▌         | 4770/78870 [03:28<18:55, 65.28it/s]   

In [7]:
uuid_set = set()
for cc in candidate_pairs:
    uuid_set.add(cc["source"]["uuid"])
    uuid_set.add(cc["target"]["uuid"])

In [8]:
mongo_client.close()

In [9]:
import os
BASE_DIR = "/mnt/d/OpenCanadaDataCrawler/data/converted_files"

get_uuid_from_path = lambda p : os.path.splitext(os.path.basename(p))[0]
get_path_from_uuid = lambda u : os.path.join(BASE_DIR, "%s.csv" % u)

In [10]:
def read_file(f):
    full_path = f
    uuid = get_uuid_from_path(full_path)
    try:
        sets = []
        df = pd.read_csv(full_path, low_memory=False)
        df = df.applymap(lambda s: s.lower().strip() if type(s) == str else s)
        for c in df.columns:
            sets.append(set(df[c].unique()))
        return {"uuid": uuid, "sets": sets}
    except:
        return

In [11]:
paths = [get_path_from_uuid(u) for u in uuid_set]

In [12]:
from multiprocessing import Pool

NUM_PROCESSES = 44

column_sets = {}

with Pool(processes=NUM_PROCESSES) as pool:
    for data in tqdm(pool.imap_unordered(read_file, paths), total=len(paths)):    
        if data is not None:
            column_sets[data["uuid"]] = data["sets"]
        pass

100%|██████████| 8955/8955 [03:14<00:00, 46.03it/s] 


In [13]:
def get_stats(pair):
    try:
        source = pair['source']
        target = pair['target']
        source_set = column_sets[source['uuid']][source['index']]
        target_set = column_sets[target['uuid']][target['index']]
        intersection_size = len(source_set.intersection(target_set))
        if intersection_size == 0:
            return None
        return {
            "union_size": len(source_set.union(target_set)),
            "intersection_size": intersection_size,
            "pair": pair
        }
    except:
        return None

In [14]:
NUM_PROCESSES = 44

results = []

with Pool(processes=NUM_PROCESSES) as pool:
    for data in tqdm(pool.imap_unordered(get_stats, candidate_pairs), total=len(candidate_pairs)):    
        if data is not None:
            results.append(data)
        pass

100%|██████████| 42713149/42713149 [4:02:48<00:00, 2931.94it/s]   


In [15]:
import json
with open("key_overlaps.json", "w") as f:
    json.dump(results, f)

In [16]:
del candidate_pairs
del column_sets
import gc
gc.collect()

64

In [17]:
results_cols_set = set()
for r in tqdm(results):
    pair = r['pair']
    results_cols_set.add((pair['source']['uuid'], pair['source']['index']))
    results_cols_set.add((pair['target']['uuid'], pair['target']['index']))

100%|██████████| 6788570/6788570 [00:18<00:00, 374984.99it/s]


In [18]:
mongo_client = pymongo.MongoClient('mongodb://127.0.0.1:27017/')
db = mongo_client['opencanada']
inferredcolumnstats_collection = db["inferredcolumnstats"]

col_stats_dict = {}
for r in tqdm(results_cols_set):
    col = inferredcolumnstats_collection.find_one({"uuid": r[0], "index": r[1]})
    if r[0] not in col_stats_dict:
        col_stats_dict[r[0]] = {}
    col_stats_dict[r[0]][r[1]] = col

100%|██████████| 92379/92379 [00:58<00:00, 1585.65it/s]


In [19]:
mongo_client.close()

In [20]:
import math
def jaccard(intersection_size, union_size):
    # intersection size divided by union size
    return intersection_size / union_size

def cosine(intersection_size, query_size, target_size):
    # intersection size divided by square root of the product of sizes
    return intersection_size / math.sqrt(query_size * target_size)

def containment(intersection_size, query_size):
    # intersection size divided by the size of the query set
    return intersection_size / query_size

def containment_min(intersection_size, query_size, target_size):
    # intersection size divided by the minimal size of the query set and target set
    return intersection_size / min(query_size, target_size)

def containment_max(intersection_size, query_size, target_size):
    # intersection size divided by the maximal size of the query set and target set
    return intersection_size / max(query_size, target_size)

In [21]:
scores = []
for r in tqdm(results):
    query = r['pair']['source']
    target = r['pair']['target']
    query_unique_count = col_stats_dict[query['uuid']][query['index']]['unique_values_count']
    target_unique_count = col_stats_dict[target['uuid']][target['index']]['unique_values_count']
    query_count = col_stats_dict[query['uuid']][query['index']]['count']
    target_count = col_stats_dict[target['uuid']][target['index']]['count']
    intersection_size = r['intersection_size']
    union_size = r['union_size']
    
    jaccard_score = jaccard(intersection_size, union_size)
    cosine_score = cosine(intersection_size, query_unique_count, target_unique_count)
    containment_score = containment(intersection_size, query_unique_count)
    containment_min_score = containment_min(intersection_size, query_unique_count, target_unique_count)
    containment_max_score = containment_max(intersection_size, query_unique_count, target_unique_count)
    scores.append({
        "query_uuid": query['uuid'],
        "query_index": query['index'],
        "target_uuid": target['uuid'],
        "target_index": target['index'],
        "query_unique_count": query_unique_count,
        "target_unique_count": target_unique_count,
        "query_count": query_count,
        "target_count": target_count,
        "intersection_size": intersection_size,
        "union_size": union_size,
        "jaccard_score": jaccard_score,
        "cosine_score": cosine_score,
        "containment_score": containment_score,
        "containment_min_score": containment_min_score,
        "containment_max_score": containment_max_score
    })


100%|██████████| 6788570/6788570 [00:50<00:00, 134830.49it/s]


In [22]:
df = pd.DataFrame(scores)

In [23]:
df

Unnamed: 0,query_uuid,query_index,target_uuid,target_index,query_unique_count,target_unique_count,query_count,target_count,intersection_size,union_size,jaccard_score,cosine_score,containment_score,containment_min_score,containment_max_score
0,65c567f1-727c-45c5-81c2-11776037bc5c,2,47f03418-a0be-4353-aff7-bfcc756df4a1,13,12,13,12,2724,12,13,0.923077,0.960769,1.000000,1.000000,0.923077
1,65c567f1-727c-45c5-81c2-11776037bc5c,2,47f03418-a0be-4353-aff7-bfcc756df4a1,12,12,13,12,2724,1,24,0.041667,0.080064,0.083333,0.083333,0.076923
2,65c567f1-727c-45c5-81c2-11776037bc5c,3,47f03418-a0be-4353-aff7-bfcc756df4a1,13,12,13,12,2724,1,24,0.041667,0.080064,0.083333,0.083333,0.076923
3,65c567f1-727c-45c5-81c2-11776037bc5c,3,47f03418-a0be-4353-aff7-bfcc756df4a1,12,12,13,12,2724,11,14,0.785714,0.880705,0.916667,0.916667,0.846154
4,534e80eb-00e5-4178-abcb-898dbe6c6611,0,748f4ec1-0fb7-4fe2-acbe-6700f364612e,0,43,43,43,43,2,84,0.023810,0.046512,0.046512,0.046512,0.046512
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6788565,5a3f68ac-7108-4356-87fa-51f3bf60a1fb,5,f02efc1b-ecff-4796-a33b-7f1c041050c6,2,22,15,22,15,1,36,0.027778,0.055048,0.045455,0.066667,0.045455
6788566,5a3f68ac-7108-4356-87fa-51f3bf60a1fb,5,5c8930a1-f081-42a5-98e6-11f480d03356,2,22,5,22,5,1,26,0.038462,0.095346,0.045455,0.200000,0.045455
6788567,5a3f68ac-7108-4356-87fa-51f3bf60a1fb,5,b06da925-ae21-4e7b-a9fe-890183f2af8a,5,22,15,22,15,1,36,0.027778,0.055048,0.045455,0.066667,0.045455
6788568,5a3f68ac-7108-4356-87fa-51f3bf60a1fb,5,ce54b66b-3db4-4aa8-83c7-828e6ae676f5,5,22,5,22,5,1,26,0.038462,0.095346,0.045455,0.200000,0.045455


In [24]:
df.to_csv("key_join_scores.csv", index=False)