|
| 1 | +/* |
| 2 | +#----------------------------------------------------- |
| 3 | +# This program |
| 4 | +# |
| 5 | +# 1. Builds a graph using GraphFrames package. |
| 6 | +# |
| 7 | +# 2. Applies Breadth First Search (BFS) algorithm. |
| 8 | +# Breadth-First Search (BFS) finds the shortest |
| 9 | +# path(s) from one vertex (or a set of vertices) |
| 10 | +# to another vertex (or a set of vertices). The |
| 11 | +# beginning and end vertices are specified as Spark |
| 12 | +# DataFrame expressions. |
| 13 | +#------------------------------------------------------ |
| 14 | +# Input Parameters: |
| 15 | +# none |
| 16 | +#------------------------------------------------------- |
| 17 | +# @author Sanjay Bheemasenarao |
| 18 | +#------------------------------------------------------- |
| 19 | +*/ |
| 20 | + |
1 | 21 | package org.data.algorithms.spark.ch06
|
2 | 22 |
|
| 23 | +import org.apache.spark.sql.SparkSession |
| 24 | +import org.graphframes._ |
| 25 | + |
3 | 26 | object BreadthFirstSearchExample {
|
| 27 | + def main(args: Array[String]): Unit = { |
| 28 | + |
| 29 | + // create an instance of SparkSession |
| 30 | + // spark is an instance of a SparkSession |
| 31 | + val spark = SparkSession |
| 32 | + .builder |
| 33 | + .appName("breadth_first_search_example").master("local[*]") |
| 34 | + .getOrCreate() |
| 35 | + |
| 36 | + // Step-1: create vertices: |
| 37 | + val vertices = List(("a", "Alice", 30), |
| 38 | + ("b", "Bob", 31), |
| 39 | + ("c", "Charlie", 32), |
| 40 | + ("d", "David", 23), |
| 41 | + ("e", "Emma", 24), |
| 42 | + ("f", "Frank", 26)) |
| 43 | + |
| 44 | + import spark.implicits._ |
| 45 | + val v = spark.createDataset(vertices).toDF("id", "name", "age") |
| 46 | + v.show() |
| 47 | + |
| 48 | + //Step-2: Create an Edge DataFrame with "src" and "dst" |
| 49 | + val edges = List(("a", "b", "follow"), |
| 50 | + ("b", "c", "follow"), |
| 51 | + ("c", "d", "follow"), |
| 52 | + ("d", "e", "follow"), |
| 53 | + ("b", "e", "follow"), |
| 54 | + ("c", "e", "follow"), |
| 55 | + ("e", "f", "follow")) |
| 56 | + |
| 57 | + val e = spark.createDataset(edges).toDF("src","dst","relationship") |
| 58 | + e.show() |
| 59 | + /* |
| 60 | + +---+---+------------+ |
| 61 | + |src|dst|relationship| |
| 62 | + +---+---+------------+ |
| 63 | + | a| b| follow| |
| 64 | + | b| c| follow| |
| 65 | + | c| d| follow| |
| 66 | + | d| e| follow| |
| 67 | + | b| e| follow| |
| 68 | + | c| e| follow| |
| 69 | + | e| f| follow| |
| 70 | + +---+---+------------+ |
| 71 | + */ |
| 72 | + |
| 73 | + /* |
| 74 | + Step-3: Create a GraphFrame. Using GraphFrames API, a graph |
| 75 | + is built as an instance of a GraphFrame, which is a pair of |
| 76 | + vertices (as `v`) and edges (as `e`): |
| 77 | + */ |
| 78 | + val graph = GraphFrame(v, e) |
| 79 | + print("graph=", graph) |
| 80 | + |
| 81 | + // GraphFrame(v:sql.DataFrame, e:sql.DataFrame) |
| 82 | + |
| 83 | + /* |
| 84 | + ============== |
| 85 | + BFS Algorithm |
| 86 | + ============== |
| 87 | + The following code snippets uses BFS to find path between |
| 88 | + vertex with name "Alice" to a vertex with age < 27. |
| 89 | +
|
| 90 | + Search from "Alice" for users of age < 27. |
| 91 | + */ |
| 92 | + |
| 93 | + val paths = graph.bfs.fromExpr("name = 'Alice'").toExpr("age > 30").run() |
| 94 | + paths.show() |
| 95 | + |
| 96 | + // Specify edge filters or max path lengths. |
| 97 | + val paths2 = graph.bfs.fromExpr("name = 'Alice'").toExpr("age > 30").edgeFilter("relationship == 'follow'").maxPathLength(4).run() |
| 98 | + paths2.show() |
4 | 99 |
|
| 100 | + //done! |
| 101 | + spark.stop() |
| 102 | + } |
5 | 103 | }
|
0 commit comments