Laen sisse test data

Teen selle testimise kasutades juhendeid siin: https://docs.databricks.com/spark/latest/graph-analysis/graph-analysis-graphx-tutorial.html

In [1]:
val stationsDF = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/home/jovyan/data/stations/201508_station_data.csv")

stationsDF = [station_id: int, name: string ... 5 more fields]


[station_id: int, name: string ... 5 more fields]

In [4]:
stationsDF.count()

70

In [5]:
val tripsDF = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/home/jovyan/data/stations/201508_trip_data.csv")

tripsDF = [Trip ID: int, Duration: int ... 9 more fields]


[Trip ID: int, Duration: int ... 9 more fields]

In [9]:
tripsDF

[Trip ID: int, Duration: int ... 9 more fields]

Importing graph stuff from graphX

In [10]:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD


In [11]:
val justStations = stationsDF.selectExpr("float(station_id) as station_id", "name")

justStations = [station_id: float, name: string]


[station_id: float, name: string]

In [14]:
val completeTripData = tripsDF.
  join(justStations, tripsDF("Start Station") === stationsDF("name")).
  withColumnRenamed("station_id","start_station_id").
  drop("name").
  join(justStations, tripsDF("End Station") === stationsDF("name")).
  withColumnRenamed("station_id","end_station_id").
  drop("name")

completeTripData = [Trip ID: int, Duration: int ... 11 more fields]


[Trip ID: int, Duration: int ... 11 more fields]

In [17]:
val stations = completeTripData.select("start_station_id","end_station_id")
  .rdd
  .distinct()
  .flatMap(x => Iterable(x(0).asInstanceOf[Number].longValue, x(1).asInstanceOf[Number].longValue)) // helps us maintain types
  .distinct()
  .toDF()

stations = [value: bigint]


[value: bigint]

In [19]:
val stationVertices: RDD[(VertexId, String)] = stations
  .join(justStations, stations("value") === justStations("station_id"))
  .select("station_id", "name")
  .rdd
  .map(row => (row(0).asInstanceOf[Number].longValue, row(1).asInstanceOf[String])) // maintain type information

stationVertices.take(1)

stationVertices = MapPartitionsRDD[70] at map at <console>:45


Array((56,Beale at Market))

In [20]:
val stationEdges:RDD[Edge[Long]] = completeTripData
  .select("start_station_id", "end_station_id")
  .rdd
  .map(row => Edge(row(0).asInstanceOf[Number].longValue, row(1).asInstanceOf[Number].longValue, 1))

stationEdges = MapPartitionsRDD[78] at map at <console>:42


MapPartitionsRDD[78] at map at <console>:42

In [21]:
val defaultStation = ("Missing Station")

defaultStation = Missing Station


Missing Station

In [22]:
val stationGraph = Graph(stationVertices, stationEdges, defaultStation)

stationGraph = org.apache.spark.graphx.impl.GraphImpl@266be6d4


org.apache.spark.graphx.impl.GraphImpl@266be6d4

In [23]:
stationGraph.cache()

org.apache.spark.graphx.impl.GraphImpl@266be6d4

In [25]:
println("Total Number of Stations: " + stationGraph.numVertices)
println("Total Number of Trips: " + stationGraph.numEdges)
// sanity check
println("Total Number of Trips in Original Data: " + tripsDF.count)

Total Number of Stations: 68
Total Number of Trips: 339030
Total Number of Trips in Original Data: 354152


# PageRank-i jooksutamine

In [None]:
val ranks = stationGraph.pageRank(0.0001).vertices

Most importantly ranked stations

In [40]:
ranks.sortBy(_._2, ascending=false).take(10)

Array((2,3.95332475313009), (70,3.2708212384471804), (28,2.5131577090624435), (22,2.2289235741998996), (69,2.17767954644027), (50,1.781452831653251), (61,1.544284802694958), (4,1.5342586021027171), (65,1.5309106362489386), (60,1.5045403211302477))

In [44]:
ranks
  .join(stationVertices)
  .sortBy(_._2._1, ascending=false) // sort by the rank
  .take(10) // get the top 10
  .foreach(x => println("Station: " + x._2._2 + "\t\t Rank: " + x._2._1))

Station: San Jose Diridon Caltrain Station		 Rank: 3.95332475313009
Station: San Francisco Caltrain (Townsend at 4th)		 Rank: 3.2708212384471804
Station: Mountain View Caltrain Station		 Rank: 2.5131577090624435
Station: Redwood City Caltrain Station		 Rank: 2.2289235741998996
Station: San Francisco Caltrain 2 (330 Townsend)		 Rank: 2.17767954644027
Station: Harry Bridges Plaza (Ferry Building)		 Rank: 1.781452831653251
Station: 2nd at Townsend		 Rank: 1.544284802694958
Station: Santa Clara at Almaden		 Rank: 1.5342586021027171
Station: Townsend at 7th		 Rank: 1.5309106362489386
Station: Embarcadero at Sansome		 Rank: 1.5045403211302477


Most common trips between station to station

In [28]:
stationGraph
  .groupEdges((edge1, edge2) => edge1 + edge2)
  .triplets
  .sortBy(_.attr, ascending=false)
  .map(triplet =>
    "There were " + triplet.attr.toString + " trips from " + triplet.srcAttr + " to " + triplet.dstAttr + ".")
  .take(10)
  .foreach(println)

There were 614 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.
There were 584 trips from Townsend at 7th to San Francisco Caltrain 2 (330 Townsend).
There were 573 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.
There were 555 trips from Townsend at 7th to San Francisco Caltrain 2 (330 Townsend).
There were 555 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.
There were 550 trips from Townsend at 7th to San Francisco Caltrain (Townsend at 4th).
There were 525 trips from Townsend at 7th to San Francisco Caltrain 2 (330 Townsend).
There were 513 trips from Townsend at 7th to San Francisco Caltrain (Townsend at 4th).
There were 511 trips from Harry Bridges Plaza (Ferry Building) to Embarcadero at Sansome.
There were 506 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.


In and out degrees

In [30]:
stationGraph
    .inDegrees
    .join(stationVertices)
    .sortBy(_._2._1, ascending=false)
    .take(10)
    .foreach(x => println(x._2._2 + " has " + x._2._1 + " in degrees."))

San Francisco Caltrain (Townsend at 4th) has 34166 in degrees.
San Francisco Caltrain 2 (330 Townsend) has 22109 in degrees.
Harry Bridges Plaza (Ferry Building) has 17403 in degrees.
2nd at Townsend has 15279 in degrees.
Townsend at 7th has 15126 in degrees.
Embarcadero at Sansome has 14705 in degrees.
Market at Sansome has 13778 in degrees.
Steuart at Market has 13383 in degrees.
Temporary Transbay Terminal (Howard at Beale) has 12748 in degrees.
Market at 10th has 9988 in degrees.


In [31]:
stationGraph
  .outDegrees // out degrees
  .join(stationVertices)
  .sortBy(_._2._1, ascending=false)
  .take(10)
  .foreach(x => println(x._2._2 + " has " + x._2._1 + " out degrees."))

San Francisco Caltrain (Townsend at 4th) has 25631 out degrees.
San Francisco Caltrain 2 (330 Townsend) has 21245 out degrees.
Harry Bridges Plaza (Ferry Building) has 16917 out degrees.
Temporary Transbay Terminal (Howard at Beale) has 14242 out degrees.
Embarcadero at Sansome has 13885 out degrees.
2nd at Townsend has 13746 out degrees.
Steuart at Market has 13478 out degrees.
Townsend at 7th has 13465 out degrees.
Market at 10th has 11449 out degrees.
Market at Sansome has 11317 out degrees.


Highest in/out degree ratio stations

In [32]:
stationGraph
  .inDegrees
  .join(stationGraph.outDegrees) // join with out Degrees
  .join(stationVertices) // join with your other stations
  .map(x => (x._2._1._1.toDouble/x._2._1._2.toDouble, x._2._2)) // ratio of in to out
  .sortBy(_._1, ascending=false)
  .take(5)
  .foreach(x => println(x._2 + " has a in/out degree ratio of " + x._1))

Redwood City Medical Center has a in/out degree ratio of 1.5333333333333334
San Mateo County Center has a in/out degree ratio of 1.4724409448818898
SJSU 4th at San Carlos has a in/out degree ratio of 1.3621052631578947
San Francisco Caltrain (Townsend at 4th) has a in/out degree ratio of 1.3329952011236395
Paseo de San Antonio has a in/out degree ratio of 1.2535046728971964


Lowest in/out degree ratio stations

In [33]:
stationGraph
  .inDegrees
  .join(stationGraph.inDegrees) // join with out Degrees
  .join(stationVertices) // join with your other stations
  .map(x => (x._2._1._1.toDouble/x._2._1._2.toDouble, x._2._2)) // ratio of in to out
  .sortBy(_._1)
  .take(5)
  .foreach(x => println(x._2 + " has a in/out degree ratio of " + x._1))


Beale at Market has a in/out degree ratio of 1.0
SJSU - San Salvador at 9th has a in/out degree ratio of 1.0
Santa Clara County Civic Center has a in/out degree ratio of 1.0
Embarcadero at Vallejo has a in/out degree ratio of 1.0
Castro Street and El Camino Real has a in/out degree ratio of 1.0
