# Tutorial: Analysis with GraphX

This notebook covers the basics of graph analysis with the GraphX API. The goal of this tutorial is to show you how to use the GraphX API to carry out a graph analysis, using the bike trip data available from the Bay Area Bike Share portal, and more specifically the data for the year 2014.

Graph processing is an aspect of analytics that applies to a wide range of use cases. Fundamentally, graph processing and graph theory rest on defining the relationships between nodes and edges (arcs). The nodes, or vertices, are the units, while the edges define the relationships between the nodes. This works very well for social network analysis and for algorithms such as PageRank.

Further reading

Graph theory on Wikipedia https://fr.wikipedia.org/wiki/Théorie_des_graphes

PageRank on Wikipedia https://fr.wikipedia.org/wiki/PageRank


Typical business use cases include identifying the most important (central) people in a social network (who is the most popular in a group of friends), ranking the importance of papers in bibliographies (which papers are cited most often), and ranking web pages.

Here we use open data on bicycle trips in the United States.



The first step is to start Spark by creating a SparkContext. Along the way, we import the Spark libraries needed to load the data.

In [23]:
import sys.process._
import java.net.URL
import java.io.File

import org.apache.spark.rdd.RDD
import scala.collection.Map

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql._

import scala.collection.immutable.WrappedString

// initialize the Spark context
val sparkConf = new SparkConf().setMaster("local").setAppName("graph")
val sc = new SparkContext(sparkConf)

sparkConf = org.apache.spark.SparkConf@65c86e7f
sc = org.apache.spark.SparkContext@3dc128fa


org.apache.spark.SparkContext@3dc128fa

The second step is to read the CSV data: here, the station locations and the trips taken.

There are several ways to read a file. We want to keep the tabular structure of the data, and "sqlContext.read" does the job.

For reference, "sc.textFile" returns an RDD of raw text lines instead.
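
To make the difference concrete, here is a minimal sketch in plain Scala (no Spark) of the work that a raw-lines approach leaves to you: skipping the header and parsing each field by hand. The sample rows, the `StationRow` case class, and the `parseStations` helper are hypothetical, and the naive `split(",")` assumes that no field contains a quoted comma.

```scala
// Hypothetical sample mimicking the first lines of a station CSV file.
val rawLines: Seq[String] = Seq(
  "station_id,name,dockcount",
  "2,Station A,27",
  "3,Station B,15"
)

// Illustrative record type; it is not part of the tutorial's code.
final case class StationRow(stationId: Int, name: String, dockCount: Int)

// With raw text lines, the header must be dropped and every field parsed by hand.
def parseStations(lines: Seq[String]): Seq[StationRow] =
  lines.drop(1).map { line =>
    val fields = line.split(",").map(_.trim) // naive: assumes no quoted commas
    StationRow(fields(0).toInt, fields(1), fields(2).toInt)
  }

val parsed: Seq[StationRow] = parseStations(rawLines)
```

Reading with the "header" and "inferSchema" options avoids this manual step, which is why the tutorial uses it.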


In [24]:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// read the file containing the station information
val stations = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("201408_station_data.csv")

// alternative: read the same file as an RDD of raw text lines (no schema; the header is just another line)
val rdd_station = sc.textFile("201408_station_data.csv")

// read the file containing the trips
val trips = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("201408_trip_data.csv")

sqlContext = org.apache.spark.sql.SQLContext@648a270
stations = [station_id: int, name: string ... 5 more fields]
rdd_station = 201408_station_data.csv MapPartitionsRDD[10] at textFile at <console>:82
trips = [Trip ID: int, Duration: int ... 9 more fields]




[Trip ID: int, Duration: int ... 9 more fields]

This is where you select columns from your data, or filter its rows, if you need to.

As an example, we select the first two columns of the "stations" table and the "Trip ID", "Start Station", and "End Station" columns of the "trips" table.

We also show how to retrieve all the columns. "sqlContext" lets you apply other SQL functions as well.

In [None]:
val bikeStations_r = stations.select("station_id", "name")
val bikeStations = stations.select("*")
val tripData = trips.select("*")
val tripData_r = trips.select("Trip ID","Start Station", "End Station")


Displaying the data we have read.
We show the first row of each table.

In [4]:
bikeStations.show(1) 
bikeStations_r.show(1)
tripData.show(1)
tripData_r.show(1)

+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 1 row

+----------+--------------------+
|station_id|                name|
+----------+--------------------+
|         2|San Jose Diridon ...|
+----------+--------------------+
only showing top 1 row

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--

It can be useful to inspect the exact structure of the data you have read, to make sure that each column has the right type. "printSchema" displays that structure, including the type of each column.

Because the files were read with the "inferSchema" option, the columns are not all strings: note that "station_id" and "Trip ID" are of type integer.

In [5]:
bikeStations_r.printSchema()

tripData_r.printSchema()

root
 |-- station_id: integer (nullable = true)
 |-- name: string (nullable = true)

root
 |-- Trip ID: integer (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- End Station: string (nullable = true)



It is now time to import GraphX and build the graph.

In [6]:
import org.apache.spark.graphx._

# Building the graph

Now that you have loaded your data, you need to build your graph.

You will build the structure of the vertices (or nodes) and the structure of the edges.
This requires two RDDs. The first contains the vertices, with the structure (id, label).
The second contains the edges, with the structure (source_vertex_id, destination_vertex_id, 1).

The vertices correspond to the stations, and each edge corresponds to a trip between two stations.

You may have noticed that the bikeStations data has a "station_id" column, whereas the "Start Station" and "End Station" columns of tripData identify stations by name.

This complicates things, because GraphX needs numeric identifiers: each vertex must be identified by a numeric value (a "VertexId", which is a 64-bit Long), not by a string such as the station name. That is why you need a few joins to make sure that these IDs are attached to each trip.
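
The idea behind those joins can be sketched in plain Scala, without Spark. The station table, the trips, and all names below are hypothetical; the sketch only shows how name-based trips become numeric (source, destination, weight) triples, and how a trip whose station name has no match is dropped, as an inner join would do.

```scala
// Hypothetical station table: numeric id -> name.
val stationNames: Map[Long, String] = Map(1L -> "A", 2L -> "B", 3L -> "C")

// Hypothetical trips, identified by station names only.
val tripsByName: Seq[(String, String)] =
  Seq(("A", "B"), ("B", "C"), ("A", "B"), ("A", "Unknown"))

// Invert the table so that a name can be resolved to its numeric id.
val idByName: Map[String, Long] = stationNames.map { case (id, name) => name -> id }

// Keep only the trips whose two endpoints resolve to an id (inner-join behaviour),
// producing (sourceId, destinationId, weight) triples with a weight of 1 per trip.
val edgeTriples: Seq[(Long, Long, Int)] = tripsByName.flatMap { case (startName, endName) =>
  for {
    src <- idByName.get(startName)
    dst <- idByName.get(endName)
  } yield (src, dst, 1)
}
```

Here the trip to "Unknown" disappears silently. The same mechanism is one plausible reason why a graph built this way can end up with fewer edges than there are rows in the raw trip data.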



In [7]:
val justStations = bikeStations
  .selectExpr("float(station_id) as station_id", "name")
  .distinct()

val completeTripData = tripData
  .join(justStations, tripData("Start Station") === bikeStations("name"))
  .withColumnRenamed("station_id", "start_station_id")
  .drop("name")
  .join(justStations, tripData("End Station") === bikeStations("name"))
  .withColumnRenamed("station_id", "end_station_id")
  .drop("name")

completeTripData.show(1)

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+----------------+--------------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|start_station_id|end_station_id|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+----------------+--------------+
| 432946|     406|8/31/2014 22:31|Mountain View Cal...|            28|8/31/2014 22:38|Castro Street and...|          32|    17|     Subscriber|   94040|            28.0|          32.0|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+----------------+--------------+
only showing top 1 row



justStations = [station_id: float, name: string]
completeTripData = [Trip ID: int, Duration: int ... 11 more fields]


[Trip ID: int, Duration: int ... 11 more fields]

In [8]:
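import sqlContext.implicits._ // provides .toDF() on RDDs (harmless if your notebook kernel already imports it)

val stations = completeTripData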
  .select("start_station_id", "end_station_id")
  .rdd
  .distinct() // helps filter out duplicate trips
  .flatMap(x => Iterable(x(0).asInstanceOf[Number].longValue, x(1).asInstanceOf[Number].longValue)) // helps us maintain types
  .distinct()
  .toDF() // return to a DF to make merging + joining easier

stations.take(1) // this is just a station_id at this point
stations.show(1)

+-----+
|value|
+-----+
|   13|
+-----+
only showing top 1 row



stations = [value: bigint]


[value: bigint]

In [9]:
val stationVertices: RDD[(VertexId, String)] = stations
  .join(justStations, stations("value") === justStations("station_id"))
  .select("station_id", "name")
  .rdd
  .map(row => (row(0).asInstanceOf[Number].longValue, row(1).asInstanceOf[String])) // maintain type information

stationVertices.take(1)

stationVertices.take(1).foreach(println)

(13,St James Park)


stationVertices = MapPartitionsRDD[298] at map at <console>:58


MapPartitionsRDD[298] at map at <console>:58

In [10]:
val stationEdges:RDD[Edge[Long]] = completeTripData
  .select("start_station_id", "end_station_id")
  .rdd
  .map(row => Edge(row(0).asInstanceOf[Number].longValue, row(1).asInstanceOf[Number].longValue, 1))

stationEdges.take(1).foreach(println)

Edge(28,32,1)


stationEdges = MapPartitionsRDD[309] at map at <console>:55


MapPartitionsRDD[309] at map at <console>:55

In [11]:
val defaultStation = ("Missing Station") 
val stationGraph = Graph(stationVertices, stationEdges, defaultStation)
stationGraph.cache()

defaultStation = Missing Station
stationGraph = org.apache.spark.graphx.impl.GraphImpl@5771f592


org.apache.spark.graphx.impl.GraphImpl@5771f592

In [12]:
println("Total Number of Stations: " + stationGraph.numVertices)
println("Total Number of Trips: " + stationGraph.numEdges)
// sanity check
println("Total Number of Trips in Original Data: " + tripData.count)

Total Number of Stations: 67
Total Number of Trips: 163546
Total Number of Trips in Original Data: 171792


In [13]:
val ranks = stationGraph.pageRank(0.0001).vertices
ranks
  .join(stationVertices)
  .sortBy(_._2._1, ascending=false) // sort by the rank
  .take(10) // get the top 10
  .foreach(x => println(x._2._2))

San Francisco Caltrain (Townsend at 4th)
San Jose Diridon Caltrain Station
Mountain View Caltrain Station
Redwood City Caltrain Station
Embarcadero at Sansome
Harry Bridges Plaza (Ferry Building)
Market at Sansome
San Francisco Caltrain 2 (330 Townsend)
2nd at Townsend
University and Emerson


ranks = VertexRDDImpl[1171] at RDD at VertexRDD.scala:57


VertexRDDImpl[1171] at RDD at VertexRDD.scala:57
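
To give an intuition for what the ranking computes, here is a minimal textbook PageRank by power iteration in plain Scala. It is a sketch under stated assumptions, not GraphX's implementation: the function `pageRankSketch`, its parameters, and the toy graph are hypothetical, ranks are normalized to sum to 1, and every vertex is assumed to have at least one outgoing edge.

```scala
// Textbook PageRank by power iteration on a small directed graph.
// Assumes every vertex has at least one outgoing edge (no dangling vertices).
def pageRankSketch(
    edges: Seq[(Long, Long)],
    damping: Double = 0.85,
    tolerance: Double = 1e-4,
    maxIterations: Int = 100
): Map[Long, Double] = {
  val vertices: Seq[Long] = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
  val n = vertices.size.toDouble
  val outDegree: Map[Long, Int] = edges.groupBy(_._1).map { case (v, es) => v -> es.size }

  var ranks: Map[Long, Double] = vertices.map(v => v -> 1.0 / n).toMap
  var delta = Double.MaxValue
  var iteration = 0
  while (delta > tolerance && iteration < maxIterations) {
    // Each vertex sends rank / outDegree along each of its outgoing edges.
    val received: Map[Long, Double] = edges
      .map { case (s, d) => d -> ranks(s) / outDegree(s) }
      .groupBy(_._1)
      .map { case (v, contribs) => v -> contribs.map(_._2).sum }
    val updated: Map[Long, Double] = vertices.map { v =>
      v -> ((1.0 - damping) / n + damping * received.getOrElse(v, 0.0))
    }.toMap
    // Stop once no rank moved by more than the tolerance.
    delta = vertices.map(v => math.abs(updated(v) - ranks(v))).max
    ranks = updated
    iteration += 1
  }
  ranks
}

// Hypothetical toy graph: vertex 3 receives edges from both 1 and 2.
val toyEdges: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L))
val toyRanks: Map[Long, Double] = pageRankSketch(toyEdges)
```

The `tolerance` parameter plays the same role as the 0.0001 passed to `pageRank` above: iteration stops once the ranks have stabilized to within that threshold. In the toy graph, vertex 3 ends up with the highest rank because it is the destination of the most rank-carrying edges.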

In [14]:
stationGraph
  .groupEdges((edge1, edge2) => edge1 + edge2)
  .triplets
  .sortBy(_.attr, ascending=false)
  .map(triplet => 
    "There were " + triplet.attr.toString + " trips from " + triplet.srcAttr + " to " + triplet.dstAttr + ".")
  .take(10)
  .foreach(println)

There were 1689 trips from Harry Bridges Plaza (Ferry Building) to Embarcadero at Sansome.
There were 1527 trips from Townsend at 7th to San Francisco Caltrain (Townsend at 4th).
There were 1352 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.
There were 1128 trips from Embarcadero at Sansome to Steuart at Market.
There were 1109 trips from 2nd at Townsend to Harry Bridges Plaza (Ferry Building).
There were 1039 trips from 2nd at South Park to Market at Sansome.
There were 1007 trips from Harry Bridges Plaza (Ferry Building) to 2nd at Townsend.
There were 974 trips from Market at 10th to San Francisco Caltrain (Townsend at 4th).
There were 972 trips from Steuart at Market to San Francisco Caltrain (Townsend at 4th).
There were 929 trips from San Francisco Caltrain (Townsend at 4th) to Temporary Transbay Terminal (Howard at Beale).
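
Conceptually, merging parallel edges is a group-by on the (source, destination) pair followed by a sum of the edge attributes. The sketch below shows this in plain Scala on hypothetical data; `mergeParallelEdges` is an illustrative helper, not a GraphX function.

```scala
// Hypothetical parallel edges: one (src, dst, weight) triple per trip.
val tripEdges: Seq[(Long, Long, Long)] =
  Seq((1L, 2L, 1L), (1L, 2L, 1L), (2L, 3L, 1L), (1L, 2L, 1L), (3L, 1L, 1L))

// Merge the edges that share the same (src, dst) pair by summing their weights.
def mergeParallelEdges(edges: Seq[(Long, Long, Long)]): Seq[(Long, Long, Long)] =
  edges
    .groupBy { case (src, dst, _) => (src, dst) }
    .map { case ((src, dst), group) => (src, dst, group.map(_._3).sum) }
    .toSeq
    .sortBy { case (_, _, weight) => -weight } // busiest routes first

val routeCounts: Seq[(Long, Long, Long)] = mergeParallelEdges(tripEdges)
```

In GraphX itself, the documentation for `groupEdges` notes that, for correct results, the graph must first be repartitioned with `partitionBy`, so that parallel edges end up in the same partition. If the merged counts look too low on a multi-partition run, that is the first thing to check.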


In [15]:
stationGraph
  .inDegrees // computes in Degrees
  .join(stationVertices)
  .sortBy(_._2._1, ascending=false)
  .take(10)
  .foreach(x => println(x._2._2 + " has " + x._2._1 + " in degrees."))

San Francisco Caltrain (Townsend at 4th) has 16540 in degrees.
Embarcadero at Sansome has 7891 in degrees.
Market at Sansome has 7802 in degrees.
Harry Bridges Plaza (Ferry Building) has 7736 in degrees.
2nd at Townsend has 7335 in degrees.
San Francisco Caltrain 2 (330 Townsend) has 7297 in degrees.
Steuart at Market has 6209 in degrees.
Townsend at 7th has 6033 in degrees.
Temporary Transbay Terminal (Howard at Beale) has 5590 in degrees.
Powell Street BART has 5219 in degrees.


In [16]:
stationGraph
  .outDegrees // out degrees
  .join(stationVertices)
  .sortBy(_._2._1, ascending=false)
  .take(10)
  .foreach(x => println(x._2._2 + " has " + x._2._1 + " out degrees."))

San Francisco Caltrain (Townsend at 4th) has 12703 out degrees.
Harry Bridges Plaza (Ferry Building) has 8157 out degrees.
San Francisco Caltrain 2 (330 Townsend) has 6835 out degrees.
Embarcadero at Sansome has 6811 out degrees.
Market at Sansome has 6749 out degrees.
2nd at Townsend has 6679 out degrees.
Temporary Transbay Terminal (Howard at Beale) has 6298 out degrees.
Steuart at Market has 6159 out degrees.
Townsend at 7th has 5423 out degrees.
Market at 4th has 5141 out degrees.
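
As a reminder of what these two quantities count, here is a small plain-Scala sketch on hypothetical edges: the in-degree of a vertex is the number of edges arriving at it (trips ending at a station), and the out-degree is the number of edges leaving it (trips starting there).

```scala
// Hypothetical directed edges (src, dst); repeated pairs count as separate trips.
val sampleEdges: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 2L), (2L, 3L), (3L, 2L))

// In-degree: number of edges arriving at each vertex.
val inDegreeOf: Map[Long, Int] =
  sampleEdges.groupBy(_._2).map { case (v, es) => v -> es.size }

// Out-degree: number of edges leaving each vertex.
val outDegreeOf: Map[Long, Int] =
  sampleEdges.groupBy(_._1).map { case (v, es) => v -> es.size }
```

Note that vertex 1 has no entry in `inDegreeOf`, because no edge arrives at it. A vertex without incoming (or outgoing) edges is simply absent from the corresponding table rather than listed with a degree of 0.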


In [17]:
stationGraph
  .inDegrees
  .join(stationGraph.outDegrees) // join with out Degrees
  .join(stationVertices) // join with our other stations
  .map(x => (x._2._1._1.toDouble/x._2._1._2.toDouble, x._2._2)) // ratio of in to out
  .sortBy(_._1, ascending=false)
  .take(5)
  .foreach(x => println(x._2 + " has a in/out degree ratio of " + x._1))

MLK Library has a in/out degree ratio of 1.9752475247524752
Franklin at Maple has a in/out degree ratio of 1.3863636363636365
San Francisco Caltrain (Townsend at 4th) has a in/out degree ratio of 1.302054632763914
Redwood City Public Library has a in/out degree ratio of 1.2156862745098038
Japantown has a in/out degree ratio of 1.186138613861386
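
The ratio computation is a join of the two degree tables on the vertex id, followed by a floating-point division. A minimal plain-Scala sketch, with hypothetical degree tables and names:

```scala
// Hypothetical degree tables; vertex 4 has arrivals but no departures,
// and vertex 3 has departures but no arrivals.
val arrivals: Map[Long, Int] = Map(1L -> 30, 2L -> 10, 4L -> 5)
val departures: Map[Long, Int] = Map(1L -> 20, 2L -> 20, 3L -> 7)

// Inner join on the vertex id, then divide as Doubles to avoid integer division.
val inOutRatio: Seq[(Long, Double)] = arrivals.toSeq
  .flatMap { case (v, in) => departures.get(v).map(out => v -> in.toDouble / out) }
  .sortBy { case (_, ratio) => -ratio } // highest ratio first
```

A ratio above 1 means that more trips end at a station than start from it. Vertices 3 and 4 drop out of the result because the join keeps only the vertices present in both tables.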


In [18]:
stationGraph
  .inDegrees
  .join(stationGraph.outDegrees) // join with out-degrees (joining inDegrees with itself would make every ratio 1.0)
  .join(stationVertices) // join with our other stations
  .map(x => (x._2._1._1.toDouble/x._2._1._2.toDouble, x._2._2)) // ratio of in to out
  .sortBy(_._1) // ascending: lowest in/out ratios first
  .take(5)
  .foreach(x => println(x._2 + " has a in/out degree ratio of " + x._1))


In [19]:
import org.apache.spark.graphx._
import scala.reflect.ClassTag
def drawGraph[VD:ClassTag,ED:ClassTag](g:Graph[VD,ED]) = {
val u = java.util.UUID.randomUUID
val v = g.vertices.collect.map(_._1)
println("""%html
<div id='a""" + u + """' style='width:960px; height:500px'></div>
<style>
.node circle { fill: gray; }
.node text { font: 10px sans-serif;
             text-anchor: middle;
             fill: white; }
line.link { stroke: gray;
            stroke-width: 1.5px; }
            </style>

<script>
var width = 960, height = 500;
var svg = d3.select("#a""" + u + """").append("svg")
    .attr("width", width).attr("height", height);
var nodes = [""" + v.map("{id:" + _ + "}").mkString(",") + """];
var links = [""" + g.edges.collect.map(
  e => "{source:nodes[" + v.indexWhere(_ == e.srcId) + "],target:nodes[" +
       v.indexWhere(_ == e.dstId) + "]}").mkString(",") + """];
var link = svg.selectAll(".link").data(links);
link.enter().insert("line", ".node").attr("class", "link");
var node = svg.selectAll(".node").data(nodes);
var nodeEnter = node.enter().append("g").attr("class", "node")
nodeEnter.append("circle").attr("r", 8);
nodeEnter.append("text").attr("dy", "0.35em")
         .text(function(d) { return d.id; });
d3.layout.force().linkDistance(50).charge(-200).chargeDistance(300) .friction(0.95).linkStrength(0.5).size([width, height]) .on("tick", function() {
      link.attr("x1", function(d) { return d.source.x; })
          .attr("y1", function(d) { return d.source.y; })
          .attr("x2", function(d) { return d.target.x; })
          .attr("y2", function(d) { return d.target.y; });
      node.attr("transform", function(d) {
        return "translate(" + d.x + "," + d.y + ")";
      });
   }).nodes(nodes).links(links).start();
</script>
""")
}

drawGraph(org.apache.spark.graphx.util.GraphGenerators.rmatGraph(sc,32,60));
 

drawGraph: [VD, ED](g: org.apache.spark.graphx.Graph[VD,ED])(implicit evidence$1: scala.reflect.ClassTag[VD], implicit evidence$2: scala.reflect.ClassTag[ED])Unit


%html
<div id='a679d30da-40f3-4e81-a303-c284ca3ed79c' style='width:960px; height:500px'></div>
<style>
.node circle { fill: gray; }
.node text { font: 10px sans-serif;
             text-anchor: middle;
             fill: white; }
line.link { stroke: gray;
            stroke-width: 1.5px; }
            </style>

<script>
var width = 960, height = 500;
var svg = d3.select("#a679d30da-40f3-4e81-a303-c284ca3ed79c").append("svg")
    .attr("width", width).attr("height", height);
var nodes = [{id:19},{id:21},{id:16},{id:22},{id:25},{id:28},{id:29},{id:30},{id:27},{id:24},{id:23},{id:17},{id:18},{id:20},{id:31},{id:26}];
var links = [{source:nodes[2],target:nodes[2]},{source:nodes[2],target:nodes[0]},{source:nodes[2],target:nodes[13]},{source:nodes[2],target:nodes[3]},{source:nodes[2],target:nodes[15]},{source:nodes[2],target:nodes[5]},{source:nodes[2],target:nodes[6]},{source:nodes[11],target:nodes[2]},{source:nodes[11],target:nodes[11]},{source:nodes[11],target:nodes[12]},{source:nodes[11],
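
The core of `drawGraph` is the translation of vertex ids and edges into the two JavaScript arrays consumed by the D3 force layout, where each link refers to its endpoints by position in the `nodes` array. The plain-Scala sketch below isolates that step. `toD3Arrays` is a hypothetical helper, and it swaps the repeated `indexWhere` scans for a precomputed id-to-index map, which is a design variation rather than the code used above.

```scala
// Build the JavaScript array literals that a D3 force layout expects:
// links refer to nodes by their position in the nodes array.
def toD3Arrays(vertexIds: Seq[Long], edges: Seq[(Long, Long)]): (String, String) = {
  // Precompute id -> array position once instead of scanning for every edge.
  val indexOf: Map[Long, Int] = vertexIds.zipWithIndex.toMap
  val nodes = vertexIds.map(id => s"{id:$id}").mkString("[", ",", "]")
  // Edges pointing at an unknown vertex id are skipped rather than emitted as nodes[-1].
  val links = edges.collect {
    case (src, dst) if indexOf.contains(src) && indexOf.contains(dst) =>
      s"{source:nodes[${indexOf(src)}],target:nodes[${indexOf(dst)}]}"
  }.mkString("[", ",", "]")
  (nodes, links)
}

// Hypothetical input: three vertex ids and two edges, one of them a self-loop.
val (nodesJs, linksJs) = toD3Arrays(Seq(19L, 21L, 16L), Seq((16L, 16L), (16L, 19L)))
```

With many edges, the map lookup keeps the conversion linear in the number of edges, whereas `indexWhere` rescans the vertex array for every endpoint.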

In [20]:
%%javascript
(function(element) {
require(['d3'], function(d3) {
var width = 800, height = 800;
var data = [1, 2, 4, 8, 16, 8, 4, 2, 1]
var svg = d3.select(element.get(0)).append("svg")
    .attr("width", width).attr("height", height);
var nodes = [{id:19},{id:21},{id:16},{id:22},{id:25},{id:28},{id:29},{id:30},{id:27},{id:24},{id:23},{id:17},{id:18},{id:20},{id:31},{id:26}];
var links = [{source:nodes[2],target:nodes[2]},{source:nodes[2],target:nodes[11]},{source:nodes[2],target:nodes[13]},{source:nodes[2],target:nodes[3]},{source:nodes[2],target:nodes[4]},{source:nodes[11],target:nodes[2]},{source:nodes[11],target:nodes[11]},{source:nodes[11],target:nodes[12]},{source:nodes[11],target:nodes[4]},{source:nodes[11],target:nodes[15]},{source:nodes[11],target:nodes[8]},{source:nodes[11],target:nodes[14]},{source:nodes[12],target:nodes[11]},{source:nodes[12],target:nodes[12]},{source:nodes[12],target:nodes[15]},{source:nodes[12],target:nodes[8]},{source:nodes[0],target:nodes[12]},{source:nodes[0],target:nodes[0]},{source:nodes[0],target:nodes[1]},{source:nodes[0],target:nodes[8]},{source:nodes[13],target:nodes[1]},{source:nodes[13],target:nodes[3]},{source:nodes[13],target:nodes[5]},{source:nodes[1],target:nodes[1]},{source:nodes[1],target:nodes[14]},{source:nodes[3],target:nodes[2]},{source:nodes[3],target:nodes[12]},{source:nodes[3],target:nodes[3]},{source:nodes[3],target:nodes[7]},{source:nodes[10],target:nodes[1]},{source:nodes[9],target:nodes[2]},{source:nodes[9],target:nodes[13]},{source:nodes[9],target:nodes[9]},{source:nodes[9],target:nodes[4]},{source:nodes[9],target:nodes[15]},{source:nodes[9],target:nodes[5]},{source:nodes[9],target:nodes[14]},{source:nodes[4],target:nodes[2]},{source:nodes[4],target:nodes[9]},{source:nodes[4],target:nodes[4]},{source:nodes[4],target:nodes[15]},{source:nodes[4],target:nodes[8]},{source:nodes[4],target:nodes[5]},{source:nodes[4],target:nodes[6]},{source:nodes[15],target:nodes[12]},{source:nodes[15],target:nodes[7]},{source:nodes[8],target:nodes[4]},{source:nodes[8],target:nodes[8]},{source:nodes[8],target:nodes[6]},{source:nodes[5],target:nodes[2]},{source:nodes[5],target:nodes[11]},{source:nodes[5],target:nodes[13]},{source:nodes[5],target:nodes[1]},{source:nodes[5],target:nodes[9]},{source:nodes[6],target:nodes[9]},{source:nodes[6],target:nodes[6]},{source:nodes[7],target:nodes[3]},{source:nodes
[7],target:nodes[14]},{source:nodes[14],target:nodes[7]},{source:nodes[14],target:nodes[14]}];
   
var link = svg.selectAll(".link").data(links);
link.enter().insert("line", ".node").attr("class", "link");
var node = svg.selectAll(".node").data(nodes);  
var nodeEnter = node.enter().append("g").attr("class", "node")   

    
svg.selectAll('circle')
            .data(data)
            .enter()
            .append('circle')
            .attr("cx", function(d, i) {return 40 * (i + 1);})
            .attr("cy", function(d, i) {return 100 + 30 * (i % 3 - 1);})
            .style("fill", "#1570a4")
            .transition().duration(2000)
            .attr("r", function(d) {return 2*d;})
        ;

    
})
    })(element);

var links = [{source:nodes[2],target:nodes[2]},{source:nodes[2],target:nodes[11]},{source:nodes[2],target:nodes[13]},{source:nodes[2],target:nodes[3]},{source:nodes[2],target:nodes[4]},{source:nodes[11],target:nodes[2]},{source:nodes[11],target:nodes[11]},{source:nodes[11],target:nodes[12]},{source:nodes[11],target:nodes[4]},{source:no...


In [22]:
sc.stop()