In [22]:
import numpy as np
import pandas as pd
import time

HEADER_NAMES = np.array(['CMTE_ID', 'AMNDT_IND', 'RPT_TP', 'TRANSACTION_PGI', 'IMAGE_NUM',
                        'TRANSACTION_TP', 'ENTITY_TP', 'NAME', 'CITY', 'STATE',
                        'ZIP_CODE', 'EMPLOYER', 'OCCUPATION', 'TRANSACTION_DT', 'TRANSACTION_AMT',
                        'OTHER_ID', 'TRAN_ID', 'FILE_NUM', 'MEMO_CD', 'MEMO_TEXT','SUB_ID'])

BY_ZIP = pd.DataFrame(columns=['CMTE_ID', 'ZIP_CODE', 'RUN_MED', 'NUM_ZIP_TRANS', 'TOTAL_ZIP_AMT'])
BY_ZIP['RUN_MED'] = BY_ZIP['RUN_MED'].astype(int)
BY_ZIP['NUM_ZIP_TRANS'] = BY_ZIP['NUM_ZIP_TRANS'].astype(int)
BY_ZIP['TOTAL_ZIP_AMT'] = BY_ZIP['TOTAL_ZIP_AMT'].astype(int)
# SEEN_ZIPS = set() #set
SEEN_ZIPS = {} # zip: [count, sum]

BY_DATE = pd.DataFrame(columns=['CMTE_ID', 'TRANSACTION_DT', 'MEDIAN', 'NUM_TRANS', 'TOTAL_AMT'])
BY_DATE['MEDIAN'] = BY_DATE['MEDIAN'].astype(int)
BY_DATE['NUM_TRANS'] = BY_DATE['NUM_TRANS'].astype(int)
BY_DATE['TOTAL_AMT'] = BY_DATE['TOTAL_AMT'].astype(int)
reader = pd.read_csv('../../input/itcont_short.txt', sep='|', header=None, names=HEADER_NAMES,
                         usecols=['CMTE_ID', 'ZIP_CODE', 'TRANSACTION_DT', 'TRANSACTION_AMT', 'OTHER_ID'],
                         index_col=False, na_filter = False, chunksize=1000000, dtype={'ZIP_CODE': object, 'TRANSACTION_DT': object})

In [23]:
def preprocessing(pre_df):
    temp_df = pd.DataFrame(pre_df.loc[(pre_df['OTHER_ID']=='') & (pre_df['CMTE_ID'] != '') & (np.invert(pre_df['TRANSACTION_AMT'].isnull())), :])
    temp_df.loc[:,'ZIP_CODE'] = temp_df.loc[:, 'ZIP_CODE'].astype(str).str[:5]
    temp_df.loc[:, 'TRANSACTION_DT'] = pd.to_datetime(temp_df['TRANSACTION_DT'], format="%m%d%Y")
    return temp_df
        
def process_by_zip(input_df):
    global BY_ZIP
    valid_zip_df = pd.DataFrame(input_df.loc[input_df['ZIP_CODE'].str.len() == 5, :])
    list_dicts = []
    for row in valid_zip_df.itertuples():
        if row.ZIP_CODE not in SEEN_ZIPS:
            SEEN_ZIPS[row.ZIP_CODE] = [1, row.TRANSACTION_AMT]
        else:
            SEEN_ZIPS[row.ZIP_CODE] = [SEEN_ZIPS[row.ZIP_CODE][0]+1, SEEN_ZIPS[row.ZIP_CODE][1]+row.TRANSACTION_AMT]
        list_dicts.append({'CMTE_ID': row.CMTE_ID, 
                             'ZIP_CODE': row.ZIP_CODE,
                             'RUN_MED': row.TRANSACTION_AMT,
                             'NUM_ZIP_TRANS': SEEN_ZIPS[row.ZIP_CODE][0],
                             'TOTAL_ZIP_AMT': SEEN_ZIPS[row.ZIP_CODE][1]})
    BY_ZIP = pd.concat([BY_ZIP, pd.DataFrame(list_dicts)])
    
def process_by_date(input_df):
    global BY_DATE
    valid_date_df = pd.DataFrame(input_df.loc[np.invert(input_df['TRANSACTION_AMT'].isnull()), :])
    list_dicts = []
    for row in valid_date_df.itertuples():
        list_dicts.append({'CMTE_ID': row.CMTE_ID,
                           'TRANSACTION_DT': row.TRANSACTION_DT,
                           'MEDIAN': row.TRANSACTION_AMT,
                           'NUM_TRANS': 1,
                           'TOTAL_AMT': row.TRANSACTION_AMT
                           })
    BY_DATE = pd.concat([BY_DATE, pd.DataFrame(list_dicts)])
    
def running_med():
    global BY_ZIP
    global SEEN_ZIPS
    BY_ZIP.reset_index(inplace=True, drop=True)
    BY_ZIP['Index'] = BY_ZIP.index
    BY_ZIP.set_index('ZIP_CODE', inplace=True)
    list_zips = []
    for zips in SEEN_ZIPS:
        run_med = pd.Series(BY_ZIP.loc[zips, 'RUN_MED']).expanding().median().round().astype(int)
        indices = pd.Series(BY_ZIP.loc[zips, 'Index'])
        for index, med in zip(indices,run_med):
            list_zips.append({'RUN_MED': med, 
                              'Indices': index})
    temp_df = pd.DataFrame(list_zips)
    temp_df.set_index('Indices', inplace=True)
    BY_ZIP.reset_index(inplace=True)
    BY_ZIP = BY_ZIP.loc[:, ['ZIP_CODE', 'CMTE_ID', 'NUM_ZIP_TRANS', 'TOTAL_ZIP_AMT']].merge(
        temp_df, how='outer', left_index=True, right_index=True, sort=True)
    
def date_recip_order():
    global BY_DATE
    BY_DATE = BY_DATE.groupby(['CMTE_ID', 'TRANSACTION_DT']).agg(
        {'MEDIAN': np.median, 'NUM_TRANS': 'count', 'TOTAL_AMT': 'sum'})
    BY_DATE['MEDIAN'] = BY_DATE['MEDIAN'].round().astype(int)
    BY_DATE.reset_index(inplace=True)
    
def post_process_zip():
    global BY_ZIP
    BY_ZIP = BY_ZIP[['CMTE_ID', 'ZIP_CODE', 'RUN_MED', 'NUM_ZIP_TRANS', 'TOTAL_ZIP_AMT']]
    
def post_process_date():
    global BY_DATE
    BY_DATE = BY_DATE[['CMTE_ID', 'TRANSACTION_DT', 'MEDIAN', 'NUM_TRANS', 'TOTAL_AMT']]
    BY_DATE.loc[:,'TRANSACTION_DT'] = BY_DATE.loc[:,'TRANSACTION_DT'].dt.strftime('%m%d%Y')
    BY_DATE.sort_values(['CMTE_ID', 'TRANSACTION_DT'], inplace=True)    

In [34]:
# for chunk in reader:
#     test_df = preprocessing(chunk)
#     process_by_zip(test_df)
#     process_by_date(test_df)
post_process_zip()
post_process_date()

In [16]:
start_time = time.time()
chunk = reader.get_chunk(20000)

# for chunk in reader:
test_df = preprocessing(chunk)
process_by_zip2(test_df)
process_by_date(test_df)

running_med2()
date_recip_order()

post_process_zip()
post_process_date()

print("--- %s seconds ---" % (time.time() - start_time))

--- 14.864471435546875 seconds ---


In [35]:
BY_ZIP

Unnamed: 0,CMTE_ID,ZIP_CODE,RUN_MED,NUM_ZIP_TRANS,TOTAL_ZIP_AMT
0,C00177436,30004,384,1,384
1,C00384818,2895,250,1,250
2,C00177436,30750,230,1,230
3,C00177436,4105,384,1,384
4,C00384818,2895,292,2,583
5,C00177436,4105,384,2,768


In [37]:
list(BY_DATE)

['CMTE_ID', 'TRANSACTION_DT', 'MEDIAN', 'NUM_TRANS', 'TOTAL_AMT']

In [38]:
list(BY_ZIP)

['CMTE_ID', 'ZIP_CODE', 'RUN_MED', 'NUM_ZIP_TRANS', 'TOTAL_ZIP_AMT']

In [45]:
BY_DATE

Unnamed: 0,CMTE_ID,TRANSACTION_DT,MEDIAN,NUM_TRANS,TOTAL_AMT
0,C00177436,1312017,384,4,1382
1,C00384818,1122017,292,2,583


In [46]:
BY_DATE.dtypes

CMTE_ID           object
TRANSACTION_DT    object
MEDIAN             int32
NUM_TRANS          int64
TOTAL_AMT          int64
dtype: object

In [48]:
BY_DATE.loc[:, 'TRANSACTION_DT']

0    01312017
1    01122017
Name: TRANSACTION_DT, dtype: object