### Import settings

In [1]:
import os

# Launch the PySpark driver as a plain "pyspark-shell"; this has to be in the
# environment before the SparkContext in the next cell is created.
os.environ.update(PYSPARK_SUBMIT_ARGS="pyspark-shell")

In [2]:
import findspark
findspark.init('/opt/spark')  # make the Spark install at /opt/spark importable before importing pyspark
from pyspark import SparkContext, SparkConf

# Driver/executor properties for this job on the standalone cluster.
_spark_properties = [
    ("spark.executor.memory", "30g"),
    ("spark.network.timeout", "600s"),
    # -1 disables automatic broadcast joins.
    ("spark.sql.autoBroadcastJoinThreshold", "-1"),
]
conf = SparkConf().setMaster("spark://10.200.5.39:7077").setAll(_spark_properties).setAppName("exportapp2")
sc = SparkContext(conf=conf)

In [3]:
import pyspark
import binascii
from pyspark.sql import SQLContext
from functools import reduce
import pygraphviz
import pyspark.sql.functions as f
from IPython.display import Image
from networkx.drawing.nx_pydot import write_dot
# SQL entry point bound to the SparkContext created above; every read below goes through it.
sqlContext = SQLContext(sparkContext=sc)

In [4]:
# HDFS root of the project data and the partition sub-directory this run works on.
path, pathDir = "hdfs://10.200.5.25:9001/user/titanium/", "part1"

In [5]:
#############################################################
#            LOAD TRANSACTIONS
#############################################################
# CSV with a header row; inferSchema makes Spark scan the input an extra time to pick types.
df_transactions = (
    sqlContext.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(path + "transaction/" + pathDir)
)


KeyboardInterrupt: 

In [None]:
#############################################################
#            LOAD INPUTS ADDRESSES TAG
#############################################################
# Tagged addresses on the sending (transaction input) side, stored under <path><pathDir>.
df_input_addresses_tag_reduced = (
    sqlContext.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(path + pathDir + "/df_input_addresses_tag")
)

In [None]:
#############################################################
#            LOAD OUTPUT ADDRESSES TAG
#############################################################
# Tagged addresses on the receiving (transaction output) side, stored under <path><pathDir>.
df_output_addresses_tag_reduced = (
    sqlContext.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(path + pathDir + "/df_output_addresses_tag")
)

In [None]:
# First stored half of a previously computed motifs-1 table.
motifs_11 = (
    sqlContext.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(path + pathDir + "/motifs_1_part1")
)

In [None]:
# Second stored half of a previously computed motifs-1 table.
motifs_12 = (
    sqlContext.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(path + pathDir + "/motifs_1_part2")
)

In [None]:
# Stitch the two stored halves back together. DataFrame.union matches columns by
# position, so both halves must share the same column order.
motifs_1 = reduce(lambda acc, part: acc.union(part), [motifs_11, motifs_12])

In [None]:
# Optionally rebuild the output-tag table from two separately loaded halves.
# df_output_addresses_tag_reduced1/2 are not defined by any cell of this notebook as shown,
# so referencing them directly raises NameError. Union them only when both exist;
# otherwise keep the table loaded by the "LOAD OUTPUT ADDRESSES TAG" cell.
_output_tag_halves = [
    globals().get(_name)
    for _name in ("df_output_addresses_tag_reduced1", "df_output_addresses_tag_reduced2")
]
if all(_half is not None for _half in _output_tag_halves):
    df_output_addresses_tag_reduced = _output_tag_halves[0].union(_output_tag_halves[1])

In [None]:
# Keep only rows with height <= 375000 (presumably the block height).
df_transactions = df_transactions.filter(df_transactions.height <= 375000)
# Disabled alternative kept for reference:
# df_input_addresses_tag_reduced = df_input_addresses_tag_reduced.where(f.col("height") < 325000)

In [None]:
#############################################################
#   Aggregate input and output labeled dataframe for distinct address
#############################################################

def _aggregate_tags_by_address(tagged):
    """Collapse a tagged-address table to one row per address.

    Resulting columns: address; count (number of rows for the address); totamount
    (sum of ``amount``, rounded, then divided by 100000000 - presumably satoshi to BTC);
    user (first ``user`` seen for the address, not deterministic if several differ).
    """
    per_address = tagged.groupby(tagged.address).agg(
        f.count("address").alias("count"),
        f.sum("amount").alias("totamount"),
        f.first(f.col("user")).alias("user"),
    )
    return per_address.withColumn("totamount", f.round(f.col("totamount")) / 100000000)


df_output_addresses_tag_grpby_addr = _aggregate_tags_by_address(df_output_addresses_tag_reduced)
df_input_addresses_tag_grpby_addr = _aggregate_tags_by_address(df_input_addresses_tag_reduced)

In [None]:
#############################################################
# Aggregate input and output labeled dataframe for distinct user
#############################################################

def _aggregate_addresses_by_user(per_address, total_alias):
    """Roll a per-address table up to one row per user.

    Resulting columns: user; naddress (number of address rows of that user);
    ``total_alias`` (sum of the per-address ``totamount``).
    """
    return per_address.groupby(per_address.user).agg(
        f.count("address").alias("naddress"),
        f.sum("totamount").alias(total_alias),
    )


df_output_addresses_tag_grpby_user = _aggregate_addresses_by_user(df_output_addresses_tag_grpby_addr, "balanceout")
df_input_addresses_tag_grpby_user = _aggregate_addresses_by_user(df_input_addresses_tag_grpby_addr, "balancein")

In [None]:
#############################################################
#           BALANCE ESTIMATION
#############################################################
# Every user must appear on both sides (output/"received" and input/"sent") before the
# balance join, so users missing from one side are appended to it with zero placeholders.

# Distinct users per side, as a single column named "unique".
list_unique_input_user, list_unique_output_user = (
    per_user.groupby("user").agg(f.first("user").alias("unique")).drop("user")
    for per_user in (df_input_addresses_tag_grpby_user, df_output_addresses_tag_grpby_user)
)

# Users present on one side only (left-anti join on "unique").
_same_unique = f.col("a.unique") == f.col("b.unique")
user_out_toadd = list_unique_input_user.alias("a").join(list_unique_output_user.alias("b"), _same_unique, "left_anti")
user_in_toadd = list_unique_output_user.alias("a").join(list_unique_input_user.alias("b"), _same_unique, "left_anti")

# Pad with naddress = 0 and a zero total. union() is positional, so "unique" lines up with "user".
user_in_toadd = user_in_toadd.withColumn("naddress", f.lit(0)).withColumn("balancein", f.lit(0))
df_input_addresses_tag_grpby_user_filled = df_input_addresses_tag_grpby_user.union(user_in_toadd)
user_out_toadd = user_out_toadd.withColumn("naddress", f.lit(0)).withColumn("balanceout", f.lit(0))
df_output_addresses_tag_grpby_user_filled = df_output_addresses_tag_grpby_user.union(user_out_toadd)

# balance = balanceout - balancein, sorted highest first; anything below 1e-8
# (including negative values) is clamped to 0.
df_user_balance = (
    df_output_addresses_tag_grpby_user_filled.alias("a")
    .join(df_input_addresses_tag_grpby_user_filled.alias("b"), f.col("b.user") == f.col("a.user"), "leftouter")
    .select(f.col("a.user"), f.col("a.balanceout"), f.col("b.balancein"))
    .withColumn("balance", f.col("balanceout") - f.col("balancein"))
    .sort(f.col("balance").desc())
)
df_user_balance = df_user_balance.withColumn(
    "balance", f.when(f.col("balance") < 0.00000001, 0).otherwise(f.col("balance"))
)

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

In [None]:
# Address -> label lookup covering every tagged address (either side), labelled with the
# first user seen for it. NOTE(review): if the two sides tag an address with different
# users, which one wins is not deterministic.
df_label2 = (
    df_output_addresses_tag_grpby_addr
    .union(df_input_addresses_tag_grpby_addr)
    .groupby("address")
    .agg(f.first("user").alias("label"))
)

In [None]:
# Base table: every transaction row with the label of its (output) address attached as
# "outuser". Left outer join, so rows whose address is untagged get outuser = null. Cached
# because it is reused by the motifs-1 and motifs-2 cells.
df_transactions_general = df_transactions.alias('a').join(df_label2.alias('b'),f.col('a.address')==f.col('b.address'),"leftouter")\
.select(f.col("a.height"),f.col("a.coinbase"),f.col("a.timestamp"),f.col("a.tx_id"),f.col("a.tx_number"),f.col("a.address"),f.col("a.amount"),f.col("a.vout_idx"),f.col("a.vin_txid"),f.col("a.vin_vout"),f.col("b.label").alias("outuser")).cache()

# Lookup table with one row per output (tx_id, address, vout_idx): its amount and the label of
# its address. It is used below to resolve each input reference (vin_txid, vin_vout) to the
# output it points at, whose owner becomes the "inuser".
df_transactions_general_join_amount = df_transactions_general.groupBy("tx_id","address","vout_idx").agg(f.first("amount").alias("unique_amount"),f.first("outuser").alias("inuser"))

# Attach, to every row, the amount/address/label of the output referenced by its input.
# NOTE(review): the join key is (tx_id, vout_idx) only, while the lookup is keyed by address
# too; if several addresses share one vout_idx, the input row is duplicated.
df_transactions_general_information = df_transactions_general.alias('a').join(df_transactions_general_join_amount.alias('b'),(f.col('a.vin_txid')==f.col('b.tx_id'))&(f.col('a.vin_vout')==f.col('b.vout_idx')),"leftouter")\
.select(f.col("a.height"),f.col("a.coinbase"),f.col("a.timestamp"),f.col("a.tx_id"),f.col("a.amount"),f.col("a.outuser"),f.col("a.address"),f.col("a.vin_txid"),f.col("a.vin_vout"),f.col("b.unique_amount").alias("amount_sent"),f.col("b.address").alias("address_sent"),f.col("b.inuser").alias("inuser_old"))

# Keep only rows whose receiving address is tagged.
df_transactions_general_information = df_transactions_general_information.filter(f.col("outuser").isNotNull())

# Replace a null inuser with "Coinbase". inuser_old is null both when the input matched no
# output row (presumably coinbase inputs or outputs outside the loaded data) and when the
# referenced address is untagged, so untagged senders are labelled "Coinbase" too.
# NOTE(review): address.isNotNull() tests the receiving address, which the outuser filter
# above already guarantees is non-null, so that condition does not restrict anything.
df_transactions_general_information = df_transactions_general_information.withColumn("inuser",f.when((f.col("inuser_old").isNull())&(f.col("address").isNotNull()),"Coinbase").otherwise(f.col("inuser_old")))
df_transactions_general_information = df_transactions_general_information.drop(f.col("inuser_old")).cache()

# Number of distinct transactions per (outuser, inuser) pair.
# NOTE(review): not referenced by the later cells shown in this notebook.
df_inuser_outuser_numtx = df_transactions_general_information.groupby("outuser","inuser").agg(f.countDistinct("tx_id"))

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

In [None]:
#############################################################
#           COMPUTE ADDRESS FEATURE
#############################################################
# One row per tagged address:
# - receive/send counts and totals;
# - balance = totamount_rec - totamount_sent;
# - "unique" = 1 when the address has fewer than 2 receives and fewer than 2 sends;
# - "sibling" = naddress of the address's user in the receive-side per-user table (0 if absent).
# Numeric nulls are zero-filled.

address_feature = (
    df_output_addresses_tag_grpby_addr.alias("a")
    .join(df_input_addresses_tag_grpby_addr.alias("b"), f.col("a.address") == f.col("b.address"), "outer")
    .select(
        f.col("a.count").alias("count_rec"),
        f.col("a.totamount").alias("totamount_rec"),
        f.col("b.count").alias("count_sent"),
        f.col("b.totamount").alias("totamount_sent"),
        f.coalesce(f.col("a.address"), f.col("b.address")).alias("address"),
        f.coalesce(f.col("a.user"), f.col("b.user")).alias("user"),
    )
    .fillna(0)
)
address_feature = (
    address_feature
    .withColumn("balance", f.col("totamount_rec") - f.col("totamount_sent"))
    .withColumn("unique", f.when((f.col("count_rec") < 2) & (f.col("count_sent") < 2), 1).otherwise(0))
)

# NOTE(review): siblings come from the receive-side per-user table only; users seen only on
# the sending side end up with sibling = 0.
address_feature = (
    address_feature.alias("a")
    .join(df_output_addresses_tag_grpby_user.alias("b"), f.col("a.user") == f.col("b.user"), "leftouter")
    .select(
        "a.address", "a.user", "a.count_rec", "a.totamount_rec", "a.count_sent",
        "a.totamount_sent", "a.balance", "a.unique", f.col("b.naddress").alias("sibling"),
    )
    .fillna(0)
)

In [None]:
#############################################################
#           STORE ADDRESS FEATURE IN HDFS
#############################################################
# CSV with a header row. No save mode is set, so Spark's default (fail if the target
# directory already exists) applies.
address_feature.write.format("com.databricks.spark.csv").options(header="true").save(
    path + pathDir + "/address_feature"
)

In [None]:
#############################################################
#           COMPUTE ENTITY FEATURE
#############################################################
# Per-user activity on each side: number of distinct addresses and distinct transactions.
_entity_recv_stats = df_output_addresses_tag_reduced.groupBy("user").agg(
    f.countDistinct("address").alias("add_recv"),
    f.countDistinct("tx_id").alias("count_recv"),
)
_entity_sent_stats = df_input_addresses_tag_reduced.groupBy("user").agg(
    f.countDistinct("address").alias("add_sent"),
    f.countDistinct("tx_id").alias("count_sent"),
)

# Attach both sides to the per-user balance table (left outer: every user there is kept).
entity_feature = (
    df_user_balance.alias("a")
    .join(_entity_recv_stats.alias("b"), f.col("a.user") == f.col("b.user"), "leftouter")
    .select(
        f.col("a.user"), f.col("a.balanceout"), f.col("a.balancein"), f.col("a.balance"),
        f.col("b.count_recv"), f.col("b.add_recv"),
    )
)
entity_feature = (
    entity_feature.alias("a")
    .join(_entity_sent_stats.alias("b"), f.col("a.user") == f.col("b.user"), "leftouter")
    .select(
        f.col("a.user"), f.col("a.balanceout").alias("balance_recv"), f.col("a.balancein"),
        f.col("a.balance"), f.col("a.count_recv"), f.col("b.count_sent"),
        f.col("a.add_recv"), f.col("b.add_sent"),
    )
)

# A user with no rows on one side gets nulls for BOTH that side's transaction count and
# address count, because both come from the same outer join. Zero-fill them together so the
# exported CSV has no empty address counts.
entity_feature = entity_feature.fillna(0, subset=["count_recv", "count_sent", "add_recv", "add_sent"])

In [None]:
#############################################################
#           STORE ENTITY FEATURE IN HDFS
#############################################################
# CSV with a header row. No save mode is set, so Spark's default (fail if the target
# directory already exists) applies.
entity_feature.write.format("com.databricks.spark.csv").options(header="true").save(
    path + pathDir + "/entity_feature"
)

In [None]:
# Inspect the column names/types of the per-row table the motif computations below start from.
df_transactions_general_information.printSchema()

In [None]:
#############################################################
#           COMPUTE MOTIFS1 FEATURE
#############################################################
# motifs-1: one row per (outuser, inuser, tx_id), i.e. "inuser paid outuser in tx_id",
# enriched with amounts, counts, the transaction fee and loop/direct flags.

# Number of distinct receiving addresses of outuser in each (inuser, tx).
motifs_1 = (
    df_transactions_general_information
    .groupBy("outuser", "inuser", "tx_id")
    .agg(f.countDistinct("address").alias("address_recv_dist"))
    .select("outuser", "inuser", "tx_id", "address_recv_dist")
    .cache()
)

# Amount received by outuser per (inuser, tx), counting each receiving address once.
# NOTE(review): the placeholder for null string keys is spelled "Unknow" in the output data.
amount_out_processing = (
    df_transactions_general_information
    .groupBy("outuser", "inuser", "tx_id", "address").agg(f.first("amount").alias("amount_recv"))
    .groupBy("outuser", "inuser", "tx_id").agg(f.sum("amount_recv").alias("amount_recv"))
    .fillna("Unknow")
)

# Input side per (outuser, inuser, tx): number of input rows, summed input amounts and
# number of distinct sending addresses, computed in a single aggregation.
amount_in_processing = (
    df_transactions_general_information
    .groupBy("outuser", "inuser", "tx_id")
    .agg(
        f.count("vin_txid").alias("tx_sent"),
        f.sum("amount_sent").alias("amount_sent"),
        f.countDistinct("address_sent").alias("address_sent_dist"),
    )
    .fillna("Unknow")
)

# Total received per transaction, deduplicated on (tx_id, address). amount_out_processing
# is not reused here because it has one row per distinct inuser. Summing it would count
# every receiving address once per sender and inflate total_recv_amount (and thus corrupt
# the fee) for transactions with several senders.
amount_out_processing_tx = (
    df_transactions_general_information
    .groupBy("tx_id", "address").agg(f.first("amount").alias("amount_recv"))
    .groupBy("tx_id").agg(f.sum("amount_recv").alias("total_recv_amount"))
)
# Total spent per transaction: each input reference (vin_txid, vin_vout) counted once.
amount_in_processing_tx = (
    df_transactions_general_information
    .groupBy("tx_id", "vin_txid", "vin_vout").agg(f.first("amount_sent").alias("amount_sent"))
    .groupBy("tx_id").agg(f.sum("amount_sent").alias("total_sent_amount"))
)

# Fee = total spent - total received.
# NOTE(review): df_transactions_general_information only keeps rows whose receiving address
# is tagged, so untagged outputs are presumably missing from total_recv_amount and the fee is
# overstated for such transactions. Confirm against the raw data.
fee_tx = (
    amount_out_processing_tx.alias("a")
    .join(amount_in_processing_tx.alias("b"), f.col("a.tx_id") == f.col("b.tx_id"))
    .select("a.tx_id", "total_recv_amount", "total_sent_amount")
    .withColumn("fees", f.col("total_sent_amount") - f.col("total_recv_amount"))
)

# Assemble motifs-1. These are inner joins on equality, so rows with a null inuser are dropped.
motifs_1 = (
    motifs_1.alias("a")
    .join(
        amount_out_processing.alias("b"),
        (f.col("a.outuser") == f.col("b.outuser"))
        & (f.col("a.inuser") == f.col("b.inuser"))
        & (f.col("a.tx_id") == f.col("b.tx_id")),
    )
    .select("a.outuser", "a.inuser", "a.tx_id", "address_recv_dist", "amount_recv")
)
motifs_1 = (
    motifs_1.alias("a")
    .join(
        amount_in_processing.alias("b"),
        (f.col("a.outuser") == f.col("b.outuser"))
        & (f.col("a.inuser") == f.col("b.inuser"))
        & (f.col("a.tx_id") == f.col("b.tx_id")),
    )
    .select("a.outuser", "a.inuser", "a.tx_id", "a.address_recv_dist", "a.amount_recv",
            "b.tx_sent", "b.address_sent_dist", "amount_sent")
)
# tx_recv_tot: number of distinct transactions between the same (outuser, inuser) pair.
_tx_per_user_pair = (
    motifs_1.groupBy("outuser", "inuser")
    .agg(f.countDistinct("tx_id").alias("tx_recv_tot"))
    .fillna("Unknow")
)
motifs_1 = (
    motifs_1.alias("a")
    .join(
        _tx_per_user_pair.alias("b"),
        (f.col("a.outuser") == f.col("b.outuser")) & (f.col("a.inuser") == f.col("b.inuser")),
    )
    .select("a.outuser", "a.inuser", "a.tx_id", "a.address_recv_dist", "a.amount_recv",
            "a.tx_sent", "a.address_sent_dist", "a.amount_sent", "tx_recv_tot")
)
motifs_1 = (
    motifs_1.alias("a")
    .join(fee_tx.alias("b"), f.col("a.tx_id") == f.col("b.tx_id"))
    .select("a.outuser", "a.inuser", "a.tx_id", "a.address_recv_dist", "a.amount_recv",
            "a.tx_sent", "a.address_sent_dist", "a.amount_sent", "a.tx_recv_tot", "b.fees")
)

# loop_in_out = 1 when a user pays itself; direct_in_out is the complement.
motifs_1 = motifs_1.withColumn("loop_in_out", f.when(f.col("outuser") == f.col("inuser"), 1).otherwise(0))
motifs_1 = motifs_1.withColumn("direct_in_out", f.when(f.col("outuser") == f.col("inuser"), 0).otherwise(1)).cache()

# Missing input amounts / fees become 0.
motifs_1 = motifs_1.fillna(0, subset=["amount_sent", "fees"])

In [None]:
#############################################################
#           STORE MOTIFS1 FEATURE IN HDFS
#############################################################
# CSV with a header row.
# NOTE(review): this is the same location the "motifs_12" load cell reads from. With the
# default save mode, the write fails if that directory already exists.
motifs_1.write.format("com.databricks.spark.csv").options(header="true").save(
    path + pathDir + "/motifs_1_part2"
)

In [None]:
#############################################################
#           COMPUTE MOTIFS2 FEATURE
#############################################################

# Lookup table with one row per output (tx_id, address, vout_idx): its amount, the label of its
# address ("miduser") and the input reference (vin_txid, vin_vout) of its own transaction.
# NOTE(review): when an output appears on several rows (presumably one per input of its
# transaction), first() keeps a single arbitrary vin_txid/vin_vout. The 2-hop chain below
# therefore follows only one input per mid transaction.
df_transactions_general_join_motifs2 = df_transactions_general.groupBy("tx_id","address","vout_idx").agg(f.first("amount").alias("unique_amount"),f.first("outuser").alias("miduser"),f.first("vin_txid").alias("vin_txid"),f.first("vin_vout").alias("vin_vout"))
# Disabled: dropping untagged mid outputs here (they are filtered after the joins instead).
#df_transactions_general_join_motifs2 = df_transactions_general_join_motifs2.filter(f.col("miduser").isNotNull())


# Hop 1: resolve each row's input (vin_txid, vin_vout) to the output it references in the mid
# transaction (tx_id_mid), carrying along that output's own input reference (tx_id_in, vin_vout_idx_in).
df_transactions_general_information2 = df_transactions_general.alias('a').join(df_transactions_general_join_motifs2.alias('b'),(f.col('a.vin_txid')==f.col('b.tx_id'))&(f.col('a.vin_vout')==f.col('b.vout_idx')),"leftouter")\
.select(f.col("a.height"),f.col("a.coinbase"),f.col("a.timestamp"),f.col("a.tx_id"),f.col("a.amount"),f.col("a.outuser"),f.col("a.address"),f.col("a.vin_txid").alias("tx_id_mid"),f.col("a.vin_vout").alias("vin_vout_idx_mid"),f.col("b.vin_txid").alias("tx_id_in"),f.col("b.vin_vout").alias("vin_vout_idx_in"),f.col("b.unique_amount").alias("amount_mid"),f.col("b.address").alias("address_mid"),f.col("b.miduser"))

# Hop 2: resolve the mid transaction's input reference the same way; its owner becomes inuser_old.
df_transactions_general_info_deep = df_transactions_general_information2.alias('a').join(df_transactions_general_join_motifs2.alias('b'),(f.col('a.tx_id_in')==f.col('b.tx_id'))&(f.col('a.vin_vout_idx_in')==f.col('b.vout_idx')),"leftouter")\
.select(f.col("a.height"),f.col("a.timestamp"),f.col("a.tx_id"),f.col("a.amount"),f.col("a.outuser"),f.col("a.address"),f.col("a.tx_id_mid"),f.col("a.vin_vout_idx_mid"),f.col("a.amount_mid"),f.col("a.address_mid"),f.col("a.miduser"),f.col("b.unique_amount").alias("amount_sent"),f.col("b.address").alias("address_sent"),f.col("b.miduser").alias("inuser_old"))

# Keep only chains whose receiving user and mid user are both tagged.
df_transactions_general_info_deep = df_transactions_general_info_deep.filter(f.col("outuser").isNotNull())
df_transactions_general_info_deep = df_transactions_general_info_deep.filter(f.col("miduser").isNotNull())

# Unresolved or untagged first-hop senders get inuser = "Coinbase" (same heuristic as in motifs-1).
df_transactions_general_info_deep = df_transactions_general_info_deep.withColumn("inuser",f.when((f.col("inuser_old").isNull())&(f.col("address").isNotNull()),"Coinbase").otherwise(f.col("inuser_old")))
df_transactions_general_info_deep = df_transactions_general_info_deep.drop(f.col("inuser_old"))

# motifs-2 skeleton: one row per distinct outuser <- tx_id <- miduser <- tx_id_mid <- inuser chain.
motifs_2 = (
    df_transactions_general_info_deep
    .groupBy("outuser", "miduser", "inuser", "tx_id", "tx_id_mid")
    .agg(f.count("address"))
    .select("outuser", "tx_id", "miduser", "tx_id_mid", "inuser")
)

# Pairwise flags:
# - loop_* = 1 when the two users are equal, direct_* is its complement;
# - a null user gives loop_* = 0 and direct_* = 1.
# Column order is loop_mid_out, loop_in_mid, loop_in_out, direct_mid_out, direct_in_mid,
# direct_in_out; the positional toDF() rename of motifs_2 relies on it.
_motif2_user_pairs = [
    ("mid_out", "outuser", "miduser"),
    ("in_mid", "miduser", "inuser"),
    ("in_out", "outuser", "inuser"),
]
_motif2_flags = (
    [f.when(f.col(left) == f.col(right), 1).otherwise(0).alias("loop_" + suffix)
     for suffix, left, right in _motif2_user_pairs]
    + [f.when(f.col(left) == f.col(right), 0).otherwise(1).alias("direct_" + suffix)
       for suffix, left, right in _motif2_user_pairs]
)
motifs_2 = motifs_2.select("*", *_motif2_flags)

# Enrich each motifs-2 chain with the motifs-1 statistics of both hops.
# toDF() re-applies the same column names positionally (so motifs_1 must have exactly these 12
# columns and motifs_2 these 11). Presumably it is there to get fresh column references, so
# the aliased self-joins below resolve "a."/"b." unambiguously.
motifs_1_cloned = motifs_1.toDF("outuser","inuser","tx_id","address_recv_dist","amount_recv","tx_sent","address_sent_dist","amount_sent","tx_recv_tot","fees","loop_in_out","direct_in_out")
motifs_2_cloned = motifs_2.toDF("outuser","tx_id","miduser","tx_id_mid","inuser","loop_mid_out","loop_in_mid","loop_in_out","direct_mid_out","direct_in_mid","direct_in_out")

# Hop outuser <- miduser in tx_id: left-join the motifs-1 row (outuser, inuser = miduser, tx_id).
# Its statistics are renamed with *_to_out / *_from_mid suffixes, and its fee becomes fee2.
motifs_2_cloned= motifs_2_cloned.alias("a").join(motifs_1_cloned.alias("b"),(f.col("a.outuser")==f.col("b.outuser"))&(f.col("a.miduser")==f.col("b.inuser"))&(f.col("a.tx_id")==f.col("b.tx_id")),"leftouter")\
.select("a.outuser","a.tx_id","b.address_recv_dist","b.amount_recv","b.fees","b.tx_sent","b.address_sent_dist","b.amount_sent","a.miduser","a.tx_id_mid","a.inuser","a.loop_mid_out","a.loop_in_mid","a.loop_in_out","a.direct_mid_out","a.direct_in_mid","a.direct_in_out")\
.withColumnRenamed("fees","fee2")\
.withColumnRenamed("address_recv_dist","address_recv_dist_to_out")\
.withColumnRenamed("amount_recv","amount_recv_to_out")\
.withColumnRenamed("tx_sent","tx_sent_from_mid")\
.withColumnRenamed("address_sent_dist","address_sent_from_mid")\
.withColumnRenamed("amount_sent","amount_sent_from_mid")

# Hop miduser <- inuser in tx_id_mid: left-join the motifs-1 row (outuser = miduser, inuser,
# tx_id = tx_id_mid). Its statistics get *_to_mid / *_from_in suffixes, and its fee becomes fee1.
# Both joins are left outer, so chains without a matching motifs-1 row keep nulls in these columns.
motifs_2_cloned= motifs_2_cloned.alias("a").join(motifs_1_cloned.alias("b"),(f.col("a.inuser")==f.col("b.inuser"))&(f.col("a.miduser")==f.col("b.outuser"))&(f.col("a.tx_id_mid")==f.col("b.tx_id")),"leftouter")\
.select("a.outuser","a.tx_id","a.address_recv_dist_to_out","a.amount_recv_to_out","a.fee2","a.tx_sent_from_mid","a.address_sent_from_mid","a.amount_sent_from_mid","a.miduser","a.tx_id_mid","b.address_recv_dist","b.amount_recv","b.tx_sent","b.address_sent_dist","b.amount_sent","b.fees","a.inuser","a.loop_mid_out","a.loop_in_mid","a.loop_in_out","a.direct_mid_out","a.direct_in_mid","a.direct_in_out")\
.withColumnRenamed("fees","fee1")\
.withColumnRenamed("address_recv_dist","address_recv_to_mid")\
.withColumnRenamed("amount_recv","amount_recv_to_mid")\
.withColumnRenamed("tx_sent","tx_sent_from_in")\
.withColumnRenamed("address_sent_dist","address_sent_from_in")\
.withColumnRenamed("amount_sent","amount_sent_from_in")

In [None]:
#############################################################
#           STORE MOTIFS2 FEATURE IN HDFS
#############################################################
# CSV with a header row. No save mode is set, so Spark's default (fail if the target
# directory already exists) applies.
motifs_2_cloned.write.format("com.databricks.spark.csv").options(header="true").save(
    path + pathDir + "/motifs_2_part2"
)