In [1]:
import sys

In [2]:
sys.path.append('../..')

In [3]:
from datetime import datetime
from pathlib import Path

In [4]:
from app import db

In [5]:
import pandas as pd

In [6]:
chunksize = 5000000
now = datetime.utcnow()
now_str = f"{now:%Y%m%dT%H%M%S}"
datadir_base = Path(f'../data/works_ids_export_{now_str}')
if not datadir_base.exists():
    datadir_base.mkdir()

In [7]:
%%time
outdir_works = datadir_base.joinpath('work_not_merged')
if not outdir_works.exists():
    outdir_works.mkdir()
sq = 'select paper_id from mid.work where merge_into_id is null'
rowcount = 0
for i, _df in enumerate(pd.read_sql_query(sq, db.engine, chunksize=chunksize)):
    _df.to_csv(outdir_works.joinpath(f'work_paper_ids_{i:03}.csv'))
    rowcount += len(_df)
print(f"rowcount: {rowcount}")

rowcount: 249714645
CPU times: user 6min 12s, sys: 27.8 s, total: 6min 40s
Wall time: 29min 6s


In [8]:
%%time
outdir_rt = datadir_base.joinpath('recordthresher_record')
if not outdir_rt.exists():
    outdir_rt.mkdir()
sq = 'select work_id, count(*) as num_rows_in_rt from ins.recordthresher_record where work_id is not null and work_id > 0 group by work_id'
rowcount = 0
for i, _df in enumerate(pd.read_sql_query(sq, db.engine, chunksize=chunksize)):
    _df.to_csv(outdir_rt.joinpath(f'recordthresher_record_work_ids_counts_{i:03}.csv'))
    rowcount += len(_df)
print(f"rowcount: {rowcount}")

In [None]:
_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2928250 entries, 0 to 2928249
Data columns (total 2 columns):
 #   Column          Dtype
---  ------          -----
 0   work_id         int64
 1   num_rows_in_rt  int64
dtypes: int64(2)
memory usage: 44.7 MB


In [None]:
%%time
df_works = pd.concat([pd.read_csv(fp, index_col=0) for fp in outdir_works.glob('work_paper_ids_*.csv')])
df_works.info()

<class 'pandas.core.frame.DataFrame'>
Index: 249531500 entries, 0 to 4999999
Data columns (total 1 columns):
 #   Column    Dtype
---  ------    -----
 0   paper_id  int64
dtypes: int64(1)
memory usage: 3.7 GB
CPU times: user 20.1 s, sys: 677 ms, total: 20.8 s
Wall time: 20.8 s


In [None]:
%%time
df_rt = pd.concat([pd.read_csv(fp, index_col=0) for fp in outdir_rt.glob('recordthresher_record_work_ids_counts_*.csv')])
df_rt.info()

<class 'pandas.core.frame.DataFrame'>
Index: 247928250 entries, 0 to 4999999
Data columns (total 2 columns):
 #   Column          Dtype
---  ------          -----
 0   work_id         int64
 1   num_rows_in_rt  int64
dtypes: int64(2)
memory usage: 5.5 GB
CPU times: user 23.2 s, sys: 976 ms, total: 24.2 s
Wall time: 24.2 s


In [12]:
df_rt['num_rows_in_rt'].value_counts()

num_rows_in_rt
1      190778192
2       45114558
3        6873185
4        1924158
5         888081
         ...    
123            1
169            1
146            1
182            1
278            1
Name: count, Length: 237, dtype: int64

In [13]:
df_works['paper_id'].duplicated().value_counts()

paper_id
False    249531500
Name: count, dtype: int64

In [14]:
df_rt['work_id'].duplicated().value_counts()

work_id
False    247928250
Name: count, dtype: int64

In [15]:
workjoin = df_works.rename(columns={'paper_id': 'work_id'}).merge(df_rt, how='left', on='work_id')
workjoin['num_rows_in_rt'] = workjoin['num_rows_in_rt'].fillna(value=0)

In [16]:
workjoin['num_rows_in_rt'].value_counts()

num_rows_in_rt
1.0       190552349
2.0        44958232
3.0         6791151
0.0         2198872
4.0         1875158
            ...    
930.0             1
646.0             1
1398.0            1
270.0             1
188.0             1
Name: count, Length: 238, dtype: int64

In [17]:
%%time
tbldata = workjoin[workjoin['num_rows_in_rt']==0][['work_id']]
tbldata.to_sql(
    name="tmp_work_ids_not_in_recordthresher",
    con=db.engine,
    schema="public",
    if_exists="replace",
    index=False,
    method="multi",
)

CPU times: user 23.9 s, sys: 526 ms, total: 24.5 s
Wall time: 43.8 s


2198872