Skip to content

Commit

Permalink
Added comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Supriya committed Feb 23, 2016
1 parent 3a5395a commit e1a9c6a
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 10 deletions.
10 changes: 4 additions & 6 deletions src/main/scala/com/rklick/graphx/context/GraphXProcess.scala
Expand Up @@ -19,6 +19,7 @@ trait GraphXProcess {


/**
* Create edges from dataframe
*
* @param df
* @param graph
Expand All @@ -35,6 +36,7 @@ trait GraphXProcess {
}

/**
* Create vertices from dataframe
*
* @param df The DataFrame
* @param graph The domain.GraphComponent
Expand All @@ -48,6 +50,7 @@ trait GraphXProcess {
}

/**
* Process graph
*
* @param df The DataFrame
* @param graphComponent The domain.GraphComponent
Expand All @@ -67,6 +70,7 @@ trait GraphXProcess {
}

/**
* Apply graph algorithm & find the output
*
* @param graph
* @param primaryIndex
Expand All @@ -75,20 +79,16 @@ trait GraphXProcess {
def applyGraphAlgorithm(graph: Graph[String, String], primaryIndex: String): Either[String, GraphRDD] = {
try {
//Page rank Algorithm
//val pageRankStart = System.currentTimeMillis()
val pageRankGraph = PageRank.run(graph, 1, 0.001)
//println(s"End Page Rank GraphX at :::::${System.currentTimeMillis() - pageRankStart}") */
// Page Rank join with graph object
val graphWithPageRank = graph.outerJoinVertices(pageRankGraph.vertices) {
case (id, attr, Some(pr)) => (pr, attr)
case (id, attr, None) => (0.0, attr)
}

//Triangle Count Algorithm
//val triangleStart = System.currentTimeMillis()
val triangleComponent = graph.partitionBy(PartitionStrategy.RandomVertexCut)
.triangleCount().vertices
//println(s"End triangleComponent GraphX at :::::${System.currentTimeMillis() - triangleStart}") */

//Triangle Count join with page rank graph object
val triByGraph = graphWithPageRank.outerJoinVertices(triangleComponent) {
Expand All @@ -97,9 +97,7 @@ trait GraphXProcess {
}

//Connected Component Algorithm
//val connectedStart = System.currentTimeMillis()
val cComponent = ConnectedComponents.run(graph).vertices
//println(s"End cComponent GraphX at :::::${System.currentTimeMillis() - connectedStart}")
//Connected Component join with triangle component graph object
val ccByGraph = triByGraph.outerJoinVertices(cComponent) {
case (id, (rank, tri, attr), Some(cc)) => (rank, tri, cc, attr)
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/com/rklick/graphx/process/GraphTest.scala
@@ -0,0 +1,8 @@
package com.rklick.graphx.process

/**
* Created by supriya on 23/2/16.
*/
object GraphTest {

}
Expand Up @@ -12,7 +12,7 @@ import scala.util.{Left, Right}
*/
object GraphXMainProcess extends GraphXProcess {

val sqlContext = SparkCommon.sparkSqlContext
val sqlContext = SparkCommon.sqlContext

/**
*
Expand All @@ -24,9 +24,9 @@ object GraphXMainProcess extends GraphXProcess {
val df = sqlContext.read.format("csv").options(SCHEMA_OPTIONS).load(path)
val graphComponent = GraphComponent("ID", "User", "Relationship", "RelationId")
processGraph(df, graphComponent) match {
case Right(data) => logger.info(s"GraphX process successfully completed.")
case Right(data) => println(s"GraphX process successfully completed.")
sqlContext.sparkContext.stop()
case Left(error) => logger.error(s"Error during graphX:::${error}")
case Left(error) => println(s"Error during graphX:::${error}")
sqlContext.sparkContext.stop()
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/rklick/graphx/utils/SparkCommon.scala
Expand Up @@ -15,6 +15,6 @@ object SparkCommon {
}

lazy val sparkContext = new SparkContext(conf)
lazy val sparkSqlContext = SQLContext.getOrCreate(sparkContext)
lazy val sqlContext = SQLContext.getOrCreate(sparkContext)

}

0 comments on commit e1a9c6a

Please sign in to comment.