<a href="https://cognitiveclass.ai"><img src = "https://ibm.box.com/shared/static/9gegpsmnsoo25ikkbl4qzlvlyjbgxs5x.png" width = 400> </a>

<h1 align = "center"> Spark Fundamentals I - Introduction to Spark</h1>
<h2 align = "center"> Scala - Working with Scala Libraries</h2>
<br align = "left">

**Related free online courses:**

Related courses can be found in the following learning paths:

- [Spark Fundamentals path](http://cocl.us/Spark_Fundamentals_Path)
- [Big Data Fundamentals path](http://cocl.us/Big_Data_Fundamentals_Path)

<img src = "http://spark.apache.org/images/spark-logo.png" height = 100 align = 'left'>

## Creating a Spark application using Spark SQL

Spark SQL provides the ability to write relational queries to be run on Spark. Its SchemaRDD abstraction is an RDD with a schema attached, against which you can run SQL and HiveQL queries as well as ordinary Scala operations. In this lab section, you will use SQL to query the daily mean temperature and precipitation for a given time period in New York. The purpose is to demonstrate how to use the Spark SQL libraries on Spark.

### Please note that as of Spark 1.3, DataFrames have replaced SchemaRDDs; however, it is still possible to switch between the two to support legacy systems. DataFrames are the recommended method going forward.

### Let's first download the data that we will be working with in this lab

In [1]:
// import the module used to run shell commands within this notebook
import sys.process._

In [2]:
// download data from IBM server
// this may take ~30 seconds depending on your internet speed
"wget --quiet https://ibm.box.com/shared/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip" !
println("Data Downloaded!")

Let's unzip the data that we just downloaded into a directory dedicated to this course. Let's choose the directory **/resources/jupyter/labs/BD0211EN/**.

In [3]:
// unzip the folder's content into "resources" directory
"unzip -q -o -d /resources/jupyter/labs/BD0211EN/ j8skrriqeqw66f51iyz911zyqai64j2g.zip" !
println("Data Extracted!")

Data Extracted!


The data is in a folder called **LabData**. Let's list all the files in the data that we just downloaded and extracted.

In [4]:
// list the extracted files
"ls -1 /resources/jupyter/labs/BD0211EN/LabData/" !

followers.txt
notebook.log
nyctaxi100.csv
nyctaxi.csv
nyctaxisub.csv
nycweather.csv
pom.xml
README.md
taxistreams.py
users.txt


Let's take a look at the nycweather data by running the following code:

In [5]:
val lines = scala.io.Source.fromFile("/resources/jupyter/labs/BD0211EN/LabData/nycweather.csv").mkString
println(lines)

"2013-01-01",1,0
"2013-01-02",-2,0
"2013-01-03",-2,0
"2013-01-04",1,0
"2013-01-05",3,0
"2013-01-06",4,0
"2013-01-07",5,0
"2013-01-08",6,0
"2013-01-09",7,0
"2013-01-10",7,0
"2013-01-11",6,13.97
"2013-01-12",7,0.51
"2013-01-13",8,0
"2013-01-14",8,2.29
"2013-01-15",3,3.05
"2013-01-16",2,17.53
"2013-01-17",4,0
"2013-01-18",-1,0
"2013-01-19",5,0
"2013-01-20",6,0
"2013-01-21",-2,0
"2013-01-22",-7,0
"2013-01-23",-9,0
"2013-01-24",-8,0
"2013-01-25",-7,1.78
"2013-01-26",-6,0
"2013-01-27",-3,0
"2013-01-28",1,5.59
"2013-01-29",6,1.52
"2013-01-30",9,1.02
"2013-01-31",8,22.86
"2013-02-01",-2,0
"2013-02-02",-4,0.51
"2013-02-03",-3,0.51
"2013-02-04",-3,0
"2013-02-05",-1,0.51
"2013-02-06",1,0
"2013-02-07",-2,0
"2013-02-08",-1,29.21
"2013-02-09",-3,9.65
"2013-02-10",-3,0
"2013-02-11",4,12.45
"2013-02-12",4,0
"2013-02-13",4,0.76
"2013-02-14",4,0
"2013-02-15",8,0
"2013-02-16",2,0.51
"2013-02-17",-4,0
"2013-02-18",-3,0
"2013-02-19",5,3.81
"2013-02-20",0,0
"2013-02-21",-2,0
"2013-02-22",0,0
"2013-02-23",4,

There are three columns in the dataset: the date, the mean temperature in Celsius, and the precipitation for the day. Since we already know the schema, we will describe it with a case class and let Spark SQL infer the schema from that class using reflection.

You will first need to define the SparkSQL context. Do so by creating it from an existing SparkContext. Type in:

In [6]:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Next, you need to import the implicit conversions that allow an RDD to be turned into a DataFrame (formerly a SchemaRDD). Type this:

In [7]:
import sqlContext.implicits._

Create a case class in Scala that defines the schema of the table. Type in:

In [8]:
case class Weather(date: String, temp: Int, precipitation: Double)

Create an RDD of Weather objects and convert it to a DataFrame:

In [11]:
val weather = sc.textFile("/resources/jupyter/labs/BD0211EN/LabData/nycweather.csv").
    map(_.split(",")).
    map(w => Weather(w(0), w(1).trim.toInt, w(2).trim.toDouble)).
    toDF()


You first load the file, then split each line on the commas, and then map the resulting fields into the Weather class. The final `toDF()` call converts the RDD into a DataFrame.
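The split-and-convert steps can be tried without Spark on ordinary Scala collections. The following is a minimal sketch under stated assumptions: `WeatherParsingSketch` and `parseLine` are hypothetical names introduced for illustration, the sample rows are copied from the file listing above, and returning `None` for a malformed line is a choice of this sketch (the notebook cell would throw instead).

```scala
object WeatherParsingSketch {
  // Same shape as the Weather case class defined in the notebook.
  case class Weather(date: String, temp: Int, precipitation: Double)

  // Split one CSV line on commas and convert the fields to a Weather value.
  // Returns None when the line does not have three well-formed fields.
  def parseLine(line: String): Option[Weather] =
    line.split(",").map(_.trim) match {
      case Array(date, temp, precip) =>
        scala.util.Try(Weather(date, temp.toInt, precip.toDouble)).toOption
      case _ => None
    }

  // A few rows copied from nycweather.csv as printed earlier.
  val sample: Seq[String] = Seq(
    "\"2013-01-01\",1,0",
    "\"2013-01-11\",6,13.97",
    "\"2013-01-23\",-9,0"
  )

  // Parsed rows; malformed lines are silently skipped by flatMap.
  val parsed: Seq[Weather] = sample.flatMap(parseLine)
}
```

Note that the date field keeps its surrounding double quotes, which is why the query results in this lab also show quoted dates.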

Next you need to register the DataFrame as a table. Type in:

In [12]:
weather.registerTempTable("weather")

At this point, you are ready to create and run some queries on the table. You want to get a list of the hottest dates with some precipitation. Type in:

In [13]:
val hottest_with_precip = sqlContext.sql("SELECT * FROM weather WHERE precipitation > 0.0 ORDER BY temp DESC")

hottest_with_precip.collect()

Array(["2013-06-26",27,1.27], ["2013-06-27",27,6.1], ["2013-07-08",27,5.59], ["2013-07-09",27,5.84], ["2013-07-22",27,1.52], ["2013-07-23",27,7.87], ["2013-08-09",27,1.27], ["2013-06-02",26,21.59], ["2013-07-03",26,13.46], ["2013-08-27",26,0.25], ["2013-08-28",26,10.92], ["2013-09-02",26,1.27], ["2013-09-10",26,0.25], ["2013-09-12",26,40.64], ["2013-06-17",25,0.25], ["2013-07-02",25,2.03], ["2013-07-29",25,0.25], ["2013-07-01",24,21.34], ["2013-08-08",24,11.68], ["2013-08-12",24,1.27], ["2013-08-22",24,6.35], ["2013-08-26",24,1.02], ["2013-09-03",24,0.76], ["2013-06-18",23,4.83], ["2013-07-12",23,6.35], ["2013-07-13",23,1.52], ["2013-07-28",23,6.1], ["2013-08-03",23,1.52], ["2013-08-13",23,21.59], ["2013-05-23",22,45.97], ["2013-06-03",22,22.1], ...

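Normal RDD operations will work on the result. Print ten of the days with some precipitation out to the console. Note that `top(10)` ranks the mapped tuples by their natural ordering, which compares the `Date: ...` string first, so this cell prints the ten latest dates rather than the ten hottest; using `take(10)` instead should keep the query's `ORDER BY temp DESC` order.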

In [14]:
hottest_with_precip.map(x => ("Date: " + x(0), "Temp : " + x(1), "Precip: " + x(2))).top(10).foreach(println)

(Date: "2013-12-21",Temp : 14,Precip: 0.25)
(Date: "2013-12-17",Temp : -2,Precip: 4.83)
(Date: "2013-12-15",Temp : 2,Precip: 18.29)
(Date: "2013-12-14",Temp : -2,Precip: 18.54)
(Date: "2013-12-10",Temp : 1,Precip: 5.84)
(Date: "2013-12-09",Temp : 2,Precip: 7.62)
(Date: "2013-12-08",Temp : -1,Precip: 2.03)
(Date: "2013-12-07",Temp : 3,Precip: 3.56)
(Date: "2013-12-06",Temp : 10,Precip: 18.54)
(Date: "2013-12-05",Temp : 12,Precip: 0.25)
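
The dates printed above are all from December because `top(10)` ranks tuples by their natural ordering, and for tuples that means the first element (the `Date: ...` string) is compared before the others. Here is a minimal sketch on plain Scala collections, using three rows taken from the outputs above with the quotes around the dates omitted. `TopOrderingSketch` and its members are hypothetical names, and sorting a `Seq` in descending order stands in for what `RDD.top` does.

```scala
object TopOrderingSketch {
  // (date, temp, precipitation) rows taken from the outputs above,
  // listed in the query's order: hottest first.
  val rows: Seq[(String, Int, Double)] = Seq(
    ("2013-06-26", 27, 1.27),
    ("2013-12-21", 14, 0.25),
    ("2013-12-17", -2, 4.83)
  )

  // Same string-building step as the notebook cell.
  val labelled: Seq[(String, String, String)] =
    rows.map { case (d, t, p) => ("Date: " + d, "Temp : " + t, "Precip: " + p) }

  // Largest-first by the tuples' natural ordering: the date string decides.
  val byNaturalOrdering: Seq[(String, String, String)] =
    labelled.sorted(Ordering[(String, String, String)].reverse).take(2)

  // Keeping the query's order instead yields the hottest rows.
  val byQueryOrder: Seq[(String, String, String)] = labelled.take(2)
}
```

`byNaturalOrdering` starts with the 21 December row, while `byQueryOrder` starts with the 27-degree row from June.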


## Creating a Spark application using MLlib

In this section, Spark will be used to run K-Means clustering with 3 clusters on the drop-off latitudes and longitudes of taxis. The sample data contains a subset of taxi trips with hack license, medallion, pickup date/time, drop-off date/time, pickup/drop-off latitude/longitude, passenger count, trip distance, trip time and other information. As such, this may give a good indication of where best to hail a cab.

Remember, this is only a subset of the file that you used in a previous exercise. Running this exercise on the full dataset would take a long time, as we are working in a test environment with limited resources.

Import the packages needed for the K-Means algorithm and for vectors:

In [15]:
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

Create an RDD from the taxi data file:

In [16]:
val taxiFile = sc.textFile("/resources/jupyter/labs/BD0211EN/LabData/nyctaxisub.csv")

Determine the number of rows in taxiFile.

In [17]:
taxiFile.count()

250000

Cleanse the data.

In [18]:
val taxiData=taxiFile.filter(_.contains("2013")).
    filter(_.split(",")(3)!="" ).    //dropoff_latitude
    filter(_.split(",")(4)!="")      //dropoff_longitude

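The first filter limits the rows to those that contain "2013", that is, trips that occurred in the year 2013. This also removes any header line in the file. The fourth and fifth columns (indices 3 and 4) contain the drop-off latitude and longitude. The second and third filters remove rows where either value is empty, because the later conversion to `Double` would throw an exception on an empty string.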
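As a minimal sketch, the same three filters can be applied to a plain Scala `Seq`. The lines below are made up for illustration and are not rows from nyctaxisub.csv; only the field positions (indices 3 and 4) follow the notebook cell. `TaxiCleansingSketch` is a hypothetical name.

```scala
object TaxiCleansingSketch {
  // Made-up lines for illustration only. Index 3 is the drop-off latitude
  // and index 4 the drop-off longitude, as in the notebook cell.
  val lines: Seq[String] = Seq(
    "id,rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,other", // header
    "a1,1,2013-01-11 22:03:00,40.75,-73.98,x",
    "a2,1,2013-01-12 09:15:00,,-73.97,x", // latitude missing
    "a3,1,2013-01-13 18:40:00,40.71,,x"   // longitude missing
  )

  // Same three filters as the notebook cell, applied to a Seq instead of an RDD.
  val cleaned: Seq[String] = lines
    .filter(_.contains("2013"))
    .filter(_.split(",")(3) != "")
    .filter(_.split(",")(4) != "")

  // Safe to convert now: every remaining line has both coordinates.
  val coordinates: Seq[(Double, Double)] =
    cleaned.map(_.split(",")).map(f => (f(3).toDouble, f(4).toDouble))
}
```

Only the `a1` line survives: the header contains no "2013", and the other two lines each lack one coordinate.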

Do another count to see what was removed.

In [19]:
taxiData.count()

249999

In this case only one line was removed. If we had used the full set of data, a great many more lines would have been filtered out.

To fence the area roughly to New York City, use this command:

In [20]:
val taxiFence=taxiData.
    filter(_.split(",")(3).toDouble>40.70).
    filter(_.split(",")(3).toDouble<40.86).
    filter(_.split(",")(4).toDouble>(-74.02)).
    filter(_.split(",")(4).toDouble<(-73.93))

Determine how many are left in taxiFence:

In [21]:
taxiFence.count()

206646

In total, 43,353 rows were dropped because their drop-off points are outside the fenced area around New York City.
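The four chained filters amount to a bounding-box test. A minimal sketch of that test as a plain Scala function follows; the bounds are copied from the cell above, while `TaxiFenceSketch`, `insideFence` and `keep` are hypothetical names introduced here.

```scala
object TaxiFenceSketch {
  // Bounds copied from the notebook cell (degrees).
  val MinLat = 40.70
  val MaxLat = 40.86
  val MinLon = -74.02
  val MaxLon = -73.93

  // True when the point lies strictly inside the bounding box.
  def insideFence(lat: Double, lon: Double): Boolean =
    lat > MinLat && lat < MaxLat && lon > MinLon && lon < MaxLon

  // Parse once, then test both coordinates. This keeps the same lines as the
  // four chained filters, but splits each line only once.
  def keep(line: String): Boolean = {
    val f = line.split(",")
    insideFence(f(3).toDouble, f(4).toDouble)
  }
}
```

With an RDD, a predicate like this could be passed as `taxiData.filter(TaxiFenceSketch.keep)`, assuming the object is defined in the same notebook session.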

Create Vectors with the latitudes and longitudes that will be used as input to the K-Means algorithm.

In [23]:
val taxi=taxiFence.
    map{
        line=>Vectors.dense(
            line.split(',').slice(3,5).map(_.toDouble)
        )
    }

In [24]:
val iterationCount=10
val clusterCount=3

val model=KMeans.train(taxi,clusterCount,iterationCount)
val clusterCenters=model.clusterCenters.map(_.toArray)

clusterCenters.foreach(lines=>println(lines(0),lines(1)))

(40.75703155351417,-73.98081909551594)
(40.72485514327806,-73.99587153564757)
(40.78713693572286,-73.95706839545623)


Now we know the map coordinates of the three cluster centers. Not surprisingly, the first point is between the Theater District and Grand Central. The second point is in the Village, NYU, SoHo and Little Italy area. The third point is on the Upper East Side, presumably where people are more likely to take cabs than subways. The order in which the centers are listed can differ from run to run.
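To make the clustering step less of a black box, here is a minimal sketch of the basic K-Means iteration (Lloyd's algorithm) on plain Scala collections. It is not MLlib's implementation: `KMeansSketch` and its functions are hypothetical names, the initial centers are passed in explicitly, and the loop runs a fixed number of iterations, whereas MLlib by default chooses its starting centers with its own initialization scheme.

```scala
object KMeansSketch {
  type Point = (Double, Double)

  // Squared Euclidean distance between two points.
  private def sqDist(a: Point, b: Point): Double = {
    val dx = a._1 - b._1
    val dy = a._2 - b._2
    dx * dx + dy * dy
  }

  // Index of the center nearest to p.
  def closest(p: Point, centers: Seq[Point]): Int =
    centers.indices.minBy(i => sqDist(p, centers(i)))

  // Mean of a non-empty group of points.
  private def mean(ps: Seq[Point]): Point =
    (ps.map(_._1).sum / ps.size, ps.map(_._2).sum / ps.size)

  // Lloyd's algorithm: assign each point to its nearest center, then move
  // each center to the mean of its points; repeat a fixed number of times.
  // A center that attracts no points is left where it is.
  def train(points: Seq[Point], initial: Seq[Point], iterations: Int): Seq[Point] =
    (1 to iterations).foldLeft(initial) { (centers, _) =>
      val groups = points.groupBy(p => closest(p, centers))
      centers.indices.map(i => groups.get(i).map(mean).getOrElse(centers(i)))
    }
}
```

For example, `KMeansSketch.train(Seq((0.0, 0.0), (0.0, 1.0), (10.0, 10.0), (10.0, 11.0)), Seq((0.0, 0.0), (10.0, 10.0)), 5)` moves the two centers to the means of the two obvious groups of points.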



## Creating a Spark application using Spark Streaming

This section focuses on Spark Streaming, an easy-to-build, scalable, stateful (e.g. sliding windows) stream processing library. Streaming jobs are written the same way as Spark batch jobs and can be coded in Java, Scala and Python. In this exercise, taxi trip data will be streamed using a socket connection and then analyzed to provide a summary of the number of passengers by taxi vendor. This will be implemented in the Spark shell using Scala.

There are two relevant files for this section. The first one is nyctaxi100.csv, which will serve as the source of the stream. The other is a Python script, taxistreams.py, which will feed the CSV file through a socket connection to simulate a stream.

### <span style="color: red">IN ORDER TO START THE STREAM PLEASE OPEN A NEW PYTHON NOTEBOOK AND RUN THE CODE BELOW IN IT:</span> 

To open a new Python notebook, click on the blue notebook button at the top right of this page, next to the search box. Choose PYTHON 3 and then copy and paste the code below into the cell in the new Python notebook. Run the cell as normal. To interrupt the kernel, hit the STOP button in the Action buttons above.

```
!python /resources/jupyter/labs/BD0211EN/LabData/taxistreams.py

```

Once started, the program will bind to port 7777 on localhost and listen for connections. When a connection is made, it will read `nyctaxi100.csv` and send it across the socket. The sleep is set such that one line will be sent every 0.5 seconds, or 2 rows a second. This delay was intentionally set to a high value to make it easier to view the data during execution.

Turn off logging so that you can see the output of the application, and import the required libraries:

In [3]:
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

Create the StreamingContext by using the existing SparkContext (sc). It will use a 1-second batch interval, which means the stream is divided into 1-second batches and each batch becomes an RDD. This interval is intentionally long to make it easier to read the data during execution.

In [4]:
val ssc = new StreamingContext(sc, Seconds(1))

Create the socket stream that connects to the localhost socket 7777. This matches the port that the Python script is listening on. Each batch from the stream will be an RDD of lines.

In [6]:
val lines = ssc.socketTextStream("localhost", 7777)

Next, put in the business logic: split each line on the commas and map each record to a pair of pass(15), which is the vendor, and pass(7), which is the passenger count. The pairs are then reduced by key, resulting in a summary of the number of passengers by vendor.

In [7]:
val pass = lines.map(_.split(",")).
    map(pass=>(pass(15), pass(7).toInt)).
    reduceByKey(_+_)
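
What this pipeline computes for a single batch can be sketched on a plain Scala `Seq`, with `groupBy` and `sum` standing in for `reduceByKey`. The field positions (15 and 7) come from the cell above; `PassengerCountSketch`, `summarize` and `fakeLine` are hypothetical names, and the lines built by `fakeLine` are made up rather than taken from nyctaxi100.csv.

```scala
object PassengerCountSketch {
  // Sum passenger counts per vendor for one batch of CSV lines.
  // Index 15 is the vendor and index 7 the passenger count.
  def summarize(batch: Seq[String]): Map[String, Int] =
    batch
      .map(_.split(","))
      .map(f => (f(15), f(7).toInt))
      .groupBy(_._1)
      .map { case (vendor, pairs) => vendor -> pairs.map(_._2).sum }

  // Build a made-up 16-field line with only the two fields of interest set.
  def fakeLine(vendor: String, passengers: Int): String =
    (0 to 15).map {
      case 7  => passengers.toString
      case 15 => vendor
      case i  => s"f$i"
    }.mkString(",")
}
```

In the streaming job the same summary is produced independently for every 1-second batch, so the totals printed for one batch do not carry over to the next.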

Print out to the console:

In [8]:
pass.print()

The next two lines start the stream and wait for it to terminate.

In [9]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 1532834880000 ms
-------------------------------------------

-------------------------------------------
Time: 1532834881000 ms
-------------------------------------------

-------------------------------------------
Time: 1532834882000 ms
-------------------------------------------

-------------------------------------------
Time: 1532834883000 ms
-------------------------------------------



Name: java.lang.InterruptedException
Message: null
StackTrace: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
org.apache.spark.streaming.ContextWaiter.waitForStopOrError(ContextWaiter.scala:63)
org.apache.spark.streaming.StreamingContext.awaitTermination(StreamingContext.scala:642)
$line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
$line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
$line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
$line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
$line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
$line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
$line40.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
$line40.$read$$iwC$$iwC$$iw

It will take a few cycles for the connection to be recognized, and then the data is sent. In this case, 2 rows per second of taxi trip data are received in each 1-second batch interval.

In the Python terminal, the contents of the file are printed as they are streamed.

**Note: TO STOP THE STREAM PLEASE INTERRUPT THE KERNEL IN BOTH THE OTHER PYTHON NOTEBOOK AND THIS NOTEBOOK. THEN RESTART THIS NOTEBOOK'S KERNEL TO CONTINUE ONTO THE GRAPHX APPLICATION**

This is just a simple example showing how you can take streaming data into Spark and do some type of processing on it. In the case here, the vendor and the number of passengers were extracted from the data stream and summed by vendor.

## Creating a Spark application using GraphX

users.txt contains a set of users, and followers.txt contains the relationships between those users. Take a look at the contents of these two files.

In [10]:
println("Users: ")
println(scala.io.Source.fromFile("/resources/jupyter/labs/BD0211EN/LabData/users.txt").mkString)

println("Followers: ")
println(scala.io.Source.fromFile("/resources/jupyter/labs/BD0211EN/LabData/followers.txt").mkString)

Users: 
1,BarackObama,Barack Obama
2,ladygaga,Goddess of Love
3,jeresig,John Resig
4,justinbieber,Justin Bieber
6,matei_zaharia,Matei Zaharia
7,odersky,Martin Odersky
8,anonsys

Followers: 
2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7



Import the GraphX package:

In [11]:
import org.apache.spark.graphx._

Create the users RDD and parse into tuples of user id and attribute list:

In [19]:
val users = sc.textFile("/resources/jupyter/labs/BD0211EN/LabData/users.txt").
    map(line => line.split(",")).
    map(parts => (parts.head.toLong, parts.tail))

users.take(5).foreach(println)

(1,[Ljava.lang.String;@3dda9edf)
(2,[Ljava.lang.String;@2453670a)
(3,[Ljava.lang.String;@62e1e727)
(4,[Ljava.lang.String;@55f90c2c)
(6,[Ljava.lang.String;@3c3c5c2a)
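
The `[Ljava.lang.String;@...` text above is the JVM's default rendering of an `Array[String]`, not the attribute values themselves. The following minimal sketch repeats the parsing on plain collections and prints the attributes readably. `UsersParsingSketch`, `render` and `complete` are hypothetical names, and the sample lines are copied from the users.txt listing above.

```scala
object UsersParsingSketch {
  // Lines copied from the users.txt listing above.
  val lines: Seq[String] = Seq(
    "1,BarackObama,Barack Obama",
    "7,odersky,Martin Odersky",
    "8,anonsys"
  )

  // Same parsing as the notebook cell: (user id, remaining fields).
  val users: Seq[(Long, Array[String])] =
    lines.map(_.split(",")).map(parts => (parts.head.toLong, parts.tail))

  // Render the attribute array with mkString so its contents are visible.
  def render(user: (Long, Array[String])): String =
    "(" + user._1 + ",[" + user._2.mkString(", ") + "])"

  // Ids of users that have both a username and a name (two attributes).
  val complete: Seq[Long] = users.filter(_._2.length == 2).map(_._1)
}
```

User 8 has only a username, so it is absent from `complete`; a check on the number of attributes like this one is what distinguishes fully described users from partial entries.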


Parse the edge data, which is already in userId -> userId format:

In [20]:
val followerGraph = GraphLoader.edgeListFile(sc, "/resources/jupyter/labs/BD0211EN/LabData/followers.txt")

Attach the user attributes

In [21]:
val graph = followerGraph.outerJoinVertices(users) {
    case (uid, deg, Some(attrList)) => attrList
    case (uid, deg, None) => Array.empty[String]
}

Restrict the graph to users with usernames and names:

In [15]:
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

Compute the PageRank

In [16]:
val pagerankGraph = subgraph.pageRank(0.001)

Get the attributes of the top pagerank users

In [17]:
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
    case (uid, attrList, Some(pr)) => (pr, attrList.toList)
    case (uid, attrList, None) => (0.0, attrList.toList)
}

Print the top five users by PageRank:

In [18]:
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))

(1,(1.453834747463902,List(BarackObama, Barack Obama)))
(2,(1.3857595353443166,List(ladygaga, Goddess of Love)))
(7,(1.2892158818481694,List(odersky, Martin Odersky)))
(3,(0.9936187772892124,List(jeresig, John Resig)))
(6,(0.697916749785472,List(matei_zaharia, Matei Zaharia)))
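
For intuition about where these numbers come from, here is a minimal PageRank sketch on plain Scala collections. It is not GraphX's implementation: `PageRankSketch` and its functions are hypothetical names, the update rule (a constant 0.15 plus 0.85 times the rank received from followers) is an assumption of this sketch, and it runs a fixed number of iterations instead of stopping at a tolerance, so the values it produces will only approximate the ones printed above.

```scala
object PageRankSketch {
  // (follower, followed) pairs copied from the followers.txt listing above.
  val edges: Seq[(Long, Long)] = Seq(
    (2L, 1L), (4L, 1L), (1L, 2L), (6L, 3L),
    (7L, 3L), (7L, 6L), (6L, 7L), (3L, 7L)
  )

  // Assumed update rule: rank(v) = 0.15 + 0.85 * sum over edges u -> v
  // of rank(u) / outDegree(u), repeated a fixed number of times.
  def pageRank(edges: Seq[(Long, Long)], iterations: Int): Map[Long, Double] = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    val outDegree = edges.groupBy(_._1).map { case (s, es) => s -> es.size }
    val start: Map[Long, Double] = vertices.map(v => v -> 1.0).toMap
    (1 to iterations).foldLeft(start) { (ranks, _) =>
      // Each edge carries an equal share of its source's rank to its target.
      val received = edges
        .map { case (s, d) => d -> (ranks(s) / outDegree(s)) }
        .groupBy(_._1)
        .map { case (d, shares) => d -> shares.map(_._2).sum }
      vertices.map(v => v -> (0.15 + 0.85 * received.getOrElse(v, 0.0))).toMap
    }
  }

  // User ids ordered from highest to lowest rank.
  def ranking(ranks: Map[Long, Double]): Seq[Long] =
    ranks.toSeq.sortBy(-_._2).map(_._1)
}
```

Calling `PageRankSketch.ranking(PageRankSketch.pageRank(PageRankSketch.edges, 50))` orders the users by rank. User 4 follows someone but has no followers, so under this rule it only ever receives the constant term.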


<div class="alert alert-success alertsuccess" style="margin-top: 20px">
**Tip**: Enjoyed using Jupyter notebooks with Spark? Get yourself a free 
    <a href="http://cocl.us/DSX_on_Cloud">IBM Cloud</a> account where you can use Data Science Experience notebooks
    and have *two* Spark executors for free!
</div>

## Summary

Having completed this exercise, you should have some familiarity with using the Spark libraries. In particular, you used Spark SQL to query data inside of Spark. You used Spark Streaming to process an incoming stream of data in small batches. You used Spark's MLlib to run the *k*-means algorithm to find the best place to hail a cab. Finally, you used Spark's GraphX library to perform parallel graph calculations on a dataset to find the attributes of the top users.

This notebook is part of the free course on **Cognitive Class** called *Spark Fundamentals I*. If you accessed this notebook outside the course, you can take this free self-paced course online by going to: http://cocl.us/Spark_Fundamentals_I

### About the Authors:  
Hi! It's [Alex Aklson](https://www.linkedin.com/in/aklson/), one of the authors of this notebook. I hope you found this lab educational! There is much more to learn about Spark but you are well on your way. Feel free to connect with me if you have any questions.
<hr>