In [3]:
from odo import odo
from odo import resource
from odo import discover
from glob import glob
from blaze import Data
import dask.bag as db
import os
from pprint import pprint
import pandas as pd
from dask.diagnostics import ProgressBar

# Create a dask ProgressBar and register it; presumably this makes later
# .compute() calls in the notebook show progress (see dask.diagnostics) — confirm.
pbar = ProgressBar()
pbar.register()

# Prepare Tables

In [9]:
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, Float, Index

# Number of tables prepare_tables() defines; they are named clueweb12_00,
# clueweb12_01, ... (zero-padded index).
NUM_TABLE = 1

def prepare_tables(db_url='postgresql://postgres:Rudfhr88!@localhost/facc1', num_tables=None):
    """Create the annotation tables and their trec_id indexes.

    One table named ``clueweb12_NN`` (zero-padded index) is defined per
    table number, each with a ``idx_trec_id_<table>`` index on ``trec_id``,
    and all of them are created through a single ``metadata.create_all``.

    Parameters
    ----------
    db_url : str, optional
        SQLAlchemy connection URL. Defaults to the URL this notebook has
        always used.
        SECURITY: the default embeds a database password in source; prefer
        passing a URL built from an environment variable or a secrets store.
    num_tables : int, optional
        How many tables to define. ``None`` (default) means the current
        value of the module-level ``NUM_TABLE``.
    """
    if num_tables is None:
        # Looked up at call time so re-assigning NUM_TABLE in a later cell
        # still takes effect, exactly as before.
        num_tables = NUM_TABLE

    metadata = MetaData()
    for i in range(num_tables):
        table_name = "clueweb12_%s" % str(i).zfill(2)
        table = Table(table_name, metadata,
                      Column('trec_id', String),
                      Column('encoding', String),
                      Column('entity', String),
                      Column('start', Integer),
                      Column('end', Integer),
                      Column('posterior', Float),
                      Column('posterior_context_only', Float),
                      Column('tag', String))
        # Constructing the Index attaches it to the table, so create_all
        # emits it together with the table.
        Index('idx_trec_id_%s' % table_name, table.c.trec_id)

    engine = create_engine(db_url)
    try:
        metadata.create_all(engine)
    finally:
        # Release the pooled connections; the engine is local to this call
        # and was previously left open.
        engine.dispose()

# Insert Data

In [14]:
import numpy as np

# Record layout handed to odo (dshape=...) when a DataFrame is loaded into
# PostgreSQL. `entity` is the only field declared with `?string`
# (presumably datashape's optional marker — confirm against datashape docs).
DSHAPE = 'var * {trec_id: string, encoding: string, entity: ?string, start: int64, end: int64, posterior: float64, posterior_context_only: float64, tag: string}'
# Column names passed to pd.read_csv (names=...); same fields and order as DSHAPE.
names = ['trec_id', 'encoding', 'entity', 'start', 'end', 'posterior', 'posterior_context_only', 'tag']
# Explicit dtypes passed to pd.read_csv for the numeric columns; the other
# columns are left to pandas' inference.
dtype = {'start': np.int64, 'end': np.int64, 'posterior': np.float64, 'posterior_context_only': np.float64}

def get_collection_name(fpath):
    """Return the lower-cased third-from-last '/'-separated component of ``fpath``.

    For ``.../ClueWeb12_00/<subdir>/<file>.tsv`` this yields ``'clueweb12_00'``.
    Raises ``IndexError`` when the path has fewer than three components.
    """
    segments = fpath.split('/')
    collection_dir = segments[-3]
    return collection_dir.lower()

def each_partition(values):
    """Load every TSV file of one bag partition into PostgreSQL.

    Each path is read with pandas (tab-separated, columns from ``names``,
    dtypes from ``dtype``) and appended to the target table through odo.
    A failure on one file is printed and recorded; the remaining files are
    still processed.

    Parameters
    ----------
    values : iterable of str
        File paths belonging to this partition.

    Returns
    -------
    tuple(list, list)
        ``(successes, errors)`` — paths that loaded and paths that failed.
        An empty partition yields ``([], [])`` so the aggregating step can
        always index the report.
    """
    if not values:
        # Previously returned None here, which made all_partition() fail on
        # report[0]; an empty report keeps the return type uniform.
        return ([], [])

    # The destination table is pinned to a single collection for now;
    # get_collection_name(fpath) could derive it from each path instead.
    collection = 'clueweb12_00'
    # SECURITY: the URI embeds a database password in source.
    target = 'postgresql://postgres:Rudfhr88!@localhost/facc1::%s' % collection

    successes = []
    errors = []
    for fpath in values:
        try:
            # NOTE(review): error_bad_lines is reportedly deprecated in newer
            # pandas in favour of on_bad_lines='skip' — confirm before upgrading.
            df = pd.read_csv(fpath, sep='\t', names=names, dtype=dtype, engine='c', encoding='utf-8', error_bad_lines=False)
            odo(df, target, dshape=DSHAPE)
        except Exception as e:
            # Best effort: remember the file and keep going with the rest.
            errors.append(fpath)
            print("Error:", fpath, e)
        else:
            successes.append(fpath)

    return (successes, errors)
        
def all_partition(reports):
    """Merge the per-partition reports and print the paths that failed.

    Parameters
    ----------
    reports : sequence of (successes, errors) tuples, or None entries
        One report per partition as produced by ``each_partition``.
        ``None`` entries (an empty partition) are skipped instead of
        raising ``TypeError``.

    Returns
    -------
    None
        The function only prints: a ``[Errors]`` header followed by one
        failed path per line. Nothing is printed when ``reports`` is empty.
    """
    if not reports:
        return

    errors = []
    for report in reports:
        if report is None:
            # An empty partition may have reported nothing at all.
            continue
        errors.extend(report[1])

    # Only failures are listed; successful paths are deliberately not printed.
    print('[Errors]')
    for f in errors:
        print(f)

In [15]:
# Make sure the destination table(s) exist before any partition inserts.
prepare_tables()

# Collect the input files in sorted order, spread them over six partitions,
# then load each partition and print the combined failure report.
files = glob('/data/Dataset/FACC1/output/ClueWeb12_00/**/*.tsv')
files.sort()
b = db.from_sequence(files, npartitions=6)
b.reduction(perpartition=each_partition, aggregate=all_partition).compute()

[Errors]


# Deal with Errors

In [None]:
# Resolve the destination table once and immediately dispose of whatever
# `t.bind` holds (presumably the SQLAlchemy engine's connection pool — confirm).
# SECURITY: the URI embeds a database password in source.
t = resource( 'postgresql://postgres:Rudfhr88!@localhost/facc1::%s' % 'ClueWeb12_00'.lower())
t.bind.dispose()

FIXED_DIR = '../ClueWeb12_00/errors/fixed'
# makedirs(exist_ok=True) also creates a missing parent directory and does not
# fail if the directory appears between the check and the creation.
os.makedirs(FIXED_DIR, exist_ok=True)

# Round-trip every failed file through pandas and write the result under
# FIXED_DIR with the same base name.
for fpath in glob('../ClueWeb12_00/errors/*.csv.gz'):
    df = pd.read_csv(fpath)
    odo(df, os.path.join(FIXED_DIR, os.path.basename(fpath)))
    # To load the repaired frame straight into the table instead: odo(df, t)
    print(fpath)