In [1]:
import numpy as np
import pandas as pd

import pyspark.sql.types as T
import pyspark.sql.functions as F
from collections import Counter

# Number of partitions Spark SQL uses when shuffling data for joins and
# aggregations. The value is passed as a string, which is the form Spark's
# configuration API documents; a bare int relies on implicit conversion.
sqlContext.setConf("spark.sql.shuffle.partitions", "30")

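## Prepare pmc_pair data

The snippet below converts the tab-separated file `data/pmc_pair.txt` into the Parquet dataset `out/pmc_pair.parquet`, which is read back in cell `In [3]`. It is shown as a fenced block rather than an executable cell, presumably because it only needs to be run once.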


```python
# Schema of the pair file: two non-nullable integer ID columns.
pmc_pair_schema = (
    T.StructType()
    .add("source_id", T.IntegerType(), False)
    .add("sink_id", T.IntegerType(), False)
)

# Read the tab-separated pair list (its first line is a header row) with the
# explicit schema above, then persist it as Parquet.
df_pmc_pair = (
    sqlContext.read
    .schema(pmc_pair_schema)
    .csv("data/pmc_pair.txt", sep="\t", header=True)
)
df_pmc_pair.write.parquet("out/pmc_pair.parquet")
```

In [2]:
# Load the Parquet dataset at out/Training_2002_2005.parquet as a Spark DataFrame.
df = sqlContext.read.parquet("out/Training_2002_2005.parquet")

In [3]:
# Load the (source_id, sink_id) pair table from Parquet.
df_pmc_pair = sqlContext.read.parquet("out/pmc_pair.parquet")
# Show the first row as a quick look at the data.
df_pmc_pair.head()

Row(source_id=3114744, sink_id=3100643)

In [4]:
# Inner-join df with df_pmc_pair on the (source_id, sink_id) pair; rows
# without a match on the other side are dropped.
df_joined = df.join(
    other=df_pmc_pair,
    on=["source_id", "sink_id"],
    how="inner")

In [5]:
# Peek at the first three rows of the joined DataFrame.
df_joined.head(3)

[Row(source_id=11675395, sink_id=8663607, source_year=2002, source_j=u'J Biol Chem', source_n_mesh=9, source_n_mesh_ex=21, source_is_eng=1, source_country=u'USA', source_is_journal=1, source_is_review=0, source_is_case_rep=0, source_is_let_ed_com=0, source_T_novelty=24, source_V_novelty=6916, source_PT_novelty=2, source_PV_novelty=2, source_ncites=60, source_n_authors=4, sink_year=1996, sink_j=u'J Biol Chem', sink_n_mesh=12, sink_n_mesh_ex=47, sink_is_eng=1, sink_is_journal=1, sink_is_review=0, sink_is_case_rep=0, sink_is_let_ed_com=0, sink_T_novelty=28, sink_V_novelty=407, sink_PT_novelty=0, sink_PV_novelty=1, sink_n_authors=4, year_span=6, journal_same=1, mesh_sim=0.23529411852359772, title_sim=0.3442651927471161, lang_sim=1.0, affiliation_sim=1.0, pubtype_sim=0.6000000238418579, cite_sim=0.0714285746216774, author_sim=0.1428571492433548, gender_sim=0.9036961197853088, eth_sim=0.861640453338623, n_common_authors=1, auid=u'10662773_3', gender=u'-', eth1=u'CHINESE', eth2=u'UNKNOWN', po

In [6]:
# Number of rows in the joined DataFrame.
df_joined.count()

26880634

In [7]:
# Number of rows in the full input DataFrame (before the join).
df.count()

216570270

In [8]:
# Export the joined rows as tab-separated text without a header row.
# header is given as a boolean, matching the header=True used when reading
# data/pmc_pair.txt; Spark serializes it to the same "false" option value.
# No save mode is set, so Spark's default applies and the write raises an
# error if the output path already exists.
df_joined.write.csv(
    "out/Training_data_2002_2005_pmc_pair_txt",
    sep="\t",
    header=False)

In [9]:
# List the column names of the joined DataFrame.
df_joined.columns

['source_id',
 'sink_id',
 'source_year',
 'source_j',
 'source_n_mesh',
 'source_n_mesh_ex',
 'source_is_eng',
 'source_country',
 'source_is_journal',
 'source_is_review',
 'source_is_case_rep',
 'source_is_let_ed_com',
 'source_T_novelty',
 'source_V_novelty',
 'source_PT_novelty',
 'source_PV_novelty',
 'source_ncites',
 'source_n_authors',
 'sink_year',
 'sink_j',
 'sink_n_mesh',
 'sink_n_mesh_ex',
 'sink_is_eng',
 'sink_is_journal',
 'sink_is_review',
 'sink_is_case_rep',
 'sink_is_let_ed_com',
 'sink_T_novelty',
 'sink_V_novelty',
 'sink_PT_novelty',
 'sink_PV_novelty',
 'sink_n_authors',
 'year_span',
 'journal_same',
 'mesh_sim',
 'title_sim',
 'lang_sim',
 'affiliation_sim',
 'pubtype_sim',
 'cite_sim',
 'author_sim',
 'gender_sim',
 'eth_sim',
 'n_common_authors',
 'auid',
 'gender',
 'eth1',
 'eth2',
 'pos',
 'pos_nice',
 'sink_last_ncites',
 'sink_prev_ncites',
 'auth_last_npapers',
 'auth_prev_papers',
 'jj_sim',
 'is_self_cite']

In [11]:
with open("out/Training_data_2002_2005_pmc_pair_txt.header.txt", "w+") as fp:
    print >> fp, "\t".join(df_joined.columns)
    
! head out/Training_data_2002_2005_pmc_pair_txt.header.txt

source_id	sink_id	source_year	source_j	source_n_mesh	source_n_mesh_ex	source_is_eng	source_country	source_is_journal	source_is_review	source_is_case_rep	source_is_let_ed_com	source_T_novelty	source_V_novelty	source_PT_novelty	source_PV_novelty	source_ncites	source_n_authors	sink_year	sink_j	sink_n_mesh	sink_n_mesh_ex	sink_is_eng	sink_is_journal	sink_is_review	sink_is_case_rep	sink_is_let_ed_com	sink_T_novelty	sink_V_novelty	sink_PT_novelty	sink_PV_novelty	sink_n_authors	year_span	journal_same	mesh_sim	title_sim	lang_sim	affiliation_sim	pubtype_sim	cite_sim	author_sim	gender_sim	eth_sim	n_common_authors	auid	gender	eth1	eth2	pos	pos_nice	sink_last_ncites	sink_prev_ncites	auth_last_npapers	auth_prev_papers	jj_sim	is_self_cite
