In [26]:
# Stop a Spark session left over from an earlier run of this notebook so that
# the session created further down is built from the configuration below.
# On a fresh kernel no `spark` variable exists yet, so there is nothing to stop
# and the cell must not fail (Restart Kernel -> Run All has to work).
if "spark" in globals():
    spark.stop()

# General Imports and Spark Config

In [27]:
import pyspark
from pyspark.sql import SparkSession
from graphframes import *
from delta import *
from delta.tables import *

import pyspark.sql.functions as fn
from pyspark.sql.types import StructField, StringType, LongType, DoubleType, BooleanType, StructType

In [28]:
# Spark settings for this job, handed to SparkConf as (key, value) pairs.
# NOTE(review): the Neo4j writes in this notebook use the
# "org.neo4j.spark.DataSource" format, but only the MongoDB connector is listed
# under spark.jars.packages -- presumably the Neo4j connector jar is installed
# on the cluster; confirm.
config = pyspark.SparkConf().setAll(
    [
        # executor / driver sizing
        ("spark.executor.memory", "32g"),
        ("spark.executor.cores", "6"),
        ("spark.cores.max", "24"),
        ("spark.driver.memory", "8g"),
        ("spark.executor.instances", "1"),
        # dynamic allocation between 1 and 4 executors
        ("spark.dynamicAllocation.enabled", "true"),
        ("spark.dynamicAllocation.shuffleTracking.enabled", "true"),
        ("spark.dynamicAllocation.executorIdleTimeout", "60s"),
        ("spark.dynamicAllocation.minExecutors", "1"),
        ("spark.dynamicAllocation.maxExecutors", "4"),
        ("spark.dynamicAllocation.initialExecutors", "1"),
        ("spark.dynamicAllocation.executorAllocationRatio", "1"),
        # worker cleanup and shuffle service settings
        ("spark.worker.cleanup.enabled", "true"),
        ("spark.worker.cleanup.interval", "60"),
        ("spark.shuffle.service.db.enabled", "true"),
        ("spark.worker.cleanup.appDataTtl", "60"),
        # connector packages resolved at session start
        ("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector:10.0.2"),
    ]
)

Important: Neo4j has a naming convention: node labels should use CamelCase beginning with an uppercase letter (e.g. `ParticipationNode`), and relationship types should be all uppercase with words separated by underscores (e.g. `KEY_REGISTRATION`).

# Create Spark Session

In [29]:
# Create (or reuse) the Spark session for this notebook from `config`,
# registered under the given application name on the standalone master.
spark = (
    SparkSession.builder
    .config(conf=config)
    .appName("TransactionNetworkNeo4jBatch")
    .master("spark://172.23.149.212:7077")
    .getOrCreate()
)

86950198 [Thread-4] WARN  org.apache.spark.util.Utils  - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
86950199 [Thread-4] WARN  org.apache.spark.util.Utils  - Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
86950347 [Thread-4] WARN  org.apache.spark.ExecutorAllocationManager  - Dynamic allocation without a shuffle service is an experimental feature.


Define the schema manually to guarantee that no column ends up with a void (null) type in the schema.

In [30]:
# Explicit schema of the `txn` collection. Each entry is (column name, Spark
# type); every column is declared nullable.
schema = StructType(
    [
        StructField(columnName, columnType(), True)
        for columnName, columnType in [
            ("_id", StringType),
            ("asset", LongType),
            ("extra", StringType),
            ("intra", LongType),
            ("round", LongType),
            ("rr", LongType),
            ("sig", StringType),
            ("txid", StringType),
            ("txn_aamt", LongType),
            ("txn_aclose", StringType),
            ("txn_afrz", BooleanType),
            ("txn_amt", LongType),
            ("txn_apaa", StringType),
            ("txn_apan", LongType),
            ("txn_apap", StringType),
            ("txn_apar", StringType),
            ("txn_apas", StringType),
            ("txn_apat", StringType),
            ("txn_apep", StringType),
            ("txn_apfa", StringType),
            ("txn_apgs", StringType),
            ("txn_apid", LongType),
            ("txn_apls", StringType),
            ("txn_apsu", StringType),
            ("txn_arcv", StringType),
            ("txn_asnd", StringType),
            ("txn_caid", LongType),
            ("txn_close", StringType),
            ("txn_fadd", StringType),
            ("txn_faid", LongType),
            ("txn_fee", LongType),
            ("txn_fv", LongType),
            ("txn_gen", StringType),
            ("txn_gh", StringType),
            ("txn_grp", StringType),
            ("txn_lsig", StringType),
            ("txn_lv", LongType),
            ("txn_lx", StringType),
            ("txn_msig", StringType),
            ("txn_nonpart", BooleanType),
            ("txn_note", StringType),
            ("txn_rcv", StringType),
            ("txn_rekey", StringType),
            ("txn_selkey", StringType),
            ("txn_sig", StringType),
            ("txn_snd", StringType),
            ("txn_type", StringType),
            ("txn_votefst", LongType),
            ("txn_votekd", LongType),
            ("txn_votekey", StringType),
            ("txn_votelst", LongType),
            ("txn_xaid", LongType),
            ("typeenum", LongType),
        ]
    ]
)

In [31]:
# Read the Algorand `txn` collection from MongoDB into a dataframe, using the
# explicit `schema` instead of schema inference.
dfTxn = (
    spark.read.format("mongodb")
    .option("spark.mongodb.connection.uri", "mongodb://172.23.149.212:27017")
    .option("spark.mongodb.database", "algorand")
    .option("spark.mongodb.collection", "txn")
    # The key needs the full "spark.mongodb.read." prefix. It was previously
    # spelled "park.mongodb...", which is not the read-preference option name.
    .option("spark.mongodb.read.readPreference.name", "primaryPreferred")
    .option("spark.mongodb.change.stream.publish.full.document.only", "true")
    .option("forceDeleteTempCheckpointLocation", "true")
    .schema(schema)
    .load()
)

In [None]:
# coalesce is used to decrease the number of partitions to the number specified
# dfTxn = dfTxn.coalesce(18)

In [None]:
# print(dfTxn.rdd.getNumPartitions())

# Payment transactions and nodes

#### Preparation of data

Prepare the edges/relationship dataframe

In [None]:
# Payment transactions (typeenum == 1), reduced to the columns that end up in
# Neo4j as node keys or relationship properties.
dfPaymentTx = dfTxn.filter(dfTxn["typeenum"] == 1).select(
    "txid",
    "txn_snd",
    "txn_rcv",
    "txn_amt",
    "txn_fee",
    "round",
    "intra",
    "txn_close",
)

#### Create dataframes and write data to Neo4j

Create the accounts dataframe containing the payment tx sender and receiver

In [None]:
# Collect every address that appears as sender or receiver of a payment in a
# single `account` column, without duplicates.
dfTxnSender = dfPaymentTx.select(dfPaymentTx["txn_snd"].alias("account"))
dfTxnReceiver = dfPaymentTx.select(dfPaymentTx["txn_rcv"].alias("account"))
dfPaymentAccounts = dfTxnSender.union(dfTxnReceiver).distinct()

Write the accounts to neo4j

In [None]:
# Write the payment accounts as :Account nodes keyed on `account`
# (save mode Overwrite).
(
    dfPaymentAccounts.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":Account")
    .option("node.keys", "account")
    .save()
)

Write the transactions between the accounts to Neo4j

In [None]:
# Write one PAYMENT relationship per payment transaction, from the sender
# :Account (txn_snd) to the receiver :Account (txn_rcv). The remaining columns
# are stored as relationship properties under the names given in the mapping.
(
    dfPaymentTx.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://172.23.149.212:7687")
    .mode("Append")
    .option("relationship", "PAYMENT")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Account")
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.node.keys", "txn_snd:account")
    .option("relationship.target.labels", ":Account")
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.node.keys", "txn_rcv:account")
    .option("relationship.properties", "txn_amt:amount, txn_fee:fee, round:blockNumber, intra:intraBlockTxNumber, txid:txId, txn_close:closedSndAccountTx")
    .save()
)

## Keyreg transactions and nodes

#### Preparation of data

In [None]:
# Key registration transactions (typeenum == 2) with the participation key
# fields that are used below to tell online from offline registrations.
dfKeyregTx = dfTxn.filter(dfTxn["typeenum"] == 2).select(
    "txid",
    "round",
    "intra",
    "txn_fee",
    "txn_snd",
    "txn_selkey",
    "txn_votefst",
    "txn_votekd",
    "txn_votekey",
    "txn_votelst",
)

Add another field to the dataframe to distinguish online from offline transactions. We can distinguish them the following way:
- If any of the fields selkey, votefst, votekd, votekey or votelst is not null, it is an online tx.
- If all of the fields above are null, it is an offline tx.

In [None]:
from pyspark.sql.functions import when

# A registration is "offline" only when every participation key field is null;
# as soon as one of them is set it is "online".
# `txn_rcv` is a constant 0: it serves as the id of the single participation
# node that all key registrations point to.
dfKeyregTx = (
    dfKeyregTx
    .withColumn(
        "keyRegistrationType",
        when(
            fn.col("txn_selkey").isNull()
            & fn.col("txn_votefst").isNull()
            & fn.col("txn_votekd").isNull()
            & fn.col("txn_votekey").isNull()
            & fn.col("txn_votelst").isNull(),
            "offline",
        ).otherwise("online"),
    )
    .withColumn("txn_rcv", fn.lit(0))
)

In [None]:
# Distinct values of the constant `txn_rcv` column, exposed as `id`.
dfParticipationNodes = dfKeyregTx.select(dfKeyregTx["txn_rcv"].alias("id")).distinct()

Create the participationNode in Neo4j

In [None]:
# Write the participation node(s) as :ParticipationNode keyed on `id`
# (save mode Overwrite).
(
    dfParticipationNodes.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":ParticipationNode")
    .option("node.keys", "id")
    .save()
)

#### Create dataframes and write data to Neo4j

Select all senders of the transactions to guarantee that their nodes exist in Neo4j









In [None]:
# Distinct sender addresses of the key registration transactions, as `account`.
dfKeyRegAccounts = dfKeyregTx.select(dfKeyregTx["txn_snd"].alias("account")).distinct()

Write all accounts that sent a transaction to Neo4j

In [None]:
# Write the key registration senders as :Account nodes keyed on `account`
# (save mode Overwrite).
(
    dfKeyRegAccounts.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":Account")
    .option("node.keys", "account")
    .save()
)

Write all transactions from dfKeyregTx to Neo4j.

In [None]:
# Write one KEY_REGISTRATION relationship per key registration transaction,
# from the sender :Account (txn_snd) to the :ParticipationNode.
(
    dfKeyregTx.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://172.23.149.212:7687")
    .mode("Append")
    .option("relationship", "KEY_REGISTRATION")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Account")
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.node.keys", "txn_snd:account")
    .option("relationship.target.labels", ":ParticipationNode")
    # Identify the target node explicitly through its `id` (the constant
    # `txn_rcv` column), as every other relationship write in this notebook
    # does. Without target keys the target is selected by label only.
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.node.keys", "txn_rcv:id")
    # txid is stored as txId so the relationship can be traced back to its
    # transaction, consistent with the other relationship types.
    .option("relationship.properties", "txn_fee:fee, round:blockNumber, intra:intraBlockTxNumber, txid:txId, keyRegistrationType:keyRegistrationType")
    .save()
)

## Asset configuration transactions and nodes

#### Preparation of data

An AssetConfigTx is used to create an asset, modify certain parameters of an asset, or destroy an asset.

In [None]:
# Asset configuration transactions (typeenum == 3). `txn_caid` and `txn_apar`
# are used below to classify the transaction; `asset` is the key of the
# :Asset node the relationship points to.
dfAssetConfigTx = dfTxn.filter(dfTxn["typeenum"] == 3).select(
    "txid",
    "round",
    "intra",
    "txn_fee",
    "txn_snd",
    "txn_caid",
    "txn_apar",
    "asset",
)

Add another column to the dataframe which indicates whether the transaction was a creation, configuration or destruction tx. We can distinguish them the following way:
- If caid is null and apar is not null it is a creation tx
- If caid is not null and apar is not null it is a configuration tx
- If caid is not null and apar is null it is a destruction tx

In [None]:
from pyspark.sql.functions import when

# Classify each asset configuration transaction:
#   caid null,     apar not null -> creation
#   caid not null, apar not null -> configuration
#   caid not null, apar null     -> destruction
# The creation branch also requires apar to be present. A row with neither
# caid nor apar matches none of the rules and keeps a null configurationType
# instead of being reported as a creation.
dfAssetConfigTx = dfAssetConfigTx.withColumn(
    "configurationType",
    when(fn.col("txn_caid").isNull() & fn.col("txn_apar").isNotNull(), "creation")
    .when(fn.col("txn_caid").isNotNull() & fn.col("txn_apar").isNotNull(), "configuration")
    .when(fn.col("txn_caid").isNotNull() & fn.col("txn_apar").isNull(), "destruction"),
)

#### Create dataframes and write data to Neo4j

Select all senders of the transactions to guarantee that their nodes exist in Neo4j

In [None]:
# Distinct sender addresses of the asset configuration transactions, as `account`.
dfAssetAccountsConfig = dfAssetConfigTx.select(dfAssetConfigTx["txn_snd"].alias("account")).distinct()

Write all accounts that sent a transaction to Neo4j

In [None]:
# Write the asset configuration senders as :Account nodes keyed on `account`
# (save mode Overwrite).
(
    dfAssetAccountsConfig.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":Account")
    .option("node.keys", "account")
    .save()
)

Select all assets id's from the dfAssetConfigTx

In [None]:
# Distinct asset ids touched by the asset configuration transactions.
dfAssets = dfAssetConfigTx.select(dfAssetConfigTx["asset"].alias("asset")).distinct()

Write all assets as nodes to Neo4j. Should actually not add any new nodes to Neo4j as there are already the asset conf transactions

In [None]:
# Write the assets as :Asset nodes keyed on `asset` (save mode Overwrite).
(
    dfAssets.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":Asset")
    .option("node.keys", "asset")
    .save()
)

Write all asset transactions between the accounts to Neo4j

In [None]:
# Write one ASSET_CONFIGURATION relationship per asset configuration
# transaction, from the sender :Account (txn_snd) to the :Asset (asset).
(
    dfAssetConfigTx.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://172.23.149.212:7687")
    .mode("Append")
    .option("relationship", "ASSET_CONFIGURATION")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Account")
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.node.keys", "txn_snd:account")
    .option("relationship.target.labels", ":Asset")
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.node.keys", "asset:asset")
    .option("relationship.properties", "txn_fee:fee, round:blockNumber, intra:intraBlockTxNumber, txid:txId, txn_caid:assetId, txn_apar:configurationParameters, configurationType:configurationType")
    .save()
)

## Asset transfer transactions and nodes

#### Preparation of data

Select all necessary information from all transactions

In [None]:
# Asset transfer transactions (typeenum == 4) with sender, asset receiver,
# amount, the optional revocation sender (txn_asnd) and the asset ids.
dfAssetTransferTx = dfTxn.filter(dfTxn["typeenum"] == 4).select(
    "txid",
    "round",
    "intra",
    "txn_fee",
    "txn_snd",
    "txn_arcv",
    "txn_aamt",
    "txn_asnd",
    "asset",
    "txn_xaid",
)

Add another field indicating whether it was an opt-in, transfer or revoking transaction. Can be distinguished the following way:
- opt-in: if snd and arcv are the same
- revoke: if the field asnd is filled in
- transfer: if none of the above

In [None]:
from pyspark.sql.functions import when

# Classify each asset transfer. The revoke check comes first, so a transaction
# with txn_asnd set is a "revoke" even if sender and receiver are identical;
# sender == receiver alone is an "opt-in"; everything else is a "transfer".
dfAssetTransferTx = dfAssetTransferTx.withColumn(
    "transferType",
    when(fn.col("txn_asnd").isNotNull(), "revoke")
    .when(fn.col("txn_arcv") == fn.col("txn_snd"), "opt-in")
    .otherwise("transfer"),
)

#### Create dataframes and write data to Neo4j

Select all assets id's from the dfAssetTransferTx

In [None]:
# Distinct transferred asset ids (txn_xaid), exposed as `asset`.
dfAssets = dfAssetTransferTx.select(dfAssetTransferTx["txn_xaid"].alias("asset")).distinct()

Write all assets as nodes to Neo4j. Should actually not add any new nodes to Neo4j as there are already the asset conf transactions

In [None]:
# Write the transferred assets as :Asset nodes keyed on `asset`
# (save mode Overwrite).
(
    dfAssets.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":Asset")
    .option("node.keys", "asset")
    .save()
)

Create the accounts dataframe containing the asset sender and the asset receiver

In [None]:
# Collect every address that appears as sender or asset receiver of an asset
# transfer in a single `account` column, without duplicates.
dfTxnSender = dfAssetTransferTx.select(dfAssetTransferTx["txn_snd"].alias("account"))
dfTxnReceiver = dfAssetTransferTx.select(dfAssetTransferTx["txn_arcv"].alias("account"))
dfAssetAccounts = dfTxnSender.union(dfTxnReceiver).distinct()

Write all accounts to Neo4j

In [None]:
# Write the asset transfer accounts as :Account nodes keyed on `account`
# (save mode Overwrite).
(
    dfAssetAccounts.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":Account")
    .option("node.keys", "account")
    .save()
)

Write all asset transactions between the accounts to Neo4j

In [None]:
# Write one ASSET_TRANSFER relationship per asset transfer transaction, from
# the sender :Account (txn_snd) to the receiver :Account (txn_arcv).
(
    dfAssetTransferTx.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://172.23.149.212:7687")
    .mode("Append")
    .option("relationship", "ASSET_TRANSFER")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Account")
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.node.keys", "txn_snd:account")
    .option("relationship.target.labels", ":Account")
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.node.keys", "txn_arcv:account")
    .option("relationship.properties", "txn_aamt:amount, txn_fee:fee, round:blockNumber, intra:intraBlockTxNumber, txid:txId, txn_xaid:assetId, txn_asnd:assetSenderInRevokingTx, transferType")
    .save()
)

## Asset freeze transactions and nodes

The faid is the asset id being frozen. fadd is the address of the account whose asset is being frozen or unfrozen

#### Preparation of data

Select all necessary information from all transactions

In [None]:
# Asset freeze transactions (typeenum == 5) with the freeze flag (txn_afrz),
# the affected account (txn_fadd) and the asset ids.
dfAssetFreezeTx = dfTxn.filter(dfTxn["typeenum"] == 5).select(
    "txid",
    "round",
    "intra",
    "txn_fee",
    "txn_snd",
    "txn_afrz",
    "txn_fadd",
    "txn_faid",
    "asset",
)

Add another field indicating whether it was a freeze or unfreeze transaction. Can be distinguished the following way:
- afrz = true means that the asset was frozen
- afrz = false means that the asset was unfrozen

In [None]:
from pyspark.sql.functions import when

# txn_afrz is a BooleanType column, so it is used directly as the condition
# instead of being compared with the strings "true" / "false".
# Every transaction whose flag is not true is an "unfreeze". This includes rows
# where txn_afrz is missing (null).
# NOTE(review): the application call notes in this notebook state that
# zero-valued fields are omitted from the stored transactions; presumably a
# false afrz is omitted the same way -- confirm against the data.
dfAssetFreezeTx = dfAssetFreezeTx.withColumn(
    "freezeType",
    when(fn.col("txn_afrz"), "freeze").otherwise("unfreeze"),
)

#### Create dataframes and write data to Neo4j

Select all asset ids from the dfAssetFreezeTx

In [None]:
# Distinct asset ids touched by the asset freeze transactions.
dfAssetsFreeze = dfAssetFreezeTx.select(dfAssetFreezeTx["asset"].alias("asset")).distinct()

Write all assets as nodes to Neo4j. Should actually not add any new nodes to Neo4j as there are already the asset conf transactions

In [None]:
# Write the frozen/unfrozen assets as :Asset nodes keyed on `asset`
# (save mode Overwrite).
(
    dfAssetsFreeze.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":Asset")
    .option("node.keys", "asset")
    .save()
)

Create the accounts dataframe containing the senders of the asset freeze transactions

In [None]:
# Distinct sender addresses of the asset freeze transactions, as `account`.
dfAssetFreezeAccounts = dfAssetFreezeTx.select(dfAssetFreezeTx["txn_snd"].alias("account")).distinct()

Write all accounts to Neo4j

In [None]:
# Write the asset freeze senders as :Account nodes keyed on `account`
# (save mode Overwrite).
(
    dfAssetFreezeAccounts.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":Account")
    .option("node.keys", "account")
    .save()
)

Write all freeze transactions between the accounts to Neo4j

In [None]:
# Write one ASSET_FREEZE relationship per asset freeze transaction, from the
# sender :Account (txn_snd) to the :Asset (asset). The account whose holding
# is frozen (txn_fadd) is stored as a relationship property.
(
    dfAssetFreezeTx.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://172.23.149.212:7687")
    .mode("Append")
    .option("relationship", "ASSET_FREEZE")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Account")
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.node.keys", "txn_snd:account")
    .option("relationship.target.labels", ":Asset")
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.node.keys", "asset:asset")
    .option("relationship.properties", "txn_fee:fee, round:blockNumber, intra:intraBlockTxNumber, txid:txId, txn_fadd:frozenAssetAccountHolder, txn_faid:assetIdBeingFrozen, freezeType:freezeType")
    .save()
)

## Application transactions and nodes

#### Preparation of data

The apid is the id of the application. An Application Call Transaction is submitted to the network with an AppId and an OnComplete method. The AppId specifies which App to call and the OnComplete method is used in the contract to determine what branch of logic to execute.
Application Call transactions may include other fields needed by the logic such as:
- ApplicationArgs - To pass arbitrary arguments to an application (or in the future to call an ABI method)
- Accounts - To pass accounts that may require some balance checking or opt-in status
- ForeignApps - To pass apps and allow state access to an external application (or in the future to call an ABI method)
- ForeignAssets - To pass ASAs for parameter checking

We distinguish the following transactions:
- Create: When an application is to be created, the OnComplete method is set to NoOp, no AppId is set, and the Approval/Clear programs and Schema are passed.
- Update: An Application Update transaction may be submitted and approved assuming the logic of the Approval program allows it. This is done by specifying the AppId to update and passing the new logic for Approval and Clear programs.
- Delete: An application may be deleted as long as the logic in the Approval Program allows for it.
- Opt-In: An Application Opt-In transaction must be submitted by an account in order for the local state for that account to be used. If no local state is required, this transaction is not necessary for a given account. 
- Close-Out: An Application Close Out transaction is used when an account wants to opt out of a contract gracefully and remove its local state from its balance record. This transaction may fail according to the logic in the Approval program.
- Clear-State: An Application Clear State transaction is used to force removal of the local state from the balance record of the sender. Given a well formed transaction this method will always succeed.
- NoOp: Regular call. Application NoOp Transactions make up a majority of the Application Call methods in practice. The logic in a smart contract will often branch to appropriate logic given the contents of the ApplicationArgs array passed.

Select all necessary information from all transactions

In [None]:
# Application call transactions (typeenum == 6) with all application fields
# (txn_ap*), the `asset` column used as the application node key, and the note.
dfApplicationCallTx = dfTxn.filter(dfTxn["typeenum"] == 6).select(
    "txid",
    "round",
    "intra",
    "txn_fee",
    "txn_snd",
    "txn_apid",
    "txn_apap",
    "txn_apgs",
    "txn_apls",
    "txn_apsu",
    "txn_apan",
    "txn_apaa",
    "txn_apas",
    "txn_apat",
    "txn_apfa",
    "txn_apep",
    "asset",
    "txn_note",
)

Add another field indicating the type of the application call (create, update, delete, opt-in, close-out, clear-state or noOp).
We have the following fields:
- apap: Approval program that is executed for every transaction.
- apsu: Clear program that is executed when on-completion is set to true.
- apan: OnComplete / Operation which should be performed. Defines what additional operation should be performed in the tx.
- apid: The id of the application being called.
- apaa: Transaction specific application arguments.
- apat: List of accounts in addition to the sender.
- apas: List of ForeignAssets whose AssetParams may be accessed by approval- or clear program.
- apfa: List of ForeignApplications whose global states may be accessed by approval- or clear program.


Transactions can be distinguished the following way:
- create: apap and apsu are set and apan is set to 0 -> therefore omitted
- update: apid is set to an app and apan = 4
- delete: apid is set to an app and apan = 5
- opt-in: apid is set to an app and apan = 1
- close-out: apid is set to an app and apan = 2
- clear-state: apid is set to an app and apan = 3
- noOp: apid is set to an app, apaa contains to the string docs and the integer 1, apat contains address, apas contains the ASA used, apfa contains other application id, apan is set to 0 -> therefore omitted

In [None]:
from pyspark.sql.functions import when

# Classify each application call. A call with neither apan nor apid but with
# both programs (apap, apsu) present is a "create"; otherwise the apan value
# selects the type, and anything left over is a regular "noOp" call.
dfApplicationCallTx = dfApplicationCallTx.withColumn(
    "applicationCallType",
    when(
        fn.col("txn_apan").isNull()
        & fn.col("txn_apid").isNull()
        & fn.col("txn_apap").isNotNull()
        & fn.col("txn_apsu").isNotNull(),
        "create",
    )
    .when(fn.col("txn_apan") == 4, "update")
    .when(fn.col("txn_apan") == 5, "delete")
    .when(fn.col("txn_apan") == 1, "opt-in")
    .when(fn.col("txn_apan") == 2, "close-out")
    .when(fn.col("txn_apan") == 3, "clear-state")
    .otherwise("noOp"),
)

#### Create dataframes and write data to Neo4j

Select all application id's from the dfApplicationCallTx.

In [None]:
# Distinct values of the `asset` column of the application calls, exposed as
# `application`.
dfApplications = dfApplicationCallTx.select(dfApplicationCallTx["asset"].alias("application")).distinct()

Write all applications as nodes to Neo4j.

In [None]:
# Write the applications as :Application nodes keyed on `application`
# (save mode Overwrite).
(
    dfApplications.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":Application")
    .option("node.keys", "application")
    .save()
)

Create the accounts dataframe containing the application call sender.

In [None]:
# Distinct sender addresses of the application call transactions, as `account`.
dfApplicationAccounts = dfApplicationCallTx.select(dfApplicationCallTx["txn_snd"].alias("account")).distinct()

Write all accounts to Neo4j.

In [None]:
# Write the application call senders as :Account nodes keyed on `account`
# (save mode Overwrite).
(
    dfApplicationAccounts.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("url", "bolt://172.23.149.212:7687")
    .option("labels", ":Account")
    .option("node.keys", "account")
    .save()
)

Write all application call transactions to Neo4j.

In [None]:
# Write one APPLICATION_CALL relationship per application call transaction,
# from the sender :Account (txn_snd) to the :Application, which is keyed by the
# `asset` column stored as `application`.
(
    dfApplicationCallTx.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://172.23.149.212:7687")
    .mode("Append")
    .option("relationship", "APPLICATION_CALL")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Account")
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.node.keys", "txn_snd:account")
    .option("relationship.target.labels", ":Application")
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.node.keys", "asset:application")
    .option("relationship.properties", "txn_fee:fee, round:blockNumber, intra:intraBlockTxNumber, txid:txId, applicationCallType, txn_apan:applicationCallTypeEnum, txn_apid:applicationId, txn_apap:approvalProgram, txn_apsu:clearProgram, txn_apaa:applicationCallArguments, txn_apat:accountsList, txn_apfa:applicationsList, txn_apas:assetsList")
    .save()
)