MAKING PREDICTIONS

One of the great things about scikit-learn is that we can easily try a number of different linear models on the same data, which gives us some clues about where to start tuning. We will start with six of them: vanilla linear regression, ridge and lasso regressions, ElasticNet, Bayesian ridge, and a lesser-used one called Orthogonal Matching Pursuit.

To evaluate which model(s) are doing better, we will need some way to score the results. In this example I've chosen median absolute error, mainly because it makes sense at a glance (it easily translates to a dollar amount relative to price) and is less sensitive to outliers than other metrics like mean squared error.

Speaking of which, it is fairly likely that we have some outliers in the data, since we haven't done any filtering or clustering for them, so median absolute error gives us a quick and dirty measure of performance before we move on to fine-tuning (and of course, we could do more about outliers in the data preparation step).
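To see why median absolute error is less sensitive to outliers, here is a small pure-Python comparison on made-up prices (illustrative numbers, not from the Airbnb data). The two helpers are written by hand rather than taken from `sklearn.metrics`, so the snippet has no dependencies. A single badly mispredicted listing inflates the mean squared error enormously but barely moves the median absolute error.

```python
from statistics import median

def median_absolute_error(y_true, y_pred):
    """Median of |y_true - y_pred|, the metric used in this example."""
    return median(abs(t - p) for t, p in zip(y_true, y_pred))

def mean_squared_error(y_true, y_pred):
    """Mean of (y_true - y_pred)**2, for comparison."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

# Hypothetical nightly prices and predictions; the last listing is an outlier
y_true = [100, 150, 200, 250, 1000]
y_pred = [110, 140, 215, 235, 300]

print(median_absolute_error(y_true, y_pred))          # 15
print(mean_squared_error(y_true, y_pred))             # 98130.0

# The same metrics without the outlier
print(median_absolute_error(y_true[:4], y_pred[:4]))  # 12.5
print(mean_squared_error(y_true[:4], y_pred[:4]))     # 162.5
```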

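# `alldata` (listing features plus the `price` column) comes from the earlier data-preparation step
import numpy as np
import matplotlib.pyplot as plt
from sklearn import linear_model, metrics
from sklearn.model_selection import train_test_split  # sklearn.cross_validation in scikit-learn < 0.18

# candidate estimators, all with default hyperparameters
ests = [ linear_model.LinearRegression(), linear_model.Ridge(),
        linear_model.Lasso(), linear_model.ElasticNet(),
        linear_model.BayesianRidge(), linear_model.OrthogonalMatchingPursuit() ]
ests_labels = np.array(['Linear', 'Ridge', 'Lasso', 'ElasticNet', 'BayesRidge', 'OMP'])
errvals = np.array([])

# hold out 20% of the listings for testing; price is the target
X_train, X_test, y_train, y_test = train_test_split(alldata.drop(['price'], axis=1),
                                                    alldata.price, test_size=0.2, random_state=20)

# fit each estimator and record its median absolute error on the test set
for e in ests:
    e.fit(X_train, y_train)
    this_err = metrics.median_absolute_error(y_test, e.predict(X_test))
    #print("got error %0.2f" % this_err)
    errvals = np.append(errvals, this_err)

# bar chart of the errors, sorted from best (lowest) to worst
pos = np.arange(errvals.shape[0])
srt = np.argsort(errvals)
plt.figure(figsize=(7,5))
plt.bar(pos, errvals[srt], align='center')
plt.xticks(pos, ests_labels[srt])
plt.xlabel('Estimator')
plt.ylabel('Median Absolute Error')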

The errors from these six estimators are roughly the same: most of them predict the price with a median error of around 30-35 dollars, with BayesRidge coming out on top by a small margin. Results this close aren't surprising, because we haven't done any tuning yet. Still, they give us a good general idea of where we stand with the individual estimators.

Next we'll try an ensemble method to see if we can get better results. The upside is that we will likely get a better score; the downside is that ensemble methods have a bewildering variety of hyperparameters, each of which can affect the model and requires some experimentation to get right. The common approach is an exhaustive "grid search", which simply tries every combination of the supplied parameter values and uses k-fold cross-validation to find the best one. Scikit-learn provides the very handy GridSearchCV class for this purpose.
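GridSearchCV does all of this for us. Purely to illustrate what "exhaustive" means, the sketch below (plain Python, not scikit-learn's implementation) expands a parameter grid into its Cartesian product and splits the rows into contiguous folds; the number of model fits is the number of combinations times the number of folds. The grid mirrors the small `tuned_parameters` dictionary used in this example (leaving out its single-valued `min_samples_split` entry, which does not change the count); the wider grid is hypothetical.

```python
from itertools import product

def expand_grid(param_grid):
    """Yield one dict per combination in the Cartesian product of the grid."""
    names = sorted(param_grid)
    for values in product(*(param_grid[name] for name in names)):
        yield dict(zip(names, values))

def kfold_indices(n_samples, n_folds):
    """Yield (train, test) index lists for contiguous, unshuffled folds."""
    start = 0
    for i in range(n_folds):
        # the first n_samples % n_folds folds get one extra sample
        size = n_samples // n_folds + (1 if i < n_samples % n_folds else 0)
        test = list(range(start, start + size))
        train = list(range(0, start)) + list(range(start + size, n_samples))
        yield train, test
        start += size

# Same shape as the small grid in this example: the loss is the only varying parameter
grid = {
    "n_estimators": [300],
    "max_depth": [4],
    "learning_rate": [0.01],
    "loss": ["ls", "lad"],
}
combos = list(expand_grid(grid))
folds = list(kfold_indices(n_samples=10, n_folds=3))
print(len(combos), "combinations x", len(folds), "folds =", len(combos) * len(folds), "fits")
# 2 combinations x 3 folds = 6 fits

# A hypothetical wider grid grows multiplicatively: 3 depths x 3 rates x 2 losses x 3 folds
wider = dict(grid, max_depth=[4, 6, 8], learning_rate=[0.01, 0.05, 0.1])
print(len(list(expand_grid(wider))) * len(folds), "fits")
# 54 fits
```

With the default `refit=True`, GridSearchCV also refits the best combination on the whole training set once the search is over, which adds one more fit.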

The tradeoff of using GridSearchCV is that the exhaustive search and cross-validation can take a lot of CPU and time. This is where we can use Spark to distribute the search to more machines and cores, enabling us to test more combinations faster.

For our first attempt, we'll limit the number of parameters just so we can get the results back quickly and see if we're doing better than any of the individual methods above.

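from sklearn import ensemble
from sklearn.model_selection import GridSearchCV  # sklearn.grid_search in scikit-learn < 0.18

n_est = 300

# a deliberately small grid: only the loss function varies
tuned_parameters = {
    "n_estimators": [ n_est ],
    "max_depth" : [ 4 ],
    "learning_rate": [ 0.01 ],
    # older versions accepted 1 here, which behaves like 2 (a node needs 2 samples to be split)
    "min_samples_split" : [ 2 ],
    # squared error vs. absolute error; these were named 'ls' and 'lad' before scikit-learn 1.0
    "loss" : [ 'squared_error', 'absolute_error' ]
}

gbr = ensemble.GradientBoostingRegressor()
# 3-fold cross-validation; scikit-learn maximizes scores, so the error is negated
# (the scorer was named 'median_absolute_error' before scikit-learn 0.18)
clf = GridSearchCV(gbr, cv=3, param_grid=tuned_parameters,
        scoring='neg_median_absolute_error')
clf.fit(X_train, y_train)
best = clf.best_estimator_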

The result of this attempt is a median error of $23.64.

Already, it looks like we're doing better with GradientBoostingRegressor than with any of the prior attempts. With almost no tuning, the median error is around 20% lower than the best error of the previous group (BayesRidge).

Let's get a quick measure of how the error is affected by each round of boosting, so we can see if throwing more iterations at the problem is going to help.

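# plot error for each round of boosting
# (assumes the best estimator uses the absolute-error loss, as the y-axis label below implies)
test_score = np.zeros(n_est, dtype=np.float64)

train_score = best.train_score_
for i, y_pred in enumerate(best.staged_predict(X_test)):
    # test-set mean absolute error after i + 1 boosting iterations;
    # older scikit-learn versions computed this with best.loss_(y_test, y_pred)
    test_score[i] = metrics.mean_absolute_error(y_test, y_pred)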

plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(np.arange(n_est), train_score, 'darkblue', label='Training Set Error')
plt.plot(np.arange(n_est), test_score, 'red', label='Test Set Error')
plt.legend(loc='upper right')
plt.xlabel('Boosting Iterations')
plt.ylabel('Least Absolute Deviation')

The curve flattens toward the right side of the plot (after around 200-250 iterations), but the model is still benefiting from more iterations, so we could safely increase the number of iterations to 500.

Now we can do some tuning with GridSearchCV and explore more of the hyperparameter combinations. However, this requires some serious CPU and a complete run of all of the combinations can easily take hours to finish, even on a small number of rows.

By simply replacing the following lines in our file, we can use the new spark-sklearn integration package, running on the MapR 5.1 platform with Spark as a YARN client, to distribute the iterations across the machines in a cluster. This allows us to test more hyperparameter combinations, ultimately reducing the error, and to do it all in less time.

from pyspark import SparkContext, SparkConf
from spark_sklearn import GridSearchCV

conf = SparkConf()
sc = SparkContext(conf=conf)
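# gbr and tuned_parameters are the same as above; the SparkContext is the extra first argument
# (with scikit-learn 0.18 or later, the scorer is named 'neg_median_absolute_error')
clf = GridSearchCV(sc, gbr, cv=3, param_grid=tuned_parameters, scoring='median_absolute_error')
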
It's worth pausing here to note that the architecture of this approach is different from that used by MLlib in Spark. With spark-sklearn, we simply distribute the cross-validation runs of each model (each with a specific combination of hyperparameters) across the Spark executors. Spark MLlib, on the other hand, distributes the internals of the learning algorithms themselves across the cluster.
The main advantage of spark-sklearn is that it lets us leverage the very rich set of machine learning algorithms in scikit-learn. These algorithms do not run natively on a cluster (although they can be parallelized on a single machine), and by adding Spark, we can unlock a lot more horsepower than we could ordinarily use.
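As a rough, local analogy of that architecture (spark-sklearn's own code is not reproduced here), the sketch below treats each (parameter combination, fold) pair as an independent task and maps an evaluation function over the tasks with a thread pool from Python's standard library, which stands in for the Spark executors. The task granularity and the `evaluate` function are illustrative assumptions: `evaluate` returns a fake error instead of training a model.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product

# Hypothetical search space: 2 parameter combinations x 3 folds = 6 independent tasks
param_combos = [{"loss": "ls"}, {"loss": "lad"}]
fold_ids = [0, 1, 2]

def evaluate(task):
    """Stand-in for 'fit on the training folds, score on the held-out fold'.

    A real task would train a model here; this one returns a fake,
    deterministic error so the example runs anywhere.
    """
    params, fold = task
    fake_error = {"ls": 25.0, "lad": 23.0}[params["loss"]] + fold * 0.5
    return params["loss"], fold, fake_error

# Tasks do not depend on each other, so a pool of workers can run them in any order
tasks = list(product(param_combos, fold_ids))
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(evaluate, tasks))

# Back on the "driver": average the fold errors per combination and keep the lowest
errors_by_loss = {}
for loss, _, error in results:
    errors_by_loss.setdefault(loss, []).append(error)
mean_error = {loss: sum(errs) / len(errs) for loss, errs in errors_by_loss.items()}
best_loss = min(mean_error, key=mean_error.get)
print(mean_error, best_loss)  # {'ls': 25.5, 'lad': 23.5} lad
```

Each individual fit still runs on a single worker, which is the key difference from MLlib, where one fit is itself spread across the cluster.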

With this approach, we get the error down even further, to $21.43, with a substantial reduction in run time, as shown in the chart below. This was run on a 4-node MapR 5.1 cluster with Spark in YARN client mode, with each node having the following configuration:

Machine: HP DL380 G6
Memory: 128 GB
CPU: (2x) Intel X5560
Disk: (6x) 1 TB 7200 RPM disks
[Chart: GridSearchCV run time comparison]

Finally, let's look at the feature importances to see which features were most influential in predicting the listing price. The plot scores each feature's importance relative to the most important feature, which is scaled to 100.

# importances of the best model, scaled so that the most important feature scores 100
feature_importance = clf.best_estimator_.feature_importances_
feature_importance = 100.0 * (feature_importance / feature_importance.max())
# ascending order, so the most important feature is drawn at the top of the bar chart
sorted_idx = np.argsort(feature_importance)
pos = np.arange(sorted_idx.shape[0]) + .5
pvals = feature_importance[sorted_idx]
pcols = X_train.columns[sorted_idx]
plt.figure(figsize=(8,12))
plt.barh(pos, pvals, align='center')
plt.yticks(pos, pcols)
plt.xlabel('Relative Importance')
plt.title('Variable Importance')
[Chart: Variable Importance]

Clearly some of the variables have more influence than others, and the results here are somewhat intuitive. The most influential feature is the 'Entire home/apt' attribute, which indicates whether or not the unit is shared with other people and has the most effect in setting the price. We would probably expect this feature to be high on the list, and it might be even higher [...]. Reviews are important, as is the short-term availability of the unit, and the Ocean View and Excelsior neighborhood features also scored high.

CONCLUSIONS

In this example, we looked at how to predict the price using multiple factors, then how to scale out our cross-validation and hyperparameter search across a MapR cluster using Spark. We learned the following key points:
In this case, the ensemble method we tried (GradientBoostingRegressor) had better results than any individual estimator.
With GridSearchCV, we tested more hyperparameter combinations that ultimately led us to a better result.
Using spark-sklearn is a straightforward way to throw more CPU at hyperparameter search and cross-validation for scikit-learn models. We used the package to reduce the time spent searching and to reduce the error of our estimator.
There are many ways in which our first-pass analysis could be improved. For example, we might start including more of the data from Inside Airbnb. We could do sentiment analysis on the text of the reviews, bringing this in as an additional feature.

A quick way to try the example code here on your own is with the MapR Sandbox, which comes pre-installed with MapR-FS and Spark.

A complete example

As an illustration, consider a Pig script that loads a log file, filters it for a specific day, counts the log entries per item, and adds the item description from another file:

/* load a log file of user sessions. Filter for a specific date and count entries per item
*/
 
f0 = LOAD 'logfile' using PigStorage('\t') AS (log_date:chararray, item_id:chararray, some_stuff:chararray);
 
f1 = FILTER f0 BY log_date == '20160515';
 
f2 = FOREACH f1 GENERATE item_id;
 
f3 = GROUP f2 BY item_id;
 
f4 = FOREACH f3 GENERATE group AS item_id, COUNT(f2) AS nb_entries;
 
/* add item name
*/
 
item1 = LOAD 'item' using PigStorage('\t') AS (item_id:chararray, item_name:chararray);
 
join1 = JOIN f4 BY item_id LEFT, item1 BY item_id;
 
result = FOREACH join1 GENERATE f4::item_id, item_name, nb_entries;
 
STORE result INTO 'result_file' USING PigStorage('\t');
The code is fairly simple, and each step performs one transformation.
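To make the data flow concrete before translating it, here is a pure-Python sketch (no Pig or Spark involved) of what the script computes, written as one small function per step and mirroring the Pig aliases. The sample rows are made up; the field layout follows the script's schemas, (log_date, item_id, some_stuff) and (item_id, item_name).

```python
from collections import Counter

def filter_by_date(logs, day):
    """f1: keep the (log_date, item_id, some_stuff) entries of one day."""
    return [rec for rec in logs if rec[0] == day]

def count_by_item(logs):
    """f2 to f4: project item_id, group, and count entries per item."""
    return Counter(item_id for _, item_id, _ in logs)

def left_join_names(counts, items):
    """join1/result: left outer join with item names (None when unmatched)."""
    names = dict(items)
    return [(item_id, names.get(item_id), n) for item_id, n in counts.items()]

logs = [
    ("20160515", "i1", "a"),
    ("20160515", "i1", "b"),
    ("20160515", "i2", "c"),
    ("20160514", "i1", "d"),
    ("20160515", "i3", "e"),
]
items = [("i1", "lamp"), ("i2", "chair")]

result = left_join_names(count_by_item(filter_by_date(logs, "20160515")), items)
# Tab-separated output, like STORE ... USING PigStorage('\t')
for row in sorted(result):
    print("\t".join("" if f is None else str(f) for f in row))
```

Item "i3" has no entry in the item file, so the left outer join keeps it with an empty name.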

Now let's move to Spark, starting with raw Spark and low-level RDDs to show the similarities with the Pig code. The code spells things out one alias at a time; production code would obviously be more compact.

Raw Spark (using RDD)

from pyspark import SparkConf, SparkContext

conf = SparkConf()
sc = SparkContext(conf=conf)

# load the log file: each record becomes [log_date, item_id, some_stuff]
f0 = sc.textFile('logfile').map(lambda x: x.split('\t'))

# keep only the entries of the given day
f1 = f0.filter(lambda x: x[0] == '20160515')

# group by item_id (second field), then count the entries of each group
f3 = f1.groupBy(lambda rec: rec[1])
f4 = f3.map(lambda kv: (kv[0], len(kv[1])))

# add item name: each record becomes (item_id, item_name)
item1 = sc.textFile('item').map(lambda x: tuple(x.split('\t')))

# no need to set the key item_id on both parts before performing the join:
# it's already in first place on each part.
join1 = f4.leftOuterJoin(item1)

# join1 records are (item_id, (nb_entries, item_name)); item_name is None
# for items missing from the item file, hence the "or ''"
result = join1.map(lambda kv: (kv[0], kv[1][1] or '', str(kv[1][0])))

# create a line of tab-separated fields and save it in the result file
result_to_store = result.map(lambda record: '\t'.join(record))
result_to_store.saveAsTextFile('result_file')
Pig and Spark code share a similar outline here, which makes it easier for a Pig developer to start coding in Spark. One drawback, however, is that for relatively simple operations like this, Pig is still more productive than Spark, even if execution time is better (but not astoundingly better) with Spark.

Now that we are familiar with low-level RDDs, the code can be improved by using DataFrames and Spark SQL. The previous code can be rewritten in a more readable form:

Spark with DataFrames and SparkSQL

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StructType, StringType

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# log dataframe: split each line into [log_date, item_id, some_stuff]
f0 = sc.textFile('logfile').map(lambda x: x.split('\t'))

fpFields = [
   StructField('log_date', StringType(), True),
   StructField('item_id', StringType(), True),
   StructField('some_stuff', StringType(), True)
]

fpSchema = StructType(fpFields)
df_f0 = sqlContext.createDataFrame(f0, fpSchema)
df_f0.registerTempTable('log')

# filter on the date and count entries per item in a single query
f1_df = sqlContext.sql(
   "SELECT log.item_id, count(*) AS nb_entries \
      FROM log \
     WHERE log_date = '20160515' \
  GROUP BY item_id"
)
f1_df.registerTempTable('log_agg')

# items dataframe
item1 = sc.textFile('item').map(lambda x: x.split('\t'))

itemFields = [
   StructField('item_id', StringType(), True),
   StructField('item_name', StringType(), True)
]

itemSchema = StructType(itemFields)
df_item1 = sqlContext.createDataFrame(item1, itemSchema)

df_item1.registerTempTable('item')

# left outer join to add the item names;
# format_number() returns a string with thousands separators (e.g. 1,234)
result = sqlContext.sql(
   'SELECT log_agg.item_id, item_name, format_number(nb_entries, 0) \
      FROM log_agg \
   LEFT OUTER JOIN item ON log_agg.item_id = item.item_id'
)

# item_name is NULL (None) for items missing from the item file, so replace it with ''
result_to_store = result.rdd \
     .map(lambda record: '\t'.join('' if field is None else field for field in record))

result_to_store.saveAsTextFile('result_file')
I’m sure there are even more compact and elegant ways to do it in Spark SQL, but this is the outline.

Now we have named fields, type safety, and compact SQL code that is more readable by a data analyst. Productivity has increased, and this is a better alternative to Pig.

The drawback is that each piece of SQL is now a black box that can only be tested as a whole, which can prove tricky if the result differs from what is expected or if execution is slow. It is then up to the developer to design steps that remain readable and can be executed as individual units of code.

Loading data from Hive metastore HCatalog

If our data had been stored in Hive tables registered in the HCatalog metastore, all the DataFrame metadata would have been inherited from the metastore, and the Spark code would have been even simpler:

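from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
# HiveContext reads the table definitions (columns, types, locations) from the Hive metastore
sqlContext = HiveContext(sc)

f1_df = sqlContext.sql(
   "SELECT item_id, count(*) AS nb_entries \
   FROM my_db.log \
   WHERE log_date = '20160515' \
   GROUP BY item_id"
)

f1_df.registerTempTable('log_agg')

result = sqlContext.sql(
   "SELECT log_agg.item_id, item_name, format_number(nb_entries, 0) \
      FROM log_agg \
LEFT OUTER JOIN my_db.item item ON log_agg.item_id = item.item_id"
)

# item_name is NULL (None) for unmatched items; replace it with '' before joining the fields
result_to_store = result.rdd \
   .map(lambda record: '\t'.join('' if field is None else field for field in record))

result_to_store.saveAsTextFile('result_file')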
Now this is a more compact and readable piece of code :)

Now let's push the advantage a bit further in favor of Spark: user-defined functions.

User-defined functions

As stated previously, in Spark there is no need for a separately packaged and deployed UDF: you simply write the function in Python.

In Pig:

/* the function below has been written and deployed in a jar file */
DEFINE myFancyUdf com.mydomain.myfunction1;
 
...
 
log1 = FOREACH log0 GENERATE field1, myFancyUdf(field1);
In Spark:

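def myFancyUdf(f1):
    # someStuff: any plain Python processing of f1 goes here
    result = f1
    return result

# log0 is an RDD of field1 values; the function is called directly inside map()
log1 = log0.map(lambda field1: (field1, myFancyUdf(field1)))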
More advanced topics

In this section, let's look at how two of Pig's more powerful features translate to Spark:

Map-side joins

One handy feature of Pig is map-side joins, where one of the tables to join is small enough to be sent to each worker to take part in the Map job (not requiring the more expensive Reduce job). This is conveniently performed by using the “replicated” hint on the JOIN.

Imagine that in our previous example, the ‘item’ table is small enough to fit in memory. The join1 alias becomes:

join1 = JOIN f4 BY item_id, item1 BY item_id USING 'replicated';
 
result = FOREACH join1 GENERATE f4::item_id, item_name, nb_entries;
In Spark this is performed quite easily with broadcast variables:

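# broadcast items: item1 is the (item_id, item_name) RDD loaded in the previous example;
# collect() brings the small table to the driver, broadcast() ships a read-only copy to each worker
item_bc = sc.broadcast(item1.collect())


def getItemName(item_id_to_match):
    '''
    gets item name from its id
    '''
    # we know there will be only one result, so we take the first from the list
    (_, name) = [rec for rec in item_bc.value if rec[0] == item_id_to_match][0]
    return name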
The item table is broadcast to each worker node. The getItemName() function then searches the broadcast table for the record holding a given item_id and returns its name. This function is called on the map side of the Spark job, for each record processed.
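The list scan in getItemName() examines every broadcast record for each lookup, which is fine for a handful of items but grows with the table. A common variation is to broadcast a dict keyed by item_id instead, e.g. `sc.broadcast(dict(item1.collect()))` in PySpark, assuming `item1` holds (item_id, item_name) pairs. The plain-Python sketch below uses made-up data, with an ordinary dict standing in for the broadcast value each worker receives, to show the resulting map-side lookup:

```python
# Small "item" table, assumed to fit in memory on every worker
items = [("i1", "lamp"), ("i2", "chair")]
# Stand-in for sc.broadcast(...).value: one local dict per worker, keyed by item_id
item_lookup = dict(items)

def get_item_name(item_id_to_match):
    """Map-side lookup: no shuffle, just a dict access on the local copy."""
    return item_lookup.get(item_id_to_match)  # None if the item is unknown

# Aggregated log rows (item_id, nb_entries), as produced by the GROUP BY step
log_agg = [("i1", 2), ("i2", 1), ("i3", 1)]

# Each record is enriched independently, like a map() over a partition
result = [(item_id, get_item_name(item_id), n) for item_id, n in log_agg]
print(result)  # [('i1', 'lamp', 2), ('i2', 'chair', 1), ('i3', None, 1)]
```

Returning None keeps unmatched rows, as a left outer join would; filtering them out instead would mimic the inner join of the `replicated` Pig statement.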

The complete code now looks like:


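from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)


def getItemName(item_id_to_match):
    '''
    gets item name from its id
    '''
    # we know there will be only one result, so we take the first from the list
    (_, name) = [rec for rec in item_bc.value if rec[0] == item_id_to_match][0]
    return name

f1_df = sqlContext.sql(
  "SELECT item_id, count(*) AS nb_entries \
     FROM my_db.log \
    WHERE log_date = '20160515' \
   GROUP BY item_id"
)

item_df = sqlContext.sql(
   "SELECT item_id, item_name \
      FROM my_db.item"
)

# broadcast the small item table (a list of (item_id, item_name) Rows) to every worker
item_bc = sc.broadcast(item_df.rdd.collect())

# map-side join: each aggregated row is enriched with its item name, no shuffle needed
result = f1_df.rdd.map(lambda row: (row.item_id, getItemName(row.item_id), str(row.nb_entries)))
result_to_store = result.map(lambda record: '\t'.join(record))
result_to_store.saveAsTextFile('result_file')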
Window function: get the first n records of each group from a sorted list

It is sometimes necessary to find the top n records of a table for each group sharing a common feature. From the log files of our example, let's get, for each item, the 10 most recent records (in SQL, this would be a window function with PARTITION BY).
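Stripped of Pig and Spark, the operation is: group the records by item_id, then keep the n with the latest log_date in each group. A plain-Python sketch with made-up records (and n=2 to keep the output short) uses `heapq.nlargest`, which keeps a heap of only n records per group instead of fully sorting each group:

```python
import heapq
from collections import defaultdict

def top_n_per_group(records, n):
    """Return the n most recent (log_date, item_id, some_stuff) records per item_id."""
    groups = defaultdict(list)
    for rec in records:
        groups[rec[1]].append(rec)
    out = []
    for item_id in sorted(groups):
        # log_date strings like '20160515' sort chronologically as plain text
        out.extend(heapq.nlargest(n, groups[item_id], key=lambda rec: rec[0]))
    return out

records = [
    ("20160513", "i1", "a"),
    ("20160515", "i1", "b"),
    ("20160514", "i1", "c"),
    ("20160512", "i2", "d"),
    ("20160515", "i2", "e"),
]
for rec in top_n_per_group(records, n=2):
    print(rec)
```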

In Pig, this can be accomplished with a piece of code like this:

f0 = LOAD 'logfile' using PigStorage('\t') AS (log_date:chararray, item_id:chararray, some_stuff:chararray);
 
f1 = GROUP f0 BY item_id; 
 
f2 = FOREACH f1 {
   o = ORDER f0 BY log_date DESC;
   l = LIMIT o 10;
   GENERATE FLATTEN(l) AS (log_date, item_id, some_stuff);
}
In Spark it’s also feasible, either with low-level RDD code or with Spark SQL window functions.

Let’s start with the low-level RDD solution:

# create a tuple with the key for the GroupBy: (item_id, (log_date, some_stuff))
# (f0 holds [log_date, item_id, some_stuff] records, as in the earlier RDD example)
f1 = f0.map(lambda rec: (rec[1], (rec[0], rec[2])))

f2 = f1.groupByKey()

# result of the GroupBy is a tuple (item_id, iterable over grouped values)
# we sort the values by log_date, most recent first, and retain only the first 10
f3 = f2.mapValues(lambda values: sorted(values, key=lambda v: v[0], reverse=True)[:10])

# transform tuples of (item_id, [(log_date, some_stuff), ...]) into tuples of (log_date, item_id, some_stuff)
f4 = f3.flatMapValues(lambda x: x) \
    .map(lambda kv: (kv[1][0], kv[0], kv[1][1]))
It's not very elegant, but it does the job.

Then the Spark SQL solution:

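# row_number() numbers each item's rows 1, 2, 3, ... even when log_date values tie,
# so rn <= 10 keeps at most 10 rows per item, like Pig's LIMIT 10
f1_df = sqlContext.sql(
'SELECT \
  log_date, \
  item_id,  \
  some_stuff  \
FROM (  \
  SELECT  \
  log_date, \
  item_id,  \
  some_stuff, \
  row_number() OVER (PARTITION BY item_id ORDER BY log_date DESC) AS rn \
FROM my_db.log) tmp \
WHERE rn <= 10')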
 
f2 = f1_df.rdd.map (lambda row: (row.log_date, row.item_id, row.some_stuff))
Much better!
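One subtlety worth checking when porting Pig's LIMIT to a window function is the choice of ranking function. dense_rank() gives tied rows the same rank, so when several log entries share the same log_date, filtering on rank <= 10 keeps every row from the 10 most recent distinct dates, which can be far more than 10 rows per item. row_number() numbers rows 1, 2, 3, ... regardless of ties, which matches LIMIT 10. The plain-Python sketch below (made-up dates, n=2) mimics both functions within a single partition already sorted by log_date descending:

```python
def dense_rank(sorted_dates):
    """Ties share a rank; the next distinct value gets rank + 1 (no gaps)."""
    ranks, rank, prev = [], 0, None
    for d in sorted_dates:
        if d != prev:
            rank += 1
            prev = d
        ranks.append(rank)
    return ranks

def row_number(sorted_dates):
    """Consecutive numbers 1..len, ties broken by position."""
    return list(range(1, len(sorted_dates) + 1))

# One item's log dates, sorted descending (as ORDER BY log_date DESC would)
dates = ["20160515", "20160515", "20160515", "20160514"]
n = 2
print(sum(r <= n for r in dense_rank(dates)))  # 4 rows kept
print(sum(r <= n for r in row_number(dates)))  # 2 rows kept
```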

Conclusion

I have deliberately left out of this blog post some interesting topics such as deployment, debugging, execution monitoring, dynamic resource allocation, partition and split size tuning, sampling, etc. The goal of this particular post is to show Pig developers how to start coding in Spark; I hope that from this perspective, you find it helpful. If you have any further questions, please ask them in the comments section below.



Philippe de Cuzey
DATA ANALYST, SELF EMPLOYED
Philippe has a background as an Oracle DBA working on JEE projects and has quickly embraced the new era of NoSQL and scalable distributed computing. Since 2014 he has mainly worked on Hadoop, specifically with Pig, Spark and Spark ML. He is based in Paris, France.

 
© 2016 MapR Technologies, Inc. All Rights Reserved
 
