### This notebook was run on a Google Cloud cluster because the dataset `nyc_data.csv` is very large

<h1>More cloud practice and some Spark!</h1>


In this assignment we will work with NYC yellow cab data. The goal of this assignment is twofold:
<ol>
    <li>To get more practice working with the cloud. In particular, we will use our independent virtual machine to load data into our storage bucket, and we will work a little with shell scripts</li>
    <li>To get more practice with Spark</li>
</ol>

In [9]:
//SOLUTION GOES HERE!!



//This code constructs a list of files that need to be read
val dates = Array("2017-01","2017-02","2017-03","2017-04","2017-05",
                  "2017-06","2017-07","2017-08","2017-09","2017-10",
                  "2017-11","2017-12")
val files = dates.map(d => "gs://zhangzy_cloud_analytics/taxi/f"++d++".csv") //++ is the concatenation operator

//data_rdd_array is an array of MapPartitionsRDDs. Each MapPartitionsRDD corresponds to one data file.
//Make sure that the data in data_rdd_array has the first two rows dropped (header and a blank row)
val data_rdd_array = files.map(f=>sc.textFile(f).mapPartitionsWithIndex{ (idx,iter) => 
    if (idx==0) iter.drop(2) else iter})

//all_data combines all the RDDs in data_rdd_array into a single RDD (a UnionRDD)
//Use union (see the "Example of union" cell) to combine two RDDs
//Use reduce to combine all the RDDs in data_rdd_array
val all_data = data_rdd_array.reduce(_ union _)

//Split all rows on comma
val split_data = all_data.map(l => l.split(","))

//Create a map rdd.
//The key should be the pickup point id (column 7, PULocationID), i.e. the "zone".
//The value should be (trip_cost, trip_cost^2), where trip_cost is the fare (column 10) plus the tip (column 13).
//The squared cost is carried along so the variance can be computed in the same pass.
val mapped_data = split_data.map { l =>
    val trip_cost = l(10).toDouble + l(13).toDouble
    (l(7), (trip_cost, trip_cost * trip_cost))
}

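//Write the combiner, the merger, and the mergeAndCombiner
//The accumulator for each zone is (count, (sum of trip_cost, sum of trip_cost^2))

//combiner: turns the first value seen for a key in a partition into an accumulator
val combiner = (x: (Double, Double)) => (1, x)
//merger: folds one more value into a partition-local accumulator
val merger = (x: (Int, (Double, Double)), y: (Double, Double)) => {
    val (c, (acc_value_1, acc_square_1)) = x
    val (acc_value_2, acc_square_2) = y
    (c + 1, (acc_value_1 + acc_value_2, acc_square_1 + acc_square_2))
}
//mergeAndCombiner: merges two accumulators for the same key coming from different partitions
val mergeAndCombiner = (x1: (Int, (Double, Double)), x2: (Int, (Double, Double))) => {
    val (c1, (acc_value_1, acc_square_1)) = x1
    val (c2, (acc_value_2, acc_square_2)) = x2
    (c1 + c2, (acc_value_1 + acc_value_2, acc_square_1 + acc_square_2))
}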


//Do the combine by key
val combined_data = mapped_data.combineByKey(combiner,merger,mergeAndCombiner)

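//Write a function that returns the average and the variance
//getVarAndAvgFunction: (String, (Int, (Double, Double))) => (String, (Double, Double)), i.e. zone -> (average, variance)
//Note: this divides by count (population variance), which is what reproduces the example output;
//the formula in the problem statement divides by (n-1) (sample variance)
val getVarAndAvgFunction = (x: (String, (Int, (Double, Double)))) => {
    val (identifier, (count, (total, sum_squares))) = x
    val avg = total / count
    (identifier, (avg, (sum_squares - count * avg * avg) / count))
}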

//get the result
val result = combined_data.map(getVarAndAvgFunction)

//Output is shown in the next cell !!!

dates = Array(2017-01, 2017-02, 2017-03, 2017-04, 2017-05, 2017-06, 2017-07, 2017-08, 2017-09, 2017-10, 2017-11, 2017-12)
files = Array(gs://zhangzy_cloud_analytics/taxi/f2017-01.csv, gs://zhangzy_cloud_analytics/taxi/f2017-02.csv, gs://zhangzy_cloud_analytics/taxi/f2017-03.csv, gs://zhangzy_cloud_analytics/taxi/f2017-04.csv, gs://zhangzy_cloud_analytics/taxi/f2017-05.csv, gs://zhangzy_cloud_analytics/taxi/f2017-06.csv, gs://zhangzy_cloud_analytics/taxi/f2017-07.csv, gs://zhangzy_cloud_analytics/taxi/f2017-08.csv, gs://zhangzy_cloud_analytics/taxi/f2017-09.csv, gs://zhangzy_cloud_analytics/taxi/f2017-10.csv, gs://zhangzy_cloud_analytics/taxi/f2017-11.csv, gs://zhangzy_cloud_analytics/taxi/f2017-12.csv)


data_rdd_array: Array[org.apache.spark.rdd.RDD[String]]...


[gs://zhangzy_cloud_analytics/taxi/f2017-01.csv, gs://zhangzy_cloud_analytics/taxi/f2017-02.csv, gs://zhangzy_cloud_analytics/taxi/f2017-03.csv, gs://zhangzy_cloud_analytics/taxi/f2017-04.csv, gs://zhangzy_cloud_analytics/taxi/f2017-05.csv, gs://zhangzy_cloud_analytics/taxi/f2017-06.csv, gs://zhangzy_cloud_analytics/taxi/f2017-07.csv, gs://zhangzy_cloud_analytics/taxi/f2017-08.csv, gs://zhangzy_cloud_analytics/taxi/f2017-09.csv, gs://zhangzy_cloud_analytics/taxi/f2017-10.csv, gs://zhangzy_cloud_analytics/taxi/f2017-11.csv, gs://zhangzy_cloud_analytics/taxi/f2017-12.csv]

In [10]:
//Show the output (run collectAsMap on result)

result.collectAsMap

Map(188 -> (14.666259597276547,249.33821353365218), 204 -> (55.0072972972973,2666.6166251278305), 194 -> (42.805802593659955,915.6050500618951), 90 -> (12.356284077224528,136849.62022189386), 99 -> (52.35909090909092,4379.528771900826), 111 -> (16.0392748091603,273.7748387870754), 167 -> (14.472861247947455,515.4062245308297), 57 -> (24.40454356846474,1111.236518982541), 210 -> (25.3325850340136,1944.8432632495715), 219 -> (58.665347031963464,2077.5550642403), 84 -> (47.90564102564103,4007.681301512163), 173 -> (13.861785099581997,321.7171679469332), 78 -> (22.57363007778738,2130.047984921065), 63 -> (21.352359223300972,438.5596153078517), 105 -> (20.52487804878049,217.53362498512772), 149 -> (21.416573116691282,633.8049980053634), 45 -> (15.392694347338317,175.5325653053832), 30 -> (40.527714285714296,4862.637611918367), 197 -> (32.31200050645732,2040.637258545454), 248 -> (18.38524193548388,677.6241638815592), 39 -> (25.332368983957213,1184.2685940135837), 155 -> (31.821510204081633,

In [1]:
//Example of union
val t1 = sc.parallelize(Array("hello","bye"))
val t2 = sc.parallelize(Array("good","fellow"))
val t3 = t1 union t2
t3.collect

t1 = ParallelCollectionRDD[0] at parallelize at <console>:28
t2 = ParallelCollectionRDD[1] at parallelize at <console>:29
t3 = UnionRDD[2] at union at <console>:30


[hello, bye, good, fellow]

## Step by Step:

#### Step 1

In [21]:
//This code constructs a list of files that need to be read
val dates = Array("2017-01","2017-02","2017-03","2017-04","2017-05",
                  "2017-06","2017-07","2017-08","2017-09","2017-10",
                  "2017-11","2017-12")
val files = dates.map(d => "gs://zhangzy_cloud_analytics/taxi/f"++d++".csv")

dates = Array(2017-01, 2017-02, 2017-03, 2017-04, 2017-05, 2017-06, 2017-07, 2017-08, 2017-09, 2017-10, 2017-11, 2017-12)
files = Array(gs://zhangzy_cloud_analytics/taxi/f2017-01.csv, gs://zhangzy_cloud_analytics/taxi/f2017-02.csv, gs://zhangzy_cloud_analytics/taxi/f2017-03.csv, gs://zhangzy_cloud_analytics/taxi/f2017-04.csv, gs://zhangzy_cloud_analytics/taxi/f2017-05.csv, gs://zhangzy_cloud_analytics/taxi/f2017-06.csv, gs://zhangzy_cloud_analytics/taxi/f2017-07.csv, gs://zhangzy_cloud_analytics/taxi/f2017-08.csv, gs://zhangzy_cloud_analytics/taxi/f2017-09.csv, gs://zhangzy_cloud_analytics/taxi/f2017-10.csv, gs://zhangzy_cloud_analytics/taxi/f2017-11.csv, gs://zhangzy_cloud_analytics/taxi/f2017-12.csv)


[gs://zhangzy_cloud_analytics/taxi/f2017-01.csv, gs://zhangzy_cloud_analytics/taxi/f2017-02.csv, gs://zhangzy_cloud_analytics/taxi/f2017-03.csv, gs://zhangzy_cloud_analytics/taxi/f2017-04.csv, gs://zhangzy_cloud_analytics/taxi/f2017-05.csv, gs://zhangzy_cloud_analytics/taxi/f2017-06.csv, gs://zhangzy_cloud_analytics/taxi/f2017-07.csv, gs://zhangzy_cloud_analytics/taxi/f2017-08.csv, gs://zhangzy_cloud_analytics/taxi/f2017-09.csv, gs://zhangzy_cloud_analytics/taxi/f2017-10.csv, gs://zhangzy_cloud_analytics/taxi/f2017-11.csv, gs://zhangzy_cloud_analytics/taxi/f2017-12.csv]

#### Step 2

In [24]:
//data_rdd_array is an array of MapPartitionsRDDs. Each MapPartitionsRDD corresponds to one data file.
//Make sure that the data in data_rdd_array has the first two rows dropped (header and a blank row)
val data_rdd_array = files.map(f=>sc.textFile(f).mapPartitionsWithIndex{ (idx,iter) => 
    if (idx==0) iter.drop(2) else iter})

data_rdd_array = Array(MapPartitionsRDD[101] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[104] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[107] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[110] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[113] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[116] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[119] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[122] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[125] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[128] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[131] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[1...


[MapPartitionsRDD[101] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[104] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[107] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[110] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[113] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[116] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[119] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[122] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[125] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[128] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[131] at mapPartitionsWithIndex at <console>:32, MapPartitionsRDD[134] at mapPartitionsWithIndex at <console>:32]

In [25]:
//Check RDD
data_rdd_array(0).take(4).foreach(println)

1,2017-01-09 11:13:28,2017-01-09 11:25:45,1,3.30,1,N,263,161,1,12.5,0,0.5,2,0,0.3,15.3
1,2017-01-09 11:32:27,2017-01-09 11:36:01,1,.90,1,N,186,234,1,5,0,0.5,1.45,0,0.3,7.25
1,2017-01-09 11:38:20,2017-01-09 11:42:05,1,1.10,1,N,164,161,1,5.5,0,0.5,1,0,0.3,7.3
1,2017-01-09 11:52:13,2017-01-09 11:57:36,1,1.10,1,N,236,75,1,6,0,0.5,1.7,0,0.3,8.5
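The mapPartitionsWithIndex trick only removes the header and the blank row because, in each file, those two lines sit at the start of partition 0. The plain-Scala sketch below mimics that rule without Spark; dropHeader and the placeholder lines are hypothetical, and each "file" is modeled as a Seq of partitions rather than a real RDD. It assumes partition 0 holds at least the first two lines, which should be the case for files of this size.

```scala
// Hypothetical model: a "file" is a Seq of partitions, each partition a List of lines.
// Mirrors mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(2) else iter }
def dropHeader(partitions: Seq[List[String]]): Seq[List[String]] =
  partitions.zipWithIndex.map { case (lines, idx) =>
    if (idx == 0) lines.drop(2) // header and blank row are only in partition 0
    else lines                  // every other partition is kept unchanged
  }

// Placeholder lines, not real data: a header, a blank row, then data rows.
val file = Seq(
  List("header line", "", "row 1"),
  List("row 2", "row 3")
)
val cleaned = dropHeader(file)
println(cleaned.flatten) // the remaining data rows, in order
```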


#### Step 3

In [32]:
//all_data combines all the RDDs in data_rdd_array into a single RDD (a UnionRDD)
//Use union (see the "Example of union" cell) to combine two RDDs
//Use reduce to combine all the RDDs in data_rdd_array
val all_data = data_rdd_array.reduce(_ union _)

all_data = UnionRDD[168] at union at <console>:35


UnionRDD[168] at union at <console>:35

In [33]:
//Check RDD
all_data.take(4).foreach(println)

1,2017-01-09 11:13:28,2017-01-09 11:25:45,1,3.30,1,N,263,161,1,12.5,0,0.5,2,0,0.3,15.3
1,2017-01-09 11:32:27,2017-01-09 11:36:01,1,.90,1,N,186,234,1,5,0,0.5,1.45,0,0.3,7.25
1,2017-01-09 11:38:20,2017-01-09 11:42:05,1,1.10,1,N,164,161,1,5.5,0,0.5,1,0,0.3,7.3
1,2017-01-09 11:52:13,2017-01-09 11:57:36,1,1.10,1,N,236,75,1,6,0,0.5,1.7,0,0.3,8.5
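Because reduce on a sequential collection folds pairwise from left to right, data_rdd_array.reduce(_ union _) should keep the months in order. The sketch below checks the same shape on plain Scala sequences, with ++ standing in for union; it is an analogy, not Spark code, and the row labels are placeholders. As an alternative, SparkContext also has a union method that takes a Seq of RDDs, so sc.union(data_rdd_array.toSeq) should build one UnionRDD instead of a chain of eleven nested ones.

```scala
// Twelve placeholder "files", each modeled as a Seq of two row labels.
val months = (1 to 12).map(m => f"2017-$m%02d")
val fileRows: Seq[Seq[String]] = months.map(m => Seq(s"$m row A", s"$m row B"))

// Same shape as data_rdd_array.reduce(_ union _), with ++ standing in for union.
val allRows = fileRows.reduce(_ ++ _)

// reduce on a sequential collection folds left to right, so months stay in order.
println(allRows.take(3))
```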


#### Step 4

In [34]:
//Split all rows on comma
val split_data = all_data.map(l => l.split(","))

split_data = MapPartitionsRDD[169] at map at <console>:35


MapPartitionsRDD[169] at map at <console>:35

In [37]:
//Check RDD
split_data.first

[1, 2017-01-09 11:13:28, 2017-01-09 11:25:45, 1, 3.30, 1, N, 263, 161, 1, 12.5, 0, 0.5, 2, 0, 0.3, 15.3]

In [55]:
split_data.count                  // 113,496,874 records in total

113496874
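Splitting on a bare comma works because the sample rows above contain no quoted fields with embedded commas. One Java/Scala detail is worth knowing here: String.split(regex) drops trailing empty strings, while split(regex, -1) keeps them, so a row ending in empty fields would come back shorter than expected. A quick check on the first sample row from the Step 2 output:

```scala
// First data row of the January file, copied from the Step 2 output.
val row = "1,2017-01-09 11:13:28,2017-01-09 11:25:45,1,3.30,1,N,263,161,1,12.5,0,0.5,2,0,0.3,15.3"
val fields = row.split(",")

// PULocationID, fare_amount and tip_amount by position (0-based).
val zone = fields(7)
val fare = fields(10)
val tip  = fields(13)

// Trailing empty fields: dropped by split(","), kept by split(",", -1).
val trailing = "a,b,,"
val dropped = trailing.split(",")      // Array("a", "b")
val kept    = trailing.split(",", -1)  // Array("a", "b", "", "")
println(s"${fields.length} fields; zone=$zone fare=$fare tip=$tip")
```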

#### Step 5

In [117]:
//Create a map rdd.
//The key should be the pickup point id (column 7, PULocationID), i.e. the "zone".
//The value should be (trip_cost, trip_cost^2), where trip_cost is the fare (column 10) plus the tip (column 13).
//The squared cost is carried along so the variance can be computed in the same pass.
val mapped_data = split_data.map { l =>
    val trip_cost = l(10).toDouble + l(13).toDouble
    (l(7), (trip_cost, trip_cost * trip_cost))
}

mapped_data = MapPartitionsRDD[198] at map at <console>:39


MapPartitionsRDD[198] at map at <console>:39

In [118]:
//Check RDD
mapped_data.take(4).foreach(println)

(263,(14.5,210.25))
(186,(6.45,41.6025))
(164,(6.5,42.25))
(236,(7.7,59.290000000000006))


In [120]:
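mapped_data.count                 // 113,496,874 trips in total, the same as the number of records.
                                  // map never changes the record count, but count does evaluate every
                                  // toDouble, so finishing without a NumberFormatException means every
                                  // fare and tip value parsed as a Double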

113496874

In [121]:
mapped_data.keys.distinct.count   // 265 zones in total

265

In [122]:
mapped_data.values.count

113496874
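Step 5's mapping can also be written as a standalone function that computes trip_cost once. parseTrip below is a hypothetical helper, sketched on two sample rows copied from the Step 2 output; it assumes the 17-column layout seen there (PULocationID at index 7, fare_amount at 10, tip_amount at 13).

```scala
// Hypothetical helper: one CSV line -> (zone, (trip_cost, trip_cost^2)).
def parseTrip(line: String): (String, (Double, Double)) = {
  val f = line.split(",")
  val tripCost = f(10).toDouble + f(13).toDouble // fare_amount + tip_amount
  (f(7), (tripCost, tripCost * tripCost))         // key = PULocationID
}

val sample = Seq(
  "1,2017-01-09 11:13:28,2017-01-09 11:25:45,1,3.30,1,N,263,161,1,12.5,0,0.5,2,0,0.3,15.3",
  "1,2017-01-09 11:38:20,2017-01-09 11:42:05,1,1.10,1,N,164,161,1,5.5,0,0.5,1,0,0.3,7.3"
)
val parsed = sample.map(parseTrip)
parsed.foreach(println)
```

In the notebook the same function could be applied as all_data.map(parseTrip), folding Steps 4 and 5 into one map; this is a stylistic choice and should not change the result.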

#### Step 6

In [138]:
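//Write the combiner, the merger, and the mergeAndCombiner
//The accumulator for each zone is (count, (sum of trip_cost, sum of trip_cost^2))

//combiner: turns the first value seen for a key in a partition into an accumulator
val combiner = (x: (Double, Double)) => (1, x)
//merger: folds one more value into a partition-local accumulator
val merger = (x: (Int, (Double, Double)), y: (Double, Double)) => {
    val (c, (acc_value_1, acc_square_1)) = x
    val (acc_value_2, acc_square_2) = y
    (c + 1, (acc_value_1 + acc_value_2, acc_square_1 + acc_square_2))
}
//mergeAndCombiner: merges two accumulators for the same key coming from different partitions
val mergeAndCombiner = (x1: (Int, (Double, Double)), x2: (Int, (Double, Double))) => {
    val (c1, (acc_value_1, acc_square_1)) = x1
    val (c2, (acc_value_2, acc_square_2)) = x2
    (c1 + c2, (acc_value_1 + acc_value_2, acc_square_1 + acc_square_2))
}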

combiner = > (Int, (Double, Double)) = <function1>
merger = > (Int, (Double, Double)) = <function2>
mergeAndCombiner = > (Int, (Double, Double)) = <function2>


<function2>

In [139]:
//Test
combiner((14.5,210.25))

(1,(14.5,210.25))

In [140]:
merger((1,(2.1,4.41)),(3.3,10.89))

(2,(5.4,15.3))

In [142]:
mergeAndCombiner((2,(5.4,15.3)),(7,(21.2,50.8)))

(9,(26.6,66.1))

#### Step 7

In [143]:
//Do the combine by key
val combined_data = mapped_data.combineByKey(combiner,merger,mergeAndCombiner)

combined_data = ShuffledRDD[204] at combineByKey at <console>:43


ShuffledRDD[204] at combineByKey at <console>:43

In [144]:
//Check RDD
combined_data.take(4).foreach(println)

(231,(2059762,(3.067394365000015E7,6.860670653134961E8)))
(187,(39,(1190.7199999999998,73584.519)))
(22,(1557,(43554.29000000001,3737455.855500001)))
(77,(767,(16293.5,603246.5922)))


In [145]:
//265 zones
combined_data.count

265
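To see where each of the three functions is called, here is a local, Spark-free model of combineByKey. localCombineByKey is a hypothetical helper, not a Spark API, and the keys and values are made up. It assumes the usual semantics: within a partition, the first value for a key goes through the combiner and later values through the merger; after the shuffle, partial results for the same key from different partitions are merged with mergeAndCombiner.

```scala
// Accumulator per key: (count, (sum of values, sum of squared values)), as in Step 6.
type Acc = (Int, (Double, Double))

val combiner = (x: (Double, Double)) => (1, x)
val merger = (acc: Acc, v: (Double, Double)) =>
  (acc._1 + 1, (acc._2._1 + v._1, acc._2._2 + v._2))
val mergeAndCombiner = (a: Acc, b: Acc) =>
  (a._1 + b._1, (a._2._1 + b._2._1, a._2._2 + b._2._2))

// Hypothetical local model of combineByKey (not a Spark API): each inner Seq is a partition.
def localCombineByKey[K, V, C](partitions: Seq[Seq[(K, V)]],
                               create: V => C,
                               mergeValue: (C, V) => C,
                               mergeCombiners: (C, C) => C): Map[K, C] = {
  // Map side: per partition, the first value of a key goes through create,
  // every later value of that key through mergeValue.
  val perPartition: Seq[Map[K, C]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(create(v)))
    }
  }
  // Shuffle + reduce side: partial results for the same key are merged across partitions.
  perPartition.flatten.groupBy(_._1).map { case (k, pairs) =>
    k -> pairs.map(_._2).reduce(mergeCombiners)
  }
}

val partitions = Seq(
  Seq(("A", (2.0, 4.0)), ("B", (1.0, 1.0)), ("A", (3.0, 9.0))),
  Seq(("A", (5.0, 25.0)), ("B", (4.0, 16.0)))
)
val combined = localCombineByKey(partitions, combiner, merger, mergeAndCombiner)
println(combined)
```

Key "A" is seen twice in the first partition (combiner, then merger) and once in the second (combiner), so mergeAndCombiner is what joins the two partial accumulators.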

#### Step 8

In [167]:
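//Write a function that returns the average and the variance
//getVarAndAvgFunction: (String, (Int, (Double, Double))) => (String, (Double, Double)), i.e. zone -> (average, variance)
//Note: this divides by count (population variance), which is what reproduces the example output;
//the formula in the problem statement divides by (n-1) (sample variance)
val getVarAndAvgFunction = (x: (String, (Int, (Double, Double)))) => {
    val (identifier, (count, (total, sum_squares))) = x
    val avg = total / count
    (identifier, (avg, (sum_squares - count * avg * avg) / count))
}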

getVarAndAvgFunction = > (String, (Double, Double)) = <function1>


<function1>
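getVarAndAvgFunction divides by count, i.e. it returns the population variance; that choice is what reproduces the example output, whereas the problem statement's formula divides by n - 1 (the sample variance). For zones with many trips the two differ only by the factor n/(n-1). The sketch below (meanAndVariances is a hypothetical helper) returns both from the same one-pass accumulator and checks them against a direct two-pass sum of squared deviations on a small made-up sample. One caveat: the one-pass formula subtracts two large, similar numbers, so it can lose precision when the spread is tiny relative to the mean; Welford's online algorithm is the usual remedy if that ever matters.

```scala
// Population (divide by n) and sample (divide by n - 1) variance from the
// one-pass accumulator (count, (sum, sumOfSquares)).
def meanAndVariances(count: Int, total: Double, sumSquares: Double): (Double, Double, Double) = {
  val mean = total / count
  val ss = sumSquares - count * mean * mean // equals the sum of (x_i - mean)^2
  (mean, ss / count, ss / (count - 1))
}

// Small check against a direct two-pass computation.
val xs = Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)
val (mean, popVar, sampleVar) = meanAndVariances(xs.size, xs.sum, xs.map(x => x * x).sum)
val twoPass = xs.map(x => (x - mean) * (x - mean)).sum
println(s"mean=$mean population=$popVar sample=$sampleVar")
```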

#### Step 9

In [168]:
//get the result
val result = combined_data.map(getVarAndAvgFunction)

result = MapPartitionsRDD[209] at map at <console>:47


MapPartitionsRDD[209] at map at <console>:47

In [169]:
result.take(4).foreach(println)

(231,(14.891984438007958,111.30954635795813))
(187,(30.531282051282044,954.6233547666013))
(22,(27.973211303789345,1617.920679604033))
(77,(21.24315514993481,335.2297832653681))


#### Step 10

In [170]:
//Show the output (run collectAsMap on result)

result.collectAsMap

Map(188 -> (14.666259597276547,249.33821353365224), 204 -> (55.007297297297306,2666.616625127829), 194 -> (42.805802593659955,915.6050500618951), 90 -> (12.35628407722453,136849.62022189386), 99 -> (52.35909090909092,4379.528771900826), 111 -> (16.0392748091603,273.77483878707557), 167 -> (14.47286124794746,515.4062245308296), 57 -> (24.40454356846474,1111.236518982541), 210 -> (25.3325850340136,1944.843263249572), 219 -> (58.665347031963464,2077.5550642402964), 84 -> (47.905641025641025,4007.681301512163), 173 -> (13.861785099582,321.71716794693333), 78 -> (22.573630077787378,2130.0479849210656), 63 -> (21.352359223300972,438.5596153078517), 105 -> (20.524878048780483,217.5336249851281), 149 -> (21.416573116691282,633.8049980053631), 45 -> (15.392694347338317,175.53256530538297), 30 -> (40.52771428571429,4862.637611918368), 197 -> (32.31200050645733,2040.6372585454546), 248 -> (18.38524193548387,677.6241638815593), 39 -> (25.33236898395722,1184.2685940135839), 155 -> (31.8215102040816

<h2>The data</h2>
<li>NYC yellow cab data is available from <a href="https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page">https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page</a></li>
<li>We will work with all the data from 2017</li>
<li>Rather than downloading each file separately, we'll use a shell script to download all twelve 2017 files</li>

<h3>get_files.sh</h3>

<h3>the script explained</h3>
<li>You don't have to read this!</li>
<li><span style="color:blue">LINE 1</span>: There are many different Unix shells (csh, bash, dash, zsh, etc.). In a shell script, the first line (the "shebang", e.g. <span style="color:red">#!/bin/bash</span>) tells the OS which shell should run the script. Shells are typically installed in /bin (take a look at the contents of /bin using <span style="color:red">ls /bin</span>). </li>
<li><span style="color:blue">LINE 2</span>: <span style="color:red">declare</span> declares a new variable. The <span style="color:red">-a</span> option indicates that the variable is an array. Note that the elements are separated by spaces rather than commas.</li>
<li><span style="color:blue">LINE 3</span>: A for loop. arr[@] refers to all the elements of the array arr (declared in line 2), which is the collection the loop draws its values from. The \$ sign is a peculiarity of Unix shells: when assigning a variable, only its name is used (e.g., x=5), but when using a previously assigned variable, the \$ sign must be placed in front of its name (e.g., echo \$x). Note that there must be no spaces around the = in x=5.</li>
<li><span style="color:blue">LINE 4</span>: do indicates the start of the for block</li>
<li><span style="color:blue">LINE 5</span>: curl is a command-line tool for downloading files from the internet. Here we want to get the file https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_\$val.csv after substituting 2017-01, 2017-02, etc. for \$val (we're using the val variable, so we need to put a \$ in front; hopefully you can see why!). The -o option specifies the name of the output file. I'm saving the files in /cloud-class-2020/taxi/f\$val.csv (again, note the \$val). cloud-class-2020 is my storage bucket, and I've created a sub-directory taxi for storing all the files. You will need to change cloud-class-2020 to the name of your storage bucket.</li>
<li><span style="color:blue">LINE 6</span>: done indicates the end of the for block</li>
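The loop simply substitutes each month into two strings: the download URL and the output path. The Scala sketch below builds the same twelve (URL, path) pairs and prints the curl command each iteration would run; it assumes the month list 2017-01 through 2017-12 and the author's cloud-class-2020 bucket path described above, so replace the bucket name with yours. It only prints the commands; nothing is downloaded.

```scala
// Months substituted for $val in the shell loop: 2017-01 ... 2017-12.
val months = (1 to 12).map(m => f"2017-$m%02d")

// (source URL, destination path) pairs, as in the curl line of the script.
// "cloud-class-2020" is the author's bucket; change it to your own.
val downloads = months.map { v =>
  (s"https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_$v.csv",
   s"/cloud-class-2020/taxi/f$v.csv")
}

// Print the command each loop iteration corresponds to.
downloads.foreach { case (url, dest) => println(s"curl $url -o $dest") }
```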

<h3>Dealing with the script</h3>
<li>modify the script to replace cloud-class-2020 with the name of your storage bucket</li>
<li>upload it to your VM</li>
<ol>
    <li>Choose Compute engine from the navigation menu on google cloud console</li>
    <li>Launch an ssh shell (the first option on the SSH dropdown menu at the end of the VM entry)</li>
    <li>Click on the gear icon on the top right corner of the shell window</li>
    <li>Choose upload file and upload the shell script. The script will now be in the home directory of your virtual machine (note that this is not your storage bucket!)</li>
    <li>type chmod u+x get_files.sh to make the script executable</li>
</ol>
<li>Create the taxi directory using the command mkdir /cloud-class-2020/taxi (replace cloud-class-2020 with the name of your own storage bucket)</li>
<li>Run get_files.sh with the command ./get_files.sh</li>
<li>This will take a while but curl will give you download stats as it downloads each file</li>
<li>Once completed, navigate to your storage bucket on google cloud console and make sure the files are in the bucket</li>

<h1>The problem</h1>
<li><b>Calculate the mean and the variance of the trip cost for trips originating in each taxi zone</b></li>
<li>The PULocationID column corresponds to the taxi zone</li>
<li>The trip cost is the fare_amount plus the tip_amount. You can ignore all other charges</li>
<li>You <b>must use combineByKey</b> for this problem, and you must use the one-pass method for calculating both the mean and the variance (i.e., you should <b>make only one call to combineByKey</b>)</li>
<li>Use the following formula for calculating variance: $ variance = \frac{\sum x_i^2 -n\bar{x}^2}{(n-1)} $ </li>
<li>The program should <b>output a Map</b> of the form Map(zone -> (mean, variance), zone -> (mean, variance), ...)</li>
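Why the one-pass formula works: expanding the sum of squared deviations shows that it depends only on $n$, $\sum x_i$ and $\sum x_i^2$, which is exactly what a single combineByKey accumulator can carry.

$$
\sum_{i=1}^{n}(x_i-\bar{x})^2
= \sum_{i=1}^{n}x_i^2 - 2\bar{x}\sum_{i=1}^{n}x_i + n\bar{x}^2
= \sum_{i=1}^{n}x_i^2 - 2n\bar{x}^2 + n\bar{x}^2
= \sum_{i=1}^{n}x_i^2 - n\bar{x}^2,
\qquad \text{where } \bar{x} = \frac{1}{n}\sum_{i=1}^{n}x_i
$$

Dividing this quantity by $n-1$ gives the variance formula above; dividing by $n$ gives the population variance.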

Example of output:

Map(188 -> (14.666259597276545,249.33821353365235), 204 -> (55.0072972972973,2666.6166251278305), 194 -> (42.80580259365995,915.6050500618951), ....

In [None]:
Map(188 -> (14.666259597276547,249.33821353365218), 204 -> (55.0072972972973,2666.6166251278305), 
    194 -> (42.805802593659955,915.6050500618951))