In [1]:
import json
import numpy as np
import collections
import copy
from os import listdir
from os.path import isfile, join

In [24]:
import pickle

In [2]:
import findspark
findspark.init()
from pyspark import SparkContext
import pyspark
# Executor/driver sizing for this job; adjust to the cluster that is actually available.
conf = pyspark.SparkConf().setAll([
    ('spark.executor.memory', '8g'),
    ('spark.executor.cores', '2'),
    ('spark.executor.instances', '7'),
    ('spark.driver.memory', '32g'),
    ('spark.driver.maxResultSize', '10g'),
])
# getOrCreate() returns the SparkContext already running in this kernel if there is
# one, so re-executing the cell no longer fails with "Cannot run multiple
# SparkContexts at once". `conf` only takes effect when a new context is created.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType, StringType
from pyspark.sql.types import Row
from pyspark.sql import SparkSession
# Wrap the SparkContext created above in a SparkSession so the DataFrame API is available.
spark = SparkSession(sc)

In [4]:
def convert_ndarray_back(x):
    """Replace ``x['entityCell']`` with a NumPy array built from its current value.

    The record is updated in place and the very same object is returned, so the
    function can be used directly inside a ``map``.
    """
    raw_cells = x['entityCell']
    x['entityCell'] = np.array(raw_cells)
    return x
# Directory holding the dataset files, relative to this notebook.
data_dir = "../../data/"
# One JSON object per line: each line is stripped, parsed, and has its
# entityCell entry converted to a NumPy array.
train_tables = (
    sc.textFile(data_dir + "train_tables.jsonl")
    .map(lambda line: convert_ndarray_back(json.loads(line.strip())))
)

In [40]:
def get_table_entity(x):
    """Flatten one table record into a list of per-cell entity ``Row`` objects.

    One ``Row`` is produced for every (row, column) position where
    ``x['entityCell']`` is non-zero.

    Args:
        x: table record (dict). Reads ``_id``, ``entityCell`` (2-D array exposing
            ``.nonzero()``), ``tableData`` and ``processed_tableHeaders``.

    Returns:
        list of ``Row(t_id, entity, c_id, c_name, r_id)`` where ``entity`` is the
        target id of the first surface link of the cell, ``c_id``/``r_id`` are the
        column/row indices as plain ``int`` and ``c_name`` is the header of that
        column. The list is empty when no cell is flagged.
    """
    return [
        Row(
            t_id=x["_id"],
            # Only the first surface link of a cell is used.
            entity=x['tableData'][i][j]['surfaceLinks'][0]['target']['id'],
            # nonzero() yields NumPy integers; cast so Spark sees native ints.
            c_id=int(j),
            c_name=x["processed_tableHeaders"][j],
            r_id=int(i),
        )
        for i, j in zip(*x['entityCell'].nonzero())
    ]

In [9]:
from operator import add

In [43]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

In [41]:
# Flatten every training table into per-cell entity Rows and build a DataFrame from them.
entity_df = spark.createDataFrame(train_tables.flatMap(get_table_entity))

In [42]:
# Sanity check: print the first rows of the entity table.
entity_df.show()

+----+--------------------+--------+----+----------+
|c_id|              c_name|  entity|r_id|      t_id|
+----+--------------------+--------+----+----------+
|   0|    military offices|27282555|   0|27281853-1|
|   1|    military offices|  980324|   0|27281853-1|
|   2|    military offices|27279970|   0|27281853-1|
|   0|              office|  450099|   0|   27282-1|
|   1|                name|  610889|   0|   27282-1|
|   2|               party|34904709|   0|   27282-1|
|   0|              office| 1702543|   1|   27282-1|
|   1|                name|40442126|   1|   27282-1|
|   2|               party|34904709|   1|   27282-1|
|   0|catholic church t...|23867939|   0|27282227-1|
|   1|catholic church t...|23866167|   0|27282227-1|
|   2|catholic church t...| 5496340|   0|27282227-1|
|   0|catholic church t...|  429187|   1|27282227-1|
|   0|    military offices|27283377|   0|27282555-1|
|   1|    military offices|  980324|   0|27282555-1|
|   2|    military offices|27281853|   0|27282

In [43]:
# Pair the entity in column 0 of each table row (e1) with every entity found in
# the other columns of the same row (e2). Resulting columns:
# r_id, t_id, e1, c_id, c_name, e2 (c_id/c_name describe the e2 column).
# Each side is filtered on c_id *before* it is projected, so the filter never
# refers to a column that the select has already dropped.
row_e2e = (
    entity_df.filter(F.col('c_id') == 0)
    .selectExpr('r_id', 't_id', 'entity as e1')
    .join(
        entity_df.filter(F.col('c_id') != 0).withColumnRenamed('entity', 'e2'),
        ['r_id', 't_id'],
        'inner',
    )
)

In [44]:
# Sanity check: print the first rows of the (e1, e2) pair table.
row_e2e.show()

+----+----------+--------+----+--------------------+--------+
|r_id|      t_id|      e1|c_id|              c_name|      e2|
+----+----------+--------+----+--------------------+--------+
|   0|10025846-1| 2249031|   1|     destinations |-|10025461|
|   0|10046644-1|10046542|   1|frederick casimir...|21446013|
|   0|10046644-1|10046542|   2|frederick casimir...|10047700|
|   0|1008653-20|   25734|   1|  capital ( exonym )|   57648|
|   0|10164201-1|24687388|   1|church of england...|32970032|
|   0|10164201-1|24687388|   2|church of england...| 6946578|
|   0|10207079-1| 2116639|   1|united states sta...|   75799|
|   0|10207079-1| 2116639|   2|united states sta...| 1192200|
|   0| 1024241-4|  336256|   1|             country|   31717|
|   0| 1030221-3| 5081226|   3|               owner|  627707|
|   0| 1030221-3| 5081226|   4|              format|   30077|
|   0|10314003-1|11465230|   1|                 lga| 7870335|
|   0|10392544-2|11379937|   1|              league|    2357|
|   0|10

In [55]:
# Header-to-header co-occurrence counts.
# Self-join the pair table on (e1, e2) so that every two occurrences of the same
# entity pair are matched (an occurrence also matches itself), keep the distinct
# (t1, t2, h1, h2) combinations, count them per (h1, h2) and finally drop the
# pairs where both headers are the same string.
h2h = (
    row_e2e.selectExpr('t_id as t1', 'e1', 'e2', 'c_name as h1')
    .join(
        row_e2e.selectExpr('t_id as t2', 'e1', 'e2', 'c_name as h2'),
        ['e1', 'e2'],
        'inner',
    )
    .select('t1', 't2', 'h1', 'h2')
    .dropDuplicates()
    .groupBy(['h1', 'h2'])
    .agg(F.count('t1').alias('count'))
    .filter(F.col('h1') != F.col('h2'))
)

In [56]:
# Build the nested dict h1 -> {h2: count} on the driver *before* opening the
# output file: opening with "wb" truncates it, so a failing Spark job would
# otherwise leave an empty pickle behind.
h2h_local = {}
for h1, h2, count in h2h.rdd.map(lambda x: (x['h1'], x['h2'], x['count'])).collect():
    h2h_local.setdefault(h1, {})[h2] = count
with open("../../data/n_h2h.pkl", "wb") as f:
    pickle.dump(h2h_local, f)

In [38]:
# Regroup the pair table by e1: each record becomes (e1, [[t_id, c_id, c_name, e2]])
# and the one-element lists are concatenated per key with `add`.
# NOTE(review): this rebinds `row_e2e` from a DataFrame to an RDD. The cell therefore
# cannot be run twice (an RDD has no `.rdd`), and the earlier cells that call
# `row_e2e.selectExpr(...)` / `row_e2e.show()` fail if executed after it.
row_e2e = row_e2e.rdd.map(lambda x:(x["e1"],[[x["t_id"],x["c_id"],x["c_name"],x["e2"]]])).reduceByKey(add)

In [39]:
# Collect on the driver first: opening the file with "w" truncates it, so a
# failing Spark job would otherwise leave an empty JSON file behind.
# Note that json.dump writes the entity-id keys as strings.
e2e_row_local = dict(row_e2e.collect())
with open("../../data/e2e_row.json", "w") as f:
    json.dump(e2e_row_local, f, indent=2)

In [33]:
# Peek at two (e1, list-of-pairs) records.
# NOTE(review): the saved output below contains entries with c_id 0, which the join
# that builds row_e2e (right side restricted to c_id != 0) cannot produce, so it
# appears to come from an earlier version of the code — re-run to refresh it.
row_e2e.take(2)

[(1192200,
  [['10207079-1',
    0,
    'united states statistical areas in the stateofmichigan',
    2116639],
   ['10207079-1',
    1,
    'united states statistical areas in the stateofmichigan',
    75799]]),
 (228600,
  [['14513849-1', 0, 'academic offices', 24912434],
   ['14513849-1', 2, 'academic offices', 2906014],
   ['24892630-1', 0, 'academic offices', 24939220],
   ['24892630-1', 2, 'academic offices', 24912434],
   ['3119216-1', 0, 'academic offices', 2906014],
   ['3119216-1', 2, 'academic offices', 11602165],
   ['13264837-2', 0, 'name', 516566],
   ['78455-1', 1, 'location and population', 138333],
   ['78455-1', 7, 'nickname', 3475181],
   ['17690519-1', 1, 'location', 138333],
   ['17690519-1', 5, 'nickname', 3475181],
   ['9839991-1', 3, 'player name', 7361593],
   ['9839991-1', 4, 'position', 492475],
   ['31641596-1', 0, 'year', 25085059],
   ['31641596-1', 1, 'player', 4806366],
   ['16419215-1', 1, 'location', 138333],
   ['16419215-1', 5, 'nickname', 3475181],


In [29]:
# entity -> list of [t_id, c_id] for every cell in which the entity occurs
# (repeated occurrences are kept).
e2column = (
    entity_df.rdd
    .map(lambda row: (row["entity"], [[row["t_id"], row["c_id"]]]))
    .reduceByKey(add)
)
# "<t_id>-<c_id>" -> list of the entities linked in that table column
# (repeated entities are kept).
table_column2e = (
    entity_df.rdd
    .map(lambda row: ('%s-%d' % (row["t_id"], row["c_id"]), [row["entity"]]))
    .reduceByKey(add)
)

In [30]:
# For each mapping, collect on the driver *before* opening the output file:
# opening with "w" truncates it, so a failing Spark job would otherwise leave
# an empty file behind. json.dump writes the entity-id keys as strings.
e2column_local = dict(e2column.collect())
with open("../../data/e2column.json", "w") as f:
    json.dump(e2column_local, f, indent=2)

# NOTE(review): this path has no ".json" extension, unlike the other JSON dumps
# in this notebook; it is kept unchanged because readers of the file may depend
# on the exact name.
table_column2e_local = dict(table_column2e.collect())
with open("../../data/table_column2e", "w") as f:
    json.dump(table_column2e_local, f, indent=2)

In [32]:
# Peek at ten ("<t_id>-<c_id>", entities) records.
table_column2e.take(10)

[('27282-1-2', [34904709, 34904709]),
 ('27283077-9-0', [4061083]),
 ('27283867-1-0',
  [612068, 24663262, 1690426, 2062331, 2026115, 1464381, 2312196, 28282542]),
 ('27285439-1-1', [23110304]),
 ('2728567-1-0', [25616606, 4736148, 2728010]),
 ('27286369-3-0', [16953666, 13213577, 12553843]),
 ('27290345-2-1',
  [5364159,
   31954112,
   9425158,
   9332010,
   9110148,
   3253971,
   23459693,
   26138114,
   28236220,
   22814941,
   18894094,
   51016932,
   28234487]),
 ('2729110-1-0', [371823, 10053201]),
 ('2729110-1-1', [164671, 164671]),
 ('27292530-1-2', [41512237, 9854423])]

In [34]:
# Peek at ten (entity, [[t_id, c_id], ...]) records.
e2column.take(10)

[(2612640, [['27283877-1', 0]]),
 (92360,
  [['27283877-1', 1],
   ['36903750-6', 2],
   ['36903750-6', 2],
   ['37928592-2', 3],
   ['37928592-2', 3],
   ['8113877-2', 1],
   ['26824095-2', 1],
   ['956879-5', 2],
   ['23780861-5', 2],
   ['38036637-1', 2],
   ['31311901-3', 2],
   ['35831830-2', 3],
   ['5112830-1', 3],
   ['36873469-2', 3],
   ['22126529-2', 2],
   ['36147867-2', 3],
   ['36349189-8', 2],
   ['9290308-3', 3],
   ['18833914-4', 2],
   ['18833914-4', 2],
   ['18833914-4', 2],
   ['18833914-4', 2],
   ['18833914-4', 2],
   ['36086750-6', 2],
   ['33801409-2', 3],
   ['33801409-2', 3],
   ['3522778-1', 1],
   ['28796362-2', 2],
   ['28796362-2', 2],
   ['28796362-2', 2],
   ['28796362-2', 2],
   ['28796362-2', 2],
   ['28796362-2', 2],
   ['28796362-2', 2],
   ['28796362-2', 2],
   ['28796362-2', 2],
   ['28796362-2', 2],
   ['28796362-2', 2],
   ['3869688-1', 3],
   ['36473008-2', 3],
   ['4534190-2', 2],
   ['18696349-1', 1],
   ['11593393-3', 1],
   ['35430791-4', 3]