In [1]:
import pandas as pd
from time import time
from pymongo import MongoClient
import re
from datetime import datetime, timedelta

In [2]:
%%time
with open('data/tuef_master.txt') as fil:
    dat = fil.read()
dat = [i.split('\\t') for i in dat.split('\n')]    

CPU times: user 6.57 s, sys: 4.61 s, total: 11.2 s
Wall time: 11.2 s


In [3]:
%%time
df = pd.DataFrame(dat[1:-1], columns = dat[0])

CPU times: user 1.03 s, sys: 0 ns, total: 1.03 s
Wall time: 1.03 s


In [4]:
def clean(txt):
    bgn = txt.find("TUEF")
    end = txt.find('0102**')+5+1
    txt = txt[bgn:end]
    length = int(txt[-17:].replace("ES07", "")[:7])
    return {
        'string': txt,
        'valid': len(txt[bgn: end]) == length
    }

In [5]:
%%time
a = pd.DataFrame([clean(i) for i in df.response_xml])

CPU times: user 3.96 s, sys: 124 ms, total: 4.09 s
Wall time: 4.08 s


In [6]:
df = pd.concat([df,a], axis=1)

In [7]:
df.shape

(635247, 9)

In [8]:
df.reference_number.nunique()

635247

In [9]:
df['parsed'] = False

In [10]:
from pymongo import MongoClient
db = MongoClient('mongodb://10.10.225.142:27017')['cibil']
db.authenticate('nilabja21607', 'pass@123')

True

In [11]:
%%time
df.drop("response_xml", inplace=True, axis=1)

CPU times: user 903 ms, sys: 85.2 ms, total: 988 ms
Wall time: 980 ms


In [12]:
%%time
from time import time
start = time()
print('Upload of raw string commenced:\n')
step = 5000
tuef = df.to_dict(orient='record')
for idx in range(0, len(tuef), step):
    db['raw_upload'].insert_many(tuef[idx:idx+step])
    perc = round(min((idx+step), len(tuef)) / len(tuef) * 100)
    elapsed = time() - start
    speed = "{:.2f} iter/sec".format(idx / elapsed) if idx > elapsed else \
        "{:.2f} sec/iter".format(elapsed/(idx+step))
    print("{}{} Speed:{}".format(u"\u2588"*perc, u"\u2501"*(100-perc), speed),
          end='\r')

Upload of raw string commenced:

CPU times: user 26.9 s, sys: 3.2 s, total: 30.1 s███████████████████████████████████████████████████ Speed:7981.31 iter/sec
Wall time: 1min 19s


In [13]:
def multi_chunk_breaker(string: str, splitter: str, shift: int = 2) -> list:
    """ Compute ... and return ...
    :return:
    :param string:
    :param splitter:
    :param shift:
    :return:
    """
    chunk = string.split(splitter)
    chunk.pop(0)
    return [str_breaker(indx[shift:]) for indx in chunk]


def str_breaker(string):
    """
    :param string:
    :return:
    """
    ret = {}
    while string:
        key = string[:2]
        string = string[2:]
        length = int(string[:2])
        string = string[2:]
        val = string[:length]
        string = string[length:]
        ret[key] = val
    return ret


# the tuef contains several segments which have definite start values


keymap = db['keymap'].find_one()

pattern = re.compile(
    "(PN03N01|ID03I01|PT03T01|EC03C01|EM03E01|SC10CIBILTUSCR|PA03A01|"
    "TL04T001|IQ04I001|DR03D01)"
)  # Create the pattern to break the tuef string into segments


In [14]:
def parse_tuef(string):
    tuef = string[:-17]
    chunkdict = {}
    breaks = [m.start() for m in pattern.finditer(tuef)]
    breaks.insert(0, 0)
    breaks.append(len(tuef))

    for i in range(1, len(breaks)):
        temp = tuef[breaks[i - 1]:breaks[i]]
        chunkdict[temp[:2]] = temp

    header = {}
    header["seg_tag"], tuef = tuef[:4], tuef[4:]
    header["version"], tuef = tuef[:2], tuef[2:]
    header["mem_ref_no"], tuef = tuef[:25].strip(), tuef[25:]
    header["blank"], tuef = tuef[:6], tuef[6:]
    header["enq_mem_user_id"], tuef = tuef[:30].strip(), tuef[30:]
    header["sub_ret_cd"], tuef = tuef[:1], tuef[1:]
    header["enq_cntrl_no"], tuef = tuef[:12], tuef[12:]
    header["datetime_processed"] = datetime.strptime(
        tuef[:14], '%d%m%Y%H%M%S')
    ######################################################################
    #                                 PN decoding                        #
    ######################################################################

    name_chunk = chunkdict.get('PN')
    if name_chunk:
        name = str_breaker(name_chunk[7:])
        name = {keymap['PN'][key]: val for key, val in name.items()}
        if 'gender' in name:
            name['gender'] = keymap['gender'][name['gender']]
        if 'dob' in name:
            name['dob'] = datetime.strptime(name['dob'], '%d%m%Y')
        name = {key: val for key, val in name.items() if val is not None}
        header.update(name)
    ######################################################################
    #                                 ID decoding                        #
    ######################################################################

    id_chunk = chunkdict.get('ID')
    if id_chunk:
        ids = pd.DataFrame(multi_chunk_breaker(id_chunk, "ID03I"))
        ids.columns = [keymap['ID'][key] for key in ids.columns]
        if "id_type" in ids:
            ids["id_type"] = [keymap["id_type"][i] for i in
                              ids["id_type"]]
        if "issue_date" in ids:
            ids["issue_date"] = pd.to_datetime(ids["issue_date"],
                                               format='%d%m%Y')
        if "expiration_date" in ids:
            ids["expiration_date"] = pd.to_datetime(
                ids["expiration_date"], format='%d%m%Y')
        header['ids'] = ids.T.apply(lambda x: x.dropna().to_dict()).\
            tolist()
    ######################################################################
    #                            PT decoding                             #
    ######################################################################

    phone_chunk = chunkdict.get('PT')
    if phone_chunk:
        tele = pd.DataFrame(multi_chunk_breaker(phone_chunk, "PT03T"))
        tele.columns = [keymap['PT'][key] for key in tele.columns]
        if "telephone_type" in tele:
            tele["telephone_type"] = [keymap["telephone_type"][i] for i in
                                      tele["telephone_type"]]
        header['phones'] = tele.T.apply(
            lambda x: x.dropna().to_dict()).tolist()
    ######################################################################
    #                                 EC decoding                        #
    ######################################################################

    email_chunk = chunkdict.get('EC')
    if email_chunk:
        email = pd.DataFrame(multi_chunk_breaker(email_chunk, "EC03C"))
        email.columns = ['email']
        header['emails'] = \
            email.email.str.lower().dropna().unique().tolist()

    ######################################################################
    #                            EM decoding                             #
    ######################################################################

    employ_chunk = chunkdict.get('EM')
    if employ_chunk:
        employ = str_breaker(employ_chunk[7:])
        employ = {keymap['EM'][key]: val for key, val in employ.items()}
        employ['account_type'] = keymap['account_type'][
            employ['account_type']]
        employ['date_reported_and_certified'] = datetime.strptime(
            employ['date_reported_and_certified'], '%d%m%Y')
        employ['occupation_code'] = keymap['occupation_code'].get(
            employ.get('occupation_code'))
        employ['net_gross_income_indicator'] = \
            keymap['net_gross_income_indicator'].get(employ.get(
                'net_gross_income_indicator'))
        employ['monthly_annual_income_indicator'] = \
            keymap['monthly_annual_income_indicator'].get(
                employ.get('monthly_annual_income_indicator'))
        employ = {key: val for key, val in employ.items()
                  if val is not None}
        header.update(employ)
    ######################################################################
    #                       SC decoding                                  #
    ######################################################################

    score_chunk = chunkdict.get("SC")
    if score_chunk:
        score = str_breaker(score_chunk[14:])
        score = {keymap['SC'][key]: val for key, val in score.items()}
        score['score_date'] = datetime.strptime(score['score_date'],
                                                '%d%m%Y')
        score = {key: val for key, val in score.items()
                 if val is not None}
        header.update(score)
    ######################################################################
    #                                 PA decoding                        #
    ######################################################################

    addr_chunk = chunkdict.get("PA")
    if addr_chunk:
        addr = pd.DataFrame(multi_chunk_breaker(addr_chunk, "PA03A"))
        addr.columns = [keymap['PA'][key] for key in addr.columns]
        if 'date_reported' in addr:
            addr['date_reported'] = pd.to_datetime(addr['date_reported'],
                                                   format='%d%m%Y')
        if 'state_code' in addr:
            addr['state_code'] = [keymap['state_code'].get(i) for i in
                                  addr['state_code']]
        if 'address_category' in addr:
            addr['address_category'] = [keymap['address_category'].get(i)
                                    for i in addr['address_category']]
        header['addresses'] = addr.T.apply(
            lambda x: x.dropna().to_dict()).tolist()

    ######################################################################
    #                       TL decoding                                  #
    ######################################################################

    acc_chunk = chunkdict.get('TL')
    if acc_chunk:
        acc = multi_chunk_breaker(acc_chunk, "TL04T", 3)
        summary = pd.DataFrame([i for i in acc if
                                i['02'] == "ACCTREVIEW_SUMM"])
        acc = pd.DataFrame([i for i in acc if i['02']
                            != "ACCTREVIEW_SUMM"])
        acc.columns = [keymap['TL'][key] for key in acc.columns]
        summary.columns = [keymap['summary'][key] for
                           key in summary.columns]
        acc['account_name'] = [keymap['account_type'][i]['Name'] for i in
                               acc['account_type']]
        acc['account_type'] = [keymap['account_type'][i]['loan_type'] for i in
                               acc['account_type']]
        acc['ownership_indicator'] = [keymap['ownership_indicator'][i] for
                                      i in acc['ownership_indicator']]
        datecols = list({'date_opened_disbursed', 'date_of_last_payment',
                         'date_closed', 'date_reported_certified',
                         'payment_history_start_date',
                         'payment_history_end_date'}.intersection(
            acc.columns))
        acc[datecols] = acc[datecols].apply(pd.to_datetime,
                                            format="%d%m%Y")
        if 'suit_filed_wilful_default' in acc:
            acc['suit_filed_wilful_default'] = \
                [keymap['suit_filed_wilful_default'].get(i) for i in
                 acc['suit_filed_wilful_default']]
        header['accounts'] = acc.T.apply(
            lambda x: x.dropna().to_dict()).tolist()
    ######################################################################
    #                                 IQ decoding                        #
    ######################################################################
    enquiry_chunk = chunkdict.get('IQ')
    if enquiry_chunk:
        enq = pd.DataFrame(multi_chunk_breaker(enquiry_chunk, "IQ04I", 3))
        enq.columns = [keymap['IQ'][key] for key in enq.columns]
        enq['date_of_enquiry'] = pd.to_datetime(enq['date_of_enquiry'],
                                                format='%d%m%Y')
        enq['enquiry_purpose_acct_nm'] = [keymap['account_type'][i]['Name'] for i in
                               enq['enquiry_purpose']]
        enq['enquiry_purpose_acct_typ'] = [keymap['account_type'][i]['loan_type'] for i in
                               enq['enquiry_purpose']]
        header['enquiries'] = enq.T.apply(
            lambda x: x.dropna().to_dict()).tolist()
    return header

In [15]:
to_parse = db['raw_upload'].find({
    'parsed': False,
    'valid': True
}, {'string':1})
try:
    total = to_parse.count()
except:
    total = db['raw_upload'].count_documents({
    'parsed': False,
    'valid': True
})

  


In [16]:
start = time()
idx = 0
output = []
ids = []
chunk = 100
for i in to_parse:
    try:
        output.append(parse_tuef(i['string']))
    except:
        db['failed'].insert_one(i)
    else:
        ids.append(i['_id'])
    idx += 1
    if not idx % chunk:
        db['parsed_tuef'].insert_many(output)
        db['raw_upload'].update_many({"_id":{"$in": ids}}, {"$set":{"parsed":True}})
        output = []
        ids = []
        perc = round(100 * idx / total)
        elapsed = time() - start
        speed = "{:.2f} iter/sec".format(idx / elapsed) if idx > elapsed else \
            "{:.2f} sec/iter".format(elapsed / (idx + args.chunk))
        eta = str(timedelta(seconds = round(elapsed * (total-idx)/idx)))
        print("{}{} Speed:{} | ETA: {}".format(
            u"\u2588" * perc, u"\u2501" * (100 - perc), speed, eta), end='\r')

████████████████████████████████████████████████████████████████████████████████████████████████████ Speed:32.90 iter/sec | ETA: 0:00:03