In [0]:
%spark
import org.apache.spark.sql.{SaveMode, SparkSession}

// Neo4j connection settings.
val neo4jUrl = "bolt://neo4j:7687"
val neo4jUser = "neo4j"
// The password can be supplied through the NEO4J_PASSWORD environment variable
// so it does not have to live in the notebook; the literal is only the fallback.
val neo4jPassword = sys.env.getOrElse("NEO4J_PASSWORD", "bitnami1")

val spark = SparkSession.builder().getOrCreate()

// Load the relationships that start at a :Node1 node and carry both a `book`
// and a `weight` property. Each row holds the source name, the target name,
// the relationship type, the weight and the book.
val all = (spark.read.format("org.neo4j.spark.DataSource")
  // Use the shared setting instead of repeating the URL literal.
  .option("url", neo4jUrl)
  .option("authentication.basic.username", neo4jUser)
  .option("authentication.basic.password", neo4jPassword)
  .option("query", "MATCH (n:Node1) WITH n MATCH (n)-[relation]->(target) WHERE relation.book IS NOT NULL AND relation.weight IS NOT NULL RETURN n.name AS Source, target.name AS Target, TYPE(relation) AS Type, relation.weight AS Weight, relation.book AS Book;")
  .load())

// Preview the first rows of the result.
all.show()

In [1]:
%spark
// Print the schema (column names and types) of the `all` DataFrame.
all.printSchema()

In [2]:
// GRAPH CREATION

import org.apache.spark.sql.Row
import org.apache.spark.graphx.{Graph, Edge, VertexId}
import org.apache.spark.rdd.RDD

// Build the vertex RDD: one (id, name) pair for every distinct node name that
// appears as either endpoint (Target or Source) of a row in `all`.
//
// zipWithIndex numbers elements by their position, and the order produced by
// `distinct` is not guaranteed to be the same each time the RDD is evaluated.
// Every later action on `vertices` would otherwise re-evaluate it and could
// hand out different ids than the ones baked into the graph. Sorting the names
// first makes the numbering reproducible, and caching avoids recomputing it.
val vertices: RDD[(VertexId, String)] = all
  .selectExpr("explode(array(Target, Source)) as vertex")
  .distinct
  .orderBy("vertex")
  .rdd
  .map(_.getAs[String](0))
  .zipWithIndex
  .map(_.swap)
  .cache()

// Vertex table as a DataFrame (columns: id, node) so names can be mapped to
// ids with Spark SQL joins.
val vertexDf = vertices.toDF("id", "node")

// Edge RDD: each relationship row becomes an Edge whose endpoints are the
// numeric vertex ids and whose attribute is (Type, Weight, Book).
val edges: RDD[Edge[(String, Long, Long)]] = {
  // Step 1: attach the id of the source node as `idS`.
  val withSourceId = all
    .join(vertexDf, all("Source") === vertexDf("node"))
    .select(all("Type"), all("Target"), all("Weight").alias("Weight"), all("Book").alias("Book"), vertexDf("id").alias("idS"))

  // Step 2: attach the id of the target node as `id`.
  val withBothIds = withSourceId
    .join(vertexDf, all("Target") === vertexDf("node"))
    .select("idS", "id", "Type", "Weight", "Book")

  // Step 3: turn every row into a GraphX edge.
  withBothIds.rdd.map { row =>
    val sourceId = row.getAs[Long]("idS")
    val targetId = row.getAs[Long]("id")
    val attributes = (row.getAs[String]("Type"), row.getAs[Long]("Weight"), row.getAs[Long]("Book"))
    Edge(sourceId, targetId, attributes)
  }
}

// Assemble the graph from the edges alone; vertices carry no attribute (Unit).
val graph: Graph[Unit, (String, Long, Long)] = Graph.fromEdges(edges, ())


In [3]:
// DISPLAY OF NODES AND RELATIONSHIPS

// Print up to ten vertices of the graph.
println("First 10 Vertices:")
for (vertex <- graph.vertices.take(10)) println(vertex)

// Print up to ten edges of the graph.
println("First 10 Edges:")
for (edge <- graph.edges.take(10)) println(edge)


In [4]:
// WORKING WITH THE GRAPHX GRAPH

// Number of vertices and edges
val numVertices = graph.numVertices
val numEdges = graph.numEdges

// Average, maximum and minimum vertex degree.
// The degree values are extracted once and cached so that the three
// aggregations below reuse them instead of each recomputing the degrees.
val degreeValues = graph.degrees.map(_._2).cache()
val avgDegree = degreeValues.mean()
val maxDegree = degreeValues.max()
val minDegree = degreeValues.min()

// Summary statistics of the edge weights (second attribute field) and of the
// book numbers (third attribute field)
val weightStats = graph.edges.map(_.attr._2).stats()
val bookStats = graph.edges.map(_.attr._3).stats()

// Print the results
println(s"Nombre de nœuds: $numVertices")
println(s"Nombre d'arêtes: $numEdges")
println(s"Degré moyen: $avgDegree")
println(s"Degré maximum: $maxDegree")
println(s"Degré minimum: $minDegree")

println("Statistiques des poids:")
println(weightStats)

println("Statistiques des livres:")
println(bookStats)


In [5]:
// Ranks the vertices by degree and prints the 20 highest with their names.

// Keys are (degree, vertexId) pairs, so each key occurs once and every count is 1.
// NOTE(review): a real degree histogram would count by degree alone; the keys
// are kept as they are so that the printed output stays the same.
val degreeDistribution = graph.degrees.map { case (vertexId, degree) => (degree, vertexId) }.countByValue()

// Fetch the id -> name mapping once instead of running one `lookup` job per
// printed row.
val degreeVertexNames = vertices.collectAsMap()

// Display degree distribution with names in descending order
println("Top 20 Degree Distribution with Names (Descending Order):")
degreeDistribution.toSeq.sortBy { case ((degree, _), _) => -degree }.take(20).foreach { case ((degree, vertexId), count) =>
  val vertexName = degreeVertexNames.getOrElse(vertexId, "UnknownVertex")
  println(s"Vertex $vertexName, Degree $degree: $count nodes")
}


In [6]:
//1.PAGE RANK ALGO

import org.apache.spark.graphx.lib.PageRank
import org.apache.spark.sql.types.{StructType, StructField, DoubleType, StringType}

// Run PageRank for 10 iterations; the vertex attribute of the result is the score.
val pageRankGraph = PageRank.run(graph, numIter = 10)

// Vertex id -> PageRank score
val pageRanks: RDD[(VertexId, Double)] = pageRankGraph.vertices

// Fetch the id -> name mapping once instead of running one `lookup` job per row.
val pageRankVertexNames = vertices.collectAsMap()

// The ten highest-ranked vertices as (name, score) pairs, best first.
val topPageRanks = pageRanks
  .sortBy(_._2, ascending = false)
  .take(10)
  .map { case (vertexId, pageRank) =>
    (pageRankVertexNames.getOrElse(vertexId, "UnknownVertex"), pageRank)
  }

// Display the top 10 vertices with their PageRank scores in descending order
println("Top 10 Vertices with PageRank Scores (Descending Order):")
topPageRanks.foreach { case (vertexName, pageRank) =>
  println(s"Vertex $vertexName, PageRank: $pageRank")
}

// Build the result DataFrame in one step rather than growing a `var` through
// one union per row.
val pageRankDF = spark.createDataFrame(topPageRanks.toSeq).toDF("VertexName", "PageRank")

println("--------------------------------------------------------")
pageRankDF.show()


In [7]:
//2.ConnectedComponents Algo

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.lib.ConnectedComponents
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

// Relies on the `spark` session, `graph` and `vertices` defined in earlier cells.

// Run ConnectedComponents; the vertex attribute of the result is the component id.
val connectedComponentsGraph = ConnectedComponents.run(graph)

// Get the vertices with their connected component identifiers
val components = connectedComponentsGraph.vertices.collect()

// Fetch the id -> name mapping once. The previous per-vertex `lookup` started
// one Spark job for every vertex of the graph.
val componentVertexNames = vertices.collectAsMap()

// (vertex name, component id) for every vertex
val componentRows = components.map { case (vertexId, componentId) =>
  (componentVertexNames.getOrElse(vertexId, "UnknownVertex"), componentId)
}

// Display the connected component of each vertex
println("Connected Components:")
componentRows.foreach { case (vertexName, componentId) =>
  println(s"Vertex $vertexName, Connected Component: $componentId")
}

// Build the result DataFrame in one step rather than through one union per
// vertex, which produced a query plan as deep as the number of vertices.
val connectedComponentsDF = spark.createDataFrame(componentRows.toSeq).toDF("VertexName", "ConnectedComponent")

println("--------------------------------------------------------")
connectedComponentsDF.show()


In [8]:
//3.Label Propagation


import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.lib.LabelPropagation

// Run Label Propagation with a reasonable number of maxSteps
val maxSteps = 10
val labeledGraph = LabelPropagation.run(graph, maxSteps)

// Vertex id -> final community label. Labels are vertex ids (Long values).
val communities = labeledGraph.vertices

// Fetch the id -> name mapping once instead of running one `lookup` job per row.
val communityVertexNames = vertices.collectAsMap()

// The 20 vertices with the largest label values as (name, label) pairs.
val topCommunities = communities
  .sortBy(_._2, ascending = false)
  .take(20)
  .map { case (vertexId, communityLabel) =>
    (communityVertexNames.getOrElse(vertexId, "UnknownVertex"), communityLabel)
  }

// Display the top 20 vertices with their community labels
println("Top 20 Vertices with Community Labels:")
topCommunities.foreach { case (vertexName, communityLabel) =>
  println(s"Vertex $vertexName, Community Label: $communityLabel")
}

// Build the result DataFrame in one step. The label column keeps its Long
// type; the earlier schema declared it as Double, which does not fit a label
// that is an identifier.
val communityDF = spark.createDataFrame(topCommunities.toSeq).toDF("VertexName", "communityLabel")

println("--------------------------------------------------------")
communityDF.show()



In [9]:
//4.TRIANGLECOUNT


import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.TriangleCount
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

// Relies on the `graph` and `vertices` defined in earlier cells.

// Run the TriangleCount algorithm; the vertex attribute of the result is the
// triangle count of that vertex.
val triangleCountGraph: Graph[Int, (String, Long, Long)] = TriangleCount.run(graph)

// Get the vertices with their triangle counts
val triangleCounts = triangleCountGraph.vertices

// Fetch the id -> name mapping once instead of running one `lookup` job per row.
val triangleVertexNames = vertices.collectAsMap()

// The 20 vertices with the most triangles as (name, count) pairs, best first.
val topTriangleCounts = triangleCounts
  .sortBy(_._2, ascending = false)
  .take(20)
  .map { case (vertexId, triangleCount) =>
    (triangleVertexNames.getOrElse(vertexId, "UnknownVertex"), triangleCount)
  }

// Display the top 20 vertices with their triangle counts
println("Top 20 Vertices with Triangle Counts:")
topTriangleCounts.foreach { case (vertexName, triangleCount) =>
  println(s"Vertex $vertexName, Triangle Count: $triangleCount")
}

// Build the result DataFrame in one step rather than through one union per
// row. The count is stored as a Double, as before.
val triangleCountDF = spark
  .createDataFrame(topTriangleCounts.toSeq.map { case (vertexName, triangleCount) => (vertexName, triangleCount.toDouble) })
  .toDF("VertexName", "triangleCount")

println("--------------------------------------------------------")
triangleCountDF.show()


In [10]:
%spark
//5.STRONGLY CONNECTED


import org.apache.spark.graphx._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

// Relies on the `graph` and `vertices` defined in earlier cells.

// Strongly connected components, computed with 5 iterations. The vertex
// attribute of the result is the id of the component the vertex belongs to.
val stronglyConnectedComponents: Graph[VertexId, (String, Long, Long)] =
  graph.stronglyConnectedComponents(5)

// Layout of the result: vertex name, component id, and the name of the vertex
// whose id equals the component id.
val schema = new StructType()
  .add("VertexName", StringType, nullable = false)
  .add("componentId", LongType, nullable = false)
  .add("componentName", StringType, nullable = false)

// `vertices` as a DataFrame with columns VertexId and VertexName
val verticesDF = vertices.toDF("VertexId", "VertexName")

// (vertex id, component id) pairs, collected to the driver
val componentVertices = stronglyConnectedComponents.vertices.collect()

// Driver-side lookup table: vertex id -> vertex name
val vertexMapping = verticesDF.select("VertexId", "VertexName").as[(Long, String)].collect.toMap

// One Row per vertex; ids without a known name fall back to a placeholder.
val rows = componentVertices.map { pair =>
  val (vertexId, componentId) = pair
  Row(
    vertexMapping.getOrElse(vertexId, "UnknownVertex"),
    componentId,
    vertexMapping.getOrElse(componentId, "UnknownComponent")
  )
}

// Create the DataFrame in a single step from all rows
val sccDF = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

println("Vertices with Strongly Connected Components:")
sccDF.show()



In [11]:
%spark
// Export every result DataFrame as CSV, one output path per algorithm.
// coalesce(1) collapses each DataFrame to a single partition before writing.
// The default save mode fails when the target path already exists, which
// breaks re-running this cell; overwrite the previous export instead.
Seq(
  sccDF -> "/zeppelin/notebook/StronglyConnected.csv",
  triangleCountDF -> "/zeppelin/notebook/TriangleCount.csv",
  connectedComponentsDF -> "/zeppelin/notebook/connectedCompo.csv",
  communityDF -> "/zeppelin/notebook/labelPropa.csv",
  pageRankDF -> "/zeppelin/notebook/pageRankDF.csv"
).foreach { case (resultDF, outputPath) =>
  resultDF.coalesce(1).write.mode(SaveMode.Overwrite).csv(outputPath)
}
