In [1]:
import pandas as pd
import duckdb

In [2]:
data = pd.read_csv('../../assets/data/archive_2.csv', on_bad_lines='skip', sep=";")
data.rename(columns={0: 'id_transaction',
                        1: 'date',
                        2: 'card',
                        3: 'client',
                        4: 'date_of_birth',
                        5: 'passport',
                        6: 'passport_valid_to',
                        7: 'phone',
                        8: 'operation_type',
                        9: 'amount',
                        10: 'operation_result',
                        11: 'terminal_type',
                        12: 'city', 
                        13: 'address',
                        }, inplace=True)

In [82]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18374 entries, 0 to 18373
Data columns (total 14 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   id_transaction     18374 non-null  int64  
 1   date               18374 non-null  object 
 2   card               18374 non-null  object 
 3   client             18374 non-null  object 
 4   date_of_birth      18374 non-null  object 
 5   passport           18374 non-null  object 
 6   passport_valid_to  18374 non-null  object 
 7   phone              18374 non-null  object 
 8   operation_type     18374 non-null  object 
 9   amount             18374 non-null  float64
 10  operation_result   18374 non-null  object 
 11  terminal_type      18374 non-null  object 
 12  city               18374 non-null  object 
 13  address            18374 non-null  object 
dtypes: float64(1), int64(1), object(12)
memory usage: 2.0+ MB


In [83]:
data.head()

Unnamed: 0,id_transaction,date,card,client,date_of_birth,passport,passport_valid_to,phone,operation_type,amount,operation_result,terminal_type,city,address
0,799191207,2024-05-25 12:00:00,18c215cc,1-21360,1995-10-09,fc73eda1,2040-09-27,17df00e2,Пополнение,900.0,Отказ,ATM,Казань,"Казань, пер. Волкова, д. 9/1"
1,849467850,2024-05-25 12:00:09,3e123824,3-57989,1957-08-11,96c241c8,бессрочно,e6fba4e5,Перевод,12347.68,Успешно,WEB,Москва,"Москва, бул. Веселый, д. 2"
2,977969154,2024-05-25 12:00:23,f14a2192,4-58986,1973-08-15,"7,516E+208",бессрочно,b6a86fe8,Оплата,362270.1,Успешно,WEB,Москва,"Москва, пр. Кузнецова, д. 8 стр. 7/5"
3,862781688,2024-05-25 12:00:25,31a96a70,5-11446,1974-01-15,8b002607,бессрочно,bc1004d0,Снятие,4400.0,Успешно,ATM,Санкт-Петербург,"Санкт-Петербург, пр. Народный, д. 576"
4,260193594,2024-05-25 12:00:36,18f381ce,9-48112,1988-05-09,c9b0cd8a,2033-04-28,cfade81f,Пополнение,2800.0,Отказ,ATM,Москва,"Москва, алл. Восточная, д. 5/3 стр. 11"


In [11]:
transaction_data = data.groupby('client').size().reset_index(name='transaction_count').values.tolist()

In [12]:
transaction_data.sort(key=lambda x: x[1])

In [13]:
transaction_data[-10:]

[['7-25295', 7],
 ['9-88379', 7],
 ['4-25431', 8],
 ['4-76146', 8],
 ['3-12580', 9],
 ['8-98410', 9],
 ['9-64936', 10],
 ['4-74153', 11],
 ['3-40634', 12],
 ['2-88451', 16]]

In [73]:
def check_passport_validity(con):
    query = """
    UPDATE temp SET passport_validity_fraud = 
    CASE
        WHEN TRY_CAST(passport_valid_to AS TIMESTAMP) IS NOT NULL
        AND TRY_CAST(date AS TIMESTAMP) > TRY_CAST(passport_valid_to AS TIMESTAMP)
        THEN TRUE
        ELSE FALSE
    END
    """
    
    con.execute(query)


def check_time_diff_fraud(con, time=10):
    query = f"""
    UPDATE temp SET time_diff_fraud = 
    CASE
        WHEN time_diff <= {time} THEN TRUE
        ELSE FALSE
    END
    """
    
    con.execute(query)


def check_time_address_diff(con, time=600):
    con.sql(f"""
    CREATE TEMP TABLE fraud_addresses AS
    SELECT DISTINCT client, address
    FROM temp
    WHERE changed_address AND time_diff < {time}
    """)

    con.sql("""
    UPDATE temp
    SET address_diff_fraud = TRUE
    WHERE temp.client IN (SELECT client FROM fraud_addresses)
    AND temp.address IN (SELECT address FROM fraud_addresses)
    """)


def check_time_city_diff(con, time=86400):
    con.sql(f"""
    CREATE TEMP TABLE fraud_citys AS
    SELECT DISTINCT client, city
    FROM temp
    WHERE changed_city = TRUE AND time_diff < {time}
    """)

    con.sql("""
    UPDATE temp
    SET city_diff_fraud = TRUE
    WHERE temp.client IN (SELECT client FROM fraud_citys)
    AND temp.city IN (SELECT city FROM fraud_citys)
    """)


def check_data_discrepancy_fraud(con):
    con.execute("""
    CREATE TEMP TABLE temp_counts AS
    SELECT client, 
           COUNT(DISTINCT card) AS distinct_cards, 
           COUNT(DISTINCT card || phone) AS distinct_combinations
    FROM temp
    GROUP BY client
    """)

    con.execute("""
    UPDATE temp
    SET data_discrepancy_fraud = 
    (SELECT CASE 
            WHEN temp_counts.distinct_cards < temp_counts.distinct_combinations THEN TRUE
            ELSE FALSE
            END
     FROM temp_counts
     WHERE temp.client = temp_counts.client)
    """)


def detect_hampel_outliers(con, window_size=3, n_sigmas=3):
    con.execute(f"""
    CREATE TEMP TABLE sorted_temp AS
    SELECT *, 
           MEDIAN(amount) OVER (PARTITION BY client ORDER BY date ROWS BETWEEN {window_size} PRECEDING AND {window_size} FOLLOWING) AS rolling_median,
           AVG(amount) OVER (PARTITION BY client ORDER BY date ROWS BETWEEN {window_size} PRECEDING AND {window_size} FOLLOWING) AS rolling_avg,
           STDDEV_SAMP(amount) OVER (PARTITION BY client ORDER BY date ROWS BETWEEN {window_size} PRECEDING AND {window_size} FOLLOWING) AS rolling_stddev
    FROM temp
    """)

    con.execute(f"""
    UPDATE temp
    SET amount_outlier_fraud = 
    CASE 
        WHEN ABS(temp.amount - sorted_temp.rolling_avg) > {n_sigmas} * sorted_temp.rolling_stddev THEN TRUE
        ELSE FALSE
    END
    FROM sorted_temp
    WHERE temp.client = sorted_temp.client AND temp.date = sorted_temp.date
    """)


In [165]:
con.sql("SELECT id_transaction, changed_address, address_diff_fraud, client, time_diff FROM temp where changed_address")

┌────────────────┬─────────────────┬────────────────────┬─────────┬───────────┐
│ id_transaction │ changed_address │ address_diff_fraud │ client  │ time_diff │
│     int64      │     boolean     │      boolean       │ varchar │  double   │
├────────────────┼─────────────────┼────────────────────┼─────────┼───────────┤
│      764935614 │ true            │ false              │ 4-15901 │   29766.0 │
│      474109917 │ true            │ false              │ 4-63301 │   59019.0 │
│      984851942 │ true            │ false              │ 6-71278 │   39449.0 │
│      924195571 │ true            │ false              │ 9-91761 │    9815.0 │
│      903815949 │ true            │ true               │ 2-66814 │     346.0 │
│      260619128 │ true            │ true               │ 2-66814 │     383.0 │
│      499694546 │ true            │ true               │ 2-66814 │     306.0 │
│      961215876 │ true            │ true               │ 2-66814 │     587.0 │
│      295788952 │ true            │ tru

In [154]:
con.sql("SELECT id_transaction, changed_city, address_diff_fraud, client FROM temp where changed_city")

┌────────────────┬─────────────────┬────────────────────┬─────────┐
│ id_transaction │ changed_address │ address_diff_fraud │ client  │
│     int64      │     boolean     │      boolean       │ varchar │
├────────────────┼─────────────────┼────────────────────┼─────────┤
│      726361364 │ true            │ false              │ 8-98410 │
│      565372207 │ true            │ false              │ 8-98410 │
│      865998220 │ true            │ false              │ 8-98410 │
│      994089337 │ true            │ false              │ 8-98410 │
│      481555314 │ true            │ false              │ 8-98410 │
│      770251044 │ true            │ false              │ 8-98410 │
│      402265320 │ true            │ false              │ 9-64936 │
│      886134731 │ true            │ false              │ 9-64936 │
│      321328428 │ true            │ false              │ 9-64936 │
│      752009766 │ true            │ false              │ 9-64936 │
│          ·     │  ·              │   ·        

In [157]:
test = data.loc[data['client'] == '5-23162'] 
test

Unnamed: 0,id_transaction,date,card,client,date_of_birth,passport,passport_valid_to,phone,operation_type,amount,operation_result,terminal_type,city,address
12331,415375328,2024-05-26 12:52:25,c643ffb2,5-23162,1973-05-15,97fa6978,бессрочно,302cc15f,Оплата,35844.99,Успешно,POS,Москва,"Москва, пр. Солнечный, д. 68 стр. 318"
12430,966346684,2024-05-26 13:00:58,c643ffb2,5-23162,1973-05-15,97fa6978,бессрочно,302cc15f,Оплата,991.64,Успешно,POS,Москва,"Москва, пр. Правды, д. 8"
12504,684011479,2024-05-26 13:09:23,c643ffb2,5-23162,1973-05-15,97fa6978,бессрочно,302cc15f,Оплата,995.86,Отказ,POS,Москва,"Москва, алл. Комарова, д. 323"
12594,584113629,2024-05-26 13:19:09,c643ffb2,5-23162,1973-05-15,97fa6978,бессрочно,302cc15f,Оплата,904.26,Успешно,POS,Москва,"Москва, пер. Николаева, д. 8/8 к. 6/2"
12670,944178841,2024-05-26 13:27:12,c643ffb2,5-23162,1973-05-15,97fa6978,бессрочно,302cc15f,Оплата,935.42,Отказ,POS,Москва,"Москва, наб. Придорожная, д. 3/4 стр. 9/4"


In [4]:
con = duckdb.connect()
con.register("temp_df", data)
con.sql("""
    CREATE TABLE temp AS
    SELECT *, 
    EXTRACT(EPOCH FROM (TRY_CAST(date AS TIMESTAMP) - LAG(TRY_CAST(date AS TIMESTAMP)) 
    OVER (PARTITION BY client ORDER BY date))) AS time_diff,
    CASE WHEN address != LAG(address) OVER (PARTITION BY client ORDER BY date) THEN true ELSE false END AS changed_address,
    CASE WHEN city != LAG(city) OVER (PARTITION BY client ORDER BY date) THEN true ELSE false END AS changed_city,    
    FROM temp_df
    """)

In [77]:
con.sql("ALTER TABLE temp DROP time_diff;")
con.sql("ALTER TABLE temp DROP changed_address;")
con.sql("ALTER TABLE temp DROP changed_city;")

In [78]:
con.sql("SELECT * FROM temp WHERE address_diff_fraud OR time_diff_fraud OR passport_validity_fraud OR city_diff_fraud OR data_discrepancy_fraud OR amount_outlier_fraud;")

┌────────────────┬─────────────────────┬──────────┬───┬─────────────────┬──────────────────────┬──────────────────────┐
│ id_transaction │        date         │   card   │ … │ city_diff_fraud │ data_discrepancy_f…  │ amount_outlier_fraud │
│     int64      │       varchar       │ varchar  │   │     boolean     │       boolean        │       boolean        │
├────────────────┼─────────────────────┼──────────┼───┼─────────────────┼──────────────────────┼──────────────────────┤
│      643184111 │ 2024-05-22 22:54:20 │ 164548b8 │ … │ true            │ false                │ false                │
│      731586751 │ 2024-05-24 00:20:49 │ cf001c39 │ … │ true            │ false                │ false                │
│      638763574 │ 2024-05-19 15:15:05 │ c06b1edf │ … │ false           │ false                │ false                │
│      316598314 │ 2024-05-19 15:22:01 │ c06b1edf │ … │ false           │ false                │ false                │
│      103589922 │ 2024-05-19 15:31:14 │

In [85]:
con.sql("SELECT amount FROM temp;")

┌────────────────────────┐
│         amount         │
│         double         │
├────────────────────────┤
│                  400.0 │
│                  300.0 │
│                  200.0 │
│                  500.0 │
│                  800.0 │
│                  700.0 │
│               43527.93 │
│               44520.83 │
│               21222.32 │
│               20381.69 │
│                    ·   │
│                    ·   │
│                    ·   │
│                 621.14 │
│                 922.32 │
│                 353.77 │
│                 793.19 │
│                 6600.0 │
│                 7300.0 │
│                 9900.0 │
│                 6000.0 │
│                  700.0 │
│                  400.0 │
├────────────────────────┤
│         ? rows         │
│ (>9999 rows, 20 shown) │
└────────────────────────┘

In [127]:
con.sql("ALTER TABLE temp ADD COLUMN fraud_flags TEXT;")

con.sql("""
    UPDATE TEMP 
    SET fraud_flags = (
        SELECT array_agg(ARRAY[
            CASE WHEN passport_validity_fraud THEN 'passport_validity_fraud' ELSE NULL END,
            CASE WHEN time_diff_fraud THEN 'time_diff_fraud' ELSE NULL END,
            CASE WHEN address_diff_fraud THEN 'address_diff_fraud' ELSE NULL END,
            CASE WHEN city_diff_fraud THEN 'address_diff_fraud' ELSE NULL END,
            CASE WHEN data_discrepancy_fraud THEN 'data_discrepancy_fraud' ELSE NULL END,
            CASE WHEN amount_outlier_fraud THEN 'amount_outlier_fraud' ELSE NULL END,
            ])
        FROM temp
        GROUP BY client, date
    )
    """)

In [29]:
test = con.sql("SELECT * FROM temp").fetchdf()

In [30]:
test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18374 entries, 0 to 18373
Data columns (total 23 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   id_transaction           18374 non-null  int64  
 1   date                     18374 non-null  object 
 2   card                     18374 non-null  object 
 3   client                   18374 non-null  object 
 4   date_of_birth            18374 non-null  object 
 5   passport                 18374 non-null  object 
 6   passport_valid_to        18374 non-null  object 
 7   phone                    18374 non-null  object 
 8   operation_type           18374 non-null  object 
 9   amount                   18374 non-null  float64
 10  operation_result         18374 non-null  object 
 11  terminal_type            18374 non-null  object 
 12  city                     18374 non-null  object 
 13  address                  18374 non-null  object 
 14  time_diff             

In [69]:
def combined_pipeline(df, autoencoder):
    duckdb_con = duckdb.connect()
    duckdb_con.register("temp_df", df)
    
    duckdb_con.sql("""
        CREATE OR REPLACE TEMP TABLE temp AS
        SELECT *, 
            EXTRACT(EPOCH FROM (TRY_CAST(date AS TIMESTAMP) - LAG(TRY_CAST(date AS TIMESTAMP)) 
            OVER (PARTITION BY client ORDER BY date))) AS time_diff,
            CASE WHEN address != LAG(address) OVER (PARTITION BY client ORDER BY date) THEN true ELSE false END AS changed_address,
            CASE WHEN city != LAG(city) OVER (PARTITION BY client ORDER BY date) THEN true ELSE false END AS changed_city
        FROM temp_df
    """)
    
    fraud_columns = [
        "passport_validity_fraud",
        "time_diff_fraud",
        "address_diff_fraud",
        "city_diff_fraud",
        "data_discrepancy_fraud",
        "amount_outlier_fraud",
        "alg_fraud_status",
        "ml_fraud_status"
    ]
    for col in fraud_columns:
        duckdb_con.sql(f"ALTER TABLE temp ADD COLUMN {col} BOOLEAN DEFAULT false;")
    
    check_passport_validity(duckdb_con)
    check_time_diff_fraud(duckdb_con)
    check_time_address_diff(duckdb_con)
    check_time_city_diff(duckdb_con)
    check_data_discrepancy_fraud(duckdb_con)
    detect_hampel_outliers(duckdb_con)
    
    duckdb_con.sql("""
        CREATE TEMP TABLE median_amounts AS
        SELECT client, AVG(amount) AS median_amount
        FROM temp_df
        GROUP BY client;
    """)
    
    duckdb_con.sql("""
        CREATE TEMP TABLE temp_df_normalized AS
        SELECT 
            t.*,
            t.amount / m.median_amount AS normalized_amount
        FROM 
            temp t
        JOIN 
            median_amounts m
        ON 
            t.client = m.client;
    """)
    
    outliers = ml_reporting(duckdb_con.sql("SELECT id_transaction, operation_type, terminal_type, normalized_amount FROM temp_df_normalized ORDER BY date").df(), autoencoder)
    outliers_df = pd.DataFrame(outliers, columns=['id_transaction', 'outlier_status'])

    duckdb_con.register("outliers_df", outliers_df)
    print(outliers_df)
    duckdb_con.sql("CREATE TEMP TABLE temp_outliers AS SELECT id_transaction, outlier_status FROM outliers_df")

    duckdb_con.sql("""
        UPDATE temp
        SET ml_fraud_status = TRUE
        WHERE id_transaction IN (SELECT id_transaction FROM temp_outliers WHERE outlier_status IS TRUE);
    """)

    duckdb_con.sql("""
        UPDATE temp
        SET alg_fraud_status = TRUE 
        WHERE passport_validity_fraud = TRUE 
        OR time_diff_fraud = TRUE
        OR address_diff_fraud = TRUE
        OR city_diff_fraud = TRUE
        OR data_discrepancy_fraud = TRUE
        OR amount_outlier_fraud = TRUE;
    """)

    duckdb_con.sql("ALTER TABLE temp DROP time_diff;")
    duckdb_con.sql("ALTER TABLE temp DROP changed_address;")
    duckdb_con.sql("ALTER TABLE temp DROP changed_city;")

    duckdb_con.sql("""
        INSERT INTO db.transactions
        SELECT * FROM temp;
    """)

    duckdb_con.close()

In [95]:
def ml_reporting(df, autoencoder):
    operation_mapping = {'Оплата': 1, 'Перевод': 2, 'Пополнение': 3, 'Снятие': 4}
    terminal_mapping = {'ATM': 1, 'POS': 2, 'WEB': 3}

    transactions_ids = df['id_transaction']
    df = df.drop(['id_transaction'], axis=1)

    df['operation_type'] = df['operation_type'].map(operation_mapping)
    df['terminal_type'] = df['terminal_type'].map(terminal_mapping)

    mn = pd.Series([1.0, 1.0, 0.0003888591843678608], index=['operation_type', 'terminal_type', 'normalized_amount'])
    mx = pd.Series([4.0, 3.0, 3.954896798326836], index=['operation_type', 'terminal_type', 'normalized_amount'])

    df = (df - mn) / (mx - mn)

    reconstructions = autoencoder.predict(df)

    mse = np.mean(np.power(df - reconstructions, 2), axis=1)

    outliers = q_mad_score(mse, 0.9)

    result = pd.concat([transactions_ids, outliers], axis=1)
    result.columns = ['id_transaction', 'outlier_status']
    
    return result


def q_mad_score(points, quantile=0.9):
    m = np.median(points)
    ad = np.abs(points - m)
    mad = np.median(ad)
    z_scores = 0.6745 * ad / mad
    outliers = z_scores > np.quantile(z_scores, quantile)

    return outliers



In [96]:
con.close()

In [97]:
con = duckdb.connect()

In [33]:
test

Unnamed: 0,operation_type,terminal_type,normalized_amount
0,Пополнение,ATM,1.500000
1,Перевод,WEB,0.512989
2,Оплата,WEB,1.066815
3,Снятие,ATM,1.047619
4,Пополнение,ATM,1.272727
...,...,...,...
18369,Пополнение,ATM,0.702703
18370,Пополнение,ATM,0.846688
18371,Снятие,ATM,1.458333
18372,Пополнение,ATM,1.021148


In [7]:
operation_mapping = {'Оплата': 1, 'Перевод': 2, 'Пополнение': 3, 'Снятие': 4}
terminal_mapping = {'ATM': 1, 'POS': 2, 'WEB': 3}

test['operation_type'] = test['operation_type'].map(operation_mapping)
test['terminal_type'] = test['terminal_type'].map(terminal_mapping)

In [8]:
test

Unnamed: 0,client,date,operation_type,terminal_type,normalized_amount
0,1-21360,2024-05-25 12:00:00,3,1,1.500000
1,3-57989,2024-05-25 12:00:09,2,3,0.512989
2,4-58986,2024-05-25 12:00:23,1,3,1.066815
3,5-11446,2024-05-25 12:00:25,4,1,1.047619
4,9-48112,2024-05-25 12:00:36,3,1,1.272727
...,...,...,...,...,...
18369,8-47569,2024-05-27 00:16:34,3,1,0.702703
18370,6-51685,2024-05-27 00:17:52,3,1,0.846688
18371,5-23671,2024-05-27 00:19:13,4,1,1.458333
18372,1-97423,2024-05-27 00:19:22,3,1,1.021148


In [9]:
test = test.drop(['client', 'date'], axis=1)

In [10]:
test

Unnamed: 0,operation_type,terminal_type,normalized_amount
0,3,1,1.500000
1,2,3,0.512989
2,1,3,1.066815
3,4,1,1.047619
4,3,1,1.272727
...,...,...,...
18369,3,1,0.702703
18370,3,1,0.846688
18371,4,1,1.458333
18372,3,1,1.021148
