In [6]:
# A quick look at the source data.
import dask_deltasharing as dds

# Name each table once instead of repeating the literal in every call.
entities_table = "test-profile.json#lynxkite-dev.default.aml_entities_live"
transactions_table = "test-profile.json#lynxkite-dev.default.aml_transactions_live"

print(dds.load_as_dask(entities_table).head())
print(dds.load_as_dask(transactions_table).head())

# The latest version gets its own name so that the loop variables below do not
# overwrite it while iterating.
latest_version = dds.get_latest_table_version(transactions_table)
changes = dds.load_as_dask_changes(
    transactions_table, starting_version=1, ending_version=latest_version
)
# Each item unpacks into three values; presumably (version, change kind,
# dataframe) -- confirm against dask_deltasharing.
for change_version, change_kind, change_df in changes:
    print(change_kind, len(change_df), change_df.head(1))

                                             address  \
0           28668 Ashley Ferry, Brownburgh, WA 30791   
1  07699 Shannon Islands Suite 956, New Sharonfor...   
2         22498 Sara Course, Alyssaborough, FM 66170   
3    22674 Smith Hill Suite 232, Ericaland, TX 87967   
4           197 Olson Views, North Heather, SD 23879   

                         email_addr  entity_id entity_type  \
0  christopher.schwartz@hotmail.com     100000      Person   
1           john.miller@hotmail.com     100001      Person   
2          lisa.johnson@hotmail.com     100002      Person   
3           kara.anderson@yahoo.com     100003      Person   
4         michael.serrano@gmail.com     100004      Person   

                   name          phone_number  risk_score  
0  Christopher Schwartz      001-557-314-4196           4  
1           John Miller   +1-634-083-6275x321           0  
2          Lisa Johnson  +1-277-378-7466x3414           1  
3         Kara Anderson     775-490-7777x1957     

In [2]:
# A quick look at the source data served through the "umls.json" profile.
import dask_deltasharing as dds

# Name each table once instead of repeating the literal in every call.
entities_table = "umls.json#aml.default.aml_entities_live"
transactions_table = "umls.json#aml.default.aml_transactions_live"

print(dds.load_as_dask(entities_table).head())
print(dds.load_as_dask(transactions_table).head())

# The latest version gets its own name so that the loop variables below do not
# overwrite it while iterating.
latest_version = dds.get_latest_table_version(transactions_table)
changes = dds.load_as_dask_changes(
    transactions_table, starting_version=1, ending_version=latest_version
)
# Each item unpacks into three values; presumably (version, change kind,
# dataframe) -- confirm against dask_deltasharing.
for change_version, change_kind, change_df in changes:
    print(change_kind, len(change_df), change_df.head(1))

                                             address  \
0      1238 Justin Unions, Lake Stevemouth, VI 91105   
1             6425 Carey Courts, Brettfort, IA 31029   
2     5818 Armstrong Center, North Patrick, NJ 99779   
3            2796 Dawn Courts, Rebeccabury, SD 96536   
4  0342 Fletcher Drives, North Carrieburgh, GA 97838   

                  email_addr  entity_id entity_type             name  \
0    diane.leonard@yahoo.com     100000      Person    Diane Leonard   
1  veronica.burton@yahoo.com     100001      Person  Veronica Burton   
2    william.walls@gmail.com     100002      Person    William Walls   
3   joanna.clark@hotmail.com     100003      Person     Joanna Clark   
4       kyle.jones@gmail.com     100004      Person       Kyle Jones   

            phone_number  risk_score  
0   001-382-978-8791x172           1  
1      334-098-2811x9531           2  
2  001-914-284-9870x6662           3  
3             1748402737           1  
4     (104)178-6252x6992           

In [3]:
# Run this to reset the state.
from katana import remote

client = remote.Client()

# Remove every graph registered under the demo name, i.e. the same name the
# incremental-refresh cell passes to `incremental.get_or_create_graph`.
demo_graphs = client.find_graphs_by_name("aml demo graph")
for demo_graph in demo_graphs:
    demo_graph.delete()

In [35]:
from katana import remote
from katana.remote import import_data
import dask_deltasharing

# Source tables, loaded through dask_deltasharing.
nodes = dask_deltasharing.load_as_dask("umls.json#aml.default.aml_entities_live")
edges = dask_deltasharing.load_as_dask("umls.json#aml.default.aml_transactions_live")

# Alternative source (test profile), kept for reference:
#nodes=dask_deltasharing.load_as_dask("test-profile.json#lynxkite-dev.default.aml_entities_live")
#edges = dask_deltasharing.load_as_dask("test-profile.json#lynxkite-dev.default.aml_transactions_live")
client = remote.Client()
graph = client.create_graph(num_partitions=4)

# Edge endpoints are looked up in the same "all" id space the nodes are
# registered in.
edge_endpoints = dict(
    source_id_space="all",
    source_column="originator_id",
    destination_id_space="all",
    destination_column="beneficiary_id",
)

with import_data.DataFrameImporter(graph) as df_importer:
    df_importer.nodes_dataframe(nodes, id_column="entity_id", id_space="all")
    df_importer.edges_dataframe(edges, **edge_endpoints)
    df_importer.insert()



          0/? [?op/s]

          0/? [?op/s]

In [4]:
# The following is what goes into the periodically executed script.
# This "cronjob" is executed by a scheduling system and updates the RDG
# to keep it up to date with the source tables.

# `remote` is imported here because the script runs on its own: it cannot rely
# on an earlier notebook cell having put the name into the namespace.
from katana import remote
import incremental

# Connect to Katana and obtain the graph registered under the demo name
# (judging by the helper's name it is created when missing -- confirm in
# `incremental`).
client = remote.Client()
graph = incremental.get_or_create_graph(client, "aml demo graph", num_partitions=4)

def fullname(table, prefix="umls.json#aml.default."):
    """Return the fully qualified name of a source table.

    Args:
        table: Bare table name, e.g. "aml_entities_live".
        prefix: Text placed in front of the table name. The default targets
            the "umls.json" profile; pass another prefix (for example
            "test-profile.json#lynxkite-dev.default.") to address a different
            profile without editing this function.

    Returns:
        The prefix immediately followed by the table name.
    """
    return prefix + table

# Source registration. These upserts are an idempotent one-time setup: they do
# not _need_ to run on every execution, but keeping them in the cronjob means a
# table added here is pulled in on the next run.
delta = incremental.SourceSystem.Delta

incremental.upsert_incremental_node_source(
    graph, "entities", delta, fullname("aml_entities_live"),
    id_column="entity_id",
    label_column="entity_type",
    id_space="Entity",
)

# NOTE(review): the edge source passes id_column="entity_id", the same column
# as the node source -- confirm the transactions table really has it.
incremental.upsert_incremental_edge_source(
    graph, "transactions", delta, fullname("aml_transactions_live"),
    type="TRANSFER",
    id_column="entity_id",
    source_id_space="Entity",
    source_column="originator_id",
    destination_id_space="Entity",
    destination_column="beneficiary_id",
)

# The actual work: refresh the graph, then report its size.
incremental.incremental_refresh(graph)
node_count, edge_count = graph.num_nodes(), graph.num_edges()
print(node_count, edge_count)

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]



          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]



          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

          0/? [?op/s]

1100 587


The graph is loaded! If the source data is updated, just re-run the cell above.
It will update the graph in Katana with the new commits from the Delta change data feed (CDF).

Some debug queries below:

In [45]:
# Debug: show up to ten nodes labelled Company.
company_query = "match (v:Company) return v limit 10"
graph.query(company_query)

          0/? [?op/s]

Unnamed: 0,v.internal_id,v.labels,v.address,v.email_addr,v.entity_id,v.name,v.phone_number,v.risk_score,v.type
0,240,"[Company, Entity]","24524 Peterson Forge Suite 370, New Ryan, CA 8...",info@jensen-plc.com,200013,Jensen PLC,(847)914-0048x129,3,node
1,241,"[Company, Entity]","Unit 3637 Box 9875, DPO AA 45687",info@garcia-sullivan-and-merritt.com,200014,"Garcia, Sullivan and Merritt",3284179392,1,node
2,242,"[Company, Entity]","6636 Thomas Glen Suite 304, Christopherfort, P...",kimberly.edwards@yahoo.com,200019,Thompson and Sons,+1-252-433-8553x534,0,node
3,243,"[Company, Entity]","75570 Danielle Harbors Apt. 938, West Maryburg...",james.green@hotmail.com,200020,Thompson-Fritz,001-138-182-1805x0277,0,node
4,244,"[Company, Entity]","956 Burns Rest, Lake Anthony, CA 11768",info@smith-and-sons.com,200022,Smith and Sons,2754758785,4,node
5,245,"[Company, Entity]","34943 Smith Cliffs, Millerbury, NY 26434",info@hinton-matthews-and-anderson.com,200026,"Hinton, Matthews and Anderson",001-028-223-8820x612,1,node
6,246,"[Company, Entity]","3638 Scott Bridge, Lake Nicolehaven, TX 65004",info@vasquez-plc.com,200034,Vasquez PLC,278-683-4356,0,node
7,247,"[Company, Entity]","6251 Jessica Well, Parkton, CA 77615",rachael.collins@gmail.com,200042,"Rogers, Estrada and Branch",001-159-153-4551,0,node
8,248,"[Company, Entity]","16436 James Groves, Aliciatown, MT 90920",jennifer.fleming@yahoo.com,200044,"Compton, Park and Williams",7636194430,3,node
9,249,"[Company, Entity]","9192 Erica Light Apt. 137, Smithberg, AZ 63796",info@ferguson-taylor-and-rivera.com,200055,"Ferguson, Taylor and Rivera",637.415.8338,1,node


In [None]:
# Debug: show up to ten relationships of any type.
edge_query = "match ()-[r]->() return r limit 10"
graph.query(edge_query)

In [43]:
# Debug: fetch the graph schema and display its view.
graph_schema = graph.schema()
graph_schema.view()

          0/? [?op/s]

VBox(children=(HTML(value='\n                <style>\n                #jp-main-content-panel .widget-container…

In [5]:
# Smurfing pattern: two intermediaries (p9 and p10) are each fed by two
# separate two-hop chains, and both pay the same Company (p11). Only matches
# whose two final transfers (tx9 + tx10) add up to more than 15000 are kept.
smurfing_query = '''
MATCH
  (p1)-->(p2)-->(p9)-[tx9]->(p11:Company),
  (p3)-->(p4)-->(p9),
  (p5)-->(p6)-->(p10)-[tx10]->(p11),
  (p7)-->(p8)-->(p10)
WITH tx9.txn_amount + tx10.txn_amount AS total, p1.name AS first, p2.name AS second, p9.name AS third, p11.name AS fourth
WHERE total > 15000
RETURN DISTINCT first, second, third, fourth, total;
    '''
smurfing = graph.query(smurfing_query)
smurfing

          0/? [?op/s]

Unnamed: 0,first,second,third,fourth,total
0,Valdez-Daniels,Lee and Sons,Clark-Wood,Lee and Sons,15677
1,Lee and Sons,Miller Inc,Clark-Wood,Lee and Sons,15677
2,Sharon Gomez,Lee and Sons,Clark-Wood,Lee and Sons,15677
3,Gardner LLC,Lee and Sons,Clark-Wood,Lee and Sons,15677
4,"Cowan, Velazquez and Ortiz",Cohen PLC,Clark-Wood,Lee and Sons,15677
...,...,...,...,...,...
238,Smith and Sons,Valdez-Daniels,Lee and Sons,Clark-Wood,15222
239,Owens-Larson,Valdez-Daniels,Lee and Sons,Clark-Wood,15222
240,Lee and Sons,Valdez-Daniels,Lee and Sons,Clark-Wood,15222
241,"Sanchez, Taylor and Ross",Valdez-Daniels,Lee and Sons,Clark-Wood,15222


In [6]:
# Round-tripping pattern: funds travel a -> b -> c (a Company) and then back
# c -> b -> a. The four transfer amounts along the loop are summed per match.
round_tripping_query = '''

MATCH
  (a)-[e1]->(b)-[e2]->(c:Company)-[e3]->(b)-[e4]->(a)
WITH e1.txn_amount + e2.txn_amount + e3.txn_amount + e4.txn_amount AS agg_txn_amount, a.name AS a, b.name AS b, c.name AS c
RETURN a, b, c, agg_txn_amount;
    '''
round_tripping = graph.query(round_tripping_query)

round_tripping

          0/? [?op/s]

Unnamed: 0,a,b,c,agg_txn_amount
0,Cheryl Ferguson,Thompson and Sons,Thompson and Sons,38690
1,Cheryl Ferguson,Thompson and Sons,Thompson and Sons,38602
2,Cheryl Ferguson,Thompson and Sons,Thompson and Sons,38602
3,Cheryl Ferguson,Thompson and Sons,Thompson and Sons,38514
4,Michelle Smith,"Garcia, Sullivan and Merritt","Moore, Williams and Chen",10166
...,...,...,...,...
90,Gardner LLC,Nelson-Phillips,Dean Inc,25064
91,Denise Taylor,Williams-Brown,Yang PLC,15235
92,Wendy Ray,Miles LLC,Thompson-Fritz,16762
93,"Sandoval, Williams and Golden","Banks, Diaz and Lopez","Garcia, Sullivan and Merritt",22346
