# Cypher on Dataproc Jupyter Notebook
## Prerequisites
### 1.0 Dataproc Setup
Create a Dataproc cluster running Spark 3.0 using the reference `gcloud` command below; do not use the latest Spark image.

### 1.1 Spark Packages setup - Spark connector
```bash
hadoop fs -copyToLocal gs://dataproc-staging-us-central1-185692258849-ipivnxza/notebooks/jupyter/spark3/spark-bigquery-with-dependencies_2.12-0.18.0.jar /usr/local/share/google/dataproc/lib/
```

### 1.2 Spark Packages setup - Cypher package
```bash
hadoop fs -copyToLocal gs://dataproc-staging-us-central1-185692258849-ipivnxza/notebooks/jupyter/spark3/morpheus-spark-cypher-0.4.2-all.jar /usr/lib/spark/jars/ 
```

In [2]:
!hadoop fs -copyToLocal gs://dataproc-staging-us-central1-185692258849-ipivnxza/notebooks/jupyter/spark3/spark-bigquery-with-dependencies_2.12-0.18.0.jar /usr/local/share/google/dataproc/lib/

In [5]:
!hadoop fs -copyToLocal gs://dataproc-staging-us-central1-185692258849-ipivnxza/notebooks/jupyter/spark3/morpheus-spark-cypher-0.4.2-all.jar /usr/lib/spark/jars/ 

In [9]:
%%time
// Smoke test: prints a fixed greeting to confirm the kernel can execute Scala code.
println("Hello World")

Hello World
Time: 0.4546999931335449 seconds.



In [10]:
// Load the SF Muni stop-times table through the BigQuery connector, handing the
// connector a filter that keeps same-day arrivals with a regular drop-off, and
// cache the resulting DataFrame because later cells query it several times.
val stopTimesDF = {
  val bigQueryOptions = Map(
    "table" -> "gcp-hk-markii-dis:public_dataset.sf_transit_muni_stop_times",
    "filter" -> "arrives_next_day = false AND dropoff_type = 'regular'"
  )
  spark.read.format("bigquery").options(bigQueryOptions).load().cache()
}

stopTimesDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [stop_id: bigint, trip_id: bigint ... 7 more fields]


In [11]:
// Print the schema tree of the loaded table for reference.
println(stopTimesDF.schema.treeString)
// Expose the DataFrame to Spark SQL under the name "stopTimes".
stopTimesDF.createOrReplaceTempView("stopTimes")

root
 |-- stop_id: long (nullable = true)
 |-- trip_id: long (nullable = true)
 |-- stop_sequence: long (nullable = true)
 |-- arrival_time: long (nullable = true)
 |-- arrives_next_day: boolean (nullable = true)
 |-- departure_time: long (nullable = true)
 |-- departs_next_day: boolean (nullable = true)
 |-- dropoff_type: string (nullable = true)
 |-- exact_timepoint: boolean (nullable = true)



In [12]:
// Node frame for stops: one row per distinct stop_id, exposed both as the
// element `id` and as the `stop_number` column.
val stopsDF = spark.sql("SELECT DISTINCT stop_id AS id, stop_id AS stop_number FROM stopTimes")
// Node frame for trips: one row per distinct trip_id, exposed both as the
// element `id` and as the `trip_number` column.
val tripsDF = spark.sql("SELECT DISTINCT trip_id AS id, trip_id AS trip_number FROM stopTimes")
// Relationship frame: trip (source) -> stop (target) with the stop's position in the trip.
// The id joins trip_id, stop_id and stop_sequence with a '-' delimiter. Without a
// delimiter, different pairs collapse to the same string (e.g. trip 12 / stop 34 and
// trip 1 / stop 234 both give "1234"); without stop_sequence, a trip that visits the
// same stop twice would yield two DISTINCT rows sharing one id.
val containsDF = spark.sql(
  "SELECT DISTINCT trip_id AS source, stop_id AS target, stop_sequence, " +
  "CONCAT(trip_id, '-', stop_id, '-', stop_sequence) AS id FROM stopTimes")

stopsDF: org.apache.spark.sql.DataFrame = [id: bigint, stop_number: bigint]
tripsDF: org.apache.spark.sql.DataFrame = [id: bigint, trip_number: bigint]
containsDF: org.apache.spark.sql.DataFrame = [source: bigint, target: bigint ... 2 more fields]


In [13]:
%%time
import org.opencypher.morpheus.api.MorpheusSession
import org.opencypher.morpheus.api.io.{MorpheusNodeTable, MorpheusRelationshipTable}
import spark.sqlContext.implicits._

Time: 0.9468436241149902 seconds.



import org.opencypher.morpheus.api.MorpheusSession
import org.opencypher.morpheus.api.io.{MorpheusNodeTable, MorpheusRelationshipTable}
import spark.sqlContext.implicits._


In [14]:
// Wrap the DataFrames as Morpheus element tables.
// Node tables: each row becomes a node carrying the given label; the `id` column is
// mapped to the element id and the remaining column (stop_number / trip_number)
// becomes a node property.
val stopTable = MorpheusNodeTable(Set("Stop"), stopsDF)
val tripTable = MorpheusNodeTable(Set("Trip"), tripsDF)
// Relationship table of type CONTAINS built from containsDF.
// NOTE(review): presumably the `source` / `target` columns are taken as the start and
// end node ids and `stop_sequence` becomes a relationship property — confirm against
// the Morpheus documentation.
val containsTable = MorpheusRelationshipTable("CONTAINS", containsDF)

stopTable: org.opencypher.morpheus.api.io.MorpheusElementTable = MorpheusElementTable(ElementMapping(NodePattern(NODE(:Stop)),Map(PatternElement(node,NODE(:Stop)) -> Map(stop_number -> stop_number)),Map(PatternElement(node,NODE(:Stop)) -> Map(SourceIdKey -> id))),org.opencypher.morpheus.impl.table.SparkTable$DataFrameTable@62f25f1c)
tripTable: org.opencypher.morpheus.api.io.MorpheusElementTable = MorpheusElementTable(ElementMapping(NodePattern(NODE(:Trip)),Map(PatternElement(node,NODE(:Trip)) -> Map(trip_number -> trip_number)),Map(PatternElement(node,NODE(:Trip)) -> Map(SourceIdKey -> id))),org.opencypher.morpheus.impl.table.SparkTable$DataFrameTable@72c8be3d)
containsTable: org.opencypher.morpheus.api.io.MorpheusElementTable = MorpheusElementTable(ElementMapping(RelationshipPattern(RE...


In [15]:
// Wrap the notebook's existing cluster-backed `spark` session in a Morpheus session
// rather than asking Morpheus to build a local-mode session: this notebook runs on
// Dataproc and every DataFrame above was created by `spark`.
// Declared implicit so Morpheus APIs that need a session can pick it up.
implicit val morpheus: MorpheusSession = MorpheusSession.create(spark)
// Assemble the property graph from the two node tables and the relationship table.
val graph = morpheus.readFrom(stopTable, tripTable, containsTable)

morpheus: org.opencypher.morpheus.api.MorpheusSession = MorpheusSession
graph: org.opencypher.okapi.relational.api.graph.RelationalCypherGraph[org.opencypher.morpheus.impl.table.SparkTable.DataFrameTable] = ScanGraph(NodePattern(NODE(:Stop)), NodePattern(NODE(:Trip)), RelationshipPattern(RELATIONSHIP(:CONTAINS)))


In [16]:
// Cypher query: every Trip with a CONTAINS relationship to the Stop whose
// stop_number is 15104, returning the trip number and the stop's sequence value.
val result = graph.cypher("""
  |MATCH
  | (s1:Stop {stop_number: 15104})<-[c1:CONTAINS]-(t1:Trip)
  |RETURN t1.trip_number AS trip, c1.stop_sequence AS seq
""".stripMargin)
// Rename the result columns positionally, publish them as the temp view "results",
// then read that view back through Spark SQL.
val resultsTable = {
  val outputColumns = Seq("trip", "seq")
  result.records.table.df.toDF(outputColumns: _*).createOrReplaceTempView("results")
  spark.sql("SELECT * FROM results")
}
// Display the first rows of the result.
resultsTable.show()

+-------+---+
|   trip|seq|
+-------+---+
|8961808| 59|
|8962105| 59|
|8962102| 59|
|8961791| 59|
|8962089| 59|
|8961822| 59|
|8961821| 59|
|8961819| 59|
|8962069| 59|
|8961968| 59|
|8962120| 59|
|8961942| 59|
|8961933| 59|
|8961931| 59|
|8962079| 59|
|8961824| 59|
|8962076| 59|
|8962075| 59|
|8961982| 59|
|8961818| 59|
+-------+---+
only showing top 20 rows



result: org.opencypher.okapi.relational.api.planning.RelationalCypherResult[org.opencypher.morpheus.impl.table.SparkTable.DataFrameTable] = RelationalCypherResult(Some(Select(orderedFields=List(trip :: INTEGER?, seq :: INTEGER?), solved=SolvedQueryModel(Set(seq :: INTEGER?, s1 :: NODE(:Stop) @ session.tmp1, trip :: INTEGER?, t1 :: NODE(:Trip) @ session.tmp1, c1 :: RELATIONSHIP(:CONTAINS) @ session.tmp1),Set(t1:Trip :: BOOLEAN, s1:Stop :: BOOLEAN, c1:CONTAINS :: BOOLEAN, s1.stop_number :: INTEGER? = $  AUTOINT0 :: INTEGER)))),Some(Select(expressions=List(trip :: INTEGER?, seq :: INTEGER?), columnRenames=Map())))
resultsTable: org.apache.spark.sql.DataFrame = [trip: bigint, seq: bigint]
