f/iterator-fn fails #102

Closed
ogis-hashi opened this issue Nov 18, 2016 · 4 comments

Comments

@ogis-hashi
Contributor

The following code uses f/iterator-fn; it fails and the REPL dies.

(ns flambo-example.core
  (:require [flambo.api :as f]
            [flambo.tuple :as ft]
            [flambo.conf :as conf]
            [clojure.string :as s]))

(def c (-> (conf/spark-conf)
           (conf/master "local")
           (conf/app-name "flame_princess")))
(def sc (f/spark-context c))
(-> (f/text-file sc "data/advs.txt")
    (f/flat-map (f/iterator-fn [l] (s/split l #" ")))
    (f/map-to-pair (f/fn [w] (ft/tuple w 1)))
    (f/reduce-by-key (f/fn [x y] (+ x y)))
    f/sort-by-key
    f/collect
    clojure.pprint/pprint)
16/11/18 14:13:11 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.1 KB, free 912.2 MB)
16/11/18 14:13:11 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.3 KB, free 912.2 MB)
16/11/18 14:13:11 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 127.0.0.1:45348 (size: 14.3 KB, free: 912.3 MB)
16/11/18 14:13:11 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
16/11/18 14:13:12 INFO FileInputFormat: Total input paths to process : 1
16/11/18 14:13:12 INFO SparkContext: Starting job: collect at NativeMethodAccessorImpl.java:-2
16/11/18 14:13:12 INFO DAGScheduler: Registering RDD 3 (mapToPair at NativeMethodAccessorImpl.java:-2)
16/11/18 14:13:12 INFO DAGScheduler: Registering RDD 5 (mapToPair at NativeMethodAccessorImpl.java:-2)
16/11/18 14:13:12 INFO DAGScheduler: Got job 0 (collect at NativeMethodAccessorImpl.java:-2) with 1 output partitions
16/11/18 14:13:12 INFO DAGScheduler: Final stage: ResultStage 2 (collect at NativeMethodAccessorImpl.java:-2)
16/11/18 14:13:12 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
16/11/18 14:13:12 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 1)
16/11/18 14:13:12 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at NativeMethodAccessorImpl.java:-2), which has no missing parents
16/11/18 14:13:12 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.7 KB, free 912.2 MB)
16/11/18 14:13:12 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.4 KB, free 912.2 MB)
16/11/18 14:13:12 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 127.0.0.1:45348 (size: 3.4 KB, free: 912.3 MB)
16/11/18 14:13:12 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
16/11/18 14:13:12 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at NativeMethodAccessorImpl.java:-2)
16/11/18 14:13:12 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/11/18 14:13:12 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5423 bytes)
16/11/18 14:13:12 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/11/18 14:13:12 INFO HadoopRDD: Input split: file:/home/makoto/clojure/flambo-example/data/advs.txt:0+610921
16/11/18 14:13:12 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/11/18 14:13:12 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/11/18 14:13:12 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/11/18 14:13:12 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/11/18 14:13:12 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/11/18 14:13:12 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.AbstractMethodError: flambo.function.FlatMapFunction.call(Ljava/lang/Object;)Ljava/util/Iterator;
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:124)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:124)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/11/18 14:13:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.AbstractMethodError: flambo.function.FlatMapFunction.call(Ljava/lang/Object;)Ljava/util/Iterator;
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:124)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:124)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/11/18 14:13:12 INFO SparkContext: Invoking stop() from shutdown hook
16/11/18 14:13:12 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.AbstractMethodError: flambo.function.FlatMapFunction.call(Ljava/lang/Object;)Ljava/util/Iterator;
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:124)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:124)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

16/11/18 14:13:12 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/11/18 14:13:12 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/11/18 14:13:12 INFO TaskSchedulerImpl: Cancelling stage 0
16/11/18 14:13:12 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at NativeMethodAccessorImpl.java:-2) failed in 0.220 s
16/11/18 14:13:12 INFO DAGScheduler: Job 0 failed: collect at NativeMethodAccessorImpl.java:-2, took 0.360568 s

AbstractMethodError flambo.function.FlatMapFunction.call(Ljava/lang/Object;)Ljava/util/Iterator;  org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply (JavaRDDLike.scala:124)
16/11/18 14:13:12 INFO SparkUI: Stopped Spark web UI at http://127.0.0.1:4040
flambo-example.core=> 16/11/18 14:13:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/11/18 14:13:12 INFO MemoryStore: MemoryStore cleared
16/11/18 14:13:12 INFO BlockManager: BlockManager stopped
16/11/18 14:13:12 INFO BlockManagerMaster: BlockManagerMaster stopped
16/11/18 14:13:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/11/18 14:13:12 INFO SparkContext: Successfully stopped SparkContext
16/11/18 14:13:12 INFO ShutdownHookManager: Shutdown hook called
16/11/18 14:13:12 INFO ShutdownHookManager: Deleting directory /tmp/spark-5306472d-3025-498a-ac8e-a9ecc40c91c8
Exception in thread "Thread-3" clojure.lang.ExceptionInfo: Subprocess failed {:exit-code 50}
	at clojure.core$ex_info.invokeStatic(core.clj:4617)
	at clojure.core$ex_info.invoke(core.clj:4617)
	at leiningen.core.eval$fn__5732.invokeStatic(eval.clj:264)
	at leiningen.core.eval$fn__5732.invoke(eval.clj:260)
	at clojure.lang.MultiFn.invoke(MultiFn.java:233)
	at leiningen.core.eval$eval_in_project.invokeStatic(eval.clj:366)
	at leiningen.core.eval$eval_in_project.invoke(eval.clj:356)
	at leiningen.repl$server$fn__11838.invoke(repl.clj:243)
	at clojure.lang.AFn.applyToHelper(AFn.java:152)
	at clojure.lang.AFn.applyTo(AFn.java:144)
	at clojure.core$apply.invokeStatic(core.clj:646)
	at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1881)
	at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1881)
	at clojure.lang.RestFn.invoke(RestFn.java:425)
	at clojure.lang.AFn.applyToHelper(AFn.java:156)
	at clojure.lang.RestFn.applyTo(RestFn.java:132)
	at clojure.core$apply.invokeStatic(core.clj:650)
	at clojure.core$bound_fn_STAR_$fn__4671.doInvoke(core.clj:1911)
	at clojure.lang.RestFn.invoke(RestFn.java:397)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.lang.Thread.run(Thread.java:745)
@sorenmacbeth
Owner

sorenmacbeth commented Nov 18, 2016

What versions of flambo and Spark are you running this under? iterator-fn only works with flambo 0.8.0 and Spark 2.x. It looks to me like you are running Spark 1.x, based on the error.
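
For reference, a minimal sketch of why that error points at a version mismatch, assuming the documented Spark 2.0 change to the Java API (split-words below is an illustrative name, not part of flambo): in Spark 1.x, org.apache.spark.api.java.function.FlatMapFunction.call returns java.lang.Iterable, while in Spark 2.x it returns java.util.Iterator, so a flambo.function.FlatMapFunction class compiled against a 1.x jar has no Iterator-returning call for a 2.x runtime to invoke, hence the AbstractMethodError.

(require '[clojure.string :as str])
(import 'org.apache.spark.api.java.function.FlatMapFunction)

;; What a Spark-2.x-compatible flat-map function has to look like: call returns an
;; Iterator. Against Spark 1.x the same interface method would have to return an
;; Iterable instead, which is why classes compiled against 1.x break on 2.x.
(def split-words
  (reify FlatMapFunction
    (call [_ line]
      (.iterator ^java.lang.Iterable (str/split line #" ")))))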

@ogis-hashi
Contributor Author

I'm running my code on flambo 0.8.0 and Spark 2.0.1/2.0.2.

"It looks to me like you are running Spark 1.x, based on the error"
Oh, really?
My project.clj is as follows:

(defproject spark-example "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  ;;:main spark-example.word-count
  :jvm-opts ["-Xmx2G"]
  :auto-clean false
  :profiles {:dev {:aot [flambo.function flambo.kryo]}
             :provided {:dependencies [[org.apache.spark/spark-core_2.11 "2.0.1"]]}
             :uberjar {:aot :all}}
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [yieldbot/flambo "0.8.0"]])

@sorenmacbeth
Owner

Try doing a lein clean to clear out your compiled classes and then relaunching your REPL.
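
If it helps, a quick way to confirm what ended up on the classpath after the clean (a sketch, assuming the sc from the original snippet has been re-evaluated in the fresh REPL):

;; JavaSparkContext reports the running Spark version; after lein clean and a
;; restart this should return "2.0.1" (or "2.0.2") rather than a 1.x version.
(.version sc)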

@ogis-hashi
Contributor Author

@sorenmacbeth It worked after doing lein clean.
Thank you for your advice!
