# Stage 3
## 1. Running Spark Apps

In [1]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = "team31"

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

In [2]:
db_name = f"{team}_projectdb"

spark.sql("SHOW DATABASES").show()
spark.sql(f"USE {db_name}").show()
spark.sql("SHOW TABLES").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             root_db|
|     team0_projectdb|
|team12_hive_proje...|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team17_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
|    team21_projectdb|
|    team22_projectdb|
|    team23_projectdb|
|    team24_projectdb|
|    team25_projectdb|
|    team26_projectdb|
|    team27_projectdb|
+--------------------+
only showing top 20 rows

++
||
++
++
+----------------+--------------------+-----------+
|       namespace|           tableName|isTemporary|
+----------------+--------------------+-----------+
|team31_projectdb|          checkpoint|      false|
|team31_projectdb|current_market_va...|      false|
|team31_projectdb|current_owners_pa...|      false|
|team31_projectdb|market_values_dis...|      false|
|team31_projectdb|  mint_holding_times|      f

## 2. Read Hive tables

In [3]:
print(spark.catalog.listTables(db_name))

[Table(name='checkpoint', database='team31_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='current_market_values', database='team31_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='current_owners_partitioned', database='team31_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='market_values_distribution', database='team31_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='mint_holding_times', database='team31_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='mints', database='team31_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='nfts', database='team31_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='ownership_transitions', database='team31_projectdb', description=None, tableType='EXTERNAL', isTemporary=False), Table(name='q1_results', database='t

In [22]:
mints = spark.read.format("avro").table(f"{db_name}.mints").select("token_id", "timestamp", "nft_address", "transaction_value")
mints.printSchema()
mints.show(5)
mints.count()

root
 |-- token_id: string (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- nft_address: string (nullable = true)
 |-- transaction_value: float (nullable = true)
+--------+----------+--------------------+-----------------+
|token_id| timestamp|         nft_address|transaction_value|
+--------+----------+--------------------+-----------------+
|    2970|1630824455|0x8Ca5209d8CCe34b...|              0.0|
|    1115|1628128263|0xd448E6CCA10ff5d...|     9.9999998E16|
|  661252|1630824788|0x1dfe7Ca09e99d10...|              0.0|
|   90489|1630824861|0x1dfe7Ca09e99d10...|              0.0|
|     952|1629043069|0x8184a482A5038B1...|    1.28000003E18|
+--------+----------+--------------------+-----------------+
only showing top 5 rows


631816

In [23]:
transfers = spark.read.format("avro").table(f"{db_name}.transfers").select("token_id", "timestamp", "transaction_value")
transfers.printSchema()
transfers.show(5)
transfers.count()

root
 |-- token_id: string (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- transaction_value: float (nullable = true)
+--------+----------+-----------------+
|token_id| timestamp|transaction_value|
+--------+----------+-----------------+
|    7731|1630202720|     5.8999997E18|
|93000048|1624715050|     9.9999998E16|
|    7263|1632165910|    1.49999994E18|
|    5913|1629735476|              0.0|
|     169|1624231533|    1.29000004E17|
+--------+----------+-----------------+
only showing top 5 rows


380676

In [24]:
nfts = spark.read.format("avro").table(f"{db_name}.nfts").select("address", "name")
nfts.printSchema()
nfts.show(5)
nfts.count()

root
 |-- address: string (nullable = true)
 |-- name: string (nullable = true)

+--------------------+------------------+
|             address|              name|
+--------------------+------------------+
|0x385eDC73Dd943b6...|    Silly Old Bear|
|0xf6680e700394e7f...|        Astraglade|
|0xcEb2EA7E904c74A...|                  |
|0xfE303C462407Bf6...|     Image By Andy|
|0x5e302f9EbA7B0E8...|PFP Ability Scores|
+--------------------+------------------+
only showing top 5 rows


9388

## 3. ML Modeling
### 3.1 Feature Engineering

In [9]:
from pyspark.sql import functions as F

contract_stats = (
    mints
    .groupBy("nft_address")
    .agg(
        F.countDistinct("token_id").alias("num_tokens"),
        F.avg("transaction_value").alias("avg_mint_price"),
        F.max("transaction_value").alias("max_mint_price"),
        F.min("transaction_value").alias("min_mint_price"),
        F.min("timestamp").alias("first_mint_date"),
        F.max("timestamp").alias("last_mint_date"),
    )
)
contract_stats.show(5)

+--------------------+----------+--------------------+--------------+--------------+---------------+--------------+
|         nft_address|num_tokens|      avg_mint_price|max_mint_price|min_mint_price|first_mint_date|last_mint_date|
+--------------------+----------+--------------------+--------------+--------------+---------------+--------------+
|0xe5fAB3f3e33762E...|        54|6.296296467910808E15| 1.00000003E16|           0.0|     1630518131|    1630543090|
|0xdAd298d2A9D7DF0...|        37|                 0.0|           0.0|           0.0|     1630884356|    1631937227|
|0x11957A61aC1684E...|       282|3.500000043309577E17| 4.20000007E17|           0.0|     1629886854|    1630332557|
|0xD9a3fCa910f76C8...|        94|1.332978718157547...|  5.9999999E17|           0.0|     1623417307|    1624391310|
|0x2D9A757e4B5a3A9...|        38|1.042105268956864...|  5.9999999E17|           0.0|     1631657379|    1632494736|
+--------------------+----------+--------------------+--------------+---

In [39]:
import pyspark.sql.types as T

def _get_last_n_txs(arr, n):
    return arr[-n:]

get_last_n_txs = F.udf(_get_last_n_txs, T.ArrayType(T.StructType([
    T.StructField("timestamp", T.LongType()),
    T.StructField("transaction_value", T.DoubleType()),
])))

LAST_N = 10

nft_history = (
    transfers
    .groupBy("token_id")
    .agg(
        F.sort_array(F.collect_list(F.struct("timestamp", "transaction_value"))).alias("tx_data"),
    )
    .withColumn("tx_data_except_last", F.expr("slice(tx_data, 1, size(tx_data) - 1)"))
    .withColumn("last_tx", F.element_at("tx_data", -1))
    .withColumn("last_tx_value", F.col("last_tx.transaction_value"))
    .drop("last_tx")
    .drop("tx_data")
    # Overall stats (excluding last tx)
    .withColumn("tx_count", F.size("tx_data_except_last"))
    .withColumn("min_tx_value", F.array_min("tx_data_except_last.transaction_value"))
    .withColumn("max_tx_value", F.array_max("tx_data_except_last.transaction_value"))
    .withColumn("first_tx_timestamp", F.element_at("tx_data_except_last.timestamp", 1))
    .withColumn("last_tx_timestamp", F.element_at("tx_data_except_last.timestamp", -1))
    .withColumn("last_n_transactions", get_last_n_txs("tx_data_except_last", F.lit(LAST_N)))
    .drop("tx_data_except_last")
    # History for the last N transactions
    .withColumn("tx_values", F.col("last_n_transactions.transaction_value"))
    .withColumn("tx_timestamps", F.col("last_n_transactions.timestamp"))
    .drop("last_n_transactions")
)
nft_history.show(5)

+-------------+-------------+--------+-------------+-------------+------------------+-----------------+--------------------+--------------------+
|     token_id|last_tx_value|tx_count| min_tx_value| max_tx_value|first_tx_timestamp|last_tx_timestamp|           tx_values|       tx_timestamps|
+-------------+-------------+--------+-------------+-------------+------------------+-----------------+--------------------+--------------------+
|1000000000004|       9.9E18|       1|          0.0|          0.0|        1621456719|       1621456719|               [0.0]|        [1621456719]|
|1000000000012|          0.0|       3|          0.0|          0.0|        1625827928|       1626100381|     [0.0, 0.0, 0.0]|[1625827928, 1626...|
|    100000048|          0.0|       0|         null|         null|              null|             null|                  []|                  []|
|    100000078| 7.9999999E17|       1|       1.8E19|       1.8E19|        1632123394|       1632123394|[1.80000004047162...|

In [40]:
features = (
    mints.selectExpr("nft_address", "token_id", "timestamp as mint_timestamp", "transaction_value as mint_tx_value")
    .join(contract_stats, on="nft_address", how="left")
    .join(nft_history, on="token_id", how="left")
    .join(nfts, on=(mints.nft_address == nfts.address), how="left")
    .drop("nft_address", "address")
)
print(features.count())

631816


In [41]:
features.show(5)

+--------+--------------+-------------+----------+--------------------+--------------+--------------+---------------+--------------+-------------+--------+------------+------------+------------------+-----------------+--------------------+--------------------+-------------+
|token_id|mint_timestamp|mint_tx_value|num_tokens|      avg_mint_price|max_mint_price|min_mint_price|first_mint_date|last_mint_date|last_tx_value|tx_count|min_tx_value|max_tx_value|first_tx_timestamp|last_tx_timestamp|           tx_values|       tx_timestamps|         name|
+--------+--------------+-------------+----------+--------------------+--------------+--------------+---------------+--------------+-------------+--------+------------+------------+------------------+-----------------+--------------------+--------------------+-------------+
|  112994|    1630773556|          0.0|     12287|                 0.0|           0.0|           0.0|     1630769995|    1632578269|         null|    null|        null|       

In [42]:
features = features.withColumnRenamed("last_tx_value", "label")

In [54]:
filtered_features = features.na.drop()

In [55]:
filtered_features.count()

570887

### 3.2 Feature Extraction Pipeline

In [56]:
date_cols = ["mint_timestamp", "first_mint_date", "last_mint_date", "first_tx_timestamp", "last_tx_timestamp"]
text_cols = ["name"]

In [65]:
filtered_features_with_dt = filtered_features.selectExpr(*(f"from_unixtime({col}) as {col}" if col in date_cols else col for col in filtered_features.columns))
filtered_features_with_dt.show(5)

+--------+-------------------+-------------+----------+--------------------+--------------+--------------+-------------------+-------------------+-------------+--------+------------+-------------+-------------------+-------------------+--------------------+--------------------+-----------------+
|token_id|     mint_timestamp|mint_tx_value|num_tokens|      avg_mint_price|max_mint_price|min_mint_price|    first_mint_date|     last_mint_date|        label|tx_count|min_tx_value| max_tx_value| first_tx_timestamp|  last_tx_timestamp|           tx_values|       tx_timestamps|             name|
+--------+-------------------+-------------+----------+--------------------+--------------+--------------+-------------------+-------------------+-------------+--------+------------+-------------+-------------------+-------------------+--------------------+--------------------+-----------------+
|   10351|2021-08-20 21:56:27|       7.5E19|      1011|2.569362058904216...|        7.5E19|           0.0|202

In [57]:
from pyspark.ml.feature import Tokenizer, Word2Vec, VectorAssembler, StandardScaler

# Collection name encoding
tokenizer = Tokenizer(inputCol=text_cols[0], outputCol=text_cols[0] + "_tokens")
word2vec = Word2Vec(vectorSize=16, minCount=1, inputCol=tokenizer.getOutputCol(), outputCol=text_cols[0] + "_w2v")

In [58]:
import math

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

# Date encoding
class DateCyclicalEncodingTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    input_col = Param(Params._dummy(), "input_col", "input column name.",
                      typeConverter=TypeConverters.toString)
    output_col = Param(Params._dummy(), "output_col", "output column name.",
                       typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, input_col: str = "input", output_col: str = "output"):
        super(DateCyclicalEncodingTransformer, self).__init__()
        self._setDefault(input_col=None, output_col=None)
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(self, input_col: str = "input", output_col: str = "output"):
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        return self.getOrDefault(self.input_col)

    def get_output_col(self):
        return self.getOrDefault(self.output_col)

    def _transform(self, df):
        input_col = self.get_input_col()
        output_col = self.get_output_col()
        df = df.withColumn(output_col + "_year", F.year(F.col(input_col)))
        for col, val_count in (("month", 12), ("day", 31), ("hour", 24), ("minute", 60), ("second", 60)):
            df = (
                df
                .withColumn(output_col + f"_{col}_sin", F.sin(2 * math.pi * F.expr(f"{col}({input_col})") / val_count))
                .withColumn(
                    output_col + f"_{col}_cos", F.cos(2 * math.pi * F.expr(f"{col}({input_col})") / val_count)
                )
            )
        return df
    
    def get_all_column_names(self):
        output_col = self.get_output_col()
        return [output_col + "_year"] + [output_col + f"_{col}_sin" for col in ("month", "day", "hour", "minute", "second")] + [output_col + f"_{col}_cos" for col in ("month", "day", "hour", "minute", "second")]

In [59]:
date_transformers = [DateCyclicalEncodingTransformer(input_col=col, output_col="encoded_" + col) for col in date_cols]

In [60]:
cols_to_assemble = [text_cols[0] + "_w2v"] + sum((dt.get_all_column_names() for dt in date_transformers), [])

assembler = VectorAssembler(inputCols=cols_to_assemble, outputCol="raw_features")
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)

In [61]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[tokenizer, word2vec] + date_transformers + [assembler, scaler])

In [66]:
pipeline_model = pipeline.fit(filtered_features_with_dt)

In [68]:
transformed_features = pipeline_model.transform(filtered_features_with_dt).select("features", "label")
transformed_features.show(5)

+--------------------+-------------+
|            features|        label|
+--------------------+-------------+
|[-2.4569591305176...|5.39999985E17|
|[-0.1038806243314...|          0.0|
|[-1.7667336791337...|5.39999985E17|
|[0.63856358700349...|5.39999985E17|
|[-0.0862136722249...|5.39999985E17|
+--------------------+-------------+


### 3.3 Train-Test Split

In [69]:
(train_data, test_data) = transformed_features.randomSplit([0.7, 0.3], seed=42)

In [73]:
train_data\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

In [74]:
test_data\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

In [75]:
!hdfs dfs -cat project/data/train/*.json > ../data/train.json

In [76]:
!hdfs dfs -cat project/data/test/*.json > ../data/test.json