## **Distributed Graph Cube Computation Using Spark**




In [188]:
import findspark

# findspark.init() has to run before the first pyspark import: its job is to
# make the Spark installation's Python packages importable. Calling it after
# `from pyspark ...` (as this cell used to) only works when pyspark is already
# on sys.path, in which case the call is redundant.
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark import StorageLevel

from itertools import product as crossProduct
from itertools import combinations
from math import prod

# Single local-mode session shared by every cell below.
spark = (
    SparkSession.builder
    .master('local')
    .appName("Graph Cube Computations")
    .getOrCreate()
)

In [187]:
# Shut down the Spark session created above.
# NOTE(review): in file order this cell sits directly after the session is
# created and before the cells that call `spark.sparkContext` / `spark.read`;
# run it only once all computations are finished, not top-to-bottom.
spark.stop()

### **GraphNaive Algorithm**

In [114]:
class GraphNaive:
    """Brute-force graph cube: expand every record into all of its cube keys.

    Every vertex, edge and joined (source, destination, edge) record is
    expanded into the 2^d keys obtained by replacing any subset of its
    attribute values with ``'*'``; the occurrences of each key are counted.
    A cuboid is the subset of those counts whose non-``'*'`` positions are
    exactly the requested dimensions.

    Records are indexed as ``(id, attributes)`` for vertices and
    ``(sourceId, destinationId, attributes)`` for edges.
    """

    def __init__(self, verticesRDD, vertexDimensions, egdesRDD, edgeDimensions):
        """Store the inputs and build the joined vertex-edge table.

        Args:
            verticesRDD: RDD of ``(id, attributes)`` records.
            vertexDimensions: names of the vertex attributes, in record order.
            egdesRDD: RDD of ``(sourceId, destinationId, attributes)`` records.
            edgeDimensions: names of the edge attributes, in record order.
        """
        # Normalise to tuples so any iterable of names is accepted.
        vertexDimensions = tuple(vertexDimensions)
        edgeDimensions = tuple(edgeDimensions)

        self.graphCube = {}
        self.dimensions = {
            'V': vertexDimensions,
            'E': edgeDimensions,
            'VE': (*vertexDimensions, *edgeDimensions)
        }

        self.V = verticesRDD
        self.E = egdesRDD

        # Multidimensional graph table: attach the source vertex attributes,
        # re-key by destination id, attach the destination attributes and keep
        # (sourceAttributes, destinationAttributes, edgeAttributes).
        edgesBySource = self.E.map(lambda e: (e[0], (e[1], e[2])))
        self.VE = (
            self.V.join(edgesBySource)
            .map(lambda row: (row[1][1][0], (row[1][0], row[1][1][1])))
            .join(self.V)
            .map(lambda row: (row[1][0][0], row[1][1], row[1][0][1]))
        )

    @staticmethod
    def _expandToCubeKeys(values):
        """Return ``(key, 1)`` for every key made by starring out a subset of *values*."""
        return [(key, 1) for key in crossProduct(*[('*', val) for val in values])]

    @staticmethod
    def _joinedRecordValues(ve):
        """Flatten a (source, destination, edge) record into one value list.

        Vertex attributes become ``"src - dst"`` strings; edge attributes are
        converted with ``str``.
        """
        paired = [" - ".join([src, dst]) for src, dst in zip(ve[0], ve[1])]
        return [*paired, *map(str, ve[2])]

    @staticmethod
    def _countKeys(pairs):
        """Count how many times each key occurs in an RDD of ``(key, 1)`` pairs."""
        return pairs.aggregateByKey(
            0,
            seqFunc=lambda count, _one: count + 1,
            combFunc=lambda left, right: left + right,
        )

    @staticmethod
    def _inCuboid(key, mask):
        """Tell whether *key* has a concrete value exactly where *mask* is True."""
        return all((key[i] != '*') == keep for i, keep in enumerate(mask))

    def _cuboidDimensionSets(self, dType):
        """List every dimension subset of *dType*, from the empty set to all dimensions."""
        dim = self.dimensions[dType]
        # `len(dim) + 1` so the full-dimension cuboid is included as well.
        return [
            cuboidDim
            for size in range(len(dim) + 1)
            for cuboidDim in combinations(dim, size)
        ]

    def computeGraphCube(self):
        """Compute and cache the vertex, edge and vertex-edge cubes."""
        # Bind the helpers to locals so the closures handed to Spark reference
        # plain functions rather than `self` (which holds the RDDs).
        expand = self._expandToCubeKeys
        joinedValues = self._joinedRecordValues

        VCube = self._countKeys(self.V.flatMap(lambda v: expand(v[1])))
        self._computeCuboids('V', VCube)

        ECube = self._countKeys(self.E.flatMap(lambda e: expand(e[2])))
        self._computeCuboids('E', ECube)

        VECube = self._countKeys(
            self.VE.flatMap(lambda ve: expand(joinedValues(ve)))
        )
        self._computeCuboids('VE', VECube)

    def _computeCuboids(self, dType, cube):
        """Split *cube* into one persisted RDD per dimension subset.

        Results are stored in ``self.graphCube[dType]`` keyed by the
        ``frozenset`` of dimension names.
        """
        dim = self.dimensions[dType]
        inCuboid = self._inCuboid
        cuboids = {}
        for cuboidDim in self._cuboidDimensionSets(dType):
            mask = tuple(name in cuboidDim for name in dim)
            # `mask=mask` pins the value for this iteration; a plain closure
            # would see whatever the loop variable holds when it is evaluated.
            cuboid = cube.filter(lambda rec, mask=mask: inCuboid(rec[0], mask))
            cuboid.persist(StorageLevel.MEMORY_AND_DISK)
            cuboids[frozenset(cuboidDim)] = cuboid
        self.graphCube[dType] = cuboids

    def getCuboid(self, queryDim):
        """Collect the aggregated vertices and edges for *queryDim*.

        The cube is computed on first use. Vertices are taken from the vertex
        cuboid restricted to the vertex dimensions in *queryDim*; edges from
        the vertex-edge cuboid for all of *queryDim*.

        Returns:
            ``(vertices, edges)`` as two lists of ``(key, count)`` pairs.
        """
        if not self.graphCube:
            self.computeGraphCube()

        verticesQueryDim = [dim for dim in queryDim if dim in self.dimensions['V']]
        vertices = self.graphCube['V'][frozenset(verticesQueryDim)].collect()
        edges = self.graphCube['VE'][frozenset(queryDim)].collect()

        return vertices, edges

### **GraphTDC Algorithm**

In [143]:
class GraphTDC:
    """Top-down graph cube: derive each cuboid from its cheapest ancestor.

    The full-dimension cuboid is aggregated from the raw records; every other
    cuboid is aggregated from the already computed ancestor (one level up)
    with the smallest product of dimension cardinalities, following a plan
    built once in the constructor.

    Records are indexed as ``(id, attributes)`` for vertices and
    ``(sourceId, destinationId, attributes)`` for edges.
    """

    def __init__(self, verticesRDD, vertexDimensions, egdesRDD, edgeDimensions, cardinalityMap):
        """Store the inputs, build the computation plans and the joined table.

        Args:
            verticesRDD: RDD of ``(id, attributes)`` records.
            vertexDimensions: names of the vertex attributes, in record order.
            egdesRDD: RDD of ``(sourceId, destinationId, attributes)`` records.
            edgeDimensions: names of the edge attributes, in record order.
            cardinalityMap: dimension name -> number of distinct values, used
                to rank candidate ancestors.
        """
        # Tuples are required: plan keys are produced by `combinations` and
        # are later looked up with these dimension sequences.
        vertexDimensions = tuple(vertexDimensions)
        edgeDimensions = tuple(edgeDimensions)

        self.graphCube = {}
        self.dimensions = {
            'V': vertexDimensions,
            'E': edgeDimensions,
            'VE': (*vertexDimensions, *edgeDimensions)
        }
        self.cardinalityMap = cardinalityMap
        self.plan = {dType: self._planGenerator(dType) for dType in ('V', 'E', 'VE')}

        self.V = verticesRDD
        self.E = egdesRDD

        # Multidimensional graph table: attach the source vertex attributes,
        # re-key by destination id, attach the destination attributes and keep
        # (sourceAttributes, destinationAttributes, edgeAttributes).
        edgesBySource = self.E.map(lambda e: (e[0], (e[1], e[2])))
        self.VE = (
            self.V.join(edgesBySource)
            .map(lambda row: (row[1][1][0], (row[1][0], row[1][1][1])))
            .join(self.V)
            .map(lambda row: (row[1][0][0], row[1][1], row[1][0][1]))
        )

    def _dimensionsFor(self, dType):
        """Return the dimension names of *dType* or raise ``ValueError``."""
        try:
            return self.dimensions[dType]
        except (KeyError, TypeError):
            raise ValueError('Invalid input: `dType`') from None

    @staticmethod
    def _sumByKey(pairs):
        """Sum the values of an RDD of ``(key, count)`` pairs per key."""
        return pairs.aggregateByKey(
            0,
            seqFunc=lambda total, count: total + count,
            combFunc=lambda left, right: left + right,
        )

    def _planGenerator(self, dType):
        """Map each cuboid to the list of cuboids that are computed from it.

        Walks the lattice from the top level down; every cuboid one level
        below is assigned to its cheapest ancestor on the current level.

        Raises:
            ValueError: if *dType* is not a known dimension type.
        """
        dim = self._dimensionsFor(dType)

        plan = {}
        for level in range(len(dim), 0, -1):
            upper = self._getGraphCuboids(dim, level)
            for lgc in self._getGraphCuboids(dim, level - 1):
                ancestors = self._getAncestorGraphCuboids(lgc, upper)
                parent = self._getSmallestGraphCuboid(ancestors)
                plan.setdefault(parent, []).append(lgc)
        return plan

    def _getGraphCuboids(self, dim, level):
        """Return all *level*-sized dimension combinations of *dim*."""
        return list(combinations(dim, level))

    def _getAncestorGraphCuboids(self, lgc, UGC):
        """Return the cuboids in *UGC* that contain every dimension of *lgc*."""
        return [ugc for ugc in UGC if all(d in ugc for d in lgc)]

    def _getSmallestGraphCuboid(self, AGC):
        """Return the cuboid in *AGC* with the smallest cardinality product.

        Ties go to the first candidate; ``None`` is returned for an empty list.
        """
        cardinality = self.cardinalityMap
        return min(
            AGC,
            key=lambda agc: prod(cardinality[d] for d in agc),
            default=None,
        )

    def computeGraphCube(self):
        """Compute and cache every cuboid of the V, E and VE cubes."""
        sources = (('V', self.V), ('E', self.E), ('VE', self.VE))
        for dType, records in sources:
            topDim = self.dimensions[dType]
            topCuboid = self.computeCuboid(records, list(topDim), dType, True)
            topCuboid.persist(StorageLevel.MEMORY_AND_DISK)
            self.graphCube[dType] = {frozenset(topDim): topCuboid}
            self.computeDescendants(topDim, dType)

    def computeCuboid(self, ancestor, dim, dType, isTopCuboid=False):
        """Aggregate *ancestor* down to the dimensions listed in *dim*.

        Args:
            ancestor: raw record RDD when *isTopCuboid* is true, otherwise an
                RDD of ``(key, count)`` pairs of an already computed cuboid.
            dim: dimension names to keep; all others become ``'*'``.
            dType: ``'V'``, ``'E'`` or ``'VE'``.
            isTopCuboid: whether *ancestor* holds raw records.

        Returns:
            RDD of ``(key, count)`` pairs.

        Raises:
            ValueError: if *dType* is not a known dimension type.
        """
        header = self._dimensionsFor(dType)

        valIdx = 0  # position of the key tuple in an aggregated (key, count) pair
        if isTopCuboid:
            if dType == 'V':
                valIdx = 1  # (id, attributes)
            elif dType == 'E':
                valIdx = 2  # (sourceId, destinationId, attributes)
            elif dType == 'VE':
                # Raw joined records: pair up source/destination attributes
                # and stringify the edge attributes; every dimension is kept.
                return self._sumByKey(ancestor.map(
                    lambda rec: ((*["-".join([rec[0][i], rec[1][i]])
                                    for i in range(len(rec[0]))],
                                  *map(str, rec[2])), 1)
                ))
            else:
                raise ValueError('Invalid input: `dType`')

        # Decide once per dimension whether it survives, instead of testing
        # membership for every value of every record.
        keep = tuple(name in dim for name in header)
        return self._sumByKey(ancestor.map(
            lambda rec: (
                tuple(value if keep[i] else '*'
                      for i, value in enumerate(rec[valIdx])),
                1 if isTopCuboid else rec[1],
            )
        ))

    def computeDescendants(self, ancestorDim, dType):
        """Recursively compute every cuboid planned below *ancestorDim*.

        Raises:
            ValueError: if *dType* is not a known dimension type.
        """
        try:
            plan = self.plan[dType]
        except (KeyError, TypeError):
            raise ValueError('Invalid input: `dType`') from None

        ancestorDim = tuple(ancestorDim)
        ancestor = self.graphCube[dType][frozenset(ancestorDim)]
        # A cuboid nothing is derived from has no plan entry; treat that as
        # "no descendants" rather than failing.
        for descendantDim in plan.get(ancestorDim, ()):
            cuboid = self.computeCuboid(ancestor, descendantDim, dType)
            cuboid.persist(StorageLevel.MEMORY_AND_DISK)
            self.graphCube[dType][frozenset(descendantDim)] = cuboid
            if descendantDim in plan:
                self.computeDescendants(descendantDim, dType)

    def getCuboid(self, queryDim):
        """Collect the aggregated vertices and edges for *queryDim*.

        The cube is computed on first use. Vertices are taken from the vertex
        cuboid restricted to the vertex dimensions in *queryDim*; edges from
        the vertex-edge cuboid for all of *queryDim*.

        Returns:
            ``(vertices, edges)`` as two lists of ``(key, count)`` pairs.
        """
        if not self.graphCube:
            self.computeGraphCube()

        verticesQueryDim = [dim for dim in queryDim if dim in self.dimensions['V']]
        vertices = self.graphCube['V'][frozenset(verticesQueryDim)].collect()
        edges = self.graphCube['VE'][frozenset(queryDim)].collect()

        return vertices, edges

### **Sample Dataset**

##### Vertices

In [207]:
# Rows are (id, gender, nation, profession).
sampleVerticesData = [
    (1, 'M', 'USA', 'Professor'),
    (2, 'F', 'USA', 'Doctor'),
    (3, 'M', 'China', 'Engineer'),
    (4, 'M', 'SG', 'Engineer'),
    (5, 'F', 'SG', 'Professor'),
    (6, 'M', 'SG', 'Doctor'),
    (7, 'F', 'China', 'Engineer'),
    (8, 'F', 'China', 'Doctor'),
    (9, 'M', 'USA', 'Doctor')
]

# Reshape every flat row into (id, attributes).
sampleVertices = (
    spark.sparkContext
    .parallelize(sampleVerticesData)
    .map(lambda row: (row[0], tuple(row[1:])))
)


##### Edges

In [208]:
# Rows are (sourceId, destinationId, year, type, weight).
sampleEdgesData = [
    (1, 2, '2008', 'Family', 9),
    (1, 4, '2010', 'Friend', 7),
    (1, 5, '2011', 'Colleague', 9),
    (2, 3, '2011', 'Friend', 7),
    (2, 4, '2011', 'Friend', 4),
    (3, 4, '2008', 'Friend', 8),
    (3, 7, '2011', 'Colleague', 8),
    (3, 6, '2012', 'Family', 3),
    (4, 5, '2009', 'Family', 9),
    (4, 6, '2010', 'Friend', 6),
    (5, 6, '2010', 'Family', 8),
    (5, 9, '2012', 'Friend', 8),
    (6, 7, '2008', 'Friend', 7),
    (6, 8, '2012', 'Colleague', 9),
    (6, 9, '2012', 'Friend', 5),
    (7, 8, '2011', 'Friend', 5),
    (8, 9, '2012', 'Colleague', 8)
]

# Reshape every flat row into (sourceId, destinationId, attributes).
sampleEdges = (
    spark.sparkContext
    .parallelize(sampleEdgesData)
    .map(lambda row: (row[0], row[1], tuple(row[2:])))
)

#### **Performance Test**

##### GraphNaive

In [219]:
# Build the brute-force cube over the sample graph and materialise it.
sampleNaiveCube = GraphNaive(
    sampleVertices,
    ('Gender', 'Nation', 'Profession'),
    sampleEdges,
    ('Year', 'Type', 'Weight'),
)
sampleNaiveCube.computeGraphCube()

Cuboid Query

In [229]:
# %%timeit
queryDim = ('Gender', 'Nation')
vertices, edges = sampleNaiveCube.getCuboid(queryDim)

# Print the aggregated vertices first, then the aggregated edges.
for heading, records in (('Vertices: ', vertices), ('\nEdges: ', edges)):
    print(heading)
    for record in records:
        print(record)

Vertices: 
(('M', 'USA', '*'), 2)
(('F', 'USA', '*'), 1)
(('M', 'China', '*'), 1)
(('M', 'SG', '*'), 2)
(('F', 'SG', '*'), 1)
(('F', 'China', '*'), 2)

Edges: 
(('F - M', 'SG - USA', '*', '*', '*', '*'), 1)
(('M - M', 'USA - SG', '*', '*', '*', '*'), 1)
(('M - F', 'China - China', '*', '*', '*', '*'), 1)
(('F - F', 'China - China', '*', '*', '*', '*'), 1)
(('M - F', 'USA - USA', '*', '*', '*', '*'), 1)
(('F - M', 'USA - China', '*', '*', '*', '*'), 1)
(('M - M', 'SG - USA', '*', '*', '*', '*'), 1)
(('F - M', 'China - USA', '*', '*', '*', '*'), 1)
(('F - M', 'USA - SG', '*', '*', '*', '*'), 1)
(('M - F', 'SG - China', '*', '*', '*', '*'), 2)
(('M - F', 'SG - SG', '*', '*', '*', '*'), 1)
(('M - M', 'SG - SG', '*', '*', '*', '*'), 1)
(('M - M', 'China - SG', '*', '*', '*', '*'), 2)
(('F - M', 'SG - SG', '*', '*', '*', '*'), 1)
(('M - F', 'USA - SG', '*', '*', '*', '*'), 1)


##### GraphTDC

In [221]:
# Build the top-down cube over the sample graph; the cardinalities rank the
# candidate ancestors when the computation plan is generated.
sampleTDC = GraphTDC(
    sampleVertices,
    ('Gender', 'Nation', 'Profession'),
    sampleEdges,
    ('Year', 'Type', 'Strength'),
    cardinalityMap={
        'Gender': 2,
        'Nation': 3,
        'Profession': 3,
        'Year': 5,
        'Type': 3,
        'Strength': 10,
    },
)
sampleTDC.computeGraphCube()

Cuboid Query

In [230]:
# %%timeit
queryDim = ('Gender', 'Nation')
vertices, edges = sampleTDC.getCuboid(queryDim)

# Print the aggregated vertices first, then the aggregated edges.
for heading, records in (('Vertices: ', vertices), ('\nEdges: ', edges)):
    print(heading)
    for record in records:
        print(record)

Vertices: 
(('M', 'USA', '*'), 2)
(('F', 'USA', '*'), 1)
(('M', 'China', '*'), 1)
(('M', 'SG', '*'), 2)
(('F', 'SG', '*'), 1)
(('F', 'China', '*'), 2)

Edges: 
(('M-F', 'USA-SG', '*', '*', '*', '*'), 1)
(('F-M', 'SG-USA', '*', '*', '*', '*'), 1)
(('F-M', 'China-USA', '*', '*', '*', '*'), 1)
(('F-F', 'China-China', '*', '*', '*', '*'), 1)
(('F-M', 'USA-SG', '*', '*', '*', '*'), 1)
(('M-F', 'USA-USA', '*', '*', '*', '*'), 1)
(('M-M', 'SG-SG', '*', '*', '*', '*'), 1)
(('F-M', 'USA-China', '*', '*', '*', '*'), 1)
(('M-M', 'USA-SG', '*', '*', '*', '*'), 1)
(('M-M', 'SG-USA', '*', '*', '*', '*'), 1)
(('M-F', 'SG-SG', '*', '*', '*', '*'), 1)
(('M-F', 'SG-China', '*', '*', '*', '*'), 2)
(('M-M', 'China-SG', '*', '*', '*', '*'), 2)
(('M-F', 'China-China', '*', '*', '*', '*'), 1)
(('F-M', 'SG-SG', '*', '*', '*', '*'), 1)


##### Dataframe

In [57]:
# DataFrame baseline for the sample graph, built from the same row lists as
# the RDD-based cubes.
dfV = spark.createDataFrame(sampleVerticesData, ['Id', 'Gender', 'Country', 'Profession'])
dfE = spark.createDataFrame(sampleEdgesData, ['SourceId', 'DestinationId', 'Year', 'Type', 'Weight'])

# Join each edge to its source vertex (aliased 'src' together with the edge
# columns), then to its destination vertex (aliased 'dest'), keeping the
# source attributes, destination attributes and edge attributes.
dfVE = dfE.join(dfV, dfV.Id == dfE.SourceId).alias('src').join(
        dfV.alias('dest'), sf.col('src.DestinationId') == sf.col('dest.Id')
    ).select(
        'src.Gender', 'src.Country', 'src.Profession',
        'dest.Gender', 'dest.Country', 'dest.Profession',
        'src.Year', 'src.Type', 'src.Weight'
    )

# Vertex cube: row counts for every combination of the three vertex columns.
VCubeDf = dfV.cube('Gender', 'Country', 'Profession').count().orderBy('Gender', 'Country', 'Profession')

# Vertex-edge cube. Source and destination columns are separate cube
# dimensions here.
# NOTE(review): the RDD-based classes fuse each src/dest pair into a single
# key component, so the set of cuboids is not the same — confirm the
# comparison is intended.
VECubeDf = (dfVE.cube(
    'src.Gender', 'dest.Gender', 
    'src.Country', 'dest.Country',
    'src.Profession', 'dest.Profession',
    'Year', 'Type', 'Weight')
    .count())

Cuboid Query

In [10]:
# %%timeit
# Select one cuboid by requiring the kept columns to be non-null and every
# other column to be null.
# NOTE(review): null stands in for "aggregated away" here; a genuinely null
# attribute value would be indistinguishable — sf.grouping_id() may be the
# safer marker, confirm against the data.
VCubeDf.filter(
    sf.col('Gender').isNotNull() &
    sf.col('Country').isNull() &
    sf.col('Profession').isNull()
).show()

# Vertex-edge cuboid on (src.Gender, dest.Gender, Type).
VECubeDf.filter(
    sf.col('src.Gender').isNotNull() & sf.col('dest.Gender').isNotNull() &
    sf.col('src.Country').isNull() & sf.col('dest.Country').isNull() &
    sf.col('src.Profession').isNull() & sf.col('dest.Profession').isNull() &
    sf.col('Year').isNull() & 
    sf.col('Type').isNotNull() &
    sf.col('Weight').isNull()
).show()

+------+-------+----------+-----+
|Gender|Country|Profession|count|
+------+-------+----------+-----+
|     F|   NULL|      NULL|    4|
|     M|   NULL|      NULL|    5|
+------+-------+----------+-----+

+------+------+-------+-------+----------+----------+----+---------+------+-----+
|Gender|Gender|Country|Country|Profession|Profession|Year|     Type|Weight|count|
+------+------+-------+-------+----------+----------+----+---------+------+-----+
|     M|     F|   NULL|   NULL|      NULL|      NULL|NULL|Colleague|  NULL|    3|
|     F|     M|   NULL|   NULL|      NULL|      NULL|NULL|Colleague|  NULL|    1|
|     M|     M|   NULL|   NULL|      NULL|      NULL|NULL|   Friend|  NULL|    4|
|     M|     F|   NULL|   NULL|      NULL|      NULL|NULL|   Family|  NULL|    2|
|     F|     M|   NULL|   NULL|      NULL|      NULL|NULL|   Family|  NULL|    1|
|     F|     M|   NULL|   NULL|      NULL|      NULL|NULL|   Friend|  NULL|    3|
|     M|     M|   NULL|   NULL|      NULL|      NULL|NULL

### **Facebook Dataset**

##### Vertices

In [195]:
# Parse each CSV line into (id, attributes), splitting the line only once.
fbVertices = (
    spark.sparkContext.textFile("./data/facebook_vertices.csv")
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[0], tuple(fields[1:])))
)
# The first parsed record is the header row; drop every record equal to it.
fbVerticesHeader = fbVertices.first()
fbVertices = fbVertices.filter(lambda rec: rec != fbVerticesHeader)

##### Edges

In [196]:
# Parse each CSV line into (sourceId, destinationId, attributes), splitting
# the line only once.
fbEdges = (
    spark.sparkContext.textFile("./data/facebook_edges.csv")
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[0], fields[1], tuple(fields[2:])))
)
# The first parsed record is the header row; drop every record equal to it.
fbEgdesHeader = fbEdges.first()
fbEdges = fbEdges.filter(lambda rec: rec != fbEgdesHeader)

#### **Performance Test**

##### GraphNaive

In [231]:
# Build the brute-force cube over the Facebook graph and materialise it.
fbNaiveCube = GraphNaive(
    fbVertices,
    ('PageType', 'Year', 'Country'),
    fbEdges,
    ('FriendType', 'FriendshipAge'),
)
fbNaiveCube.computeGraphCube()

Cuboid Query

In [232]:
# %%timeit
queryDim = ('PageType', 'FriendType')
vertices, edges = fbNaiveCube.getCuboid(queryDim)

# Print the aggregated vertices first, then the aggregated edges.
for heading, records in (('Vertices: ', vertices), ('\nEdges: ', edges)):
    print(heading)
    for record in records:
        print(record)

Vertices: 
(('tvshow', '*', '*'), 3327)
(('government', '*', '*'), 6880)
(('company', '*', '*'), 6495)
(('politician', '*', '*'), 5768)

Edges: 
(('tvshow - tvshow', '*', '*', 'Restricted', '*'), 4335)
(('politician - politician', '*', '*', 'Acquaintance', '*'), 12388)
(('government - government', '*', '*', 'Acquaintance', '*'), 27334)
(('company - government', '*', '*', 'Acquaintance', '*'), 842)
(('company - government', '*', '*', 'Restricted', '*'), 819)
(('government - government', '*', '*', 'Restricted', '*'), 27252)
(('company - company', '*', '*', 'Acquaintance', '*'), 6712)
(('politician - tvshow', '*', '*', 'Restricted', '*'), 197)
(('company - politician', '*', '*', 'Restricted', '*'), 118)
(('tvshow - politician', '*', '*', 'Close Friend', '*'), 224)
(('politician - government', '*', '*', 'Acquaintance', '*'), 1541)
(('politician - government', '*', '*', 'Close Friend', '*'), 1485)
(('tvshow - government', '*', '*', 'Acquaintance', '*'), 217)
(('government - tvshow', '*', '*

##### GraphTDC

In [233]:
# Build the top-down cube over the Facebook graph; the dict holds the
# per-dimension cardinalities used to pick the cheapest ancestor.
fbTDC = GraphTDC(
    fbVertices,
    ('PageType', 'Year', 'Country'),
    fbEdges,
    ('FriendType', 'FriendshipAge'),
    {
        'PageType': 4,
        'Year': 22,
        'Country': 196,
        'FriendType': 3,
        'FriendshipAge': 11,
    },
)
fbTDC.computeGraphCube()

Cuboid Query

In [234]:
# %%timeit
queryDim = ('PageType', 'FriendType')
vertices, edges = fbTDC.getCuboid(queryDim)

# Print the aggregated vertices first, then the aggregated edges.
for heading, records in (('Vertices: ', vertices), ('\nEdges: ', edges)):
    print(heading)
    for record in records:
        print(record)

Vertices: 
(('tvshow', '*', '*'), 3327)
(('government', '*', '*'), 6880)
(('company', '*', '*'), 6495)
(('politician', '*', '*'), 5768)

Edges: 
(('tvshow-tvshow', '*', '*', 'Restricted', '*'), 4335)
(('tvshow-tvshow', '*', '*', 'Close Friend', '*'), 4304)
(('government-company', '*', '*', 'Restricted', '*'), 724)
(('government-government', '*', '*', 'Acquaintance', '*'), 27334)
(('politician-politician', '*', '*', 'Restricted', '*'), 12247)
(('tvshow-company', '*', '*', 'Acquaintance', '*'), 401)
(('government-tvshow', '*', '*', 'Restricted', '*'), 211)
(('government-company', '*', '*', 'Close Friend', '*'), 728)
(('government-tvshow', '*', '*', 'Acquaintance', '*'), 186)
(('politician-company', '*', '*', 'Acquaintance', '*'), 87)
(('government-politician', '*', '*', 'Restricted', '*'), 1590)
(('company-company', '*', '*', 'Restricted', '*'), 6739)
(('politician-politician', '*', '*', 'Close Friend', '*'), 12274)
(('politician-tvshow', '*', '*', 'Acquaintance', '*'), 198)
(('politicia

##### Dataframe

In [203]:
# DataFrame baseline for the Facebook graph; both CSV files are read with
# their first line as the header.
fbVerticesDf  = spark.read.csv('./data/facebook_vertices.csv', header=True)

fbEdgesDf  = spark.read.csv('./data/facebook_edges.csv', header=True)

# Join each edge to its source vertex (aliased 'src' together with the edge
# columns), then to its destination vertex (aliased 'dest'), keeping the
# source attributes, destination attributes and edge attributes.
fbVEDf = fbEdgesDf.join(fbVerticesDf , fbVerticesDf.id == fbEdgesDf.id_1).alias('src').join(
        fbVerticesDf.alias('dest'), sf.col('src.id_2') == sf.col('dest.id')
    ).select(
        'src.page_type', 'src.year_established', 'src.country',
        'dest.page_type', 'dest.year_established', 'dest.country',
        'src.friend_type', 'src.years',
    )

# Vertex cube over every column except the first one.
fbVCubeDf = fbVerticesDf.cube(fbVerticesDf.columns[1:]).count().orderBy(fbVerticesDf.columns[1:])

# Vertex-edge cube; source and destination columns are separate dimensions.
fbVECubeDf = (fbVEDf.cube(
    'src.page_type', 'dest.page_type', 
    'src.year_established', 'dest.year_established',
    'src.country', 'dest.country',
    'friend_type', 'years')
    .count())

Cuboid Query

In [204]:
# %%timeit
# Select one cuboid by requiring the kept columns to be non-null and every
# other column to be null.
# NOTE(review): null stands in for "aggregated away" here; a genuinely null
# attribute value would be indistinguishable — sf.grouping_id() may be the
# safer marker, confirm against the data.
fbVCubeDf.filter(
    sf.col('page_type').isNotNull() &
    sf.col('year_established').isNull() &
    sf.col('country').isNull()
).show()

# Vertex-edge cuboid on (src.page_type, dest.page_type, friend_type).
fbVECubeDf.filter(
    sf.col('src.page_type').isNotNull() & sf.col('dest.page_type').isNotNull() &
    sf.col('src.year_established').isNull() & sf.col('dest.year_established').isNull() &
    sf.col('src.country').isNull() & sf.col('dest.country').isNull() &
    sf.col('friend_type').isNotNull() & 
    sf.col('years').isNull()
).show()

+----------+----------------+-------+-----+
| page_type|year_established|country|count|
+----------+----------------+-------+-----+
|   company|            NULL|   NULL| 6495|
|government|            NULL|   NULL| 6880|
|politician|            NULL|   NULL| 5768|
|    tvshow|            NULL|   NULL| 3327|
+----------+----------------+-------+-----+

+----------+----------+----------------+----------------+-------+-------+------------+-----+-----+
| page_type| page_type|year_established|year_established|country|country| friend_type|years|count|
+----------+----------+----------------+----------------+-------+-------+------------+-----+-----+
|government|   company|            NULL|            NULL|   NULL|   NULL|  Restricted| NULL|  724|
|   company|politician|            NULL|            NULL|   NULL|   NULL|Close Friend| NULL|  126|
|   company|politician|            NULL|            NULL|   NULL|   NULL|Acquaintance| NULL|  121|
|   company|   company|            NULL|            N

### **Github Dataset**

##### Vertices

In [235]:
# Parse each CSV line into (id, attributes), splitting the line only once.
ghVertices = (
    spark.sparkContext.textFile("./data/github_vertices.csv")
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[0], tuple(fields[1:])))
)
# The first parsed record is the header row; drop every record equal to it.
ghVerticesHeader = ghVertices.first()
ghVertices = ghVertices.filter(lambda rec: rec != ghVerticesHeader)

##### Edges

In [236]:
# Parse each CSV line into (sourceId, destinationId, attributes), splitting
# the line only once.
ghEdges = (
    spark.sparkContext.textFile("./data/github_edges.csv")
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[0], fields[1], tuple(fields[2:])))
)
# The first parsed record is the header row; drop every record equal to it.
ghEgdesHeader = ghEdges.first()
ghEdges = ghEdges.filter(lambda rec: rec != ghEgdesHeader)

#### **Performance Test**

##### GraphNaive

In [237]:
# Build the brute-force cube over the GitHub graph and materialise it.
ghNaiveCube = GraphNaive(
    ghVertices,
    ('devDomain', 'gender', 'yearJoined', 'hacktoberfest'),
    ghEdges,
    ('relationshipType',),
)
ghNaiveCube.computeGraphCube()

Cuboid Query

In [244]:
# %%timeit
queryDim = ('devDomain', 'relationshipType')
vertices, edges = ghNaiveCube.getCuboid(queryDim)

# Print the aggregated vertices first, then the aggregated edges.
for heading, records in (('Vertices: ', vertices), ('\nEdges: ', edges)):
    print(heading)
    for record in records:
        print(record)

Vertices: 
(('Android', '*', '*', '*'), 4659)
(('DevOps', '*', '*', '*'), 4676)
(('Web', '*', '*', '*'), 4691)
(('Machine Learning', '*', '*', '*'), 4642)
(('Cloud', '*', '*', '*'), 4704)
(('Windows', '*', '*', '*'), 4841)
(('Cybersecurity', '*', '*', '*'), 4690)
(('IOS', '*', '*', '*'), 4797)

Edges: 
(('IOS - Android', '*', '*', '*', 'Friends'), 1302)
(('Machine Learning - Android', '*', '*', '*', 'Colleagues'), 1370)
(('Machine Learning - Android', '*', '*', '*', 'Collaborators'), 1346)
(('Web - Android', '*', '*', '*', 'Colleagues'), 1269)
(('Windows - Cybersecurity', '*', '*', '*', 'Friends'), 1683)
(('Web - Cybersecurity', '*', '*', '*', 'Collaborators'), 1275)
(('Windows - Cybersecurity', '*', '*', '*', 'Colleagues'), 1700)
(('Web - Cybersecurity', '*', '*', '*', 'Colleagues'), 1345)
(('IOS - Cybersecurity', '*', '*', '*', 'Friends'), 1434)
(('Windows - Windows', '*', '*', '*', 'Colleagues'), 2305)
(('Windows - Windows', '*', '*', '*', 'Collaborators'), 2337)
(('IOS - Windows', 

##### GraphTDC

In [239]:
# Build the top-down cube over the GitHub graph; the cardinalities rank the
# candidate ancestors when the computation plan is generated.
ghTDC = GraphTDC(
    ghVertices,
    ('devDomain', 'gender', 'yearJoined', 'hacktoberfest'),
    ghEdges,
    ('relationshipType',),
    cardinalityMap={
        'devDomain': 8,
        'gender': 3,
        'yearJoined': 15,
        'hacktoberfest': 2,
        'relationshipType': 3,
    },
)
ghTDC.computeGraphCube()

Cuboid Query

In [243]:
# %%timeit
queryDim = ('devDomain', 'relationshipType')
vertices, edges = ghTDC.getCuboid(queryDim)

# Print the aggregated vertices first, then the aggregated edges.
for heading, records in (('Vertices: ', vertices), ('\nEdges: ', edges)):
    print(heading)
    for record in records:
        print(record)

Vertices: 
(('Android', '*', '*', '*'), 4659)
(('DevOps', '*', '*', '*'), 4676)
(('Web', '*', '*', '*'), 4691)
(('Machine Learning', '*', '*', '*'), 4642)
(('Cloud', '*', '*', '*'), 4704)
(('Windows', '*', '*', '*'), 4841)
(('Cybersecurity', '*', '*', '*'), 4690)
(('IOS', '*', '*', '*'), 4797)

Edges: 
(('Windows-Windows', '*', '*', '*', 'Friends'), 2307)
(('Web-Machine Learning', '*', '*', '*', 'Colleagues'), 1449)
(('DevOps-Machine Learning', '*', '*', '*', 'Friends'), 1440)
(('Cybersecurity-Machine Learning', '*', '*', '*', 'Colleagues'), 1612)
(('IOS-Windows', '*', '*', '*', 'Friends'), 2091)
(('Windows-Windows', '*', '*', '*', 'Collaborators'), 2337)
(('Android-Windows', '*', '*', '*', 'Colleagues'), 1836)
(('Machine Learning-Web', '*', '*', '*', 'Colleagues'), 1357)
(('Web-Web', '*', '*', '*', 'Friends'), 1269)
(('Web-IOS', '*', '*', '*', 'Collaborators'), 1319)
(('Machine Learning-IOS', '*', '*', '*', 'Friends'), 1456)
(('Windows-IOS', '*', '*', '*', 'Colleagues'), 1768)
(('IOS-

##### Dataframe

In [13]:
# DataFrame baseline for the GitHub graph; both CSV files are read with
# their first line as the header.
ghVerticesDf  = spark.read.csv('./data/github_vertices.csv', header=True)

ghEdgesDf  = spark.read.csv('./data/github_edges.csv', header=True)

# Join each edge to its source vertex (aliased 'src' together with the edge
# columns), then to its destination vertex (aliased 'dest'), keeping the
# source attributes, destination attributes and edge attribute.
ghVEDf = ghEdgesDf.join(ghVerticesDf , ghVerticesDf.id == ghEdgesDf.id_1).alias('src').join(
        ghVerticesDf.alias('dest'), sf.col('src.id_2') == sf.col('dest.id')
    ).select(
        'src.dev_domain', 'src.gender', 'src.year_joined', 'src.hacktoberfest',
        'dest.dev_domain', 'dest.gender', 'dest.year_joined', 'dest.hacktoberfest',
        'src.relationship_type',
    )

# Vertex cube over every column except the first one.
ghVCubeDf = ghVerticesDf.cube(ghVerticesDf.columns[1:]).count()

# Vertex-edge cube; source and destination columns are separate dimensions.
ghVECubeDf = (ghVEDf.cube(
        'src.dev_domain', 'dest.dev_domain', 
        'src.gender', 'dest.gender', 
        'src.year_joined', 'dest.year_joined',
        'src.hacktoberfest', 'dest.hacktoberfest',
        'relationship_type',
    ).count())

Cuboid Query

In [14]:
# %%timeit
# Select one cuboid by requiring the kept columns to be non-null and every
# other column to be null.
# NOTE(review): null stands in for "aggregated away" here; a genuinely null
# attribute value would be indistinguishable — sf.grouping_id() may be the
# safer marker, confirm against the data.
ghVCubeDf.filter(
    sf.col('dev_domain').isNotNull() &
    sf.col('gender').isNull() &
    sf.col('year_joined').isNull() &
    sf.col('hacktoberfest').isNull()
).show()

# Vertex-edge cuboid on (src.dev_domain, dest.dev_domain, relationship_type).
ghVECubeDf.filter(
    sf.col('src.dev_domain').isNotNull() & sf.col('dest.dev_domain').isNotNull() &
    sf.col('src.gender').isNull() & sf.col('dest.gender').isNull() &
    sf.col('src.year_joined').isNull() & sf.col('dest.year_joined').isNull() &
    sf.col('src.hacktoberfest').isNull() & sf.col('dest.hacktoberfest').isNull() &
    sf.col('relationship_type').isNotNull()
).show()

+----------------+------+-----------+-------------+-----+
|      dev_domain|gender|year_joined|hacktoberfest|count|
+----------------+------+-----------+-------------+-----+
|           Cloud|  NULL|       NULL|         NULL| 4704|
|             IOS|  NULL|       NULL|         NULL| 4797|
|             Web|  NULL|       NULL|         NULL| 4691|
|         Android|  NULL|       NULL|         NULL| 4659|
|          DevOps|  NULL|       NULL|         NULL| 4676|
|Machine Learning|  NULL|       NULL|         NULL| 4642|
|   Cybersecurity|  NULL|       NULL|         NULL| 4690|
|         Windows|  NULL|       NULL|         NULL| 4841|
+----------------+------+-----------+-------------+-----+

+----------------+----------------+------+------+-----------+-----------+-------------+-------------+-----------------+-----+
|      dev_domain|      dev_domain|gender|gender|year_joined|year_joined|hacktoberfest|hacktoberfest|relationship_type|count|
+----------------+----------------+------+------+--