In [None]:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame

// In the notebook environment a SparkSession named `spark` is already available.
// For a standalone application, create it first:
// val spark = SparkSession.builder.appName("Simple Application").getOrCreate()

// Read the station dataset from CSV and register it as a temporary view so it can be queried with SQL.
// "header" -> the first line of the file holds the column names.
// "DROPMALFORMED" -> rows the CSV parser considers malformed are dropped instead of failing the read.
// registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement.
val station = spark.read.format("csv")
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .load("./station.csv")
station.createOrReplaceTempView("station")
val bikeStations = spark.sql("SELECT * FROM station")
bikeStations.printSchema()

// Read the trip dataset from CSV and register it as a temporary view so it can be queried with SQL.
// "header" -> the first line of the file holds the column names.
// "DROPMALFORMED" -> rows the CSV parser considers malformed are dropped instead of failing the read.
// registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement.
val trip = spark.read.format("csv")
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .load("./trip.csv")
trip.createOrReplaceTempView("trip")
val tripData = spark.sql("SELECT * FROM trip")
tripData.printSchema()

// Station table reduced to (station_id, name), with each row occurring only once because
// stations become the graph's vertices.
// Ids are cast to long, the same type GraphX uses for VertexId. Casting to float instead would
// lose precision for large ids and force a long-vs-float comparison in the join below.
// NOTE(review): if one id appears with two different names, both rows survive distinct() and
// the graph gets duplicate vertex ids — confirm station.csv has one name per id.
val justStations = bikeStations
  .select(col("id").cast("long").alias("station_id"), col("name"))
  .distinct()

// Every station id referenced by a trip, as either start or end station.
// The cast happens before distinct(), so two spellings of the same id collapse into one row
// instead of producing duplicate ids after casting.
val stations = tripData
  .select(col("start_station_id").cast("long").alias("value"))
  .union(tripData.select(col("end_station_id").cast("long").alias("value")))
  .distinct()

stations.take(1) // a single row holding one station_id in column "value"

// Vertex set for the graph: (station id, station name) for every trip station that also
// appears in the station table. Ids not present in the station table are dropped by the
// inner join.
val stationVertices: RDD[(VertexId, String)] = {
  val matched = stations.join(justStations, stations("value") === justStations("station_id"))
  matched
    .select(col("station_id").cast("long"), col("name"))
    .rdd
    .map { r =>
      val vertexId: VertexId = r.getLong(0)
      (vertexId, r.getString(1))
    }
}

stationVertices.take(1)

// One directed edge per trip, from start station to end station.
// The edge attribute is a dummy count of 1, so parallel edges can later be summed into a
// per-route trip count.
// Trips whose start or end id is missing or not numeric cast to null. Row.getLong throws on
// null, so those rows are dropped before building edges.
val stationEdges: RDD[Edge[Long]] = tripData
  .select(
    col("start_station_id").cast("long").alias("src"),
    col("end_station_id").cast("long").alias("dst"))
  .na.drop()
  .rdd
  .map(row => Edge(row.getLong(0), row.getLong(1), 1L))

stationEdges.take(1)

// Assemble the trip graph. GraphX gives `defaultStation` as the attribute to any vertex that
// appears as an edge endpoint but has no entry in `stationVertices`.
val defaultStation = "Missing Station"
val stationGraph = Graph(
  vertices = stationVertices,
  edges = stationEdges,
  defaultVertexAttr = defaultStation)
stationGraph.cache()

// Sanity check: compare the graph size with the raw trip table. The edge count and the trip
// count differ if any trip rows did not become edges.
println(s"Total Number of Stations: ${stationGraph.numVertices}")
println(s"Total Number of Trips: ${stationGraph.numEdges}")
println(s"Total Number of Trips in Original Data: ${tripData.count()}")

In [None]:
// Most common routes: the station-to-station pairs with the most trips.
// Demonstrates triplets, which expose an edge's attribute together with both endpoint attributes.
// groupEdges only merges parallel edges that are in the same partition. The graph is therefore
// first repartitioned with RandomVertexCut, which co-locates all edges sharing the same
// (src, dst). Without this, one route's count could be split across partitions and under-reported.
stationGraph
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges((trips1, trips2) => trips1 + trips2)
  .triplets
  .sortBy(_.attr, ascending = false)
  .map(t => s"There were ${t.attr} trips from ${t.srcAttr} to ${t.dstAttr}.")
  .take(10)
  .foreach(println)

In [None]:
// The graph is directed, so each station has separate in- and out-degrees:
// in-degree = number of trips arriving at the station,
// out-degree = number of trips departing from it.

/** Prints the `n` stations with the highest degree, looking up names in `stationVertices`.
  *
  * Vertices absent from `stationVertices` are dropped by the join.
  *
  * @param degrees per-vertex degree counts, e.g. `stationGraph.inDegrees`
  * @param label   text printed after the count, e.g. "in degrees"
  * @param n       how many stations to print (default 10)
  */
def printTopStationsByDegree(degrees: VertexRDD[Int], label: String, n: Int = 10): Unit =
  degrees
    .join(stationVertices)
    .sortBy(_._2._1, ascending = false)
    .take(n)
    .foreach { case (_, (degree, name)) => println(s"$name has $degree $label.") }

printTopStationsByDegree(stationGraph.inDegrees, "in degrees")
printTopStationsByDegree(stationGraph.outDegrees, "out degrees")

In [None]:
// PageRank over the trip graph; prints the names of the 10 highest-ranked stations.
// 0.0001 is the convergence tolerance for GraphX's dynamic PageRank
// (smaller means more iterations and more accurate ranks).
val ranks = stationGraph.pageRank(tol = 0.0001).vertices
ranks
  .join(stationVertices)
  .sortBy({ case (_, (rank, _)) => rank }, ascending = false)
  .take(10)
  .foreach { case (_, (_, stationName)) => println(stationName) }