In [3]:

//Build the list of input files: one CSV per month of 2017.
//dates holds the twelve "2017-MM" labels (month zero-padded to two digits).
val dates = Array.tabulate(12)(i => f"2017-${i + 1}%02d")
//Each label is embedded in the bucket path to give the full file name.
val files = dates.map(month => s"gs://cloud-class-2020/taxi/f${month}.csv")
//Alternative that reads from a relative taxi/ directory instead:
//val files = dates.map(month => s"taxi/f${month}.csv")


//data_rdd_array holds one RDD of text lines per input file.
//For every file the first two lines of partition 0 are discarded (the header and a blank row);
//this assumes partition 0 is the one that contains the start of the file.
val data_rdd_array = files.map { path =>
  sc.textFile(path).mapPartitionsWithIndex { (partitionIndex, lines) =>
    partitionIndex match {
      case 0 => lines.drop(2)
      case _ => lines
    }
  }
}

//all_data is a single RDD containing the lines of every per-file RDD in data_rdd_array.
//reduce folds the array pairwise, and union joins two RDDs into one (see the union example cell).
val all_data = data_rdd_array.reduce(_ union _)

//Break every line into its comma-separated fields
val split_data = all_data.map(_.split(","))

//Build a key/value RDD from the split rows.
//Key:   the pickup point id (field index 7).
//Value: the fare (field index 10) plus the tip (field index 13), both parsed as Double.
val mapped_data = split_data.map { fields =>
  val pickupId = fields(7)
  val fare = fields(10).toDouble
  val tip = fields(13).toDouble
  (pickupId, fare + tip)
}

//The three functions handed to combineByKey.
//The accumulator is a triple: (number of values, sum of values, sum of squared values).

//combiner: start an accumulator from the first value seen for a key
val combiner = (amount: Double) => (1, amount, amount * amount)

//merger: fold one more value into an existing accumulator
val merger = (acc: (Int, Double, Double), amount: Double) =>
  (acc._1 + 1, acc._2 + amount, acc._3 + amount * amount)

//mergeAndCombiner: add two accumulators component by component
val mergeAndCombiner = (left: (Int, Double, Double), right: (Int, Double, Double)) =>
  (left._1 + right._1, left._2 + right._2, left._3 + right._3)

//Aggregate the values per key into (count, sum, sum of squares) accumulators
val combined_data = mapped_data.combineByKey(
  createCombiner = combiner,
  mergeValue = merger,
  mergeCombiners = mergeAndCombiner
)

//Turn one aggregated record into its average and sample variance.
//Input:  (identifier, (count, sum of values, sum of squared values))
//Output: (identifier, average, variance) -- note the average comes first, the variance second.
//getVarAndAvgFunction = > (String, Double, Double)
val getVarAndAvgFunction = (x: (String, (Int, Double, Double))) => {
  val (identifier, (count, total1, total2)) = x
  val mean = total1 / count
  val variance =
    //The sample variance divides by (count - 1), so it is undefined for fewer than two values.
    //NaN is returned explicitly rather than relying on 0.0/0 happening to produce it.
    if (count <= 1) Double.NaN
    //The sum-of-squares form can land slightly below zero through floating-point cancellation
    //(e.g. when all values are nearly identical); a variance is never negative, so clamp at 0.
    else math.max(0.0, (total2 - mean * mean * count) / (count - 1))
  (identifier, mean, variance)
}

//Compute the statistics per key and reshape each record to (key, (second field, third field))
//of what getVarAndAvgFunction returns, so that it can be collected as a map.
val result = combined_data
  .map(getVarAndAvgFunction)
  .map { case (key, first, second) => (key, (first, second)) }

//Bring the per-key results back to the driver as a Map
val r = result.collectAsMap()



dates: Array[String] = Array(2017-01, 2017-02, 2017-03, 2017-04, 2017-05, 2017-06, 2017-07, 2017-08, 2017-09, 2017-10, 2017-11, 2017-12)
files: Array[String] = Array(taxi/f2017-01.csv, taxi/f2017-02.csv, taxi/f2017-03.csv, taxi/f2017-04.csv, taxi/f2017-05.csv, taxi/f2017-06.csv, taxi/f2017-07.csv, taxi/f2017-08.csv, taxi/f2017-09.csv, taxi/f2017-10.csv, taxi/f2017-11.csv, taxi/f2017-12.csv)
data_rdd_array: Array[org.apache.spark.rdd.RDD[String]] = Array(MapPartitionsRDD[90] at mapPartitionsWithIndex at <console>:50, MapPartitionsRDD[93] at mapPartitionsWithIndex at <console>:50, MapPartitionsRDD[96] at mapPartitionsWithIndex at <console>:50, MapPartitionsRDD[99] at mapPartitionsWithIndex at <console>:50, MapPartitionsRDD[102] at mapPartitionsWithIndex at <console>:50, MapPartitionsRDD[1...

In [4]:
r

res2: scala.collection.Map[String,(Double, Double)] = Map(188 -> (14.666259597276547,249.3562749761392), 204 -> (55.0072972972973,2740.689309159159), 194 -> (42.80580259365991,915.7370006383576), 90 -> (12.35628407722452,136849.68477359708), 99 -> (52.35909090909092,4817.481649090909), 111 -> (16.039274809160304,274.82378452955464), 167 -> (14.472861247947451,515.6178903314586), 57 -> (24.40454356846473,1113.5467820157687), 210 -> (25.33258503401361,1947.4929134719835), 219 -> (58.66534703196347,2077.871330805105), 84 -> (47.90564102564103,4113.146598920378), 173 -> (13.861785099582002,321.7962916970432), 78 -> (22.57363007778738,2131.890586984146), 63 -> (21.35235922330097,438.77261172130824), 105 -> (20.52487804878049,219.31668748500587), 149 -> (21.416573116691282,634.742579363359), ...

In [5]:
//Example of union: two small RDDs of strings are joined into one
val t1 = sc.parallelize(Array("hello", "bye"))
val t2 = sc.parallelize(Array("good", "fellow"))
val t3 = t1.union(t2)
//collect returns the elements of the combined RDD to the driver
t3.collect()

t1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:28
t2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:29
t3: org.apache.spark.rdd.RDD[String] = UnionRDD[4] at union at <console>:30
res1: Array[String] = Array(hello, bye, good, fellow)


<h2>The data</h2>
<ul>
<li>NYC yellow cab data is available from <a href="https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page">https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page</a></li>
<li>We will work with all the data from 2017</li>
<li>Rather than downloading each file separately, we'll use a shell script to download all twelve 2017 files</li>
</ul>

<h3>get_files.sh</h3>