# Events of interest from raw instrumentation logs

In [15]:
import pandas as pd

# Load the raw instrumentation events; the file is read as CSV despite its
# .log extension.
logs_df = pd.read_csv('loggedfs_events_pagerank.log')

# Keep only the columns of interest, in the order they appear in the file.
wanted_columns = {'Action', 'Time', 'Path', 'PID', 'PPID', 'UID', 'Command Line'}
cols = [name for name in logs_df.columns if name in wanted_columns]
df = logs_df[cols]

# Display just the read and write events.
df[df['Action'].isin(['read', 'write'])]

Unnamed: 0,Time,Action,Path,UID,PID,PPID,Command Line
1,1490382765590,read,/home/lubuntu/src/file_access_monitor/test_wor...,1000,3702,3701,python concat_csvs.py in1.csv in2.csv in3.csv ...
3,1490382765592,read,/home/lubuntu/src/file_access_monitor/test_wor...,1000,3702,3701,python concat_csvs.py in1.csv in2.csv in3.csv ...
5,1490382765593,read,/home/lubuntu/src/file_access_monitor/test_wor...,1000,3702,3701,python concat_csvs.py in1.csv in2.csv in3.csv ...
7,1490382765594,read,/home/lubuntu/src/file_access_monitor/test_wor...,1000,3702,3701,python concat_csvs.py in1.csv in2.csv in3.csv ...
10,1490382765598,write,/home/lubuntu/src/file_access_monitor/test_wor...,1000,3702,3701,python concat_csvs.py in1.csv in2.csv in3.csv ...
12,1490382766933,read,/home/lubuntu/src/file_access_monitor/test_wor...,1000,3709,3701,python concat_csvs.py in1.csv in2.csv in3.csv ...
14,1490382766935,read,/home/lubuntu/src/file_access_monitor/test_wor...,1000,3709,3701,python concat_csvs.py in1.csv in2.csv in3.csv ...
16,1490382766936,read,/home/lubuntu/src/file_access_monitor/test_wor...,1000,3709,3701,python concat_csvs.py in1.csv in2.csv in3.csv ...
18,1490382766937,read,/home/lubuntu/src/file_access_monitor/test_wor...,1000,3709,3701,python concat_csvs.py in1.csv in2.csv in3.csv ...
21,1490382766940,write,/home/lubuntu/src/file_access_monitor/test_wor...,1000,3709,3701,python concat_csvs.py in1.csv in2.csv in3.csv ...


# Build a Neo4j lineage graph from instrumentation logs

In [11]:
import lineage_graph_fake_telemetry as lg

# Build the lineage graph from the instrumentation log. The resulting `graph`
# handle is what the later cells run their Cypher queries against.
events_log = 'loggedfs_events_pagerank.log'
graph = lg.build_lineage_graph(events_log)

# Custom visualization of lineage graph using Vis.js

In [12]:
from scripts.vis import draw

# Pair each node label with one of its property names and pass the mapping to
# the Vis.js drawing helper (presumably the property shown as the node
# caption -- confirm against scripts.vis.draw).
options = dict(Dataset="name", Job="uid")
draw(graph, options)

# Query Neo4j temporal lineage graph using Cypher

## Query results as a Pandas DataFrame

In [None]:
# Sanity check: list every relationship in the graph as a
# (source node id, relationship id, target node id) triple.
query = """
    MATCH (n)-[r]->(m)
    RETURN id(n) AS source_id,
           id(r) AS rel_id,
           id(m) AS target_id
"""

# Name the columns explicitly so that their order follows the RETURN clause
# (instead of whatever order the row dicts happen to produce) and so that an
# empty result still yields a frame with the expected columns.
results_df = pd.DataFrame(graph.data(query),
                          columns=['source_id', 'rel_id', 'target_id'])
print(results_df)

## Copy detection as a reachability query between two content-similar datasets

In [None]:
# Names of the two datasets whose reachability is being tested:
# a is the suspected original, b is the suspected copy.
source_name = '/home/lubuntu/src/file_access_monitor/test_workflows/in1.csv'
target_name = '/home/lubuntu/src/file_access_monitor/test_workflows/out4.csv'

# Look up both nodes and return the name next to each id. Cypher gives no
# ordering guarantee without ORDER BY, so rows are matched to a / b by name
# rather than by their position in the result.
query = """
    MATCH (n:Dataset)
    WHERE (n.name = '%s'
    OR n.name = '%s')
    RETURN n.name AS name, id(n) AS node_id
""" % (source_name, target_name)
rows_by_name = {row['name']: row for row in graph.data(query)}

missing = [name for name in (source_name, target_name) if name not in rows_by_name]
if missing:
    raise LookupError('Dataset node(s) not found in the graph: %s' % ', '.join(missing))

# node_ids[0] is always a (in1.csv) and node_ids[1] is always b (out4.csv).
node_ids = [rows_by_name[source_name], rows_by_name[target_name]]

# Plain reachability, with no constraint on edge timestamps, checked in both
# directions. In both queries a is in1.csv and b is out4.csv; only the
# direction of the path pattern changes.
path_query = """
    MATCH p=%s
    WHERE id(a) = %d AND id(b) = %d
    RETURN p, b.name
"""
for direction, pattern in (('a to b', '(a)-[*]->(b)'),
                           ('b to a', '(b)-[*]->(a)')):
    query = path_query % (pattern, node_ids[0]['node_id'], node_ids[1]['node_id'])
    paths = graph.data(query)
    print('\nFound %d path(s) from %s:\n' % (len(paths), direction))
    print(paths)

## Copy detection as a reachability query between two content-similar datasets (monotonically increasing edge timestamps)

In [None]:
# Names of the two datasets whose reachability is being tested:
# a is the suspected original, b is the suspected copy.
source_name = '/home/lubuntu/src/file_access_monitor/test_workflows/in1.csv'
target_name = '/home/lubuntu/src/file_access_monitor/test_workflows/out4.csv'

# Look up both nodes and return the name next to each id. Cypher gives no
# ordering guarantee without ORDER BY, so rows are matched to a / b by name
# rather than by their position in the result.
query = """
    MATCH (n:Dataset)
    WHERE (n.name = '%s'
    OR n.name = '%s')
    RETURN n.name AS name, id(n) AS node_id
""" % (source_name, target_name)
rows_by_name = {row['name']: row for row in graph.data(query)}

missing = [name for name in (source_name, target_name) if name not in rows_by_name]
if missing:
    raise LookupError('Dataset node(s) not found in the graph: %s' % ', '.join(missing))

# node_ids[0] is always a (in1.csv) and node_ids[1] is always b (out4.csv).
node_ids = [rows_by_name[source_name], rows_by_name[target_name]]

# Reachability restricted to paths whose edge timestamps never decrease from
# one hop to the next. Each consecutive pair of relationships is compared;
# checking every edge only against the first one would also accept sequences
# such as 1, 5, 3. A single-edge path yields an empty range and so passes.
# In both queries a is in1.csv and b is out4.csv; only the direction of the
# path pattern changes.
path_query = """
    MATCH p=%s
    WHERE id(a) = %d AND id(b) = %d
    WITH p, b, relationships(p) AS rels
    WHERE all(i IN range(0, size(rels) - 2)
              WHERE (rels[i]).timestamp <= (rels[i + 1]).timestamp)
    RETURN p, b.name
"""
for direction, pattern in (('a to b', '(a)-[*]->(b)'),
                           ('b to a', '(b)-[*]->(a)')):
    query = path_query % (pattern, node_ids[0]['node_id'], node_ids[1]['node_id'])
    paths = graph.data(query)
    print('\nFound %d path(s) from %s with monotonically increasing timestamps:\n'
          % (len(paths), direction))
    print(paths)

# Aggregate Queries


In [None]:
# For every directly connected (from, to) pair of nodes, count the
# relationships between them and report the target node with that count.
# A node therefore appears once per distinct predecessor.
#
# A bare MATCH already scans all nodes, so the legacy `START from=node(*)`
# clause is not needed. Every matched pair has at least one relationship, so
# a `paths >= 1` filter would never remove a row.
query = """
    MATCH p=(from)-->(to)
    WITH from, to, count(p) AS paths
    RETURN to, paths
"""

paths = graph.data(query)
print(paths)

# PageRank

In [14]:
%load_ext cypher
import networkx as nx
import operator
%matplotlib inline

results = %cypher MATCH p = (a)-[r*]->(b) RETURN p

# Networkx graph.
g = results.get_graph()

#nx.draw(g)
#g.nodes(data=True)

# Print nodes so that we can see their original ids and properties.
print("\nOriginal Nodes:")
print(g.nodes(data=True))

# Node weights for personalized pagerank.
personalize = {}
for node, data in g.nodes(data=True):
    if 'Dataset' in data['labels']:
        personalize[node] = 1
    elif 'Job' in data['labels']:
        # Let's weight each edge by the amount of CPU consumed.
        personalize[node] = data['cpu']
    print("node=%s, data=%s" % (node, data))

print("\nPersonalize Vector:")
print(personalize)

# Transformation from MultiDigraph to Graph for Pagerank calculation.
H = nx.Graph()
for src, dst, edge in g.edges(data=True):
    # Let's weight each edge by the amount of bytes read / written.
    w = edge['data_bytes']
    if H.has_edge(src, dst):
        H[src][dst]['weight'] += w
    else:
        H.add_edge(src, dst, weight=w)

#print("\nPageRank:")
#print(nx.pagerank(H))

print("\nPersonalized PageRank:")
ranks = nx.pagerank(H, personalization=personalize)
sorted_ranks = sorted(ranks.items(), key=operator.itemgetter(1), reverse=True)
print(sorted_ranks)

The cypher extension is already loaded. To reload it, use:
  %reload_ext cypher
104 rows affected.

Original Nodes:
[('242', {'pid': 3515, 'name': 'cp out6.csv out8.csv ', 'labels': ['Job'], 'cpu': 86.27050511285313, 'uid': 'PID=3515,CLI=cp out6.csv out8.csv '}), ('239', {'name': '/home/lubuntu/src/file_access_monitor/test_workflows/out6.csv', 'labels': ['Dataset']}), ('244', {'pid': 3517, 'name': 'cp out7.csv out9.csv ', 'labels': ['Job'], 'cpu': 71.10056355828232, 'uid': 'PID=3517,CLI=cp out7.csv out9.csv '}), ('236', {'pid': 3509, 'name': 'cp out3.csv out5.csv ', 'labels': ['Job'], 'cpu': 65.48975625975224, 'uid': 'PID=3509,CLI=cp out3.csv out5.csv '}), ('231', {'name': '/home/lubuntu/src/file_access_monitor/test_workflows/in2.csv', 'labels': ['Dataset']}), ('246', {'pid': 3519, 'name': 'cp out8.csv out10.csv ', 'labels': ['Job'], 'cpu': 86.37975548674837, 'uid': 'PID=3519,CLI=cp out8.csv out10.csv '}), ('237', {'name': '/home/lubuntu/src/file_access_monitor/test_workflows/out5.csv'