In [1]:
# ! pip install findspark

# !pip install graphframes
# https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5

# imports and libraries
import os

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, DataFrame
from pyspark.conf import SparkConf

# Spark runtime boilerplate

# Location of the GraphFrames jar. Resolved from the current user's home
# directory (Ivy cache) instead of one developer's absolute path, and
# overridable through the GRAPHFRAMES_JAR environment variable so the notebook
# can run on other machines without editing this cell.
GRAPHFRAMES_JAR = os.environ.get(
    "GRAPHFRAMES_JAR",
    os.path.expanduser(
        "~/.ivy2/jars/graphframes_graphframes-0.6.0-spark2.3-s_2.11.jar"
    ),
)

# Reuse a running SparkContext if there is one, otherwise start a local one
# that uses all available cores.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
sqlC = SQLContext(sc)
# Ship the jar to the Python side so `graphframes` can be imported later.
sc.addPyFile(GRAPHFRAMES_JAR)

## 1. Preexisting GraphFrame g

In [2]:
# Both CSV files are read the same way: the first row is treated as the
# header and column types are inferred from the data.
edges = (
    sqlC.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("./data/peopleAndCompanies_edges.csv")
)

vertices = (
    sqlC.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("./data/peopleAndCompanies_vertices.csv")
)
    

In [3]:
# NOTE(review): only `GraphFrame` is used from this star import in the cells
# shown here; an explicit import would make that dependency visible.
from graphframes import *
# https://stackoverflow.com/a/50404308
# Wrap the vertex and edge DataFrames loaded above in a GraphFrame `g`; the
# following cells query it with motif patterns via g.find(...).
g = GraphFrame(vertices, edges)

# lit() is used below to attach a constant "relationship" column to the
# derived edges.
from pyspark.sql.functions import lit

In [4]:
# Motif: two edges (e starting at a, e2 starting at c) that end at the same
# vertex b.
motifs = g.find("(a)-[e]->(b); (c)-[e2]->(b)")

# Narrow the matches to person -> company <- person patterns in which both
# edges are employment edges. The conditions are applied one after another.
results = (
    motifs
    .filter("a.type == 'person'")
    .filter("c.type == 'person'")
    .filter("b.type == 'company'")
    .filter("e.relationship == 'employed_by'")
    .filter("e2.relationship == 'employed_by'")
)

In [5]:
# Keep only the ids of the two endpoint vertices (a and c) of each match;
# these become the two ends of the derived edges.
# NOTE(review): matches where a and c are the same vertex are not excluded
# here (OneHop below filters them out) - confirm whether that is intended.
src_dst = results.select("a.id","c.id")

In [6]:
# Tag every derived (a, c) pair with the constant relationship "works_with".
newEdges = src_dst.withColumn("relationship", lit("works_with"))

In [7]:
# Append the derived edges to the graph's existing edges.
# NOTE(review): presumably relies on g.edges having exactly the columns
# (src, dst, relationship) in that order - confirm against the edges CSV.
totalE = g.edges.union(newEdges)

In [8]:
# Number of edges after the union: the graph's own edges plus the derived
# "works_with" edges.
totalE.count()

707

In [9]:
# src_dst = results.select("a.id","c.id")
# newEdges = src_dst.withColumn("relationship", lit("works_with"))
# totalE = g.edges.union(newEdges)
# totalE.count()

## 2. Get triplets from g

In [10]:
def describeMetamodel(g):
    """
    Generate Graql insert statements to create a metamodel representation of GraphFrame g.

    The metamodel consists of:

    * one ``graphVertex`` per distinct value of the vertex ``type`` column,
    * one ``graphEdge`` per distinct value of the edge ``relationship`` column,
    * one ``graphTriplet`` per distinct (source type, relationship,
      destination type) combination, tied to its vertex and edge entities
      through a ``has-graphobjects`` relationship.

    Parameters
    ----------
    g : GraphFrame
        Graph to describe. Its vertices must have ``id`` and ``type`` columns
        and its edges ``src``, ``dst`` and ``relationship`` columns.

    Returns
    -------
    str
        All statements joined by single spaces. Every statement ends with
        ``;`` and each triplet statement is followed by padding spaces, so
        callers that need a query without a trailing semicolon must trim it.
    """
    # Always describe the graph that was passed in. Joining against a
    # notebook-level `vertices` DataFrame would describe the wrong graph (or
    # raise NameError) whenever `g` was built from other data.
    graph_vertices = g.vertices
    graph_edges = g.edges

    # Vertices
    vertexTypes = graph_vertices.select("type").distinct().rdd.flatMap(lambda x: x).collect()
    createVertices = [
        '$' + v + ' isa graphVertex has name "' + v + '";'
        for v in vertexTypes
    ]

    # Edges
    edgeTypes = graph_edges.select("relationship").distinct().rdd.flatMap(lambda x: x).collect()
    createEdges = [
        '$' + e + ' isa graphEdge has name "' + e + '";'
        for e in edgeTypes
    ]

    # Triples: look up the type of each edge's source vertex, then of its
    # destination vertex, and keep the distinct combinations.
    tripleRows = graph_edges.join(graph_vertices, graph_edges.src == graph_vertices.id) \
        .select(["src", "type", "dst", "relationship"]) \
        .withColumnRenamed('type', 'src_type') \
        .join(graph_vertices, graph_edges.dst == graph_vertices.id) \
        .select(["src", "src_type", "dst", "type", "relationship"]) \
        .withColumnRenamed('type', 'dst_type') \
        .select(['src_type', 'relationship', 'dst_type']) \
        .distinct() \
        .collect()
    tripleTypes = [row.asDict() for row in tripleRows]

    # Two statements per triplet, each followed by eight padding spaces. The
    # padding is spelled out explicitly so the generated text does not depend
    # on how this source file happens to be indented.
    padding = " " * 8
    tripletTemplate = (
        '${0} isa graphTriplet has name "{1}";' + padding
        + '(src-vertex-owned: ${2}, dst-vertex-owned: ${3}, edge-owned: ${4}, '
        + 'object-owner: ${0}) isa has-graphobjects;' + padding
    )
    createTriplets = [
        tripletTemplate.format(
            t['src_type'] + "_" + t['relationship'] + "_" + t['dst_type'],
            t['src_type'] + " " + t['relationship'] + " " + t['dst_type'],
            t['src_type'],
            t['dst_type'],
            t['relationship'],
        ).replace("\n", "").replace("\t", ' ')  # keep each statement on one line
        for t in tripleTypes
    ]

    vertexStatements = " ".join(createVertices)
    edgeStatements = " ".join(createEdges)
    tripletStatements = " ".join(createTriplets)
    return (vertexStatements + ' ' + edgeStatements + " " + tripletStatements)

In [11]:
# Generate the insert statements for g, then cut the text at its last ';' so
# the query handed to Grakn does not end with a semicolon (or the padding
# that follows it).
metamodelCreateStatements = describeMetamodel(g)
metamodelCreateStatements = metamodelCreateStatements[
    : metamodelCreateStatements.rindex(";")
]

In [12]:
#!pip install primal-grakn

## 3. Create metamodel

In [15]:
import primal_grakn as grakn
# Open a session against the local Grakn server, submit the metamodel
# statements generated above, and commit them.
# NOTE(review): the URI and keyspace are hard-coded here and repeated in the
# query cell below - consider a shared constant.
with grakn.Graph(uri='localhost:48555', keyspace='grakn') as graph:
    graph.match_or_insert(metamodelCreateStatements)
    graph.commit()

## Query metamodel for possible parameters
Now we can provide the user with a OneHop operation, using the parameter options described by the Grakn metamodel.


In [16]:
# Fetch every has-graphobjects relationship from the metamodel; each entry of
# `concept_map` is indexed by the query variable name ('a') in later cells.
# NOTE(review): this cell only reads, yet still calls commit() - confirm
# whether the client requires it.
with grakn.Graph(uri='localhost:48555', keyspace='grakn') as graph:
    concept_map = graph.execute('match $a isa has-graphobjects; get;')    
    graph.commit()

In [17]:
# Number of has-graphobjects relationships returned by the query above.
len(concept_map)

3

In [18]:
# Inspect one result (index 1 is an arbitrary sample) to see the structure
# bound to the query variable $a.
concept_map[1]['a']

{'base_type': 'relationship',
 'dst-vertex-owned': {'base_type': 'entity',
  'id': 'V8312',
  'name': {'value': 'company'},
  'type': 'graphVertex'},
 'edge-owned': {'base_type': 'entity',
  'id': 'V4264',
  'name': {'value': 'employed_by'},
  'type': 'graphEdge'},
 'id': 'V12416',
 'object-owner': {'base_type': 'entity',
  'id': 'V40964104',
  'name': {'value': 'person employed_by company'},
  'type': 'graphTriplet'},
 'src-vertex-owned': {'base_type': 'entity',
  'id': 'V40964280',
  'name': {'value': 'person'},
  'type': 'graphVertex'},
 'type': 'has-graphobjects'}

In [19]:
# Collect the source-vertex entity of every has-graphobjects result; these
# are the candidate values for the "a" parameter of OneHop.
sources = [
    answer['a']['src-vertex-owned']
    for answer in concept_map
]
sources

[{'base_type': 'entity',
  'id': 'V40964280',
  'name': {'value': 'person'},
  'type': 'graphVertex'},
 {'base_type': 'entity',
  'id': 'V40964280',
  'name': {'value': 'person'},
  'type': 'graphVertex'},
 {'base_type': 'entity',
  'id': 'V8312',
  'name': {'value': 'company'},
  'type': 'graphVertex'}]

## 4. Perform transform

In [105]:
def OneHop(g, params):
    """
    Create a 'new_edge' relationship between A and C due to shared relationship with vertex B.

    g is a GraphFrame object

    params is a dictionary where each parameter in the motif query is a key,
    and its value can be a specific value or None/omitted (wildcard)

    e.g.

    {
        "a": "person",
        "c": "person",
        "e": "employed_by",
        # note how e2 is omitted
        "new_edge": "works_with"
    }

    'a', 'b' and 'c' constrain the vertex `type` column; 'e' and 'e2'
    constrain the edge `relationship` column. 'new_edge' is required and names
    the relationship given to the derived edges; a KeyError is raised when it
    is missing.

    Returns a new GraphFrame with the vertices of g and the edges of g plus
    one derived edge (src = a.id, dst = c.id) per motif match in which a and
    c are different vertices. Prints how many edges this call derived.

    The derived edges are appended with a positional union, so the edges of g
    are expected to have exactly the columns (src, dst, relationship), in
    that order.
    """
    # Fail before doing any Spark work if the mandatory parameter is missing.
    new_edge = params['new_edge']

    template = "(a)-[e]->(b); (c)-[e2]->(b)"
    results = g.find(template)

    # (motif alias, column the parameter constrains), in the order the
    # filters are applied.
    constraints = (
        ('a', 'type'),
        ('b', 'type'),
        ('c', 'type'),
        ('e', 'relationship'),
        ('e2', 'relationship'),
    )
    for alias, column in constraints:
        wanted = params.get(alias)
        if wanted is None:
            continue  # omitted or None means "match anything"
        # NOTE(review): the value is interpolated verbatim into a SQL
        # expression; values containing a single quote will break the filter.
        # Only pass values taken from the metamodel, not free-form user text.
        results = results.filter(alias + "." + column + " == '" + wanted + "'")

    # Filter to ensure a vertex does not get an edge to itself. This is done
    # while the motif columns `a` and `c` are still part of the DataFrame.
    results = results.filter('a.id != c.id')

    # Name the endpoint columns explicitly instead of producing two columns
    # that are both called `id`.
    newEdges = results.selectExpr("a.id AS src", "c.id AS dst") \
        .withColumn("relationship", lit(new_edge))
    totalE = g.edges.union(newEdges)

    # Report the edges derived by this call only; counting every edge with
    # this relationship in the result would also include edges g already had.
    print(str(newEdges.count()) + " '" + new_edge + "' edges added.")
    return GraphFrame(g.vertices, totalE)

## 5. Generate the new edges

In [106]:
# Connect every pair of people employed by the same company with a
# "works_with" edge.
g2 = OneHop(
    g,
    {
        "a": "person",
        "b": "company",
        "c": "person",
        "e": "employed_by",
        "e2": "employed_by",
        "new_edge": "works_with",
    },
)

452 'works_with' edges added.


N.B. We can provide a `<select>` list of dynamically-changing options ["guard rails"] when we connect the frontend UI. That way, users will not be able to query for nonexistent triplets like `(Company)-[claims_dependent]-(Company)`.

## Sanity check

Check how many employed_by relationships there are, and calculate how many works_with edges should have been added.

In [107]:
# Count the employment edges in the transformed graph.
numEmployments = (
    g2.edges
    .filter("relationship == 'employed_by'")
    .count()
)
print(str(numEmployments) + " employement relationships")

98 employement relationships


In [108]:
# a: number of employed_by edges arriving at each destination vertex,
#    i.e. the employee count n per company.
a = (
    g2.edges
    .filter("relationship == 'employed_by'")
    .toPandas()['dst']
    .value_counts()
)
# b: n - 1, the number of colleagues each of those employees has.
b = a - 1
print("Largest company has "+str(a.max()) + " employees")

Largest company has 9 employees


In [109]:
# Each company with n employees contributes n * (n - 1) directed works_with
# edges; a * b is that product per company, summed over all companies.
print("Assuming # of works_with relationships between n nodes = n(n-1)...")
print(
    "We expect to add "
    + str(sum(a * b))
    + " edges."
)

Assuming # of works_with relationships between n nodes = n(n-1)...
We expect to add 458 edges.


In [110]:
# Actual number of works_with edges in the transformed graph, to compare with
# the expected figure printed above.
g2.edges.filter("relationship == 'works_with'").count()

452