# db2

In [1]:
from ipynb.fs.full.koselleck import *
import shelve
# Values of the model table's `period_len` column that are kept when `dfmodels` is built below.
PERIOD_LENS={5,20,70}
# Largest `run_int` (numeric suffix of the `run` column) kept when `dfmodels` is built below.
MAX_RUNS=10

## Database helpers

In [44]:
# Use sqlite dictionary
def get_db_sqlitedict(prefix,folders=[],autocommit=False,
                      mode='c'):
    """Open the sqlite-backed dictionary for `prefix`.

    The file lives at PATH_DB/<folders...>/db.kos2.<prefix>.sqlite and
    always uses the table named 'data'.

    Args:
        prefix: name embedded in the database filename.
        folders: list of sub-directory names appended to PATH_DB.
        autocommit: forwarded unchanged to SqliteDict.
        mode: 'r' requests a read-only handle; any other value opens
            with flag 'c'. A request for 'r' on a file that does not
            exist yet is also opened with flag 'c'.

    Returns:
        The SqliteDict instance (30 second timeout).
    """
    db_filename = f'db.kos2.{prefix}.sqlite'
    db_path = os.path.join(*([PATH_DB] + folders + [db_filename]))
    # Read-only is honoured only when it was asked for AND the file is there.
    read_only = (mode == 'r') and os.path.exists(db_path)
    return SqliteDict(
        db_path,
        tablename='data',
        autocommit=autocommit,
        flag='r' if read_only else 'c',
        timeout=30,
    )

In [10]:
# SqliteDict??

In [11]:
# !pip install -U git+https://github.com/RaRe-Technologies/sqlitedict

In [12]:
def get_db_shelf(name=None,mode='r'):
    """Open a `shelve` database derived from FN_DB.

    Args:
        name: when truthy, '.shelf' in FN_DB is replaced by
            '.<name>.shelf'; otherwise FN_DB is used as-is.
        mode: passed to `shelve.open` as its `flag`.

    Returns:
        The opened shelf object.
    """
    if name:
        shelf_path = FN_DB.replace('.shelf', f'.{name}.shelf')
    else:
        shelf_path = FN_DB
    return shelve.open(shelf_path, flag=mode)

In [45]:
def get_db(name=None,mode='r'):
    """Open the sqlite dictionary called `name`.

    Thin wrapper over `get_db_sqlitedict`: autocommit is switched on
    only for mode 'w'; for every other mode the caller has to call
    `commit()` on the returned handle itself.

    Args:
        name: database prefix handed to `get_db_sqlitedict`.
        mode: open mode, forwarded as-is.

    Returns:
        Whatever `get_db_sqlitedict` returns.
    """
    wants_autocommit = (mode == 'w')
    return get_db_sqlitedict(name, autocommit=wants_autocommit, mode=mode)

In [14]:
# Smoke test (write side): mode 'w' makes get_db turn autocommit on,
# so the assignment needs no explicit commit.
with get_db('testing','w') as db:
    db['test']=[1,2,3]

In [15]:
# Smoke test (read side): re-open the same database with the default
# mode 'r' and print the value stored under 'test'.
with get_db('testing') as db:
    print(db['test'])

[Koselleck] (09:15:51) [1, 2, 3] (+28.0s)


In [16]:
def db_split_key(key):
    """Split a path-like key into (prefix, remainder).

    One leading '/' is ignored, then the key is cut at its first '/'.
    For example '/vecs/bpo/1720-1740/1' gives ('vecs', 'bpo/1720-1740/1').

    Raises:
        ValueError: if no '/' is left to split on.
    """
    trimmed = key[1:] if key.startswith('/') else key
    prefix, remainder = trimmed.split('/', 1)
    return prefix, remainder
def db_get_keys(prefix):
    """Return the set of all keys stored in the database named `prefix`."""
    with get_db(prefix, mode='r') as handle:
        stored_keys = set(handle.keys())
    return stored_keys
def db_has_key(key,prefix=None):
    """Tell whether `key` is stored in a database.

    Args:
        key: the key to look up. When `prefix` is None, the database
            name is taken from the key's first path component
            (see `db_split_key`) and the rest is used as the key.
        prefix: explicit database name; `key` is then used verbatim.

    Returns:
        True if the key is present, else False.
    """
    if prefix is None:
        prefix, key = db_split_key(key)
    with get_db(prefix, mode='r') as handle:
        found = key in handle
    return found
# Sentinel meaning "the caller did not supply `default`".
_DBGET_NO_DEFAULT = object()

def dbget(key,default=_DBGET_NO_DEFAULT,prefix=None):
    """Fetch the value stored under `key`.

    Args:
        key: the key to look up. When `prefix` is None, the database
            name is taken from the key's first path component
            (see `db_split_key`) and the rest is used as the key.
        default: value returned when the key is absent. If omitted, a
            new empty DataFrame is returned on a miss.
        prefix: explicit database name; `key` is then used verbatim.

    Returns:
        The stored value, or `default` when the key is missing.
    """
    if prefix is None: prefix,key=db_split_key(key)
    # Build the fallback per call: a DataFrame created in the signature
    # would be one shared object handed to every caller, so a caller
    # mutating it would change what later misses return.
    if default is _DBGET_NO_DEFAULT:
        default=pd.DataFrame()
    with get_db(prefix,mode='r') as db:
        return db.get(key,default)
def dbput(key,val,prefix=None):
    """Store `val` under `key` and persist it.

    Args:
        key: the key to write. When `prefix` is None, the database
            name is taken from the key's first path component
            (see `db_split_key`) and the rest is used as the key.
        val: the value to store.
        prefix: explicit database name; `key` is then used verbatim.
    """
    if prefix is None: prefix,key=db_split_key(key)
    with get_db(prefix,mode='c') as db:
        db[key]=val
        # get_db only enables autocommit for mode 'w'; with mode 'c' the
        # write stays pending and is dropped when the handle closes
        # unless it is committed explicitly.
        db.commit()

In [17]:
# dbput('/testing/hello',[2523325235])

In [18]:
# db_get_keys('testing')

In [19]:
# dbget('/testing/hello')

In [20]:
# dbget('/test/hello')
# dbput('/test/hello','goodbye')

In [21]:
# Import into its own shelf
# with shelve.open('/home/ryan/db/db.koselleck7.shelve') as db1:
#     keys=list(db1.keys())
#     dkeys=[x for x in keys if x.startswith('/dists/')]
#     for k in tqdm(dkeys):
#         kdf=db1[k]
#         dbput(k, kdf)

In [22]:
# db_get_keys('vecs')

In [23]:
# len(db_get_keys('vecs'))

In [24]:
# ls -ltrh ~/db/

In [25]:
# dbput('test',pd.Series([1,2]))

## Vectors

In [26]:
# Full table of available models, plus `run_int`: the integer after the
# last '_' in each model's `run` label.
dfmodels_all = get_pathdf_models(period_len=None)
dfmodels_all['run_int'] = dfmodels_all['run'].apply(
    lambda run_label: int(run_label.split('_')[-1])
)
# Working subset: only the configured period lengths and run numbers.
dfmodels = (
    dfmodels_all
    .loc[dfmodels_all['period_len'].isin(PERIOD_LENS)]
    .query(f'run_int<={MAX_RUNS}')
)
# dfmodels

Scanning directory for models: 3901it [00:02, 1655.58it/s]


In [27]:
def do_ingest_vecs(row,only_valid_words=True):
    """Load one model and return its word vectors as a DataFrame.

    Args:
        row: record with a `.path` attribute, passed to `load_model`.
            NOTE(review): the loaded object is presumably a gensim
            word2vec model (it is read via `.wv.vectors` and
            `.wv.index_to_key`) - confirm.
        only_valid_words: when True, keep only the rows whose word is
            in `get_valid_words()`.

    Returns:
        DataFrame indexed by word, one row per vector, in the model's
        own vocabulary order.
    """
    model=load_model(row.path)
    vectors=model.wv.vectors
    words=[model.wv.index_to_key[i] for i in range(len(vectors))]
    res=pd.DataFrame(vectors, index=words)
    if only_valid_words:
        valid_words=set(get_valid_words())
        # Filter with a boolean mask instead of `.loc[<set>]`: pandas >= 2.0
        # rejects set indexers, and a set would also return the rows in
        # arbitrary order. The mask keeps the model's row order.
        res=res.loc[res.index.isin(valid_words)]
    return res

def ingest_vecs(dfmodels, only_valid_words=True,num_proc=1,force=False):
    """Store the word vectors of every model in `dfmodels` in the 'vecs' db.

    Each model is written under the key '<corpus>/<period>/<run_int>'.

    Args:
        dfmodels: model table with `corpus`, `period`, `run_int` columns
            and whatever `do_ingest_vecs` reads from each row.
        only_valid_words: forwarded to `do_ingest_vecs`.
        num_proc: forwarded to `pmap_iter`.
        force: when False, models whose key is already stored are
            skipped; when True everything is recomputed and overwritten.
    """
    import functools

    if only_valid_words:
        # Same as do_ingest_vecs' own default: pass the function untouched.
        worker=do_ingest_vecs
    else:
        # Previously this flag was accepted but never reached the worker.
        worker=functools.partial(do_ingest_vecs, only_valid_words=False)
        # Copy __name__ etc. onto the partial in case pmap_iter inspects
        # them (its implementation is not visible here).
        functools.update_wrapper(worker, do_ingest_vecs)

    with get_db('vecs',mode='a') as db:
        df=dfmodels.assign(qstr=[
            f'{row.corpus}/{row.period}/{row.run_int}'
            for i,row in dfmodels.iterrows()
        ])
        if not force:
            done=set(db.keys())
            df=df[~df.qstr.isin(done)]
        objs=[row for i,row in df.iterrows()]
        # NOTE(review): pairing objs with results via zip assumes pmap_iter
        # yields results in input order - confirm.
        iterr=pmap_iter(worker, objs, num_proc=num_proc)
        for i,(row,qdf) in enumerate(zip(objs,iterr)):
            db[row.qstr]=qdf
            # mode 'a' leaves autocommit off, so commit every 10 models
            if i and not i%10: db.commit()
        db.commit()

In [31]:
# !rm /home/ryan/db/db.kos2.vecs.sqlite
# ingest_vecs(dfmodels,num_proc=4)

In [32]:
def vecs(period,run=1,corpus=DEFAULT_CORPUS):
    """Return the stored word-vector table for one model.

    Looks up '/vecs/<corpus>/<period>/<run>' through `dbget` and falls
    back to an empty DataFrame when nothing is stored under that key.
    """
    return dbget(f'/vecs/{corpus}/{period}/{run}', pd.DataFrame())

In [33]:
# vecs('1820-1840',1)

## Distances

In [34]:
def vecs2dist(dfvecs,only_valid_words=True,lim=None):
    """Build the word-by-word distance matrix for a table of vectors.

    Args:
        dfvecs: DataFrame indexed by word, one vector per row.
        only_valid_words: when True, keep only the rows whose word is
            in `get_valid_words()`.
        lim: when truthy, keep only the first `lim` rows (after the
            valid-word filter).

    Returns:
        Square DataFrame with the kept words as both index and columns,
        holding 1 minus the pairwise matrix from
        `fastdist.cosine_pairwise_distance`, which this code treats as a
        similarity.
    """
    if only_valid_words:
        valid_words=set(get_valid_words())
        # Boolean mask instead of `.loc[<set>]`: pandas >= 2.0 rejects set
        # indexers, and a set gave an arbitrary row order, which made the
        # `lim` cut below pick an arbitrary subset.
        dfvecs=dfvecs.loc[dfvecs.index.isin(valid_words)]
    if lim: dfvecs=dfvecs[:lim]
    dfsim=pd.DataFrame(
        fastdist.cosine_pairwise_distance(
            dfvecs.values.astype(float),
            return_matrix=True
        ),
        index=dfvecs.index,
        columns=dfvecs.index
    )
    dfdist=1-dfsim
    return dfdist

In [35]:
# X=vecs(random.choice(get_default_periods()))
# X

In [36]:
# o=vecs2dist(X)
# o

In [37]:
def do_ingest_dists(dfperiod,progress=False,max_runs=10):
    """Average the word-distance matrices of all runs in `dfperiod`.

    For every (corpus, period, run_int) row, the stored vectors are
    loaded with `vecs`, converted with `vecs2dist`, and the resulting
    matrices are averaged cell by cell. A word pair that is missing from
    some runs is averaged over the runs that do contain it.

    Args:
        dfperiod: model table with `corpus`, `period`, `run_int` columns.
        progress: when True, wrap the iteration in a tqdm progress bar.
        max_runs: rows with `run_int` above this value are ignored.

    Returns:
        DataFrame of mean distances, or None when no row is left after
        the `max_runs` filter.
    """
    dfperiod=dfperiod[dfperiod.run_int<=max_runs]
    iterr=zip(dfperiod.corpus, dfperiod.period, dfperiod.run_int)
    if progress:
        iterr=tqdm(iterr,total=len(dfperiod),position=0)

    # Keep a running sum and a per-cell count, not a running mean.
    # Averaging "mean so far" with each new run weights the runs unequally
    # (with three runs: 1/4, 1/4, 1/2), so it is not the arithmetic mean.
    total=None  # cell-wise sum of distances seen so far
    count=None  # cell-wise number of runs that contributed a value
    for corpus,period,run in iterr:
        dfvecs=vecs(corpus=corpus, period=period, run=run)
        dfdist=vecs2dist(dfvecs)
        seen=dfdist.notna().astype(float)
        if total is None:
            total=dfdist.fillna(0)
            count=seen
        else:
            # fill_value=0 aligns on the union of words; a cell absent from
            # both operands stays NaN.
            total=total.add(dfdist.fillna(0),fill_value=0)
            count=count.add(seen,fill_value=0)
    if total is None: return None
    # Cells never observed (count 0 or NaN) come out as NaN.
    return total/count

In [38]:
# for i,dfprd in dfmodels.groupby(['corpus','period']): pass
# dfdist=do_ingest_dists(dfprd,progress=True)
# dfdist

In [39]:
# db_has_key('/vecs/bpo/1720-1740/1')

In [40]:

def ingest_dists(dfmodels,num_proc=1,force=False):
    """Compute per-period mean distances and store them word by word.

    Models are grouped by (corpus, period); each group is averaged with
    `do_ingest_dists`, and every column of the result is written to the
    'wdists' db under the key '<word>/<corpus>/<period>'.

    Args:
        dfmodels: model table with `corpus` and `period` columns plus
            whatever `do_ingest_dists` needs.
        num_proc: currently unused; groups are processed sequentially.
        force: when False, (corpus, period) groups that already have
            stored entries are skipped.
    """
    key_grp = [
        (f'/dists/{corpus}/{period}', gdf)
        for (corpus,period),gdf in dfmodels.groupby(
            ['corpus','period']
        )
    ]
    with get_db('wdists',mode='a') as db:
        if not force:
            # Stored keys are '<word>/<corpus>/<period>', so the group key
            # '/dists/<corpus>/<period>' itself is never in the db. Compare
            # on the trailing '<corpus>/<period>' part instead.
            done={
                '/'.join(stored.rsplit('/',2)[-2:])
                for stored in db.keys()
            }
            key_grp=[
                (k,v) for k,v in key_grp
                if db_split_key(k)[1] not in done
            ]
        iterr=tqdm(key_grp,position=0)
        for key,grp in iterr:
            _,key2 = db_split_key(key)
            iterr.set_description(key)
            odf=do_ingest_dists(grp,progress=True)
            # do_ingest_dists returns None when the group has no usable run
            if odf is None: continue
            for wordcol in odf:
                qstr=f'{wordcol}/{key2}'
                db[qstr]=odf[wordcol]
            # one commit per group: a group is stored completely or not at all
            db.commit()
    
        
#     keys,grps=zip(*key_grp)
#     iterr=pmap_iter(
#         do_ingest_dists,
#         grps,
#         num_proc=num_proc
#     )
#     for key,res in zip(keys,iterr): dbput(key,res)

In [41]:
# !rm ../db/db.kos2.wdists.sqlite

In [42]:
# ingest_dists(dfmodels,force=True)

In [43]:
# import
force=False
with get_db_shelf('wdists',mode='r') as idb, get_db('wdists',mode='a') as odb:
#     for i,(k,v) in enumerate(tqdm(idb.items())): 
    for i,k in enumerate(tqdm(idb.keys())): 
        if not force and k in odb: continue
        odb[k]=idb[k]
        if i and not i%1000: odb.commit()
    odb.commit()
stop

 10%|█         | 68710/657649 [00:09<01:23, 7038.41it/s]


KeyboardInterrupt: 

In [None]:
# db_get_keys('wdists')

In [None]:
# dbget('/dists/bpo/1720-1740')

In [None]:
# def ingest_dists_model(
#         period,run=1,corpus=DEFAULT_CORPUS,
#         only_valid_words=True,lim=25000,force=False,**attrs):
#     dfvecs=vecs(period,run)
#     dfdist=vecs2dist(dfvecs,only_valid_words=only_valid_words,lim=lim)
#     return dfdist

# def ingest_dists_model_(objd): return ingest_dists_model(**objd)

In [None]:
# def ingest_dists(dfmodels,only_valid_words=True,lim=None,num_proc=1,force=False):
#     dfmodels['run_int']=dfmodels.run.apply(lambda x: int(x.split('_')[-1]))
#     objs = [
#         dict(
#             period=period,
#             run=run,
#             corpus=corpus,
#             only_valid_words=only_valid_words,
#             lim=lim,
#             qstr=f'/dists/{corpus}/{period}/{run}'
#         )
#         for period,run,corpus in zip(dfmodels.period, dfmodels.run_int, dfmodels.corpus)
#     ]
#     if not force: objs = [d for d in objs if not db_has_key(d['qstr'])]
            
#     iterr=pmap_iter(ingest_dists_model_, objs[:100], num_proc=num_proc)
#     #with get_db(mode='a') as db:
#     db=get_db()
#     for i,(idx,odf) in enumerate(zip(objs, iterr)):
#         if odf is None: continue
#         db[idx['qstr']] = odf
#         #if i and not i%10: db.sync()
#         db.sync()

In [None]:
# ingest_dists(dfmodels,num_proc=1)

In [None]:
# !ls -lSh ~/db/ | head