Provided Spark example expected output? #4

Closed
mboyanna opened this issue Jun 24, 2015 · 2 comments
@mboyanna

The provided Spark example from the README for reading xz files returns output where:

  • the number of lines doesn't match the uncompressed input
  • the output comes out almost like a byte stream:
    ....
    (4377758,�����b�H�n��8NĂ��6�z.RS��6�q>����@�⧚2u�oX�+�׃�,�=E�(�X�1͜���v郕����ch�U{0PT�Hz�1`uX荲�͉�2q�N�l{�c6��Z�\�� M��&��]s^���P��$��+u|��=���Xh�<|�*)
    (4377930,��KJ�0�Q0d������ִ��RVY(�o�����V�<I�8��M�6��cԖ�>,k)
    ...
Is this the expected format? Why are there fewer lines in the output than in the uncompressed input?

Here's the Spark/Scala code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

def readXzfile() {

  val conf = new SparkConf(true)
    .setAppName("XzUncompressExample")
    .set("spark.shuffle.manager", "SORT")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.akka.frameSize", "50")
    .set("spark.storage.memoryFraction", "0.8")
    .set("spark.cassandra.output.batch.size.rows", "6000")
    .set("spark.executor.extraJavaOptions", "-XX:MaxJavaStackTraceDepth=-1")
    .set("io.compression.codecs", "io.sensesecure.hadoop.xz.XZCodec")

  val sc = new SparkContext(conf)

  val hadoopConfiguration = new Configuration()

  //val file = sc.textFile(fileName.getFileName)

  //val rddOfXz = sc.newAPIHadoopFile("file:///Users/bparman/Perforce/testOldAnalyticsCommons10/gn-perseng/eg-analytics/analytics-commons/src/test/resources/*.xz", classOf[org.apache.hadoop.mapred.TextInputFormat], classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text], conf)

  val rddOfXz = sc.newAPIHadoopFile("/user/ubuntu/raw/*.xz", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConfiguration)

  rddOfXz.foreach(println)

  println("Total number of lines is " + rddOfXz.count())

  rddOfXz.saveAsTextFile("/user/ubuntu/uncompressed")

}

Here's my build file:

name := "detailed-commons"

organization := "com.mycompany.commons"

version := "1.0.2"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"

libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.2.0" % "provided"

libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.2.0" % "provided"

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.1"

libraryDependencies ++= Seq(
("io.sensesecure" % "hadoop-xz" % "1.4").
exclude("commons-beanutils", "commons-beanutils-core").
exclude("commons-collections", "commons-collections")
)

publishTo := Some(Resolver.file("detailed-commons-assembly-1.0.2.jar", new File(Path.userHome.absolutePath + "/.ivy2/cache")))

@yongtang
Owner

yongtang commented Jul 6, 2015

You may need to set the Hadoop configuration for compression, not the Spark configuration:

val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("io.compression.codecs","io.sensesecure.hadoop.xz.XZCodec")

I tested on Spark 1.3.1 + Hadoop 2.6.0. If you use Spark 1.2.0, which is typically bundled with Hadoop 2.4, you may also run into other issues, since hadoop-xz depends on Hadoop 2.6. ---yongtang
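
For reference, here's a minimal sketch of the corrected read, putting that suggestion together with the code from the original report. The only substantive change is that io.compression.codecs is set on the Hadoop Configuration passed to newAPIHadoopFile rather than on the SparkConf; it assumes the hadoop-xz jar is on the executor classpath and reuses the paths and app name from above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf(true).setAppName("XzUncompressExample"))

// Register the XZ codec with Hadoop, not Spark: newAPIHadoopFile consults
// this Configuration when deciding how to decompress the input splits.
val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("io.compression.codecs", "io.sensesecure.hadoop.xz.XZCodec")

val rddOfXz = sc.newAPIHadoopFile(
  "/user/ubuntu/raw/*.xz",          // same input glob as in the report
  classOf[TextInputFormat],         // org.apache.hadoop.mapreduce.lib.input
  classOf[LongWritable],
  classOf[Text],
  hadoopConfiguration)

// With the codec registered, each record is a decompressed text line.
println("Total number of lines is " + rddOfXz.count())

Without the codec registered, Hadoop treats the .xz files as plain text and splits the raw compressed bytes on newline boundaries, which explains both the garbled "byte stream" records and the smaller line count in the report.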

@yongtang
Owner

yongtang commented Jul 7, 2015

Closed for now. Please re-open if you encounter any other issues.

yongtang closed this as completed Jul 7, 2015