In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F


spark = SparkSession.builder \
    .appName("LayerZeroTransactionAnalysis") \
    .config("spark.executor.memory", "6g") \
    .config("spark.driver.memory", "6g") \
    .getOrCreate()

In [2]:
df = spark.read.csv('./data/snapshot1_transactions.csv', header=True, inferSchema=True)

df.printSchema()
df.show(5)

root
 |-- SOURCE_CHAIN: string (nullable = true)
 |-- SOURCE_TRANSACTION_HASH: string (nullable = true)
 |-- DESTINATION_CHAIN: string (nullable = true)
 |-- DESTINATION_TRANSACTION_HASH: string (nullable = true)
 |-- SENDER_WALLET: string (nullable = true)
 |-- SOURCE_TIMESTAMP_UTC: timestamp (nullable = true)
 |-- PROJECT: string (nullable = true)
 |-- NATIVE_DROP_USD: double (nullable = true)
 |-- STARGATE_SWAP_USD: double (nullable = true)

+------------+-----------------------+-----------------+----------------------------+--------------------+--------------------+-------+---------------+-----------------+
|SOURCE_CHAIN|SOURCE_TRANSACTION_HASH|DESTINATION_CHAIN|DESTINATION_TRANSACTION_HASH|       SENDER_WALLET|SOURCE_TIMESTAMP_UTC|PROJECT|NATIVE_DROP_USD|STARGATE_SWAP_USD|
+------------+-----------------------+-----------------+----------------------------+--------------------+--------------------+-------+---------------+-----------------+
|    Optimism|   0x45017be8ad994b9...|  O

In [4]:
from pyspark.ml.feature import VectorAssembler
from sklearn.cluster import DBSCAN
import numpy as np

transaction_freq = df.groupBy("SENDER_WALLET", F.window("SOURCE_TIMESTAMP_UTC", "1 day")).count()
avg_daily_txn = transaction_freq.groupBy("SENDER_WALLET").agg(F.avg("count").alias("avg_daily_txn"))
amount_stats = df.groupBy("SENDER_WALLET").agg(
    F.sum("NATIVE_DROP_USD").alias("total_native_drop_usd"),
    F.avg("NATIVE_DROP_USD").alias("avg_native_drop_usd"),
    F.sum("STARGATE_SWAP_USD").alias("total_stargate_swap_usd"),
    F.avg("STARGATE_SWAP_USD").alias("avg_stargate_swap_usd")
)
feature_df = avg_daily_txn.join(amount_stats, "SENDER_WALLET")
pandas_df = feature_df.toPandas()


features = pandas_df[["avg_daily_txn", "total_native_drop_usd", "avg_native_drop_usd", "total_stargate_swap_usd", "avg_stargate_swap_usd"]].fillna(0).values
dbscan = DBSCAN(eps=0.5, min_samples=5)
pandas_df['cluster'] = dbscan.fit_predict(features)
print(pandas_df[['SENDER_WALLET', 'cluster']])


MemoryError: 

In [8]:
# 按1分钟的时间窗口统计每个发送者的交易数
windowed_sender_distribution = df.groupBy(
    F.window("SOURCE_TIMESTAMP_UTC", "1 minutes"),  # 1分钟的时间窗口
    "SENDER_WALLET"
).count()

short_time_suspicious_senders = windowed_sender_distribution.filter(F.col("count") > 100).orderBy(F.col("count").desc())
short_time_suspicious_senders.show(5, truncate=False)

+------------------------------------------+------------------------------------------+-----+
|window                                    |SENDER_WALLET                             |count|
+------------------------------------------+------------------------------------------+-----+
|{2024-02-06 22:44:00, 2024-02-06 22:45:00}|0x2a9e0c195bd235e7eed5c8b6a52857e59bc2733d|600  |
|{2024-02-07 00:29:00, 2024-02-07 00:30:00}|0x4bc295dfdf62501cebbc9be3157d3fccdd3f12ce|500  |
|{2024-02-07 23:25:00, 2024-02-07 23:26:00}|0xb1b09b57b8be1d0ce65555d68f7c677dc80ad29e|500  |
|{2024-04-24 18:13:00, 2024-04-24 18:14:00}|0x81698d4be3c2cde0fbd2cc457ba1aa0d5df11ddd|456  |
|{2024-04-24 18:14:00, 2024-04-24 18:15:00}|0x81698d4be3c2cde0fbd2cc457ba1aa0d5df11ddd|420  |
+------------------------------------------+------------------------------------------+-----+
only showing top 5 rows



In [13]:
sender_activity = df.groupBy("SENDER_WALLET").agg(F.count("*").alias("total_transactions"))
suspicious_senders = short_time_suspicious_senders.join(sender_activity, on="SENDER_WALLET") \
    .filter((F.col("total_transactions") < 150) & (F.col("count") > 100))

suspicious_senders.show()

+--------------------+--------------------+-----+------------------+
|       SENDER_WALLET|              window|count|total_transactions|
+--------------------+--------------------+-----+------------------+
|0x0f9d76acdbc4417...|{2024-02-07 05:49...|  111|               123|
|0x31e9063813533cb...|{2024-03-01 10:09...|  110|               117|
|0xfa3e289c03bf7af...|{2024-04-30 19:10...|  101|               120|
+--------------------+--------------------+-----+------------------+



In [3]:
# Filter dapp
sybil_projects = ["merkly", "l2pass"]
filtered_df = df.filter(F.col("PROJECT").isin(sybil_projects))

window_spec = Window.partitionBy("SENDER_WALLET", "PROJECT").orderBy("SOURCE_TIMESTAMP_UTC")
filtered_df = filtered_df.withColumn("prev_timestamp", F.lag("SOURCE_TIMESTAMP_UTC").over(window_spec))
filtered_df = filtered_df.withColumn("time_diff", F.unix_timestamp("SOURCE_TIMESTAMP_UTC") - F.unix_timestamp("prev_timestamp"))

path_df = filtered_df.groupBy("PROJECT", "SENDER_WALLET").agg(F.collect_list("time_diff").alias("interaction_times"))
path_df.show(5, truncate=False)

+-------+-------------+-----------------+
|PROJECT|SENDER_WALLET|interaction_times|
+-------+-------------+-----------------+
+-------+-------------+-----------------+

