In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, lit, abs as spark_abs, row_number
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

#-------------------------------------------- Charger les données
def load_data_spark(file_list, base_path='dbfs:/FileStore/tables/'):
    dataframes = {}
    for file in file_list:
        file_path = f"{base_path}/{file}.parquet"
        df = spark.read.parquet(file_path)
        dataframes[file] = df
        print(f"DataFrame '{file}' contient les colonnes :\n {df.columns}\n")
    return dataframes

dataframes = load_data_spark(['users', 'market'])

DataFrame 'users' contient les colonnes :
 ['address', 'first_seen', 'last_seen', 'protocol_types', 'protocols_used', 'received_count', 'total_received (ETH)', 'sent_count', 'total_sent (ETH)', 'transactions']

DataFrame 'market' contient les colonnes :
 ['timestamp', 'blockchain', 'protocol_name', 'symbol', 'type', 'contract_address', 'open (usd)', 'high (usd)', 'low (usd)', 'close (usd)', 'volume', 'nb_tx_1h', 'nb_tx_24h', 'total_value_eth_1h', 'total_value_eth_24h', 'total_gas_used_1h', 'total_gas_used_24h', 'nb_unique_receivers_1h', 'nb_unique_receivers_24h', 'nb_unique_senders_1h', 'nb_unique_senders_24h', 'std_value_eth_1h', 'std_value_eth_24h', 'std_gas_used_1h', 'std_gas_used_24h', 'avg_gas_used_1h', 'avg_gas_used_24h', 'avg_value_eth_per_tx_1h', 'avg_value_eth_per_tx_24h', 'max_gas_used_1h', 'max_gas_used_24h', 'max_value_eth_1h', 'max_value_eth_24h', 'median_value_eth_1h', 'median_value_eth_24h', 'min_gas_used_1h', 'min_gas_used_24h', 'min_value_eth_1h', 'min_value_eth_24h', 

In [0]:
def clean_column_names_spark(df):
    """
    Nettoyer les noms de colonnes en remplaçant les points par des underscores.
    """
    new_columns = [col_name.replace('.', '_').strip().lower().replace(' ', '_') for col_name in df.columns]
    
    for old_name, new_name in zip(df.columns, new_columns):
        df = df.withColumnRenamed(old_name, new_name)
    
    return df

In [0]:
#-------------------------------------------- Traiter la colonne `protocol_types`
protocol_types_schema = StructType([
    StructField("DEX", IntegerType(), True),
    StructField("Lending", IntegerType(), True),
    StructField("Stablecoin", IntegerType(), True),
    StructField("Yield Farming", IntegerType(), True),
    StructField("NFT-Fi", IntegerType(), True)
])

users = dataframes['users']
users = users.withColumn("parsed_protocols", from_json(col("protocol_types"), protocol_types_schema))

# Extraire les colonnes des types de protocoles
for protocol in ["DEX", "Lending", "Stablecoin", "Yield Farming", "NFT-Fi"]:
    users = users.withColumn(f"type_{protocol.lower().replace(' ', '_')}", col("parsed_protocols")[protocol])

users = users.drop("protocol_types", "parsed_protocols")

In [0]:
from pyspark.sql.functions import map_keys

def transform_protocols_used_spark(df, column_name="protocols_used"):
    """
    Transformer la colonne `protocols_used` en extrayant les clés et les valeurs,
    et en créant des colonnes distinctes pour chaque protocole.
    """
    df = df.withColumn(column_name, from_json(col(column_name), MapType(StringType(), StructType([
        StructField("count", IntegerType(), True),
        StructField("blockchain", StringType(), True)
    ]))))

    protocols = df.select(explode(map_keys(col(column_name))).alias("protocol_name")).distinct().collect()

    for protocol_row in protocols:
        protocol_name = protocol_row["protocol_name"]

        df = df.withColumn(f"{protocol_name}_count", col(column_name).getItem(protocol_name).getField("count"))

    return df.drop(column_name)

users = transform_protocols_used_spark(users)
users = clean_column_names_spark(users)

In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, FloatType

transaction_schema = StructType([
    StructField("transaction_hash", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("value (ETH)", FloatType(), True),
    StructField("is_sender", BooleanType(), True),
    StructField("gas_used", FloatType(), True),
    StructField("protocol_name", StringType(), True),
    StructField("protocol_type", StringType(), True),
    StructField("blockchain", StringType(), True),
    StructField("contract_id", StringType(), True)
])

def extract_transactions_spark(users):
    users_with_transactions = users.withColumn(
        "transactions_struct", from_json(col("transactions"), transaction_schema)
    )

    transactions_extracted = users_with_transactions.select(
        "address",
        "first_seen",
        "last_seen",
        "received_count",
        "total_received_(eth)",
        "sent_count",
        "total_sent_(eth)",
        "type_dex",
        "type_lending",
        "type_stablecoin",
        "type_yield_farming",
        "type_nft-fi",
        "curve_dao_count",
        "aave_count",
        "tether_count",
        "uniswap_count",
        "maker_count",
        "yearn_finance_count",
        "usdc_count",
        "dai_count",
        "balancer_count",
        "harvest_finance_count",
        col("transactions_struct.timestamp").alias("tx_timestamp"),
        col("transactions_struct.protocol_name").alias("tx_protocol"),
        col("transactions_struct.value (ETH)").alias("tx_value_eth"),
        col("transactions_struct.is_sender").alias("tx_is_sender"),
        col("transactions_struct.gas_used").alias("tx_gas_used")
    )
    
    return transactions_extracted

users = extract_transactions_spark(users)
users.show(truncate=False)

+------------------------------------------+-------------------+-------------------+--------------+--------------------+----------+---------------------+--------+------------+---------------+------------------+-----------+---------------+----------+------------+-------------+-----------+-------------------+----------+---------+--------------+---------------------+------------+-----------+------------+------------+-----------+
|address                                   |first_seen         |last_seen          |received_count|total_received_(eth)|sent_count|total_sent_(eth)     |type_dex|type_lending|type_stablecoin|type_yield_farming|type_nft-fi|curve_dao_count|aave_count|tether_count|uniswap_count|maker_count|yearn_finance_count|usdc_count|dai_count|balancer_count|harvest_finance_count|tx_timestamp|tx_protocol|tx_value_eth|tx_is_sender|tx_gas_used|
+------------------------------------------+-------------------+-------------------+--------------+--------------------+----------+---------

In [0]:
#-------------------------------------------- Fusionner `users` et `market`
market = dataframes['market']
window_spec = Window.partitionBy("tx_protocol").orderBy(spark_abs(col("timestamp").cast("long") - col("tx_timestamp").cast("long")))

merged_df = users.join(market, users.tx_protocol == market.protocol_name, "inner") \
    .withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") == 1) \
    .drop("rank")

merged_df.show(5)

+-------+----------+---------+--------------+--------------------+----------+----------------+--------+------------+---------------+------------------+-----------+---------------+----------+------------+-------------+-----------+-------------------+----------+---------+--------------+---------------------+------------+-----------+------------+------------+-----------+---------+----------+-------------+------+----+----------------+----------+----------+---------+-----------+------+--------+---------+------------------+-------------------+-----------------+------------------+----------------------+-----------------------+--------------------+---------------------+----------------+-----------------+---------------+----------------+---------------+----------------+-----------------------+------------------------+---------------+----------------+----------------+-----------------+-------------------+--------------------+---------------+----------------+----------------+-----------------+---

In [0]:
from pyspark.sql.functions import year, month, col
import os

merged_df = merged_df.withColumn("year", year(merged_df['tx_timestamp']))
merged_df = merged_df.withColumn("month", month(merged_df['tx_timestamp']))

merged_df = merged_df.limit(1000)

output_dir = "dbfs:/FileStore/tables/"

for year_value, month_value in sample_df.select("year", "month").distinct().collect():
    filtered_df = merged_df.filter((col("year") == year_value) & (col("month") == month_value))
    
    file_name = f"processed_data_{year_value}_{month_value:02d}.parquet"
    file_path = os.path.join(output_dir, file_name)
    
    filtered_df.coalesce(1).write.mode("overwrite").parquet(file_path)
    print(f"Enregistrement du fichier : {file_path}")