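# Working with large datasets

Cluster tables from the TabEL corpus with `takco`: first with a single `TableSet.cluster` call on a Dask cluster, then step by step on a small local sample so that intermediate results can be inspected.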

In [1]:
%%capture --no-display
import logging as log
log.getLogger().setLevel(log.DEBUG)

import takco
tables = takco.DaskHashBag.load(
    f'hdfs://bricks07:9000/user/kruit/tabel/1-*', 
    address = 'tcp://192.168.62.207:8786'
)
# tables = takco.HashBag.load('../../data/TabEL/10k-part/10k-part-aa')
# tables = takco.DaskHashBag.load('../../data/TabEL/75k-part/75k-part-aa')
tables.client if hasattr(tables, 'client') else None

0,1
Client  Scheduler: tcp://192.168.62.207:8786  Dashboard: http://192.168.62.207:8787/status,Cluster  Workers: 10  Cores: 120  Memory: 673.47 GB


In [2]:
%%time
tables = takco.TableSet.reshape(tables).tables
tables = takco.TableSet.coltypes(tables).tables
tables = tables.persist()
if hasattr(tables, 'bag'):
    print(tables.bag.count().compute())

INFO:root:Restructuring with rules: ()


6439
CPU times: user 55.8 ms, sys: 5.25 ms, total: 61.1 ms
Wall time: 11.1 s


In [None]:
%%time
from takco.cluster.matchers import LSHMatcher, EmbeddingMatcher, CellJaccMatcher, TypeCosMatcher
fdir = '.'
matchers = [
    CellJaccMatcher(fdir, name='headjacc', source='head'),
    LSHMatcher(fdir, name='lsh', num_perm=64),
    EmbeddingMatcher(fdir, name='emb', wordvec_fname='/export/scratch1/home/kruit/glove.6B.50d.pickle'),
]
filters = [CellJaccMatcher(fdir, name='sec', source='sectionTitle')]

clusters = takco.TableSet.cluster(
    tables,
    matcher_configs = matchers,
    filter_configs = filters,
    agg_func='max',
    agg_threshold=0.9,
    align_columns='max1',
    edge_exp=5,
    agg_threshold_col=0.5,
    keep_partition_meta=["tableHeaders", lambda x: {'tableData': x["tableData"][:10]}],
).tables.persist()

INFO:root:Numbering tables...
INFO:numexpr.utils:Note: NumExpr detected 32 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:NumExpr defaulting to 8 threads.
INFO:root:Dask offset tableIndex tableIndex 1
INFO:root:Dask offset numCols columnIndexOffset 0
INFO:root:Building matchers: headjacc, lsh, emb
INFO:root:Indexing headjacc
INFO:root:Indexing lsh
Indexing lsh: 100%|██████████| 13631/13631 [00:02<00:00, 6153.09it/s]
INFO:root:Indexing emb
DEBUG:root:faiss info: analyzing 12617 vectors of size 50
no NaN or Infs in data
11184 vectors are distinct (88.64%)
vector 209 has 86 copies
range of L2 norms=[1, 1] (0 null vectors)
vectors are normalized, inner product and L2  search are equivalent
matrix contains no 0s
no constant dimensions
no dimension has a too large mean
stddevs per dimension are in [0.0816254 0.274201]

INFO:root:Indexing sec
INFO:root:Blocking tables; computing and aggregating column sims...
INFO:root:Got 21937 table similarities
IN

In [4]:
# Keep only clusters that carry a truthy `partColAligns` entry.
nontrivial_clusters = list(filter(lambda c: c.get("partColAligns"), clusters))

In [5]:
# Preview the column alignment of one hand-picked multi-table cluster.
preview_idx = 2
t = nontrivial_clusters[preview_idx]
takco.preview(t["partColAligns"])

?,0,1,2,3
Unnamed: 0_level_1,Country,Date,Format,Label
,United Kingdom,9 September 2002,"CD single , Compact Cassette",V2 Records

?,0,1,2,3,4,5,6,7,8
Unnamed: 0_level_1,Country,Date,Unnamed: 3_level_1,Unnamed: 4_level_1,Format,Label,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,United Kingdom,9 September 2002,,,"CD single , Compact Cassette",V2 Records,,,
,,,Reign,The reign number for the specific set of wrestlers listed,,,,,
,,,Event,The event in which the title was won,,,,,
,,,—,Used for vacated reigns so as not to count it as an official reign,,,,,
,,,,The information is not available or is unknown,,,,,

?,0,1,2,3,4,5,6,7,8
Unnamed: 0_level_1,Country,Date,#,Order in reign history,Format,Label,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,United Kingdom,9 September 2002,,,"CD single , Compact Cassette",V2 Records,,,
,,,Reign,The reign number for the specific set of wrestlers listed,,,,,
,,,Event,The event in which the title was won,,,,,
,,,—,Used for vacated reigns so as not to count it as an official reign,,,,,
,,,,The information is not available or is unknown,,,,,

?,0,1,2,3,4,5,6,7,8
Unnamed: 0_level_1,Location:	Country,Date:	Date,#,Notes:	Order in reign history,Format,Label,#:,Name(s):,Reigns:
,United Kingdom,9 September 2002,,,"CD single , Compact Cassette",V2 Records,,,
,,,Reign,The reign number for the specific set of wrestlers listed,,,,,
,,,Event,The event in which the title was won,,,,,
,,,—,Used for vacated reigns so as not to count it as an official reign,,,,,
,,,,The information is not available or is unknown,,,,,


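## All steps separately

The same pipeline, run one step at a time on a small local sample (`10k-part-aa`, no Dask cluster) so that every intermediate result can be inspected.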

In [None]:
%%capture --no-display
import logging as log
log.getLogger().setLevel(log.INFO)

import takco
tables = takco.HashBag.load('../../data/TabEL/10k-part/10k-part-aa')
tables.client if hasattr(tables, 'client') else None

In [16]:
%%time
from takco.cluster.matchers import LSHMatcher, EmbeddingMatcher, CellJaccMatcher
from takco import cluster
fdir = '.'
matchers = [
    CellJaccMatcher(fdir, name='headjacc', source='head'),
    LSHMatcher(fdir, name='lsh', num_perm=64),
    EmbeddingMatcher(fdir, name='emb', wordvec_fname='/export/scratch1/home/kruit/glove.6B.50d.pickle'),
    CellJaccMatcher(fdir, name='sec', source='sectionTitle'),
]

tables = takco.TableSet.number_table_columns(tables).persist()
matchers = tables.pipe(cluster.matcher_add_tables, matchers)
matchers = list(matchers.fold(lambda x: x.name, lambda a, b: a.merge(b)))

for m in matchers:
    print(m.name)
    m.index()

INFO:root:Numbering tables...
DEBUG:root:Opening ../../data/TabEL/10k-part/10k-part-aa
DEBUG:root:Serial cumsum tableIndex tableIndex 1 -> 198
DEBUG:root:Serial cumsum numCols columnIndexOffset 0 -> 910
DEBUG:root:Piping matcher_add_tables ...
INFO:root:Loading word vectors /export/scratch1/home/kruit/glove.6B.50d.pickle
Loading tables into matchers: 198it [00:00, 242.43it/s]
Indexing lsh: 100%|██████████| 647/647 [00:00<00:00, 8045.81it/s]
DEBUG:root:faiss info: analyzing 605 vectors of size 50
no NaN or Infs in data
567 vectors are distinct (93.72%)
vector 2 has 12 copies
range of L2 norms=[1, 1] (0 null vectors)
vectors are normalized, inner product and L2  search are equivalent
matrix contains no 0s
no constant dimensions
no dimension has a too large mean
stddevs per dimension are in [0.0807233 0.25964]



headjacc
lsh
emb
sec
CPU times: user 2.53 s, sys: 321 ms, total: 2.85 s
Wall time: 2.85 s


In [None]:
# Look at a block: which candidate tables does one matcher propose for a query table?
# This cell sits above In [17], so build the lookups it needs here instead of relying
# on later cells (otherwise it raises NameError on a fresh kernel).
tableid_colids = dict(tables.pipe(cluster.get_table_ids))
# NOTE(review): assumes each numbered table stores its index under 'tableIndex' and
# that this is the same id space `matcher.block` returns — confirm.
i_table = {table['tableIndex']: table for table in tables}

# Query with the smallest table index (0 whenever that id exists) rather than a
# hard-coded id that may not be present.
ti = min(tableid_colids)
matcher = matchers[1]  # LSH matcher
print(matcher.name)
with matcher:
    block = set(matcher.block(ti, tableid_colids[ti]))
print(f'Got block of size {len(block)}:', block)

# First table is the query, the rest is its block
takco.preview([i_table[ti]] + [i_table[b] for b in block if b in i_table])

In [17]:
%%time
tableid_colids = dict(tables.pipe(cluster.get_table_ids))
print(len(tableid_colids))

DEBUG:root:Piping get_table_ids ...


198
CPU times: user 325 ms, sys: 702 µs, total: 325 ms
Wall time: 325 ms


In [18]:
%%time
import pandas as pd

tablesim = pd.concat(tables.pipe(
    cluster.get_tablesims,
    matchers=matchers,
    filter_matcher_names=['sec'],
    agg_func='max',
    agg_threshold=0.9,
    align_columns='max1',
    tableid_colids=tableid_colids,
))
tablesim

DEBUG:root:Piping get_tablesims ...
DEBUG:root:Loading <takco.cluster.matchers.lsh.LSHMatcher object at 0x7f2794631250> from disk...
INFO:root:Loading word vectors /export/scratch1/home/kruit/glove.6B.50d.pickle
DEBUG:root:Loading <takco.cluster.matchers.embedding.EmbeddingMatcher object at 0x7f2795fe5bd0> from disk...
DEBUG:root:Preparing block for matcher headjacc
DEBUG:root:Preparing block for matcher lsh
DEBUG:root:Preparing block for matcher emb
DEBUG:root:Querying emb faiss index with query matrix of shape (605, 50)
Blocking: 100%|██████████| 198/198 [00:00<00:00, 1822.86it/s]
DEBUG:root:Found 4847 pairs; 24 ± 23 per table
Looking up sec: 100%|██████████| 4847/4847 [00:00<00:00, 139013.77it/s]
DEBUG:root:Filtered down to 410 pairs
Looking up headjacc: 100%|██████████| 410/410 [00:00<00:00, 8841.24it/s]
Looking up lsh: 100%|██████████| 410/410 [00:00<00:00, 58388.72it/s]
DEBUG:root:Calculating 5262 lsh scores
Yielding lsh: 100%|██████████| 5262/5262 [00:00<00:00, 751308.13it/s]
Lo

CPU times: user 4.31 s, sys: 663 ms, total: 4.97 s
Wall time: 1.7 s


ti1  ti2
1    55     1.000000
     67     1.000000
     69     1.000000
     78     1.000000
     106    1.000000
              ...   
189  22     0.919705
     87     0.944925
     168    1.000000
195  86     0.989103
     133    0.941177
Length: 182, dtype: float64

In [19]:
%%time
itups = ((ti,ti) for ti in tableid_colids)
ii = pd.MultiIndex.from_tuples(itups, names=['ti1', 'ti2'])
tablesim = pd.concat([tablesim, pd.Series(1, index=ii)])

CPU times: user 5.48 ms, sys: 91 µs, total: 5.57 ms
Wall time: 4.91 ms


In [20]:
%%time
edge_exp = 5
louvain_partition = cluster.louvain(tablesim, edge_exp=edge_exp)
print(len(louvain_partition))

INFO:root:Created graph IGRAPH U-W- 198 380 --  + attr: weight (e)


163
CPU times: user 25.4 ms, sys: 4.88 ms, total: 30.3 ms
Wall time: 30 ms


In [10]:
# Partitions that contain at least two tables.
nonsingle = [part for part in louvain_partition if len(part) >= 2]
len(nonsingle)

15

In [11]:
import logging as log
from collections import ChainMap

log.getLogger().setLevel(log.WARNING)

# Cluster columns inside each non-singleton table partition.
partition_jobs = tables.new(enumerate(nonsingle))
chunks = partition_jobs.pipe(
    cluster.cluster_partition_columns,
    tableid_colids=tableid_colids,
    matchers=matchers,
    agg_func='max',
    agg_threshold_col=0.5,
)


def _merge_dicts(dicts):
    """Merge mappings left to right; later mappings win on duplicate keys."""
    merged = {}
    for mapping in dicts:
        merged.update(mapping)
    return merged


# Each chunk is a 4-tuple of dicts; merge each position across all chunks.
ti_pi, pi_ncols, ci_pci, ti_colsim = map(_merge_dicts, zip(*chunks))
len(ti_pi), len(pi_ncols), len(ci_pci)

DEBUG:root:Piping cluster_partition_columns ...
DEBUG:root:Loading <takco.cluster.matchers.lsh.LSHMatcher object at 0x7f988fc97810> from disk...
INFO:root:Loading word vectors /export/scratch1/home/kruit/glove.6B.50d.pickle
DEBUG:root:Loading <takco.cluster.matchers.embedding.EmbeddingMatcher object at 0x7f988fc97650> from disk...
Matching with headjacc:   0%|          | 0/78 [00:00<?, ?it/s]
Looking up headjacc:   0%|          | 0/78 [00:00<?, ?it/s][A
Matching with headjacc: 100%|██████████| 78/78 [00:00<00:00, 7528.61it/s]
Looking up headjacc: 100%|██████████| 78/78 [00:00<00:00, 8679.02it/s]
Matching with lsh:   0%|          | 0/78 [00:00<?, ?it/s]
Looking up lsh:   0%|          | 0/78 [00:00<?, ?it/s][A
Matching with lsh: 100%|██████████| 78/78 [00:00<00:00, 12591.63it/s]
Looking up lsh: 100%|██████████| 78/78 [00:00<00:00, 16795.30it/s]
DEBUG:root:Calculating 489 lsh scores
Yielding lsh: 100%|██████████| 489/489 [00:00<00:00, 674454.01it/s]
Matching with emb:   0%|          | 0

Looking up lsh: 100%|██████████| 3/3 [00:00<00:00, 1057.30it/s]
DEBUG:root:Calculating 127 lsh scores
Yielding lsh: 100%|██████████| 127/127 [00:00<00:00, 354408.92it/s]
Matching with emb:   0%|          | 0/3 [00:00<?, ?it/s]
Looking up emb:   0%|          | 0/3 [00:00<?, ?it/s][A
Matching with emb: 100%|██████████| 3/3 [00:00<00:00, 778.12it/s]
Looking up emb: 100%|██████████| 3/3 [00:00<00:00, 1094.55it/s]
DEBUG:root:Calculating 93 emb scores
Yielding emb: 100%|██████████| 93/93 [00:00<00:00, 319729.73it/s]
DEBUG:root:Creating colsim dataframe
DEBUG:root:Clustering (14, 14) column similarities
DEBUG:root:Partition 14 has 2 tables and 8 column clusters


(50, 15, 244)

In [61]:
# Attach the partition/column-cluster assignments to each table, then fold
# together the tables that share the same `_id`.
keep_meta = ["tableHeaders", lambda x: {'tableData': x["tableData"][:10]}]
partitioned = tables.pipe(cluster.set_partition_columns, ti_pi, pi_ncols, ci_pci)
clusters = partitioned.fold(
    lambda t: t["_id"],
    lambda a, b: cluster.merge_partition_tables(a, b, keep_partition_meta=keep_meta),
).persist()

DEBUG:root:Piping set_partition_columns ...


In [63]:
# Preview the first cluster that has a `partColAligns` entry. `next` stops at the
# first match instead of materializing every cluster, and the generator variable
# no longer shadows the `t` being assigned.
t = next((c for c in clusters if c.get("partColAligns")), None)
if t is None:
    raise ValueError('no cluster has "partColAligns"; nothing to preview')
takco.preview(t["partColAligns"])

?,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,Source,Rating
,Review scores,Allmusic,link
,Review scores,Entertainment Weekly,(B) link

?,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,Source,Rating
,Review scores,Allmusic,link
,Review scores,Entertainment Weekly,(B) link
,Review scores,Allmusic,link

?,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,Source,Rating
,Review scores,Allmusic,link
,Review scores,Entertainment Weekly,(B) link
,Review scores,Allmusic,link
,Review scores,Allmusic,

?,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,Source,Rating
,Review scores,Allmusic,link
,Review scores,Entertainment Weekly,(B) link
,Review scores,Allmusic,link
,Review scores,Allmusic,
,Review scores,Allmusic,

?,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,Source,Rating
,Review scores,Allmusic,link
,Review scores,Entertainment Weekly,(B) link
,Review scores,Allmusic,link
,Review scores,Allmusic,
,Review scores,Allmusic,

?,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,Source,Rating
,Review scores,Allmusic,link
,Review scores,Entertainment Weekly,(B) link
,Review scores,Allmusic,link
,Review scores,Allmusic,
,Review scores,Allmusic,

?,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,Source,Rating
,Review scores,Allmusic,link
,Review scores,Entertainment Weekly,(B) link
,Review scores,Allmusic,link
,Review scores,Allmusic,
,Review scores,Allmusic,

?,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,Source,Rating
,Review scores,Allmusic,link
,Review scores,Entertainment Weekly,(B) link
,Review scores,Allmusic,link
,Review scores,Allmusic,
,Review scores,Allmusic,

?,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,Source,Rating
,Review scores,Allmusic,link
,Review scores,Entertainment Weekly,(B) link
,Review scores,Allmusic,link
,Review scores,Allmusic,
,Review scores,Allmusic,

?,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,Source,Rating
,Review scores,Allmusic,link
,Review scores,Entertainment Weekly,(B) link
,Review scores,Allmusic,link
,Review scores,Allmusic,
,Review scores,Allmusic,
