
Spark cluster #21

Open · szilard opened this issue May 10, 2019 · 13 comments
szilard commented May 10, 2019

previous results (single server):

| trees | depth | time [s] (100M) | AUC (100M) | RAM [GB] (100M) | time [s] (10M) | AUC (10M) | RAM [GB] (10M) |
|---|---|---|---|---|---|---|---|
| 1 | 1 | 1150 | 0.634 | 620 | 70 | 0.635 | 110 |
| 1 | 10 | 1350 | 0.712 | 620 | 90 | 0.712 | 112 |
| 10 | 10 | 7850 | 0.731 | 780 | 830 | 0.731 | 125 |
| 100 | 10 | crash (OOM) | | >960 (OOM) | 8070 | 0.755 | 230 |

10M ran on:
r4.8xlarge (32 cores, 1 NUMA, 240GB RAM)

100M ran on:
x1e.8xlarge (32 cores, 1 NUMA, 960GB RAM)

szilard commented May 11, 2019

first try, 1 master + 1 slave:

[screenshots]

wget https://s3.amazonaws.com/benchm-ml--main/train-10m.csv && \
wget https://s3.amazonaws.com/benchm-ml--main/test.csv

hadoop fs -copyFromLocal train-10m.csv /train-10m.csv
hadoop fs -copyFromLocal test.csv /test.csv

spark-shell
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.DoubleType

val loader = spark.read.format("com.databricks.spark.csv").option("header", "true")
val trainDF = loader.load("/train-10m.csv")
val testDF = loader.load("/test.csv")

val fullDF0 = trainDF.withColumn("isTrain", lit(true)).unionAll(testDF.withColumn("isTrain", lit(false)))

val fullDF = fullDF0.withColumn("DepTime", col("DepTime").cast(DoubleType)).withColumn("Distance", col("Distance").cast(DoubleType))

fullDF.printSchema
fullDF.show(5)


val res = new RFormula().setFormula("dep_delayed_15min ~ . - isTrain").fit(fullDF).transform(fullDF)

res.printSchema
res.show(5)

val finalTrainDF = res.where(col("isTrain"))
val finalTestDF = res.where(!col("isTrain"))

finalTrainDF.write.mode("overwrite").parquet("/spark_ohe-train-10m.parquet")
finalTestDF.write.mode("overwrite").parquet("/spark_ohe-test.parquet")

//// NEW:

// 10x replication of the 10M training set -> 100M rows
val finalTrainDF_100m = (1 to 9).foldLeft(finalTrainDF)((df, _) => df.union(finalTrainDF))
finalTrainDF_100m.count()

finalTrainDF_100m.write.mode("overwrite").parquet("/spark_ohe-train-100m.parquet")

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val d_train = spark.read.parquet("/spark_ohe-train-10m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train.count(), d_test.count())

///

d_train.rdd.getNumPartitions

/////

val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
  setMaxIter(10).setMaxDepth(10).setStepSize(0.1).
  setMaxBins(100).setMaxMemoryInMB(10240)     // max possible setMaxMemoryInMB (otherwise errors out)
val pipeline = new Pipeline().setStages(Array(gbm))

val now = System.nanoTime
val model = pipeline.fit(d_train)
val elapsed = ( System.nanoTime - now )/1e9
elapsed

val predictions = model.transform(d_test)

val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("probability").setMetricName("areaUnderROC")
evaluator.evaluate(predictions)

szilard commented May 11, 2019

64 partitions:

run 1: 1180 s, AUC = 0.7313, RAM 73GB (1 slave)
run 2: 1450 s, AUC = 0.731, RAM 73GB


import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

/// NEW: 

val d_train0 = spark.read.parquet("/spark_ohe-train-10m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train0.count(), d_test.count())

d_train0.rdd.getNumPartitions
val d_train = d_train0.repartition(32).cache()
d_train.count()
d_train.rdd.getNumPartitions
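
the same GBT training/evaluation block as in the previous comment is then re-run on the repartitioned d_train (repeated here for completeness):

// same model settings as before: 10 trees of depth 10
val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
  setMaxIter(10).setMaxDepth(10).setStepSize(0.1).
  setMaxBins(100).setMaxMemoryInMB(10240)
val pipeline = new Pipeline().setStages(Array(gbm))

// wall-clock training time in seconds
val now = System.nanoTime
val model = pipeline.fit(d_train)
val elapsed = ( System.nanoTime - now )/1e9

// AUC on the test set
val predictions = model.transform(d_test)
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("probability").setMetricName("areaUnderROC")
evaluator.evaluate(predictions)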

szilard commented May 11, 2019

32 partitions:

runs in 1560 s, AUC = 0.7298, RAM 72GB


local again:

runs in 830 s, AUC = 0.7308, RAM 125GB

local with 64 partitions:

runs in 830 s, AUC = 0.7292, RAM 123GB

szilard commented May 11, 2019

compare cluster with local (64 partitions):

local: 830 s, AUC = 0.7292, RAM 123GB
cluster: 1180-1450 s, AUC = 0.7313, RAM 73GB (1 slave)

the cluster is 1.4-1.7x slower

local:

[screenshots]

cluster:

[screenshots]

szilard commented May 13, 2019

cluster with 10 slave nodes:

[screenshot]

10M records:
after reading parquet, num. partitions (auto) is 117 (vs 320 total cores), so running the larger dataset instead:

100M records:
after reading parquet, num. partitions (auto) is 585 (vs 320 total cores)
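
a quick way to compare the automatic partition count against the available parallelism from spark-shell (a sketch; sc.defaultParallelism reflects the spark.default.parallelism setting, 640 here):

// partition count chosen automatically when reading the parquet file
val d_train0 = spark.read.parquet("/spark_ohe-train-100m.parquet")
d_train0.rdd.getNumPartitions   // 585 for 100M (117 for 10M)

// total parallelism the cluster advertises (10 executors x 32 cores)
sc.defaultParallelism           // 640 via spark.default.parallelism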

szilard commented May 13, 2019

| name | value |
|---|---|
| spark.driver.memory | 219695M |
| spark.executor.instances | 10 |
| spark.default.parallelism | 640 |
| spark.submit.deployMode | client |
| spark.master | yarn |
| spark.executor.cores | 32 |
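
these settings can be verified from spark-shell (a sketch):

// dump the effective SparkConf
sc.getConf.getAll.sorted.foreach(println)

// or query individual keys
sc.getConf.get("spark.executor.instances")    // "10"
sc.getConf.get("spark.default.parallelism")   // "640"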

szilard commented May 13, 2019

scala> d_train.rdd.getNumPartitions
res1: Int = 585

scala> val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
     |   setMaxIter(10).setMaxDepth(10).setStepSize(0.1).
     |   setMaxBins(100).setMaxMemoryInMB(10240)

scala> elapsed
res2: Double = 1825.94445773

scala> evaluator.evaluate(predictions)
res3: Double = 0.731131999798129

szilard commented May 13, 2019

100M, 10 trees

single server - x1e.8xlarge (32 cores, 960GB RAM):

| time [s] | AUC | RAM [GB] |
|---|---|---|
| 7850 | 0.731 | 780 |

cluster with 10 slave nodes - r4.8xlarge (32 cores, 240GB RAM each):

| time [s] | AUC | RAM [GB] |
|---|---|---|
| 1825 | 0.731 | 10*72 |

4.3x time ratio (7850 / 1825 ≈ 4.3), but the fair comparison would be a 10-node cluster vs a 1-node cluster, given the cluster overhead penalty seen above

if the cluster penalty is the same as for 10M above (1.4-1.7x), that implies a 6-7.3x speedup from 1 slave to 10 (4.3 × 1.4 ≈ 6.0, 4.3 × 1.7 ≈ 7.3)

[screenshots]

szilard commented May 13, 2019


import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

/// NEW: 

val d_train0 = spark.read.parquet("/spark_ohe-train-100m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train0.count(), d_test.count())

d_train0.rdd.getNumPartitions
val d_train = d_train0.repartition(320).cache()
d_train.count()
d_train.rdd.getNumPartitions

320 partitions

run stopped after 2 trees as it was getting slower:

[screenshots]

szilard commented May 13, 2019

10M:

scala> d_train.rdd.getNumPartitions
res1: Int = 117

scala> val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
     |   setMaxIter(10).setMaxDepth(10).setStepSize(0.1).
     |   setMaxBins(100).setMaxMemoryInMB(10240)

scala> elapsed
res2: Double = 414.150597334

scala> evaluator.evaluate(predictions)
res3: Double = 0.7292960760201486

match partitions to cores:


import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

/// NEW: 

val d_train0 = spark.read.parquet("/spark_ohe-train-10m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train0.count(), d_test.count())

d_train0.rdd.getNumPartitions
val d_train = d_train0.repartition(320).cache()
d_train.count()
d_train.rdd.getNumPartitions
scala> d_train.rdd.getNumPartitions
res3: Int = 320

scala> val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
     |   setMaxIter(10).setMaxDepth(10).setStepSize(0.1).
     |   setMaxBins(100).setMaxMemoryInMB(10240)

scala> elapsed
res4: Double = 327.705288398

scala> evaluator.evaluate(predictions)
res5: Double = 0.7304255377773654

szilard commented May 13, 2019

10 trees, depth 10

| size | system | instance | cores | partitions | time [s] | AUC | RAM [GB] | total RAM [GB] |
|---|---|---|---|---|---|---|---|---|
| 10M | local | r4.8xl | 32 | 32 | 830 | 0.731 | 125 | 240 |
| 10M | local | r4.8xl | 32 | 64 (m) | 830 | 0.731 | 123 | 240 |
| 10M | Cluster_1 | r4.8xl | 32 | 64 | 1180 | 0.731 | 73 | 240 |
| 10M | Cluster_1 | r4.8xl | 32 | 32 (m) | 1560 | 0.73 | 72 | 240 |
| 10M | Cluster_10 | r4.8xl | 320 | 117 | 415 | 0.73 | | 2400 |
| 10M | Cluster_10 | r4.8xl | 320 | 320 (m) | 330 | 0.73 | | 2400 |
| 100M | local | x1e.8xl | 32 | | 7850 | 0.731 | 780 | 960 |
| 100M | Cluster_10 | r4.8xl | 320 | 585 | 1825 | 0.731 | 10*72 | 2400 |
| 100M | Cluster_10 | r4.8xl | 320 | 320 (m) | >1825 | | | 2400 |

(m) = partitions set manually via repartition

szilard commented May 13, 2019

best partition setting (auto/manual):

| size | system | instance | cores | partitions | time [s] | RAM [GB] | total RAM [GB] |
|---|---|---|---|---|---|---|---|
| 10M | local | r4.8xl | 32 | 32 | 830 | 125 | 240 |
| 10M | Cluster_1 | r4.8xl | 32 | 64 | 1180 | 73 | 240 |
| 10M | Cluster_10 | r4.8xl | 320 | 320 (m) | 330 | | 2400 |
| 100M | local | x1e.8xl | 32 | | 7850 | 780 | 960 |
| 100M | Cluster_10 | r4.8xl | 320 | 585 | 1825 | 10*72 | 2400 |
