### Athena
---
query from db

#### Map mct.transaction_sequence = dtj.tseq

In [12]:
profile_db = 'datalake_production_profile_reporting'
profile_db_query = f"""
SELECT tjd,
         dtj.time,
         dtj.cid AS business_account_number,
         cdt AS created_dt,
         endbal,
         dtj.etc AS transaction_code,
         dtj.tcmt AS transaction_comment,
         tamt AS transaction_amount,
         trn.dc AS is_credit,
         cif.zsofiid AS user_id,
         split_part(split_part(tso, 'RCID#', 2), '~', 1) AS external_account_number, 
         split_part(split_part(dtj.tso, 'RINS#', 2), '~', 1) AS external_institution_id, 
         split_part(split_part(dtj.tso, 'ACHCOID#', 2), '~', 1) AS originating_company_id, 
         split_part(split_part(dtj.tso, 'ACHID#', 2), '~', 1) AS external_institution_trans_id, 
         split_part(split_part(dtj.tso, 'ACHODFI#', 2), '~', 1) AS originator_dfi_id, 
         auth.merchnm AS merchant_name
FROM {profile_db}.dtj
LEFT JOIN {profile_db}.authdtl auth
    ON auth.authid = split_part(split_part(dtj.tso, 'AUTHID#', 2), '~', 1)
        AND dtj.tso LIKE '%AUTHID#%'
JOIN {profile_db}.dep
    ON dep.cid = dtj.cid
JOIN {profile_db}.cif
    ON cif.acn = dep.acn
LEFT JOIN {profile_db}.trn
    ON dtj.etc = trn.etc
WHERE endbal is NOT Null
        AND tamt is NOT Null
        AND tamt NOT LIKE '%#%'
"""

In [2]:
!pip install -q pyathena[pandas]

In [78]:
from pyathena import connect
import pandas as pd

In [80]:
conn = connect(s3_staging_dir='s3://sofi-data-science/jxu/data_dump/ach_risk_transactions_profile_db')
df = pd.read_sql(profile_db_query, conn)

In [81]:
mm_db = 'datalake_production_money_monitoring'
mm_query = f'''SELECT mct.transaction_date AS transaction_datetime,
         mct.money_account_id AS business_account_number,
         mct.created_dt,
         mct.transaction_current_balance as endbal,
         mct.transaction_code,
         mct.transaction_amount,
         not CAST(mct.debit AS boolean) AS is_credit,
         cif.zsofiid AS user_id
FROM datalake_production_money_monitoring.money_cache_transaction as mct
JOIN datalake_production_profile_reporting.dep
   ON CAST(dep.cid AS varchar) = mct.money_account_id
JOIN datalake_production_profile_reporting.cif
    ON cif.acn = dep.acn
LEFT JOIN datalake_production_profile_reporting.trn
    ON mct.transaction_code = trn.etc
WHERE mct.transaction_current_balance is NOT Null
        AND mct.transaction_amount is NOT Null'''

conn = connect(s3_staging_dir='s3://sofi-data-science/jxu/data_dump/ach_risk_transactions_mm')
df_mm = pd.read_sql(mm_query, conn)

In [21]:
df.to_parquet('../data/transaction_data/dl_profile_reporting_db.parque')

In [82]:
df_mm.to_parquet('../data/transaction_data/dl_transaction_data_from_mm.parquet')

### Check differences across three datasets
---


In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
import warnings

warnings.filterwarnings(action='ignore')

In [3]:
df_mmc = pd.read_parquet('../data/transaction_data/dl_money_monitoring_cache.parquet')
df_athena_pr = pd.read_parquet('../data/transaction_data/dl_profile_reporting_db.parquet')
df_psql_pr = pd.read_parquet('../data/transaction_data/psql_profile_reporting_db.parquet')
dfs = [df_mmc, df_athena_pr, df_psql_pr]

#### set datetime

In [4]:
df_psql_pr['transaction_datetime'] = pd.to_datetime((df_psql_pr['tjd'].astype(str) + ' ' + \
                                                          df_psql_pr['time']), 
                                                          format='%Y-%m-%d %H:%M:%S', 
                                                          errors='coerce').dt.tz_localize(None)
# THIS MAY BE WRONG!
df_athena_pr['time'] = df_athena_pr.time.dt.strftime('%H:%M:%S') 
df_athena_pr['transaction_datetime'] = pd.to_datetime((df_athena_pr['tjd'].astype(str) + ' ' + \
                                                          df_athena_pr['time']), 
                                                          format='%Y-%m-%d %H:%M:%S', 
                                                          errors='coerce').dt.tz_localize(None)
df_mmc['transaction_datetime'] = df_mmc['transaction_date']

#### Clip dates

In [5]:
df_psql_pr = df_psql_pr[df_psql_pr.transaction_datetime.between(pd.to_datetime('2019-01-01'),
                                                                pd.to_datetime('2020-05-31'))]
df_athena_pr = df_athena_pr[df_athena_pr.transaction_datetime.between(pd.to_datetime('2019-01-01'),
                                                                pd.to_datetime('2020-05-31'))]
df_mmc = df_mmc[df_mmc.transaction_datetime.between(pd.to_datetime('2019-01-01'),
                                                                pd.to_datetime('2020-05-31'))]

#### checkout columns we have

In [6]:
df_mmc.columns

Index(['dms_export_timestamp', 'money_cache_transaction_id', 'money_cache_id',
       'money_account_id', 'money_message_hash', 'transaction_amount',
       'transaction_date', 'transaction_type', 'transaction_code',
       'transaction_current_balance', 'transaction_category', 'debit',
       'created_by', 'created_dt', 'updated_by', 'updated_dt',
       'transaction_sequence', 'transaction_datetime'],
      dtype='object')

In [7]:
# df_***_pr should have the same columns and data

df_athena_pr.columns

Index(['tjd', 'time', 'business_account_number', 'created_dt', 'endbal',
       'transaction_code', 'transaction_comment', 'transaction_amount',
       'is_credit', 'user_id', 'external_account_number',
       'external_institution_id', 'originating_company_id',
       'external_institution_trans_id', 'originator_dfi_id', 'merchant_name',
       'transaction_datetime'],
      dtype='object')

In [8]:
df_psql_pr.columns

Index(['tjd', 'time', 'business_account_number', 'created_dt', 'endbal',
       'transaction_code', 'transaction_comment', 'transaction_amount',
       'is_credit', 'user_id', 'external_account_number',
       'external_institution_id', 'originating_company_id',
       'external_institution_trans_id', 'originator_dfi_id', 'merchant_name',
       'transaction_datetime'],
      dtype='object')

#### Compare df_mmc and df_psql_pr

In [9]:
mmc_ids = sorted(df_mmc.money_account_id.unique())
psql_pr_ids = sorted(df_psql_pr.business_account_number.unique())

In [10]:
df_mmc['money_account_id'] = df_mmc.money_account_id.astype(int)

In [11]:
df_mmc.columns

Index(['dms_export_timestamp', 'money_cache_transaction_id', 'money_cache_id',
       'money_account_id', 'money_message_hash', 'transaction_amount',
       'transaction_date', 'transaction_type', 'transaction_code',
       'transaction_current_balance', 'transaction_category', 'debit',
       'created_by', 'created_dt', 'updated_by', 'updated_dt',
       'transaction_sequence', 'transaction_datetime'],
      dtype='object')

In [12]:
df_psql_pr.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 10650251 entries, 0 to 13162923
Data columns (total 17 columns):
 #   Column                         Dtype         
---  ------                         -----         
 0   tjd                            object        
 1   time                           object        
 2   business_account_number        int64         
 3   created_dt                     object        
 4   endbal                         float64       
 5   transaction_code               object        
 6   transaction_comment            object        
 7   transaction_amount             object        
 8   is_credit                      int64         
 9   user_id                        int64         
 10  external_account_number        object        
 11  external_institution_id        object        
 12  originating_company_id         object        
 13  external_institution_trans_id  object        
 14  originator_dfi_id              object        
 15  merchant_name

In [17]:
df_mmc.columns

Index(['dms_export_timestamp', 'money_cache_transaction_id', 'money_cache_id',
       'money_account_id', 'money_message_hash', 'transaction_amount',
       'transaction_date', 'transaction_type', 'transaction_code',
       'transaction_current_balance', 'transaction_category', 'debit',
       'created_by', 'created_dt', 'updated_by', 'updated_dt',
       'transaction_sequence', 'transaction_datetime'],
      dtype='object')

In [20]:
df_mmc[['transaction_current_balance', 'transaction_amount', 'transaction_code']].head()

Unnamed: 0,transaction_current_balance,transaction_amount,transaction_code
0,6.1,4.0,DDATMREFUND
1,1260.58,18.91,POSDW
2,69.98,64.98,DWCRDBILLPAY
3,3473.4,6.37,DWCRDBILLPAY
4,69.44,11.67,POSDW


In [70]:
id_ = 410001985020
mmc_cols = ['transaction_datetime', 'money_account_id', 'transaction_type', 'transaction_current_balance', 'transaction_amount', 'transaction_code', 'debit']
mmc_sample = df_mmc[df_mmc.money_account_id == id_]
mmc_sample = mmc_sample[mmc_cols]
mmc_sample = mmc_sample[mmc_sample.transaction_code != 'IIPD']
mmc_sample = mmc_sample[~mmc_sample.transaction_type.isna()]

pr_cols = ['transaction_datetime', 'business_account_number', 'endbal', 'transaction_amount', 'transaction_code', 'transaction_comment', 'is_credit', 'external_account_number']
pr_sample = df_psql_pr[df_psql_pr.business_account_number == id_]
pr_sample = pr_sample[pr_cols]

In [76]:
pr_sample[pr_sample.external_account_number != '']

Unnamed: 0,transaction_datetime,business_account_number,endbal,transaction_amount,transaction_code,transaction_comment,is_credit,external_account_number
5791761,2020-05-11 18:01:08,410001985020,27541.01,2200,ACHDW,PO,0,0049611886
6038548,2020-05-26 18:01:11,410001985020,4098.73,800,ACHDW,PO,0,0049611886
6038550,2020-05-26 18:01:11,410001985020,3498.73,600,ACHDW,PO,0,0049611886
6038552,2020-05-26 18:01:11,410001985020,3298.73,200,ACHDW,PO,0,0049611886
6759183,2020-05-18 18:01:09,410001985020,5150.58,100,ACHDW,PO,0,104792076911
...,...,...,...,...,...,...,...,...
12907809,2020-03-30 18:01:08,410001985020,20519.60,4750,ACHDD,CO,1,0049611886
12941168,2020-04-01 18:01:07,410001985020,19490.79,1000,ACHDD,CO,1,0049611886
13150826,2020-05-04 18:01:12,410001985020,27724.46,23200,ACHDD,CO,1,0049611886
13150828,2020-05-04 18:01:12,410001985020,26974.46,750,ACHDW,PO,0,0049611886


In [59]:
def export_samples(sample1, sample2):
    sample1.sort_values(by=['transaction_datetime', 'transaction_amount'], inplace=True)
    sample2.sort_values(by=['transaction_datetime', 'transaction_amount'], inplace=True)
    sample1.to_csv('../artifacts/sample1.csv')
    sample2.to_csv('../artifacts/sample2.csv')
    

export_samples(mmc_sample, pr_sample)

In [57]:
df_mmc.transaction_category.head()

0                    None
1    purchasefromchecking
2      internalwithdrawal
3      internalwithdrawal
4    purchasefromchecking
Name: transaction_category, dtype: object

In [53]:
df_psql_pr.transaction_comment.head()

0    KILROY'S PACKAGE STORE COLLEGE PARK, GA, US
1                      GREENDOT PASADENA, CA, US
2        DOORDASH*TERIYAKI MADN 6506819470, C, A
3                 Microsoft*Xbox msbill.info, WA
4           CASH APP*KJ PRINE*ADD 8774174551, CA
Name: transaction_comment, dtype: object

In [54]:
df_mmc.money_cache_transaction_id.head()

0    dedb6973-dfb8-4dfb-b72b-a0665c4e532f
1    c12c3398-5f0b-4c1b-8acd-750bc4a2eae3
2    559bab8d-f63a-4442-9944-ac8b22bdd9c0
3    f14b1ccc-da8c-41a5-ac3d-e9a3ef03dc81
4    86496056-5f48-4d7a-9dad-b7264b72efad
Name: money_cache_transaction_id, dtype: object

In [None]:
df_psql_pr

In [50]:
df_mmc.columns

Index(['dms_export_timestamp', 'money_cache_transaction_id', 'money_cache_id',
       'money_account_id', 'money_message_hash', 'transaction_amount',
       'transaction_date', 'transaction_type', 'transaction_code',
       'transaction_current_balance', 'transaction_category', 'debit',
       'created_by', 'created_dt', 'updated_by', 'updated_dt',
       'transaction_sequence', 'transaction_datetime'],
      dtype='object')

In [22]:
mmc_sample.head()

Unnamed: 0,dms_export_timestamp,money_cache_transaction_id,money_cache_id,money_account_id,money_message_hash,transaction_amount,transaction_date,transaction_type,transaction_code,transaction_current_balance,transaction_category,debit,created_by,created_dt,updated_by,updated_dt,transaction_sequence,transaction_datetime
5805,2020-08-24 04:20:34.854956,c1a4a734-9bce-4148-821a-d22fb1f3c927,11b62f4b-ad5d-4d72-8562-77d527902dda,410001526181,11:462498,4.37,2020-01-23 22:20:40,Debit Card,POSDW,7.6,purchasefromchecking,True,automation@sofi.org,2020-01-23 22:20:42.237,,NaT,1332.0,2020-01-23 22:20:40
13244,2020-08-24 04:20:05.989364,f72e4bdd-01b9-4abd-b018-bc2f66ebea04,11b62f4b-ad5d-4d72-8562-77d527902dda,410001526181,14:411354,19.75,2019-12-30 17:41:14,Debit Card,DDCRDBILLREF,24.65,internaldeposit,False,automation@sofi.org,2019-12-30 17:41:17.498,,NaT,1192.0,2019-12-30 17:41:14
13389,2020-08-24 04:20:05.994662,fbdaedda-11a3-47a3-834f-23ea8a3bfc0a,11b62f4b-ad5d-4d72-8562-77d527902dda,410001526181,14:411359,23.0,2019-12-30 17:50:27,ATM,DWATM,1.65,atmus,True,automation@sofi.org,2019-12-30 17:50:37.968,,NaT,1193.0,2019-12-30 17:50:27
13436,2020-08-24 04:20:05.996492,736d91f9-d171-49bc-bea9-0bbb58996ba6,11b62f4b-ad5d-4d72-8562-77d527902dda,410001526181,9:411396,3.0,2019-12-30 17:50:27,ATM,DDATMREFUND,4.65,,False,automation@sofi.org,2019-12-30 17:50:37.791,,NaT,1194.0,2019-12-30 17:50:27
33551,2020-08-24 04:16:44.128731,23477908-d74d-4ebe-817e-deff25a90bf5,11b62f4b-ad5d-4d72-8562-77d527902dda,410001526181,4:158083,403.0,2019-07-23 12:37:33,ATM,DWATM,196.24,atmus,True,automation@sofi.org,2019-08-04 01:34:39.311,,NaT,,2019-07-23 12:37:33


In [23]:
pr_sample.head()

Unnamed: 0,tjd,time,business_account_number,created_dt,endbal,transaction_code,transaction_comment,transaction_amount,is_credit,user_id,external_account_number,external_institution_id,originating_company_id,external_institution_trans_id,originator_dfi_id,merchant_name,transaction_datetime
8118,2020-05-24,08:10:26,410001526181,2020-05-24,42.17,POSDW,"DOMINO'S 5172 MARGATE, FL",8.55,0,13587166,,,,,,DOMINO'S 5172,2020-05-24 08:10:26
73002,2019-06-25,16:14:40,410001526181,2019-06-25,7.87,POSDW,"WENDYS 8179 7115 WEST MCNAB ROAD N.LAUDERDALE,...",4.28,0,13587166,,,,,,WENDYS 8179,2019-06-25 16:14:40
74764,2019-05-16,03:32:20,410001526181,2019-05-16,0.66,POSDW,"CHEVRON 0303196 MARGATE, FL, US",0.92,0,13587166,,,,,,CHEVRON 0303196,2019-05-16 03:32:20
87430,2019-07-05,08:41:35,410001526181,2019-07-05,865.8,DWATM,"PAI ISO 590 S ST RD 7 MARGATE, FL, US",42.5,0,13587166,,,,,,PAI ISO,2019-07-05 08:41:35
87432,2019-07-05,08:41:35,410001526181,2019-07-05,868.3,DDATMREFUND,"PAI ISO 590 S ST RD 7 MARGATE, FL, US",2.5,1,13587166,,,,,,PAI ISO,2019-07-05 08:41:35


In [25]:
df_tmp = df_mmc.money_account_id.value_counts()
df_tmp.head()

410000375873    3531
410000307504    3061
410000259612    2960
410001659933    2878
410000158103    2812
Name: money_account_id, dtype: int64

In [26]:
df_tmp[df_tmp == 20]

450002628848    20
450003966260    20
410003018093    20
410001998845    20
410001837098    20
                ..
410001886136    20
410003326291    20
410001965392    20
410004106499    20
410002716080    20
Name: money_account_id, Length: 2223, dtype: int64

In [65]:
profile_db = 'datalake_production_profile_reporting'
mm_db = 'datalake_production_money_monitoring'
mm_transaction_query = f"""
SELECT mct.transaction_date AS transaction_datetime,
         mct.money_account_id AS business_account_number,
         mct.created_dt,
         mct.account_current_balance as endbal,
         mct.transaction_code,
         mct.transaction_amount,
         mct.debit AS is_debit,
         cif.zsofiid AS user_id,
FROM {mm_db}.money_cache_transaction as mct
JOIN {profile_db}.dep
    ON dep.cid = dtj.cid
JOIN {profile_db}.cif
    ON cif.acn = dep.acn
LEFT JOIN {profile_db}.trn
    ON mct.transaction_code = trn.etc
WHERE endbal is NOT Null
        AND tamt is NOT Null
        AND tamt NOT LIKE '%#%'
LIMIT 10
"""

In [66]:
print(mm_transaction_query)


SELECT mct.transaction_date AS transaction_datetime,
         mct.money_account_id AS business_account_number,
         mct.created_dt,
         mct.account_current_balance as endbal,
         mct.transaction_code,
         mct.transaction_amount,
         mct.debit AS is_debit,
         cif.zsofiid AS user_id,
FROM datalake_production_money_monitoring.money_cache_transaction as mct
JOIN datalake_production_profile_reporting.dep
    ON dep.cid = dtj.cid
JOIN datalake_production_profile_reporting.cif
    ON cif.acn = dep.acn
LEFT JOIN datalake_production_profile_reporting.trn
    ON mct.transaction_code = trn.etc
WHERE endbal is NOT Null
        AND tamt is NOT Null
        AND tamt NOT LIKE '%#%'
LIMIT 10



In [None]:
SELECT mct.transaction_date AS transaction_datetime,
         mct.money_account_id AS business_account_number,
         mct.created_dt,
         mct.transaction_current_balance as endbal,
         mct.transaction_code,
         mct.transaction_amount,
         mct.debit AS is_debit,
         cif.zsofiid AS user_id
FROM datalake_production_money_monitoring.money_cache_transaction as mct
JOIN datalake_production_profile_reporting.dep
    ON dep.cid = dtj.cid
JOIN datalake_production_profile_reporting.cif
    ON cif.acn = dep.acn
LEFT JOIN datalake_production_profile_reporting.trn
    ON mct.transaction_code = trn.etc
WHERE endbal is NOT Null
        AND tamt is NOT Null
        AND tamt NOT LIKE '%#%'
LIMIT 10

In [3]:
# working version
query = '''
SELECT mct.transaction_date AS transaction_datetime,
         mct.money_account_id AS business_account_number,
         mct.created_dt,
         mct.transaction_current_balance as endbal,
         mct.transaction_code,
         mct.transaction_amount,
         not CAST(mct.debit AS boolean) AS is_credit,
         cif.zsofiid AS user_id
FROM datalake_production_money_monitoring.money_cache_transaction as mct
JOIN datalake_production_profile_reporting.dep
   ON CAST(dep.cid AS varchar) = mct.money_account_id
JOIN datalake_production_profile_reporting.cif
    ON cif.acn = dep.acn
LEFT JOIN datalake_production_profile_reporting.trn
    ON mct.transaction_code = trn.etc
WHERE mct.transaction_current_balance is NOT Null
        AND mct.transaction_amount is NOT Null
'''

In [4]:
import pandas as pd
import mdsutils

In [None]:
%%time

athena = mdsutils.AthenaClient(database='datalake_production_money_monitoring')
df = athena.query_to_df(query)

CPU times: user 1min 53s, sys: 18.8 s, total: 2min 12s
Wall time: 8min 31s


In [None]:
df.to_parquet("../../artifacts/dev-prod-data-alignment/tdf_mm.parquet")

In [10]:
df[df.user_id == 12447].sort_values("transaction_datetime")

Unnamed: 0,transaction_datetime,business_account_number,created_dt,endbal,transaction_code,transaction_amount,is_credit,user_id
40592890,2018-09-01 04:03:42.000,410000157616,2019-08-02 20:12:26.380,10.00,IIPD,0.00,True,12447
46650936,2018-09-14 09:47:31.000,410000157616,2019-08-02 20:14:23.166,510.00,ACHDDIN,500.00,True,12447
46650943,2018-09-16 18:53:06.000,410000157616,2019-08-02 20:14:31.010,310.00,DDATMREFUND,5.00,True,12447
46650844,2018-09-16 18:53:06.000,410000157616,2019-08-02 20:10:12.168,305.00,DWATM,205.00,False,12447
46650945,2018-09-17 02:43:27.000,410000157616,2019-08-02 20:14:31.643,288.50,POSDW,21.50,False,12447
...,...,...,...,...,...,...,...,...
33615285,2020-10-16 22:01:06.000,410000157616,2020-10-16 22:01:12.969,600.15,ACHDD,100.00,True,12447
9121575,2020-11-01 04:03:26.000,410000157616,2020-11-01 04:05:41.582,600.26,IIPD,0.11,True,12447
43759885,2020-11-16 23:01:12.000,410000157616,2020-11-16 23:02:33.085,0.00,ACHDW,600.26,False,12447
53079334,2020-12-01 05:03:26.000,410000157616,2020-12-01 05:05:19.136,0.06,IIPD,0.06,True,12447
