Permalink
Browse files

Parse CSV file and get unique station count

  • Loading branch information...
1 parent eee605b commit 791e8f015b5f8c9b9d71f95a48fa024e31d0c368 @vergenzt committed Jun 19, 2014
Showing with 113 additions and 10 deletions.
  1. +7 −1 README.md
  2. +14 −0 build.gradle
  3. +88 −6 src/main/scala/Bikeshare.scala
  4. +4 −3 submit.sh
View
@@ -1,4 +1,4 @@
-spark-bikeshare
+spark-citibike-analysis
===============
Some analysis on Citibike's published data using Spark on Amazon EC2.
@@ -12,4 +12,10 @@ Some analysis on Citibike's published data using Spark on Amazon EC2.
The `submit.sh` script simply `scp`s the built jar file to the cluster and runs `spark-submit` via `ssh`.
+## Note
+
+My `submit.sh` script quickly became annoying to maintain, and it no longer works if you have multiple jars. Use it if you like.
+
+I've modified `build.gradle` to copy required libraries into `build/libs`, so you should be able to `scp` everything from that folder to the server, then pass the jars in as an argument to `spark-submit`. I've just been doing that manually now.
+
[spark-ec2]: http://spark.apache.org/docs/1.0.0/ec2-scripts.html
View
@@ -5,9 +5,23 @@ repositories {
mavenCentral()
}
// Dependencies in the 'release' configuration must be shipped to the
// cluster alongside the application jar; extending 'compile' from it
// keeps them on the compile classpath as well.
configurations {
    release
    compile.extendsFrom release
}
+
dependencies {
    // Provided on the cluster by the Spark runtime itself.
    compile group: 'org.scala-lang',   name: 'scala-library',     version: '2.10.4'
    compile group: 'org.apache.spark', name: 'spark-core_2.10',   version: '1.0.0'
    compile group: 'org.apache.spark', name: 'spark-graphx_2.10', version: '1.0.0'
    // Must be shipped with the job (see the 'release' configuration).
    release group: 'joda-time',            name: 'joda-time',      version: '2.3'
    release group: 'com.github.tototoshi', name: 'scala-csv_2.10', version: '1.0.0'
}
// Stage every 'release' dependency next to the application jar so the
// whole of build/libs can be scp'd to the cluster in one step.
task copyDependencies(type: Copy) {
    from configurations.release
    into "${buildDir}/libs"
}

build.dependsOn copyDependencies
@@ -1,16 +1,98 @@
+import scala.Array.canBuildFrom
+
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.KryoRegistrator
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+
+import com.esotericsoftware.kryo.Kryo
object Bikeshare {

  /** Raw Citibike trip data (July 2013 - February 2014) on S3. */
  val tripsFile = "s3n://citibike-tripdata/citibike_tripdata_2013-07_2014-02.csv.raw"

  /** Entry point: loads the trip CSV and prints the number of distinct stations. */
  def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("Bikeshare Analysis")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Fixed config key: was misspelled "spark.kry.registrator", so the
      // custom BikeshareRegistrator was never picked up by Kryo.
      .set("spark.kryo.registrator", "BikeshareRegistrator")
      // Fail with a clear message instead of a bare NoSuchElementException
      // when the AWS credentials are missing from the environment.
      .set("fs.s3n.awsAccessKeyId",
        sys.env.getOrElse("AWS_ACCESS_KEY_ID", sys.error("AWS_ACCESS_KEY_ID is not set")))
      .set("fs.s3n.awsSecretAccessKey",
        sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", sys.error("AWS_SECRET_ACCESS_KEY is not set")))
    implicit val sc = new SparkContext(conf)

    val tripsRaw = sc.textFile(tripsFile).cache()
    val trips = parseTripsCsv(tripsRaw)

    // A station counts if it appears as either endpoint of any trip.
    val stationIds = trips.flatMap(t => Seq(t.startStation.id, t.endStation.id)).distinct
    println(stationIds.count)
  }

  /**
   * Parses raw CSV trip lines into [[Types.Trip]] records.
   *
   * NOTE(review): this is a naive split(",") parser — it assumes no field
   * contains an embedded comma. The build ships scala-csv, which could
   * replace this if that assumption is ever violated; confirm against the
   * actual data.
   *
   * @param tripsRaw one CSV line per trip, each field wrapped in double quotes
   * @return an RDD of parsed trips
   */
  def parseTripsCsv(tripsRaw: RDD[String])(implicit sc: SparkContext) = {
    import Types._
    // Broadcast the Joda formatter once instead of constructing it per record.
    val format = sc.broadcast(DateTimeFormat.forPattern("YYYY-MM-dd HH:mm:ss"))
    tripsRaw
      .map { line: String =>
        line
          // split the csv line
          .split(",")
          // remove leading and trailing quote characters from each entry
          .map(_.stripPrefix("\"").stripSuffix("\""))
      }
      .map { fields: Array[String] =>
        val startStation = Station(
          fields(3).toInt,
          fields(4),
          fields(5).toDouble,
          fields(6).toDouble)
        val endStation = Station(
          fields(7).toInt,
          fields(8),
          fields(9).toDouble,
          fields(10).toDouble)

        Trip(
          fields(0).toInt,
          format.value.parseDateTime(fields(1)),
          format.value.parseDateTime(fields(2)),
          startStation,
          endStation,
          fields(11).toInt,
          fields(12),
          // Birth year is only reported for subscribers; -1 marks "unknown".
          if (fields(12) == "Subscriber") fields(13).toInt else -1,
          fields(14).toInt)
      }
  }
}
+
/** Registers the job's serialized classes with Kryo. */
object BikeshareRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    import Types._
    kryo.register(classOf[Station])
    kryo.register(classOf[Trip])
    // The value actually broadcast/serialized is the DateTimeFormatter
    // returned by DateTimeFormat.forPattern, not the DateTimeFormat
    // factory class itself — register the formatter.
    kryo.register(classOf[org.joda.time.format.DateTimeFormatter])
  }
}
+
/** Plain data types shared across the analysis. */
object Types {

  /** A Citibike dock station, identified by `id`. */
  case class Station(
    id: Int,
    name: String,
    latitude: Double,
    longitude: Double)

  /** One ride between two stations, as recorded in the trip CSV. */
  case class Trip(
    tripDuration: Int,
    startTime: DateTime,
    stopTime: DateTime,
    startStation: Station,
    endStation: Station,
    bikeId: Int,
    userType: String,
    birthYear: Int, // -1 when not reported (non-subscriber rows)
    gender: Int)
}
View
@@ -3,14 +3,15 @@ if [ -z "$AWS_CLUSTER" ]; then
exit 1
fi
APPLICATION_JAR=build/libs/spark-citibike-analysis.jar
MAIN_CLASS=Bikeshare

# Fail fast if the jar has not been built yet.
if [ ! -f "$APPLICATION_JAR" ]; then
  echo '[error]: must run "./gradlew build" first'
  exit 1
fi

echo "Copying jar to cluster"
# Ship the application jar plus every dependency jar staged in build/libs.
scp build/libs/* "root@$AWS_CLUSTER:/root/"

# Double quotes let $AWS_CLUSTER, $MAIN_CLASS and $APPLICATION_JAR expand
# locally (the original single quotes left them unexpanded — they are not
# set on the remote host). The escaped \$(find ...) runs remotely, where
# the jars actually live; the glob is quoted so it is not expanded locally.
ssh "root@$AWS_CLUSTER" "/root/spark/bin/spark-submit \
  --master spark://$AWS_CLUSTER:7077 \
  --class $MAIN_CLASS \
  --jars \"\$(find /root -maxdepth 1 -name '*.jar' -print0 | tr '\0' ',')\" \
  /root/$(basename "$APPLICATION_JAR")"

0 comments on commit 791e8f0

Please sign in to comment.