# ML with large datasets
In this tutorial, we'll explore what happens to model training as a dataset grows large 
when we use standard packages such as sklearn. We will do the following:
1. Investigate model training time on a 4 GB dataset as we vary the number of input rows and the number of cores used.
2. Explore how to build a simple ML model using Spark MLlib.

We will use the dataset from the Kaggle Expedia competition. Please read about the competition and what is being predicted [here](https://www.kaggle.com/c/expedia-hotel-recommendations). You can log into Kaggle with your Google account.

In [3]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier  as RF
from sklearn.model_selection import KFold, train_test_split
import datetime

from pyspark.sql import SparkSession
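# Import only the functions used below; a star import would shadow built-ins such as round and sum.
from pyspark.sql.functions import count, isnull, when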
from pyspark.ml import Pipeline
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Investigate model training time in sklearn

In [None]:
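def map5eval(preds, dtrain):
    """Return the negated MAP@5 of the class-score rows in preds against dtrain's labels."""
    actual = dtrain.get_label()
    # Indices of the five highest-scoring classes per row, best first.
    # (-np.arange(5) would start at column 0, which is the lowest-scoring class.)
    predicted = preds.argsort(axis=1)[:, -np.arange(1, 6)]
    metric = 0.
    for i in range(5):
        metric += np.sum(actual == predicted[:, i]) / (i + 1)
    metric /= actual.shape[0]
    return 'MAP@5', -metric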
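
To make the metric concrete, here is a small dependency-free sketch of MAP@5 for the case where each row has exactly one true label. `map_at_5` and the toy data are hypothetical and only illustrate the scoring rule: a correct guess at position k (1-based) within the top five contributes 1/k, and a miss contributes 0.

```python
def map_at_5(actual, ranked):
    """Mean average precision at 5 when every row has exactly one true label.

    actual: list of true labels, one per row.
    ranked: list of predicted labels per row, best guess first.
    """
    total = 0.0
    for label, guesses in zip(actual, ranked):
        for rank, guess in enumerate(guesses[:5]):
            if guess == label:
                # A hit at 0-based position `rank` is worth 1 / (rank + 1).
                total += 1.0 / (rank + 1)
                break
    return total / len(actual)


# Toy data: a hit at rank 1, a hit at rank 2, and a miss.
actual = [3, 7, 1]
ranked = [
    [3, 1, 2, 4, 5],
    [1, 7, 2, 3, 4],
    [9, 8, 6, 5, 4],
]
score = map_at_5(actual, ranked)
print(score)  # (1 + 1/2 + 0) / 3 = 0.5
```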

In [11]:
DATA_FILE = "../DATA/kaggle-expedia-train.csv"
COLS = ['site_name', 'user_location_region', 'is_package', 'srch_adults_cnt', 'srch_children_cnt','srch_destination_id', 'hotel_market', 'hotel_country', 'hotel_cluster']

In [None]:
df = pd.read_csv(DATA_FILE, nrows=10000)

In [None]:
df.hotel_cluster.value_counts()

In [5]:
def read_pandas_df(num_rows=100000, chunk_size=100000):
    """Read DATA_FILE in chunks, keeping only booking rows, until num_rows rows are loaded."""
    chunks = []
    tot_rows = 0
    for chunk in pd.read_csv(DATA_FILE, chunksize=chunk_size):
        # Keep only rows that ended in a booking, restricted to the columns of interest.
        bookings = chunk.loc[chunk['is_booking'] == 1, COLS]
        chunks.append(bookings)
        tot_rows += bookings.shape[0]
        # Stop reading once enough rows have been collected.
        if tot_rows >= num_rows:
            break

    return pd.concat(chunks, ignore_index=True).head(int(num_rows))

In [6]:
def build_RF_with_sklearn(df):
    """Fit a random forest on df and return the fit-plus-predict time in minutes."""
    # Cast every column to int on a copy so the caller's DataFrame is left untouched.
    df = df.astype(str).astype(int)

    X = df.drop(['hotel_cluster'], axis=1)
    y = df['hotel_cluster'].values

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
    start = datetime.datetime.now()
    clf = RF(n_jobs=-1)  # n_jobs=-1 uses all available cores
    clf.fit(X_train, y_train)
    pred = clf.predict(X_test)
    end = datetime.datetime.now()
    # The timed span covers both fitting and predicting on the test split.
    time_taken = (end - start).total_seconds() / 60
    num_rows = X_train.shape[0]
    print('Training {:,} rows took {} minutes with all cores'.format(num_rows, time_taken))
    return time_taken

In [9]:
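rows = [i * 1_000_000 for i in range(5, 20)]
pandas_outputs = []
for r in rows:
    try:
        df = read_pandas_df(num_rows=r, chunk_size=2000000)
        time_taken = build_RF_with_sklearn(df)
        pandas_outputs.append({'TimeTaken': round(time_taken, 4), 'NumRows': df.shape[0]})
    except Exception as exc:
        # Larger row counts may not fit in memory; report the failure and move on.
        print('Skipping {:,} rows: {}'.format(r, exc))
        continue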
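
The loop above varies only the number of rows, while the first goal of this tutorial also mentions the number of cores. Below is a minimal sketch of a timing grid over both, using only the standard library. `time_call`, `run_grid`, and `fake_train` are hypothetical names, and `fake_train` is a stand-in workload that ignores `n_jobs`. In practice it would be replaced by a function that loads `n_rows` rows and fits a model with `n_jobs` cores, for example through the `n_jobs` argument of sklearn's random forest.

```python
import time


def time_call(fn, *args, **kwargs):
    """Run fn once and return (result, elapsed_minutes)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = (time.perf_counter() - start) / 60
    return result, elapsed


def run_grid(train_fn, row_counts, core_counts):
    """Time train_fn(n_rows, n_jobs) for every combination and collect the results."""
    outputs = []
    for n_rows in row_counts:
        for n_jobs in core_counts:
            _, minutes = time_call(train_fn, n_rows, n_jobs)
            outputs.append({'NumRows': n_rows, 'NumCores': n_jobs, 'TimeTaken': round(minutes, 4)})
    return outputs


def fake_train(n_rows, n_jobs):
    """Stand-in workload (assumption): replace with real model fitting that uses n_jobs cores."""
    return sum(range(n_rows))


results = run_grid(fake_train, row_counts=[1000, 2000], core_counts=[1, 2])
print(results)
```

Each entry has the same shape as the dictionaries collected in the loop above, plus a `NumCores` field, so the results can be loaded into a DataFrame for plotting.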

In [None]:
df = read_pandas_df(num_rows=5000000, chunk_size=1000000)

In [None]:
df.shape

## Explore the Spark MLlib API
We will follow this simple tutorial from the [Spark MLlib](https://spark.apache.org/docs/1.2.1/mllib-guide.html) page before using our own dataset.

### Data structures
The bad news is that you often have to transform your dataset to match the data 
structures required by MLlib. Features and labels are represented differently in Spark MLlib 
than in sklearn. The RDD-based API uses the `LabeledPoint` class, 
which accepts both dense and sparse feature vectors.

**EXERCISE-1:** Learn more about LabeledPoint and about dense and sparse feature vectors.
Use the MLlib documentation page to learn more about these data structures.
- How do they differ from the regular NumPy arrays used in sklearn and pandas?
- How do you create them?

In [13]:
# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
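
To see what the sparse form encodes, here is a plain-Python sketch that needs no Spark. A sparse vector is described by its size, the indices of its non-zero entries, and the values at those indices. `sparse_to_dense` and `dense_to_sparse` are hypothetical helpers written for illustration and are not part of MLlib.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse description into a plain list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense


def dense_to_sparse(dense):
    """Keep only the non-zero entries as (size, indices, values)."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


# Same numbers as the sparse vector in the cell above.
dense = sparse_to_dense(3, [0, 2], [1.0, 3.0])
print(dense)                   # [1.0, 0.0, 3.0]
print(dense_to_sparse(dense))  # (3, [0, 2], [1.0, 3.0])
```

The sparse form pays off when most entries are zero, since only the non-zero positions are stored.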

### Build simple model using RDD API

In [16]:
def parsePoint(line):
    """
    Convert row into a LabeledPoint data type as required by spark
    """
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

In [14]:
spark = SparkSession.builder.master("local[*]").appName("ML-app").getOrCreate()
sc = spark.sparkContext
data_rdd = sc.textFile("../DATA/sample_svm_data.txt")
# parsedData = data_rdd.map(parsePoint)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/19 08:32:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/01/19 08:32:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [15]:
data_rdd.take(1)

['1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0']

**EXERCISE-2:** Convert this RDD to a DataFrame.

In [18]:
# Load and parse the data file into an RDD of LabeledPoint.
data = data_rdd.map(parsePoint)
# data = MLUtils.loadLibSVMFile(sc, '../DATA/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
#  Note: Use larger numTrees in practice.
#  Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())

# Save and load model
model_output = "../MODELS/spark-models/spark-RF2"
model.save(sc, model_output)
sameModel = RandomForestModel.load(sc, model_output)

Test Error = 0.5
Learned classification forest model:
TreeEnsembleModel classifier with 3 trees

  Tree 0:
    If (feature 3 <= 1.3099825520441275)
     If (feature 14 <= 6.364083791088865)
      If (feature 11 <= 1.027501437932207)
       If (feature 0 <= 1.428869016623521)
        Predict: 1.0
       Else (feature 0 > 1.428869016623521)
        Predict: 0.0
      Else (feature 11 > 1.027501437932207)
       Predict: 0.0
     Else (feature 14 > 6.364083791088865)
      Predict: 1.0
    Else (feature 3 > 1.3099825520441275)
     If (feature 1 <= 1.26039223600774)
      If (feature 11 <= 1.027501437932207)
       Predict: 1.0
      Else (feature 11 > 1.027501437932207)
       If (feature 4 <= 2.372526427751653)
        Predict: 1.0
       Else (feature 4 > 2.372526427751653)
        Predict: 0.0
     Else (feature 1 > 1.26039223600774)
      Predict: 1.0
  Tree 1:
    If (feature 3 <= 1.3099825520441275)
     If (feature 9 <= 1.1141935213710115)
      Predict: 0.0
     Else (feature 9 >

In [20]:
predictions = sameModel.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))

Test Error = 0.5


### Build model with our own custom dataset
We will use the Expedia dataset for this.

In [None]:
def build_model_with_sparkMLlib(input_file, sample_size=0.5):
    """Fit a Spark random forest on a sample of input_file and return the fitted model."""
    # Load and parse the data file, converting it to a DataFrame.
    spark = SparkSession.builder.master("local[*]").appName("ML-app").getOrCreate()
    schema = "`user_location_region` INT, `is_package` INT,`srch_adults_cnt` INT, `srch_children_cnt` INT,`srch_destination_id` INT, `hotel_market` INT, `hotel_country` INT,`hotel_cluster` INT"
    sdf = spark.read.schema(schema).csv(input_file)
    sdf = sdf.dropna()
    # Sample the requested fraction of rows and cache it for reuse.
    sdf_sample = sdf.sample(fraction=sample_size).cache()
    nrows = sdf_sample.count()
    print('Sampled {:,} rows'.format(nrows))

    # Every column except the label is a feature; a list keeps the order stable.
    feature_cols = [c for c in sdf.columns if c != 'hotel_cluster']
    print(feature_cols)
    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    transformed_train_data = assembler.transform(sdf_sample).cache()

    (train, test) = transformed_train_data.randomSplit([0.8, 0.2])

    rf = RandomForestClassifier(labelCol='hotel_cluster', featuresCol='features')
    model = rf.fit(train)

    print('Done fitting model')
    return model

In [21]:
spark = SparkSession.builder.master("local[*]").appName("ML-app").getOrCreate()
schema = "`user_location_region` INT, `is_package` INT,`srch_adults_cnt` INT, `srch_children_cnt` INT,`srch_destination_id` INT, `hotel_market` INT, `hotel_country` INT,`hotel_cluster` INT"
sdf = spark.read.schema(schema).csv(DATA_FILE)
sdf_sample = sdf.sample(fraction=0.5).cache()
#nrows = sdf_sample.count()

In [22]:
# Check nulls
sdf_sample.select([count(when(isnull(c), c)).alias(c) for c in sdf_sample.columns]).show()



+--------------------+----------+---------------+-----------------+-------------------+------------+-------------+-------------+
|user_location_region|is_package|srch_adults_cnt|srch_children_cnt|srch_destination_id|hotel_market|hotel_country|hotel_cluster|
+--------------------+----------+---------------+-----------------+-------------------+------------+-------------+-------------+
|            18827216|         0|              0|                0|                  0|           0|     18827216|            0|
+--------------------+----------+---------------+-----------------+-------------------+------------+-------------+-------------+

In [23]:
# Drop the columns that contain nulls
cols_to_drop = ['user_location_region', 'hotel_country']
sdf_sample2 = sdf_sample.drop(*cols_to_drop)

In [24]:
sdf_sample3 = sdf_sample2.dropna(how='any')
#sdf_sample3.count()

In [None]:
# labelIndexer = StringIndexer(inputCol="hotel_cluster", outputCol="indexedLabel").fit(sdf_sample3)

In [None]:
sdf_sample3.select([count(when(isnull(c), c)).alias(c) for c in sdf_sample3.columns]).show()

In [26]:
feature_cols = list(set(sdf_sample3.columns)-set(['hotel_cluster']))
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
transformed_train_data = assembler.transform(sdf_sample3)

In [27]:
transformed_train_data.show(5)

+----------+---------------+-----------------+-------------------+------------+-------------+--------------------+
|is_package|srch_adults_cnt|srch_children_cnt|srch_destination_id|hotel_market|hotel_cluster|            features|
+----------+---------------+-----------------+-------------------+------------+-------------+--------------------+
|         2|              3|               66|                348|       48862|           12|[348.0,3.0,66.0,2...|
|         2|              3|               66|                348|       48862|           12|[348.0,3.0,66.0,2...|
|         2|              3|               66|                442|       35390|           93|[442.0,3.0,66.0,2...|
|         2|              3|               66|                189|       10067|          501|[189.0,3.0,66.0,2...|
|         2|              3|               66|                189|       10067|          501|[189.0,3.0,66.0,2...|
+----------+---------------+-----------------+-------------------+------------+-

In [28]:
(train, test) = transformed_train_data.randomSplit([0.8, 0.2])

In [29]:
rf = RandomForestClassifier(labelCol='hotel_cluster', featuresCol='features')
rf.fit(train)



23/01/19 08:46:42 ERROR Instrumentation: java.lang.IllegalArgumentException: requirement failed: Classifier inferred 1198786 from label values in column RandomForestClassifier_9a220eeb559c__labelCol, but this exceeded the max numClasses (100) allowed to be inferred from values.  To avoid this error for labels with > 100 classes, specify numClasses explicitly in the metadata; this can be done by applying StringIndexer to the label column.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.Classifier.getNumClasses(Classifier.scala:157)
	at org.apache.spark.ml.classification.RandomForestClassifier.$anonfun$train$1(RandomForestClassifier.scala:143)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.

IllegalArgumentException: requirement failed: Classifier inferred 1198786 from label values in column RandomForestClassifier_9a220eeb559c__labelCol, but this exceeded the max numClasses (100) allowed to be inferred from values.  To avoid this error for labels with > 100 classes, specify numClasses explicitly in the metadata; this can be done by applying StringIndexer to the label column.

23/01/19 11:02:09 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 368429 ms exceeds timeout 120000 ms
23/01/19 11:02:09 WARN SparkContext: Killing executors is not supported by current scheduler.
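
The identical null counts for `user_location_region` and `hotel_country`, together with a label value as large as 1198786, suggest that the eight-column schema may be getting applied to the first eight CSV columns by position rather than by name, with the header row read as data. This is an inference from the outputs above, not something the notebook confirms. The sketch below uses only the standard library and a made-up four-column CSV (its column names and values are illustrative assumptions) to show how reading by position and reading by header name can disagree.

```python
import csv
import io

# Toy CSV with a header row; the columns and values are illustrative assumptions.
RAW = """date_time,site_name,is_package,hotel_cluster
2014-08-11 07:46:59,2,0,1
2014-08-11 08:22:12,2,1,91
"""

WANTED = ["is_package", "hotel_cluster"]


def read_by_position(text, names):
    """Label the first len(names) fields of every line with `names`, header line included."""
    rows = csv.reader(io.StringIO(text))
    return [dict(zip(names, row)) for row in rows]


def read_by_name(text, names):
    """Use the header row to pick out the requested columns by name."""
    rows = csv.DictReader(io.StringIO(text))
    return [{name: row[name] for name in names} for row in rows]


by_position = read_by_position(RAW, WANTED)
by_name = read_by_name(RAW, WANTED)

print(by_position[1])  # 'is_package' holds the date_time string, 'hotel_cluster' holds site_name
print(by_name[0])      # values come from the columns whose header names match
```

In Spark, the analogous approach would be to read the file with `spark.read.csv(DATA_FILE, header=True, inferSchema=True)` and then `select` the needed columns by name. The error message's own suggestion, applying `StringIndexer` to the label column, addresses the class-count limit but would not correct columns that are mislabeled.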
