In [None]:
import os
import uuid
from notebookutils import mssparkutils
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, MapType, ArrayType,BooleanType
# from graphframes import *

# Zero-argument UDF producing a fresh random UUID string per row.
# uuid4() returns a different value on every call, so the UDF is flagged as
# non-deterministic; Spark otherwise assumes UDFs are deterministic and may
# re-evaluate or duplicate the call while optimizing a plan.
f_uuid = F.udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()
# Zero-argument UDF that always returns True.
f_bool = F.udf(lambda: True, BooleanType())


In [None]:
# Create a mount point for reading and saving data.
# Note: "linked-storage-service" is the linked service registered in Synapse.
mssparkutils.fs.mount( 
    "abfss://bronze@xxxxxxx.dfs.core.windows.net", 
    "/mydata", 
    {"linkedService":"linked-storage-service"} 
)
# Mounted paths are addressed through the job id: synfs:/<job id>/mydata/...
job_id = mssparkutils.env.getJobId()
# NOTE(review): this path ends in "archive.zip" but is loaded with the Delta
# reader further down — confirm it really points at a Delta table.
bronze_mount_point = f"synfs:/{job_id}/mydata/archive.zip"

In [None]:
# Cosmos DB connection settings.
cosmosEndpoint = "https://ebcbin5oofjcs.documents.azure.com:443/" # note use document endpoint and not gremlin endpoint
# The account key is a secret and should not live in the notebook: it is taken
# from the COSMOS_MASTER_KEY environment variable when that is set. The
# placeholder is kept only as a fallback so existing runs behave as before.
cosmosMasterKey = os.environ.get("COSMOS_MASTER_KEY", "xxxxxxxxxxxxxxxx")
cosmosDatabaseName = "database01"
cosmosContainerName = "graph01" # "/accountId as partitionKey"

# Options handed to the Cosmos DB Spark connector on each write.
cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}
# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
# NOTE(review): throughput control is set on the session conf, not in `cfg`,
# and no control-group name is configured — confirm the connector actually
# applies these two settings to the writes below.
spark.conf.set("spark.cosmos.throughputControl.enabled",True)
spark.conf.set("spark.cosmos.throughputControl.targetThroughput",20000)

def write_to_cosmos_graph(df: DataFrame, data_type: str, save: bool = False):
    """Append a DataFrame of graph documents to the configured Cosmos container.

    Args:
        df: Rows to write (vertex or edge documents).
        data_type: Folder name used for the optional Delta copy, i.e. the copy
            is stored at ``synfs:/{job_id}/mydata/{data_type}/``.
        save: When True, overwrite the Delta copy first and then write that
            saved copy to Cosmos.
    """
    if save:
        delta_path = f"synfs:/{job_id}/mydata/{data_type}/"
        (
            df.write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .save(delta_path)
        )
        # Write to Cosmos from the saved copy instead of from `df`: a second
        # action on `df` recomputes its whole lineage, and generated values
        # (such as random UUID ids) would then differ from the archived ones.
        df = spark.read.format("delta").load(delta_path)

    (
        df.write.format("cosmos.oltp")
        .options(**cfg)
        .mode("APPEND")
        .save()
    )

In [None]:
# Build the dataframe to ingest, assuming "accountId" is the partition key of the graph01 cosmos collection
def prep_cosmos_vertices_df(df: DataFrame) -> DataFrame:
    """Shape a DataFrame with an ``id`` column into distinct vertex rows.

    Returns the columns ``label`` (always "account"), ``id`` and ``accountId``
    (a copy of ``id``), with duplicate rows removed.
    """
    account_id = F.col('id')
    vertex_columns = [
        F.lit('account').alias('label'),
        account_id,
        account_id.alias('accountId'),
    ]
    return df.select(*vertex_columns).distinct()

# Create vertices based on activity and number of transactions made from raw data
def prepare_vertices_sample(
    df: DataFrame,
    activity_threshold: int = 10,
    amount_threshold: float = 1000000,
) -> DataFrame:
    """Select a sample of accounts to use as vertices.

    An account is kept when it appears (as ``nameOrig`` or ``nameDest``) in
    strictly more than ``activity_threshold`` transactions, or when it is an
    endpoint of a transaction whose ``amount`` is strictly greater than
    ``amount_threshold``.

    Args:
        df: Transactions with ``nameOrig``, ``nameDest`` and ``amount`` columns.
        activity_threshold: Transaction-count cut-off (default 10).
        amount_threshold: Transfer-amount cut-off (default 1000000).

    Returns:
        Vertex rows as produced by ``prep_cosmos_vertices_df``.
    """
    # Every account reference, one row per appearance as sender or receiver.
    account_refs = (
        df.select("nameOrig")
        .union(df.select("nameDest"))
        .withColumnRenamed('nameOrig', 'id')
    )
    active_ids = (
        account_refs.groupBy('id').count()
        .filter(F.col('count') > activity_threshold)
        .select('id')
    )

    large_transfers = df.filter(F.col('amount') > amount_threshold)
    large_transfer_ids = (
        large_transfers.select('nameOrig')
        .union(large_transfers.select('nameDest'))
        .withColumnRenamed('nameOrig', 'id')
    )

    # prep_cosmos_vertices_df removes duplicate rows itself, so the ids are
    # not de-duplicated here as well (that would only add extra shuffles).
    return prep_cosmos_vertices_df(active_ids.union(large_transfer_ids))


In [None]:

def prep_cosmos_edges_df(df: DataFrame) -> DataFrame:
    """Shape transaction rows into edge rows for the Cosmos graph container.

    Input must provide ``type``, ``amount``, ``nameOrig``, ``nameDest``,
    ``oldbalanceOrg``, ``newbalanceOrig``, ``oldbalanceDest`` and
    ``newbalanceDest``. Identical transactions are collapsed first, then each
    remaining row gets a generated ``id``.

    Output columns:
        id             generated UUID string
        label          copy of ``type``
        _vertexId      ``nameOrig`` (edge source)
        _sink          ``nameDest`` (edge target)
        _sinkPartition ``nameDest``
        accountId      ``nameOrig``
        _vertexLabel, _sinkLabel   constant "account"
        _isEdge        constant True
        plus the transaction properties ``type``, ``amount`` and the four
        balance columns.

    The underscore-prefixed names appear to follow the Cosmos DB Gremlin edge
    document layout — TODO confirm against the target container.
    """
    # De-duplicate BEFORE assigning ids: once every row carries a unique id no
    # two rows are equal any more and distinct() cannot remove anything.
    transactions = df.select(
        "type", "amount", "nameOrig", "nameDest",
        "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest",
    ).distinct()

    return transactions.select(
        f_uuid().alias("id"),
        F.col("type").alias("label"),
        F.col("nameDest").alias("_sink"),
        F.lit("account").alias("_sinkLabel"),
        F.col("nameDest").alias("_sinkPartition"),
        F.col("nameOrig").alias("_vertexId"),
        F.lit("account").alias("_vertexLabel"),
        # A literal avoids a Python UDF round trip for a constant value.
        F.lit(True).alias("_isEdge"),
        F.col("nameOrig").alias("accountId"),
        "type",
        "amount",
        "oldbalanceOrg",
        # Previously selected but then dropped from the output.
        "newbalanceOrig",
        "oldbalanceDest",
        "newbalanceDest",
    )


# Filter transactions that have either the source or the destination as a vertex
def prepare_edges_sample_1(df: DataFrame, vertices:DataFrame) -> DataFrame:
    """Build edges for transactions touching at least one of ``vertices``.

    A transaction is kept when its ``nameOrig`` or its ``nameDest`` equals an
    ``id`` in ``vertices``. Each matching transaction is emitted once, even
    when both of its endpoints are vertices.

    Args:
        df: Transactions with ``nameOrig`` and ``nameDest`` columns.
        vertices: DataFrame with an ``id`` column.

    Returns:
        Edge rows as produced by ``prep_cosmos_edges_df``.
    """
    # Renamed key column so join conditions can be written by column name
    # without clashing with anything in `df`.
    vertex_keys = vertices.select(F.col("id").alias("_vertex_key"))
    source_is_vertex = F.col("nameOrig") == F.col("_vertex_key")
    dest_is_vertex = F.col("nameDest") == F.col("_vertex_key")

    # Semi joins keep only df's columns and never multiply rows.
    from_vertices = df.join(vertex_keys, source_is_vertex, "left_semi")
    # Destination matches whose source is NOT a vertex; those whose source is
    # a vertex are already in `from_vertices` and must not be added again.
    to_vertices_only = (
        df.join(vertex_keys, dest_is_vertex, "left_semi")
        .join(vertex_keys, source_is_vertex, "left_anti")
    )
    return prep_cosmos_edges_df(from_vertices.union(to_vertices_only))
    
    

# Filter transactions that have both the source and the destination as vertices
def prepare_edges_sample_2(df: DataFrame, vertices:DataFrame) -> DataFrame:
    """Build edges for transactions whose two endpoints are both in ``vertices``.

    Args:
        df: Transactions with ``nameOrig`` and ``nameDest`` columns.
        vertices: DataFrame with an ``id`` column.

    Returns:
        Edge rows as produced by ``prep_cosmos_edges_df``.
    """
    # Join against renamed key columns and refer to them by name. Referring to
    # `vertices.id` in a second join against the same DataFrame is ambiguous:
    # the reference does not point at the aliased copy.
    source_keys = vertices.select(F.col("id").alias("_source_key"))
    dest_keys = vertices.select(F.col("id").alias("_dest_key"))

    # Semi joins only filter df's rows; no vertex columns leak into the result.
    transactions = (
        df.join(source_keys, F.col("nameOrig") == F.col("_source_key"), "left_semi")
        .join(dest_keys, F.col("nameDest") == F.col("_dest_key"), "left_semi")
    )
    return prep_cosmos_edges_df(transactions)

# Ingest all edges
def prepare_edges_all(df:DataFrame) -> DataFrame:
    """Convert every transaction in ``df`` to an edge row, without filtering by vertex."""
    edges = prep_cosmos_edges_df(df)
    return edges



In [None]:
def all_vertices(df: DataFrame) -> DataFrame:
    """Build vertex rows for every account named in ``df``.

    Accounts are taken from both the ``nameOrig`` and ``nameDest`` columns and
    passed to ``prep_cosmos_vertices_df`` as a single ``id`` column.
    """
    senders = df.select(F.col("nameOrig").alias("id"))
    receivers = df.select(F.col("nameDest").alias("id"))
    return prep_cosmos_vertices_df(senders.union(receivers))

def all_edges(df:DataFrame) -> DataFrame:
    """Build edge rows for every transaction in ``df``.

    Args:
        df: Transactions with the ``type``, ``amount``, ``nameOrig``,
            ``nameDest`` and four balance columns.

    Returns:
        Edge rows as produced by ``prep_cosmos_edges_df``.
    """
    # Use the DataFrame passed in; the previous body read the global
    # `raw_data` and silently ignored its argument.
    transactions = df.select(
        "type", "amount", "nameOrig", "nameDest",
        "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest",
    )
    return prep_cosmos_edges_df(transactions)


In [None]:
def injest_sample_data_1() -> None:
    """Write a 50000-row sample of the global ``raw_data`` to the Cosmos graph.

    Vertices are all accounts in the sample; edges are the sample's
    transactions whose two endpoints are both among those vertices. No Delta
    copy is saved.
    """
    # NOTE(review): limit() without an ordering is not guaranteed to return the
    # same rows on every evaluation — consider persisting the sample.
    sample_rows = raw_data.limit(50000)
    sample_vertices = all_vertices(sample_rows)
    sample_edges = prepare_edges_sample_2(sample_rows, sample_vertices)
    for frame, kind in ((sample_vertices, "vertices"), (sample_edges, "edges")):
        write_to_cosmos_graph(frame, kind)

In [None]:
# Ingest the entire dataset
# edge_mount = f"synfs:/{job_id}/mydata/edges"
# vertices_mount = f"synfs:/{job_id}/mydata/vertices"
# Load the bronze data with the Delta reader and drop exact duplicate rows.
raw_data = spark.read.format('delta').load(bronze_mount_point).distinct()
# One vertex per account that appears as nameOrig or nameDest.
v = all_vertices(raw_data)
# NOTE(review): `v` is built from every nameOrig/nameDest in raw_data, so this
# "both endpoints are vertices" filter should only drop rows with a null
# endpoint — confirm whether prepare_edges_all(raw_data) was intended.
e = prepare_edges_sample_2(raw_data,v) 
# The third argument (save=True) also writes a Delta copy under the mount.
write_to_cosmos_graph(v,"vertices",True)
write_to_cosmos_graph(e,"edges",True)

In [None]:
# # Create test data set to verify vertices and edges are being created correctly
# cosmos_edges_df_test = cosmos_edges_df.limit(10)
# source = cosmos_edges_df_test.select('_vertexId')
# dest = cosmos_edges_df_test.select('_sink')
# tmp = source.union(dest).distinct().withColumnRenamed('_vertexId','id').withColumn('label',F.lit('account'))
# cosmos_vertices_df_test = tmp.withColumn('accountId',tmp['id']).select("label","id","accountId")
# print(f"Number of Accounts/Vertices: {cosmos_vertices_df_test.count()}, Number of transactions/Edges: {cosmos_edges_df_test.count()}")
# write_to_cosmos_graph(cosmos_vertices_df_test,"vertices",False)
# write_to_cosmos_graph(cosmos_edges_df_test,"edges",False)