In [1]:
# Referencing the session handle starts the Spark application (see the cell
# output) and echoes the SparkSession object.
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
70,application_1607949680860_0072,pyspark,idle,Link,Link


SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7fd2b4765f90>

In [2]:
import hashlib
from datetime import datetime
from graphframes import *
from pyspark.sql import functions as func
from pyspark.sql.types import FloatType
import hsfs
from hops import hdfs
import os
from pyspark.sql import SQLContext

In [3]:
def hashnode(x):
    """Return a short deterministic identifier for a node key.

    The key is UTF-8 encoded, hashed with SHA-1, and truncated to the first
    8 hexadecimal characters of the digest.

    Args:
        x: Node key as a string, or None.

    Returns:
        An 8-character hex string, or None when ``x`` is None.
    """
    # Spark's concat() yields NULL as soon as one operand is NULL, so when this
    # function runs as a UDF it can be handed None; propagate it instead of
    # failing the whole job with an AttributeError on ``None.encode``.
    if x is None:
        return None
    return hashlib.sha1(x.encode("UTF-8")).hexdigest()[:8]

# Wrap hashnode as a Spark UDF so it can be applied to DataFrame columns.
# No return type is passed, so func.udf falls back to its default return type.
hashnode_udf = func.udf(hashnode)

In [4]:
# Open a connection to the feature store service via hsfs (called with no arguments).
connection = hsfs.connection()
# Get the handle of the project's feature store; all feature groups below are
# fetched through `fs`.
fs = connection.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

In [5]:
# Version 1 of the two feature groups used as graph inputs:
# transactions become the edges, account features describe the nodes.
edge_fg = fs.get_feature_group('transactions_fg', 1)
node_fg = fs.get_feature_group('account_features', 1)

In [6]:
# Preview the first 5 rows of the account feature group.
node_fg.show(5)

+--------------+---------+---------------+-------+
|tx_behavior_id|prior_sar|initial_deposit|acct_id|
+--------------+---------+---------------+-------+
|             1|        0|       84442.19|      0|
|             1|        0|       75795.44|      1|
|             1|        0|       42057.16|      2|
|             1|        0|       25891.68|      3|
|             1|        0|       51127.47|      4|
+--------------+---------+---------------+-------+
only showing top 5 rows

In [7]:
# Preview the first 5 rows of the transaction feature group.
edge_fg.show(5)

+------+-------+--------------+--------+------+--------+------+-------+
|is_sar|tran_id|tran_timestamp|base_amt|target|alert_id|source|tx_type|
+------+-------+--------------+--------+------+--------+------+-------+
|     0|      1|   1.4832288E9| 9405.71|  3259|      -1|  1767|      4|
|     0|      2|   1.4832288E9| 6884.54|  5141|      -1|  7363|      4|
|     0|      3|   1.4832288E9|  7968.4|  9532|      -1|  7585|      4|
|     0|      4|   1.4832288E9| 9042.67|  8792|      -1|  1750|      4|
|     0|      5|   1.4832288E9| 4692.79|  4670|      -1|  9060|      4|
+------+-------+--------------+--------+------+--------+------+-------+
only showing top 5 rows

In [8]:
# Split the transactions on alert_id: rows equal to -1 form the "normal" set,
# every other alert_id value forms the "SAR" set. Each subset is produced by
# its own read of the feature group.
only_normal_edge_df = edge_fg.read().where(func.col('alert_id') == -1)
only_sar_edge_df = edge_fg.read().where(func.col('alert_id') != -1)

In [9]:
# Preview the alert_id == -1 subset.
only_normal_edge_df.show(5)

+------+-------+--------------+--------+------+--------+------+-------+
|is_sar|tran_id|tran_timestamp|base_amt|target|alert_id|source|tx_type|
+------+-------+--------------+--------+------+--------+------+-------+
|     0|      1|   1.4832288E9| 9405.71|  3259|      -1|  1767|      4|
|     0|      2|   1.4832288E9| 6884.54|  5141|      -1|  7363|      4|
|     0|      3|   1.4832288E9|  7968.4|  9532|      -1|  7585|      4|
|     0|      4|   1.4832288E9| 9042.67|  8792|      -1|  1750|      4|
|     0|      5|   1.4832288E9| 4692.79|  4670|      -1|  9060|      4|
+------+-------+--------------+--------+------+--------+------+-------+
only showing top 5 rows

In [10]:
# Preview the alert_id != -1 subset (show() without an argument prints up to 20 rows).
only_sar_edge_df.show()

+------+-------+--------------+--------+------+--------+------+-------+
|is_sar|tran_id|tran_timestamp|base_amt|target|alert_id|source|tx_type|
+------+-------+--------------+--------+------+--------+------+-------+
|     1|     98|   1.4832288E9|  108.62|  5688|      16|  2298|      4|
|     1|    108|   1.4832288E9|  183.25|  9601|      26|  8627|      4|
|     1|    135|   1.4832288E9|  142.71|  8359|      15|  2756|      4|
|     1|    137|   1.4832288E9|  132.47|  7702|       9|  7605|      4|
|     1|    218|   1.4832288E9|  119.51|  7377|      17|  5891|      4|
|     1|    335|   1.4832288E9|  136.02|  1661|      12|  6787|      4|
|     1|    439|   1.4832288E9|  194.53|  7950|       0|  8485|      4|
|     1|    477|   1.4832288E9|  184.32|  2177|       3|  5324|      4|
|     1|    514|   1.4832288E9|  130.63|  4616|      19|  4919|      4|
|     1|    564|   1.4832288E9|  183.27|  1589|       5|  4170|      4|
|     1|    580|   1.4832288E9|  135.69|  1590|      18|  6483| 

In [11]:
# Number of transactions in the alert_id != -1 subset.
only_sar_edge_df.count()

732

In [12]:
# Node set: every account that appears as either endpoint of a SAR transaction.
# union() stacks the two single-column frames by position; toDF("id") gives
# the combined column the vertex-id name used when building the GraphFrame.
sar_sources = only_sar_edge_df.select("source")
sar_targets = only_sar_edge_df.select("target")
sar_nodes = sar_sources.union(sar_targets).toDF("id").dropDuplicates()

# Edge set: one (src, dst) pair per SAR transaction.
sar_edges = only_sar_edge_df.select("source", "target").toDF("src", "dst")

# Only the last expression of a cell is echoed, so a bare sar_nodes.count() in
# the middle of the cell would run a Spark job and throw the result away.
# Print the node count explicitly; the edge count stays the cell's result.
print("SAR nodes: {}".format(sar_nodes.count()))
sar_edges.count()

732

In [13]:
# Build the SAR graph and label every node with its connected component.
# The checkpoint directory is configured first, under the current project's
# Logs/sc folder on HDFS, before connectedComponents() is invoked.
checkpoint_dir = "hdfs:///Projects/{}/Logs/sc".format(hdfs.project_name())
sc.setCheckpointDir(checkpoint_dir)

g_sar = GraphFrame(sar_nodes, sar_edges)
cc_sar = g_sar.connectedComponents()

In [14]:
# Mark the component assignment for caching and display the first rows.
cc_sar.cache().show()

+----+---------+
|  id|component|
+----+---------+
|8086|     2599|
|7833|     3671|
|3997|     2773|
|5300|     2773|
| 463|      124|
|1127|     1127|
| 540|      540|
|6393|     3396|
|1522|     1054|
|5614|      825|
|9162|     2889|
|3488|     2599|
|2393|      397|
|7387|      643|
|1265|     1009|
|4042|     1590|
|5223|     1013|
|4364|     1096|
|3425|     1054|
|5157|       61|
+----+---------+
only showing top 20 rows

In [15]:
# List the distinct connected-component sizes (nodes per component), ascending.
cc_sar.groupBy('component').count().select('count').dropDuplicates().orderBy('count').show()

+-----+
|count|
+-----+
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
+-----+

In [None]:
# Strongly connected components of the SAR graph; 20 is the value passed as
# the algorithm's iteration limit (maxIter in the GraphFrames API).
scc_sar = g_sar.stronglyConnectedComponents(20)

In [None]:
# Mark the SCC assignment for caching and display the first rows.
scc_sar.cache().show()

In [None]:
# List the distinct strongly-connected-component sizes, ascending.
scc_sar.groupBy('component').count().select('count').dropDuplicates().orderBy('count').show()

In [None]:
# Number of strongly connected components that contain exactly one node.
scc_sar.groupBy('component').count().where(func.col('count')==1).count()

In [None]:
# Number of strongly connected components that contain more than one node.
scc_sar.groupBy('component').count().where(func.col('count')>1).count()

In [None]:
# Per-component node counts, restricted to components with more than one node.
scc_comp_count = scc_sar.groupBy('component').count().where(func.col('count')>1)

In [None]:
# Join on 'component' (default join type) so only nodes of multi-node
# components remain; the join also brings in the 'count' column.
# NOTE(review): this rebinds scc_sar, so re-running the cell on its own
# operates on the already-joined frame rather than the original SCC output.
scc_sar = scc_sar.join(scc_comp_count,['component'])

In [None]:
# Display the current contents of scc_sar.
scc_sar.show()

In [None]:
# Remove the 'count' column from scc_sar.
scc_sar =  scc_sar.drop('count')

In [None]:
# Keep only the nodes that belong to connected components with more than
# 2 members. The size table is joined back onto cc_sar so the per-node "id"
# column is preserved: reducing cc_sar to the grouped "component" column alone
# leaves no `id` for the edge join below to reference, and the cell fails.
cc_sar_comp_count = cc_sar.groupBy('component').count().where(func.col('count')>2)
cc_sar = cc_sar.join(cc_sar_comp_count, ['component']).drop('count')

# Label each SAR transaction with the component of its endpoints. A
# transaction matches once through its source node and once through its
# target node, hence the de-duplication on tran_id.
sar_cc_grouped = cc_sar.join(
    only_sar_edge_df,
    [(only_sar_edge_df.source==cc_sar.id)|(only_sar_edge_df.target==cc_sar.id)],
    how="left"
).dropDuplicates(subset=['tran_id'])

In [None]:
# Label each SAR transaction with the strongly connected component of an
# endpoint node. A transaction can match via its source and via its target,
# so the result is de-duplicated on tran_id.
touches_scc_node = (
    (only_sar_edge_df.source == scc_sar.id)
    | (only_sar_edge_df.target == scc_sar.id)
)
sar_scc_joined = scc_sar.join(only_sar_edge_df, [touches_scc_node], how="left")
sar_scc_grouped = sar_scc_joined.dropDuplicates(subset=['tran_id'])

In [None]:
# Row count of the SAR transactions labelled with a connected component.
sar_cc_grouped.count()

In [None]:
# Row count of the SAR transactions labelled with a strongly connected component.
sar_scc_grouped.count()

In [None]:
# Row count of only_sar_edge_df, for comparison with the two grouped frames.
only_sar_edge_df.count()

In [None]:
# Display the SCC-labelled SAR transactions.
sar_scc_grouped.show()

In [None]:
# From here on only_sar_edge_df refers to the connected-component-labelled frame.
# NOTE(review): rebinding this name makes every earlier cell that reads
# only_sar_edge_df depend on execution order when cells are re-run.
only_sar_edge_df = sar_cc_grouped

In [None]:
# Preview the first 5 rows of only_sar_edge_df as currently bound.
only_sar_edge_df.show(5)

In [None]:
# Number of transactions in the alert_id == -1 subset.
only_normal_edge_df.count()

In [None]:
# For every component, compute the time window spanned by its transactions:
# window_start is the earliest tran_timestamp, window_end the latest.
only_sar_edge_df_grouped = only_sar_edge_df.groupBy('component').agg(
    func.min("tran_timestamp").alias("window_start"),
    func.max("tran_timestamp").alias("window_end"),
)
only_sar_edge_df_grouped.show(5)

In [None]:
# Attach each component's window_start / window_end to every transaction of
# that component (join on 'component', default join type).
only_sar_edges_df_windows = only_sar_edge_df.join(only_sar_edge_df_grouped,["component"])

In [None]:
# Display the SAR transactions together with their component window.
only_sar_edges_df_windows.show()

In [None]:
# Row count of the windowed SAR transactions.
only_sar_edges_df_windows.count()

In [None]:
# Pair every SAR time window with the normal transactions whose timestamp
# falls inside it (bounds inclusive).
#
# * distinct(): several components can span the same (window_start,
#   window_end) pair; without de-duplication each normal transaction in that
#   window would be emitted once per component as an identical row.
# * inner join: a left join from the windows would keep a window that has no
#   normal transaction as a row whose transaction columns are all NULL, and
#   those NULLs would later be fed into the node-id hashing UDF.
sar_windows = only_sar_edge_df_grouped.select("window_start", "window_end").distinct()
only_normal_edges_df_windows = sar_windows.join(
    only_normal_edge_df,
    [(only_normal_edge_df.tran_timestamp>=sar_windows.window_start)&(only_normal_edge_df.tran_timestamp<=sar_windows.window_end)],
    how="inner"
)

In [None]:
# Display the normal transactions paired with SAR time windows.
only_normal_edges_df_windows.show()

In [None]:
# Row count of the window-paired normal transactions.
only_normal_edges_df_windows.count()

In [None]:
def _window_scoped_id(account_col):
    """Build the hashed node id for `account_col` within the row's window.

    The key "<account>_<window_start>_<window_end>" is passed through
    hashnode_udf, so the same account gets a separate node id per window.
    """
    window_key = func.concat(
        func.col(account_col),
        func.lit('_'),
        func.col('window_start'),
        func.lit('_'),
        func.col('window_end'),
    )
    return hashnode_udf(window_key)


# Keep the raw account ids as origId / destId, then overwrite source / target
# with the window-scoped hashed ids (target is added first, then source).
only_normal_edges_df_windows = (
    only_normal_edges_df_windows
    .withColumnRenamed("source", "origId")
    .withColumnRenamed("target", "destId")
    .withColumn('target', _window_scoped_id('destId'))
    .withColumn('source', _window_scoped_id('origId'))
)
only_normal_edges_df_windows.show()

In [None]:
# Edge set of the normal graph: one (src, dst) pair per window-paired row.
normal_edges = only_normal_edges_df_windows.select("source", "target").toDF("src", "dst")

# Node set: every distinct hashed id that appears as either endpoint.
normal_sources, normal_targets = (
    only_normal_edges_df_windows.select(endpoint)
    for endpoint in ("source", "target")
)
normal_nodes = normal_sources.union(normal_targets).toDF("id").dropDuplicates()

In [None]:
# Now let's construct the graph of normal transactions from the hashed
# node ids and the (src, dst) edge pairs.
g_normal = GraphFrame(normal_nodes,normal_edges)

In [None]:
# Point Spark's checkpointing at the project's Logs/sc folder on HDFS, then
# label every node of the normal graph with its connected component and mark
# the result for caching.
normal_checkpoint_dir = "hdfs:///Projects/{}/Logs/sc".format(hdfs.project_name())
sc.setCheckpointDir(normal_checkpoint_dir)

cc_normal_raw = g_normal.connectedComponents()
cc_normal = cc_normal_raw.cache()
# Strongly connected components are intentionally not computed for the normal
# graph here (a stronglyConnectedComponents(20) call was left disabled).

In [None]:
# Keep only the nodes that belong to connected components with more than
# 2 members; the size table is joined back so the per-node "id" is retained.
cc_norm_comp_count = cc_normal.groupBy('component').count().where(func.col('count')>2)
cc_normal = cc_normal.join(cc_norm_comp_count,['component'])
cc_normal =  cc_normal.drop('count')

# Label the normal transactions with the component of their endpoints.
# cc_normal.id holds the window-scoped *hashed* node ids, so the edge side of
# the join must be only_normal_edges_df_windows, whose source / target carry
# those same hashed ids. Joining against only_normal_edge_df would compare the
# hashes with raw account ids, which never match, leaving every edge column NULL.
# NOTE(review): de-duplicating on tran_id alone keeps a single row for a
# transaction that falls into several windows -- confirm that is intended.
normal_cc_grouped = cc_normal.join(
    only_normal_edges_df_windows,
    [(only_normal_edges_df_windows.source==cc_normal.id)|(only_normal_edges_df_windows.target==cc_normal.id)],
    how="left"
).dropDuplicates(subset=['tran_id'])

In [None]:
# Display the component-labelled normal transactions.
normal_cc_grouped.show()