In [1]:
import db_connection as db_conn
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count, avg, lag, unix_timestamp
from pyspark.sql.types import FloatType, ArrayType, IntegerType
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark import StorageLevel

import pandas as pd
import numpy as np
import math
from datetime import timedelta, datetime
from functools import reduce
import matplotlib.pyplot as plt


In [2]:
# Database connection settings exposed by the db_connection module.
db_config = db_conn.config_scam_alt_sql

# Create (or reuse) the Spark session with explicit driver/executor memory.
spark = (
    SparkSession.builder
    .appName("process_tx")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "32g")
    .getOrCreate()
)

23/08/04 13:49:43 WARN Utils: Your hostname, NatRng-MBP.local resolves to a loopback address: 127.0.0.1; using 10.200.168.84 instead (on interface en0)
23/08/04 13:49:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/04 13:49:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/04 13:49:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# JDBC endpoint and credentials are all taken from db_config.
user = db_config['user']
password = db_config['password']
url = f"jdbc:mariadb://{db_config['host']}:{db_config['port']}/{db_config['database']}"


def _read_table(table_name):
    """Return a DataFrame over `table_name`, read through the JDBC source."""
    reader = spark.read.format('jdbc').options(url=url, dbtable=table_name, user=user, password=password)
    return reader.load()


transactions_df = _read_table('Transactions')
block_df = _read_table('Blocks')
address_df = _read_table('Addresses')
category_df = _read_table('TxCategories')
contract_df = _read_table('Contracts')

In [4]:
# Tag each transaction with its calendar month ("yyyy-MM"); later cells group on this column.
month_bucket = F.date_format(F.col("timestamp"), "yyyy-MM")
transactions_df = transactions_df.withColumn("year_month", month_bucket)

Outgoing and incoming Transactions

In [5]:
# Transactions sent per (address, month).
outgoing_transactions = (
    transactions_df
    .groupby("from_id", "year_month")
    .agg(F.count("tx_id").alias("outgoing_tx_count"))
    .withColumnRenamed("from_id", "address_id")
)

# Transactions received per (address, month); rows without a recipient are skipped.
incoming_transactions = (
    transactions_df
    .filter(col("to_id").isNotNull())
    .groupby("to_id", "year_month")
    .agg(F.count("tx_id").alias("incoming_tx_count"))
    .withColumnRenamed("to_id", "address_id")
)

# Full outer join keeps addresses active on only one side; the missing count becomes 0.
transactions_count = (
    outgoing_transactions
    .join(incoming_transactions, ["address_id", "year_month"], "outer")
    .fillna(0)
)

In [6]:
# Preview the first 20 rows of the per-address monthly counts.
transactions_count.show(n=20)

                                                                                

+----------+----------+-----------------+-----------------+
|address_id|year_month|outgoing_tx_count|incoming_tx_count|
+----------+----------+-----------------+-----------------+
|         1|   2018-09|                4|               14|
|         2|   2018-11|                0|                1|
|         3|   2021-04|                0|                1|
|        11|   2018-07|                1|                3|
|        11|   2019-04|                0|                2|
|        11|   2022-09|                0|                1|
|        13|   2019-01|               59|                0|
|        13|   2019-05|               83|                0|
|        16|   2020-04|                4|                6|
|        18|   2019-04|                2|                2|
|        20|   2019-01|               13|                9|
|        25|   2022-07|                3|                0|
|        26|   2018-09|                5|               16|
|        26|   2019-04|                0

Time diff between first and last transaction

In [6]:
_timestamp_columns = ["address_id", "timestamp", "year_month"]

# Timestamps of transactions each address sent; addresses with no match are dropped.
outgoing_timestamps = (
    address_df
    .join(transactions_df, address_df["address_id"] == transactions_df["from_id"], "left")
    .select(*_timestamp_columns)
    .dropna(subset=["timestamp"])
)

# Timestamps of transactions each address received; addresses with no match are dropped.
incoming_timestamps = (
    address_df
    .join(transactions_df, address_df["address_id"] == transactions_df["to_id"], "left")
    .select(*_timestamp_columns)
    .dropna(subset=["timestamp"])
)

# Both directions stacked into one frame.
all_timestamps = outgoing_timestamps.union(incoming_timestamps).dropna(subset=["timestamp"])

# Earliest and latest activity per (address, month).
_first_last = all_timestamps.groupBy("address_id", "year_month").agg(
    F.min("timestamp").alias("first_tx_timestamp"),
    F.max("timestamp").alias("last_tx_timestamp"),
)

# Span between first and last transaction: seconds first, then minutes.
_span_seconds = F.col("last_tx_timestamp").cast("long") - F.col("first_tx_timestamp").cast("long")
transactions_timestamps = (
    _first_last
    .withColumn("time_difference", _span_seconds)
    .withColumn("time_difference_in_minutes", F.col("time_difference") / 60)
    # Keep only the keys and the final feature column.
    .select("address_id", "year_month", "time_difference_in_minutes")
)

In [7]:
# Chronological ordering inside each (address, month) partition.
windowSpec = Window.partitionBy("address_id", "year_month").orderBy("timestamp")

# Seconds elapsed since the previous transaction in the same partition (null for the first row).
_epoch_secs = unix_timestamp(col("timestamp"))
time_diff_secs = _epoch_secs - lag(_epoch_secs).over(windowSpec)


def _mean_gap_secs(timestamps):
    """Average gap in seconds between consecutive rows, per (address_id, year_month)."""
    return (
        timestamps
        .withColumn("time_diff_secs", time_diff_secs)
        .groupBy("address_id", "year_month")
        .agg(avg(col("time_diff_secs")).alias("avg_time_diff_secs"))
    )


def _as_minutes(gap_frame, minutes_column):
    """Convert `avg_time_diff_secs` to minutes under `minutes_column` and keep the keys."""
    return (
        gap_frame
        .withColumn(minutes_column, col("avg_time_diff_secs") / 60)
        .select("address_id", "year_month", minutes_column)
    )


# Sent transactions.
outtx_subquery = _mean_gap_secs(outgoing_timestamps)
avg_outgoing_tx = _as_minutes(outtx_subquery, "avg_time_diff_out_minutes")

# Received transactions.
inctx_subquery = _mean_gap_secs(incoming_timestamps)
avg_incoming_tx = _as_minutes(inctx_subquery, "avg_time_diff_in_minutes")

# Sent and received combined.
all_tx_subquery = _mean_gap_secs(all_timestamps)
avg_total_tx = _as_minutes(all_tx_subquery, "total_avg_time_diff_minutes")

# Combine the three features; a missing average is stored as 0.
avg_tx = (
    avg_outgoing_tx
    .join(avg_incoming_tx, ["address_id", "year_month"], "outer")
    .join(avg_total_tx, ["address_id", "year_month"], "outer")
    .fillna(0)
)

Contract Creation

In [8]:
# Attach the sender and month of the matching transaction to each contract.
_contract_tx = (
    contract_df
    .join(transactions_df, contract_df['tx_id'] == transactions_df['tx_id'], "left")
    .select(
        contract_df["contract_id"],
        contract_df["tx_id"],
        transactions_df["from_id"],
        transactions_df["year_month"],
    )
)

# Number of contract rows per (sender, month).
contracts_created = (
    _contract_tx
    .groupBy("from_id", "year_month")
    .agg(F.count("tx_id").alias("contracts_created"))
    .withColumnRenamed("from_id", "address_id")
)

Unique counterparties: distinct senders received from and distinct recipients sent to

In [9]:
# Distinct senders seen by each recipient per month.
unique_received_transactions = (
    transactions_df
    .filter(col("to_id").isNotNull())
    .groupBy("to_id", "year_month")
    .agg(F.countDistinct("from_id").alias("unique_received_transactions"))
    .withColumnRenamed("to_id", "address_id")
)

# Distinct recipients reached by each sender per month.
unique_sent_transactions = (
    transactions_df
    .groupBy("from_id", "year_month")
    .agg(F.countDistinct("to_id").alias("unique_sent_transactions"))
    .withColumnRenamed("from_id", "address_id")
)

In [10]:
# Only rows whose asset is ETH contribute to the totals below.
_eth_rows = transactions_df.filter(col("asset") == "ETH")

# ETH received per (address, month); null amounts are treated as 0.
incoming_eth = (
    _eth_rows
    .filter(col("to_id").isNotNull())
    .fillna({"asset_value": 0})
    .groupBy("to_id", "year_month")
    .agg(F.sum("asset_value").alias("total_incoming_eth"))
    .withColumnRenamed("to_id", "address_id")
)

# ETH sent per (address, month); null amounts are treated as 0.
outgoing_eth = (
    _eth_rows
    .fillna({"asset_value": 0})
    .groupBy("from_id", "year_month")
    .agg(F.sum("asset_value").alias("total_outgoing_eth"))
    .withColumnRenamed("from_id", "address_id")
)

erc20 transfers

In [11]:
# Rows with category_id 3, with asset_value cast to float.
# NOTE(review): category 3 is taken to mean ERC20 from the variable names - confirm against TxCategories.
erc_20_tnx = (
    transactions_df
    .filter(col("category_id") == 3)
    .withColumn("asset_value", col("asset_value").cast(FloatType()))
)

# Transfers sent per (address, month).
outgoing_erc_20_tnx = (
    erc_20_tnx
    .groupBy("from_id", "year_month")
    .agg(F.count("tx_id").alias("outgoing_erc_20_tnx"))
    .withColumnRenamed("from_id", "address_id")
)

# Transfers received per (address, month); rows without a recipient are skipped.
incoming_erc_20_tnx = (
    erc_20_tnx
    .filter(col("to_id").isNotNull())
    .groupBy("to_id", "year_month")
    .agg(F.count("tx_id").alias("incoming_erc_20_tnx"))
    .withColumnRenamed("to_id", "address_id")
)

# Outer join so one-sided activity is kept, then add the combined count.
total_erc_20_tnx = (
    outgoing_erc_20_tnx
    .join(incoming_erc_20_tnx, ["address_id", "year_month"], "outer")
    .fillna(0)
    .withColumn("total_erc_20_tnx", F.col("outgoing_erc_20_tnx") + F.col("incoming_erc_20_tnx"))
    .select("address_id", "year_month", "outgoing_erc_20_tnx", "incoming_erc_20_tnx", "total_erc_20_tnx")
)

ERC 20 ETH TX Features

In [12]:
# Shared slices of the category-3 frame: rows with a recipient, and rows whose asset is ETH.
_erc20_received = erc_20_tnx.filter(col("to_id").isNotNull())
_erc20_eth = erc_20_tnx.filter(col("asset") == "ETH")

# --- Summed ETH value sent / received (null amounts count as 0) ---
erc20eth_out = (
    _erc20_eth
    .fillna({"asset_value": 0})
    .groupBy("from_id", "year_month")
    .agg(F.sum("asset_value").alias("total_outgoing_erc20eth"))
    .withColumnRenamed("from_id", "address_id")
)

erc20eth_in = (
    _erc20_eth
    .filter(col("to_id").isNotNull())
    .fillna({"asset_value": 0})
    .groupBy("to_id", "year_month")
    .agg(F.sum("asset_value").alias("total_incoming_erc20eth"))
    .withColumnRenamed("to_id", "address_id")
)

# --- Smallest transferred value sent / received ---
min_erc20token_out = (
    erc_20_tnx
    .groupBy("from_id", "year_month")
    .agg(F.min("asset_value").alias("min_erc20token_out"))
    .withColumnRenamed("from_id", "address_id")
)

min_erc20token_in = (
    _erc20_received
    .groupBy("to_id", "year_month")
    .agg(F.min("asset_value").alias("min_erc20token_in"))
    .withColumnRenamed("to_id", "address_id")
)

# --- Largest transferred value sent / received ---
max_erc20token_out = (
    erc_20_tnx
    .groupBy("from_id", "year_month")
    .agg(F.max("asset_value").alias("max_erc20token_out"))
    .withColumnRenamed("from_id", "address_id")
)

max_erc20token_in = (
    _erc20_received
    .groupBy("to_id", "year_month")
    .agg(F.max("asset_value").alias("max_erc20token_in"))
    .withColumnRenamed("to_id", "address_id")
)

# --- Number of distinct `asset` values sent / received ---
num_unique_erc20tokens_out = (
    erc_20_tnx
    .groupBy("from_id", "year_month")
    .agg(F.countDistinct("asset").alias("num_unique_erc20tokens_out"))
    .withColumnRenamed("from_id", "address_id")
)

num_unique_erc20tokens_in = (
    _erc20_received
    .groupBy("to_id", "year_month")
    .agg(F.countDistinct("asset").alias("num_unique_erc20tokens_in"))
    .withColumnRenamed("to_id", "address_id")
)

ERC1155 Transactions Features

In [13]:
# Rows with category_id 1.
# NOTE(review): category 1 is taken to mean ERC1155 from the variable names - confirm against TxCategories.
erc_1155_tnx = transactions_df.filter(col("category_id") == 1)

# Transfers sent per (address, month).
outgoing_erc_1155_tnx = (
    erc_1155_tnx
    .groupBy("from_id", "year_month")
    .agg(F.count("tx_id").alias("outgoing_erc_1155_tnx"))
    .withColumnRenamed("from_id", "address_id")
)

# Transfers received per (address, month); rows without a recipient are skipped.
incoming_erc_1155_tnx = (
    erc_1155_tnx
    .filter(col("to_id").isNotNull())
    .groupBy("to_id", "year_month")
    .agg(F.count("tx_id").alias("incoming_erc_1155_tnx"))
    .withColumnRenamed("to_id", "address_id")
)

# Outer join so one-sided activity is kept, then add the combined count.
total_erc_1155_tnx = (
    outgoing_erc_1155_tnx
    .join(incoming_erc_1155_tnx, ["address_id", "year_month"], "outer")
    .fillna(0)
    .withColumn("total_erc_1155_tnx", F.col("outgoing_erc_1155_tnx") + F.col("incoming_erc_1155_tnx"))
    .select("address_id", "year_month", "outgoing_erc_1155_tnx", "incoming_erc_1155_tnx", "total_erc_1155_tnx")
)

ERC721 TX Features

In [14]:
# Rows with category_id 5.
# NOTE(review): category 5 is taken to mean ERC721 from the variable names - confirm against TxCategories.
erc_721_tnx = transactions_df.filter(col("category_id") == 5)

# Transfer counts per (address, month), sent and received.
outgoing_erc_721_tnx = erc_721_tnx.groupBy("from_id", "year_month").agg(F.count("tx_id").alias("outgoing_erc_721_tnx"))
incoming_erc_721_tnx = erc_721_tnx.filter(col("to_id").isNotNull()).groupBy("to_id", "year_month").agg(F.count("tx_id").alias("incoming_erc_721_tnx"))
outgoing_erc_721_tnx = outgoing_erc_721_tnx.withColumnRenamed("from_id", "address_id")
incoming_erc_721_tnx = incoming_erc_721_tnx.withColumnRenamed("to_id", "address_id")

# Outer join so one-sided activity is kept, then add the combined count.
total_erc_721_tnx = outgoing_erc_721_tnx.join(incoming_erc_721_tnx, ["address_id","year_month"], "outer").fillna(0)
total_erc_721_tnx = total_erc_721_tnx.withColumn("total_erc_721_tnx", F.col("outgoing_erc_721_tnx") + F.col("incoming_erc_721_tnx"))
total_erc_721_tnx = total_erc_721_tnx.select("address_id", "year_month","outgoing_erc_721_tnx", "incoming_erc_721_tnx", "total_erc_721_tnx")

# Distinct `asset` values sent / received per (address, month).
num_unique_erc721asset_out = erc_721_tnx.groupBy("from_id", "year_month") \
                            .agg(F.countDistinct("asset").alias("num_unique_erc721asset_out"))
num_unique_erc721asset_out = num_unique_erc721asset_out.withColumnRenamed("from_id", "address_id")

num_unique_erc721asset_in = erc_721_tnx.filter(col("to_id").isNotNull()).groupBy("to_id", "year_month") \
                            .agg(F.countDistinct("asset").alias("num_unique_erc721asset_in"))
num_unique_erc721asset_in = num_unique_erc721asset_in.withColumnRenamed("to_id", "address_id")

# Distinct `erc721_token_id` values sent / received per (address, month).
num_unique_erc721token_out = erc_721_tnx.groupBy("from_id", "year_month") \
                            .agg(F.countDistinct("erc721_token_id").alias("num_unique_erc721tokens_out"))
num_unique_erc721token_out = num_unique_erc721token_out.withColumnRenamed("from_id", "address_id")

num_unique_erc721token_in = erc_721_tnx.filter(col("to_id").isNotNull()).groupBy("to_id", "year_month") \
                            .agg(F.countDistinct("erc721_token_id").alias("num_unique_erc721tokens_in"))
# Fix: the rename used to be assigned to the misspelled `num_unique_erc721tokent_in`,
# leaving `num_unique_erc721token_in` keyed by `to_id` instead of `address_id`.
num_unique_erc721token_in = num_unique_erc721token_in.withColumnRenamed("to_id", "address_id")
# Misspelled name kept as an alias in case other cells still reference it.
num_unique_erc721tokent_in = num_unique_erc721token_in

Compute Gini coefficients of daily and weekly transaction counts

In [17]:
# Add the calendar date of each transaction and the week-of-year number derived from it.
transactions_df = (
    transactions_df
    .withColumn('date', F.to_date('timestamp', 'yyyy-MM-dd HH:mm:ss'))
    .withColumn('week_of_year', F.weekofyear('date'))
)

In [18]:
# For each (address, month), collect the list of per-day and per-week transaction counts.
# These lists are the distributions the Gini index is computed over later.

# Sent transactions, per day.
daily_from_transactions = transactions_df.groupBy('from_id', 'year_month', 'date').agg(F.count('tx_id').alias('daily_tx_count'))
daily_from_transactions = daily_from_transactions.groupBy('from_id', 'year_month').agg(F.collect_list('daily_tx_count').alias('daily_tx_counts'))

# Received transactions, per day; rows without a recipient are skipped.
daily_to_transactions = transactions_df.filter(col("to_id").isNotNull()).groupBy('to_id', 'year_month','date').agg(F.count('tx_id').alias('daily_tx_count'))
daily_to_transactions = daily_to_transactions.groupBy('to_id', 'year_month').agg(F.collect_list('daily_tx_count').alias('daily_tx_counts'))

# Sent transactions, per week.
weekly_from_transactions = transactions_df.groupBy('from_id', 'year_month', 'week_of_year').agg(F.count('tx_id').alias('weekly_tx_count'))
weekly_from_transactions = weekly_from_transactions.groupBy('from_id', 'year_month').agg(F.collect_list('weekly_tx_count').alias('weekly_tx_counts'))

# Received transactions, per week.
# Fix: skip rows without a recipient, as every other incoming aggregate does; without
# the filter a null `to_id` group is created and carried into the joined features.
weekly_to_transactions = transactions_df.filter(col("to_id").isNotNull()).groupBy('to_id', 'year_month', 'week_of_year').agg(F.count('tx_id').alias('weekly_tx_count'))
weekly_to_transactions = weekly_to_transactions.groupBy('to_id', 'year_month').agg(F.collect_list('weekly_tx_count').alias('weekly_tx_counts'))


In [19]:
_time_columns = ["year_month", "timestamp", "date", "week_of_year"]

# One row per sent transaction, keyed by the sender.
total_transactions_out = (
    transactions_df
    .select("from_id", *_time_columns)
    .withColumnRenamed("from_id", "address_id")
)

# One row per received transaction, keyed by the recipient (rows without one are skipped).
total_transactions_in = (
    transactions_df
    .filter(col("to_id").isNotNull())
    .select("to_id", *_time_columns)
    .withColumnRenamed("to_id", "address_id")
)

# Both directions stacked: every transaction an address took part in.
total_transactions = total_transactions_out.union(total_transactions_in)

# Per-day counts, collected into one list per (address, month).
daily_total_transactions = (
    total_transactions
    .groupBy("address_id", "year_month", "date")
    .agg(F.count("timestamp").alias("daily_total_tx_count"))
    .groupBy("address_id", "year_month")
    .agg(F.collect_list("daily_total_tx_count").alias("daily_total_tx_counts"))
)

# Per-week counts, collected into one list per (address, month).
weekly_total_transactions = (
    total_transactions
    .groupBy("address_id", "year_month", "week_of_year")
    .agg(F.count("timestamp").alias("weekly_total_tx_count"))
    .groupBy("address_id", "year_month")
    .agg(F.collect_list("weekly_total_tx_count").alias("weekly_total_tx_counts"))
)


In [20]:
def extend_day_list(input_list, length=30):
    """Return a copy of `input_list` right-padded with zeros to at least `length` items.

    Args:
        input_list: List of per-day counts, or None.
        length: Minimum length of the result. Defaults to 30, the value that was
            previously hard-coded.

    Returns:
        A new list; the caller's list is never modified. Lists already at least
        `length` long are returned unchanged (as a copy). None is passed through
        as None.
    """
    if input_list is None:
        return None
    # NOTE(review): months with 31 days are still padded to 30 by default - confirm this is intended.
    missing = max(0, length - len(input_list))
    return list(input_list) + [0] * missing

def extend_week_list(input_list, length=4):
    """Return a copy of `input_list` right-padded with zeros to at least `length` items.

    Args:
        input_list: List of per-week counts, or None.
        length: Minimum length of the result. Defaults to 4, the value that was
            previously hard-coded.

    Returns:
        A new list; the caller's list is never modified. Lists already at least
        `length` long are returned unchanged (as a copy). None is passed through
        as None.
    """
    if input_list is None:
        return None
    missing = max(0, length - len(input_list))
    return list(input_list) + [0] * missing

# Wrap the padding helpers as Spark UDFs that return integer arrays.
udf_extend_day_list = F.udf(extend_day_list, ArrayType(IntegerType()))
udf_extend_week_list = F.udf(extend_week_list, ArrayType(IntegerType()))

# Pad the per-day count lists (sent, received, combined).
daily_from_transactions = daily_from_transactions.withColumn(
    "daily_tx_counts", udf_extend_day_list(F.col("daily_tx_counts")))
daily_to_transactions = daily_to_transactions.withColumn(
    "daily_tx_counts", udf_extend_day_list(F.col("daily_tx_counts")))
daily_total_transactions = daily_total_transactions.withColumn(
    "daily_total_tx_counts", udf_extend_day_list(F.col("daily_total_tx_counts")))

# Pad the per-week count lists (sent, received, combined).
weekly_from_transactions = weekly_from_transactions.withColumn(
    "weekly_tx_counts", udf_extend_week_list(F.col("weekly_tx_counts")))
weekly_to_transactions = weekly_to_transactions.withColumn(
    "weekly_tx_counts", udf_extend_week_list(F.col("weekly_tx_counts")))
weekly_total_transactions = weekly_total_transactions.withColumn(
    "weekly_total_tx_counts", udf_extend_week_list(F.col("weekly_total_tx_counts")))

In [21]:
def calculate_gini_idx(x):
    """Compute the Gini index of a list of non-negative counts.

    Uses the rank-weighted form on the ascending-sorted values:
    G = 2 * sum(i * x_i) / (n * sum(x)) - (n + 1) / n, with i starting at 1.

    Args:
        x: Iterable of numbers, or None.

    Returns:
        The Gini index as a float. An empty input or one whose values sum to
        zero returns 0.0 instead of raising ZeroDivisionError. None is passed
        through as None.
    """
    if x is None:
        return None
    values = sorted(x)
    n = len(values)
    grand_total = sum(values)
    # With nothing to distribute the index is undefined; report 0.0 (no inequality).
    if n == 0 or grand_total == 0:
        return 0.0
    rank_weighted = sum(rank * value for rank, value in enumerate(values, start=1))
    return (2 * rank_weighted) / (n * grand_total) - (n + 1) / n

# Spark UDF wrapper around calculate_gini_idx, returning a float column.
udf_calculate_gini_idx = F.udf(calculate_gini_idx, FloatType())

# Gini index of the per-day distributions (sent / received), keyed by address_id.
daily_from_transactions = (
    daily_from_transactions
    .withColumn('daily_from_gini_index', udf_calculate_gini_idx('daily_tx_counts'))
    .withColumnRenamed("from_id", "address_id")
)
daily_to_transactions = (
    daily_to_transactions
    .withColumn('daily_to_gini_index', udf_calculate_gini_idx('daily_tx_counts'))
    .withColumnRenamed("to_id", "address_id")
)

# Gini index of the per-week distributions (sent / received), keyed by address_id.
weekly_from_transactions = (
    weekly_from_transactions
    .withColumn('weekly_from_gini_index', udf_calculate_gini_idx('weekly_tx_counts'))
    .withColumnRenamed("from_id", "address_id")
)
weekly_to_transactions = (
    weekly_to_transactions
    .withColumn('weekly_to_gini_index', udf_calculate_gini_idx('weekly_tx_counts'))
    .withColumnRenamed("to_id", "address_id")
)

# Gini index of the combined (sent + received) distributions; already keyed by address_id.
daily_total_transactions = daily_total_transactions.withColumn(
    'daily_total_gini_index', udf_calculate_gini_idx('daily_total_tx_counts'))
weekly_total_transactions = weekly_total_transactions.withColumn(
    'weekly_total_gini_index', udf_calculate_gini_idx('weekly_total_tx_counts'))

gini_dfs = [daily_from_transactions, daily_to_transactions, weekly_from_transactions,
            weekly_to_transactions, daily_total_transactions, weekly_total_transactions]

_gini_keys = ["address_id", "year_month"]

# Consecutive pairs of gini_dfs are outer-joined on the keys; only the index columns are kept.
daily_gini_index = (
    gini_dfs[0]
    .join(gini_dfs[1], _gini_keys, "outer")
    .select("address_id", "year_month", "daily_from_gini_index", "daily_to_gini_index")
)

weekly_gini_index = (
    gini_dfs[2]
    .join(gini_dfs[3], _gini_keys, "outer")
    .select("address_id", "year_month", "weekly_from_gini_index", "weekly_to_gini_index")
)

total_gini_index = (
    gini_dfs[4]
    .join(gini_dfs[5], _gini_keys, "outer")
    .select("address_id", "year_month", "daily_total_gini_index", "weekly_total_gini_index")
)

In [22]:
# Persist the three Gini feature tables as parquet, replacing any previous output.
_gini_outputs = [
    (daily_gini_index, "data/parquet_files/daily_gini_idx_scam_alt.parquet"),
    (weekly_gini_index, "data/parquet_files/weekly_gini_idx_scam_alt.parquet"),
    (total_gini_index, "data/parquet_files/total_gini_idx_scam_alt.parquet"),
]
for _gini_frame, _gini_path in _gini_outputs:
    _gini_frame.write.mode('overwrite').parquet(_gini_path)

                                                                                

Recency Transactions

In [23]:
# Rows are ordered chronologically within each (address, month) partition.
window_period = Window.partitionBy("address_id", "year_month").orderBy("timestamp")

# Decay rate of the exponential in compute_recency.
gamma = 0.3

def compute_recency(time_delta):
    """Map the gap since the previous transaction to an exponentially decayed score.

    Args:
        time_delta: Gap since the previous transaction, or None when there is
            no previous transaction.

    Returns:
        float: exp(-gamma * time_delta), using the module-level `gamma`;
        0.0 when `time_delta` is None.
    """
    if time_delta is None:
        # Return a float, not int 0: this function is wrapped in a UDF declared
        # FloatType, and Spark yields null when the Python return type does not match.
        return 0.0
    return math.exp(-gamma * time_delta)

# Spark UDF wrapper around compute_recency, returning a float column.
udf_tx_recency = F.udf(compute_recency, FloatType())

_tx_time_columns = ["tx_id", "address_id", "timestamp", "year_month"]
_recency_columns = ["tx_id", "address_id", "year_month", "timestamp", "time_diff_days", "recency"]

# Transactions each address sent / received; addresses with no match are dropped.
outgoing_tx_times = (
    address_df
    .join(transactions_df, address_df["address_id"] == transactions_df["from_id"], "left")
    .select(*_tx_time_columns)
    .dropna(subset=["timestamp"])
)
incoming_tx_times = (
    address_df
    .join(transactions_df, address_df["address_id"] == transactions_df["to_id"], "left")
    .select(*_tx_time_columns)
    .dropna(subset=["timestamp"])
)

# Days since the previous transaction in the same (address, month) partition.
_epoch_seconds = unix_timestamp(col("timestamp"))
time_diff_days = (_epoch_seconds - lag(_epoch_seconds).over(window_period)) / 86400

# Score every sent transaction, then take the per-(address, month) median.
outgoing_tx_times = (
    outgoing_tx_times
    .withColumn("time_diff_days", time_diff_days)
    .withColumn("recency", udf_tx_recency(col("time_diff_days")))
    .fillna({"recency": 0})
    .select(*_recency_columns)
)
outgoing_recency_avg = (
    outgoing_tx_times
    .groupBy("address_id", "year_month")
    .agg(F.median('recency').alias("median_recency_out"))
)

# Same for received transactions.
incoming_tx_times = (
    incoming_tx_times
    .withColumn("time_diff_days", time_diff_days)
    .withColumn("recency", udf_tx_recency(col("time_diff_days")))
    .fillna({"recency": 0})
    .select(*_recency_columns)
)
incoming_recency_avg = (
    incoming_tx_times
    .groupBy("address_id", "year_month")
    .agg(F.median('recency').alias("median_recency_in"))
)

# Outer join keeps one-sided addresses; the missing median is stored as 0.
recency_avg = (
    outgoing_recency_avg
    .join(incoming_recency_avg, ["address_id", "year_month"], "outer")
    .fillna(0)
    .select("address_id", "year_month", "median_recency_out", "median_recency_in")
)

In [24]:
# Persist the recency medians as parquet, replacing any previous output.
_recency_path = "data/parquet_files/recency_avg_scam_alt.parquet"
recency_avg.write.mode('overwrite').parquet(_recency_path)

                                                                                

Anomalous ETH transactions

In [5]:
# Scale factor applied to every absolute deviation before the median is taken.
median_k = 1.4826

_in_keys = ["to_id", "year_month"]
_out_keys = ["from_id", "year_month"]

# --- Median ETH amount per (address, month); null amounts count as 0 ---
incoming_median = (
    transactions_df
    .filter(col("asset") == "ETH")
    .filter(col("to_id").isNotNull())
    .fillna({"asset_value": 0})
    .groupBy(*_in_keys)
    .agg(F.median('asset_value').alias("median_eth_in"))
)

outgoing_median = (
    transactions_df
    .filter(col("asset") == "ETH")
    .fillna({"asset_value": 0})
    .groupBy(*_out_keys)
    .agg(F.median('asset_value').alias("median_eth_out"))
)

transactions_df_alias = transactions_df.alias("transactions_df")

# --- Received side: attach the group median, then take the median scaled absolute deviation ---
incoming_joined = (
    transactions_df_alias
    .filter(col("asset") == "ETH")
    .filter(col("to_id").isNotNull())
    .fillna({"asset_value": 0})
    .join(incoming_median, _in_keys, 'inner')
)
incoming_devs = incoming_joined.withColumn(
    "abs_dev",
    median_k * F.abs(incoming_joined["asset_value"] - incoming_joined["median_eth_in"]),
)
incoming_mad = incoming_devs.groupBy(*_in_keys).agg(F.median('abs_dev').alias("mad_eth_in"))

# --- Sent side: same steps ---
outgoing_joined = (
    transactions_df_alias
    .filter(col("asset") == "ETH")
    .fillna({"asset_value": 0})
    .join(outgoing_median, _out_keys, 'inner')
)
outgoing_devs = outgoing_joined.withColumn(
    "abs_dev",
    median_k * F.abs(outgoing_joined["asset_value"] - outgoing_joined["median_eth_out"]),
)
outgoing_mad = outgoing_devs.groupBy(*_out_keys).agg(F.median('abs_dev').alias("mad_eth_out"))

# --- Median and MAD side by side, keyed by address_id ---
outgoing_median_mad = (
    outgoing_median
    .join(outgoing_mad, _out_keys)
    .withColumnRenamed("from_id", "address_id")
    .alias("outgoing_median_mad")
)

incoming_median_mad = (
    incoming_median
    .join(incoming_mad, _in_keys)
    .withColumnRenamed("to_id", "address_id")
    .alias("incoming_median_mad")
)

In [6]:
def compute_zscore(value, median, mad):
    """Build a Column holding the robust z-score ``(value - median) / mad``.

    Args:
        value: Column with the observed amount.
        median: Column with the group's median amount.
        mad: Column with the group's (scaled) median absolute deviation.

    Returns:
        A Column equal to ``(value - median) / mad`` where ``mad != 0`` holds,
        and 0 otherwise (this includes a null ``mad``, for which the ``when``
        condition is not satisfied).
    """
    return F.when(mad != 0, (value - median) / mad).otherwise(0)


# Re-key the per-address monthly statistics to the transaction column names so
# both joins can use shared key columns. A join on a list of column names
# keeps exactly one copy of each key, so the transaction's own year_month
# survives and nothing has to be dropped afterwards.
#
# The previous version dropped every "year_month" column after the first join
# and then referenced transactions_df["year_month"] in the second join
# condition. That column no longer existed on the left side, so the month was
# either unresolvable or no longer constrained the incoming-stats match, and
# the year_month used downstream was not the transaction's month.
outgoing_stats = outgoing_median_mad.select(
    F.col("address_id").alias("from_id"),
    "year_month",
    "median_eth_out",
    "mad_eth_out",
)
incoming_stats = incoming_median_mad.select(
    F.col("address_id").alias("to_id"),
    "year_month",
    "median_eth_in",
    "mad_eth_in",
)

# Left joins keep every ETH transfer; transfers without matching statistics
# get 0 for median/MAD, which compute_zscore maps to a z-score of 0.
# Note: the stats frames' "address_id" columns are not carried into the result.
transfer_df_expanded = (
    transactions_df.filter(col("asset") == "ETH")
    .join(outgoing_stats, ["from_id", "year_month"], "left")
    .join(incoming_stats, ["to_id", "year_month"], "left")
    .fillna({"median_eth_out": 0, "mad_eth_out": 0, "median_eth_in": 0, "mad_eth_in": 0})
)

# Z-score of each transfer relative to the receiver's monthly distribution.
transfer_df_expanded = transfer_df_expanded.withColumn(
    "zscore_eth_in",
    compute_zscore(F.col("asset_value"), F.col("median_eth_in"), F.col("mad_eth_in")),
)

# Z-score of each transfer relative to the sender's monthly distribution.
transfer_df_expanded = transfer_df_expanded.withColumn(
    "zscore_eth_out",
    compute_zscore(F.col("asset_value"), F.col("median_eth_out"), F.col("mad_eth_out")),
)

In [7]:
# A transfer is flagged when its z-score lies strictly outside [-3.5, 3.5].
outgoing_flag = F.when(
    (F.col("zscore_eth_out") > 3.5) | (F.col("zscore_eth_out") < -3.5), 1
).otherwise(0)
incoming_flag = F.when(
    (F.col("zscore_eth_in") > 3.5) | (F.col("zscore_eth_in") < -3.5), 1
).otherwise(0)

# Number of flagged outgoing transfers per sender and month.
outgoing_outliers = (
    transfer_df_expanded
    .groupBy("from_id", "year_month")
    .agg(F.sum(outgoing_flag).alias("num_outliers_eth_out"))
    .withColumnRenamed("from_id", "address_id")
)

# Number of flagged incoming transfers per receiver and month.
incoming_outliers = (
    transfer_df_expanded
    .groupBy("to_id", "year_month")
    .agg(F.sum(incoming_flag).alias("num_outliers_eth_in"))
    .withColumnRenamed("to_id", "address_id")
)

In [8]:
# Full outer join keeps addresses that only sent or only received in a month;
# the side that is missing is filled with 0.
merged_outliers = (
    outgoing_outliers
    .join(incoming_outliers, on=["address_id", "year_month"], how="outer")
    .fillna(0)
)

In [9]:
# Persist the outlier counts, replacing any previous output at this path.
merged_outliers.write.parquet(
    "data/parquet_files/outliers_scam_alt.parquet",
    mode="overwrite",
)

                                                                                

Merge DFs

In [15]:
# First batch of per-(address_id, year_month) feature frames to combine.
dfs_1 = [
    transactions_count,
    transactions_timestamps,
    contracts_created,
    avg_tx,
    unique_received_transactions,
    unique_sent_transactions,
    incoming_eth,
    outgoing_eth,
    total_erc_20_tnx,
    erc20eth_out,
    erc20eth_in,
    min_erc20token_in,
    min_erc20token_out,
]

# Fold the list left to right with full outer joins on the shared key, then
# spread the result over 100 partitions before writing it out.
account_data_1 = reduce(
    lambda merged, nxt: merged.join(nxt, ["address_id", "year_month"], "outer"),
    dfs_1,
).repartition(100)
account_data_1.write.parquet(
    "data/parquet_files/account_data_1_scam_alt.parquet",
    mode="overwrite",
)
# Drop the reference; the data is re-read from parquet when needed.
account_data_1 = None

                                                                                

In [16]:
# Second batch of per-(address_id, year_month) feature frames to combine.
dfs_2 = [
    max_erc20token_in,
    max_erc20token_out,
    num_unique_erc20tokens_out,
    num_unique_erc20tokens_in,
    total_erc_1155_tnx,
    total_erc_721_tnx,
    num_unique_erc721asset_out,
    num_unique_erc721asset_in,
]

# Fold the list left to right with full outer joins on the shared key.
account_data_2 = reduce(
    lambda merged, nxt: merged.join(nxt, ["address_id", "year_month"], "outer"),
    dfs_2,
)
account_data_2.write.parquet(
    "data/parquet_files/account_data_2_scam_alt.parquet",
    mode="overwrite",
)
# Drop the reference; the data is re-read from parquet when needed.
account_data_2 = None

                                                                                

In [10]:
# Default of 0 for count/sum-style features that are absent for an
# address-month after the outer joins below.
fillna_values = {
    "outgoing_tx_count": 0, "incoming_tx_count": 0, "time_difference_in_minutes": 0, "contracts_created": 0,
    "avg_time_diff_out_minutes": 0, "avg_time_diff_in_minutes": 0, "total_avg_time_diff_minutes": 0,
    "unique_received_transactions": 0, "unique_sent_transactions": 0, "total_incoming_eth": 0,
    "total_outgoing_eth": 0, "outgoing_erc_20_tnx": 0, "incoming_erc_20_tnx": 0, "total_erc_20_tnx": 0,
    "total_outgoing_erc20eth": 0, "total_incoming_erc20eth": 0, "min_erc20token_in": 0,
    "min_erc20token_out": 0, "max_erc20token_in": 0, "max_erc20token_out": 0,
    "num_unique_erc20tokens_out": 0, "num_unique_erc20tokens_in": 0, "outgoing_erc_1155_tnx": 0,
    "incoming_erc_1155_tnx": 0, "total_erc_1155_tnx": 0, "outgoing_erc_721_tnx": 0,
    "incoming_erc_721_tnx": 0, "total_erc_721_tnx": 0, "num_unique_erc721asset_out": 0,
    "num_unique_erc721asset_in": 0,
    # The outlier counts are counts as well: an address-month that is missing
    # from the outliers table has no flagged ETH transfers, so it gets 0
    # instead of the null the outer join would otherwise leave behind.
    "num_outliers_eth_out": 0, "num_outliers_eth_in": 0
}

# Reload every intermediate feature table that earlier cells wrote to parquet.
account_data_1 = spark.read.parquet("data/parquet_files/account_data_1_scam_alt.parquet")
account_data_2 = spark.read.parquet("data/parquet_files/account_data_2_scam_alt.parquet")
daily_gini_index = spark.read.parquet("data/parquet_files/daily_gini_idx_scam_alt.parquet")
weekly_gini_index = spark.read.parquet("data/parquet_files/weekly_gini_idx_scam_alt.parquet")
total_gini_index = spark.read.parquet("data/parquet_files/total_gini_idx_scam_alt.parquet")
recency_avg = spark.read.parquet("data/parquet_files/recency_avg_scam_alt.parquet")
outliers = spark.read.parquet("data/parquet_files/outliers_scam_alt.parquet")

# Full outer join of all tables on (address_id, year_month), folded left to
# right, followed by the zero defaults defined above.
merge_saved_df = [account_data_1, account_data_2, daily_gini_index, weekly_gini_index, total_gini_index, recency_avg, outliers]
account_df = reduce(lambda a, b: a.join(b, ["address_id","year_month"], "outer"), merge_saved_df)
account_df = account_df.fillna(fillna_values)

# Derived feature: outgoing + incoming transactions + contracts created.
account_df = account_df.withColumn("total_tx_with_contracts", 
                                   F.col("outgoing_tx_count") + F.col("incoming_tx_count") + F.col("contracts_created"))

# Fix the column set and order of the final feature table.
account_df = account_df.select([
    "address_id", "year_month", "outgoing_tx_count", "incoming_tx_count", "unique_received_transactions", "unique_sent_transactions",
    "contracts_created", "total_tx_with_contracts", "total_incoming_eth", "total_outgoing_eth", "time_difference_in_minutes",
    "avg_time_diff_out_minutes", "avg_time_diff_in_minutes", "total_avg_time_diff_minutes", "outgoing_erc_20_tnx",
    "incoming_erc_20_tnx", "total_erc_20_tnx", "total_outgoing_erc20eth", "total_incoming_erc20eth", "min_erc20token_in",
    "min_erc20token_out", "max_erc20token_in", "max_erc20token_out", "num_unique_erc20tokens_out",
    "num_unique_erc20tokens_in", "outgoing_erc_1155_tnx", "incoming_erc_1155_tnx", "total_erc_1155_tnx",
    "outgoing_erc_721_tnx", "incoming_erc_721_tnx", "total_erc_721_tnx", "num_unique_erc721asset_out",
    "num_unique_erc721asset_in", "daily_from_gini_index", "daily_to_gini_index", "weekly_from_gini_index",
    "weekly_to_gini_index", "daily_total_gini_index", "weekly_total_gini_index", "median_recency_out", "median_recency_in",
    "num_outliers_eth_out", "num_outliers_eth_in"
])

# Keep only address-months that have a daily_total_gini_index value.
account_df = account_df.filter(col("daily_total_gini_index").isNotNull())

In [11]:
# NOTE(review): pd.read_pickle unpickles arbitrary objects; only load files
# from a trusted source.
addresses = pd.read_pickle("data/pickle_files/scam_users_and_contract_creators_alt.pkl")
null_addresses = pd.read_pickle("data/pickle_files/missing_addresses_alt.pickle")

# Discard every address that is listed as missing.
addresses = [candidate for candidate in addresses if candidate not in null_addresses]

# One-column Spark frame of the remaining addresses plus a lower-cased key.
scam_address_df = spark.createDataFrame([(candidate,) for candidate in addresses], ['address'])
scam_address_df = scam_address_df.withColumn('address_lower', F.lower(scam_address_df['address']))

# Case-insensitive lookup of address_id; the left join keeps addresses that
# have no match in address_df (their address_id is then null).
address_lookup_df = address_df.withColumn('address_lower', F.lower(address_df['address']))
scam_address_id_df = (
    scam_address_df
    .join(address_lookup_df, on='address_lower', how='left')
    .select('address_lower', 'address_id')
    .withColumnRenamed("address_lower", "address")
)

# Bring the ids to the driver as a list of Row objects.
scam_address_id = scam_address_id_df.select("address_id").collect()

                                                                                

In [12]:
# Restrict the feature table to the collected scam address ids.
scam_ids = [row.address_id for row in scam_address_id]
selected_address_df = account_df.filter(account_df['address_id'].isin(*scam_ids))

In [13]:
# Preview the first 20 rows with long cell values truncated (the defaults).
selected_address_df.show(n=20, truncate=True)

23/08/04 13:55:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 119:>                                                        (0 + 1) / 1]

+----------+----------+-----------------+-----------------+----------------------------+------------------------+-----------------+-----------------------+------------------+--------------------+--------------------------+-------------------------+------------------------+---------------------------+-------------------+-------------------+----------------+-----------------------+-----------------------+-----------------+------------------+-----------------+------------------+--------------------------+-------------------------+---------------------+---------------------+------------------+--------------------+--------------------+-----------------+--------------------------+-------------------------+---------------------+-------------------+----------------------+--------------------+----------------------+-----------------------+------------------+--------------------+--------------------+-------------------+
|address_id|year_month|outgoing_tx_count|incoming_tx_count|unique_received_t

                                                                                

In [14]:
# For every column, count the rows whose value is NaN or null.
null_or_nan_counts = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in account_df.columns
]
selected_address_df.select(null_or_nan_counts).show()

                                                                                

+----------+----------+-----------------+-----------------+----------------------------+------------------------+-----------------+-----------------------+------------------+------------------+--------------------------+-------------------------+------------------------+---------------------------+-------------------+-------------------+----------------+-----------------------+-----------------------+-----------------+------------------+-----------------+------------------+--------------------------+-------------------------+---------------------+---------------------+------------------+--------------------+--------------------+-----------------+--------------------------+-------------------------+---------------------+-------------------+----------------------+--------------------+----------------------+-----------------------+------------------+-----------------+--------------------+-------------------+
|address_id|year_month|outgoing_tx_count|incoming_tx_count|unique_received_transa

In [15]:
# Persist the selected address features, replacing any previous output.
selected_address_df.write.parquet(
    "data/parquet_files/account_df_scam_alt.parquet",
    mode="overwrite",
)

                                                                                

In [16]:
# Re-read the persisted table and display its row count as the cell output.
account_df_parquet_path = "data/parquet_files/account_df_scam_alt.parquet"
selected_address_df = spark.read.parquet(account_df_parquet_path)
selected_address_df.count()

26653