# Analyzing Big Data

Big data means collecting, storing, and processing data at a large scale on multiple machines.

## The Challenges of Data Science

1. Preprocessing: the vast majority of the work goes into feature engineering and selection.
2. Iteration: modeling and analysis require multiple passes over the same data.
3. Deployment: the work is not finished once a model performs well; the same model has to be made available inside production data applications.

## Introducing Apache Spark

Apache Spark is an open source framework that combines an engine for distributing programs across clusters of machines with an elegant model for writing programs atop it.

Features:

1. Rather than relying on a rigid map-then-reduce format, its engine can execute a more general directed acyclic graph (DAG) of operators.
2. It complements this capability with a rich set of transformations that enable users to express computation more naturally.
3. Spark extends its predecessors with in-memory processing.
4. Sitting atop the JVM, it can take advantage of many of the operational and debugging tools built for the Java stack.
5. Spark boasts strong integration with the variety of tools in the Hadoop ecosystem.

# Introduction to Data Analysis with Scala and Spark

One of the most important talents that one can develop as a data scientist is the ability to discover interesting and worthwhile problems in every phase of the data analytics lifecycle.

## Scala for Data Scientists

Learning how to work with Spark in the same language in which the underlying framework is written (Scala) has a number of advantages:

1. It reduces performance overhead.
2. It gives you access to the latest and greatest features, which tend to reach the Scala API before the Python and R bindings.
3. It will help you understand the Spark philosophy.

## The Spark Programming Model

Spark programming starts with one or more data sets, usually residing in some form of distributed, persistent storage like the Hadoop Distributed File System (HDFS). Writing a Spark program typically consists of a few related steps:

1. Defining a set of transformations on input data sets.
2. Invoking actions that output the transformed data sets to persistent storage or return results to the driver’s local memory.
3. Running local computations that operate on the results computed in a distributed fashion. These can help you decide what transformations and actions to undertake next.

## Record Linkage

The general structure of the problem is something like this: we have a large collection of records from one or more source systems, and it is likely that some of the records refer to the same underlying entity, such as a customer, a patient, or the location of a business or an event. The task is to identify the records that refer to the same entity.

Dataset: curl -L -o donation.zip http://bit.ly/1Aoywaq (the -L flag makes curl follow the redirect issued by the short link)

## Getting Started: The Spark Shell and SparkContext

The spark-shell is a REPL (read-eval-print loop) for the Scala language that also has some Spark-specific extensions.

In [1]:
sc

org.apache.spark.SparkContext@4d5afd74

There are two ways to create an RDD in Spark:

1. Using the SparkContext to create an RDD from an external data source, like a file in HDFS, a database table via JDBC, or a local collection of objects that we create in the Spark shell.
2. Performing a transformation on one or more existing RDDs, like filtering records, aggregating records by a common key, or joining multiple RDDs together.

## Resilient Distributed Datasets

In [2]:
val rdd = sc.parallelize(Array(1, 2, 2, 4), 4)
rdd

ParallelCollectionRDD[0] at parallelize at <console>:19

In [3]:
val rdd2 = sc.textFile("file:///usr/local/spark/README.md")
rdd2

file:///usr/local/spark/README.md MapPartitionsRDD[2] at textFile at <console>:19

If Spark is given a directory instead of an individual file, it will consider all of the files in that directory as part of the given RDD.

In [4]:
val rawblocks = sc.textFile("hdfs:///user/root/linkage")
rawblocks

hdfs:///user/root/linkage MapPartitionsRDD[4] at textFile at <console>:19

Whenever we create a new variable in Scala, we must preface the name of the variable with either val or var. Variables that are prefaced with val are immutable, and cannot be changed to refer to another value once they are assigned, whereas variables that are prefaced with var can be changed to refer to different objects of the same type.
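As a minimal sketch of the val/var distinction (plain Scala, no Spark needed; the object and variable names are made up for illustration):

```scala
object ValVarDemo {
  def run(): (Int, Int, Int) = {
    val fixed = 1          // immutable binding: `fixed = 2` would not compile
    var counter = 1        // mutable binding
    counter = counter + 1  // allowed: the new value is still an Int
    // counter = "two"     // would not compile: a var cannot change its type

    // A val fixes the reference, not the contents of a mutable object.
    val scores = Array(0, 0)
    scores(0) = 9
    (fixed, counter, scores(0))
  }
}
```

Calling ValVarDemo.run() returns the three values so the effect of each assignment can be inspected.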

## Bringing Data from the Cluster to the Client

In [5]:
rawblocks.first()

"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

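Use collect() to return the entire contents of an RDD to the client, but only when the client can hold all of that data in memory. first() and take(n) return just the first element or the first n elements.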

In [6]:
val head = rawblocks.take(5)

In [7]:
head.length

5

## Actions

The act of creating an RDD does not cause any distributed computation to take place on the cluster. Rather, RDDs define logical data sets that are intermediate steps in a computation. Distributed computation occurs upon invoking an action on an RDD.
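The same deferred-evaluation idea can be illustrated without a cluster. The sketch below is only an analogy using a plain Scala Iterator, not Spark itself: map plays the role of a transformation (nothing runs yet) and sum plays the role of an action (the work happens). The names are hypothetical.

```scala
object LazyDemo {
  def run(): (Int, Int, Int) = {
    var calls = 0
    // Like a transformation: only describes the computation.
    val doubled = Iterator(1, 2, 3).map { x => calls += 1; x * 2 }
    val callsBeforeSum = calls
    // Like an action: forces the elements to be computed.
    val total = doubled.sum
    (callsBeforeSum, calls, total)
  }
}
```

The first component of the result shows how many times the mapping function had run before the "action" was invoked.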

In [8]:
head.foreach(println)

"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE


In [9]:
def isHeader(line: String): Boolean = line.contains("id_1")

In [10]:
head.filter(isHeader).foreach(println)

"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"


In [11]:
head.filterNot(isHeader).length

4

### Using Anonymous Functions

In [12]:
head.filter(x => !isHeader(x)).length

4

In [13]:
head.filter(!isHeader(_)).length

4

## Shipping Code from the Client to the Cluster

In [14]:
val noheader = rawblocks.filter(x => !isHeader(x))

In [15]:
noheader.first()

37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE

## Structuring Data with Tuples and Case Classes

In [16]:
val line = head(4)

In [17]:
val pieces = line.split(',')

In [18]:
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val matched = pieces(11).toBoolean
val rawscores = pieces.slice(2, 11)

def toDouble(s: String): Double = if ("?".equals(s)) Double.NaN else s.toDouble

rawscores.map(toDouble)

Array(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0)

### Scala Feature: Implicit Type Conversion

Implicits work like this: if you call a method on a Scala object, and the Scala compiler does not see a definition for that method in the class definition for that object, the compiler will try to convert your object to an instance of a class that does have that method defined.
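A minimal sketch of the mechanism, assuming a made-up wrapper class Score and conversion stringToScore (neither is part of Scala or Spark). String has no toScore method, so the compiler looks for an implicit conversion to a class that does:

```scala
import scala.language.implicitConversions

object ImplicitDemo {
  // Hypothetical wrapper class that defines the missing method.
  class Score(val raw: String) {
    def toScore: Double = if (raw == "?") Double.NaN else raw.toDouble
  }

  // The conversion the compiler may insert when a String lacks a method.
  implicit def stringToScore(s: String): Score = new Score(s)

  def run(): (Double, Boolean) = {
    val present = "0.5".toScore  // compiled as stringToScore("0.5").toScore
    val missing = "?".toScore
    (present, missing.isNaN)
  }
}
```

The calls to toInt and toDouble on strings elsewhere in these notes rely on the same mechanism, with the conversion supplied by the Scala standard library.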

In [19]:
def parse(line: String): Tuple4[Int, Int, Array[Double], Boolean] = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2, 11).map(toDouble)
    val matched = pieces(11).toBoolean
    
    (id1, id2, scores, matched)
}

In [20]:
val tup = parse(line)

In [21]:
tup._1

84795

In [22]:
tup._2

97439

In [23]:
tup._3

Array(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0)

In [24]:
tup._4

true

### Case Classes

Case classes let us define a simple record type whose fields we can address by name instead of by position.

In [25]:
case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)

In [26]:
def parse(line: String): MatchData = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2, 11).map(toDouble)
    val matched = pieces(11).toBoolean
    
    MatchData(id1, id2, scores, matched)
}

In [27]:
val md = parse(line)

In [28]:
md.id1

84795

In [29]:
md.id2

97439

In [30]:
md.scores

Array(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0)

In [31]:
md.matched

true

In [32]:
val mds = head.filter(x => !isHeader(x)).map(x => parse(x))

In [33]:
val parsed = noheader.map(x => parse(x))

In [34]:
parsed.cache()

In [35]:
parsed.count()

5749132

### Caching

Although the contents of RDDs are transient by default, Spark provides a mechanism for persisting the data in an RDD. After the first time an action requires computing such an RDD’s contents, they are stored in memory or on disk across the cluster. The next time an action depends on the RDD, it need not be recomputed from its dependencies; its data is returned from the cached partitions directly.

## Aggregations

In [36]:
val grouped = mds.groupBy(md => md.matched)

In [37]:
grouped.mapValues(x => x.size).foreach(println)

(true,4)

## Creating Histograms

In [38]:
val matchCounts = parsed.map(md => md.matched).countByValue()

In [39]:
matchCounts

Map(true -> 20931, false -> 5728201)

In [40]:
val matchCountsSeq = matchCounts.toSeq

In [41]:
matchCountsSeq.sortBy(_._1).foreach(println)

(false,5728201)
(true,20931)


In [42]:
matchCountsSeq.sortBy(_._2).reverse.foreach(println)

(false,5728201)
(true,20931)


## Summary Statistics for Continuous Variables

In [43]:
import java.lang.Double.isNaN

val stats = (0 until 9) map(i => {
    parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
})

In [44]:
stats

Vector((count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000), (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000), (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000), (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000), (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0.000000), (count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000,...

## Creating Reusable Code for Computing Summary Statistics

Whenever we expect that some analysis task we need to perform will be useful again and again, it’s worth spending some time to develop our code in a way that makes it easy for other analysts to use the solution we come up with in their own analyses. To do this, we can write Scala code in a separate file, load it into the Spark shell for testing and validation, and share that file with others once we know that it works.

Note: Mark classes as Serializable if their instances will be used inside Spark RDDs; a job fails if Spark cannot serialize the data contained in an RDD.

Scala’s object keyword is used to declare a singleton that can provide helper methods for a class, analogous to the static method definitions on a Java class.
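The two points above can be sketched together. The class below is a hypothetical, simplified counter written for illustration (it is not Spark's StatCounter): it extends Serializable so instances could live inside an RDD, and its companion object supplies a factory method, much like a static helper in Java.

```scala
object StatsSketch {
  // Hypothetical class: tracks count and sum of present values plus the
  // number of missing (NaN) values.
  class MissingAwareCounter extends Serializable {
    var count: Long = 0L
    var sum: Double = 0.0
    var missing: Long = 0L

    def add(x: Double): MissingAwareCounter = {
      if (x.isNaN) missing += 1 else { count += 1; sum += x }
      this
    }

    // Combines two partial results, e.g. from different partitions.
    def merge(other: MissingAwareCounter): MissingAwareCounter = {
      count += other.count
      sum += other.sum
      missing += other.missing
      this
    }

    def mean: Double = if (count == 0) Double.NaN else sum / count
  }

  // Companion object: a singleton holding helper methods for the class.
  object MissingAwareCounter {
    def apply(xs: Double*): MissingAwareCounter =
      xs.foldLeft(new MissingAwareCounter)((acc, x) => acc.add(x))
  }
}
```

For example, StatsSketch.MissingAwareCounter(1.0, Double.NaN, 3.0) builds a counter from three values, one of which is treated as missing.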

A good feature has two properties: it tends to have significantly different values for matches and nonmatches (so the difference between the means will be large) and it occurs often enough in the data that we can rely on it to be regularly available for any pair of records.
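One way to make these two properties concrete is to compute, for each feature, how many values are missing and how far apart the match and nonmatch means are. The following is a rough sketch on local Scala collections; the function name and the use of NaN for missing values are assumptions made for illustration.

```scala
object FeatureScore {
  // Each Seq holds one feature's values; NaN marks a missing value.
  // Returns (number of missing values, absolute difference of the means).
  def summarize(matches: Seq[Double], nonMatches: Seq[Double]): (Int, Double) = {
    def present(xs: Seq[Double]): Seq[Double] = xs.filterNot(_.isNaN)
    def mean(xs: Seq[Double]): Double = xs.sum / xs.size

    val m = present(matches)
    val n = present(nonMatches)
    val missing = (matches.size - m.size) + (nonMatches.size - n.size)
    (missing, math.abs(mean(m) - mean(n)))
  }
}
```

A feature with few missing values and a large difference in means is a promising candidate under the criteria above.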

# Recommending Music and the Audioscrobbler Data Set

The output of a recommender is more intuitively understandable than that of many other machine learning algorithms. A support vector machine classifier, for example, is a set of coefficients, and it’s hard even for practitioners to articulate what the numbers mean when they make predictions.

## Data Set

Audioscrobbler was the first music recommendation system for last.fm, one of the first Internet streaming radio sites, founded in 2002. Audioscrobbler provided an open API for “scrobbling,” or recording listeners’ plays of artists’ songs. last.fm used this information to build a powerful music recommender engine.

## The Alternating Least Squares Recommender Algorithm

We need an algorithm that learns without access to user or artist attributes. These are typically called collaborative filtering algorithms.

Latent-Factor Models: They try to explain observed interactions between large numbers of users and products through a relatively small number of unobserved, underlying reasons.

Matrix Factorization Model: Mathematically, these algorithms treat the user and product data as if it were a large matrix A, where the entry at row i and column j exists if user i has played artist j. A is sparse: most entries of A are 0, because only a few of all possible user-artist combinations actually appear in the data. The algorithms factor A as the matrix product of two smaller matrices, X and Y. Both are very skinny: each has many rows, because A has many rows and columns, but only a few columns (k). The k columns correspond to the latent factors that are being used to explain the interaction data.

One can use the Alternating Least Squares (ALS) algorithm to compute X and Y.
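To make the factorization concrete, here is a small sketch on local arrays. It assumes X has one row per user and Y one row per artist, each with k columns, so that the estimate for user i and artist j is the dot product of row i of X with row j of Y (that is, A is approximated by X times the transpose of Y). The names are hypothetical and this is not how ALS itself computes the factors.

```scala
object FactorizationSketch {
  type Matrix = Array[Array[Double]]

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // Rebuilds the (dense) estimate of A from the two skinny factor matrices.
  def reconstruct(x: Matrix, y: Matrix): Matrix =
    x.map(userRow => y.map(artistRow => dot(userRow, artistRow)))
}
```

With 3 users, 2 artists and k = 2, X is 3 x 2, Y is 2 x 2, and reconstruct(X, Y) is a 3 x 2 matrix of estimated preferences.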

## Preparing the Data

In [1]:
val rawUserArtistData = sc.textFile("hdfs:///user/root/recommendation/user_artist_data.txt")

In [2]:
rawUserArtistData.first()

1000002 1 55

In [3]:
rawUserArtistData.map(_.split(' ')(0).toDouble).stats()

(count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000)

In [4]:
rawUserArtistData.map(_.split(' ')(1).toDouble).stats()

(count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)

In [5]:
rawUserArtistData.map(_.split(' ')(2).toDouble).stats()

(count: 24296858, mean: 15.295762, stdev: 153.915321, max: 439771.000000, min: 1.000000)

In [6]:
val rawArtistData = sc.textFile("hdfs:///user/root/recommendation/artist_data.txt")

In [7]:
val artistByID = rawArtistData.flatMap { line =>
        val (id, name) = line.span(_ != '\t')
        if (name.isEmpty) {
            None
        } else {
            try {
                Some((id.toInt, name.trim))
            } catch {
                case e: NumberFormatException => None
            }
        }
    }

In [8]:
artistByID.first()

(1134999,06Crazy Life)

In [9]:
val rawArtistAlias = sc.textFile("hdfs:///user/root/recommendation/artist_alias.txt")

In [10]:
val artistAlias = rawArtistAlias.flatMap { line =>
        val tokens = line.split('\t')
        if (tokens(0).isEmpty) {
            None
        } else {
            Some((tokens(0).toInt, tokens(1).toInt))
        }
    }.collectAsMap()

In [11]:
artistAlias

Map(6803336 -> 1000010, 6663187 -> 1992, 2124273 -> 2814, 10412283 -> 1010353, 9969191 -> 1320354, 2024757 -> 1001941, 10208201 -> 4605, 2139121 -> 1011083, 1186393 -> 78, 2094504 -> 1012167, 9931106 -> 1000289, 2167517 -> 2060894, 1351735 -> 1266817, 6943682 -> 1003342, 2027368 -> 1000024, 2056419 -> 1020783, 1214789 -> 1001066, 1022944 -> 1004983, 6640739 -> 1010367, 6902331 -> 411, 10303141 -> 82, 10029249 -> 2070, 7001129 -> 739, 6627784 -> 1046699, 1113560 -> 1275800, 2155414 -> 1000790, 1291139 -> 4163, 10061700 -> 831, 1043158 -> 1301875, 10294241 -> 1234737, 9991298 -> 1001419, 9965450 -> 1016520, 6800447 -> 1078506, 1042440 -> 304, 1068288 -> 1001417, 6729982 -> 1809, 1138035 -> 1406, 1278247 -> 1239248, 1115453 -> 3824, 7035536 -> 3447, 70...

In [12]:
artistByID.lookup(6803336)

WrappedArray(Aerosmith (unplugged))

In [13]:
artistByID.lookup(1000010)

WrappedArray(Aerosmith)

## Building a First Model

In [14]:
import org.apache.spark.mllib.recommendation._

val bArtistAlias = sc.broadcast(artistAlias)

val trainData = rawUserArtistData.map { line =>
        val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
        val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
        Rating(userID, finalArtistID, count)
    }.cache()

In [15]:
trainData.first()

Rating(1000002,1,55.0)

### Broadcast Variables

When Spark runs a stage, it creates a binary representation of all the information needed to run tasks in that stage, called the closure of the function that needs to be executed. This closure includes all the data structures on the driver referenced in the function. Spark distributes it to every executor on the cluster.

Broadcast variables are useful in situations where many tasks need access to the same (immutable) data structure. They extend normal handling of task closures to enable:

1. Caching data as raw Java objects on each executor, so they need not be deserialized for each task
2. Caching data across multiple jobs and stages

In [None]:
// Arguments after trainData: rank = 10, iterations = 5, lambda = 0.01, alpha = 1.0
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)

## Spot Checking Recommendations

We should first see if the artist recommendations make any intuitive sense, by examining a user, his or her plays, and recommendations for that user.

In [16]:
val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).
    filter { case Array(user, _, _) => user.toInt == 2093760}

In [18]:
val existingProducts = rawArtistsForUser.map { case Array(_, artist, _) => artist.toInt}.collect().toSet

In [20]:
artistByID.filter { case (id, name) => existingProducts.contains(id)}.values.collect().foreach(println)

David Gray
Blackalicious
Jurassic 5
The Saw Doctors
Xzibit


In [None]:
val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)

## Evaluating Recommendation Quality

To make this meaningful, some of the artist play data can be set aside and hidden from the ALS model building process. Then, this held-out data can be interpreted as a collection of good recommendations for each user, but one that the recommender has not already been given. The recommender is asked to rank all items in the model, and the ranks of the held-out artists are examined. Ideally, the recommender places all of them at or near the top of the list.

We can then compute the recommender’s score by comparing all held-out artists’ ranks to the rest. (In practice, we compute this by examining only a sample of all such pairs, because a potentially huge number of such pairs may exist.) The fraction of pairs where the held-out artist is ranked higher is its score. 1.0 is perfect, 0.0 is the worst possible score, and 0.5 is the expected value achieved from randomly ranking artists.

This metric is directly related to an information retrieval concept, called the Receiver Operating Characteristic (ROC) curve. The metric in the preceding paragraph equals the area under this ROC curve, and is indeed known as AUC, for Area Under the Curve. AUC may be viewed as the probability that a randomly chosen good recommendation ranks above a randomly chosen bad recommendation.
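The pairwise definition can be sketched directly on local collections. The function below compares every good (held-out) score with every bad score and returns the fraction of pairs in which the good item scores higher. Counting ties as half a win is a common convention but an assumption here, and the names are made up for illustration.

```scala
object AucSketch {
  // goodScores: model scores of held-out items; badScores: scores of other items.
  def auc(goodScores: Seq[Double], badScores: Seq[Double]): Double = {
    val pairs = for (g <- goodScores; b <- badScores) yield {
      if (g > b) 1.0 else if (g == b) 0.5 else 0.0
    }
    pairs.sum / pairs.size
  }
}
```

For good scores 0.9 and 0.4 against bad scores 0.5 and 0.1, three of the four pairs are ordered correctly, so the result is 0.75.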

## Hyperparameter Selection

Choosing good hyperparameter values is a common problem in machine learning. The most basic way to choose values is to simply try combinations of values and evaluate a metric for each of them, and choose the combination that produces the best value of the metric.
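A minimal sketch of this try-every-combination approach follows. The evaluate function is a placeholder supplied by the caller; in this setting it would train a model with the given values and return a metric such as AUC, but here it is left abstract. Params and gridSearch are hypothetical names; the three fields mirror the rank, lambda and alpha arguments of ALS.trainImplicit.

```scala
object GridSearchSketch {
  final case class Params(rank: Int, lambda: Double, alpha: Double)

  // Evaluates every combination and returns the best one with its score.
  def gridSearch(ranks: Seq[Int], lambdas: Seq[Double], alphas: Seq[Double])
                (evaluate: Params => Double): (Params, Double) = {
    val scored = for (r <- ranks; l <- lambdas; a <- alphas) yield {
      val p = Params(r, l, a)
      (p, evaluate(p))
    }
    scored.maxBy(_._2)  // higher metric is assumed to be better
  }
}
```

The number of evaluations is the product of the list lengths, so the grid should stay small when each evaluation means training a model.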