In [1]:
import org.apache.spark.rdd._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import scala.sys.process._

# Create Graphs

```
def Graph.apply[VD, ED](
    vertices: RDD[(VertexId, VD)],
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD = null)
: Graph[VD, ED]

def Graph.fromEdges[VD, ED](
    edges: RDD[Edge[ED]],
    defaultValue: VD)
: Graph[VD, ED]

def Graph.fromEdgeTuples[VD](
    rawEdges: RDD[(VertexId, VertexId)],
    defaultValue: VD,
    uniqueEdges: Option[PartitionStrategy] = None)
: Graph[VD, Int]

def GraphLoader.edgeListFile(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    minEdgePartitions: Int = 1)
: Graph[Int, Int]

```

In [2]:
// Small edge list given as (source, destination) pairs, widened from Int to Long vertex ids.
val arr = Seq((1, 2), (2, 3), (5, 6), (6, 7)).map { case (src, dst) => (src.toLong, dst.toLong) }
// Distribute the pairs as an RDD of vertex-id tuples.
val edges: RDD[(VertexId, VertexId)] = sc.parallelize(arr)
// Build the graph from the bare edge tuples; 0 is passed as the default vertex value.
val graph = Graph.fromEdgeTuples(edges, 0)

arr = List((1,2), (2,3), (5,6), (6,7))
edges = ParallelCollectionRDD[0] at parallelize at <console>:38
graph = org.apache.spark.graphx.impl.GraphImpl@3e4e5bc8


org.apache.spark.graphx.impl.GraphImpl@3e4e5bc8

# Enron

In [3]:
// Show the first five lines of the Enron edge list by running `head`;
// `.!` executes the command and evaluates to its exit code.
// The explicit method call avoids postfix-operator notation, which needs
// `scala.language.postfixOps` and is sensitive to what follows on the next line.
"head -n5 data/emailEnron.txt".!



# Directed graph (each unordered pair of nodes is saved once): Email-Enron.txt 
# Enron email network (edge indicated that email was exchanged, undirected edges)
# Nodes: 36692 Edges: 367662
# FromNodeId	ToNodeId
0	1


0

In [4]:
// URI of the Enron e-mail edge list on the local filesystem.
val path = "file:///home/sergey/Spark_GraphX/data/emailEnron.txt"
// Load the edge-list file into a graph.
val emailGraph = GraphLoader.edgeListFile(sc, path = path)

path = file:///home/sergey/Spark_GraphX/data/emailEnron.txt
emailGraph = org.apache.spark.graphx.impl.GraphImpl@5d2f9d38


org.apache.spark.graphx.impl.GraphImpl@5d2f9d38

# Bipartite

In [5]:
import scala.io.Source

## Ingredients

In [6]:
// Print the first five lines of the ingredient table.
// The Source is closed in `finally` so the file handle is not leaked;
// `locally` keeps the temporary `src` out of the notebook's global scope.
locally {
    val src = Source.fromFile("./data/ingr_info.tsv")
    try src.getLines().take(5).foreach(println)
    finally src.close()
}

# id	ingredient name	category
0	magnolia_tripetala	flower
1	calyptranthes_parriculata	plant
2	chamaecyparis_pisifera_oil	plant derivative
3	mackerel	fish/seafood


## Compounds

In [7]:
// Print the first five lines of the compound table.
// The Source is closed in `finally` so the file handle is not leaked;
// `locally` keeps the temporary `src` out of the notebook's global scope.
locally {
    val src = Source.fromFile("./data/comp_info.tsv")
    try src.getLines().take(5).foreach(println)
    finally src.close()
}

# id	Compound name	CAS number
0	jasmone	488-10-8
1	5-methylhexanoic_acid	628-46-6
2	l-glutamine	56-85-9
3	1-methyl-3-methoxy-4-isopropylbenzene	1076-56-8


## Adjacency list

In [8]:
// Print the first five lines of the ingredient-to-compound adjacency list.
// The Source is closed in `finally` so the file handle is not leaked;
// `locally` keeps the temporary `src` out of the notebook's global scope.
locally {
    val src = Source.fromFile("./data/ingr_comp.tsv")
    try src.getLines().take(5).foreach(println)
    finally src.close()
}

# ingredient id	compound id
1392	906
1259	861
1079	673
22	906


## Build Graph

In [9]:
// https://stackoverflow.com/questions/12705309/scala-case-class-inheritance
/** Common type of the food-network vertices; every vertex exposes a name. */
sealed trait FNNode {
  def name: String
}
/** Ingredient vertex: its name plus the category it belongs to. */
case class Ingredient(name: String, category: String) extends FNNode
/** Compound vertex: its name plus the `cas` field (the "CAS number" column of the compound table). */
case class Compound(name: String, cas: String) extends FNNode

defined trait FNNode
defined class Ingredient
defined class Compound


In [10]:
// class FNNode(val name: String) extends Serializable
// case class Ingredient(override val name: String, category: String) extends FNNode(name)
// case class Compound(override val name: String, cas: String) extends FNNode(name)

Syntax Error.: 

In [11]:
// Ingredient vertices. Each data row is "<id>\t<ingredient name>\t<category>";
// lines starting with '#' are header comments and are skipped.
// Rows are split on tabs only: fields may contain spaces (e.g. the category
// "plant derivative"), which a whitespace split would cut short.
val ingsPath = "file:///home/sergey/Spark_GraphX/data/ingr_info.tsv"
val ingredients: RDD[(VertexId, FNNode)] =
    sc.textFile(ingsPath).
    filter(! _.startsWith("#")).
    map { line =>
        val row = line.split('\t')
        (row(0).toLong, Ingredient(row(1), row(2)))
    }

// Compound vertices. Each data row is "<id>\t<compound name>\t<CAS number>".
// Compound ids are shifted by 10000 so they do not clash with the ingredient
// ids once both sets share one vertex RDD (both files number rows from 0).
// NOTE(review): assumes there are fewer than 10000 ingredients; confirm.
val compsPath = "file:///home/sergey/Spark_GraphX/data/comp_info.tsv"
val comps: RDD[(VertexId, FNNode)] =
    sc.textFile(compsPath).
    filter(! _.startsWith("#")).
    map { line =>
        val row = line.split('\t')
        (10000L + row(0).toLong, Compound(row(1), row(2)))
    }

// Edges. Each data row is "<ingredient id>\t<compound id>"; the compound id
// gets the same 10000 shift as the compound vertices, and every edge carries
// the attribute 1.
val linksPath = "file:///home/sergey/Spark_GraphX/data/ingr_comp.tsv"
val links: RDD[Edge[Int]] =
    sc.textFile(linksPath).
    filter(! _.startsWith("#")).
    map { line =>
        val row = line.split('\t')
        Edge(row(0).toLong, 10000L + row(1).toLong, 1)
    }

// All vertices of the bipartite graph: ingredients followed by compounds.
val nodes = ingredients ++ comps

// Assemble the graph from the vertex and edge RDDs.
val foodNetwork = Graph(nodes, links)

ingsPath = file:///home/sergey/Spark_GraphX/data/ingr_info.tsv
ingredients = MapPartitionsRDD[25] at map at <console>:47
compsPath = file:///home/sergey/Spark_GraphX/data/comp_info.tsv
comps = MapPartitionsRDD[29] at map at <console>:56
linksPath = file:///home/sergey/Spark_GraphX/data/ingr_comp.tsv
links = MapPartitionsRDD[33] at map at <console>:65
nodes = UnionRDD[34] at $plus$plus at <console>:70
foodNetwork = org.apache.spark.graphx.impl.GraphImpl@3b...


org.apache.spark.graphx.impl.GraphImpl@3b...

In [12]:
/** Renders an edge triplet as a sentence naming its source and destination vertices.
  *
  * @param t triplet whose source and destination attributes are food-network nodes
  * @return "The ingredient <source name> contains <destination name>"
  */
def showTriplet(t: EdgeTriplet[FNNode,Int]): String = {
    val sourceName = t.srcAttr.name
    val targetName = t.dstAttr.name
    s"The ingredient $sourceName contains $targetName"
}

// Take five triplets from the food network and print one sentence for each.
foodNetwork.triplets.take(5).map(showTriplet).foreach(println)

showTriplet: (t: org.apache.spark.graphx.EdgeTriplet[FNNode,Int])String


The ingredient calyptranthes_parriculata contains citral_(neral)
The ingredient chamaecyparis_pisifera_oil contains undecanoic_acid
The ingredient hyssop contains myrtenyl_acetate
The ingredient hyssop contains 4-(2,6,6-trimethyl-cyclohexa-1,3-dienyl)but-2-en-4-one
The ingredient buchu contains menthol
