
Spark 2.0 and 2.1 support; Fix python build

Remove unneeded dependencies
icexelloss committed Feb 3, 2017
1 parent 0902f6b commit e1aa7a73695afad591ae20308f5a1849b7f49ab0
@@ -0,0 +1,23 @@
SHELL = /bin/bash
VERSION = $(shell cat version.txt)

.PHONY: clean clean-pyc clean-dist dist

clean: clean-dist clean-pyc

clean-pyc:
find . -name '*.pyc' -exec rm -f {} +
find . -name '*.pyo' -exec rm -f {} +
find . -name '*~' -exec rm -f {} +
find . -name '__pycache__' -exec rm -fr {} +

clean-dist:
rm -rf target
rm -rf python/build/
rm -rf python/*.egg-info

dist: clean-pyc
sbt "set test in assembly := {}" clean assembly
cd python; \
find . -mindepth 2 -name '*.py' -print | \
zip ../target/scala-2.11/flint-assembly-$(VERSION)-SNAPSHOT.jar -@
@@ -50,7 +50,7 @@ lazy val formattingPreferences = {
}

lazy val compilationSettings = scalariformSettings ++ Seq(
version := "0.3.1-SNAPSHOT",
version := "0.2.0-SNAPSHOT",
organization := "com.twosigma",
scalaVersion := "2.11.8",
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false),
@@ -79,16 +79,13 @@ lazy val compilationSettings = scalariformSettings ++ Seq(
)

lazy val versions = new {
val avro = "1.7.7"
val play_json = "2.3.10"
val spark_csv = "1.4.0"
val commons_math = "3.5"
val commons_csv = "1.2"
val joda_time = "2.9.4"
val httpclient = "4.3.2" // Note that newer versions need to be configured differently
val spark = "1.6.3"
val spark = sys.props.getOrElse("spark.version", default = "2.1.0")
val scalatest = "2.2.4"
val scala_logging = "2.1.2"
val grizzled_slf4j = "1.3.0"
}

lazy val lazyDependencies = new {
@@ -98,14 +95,11 @@ lazy val lazyDependencies = new {
}

lazy val dependencySettings = libraryDependencies ++= Seq(
"com.databricks" %% "spark-csv" % versions.spark_csv,
"com.typesafe.scala-logging" %% "scala-logging-slf4j" % versions.scala_logging,
"com.typesafe.play" %% "play-json" % versions.play_json,
"org.apache.avro" % "avro" % versions.avro,
"org.apache.commons" % "commons-math3" % versions.commons_math,
"org.apache.commons" % "commons-csv" % versions.commons_csv,
"joda-time" % "joda-time" % versions.joda_time,
"org.apache.httpcomponents" % "httpclient" % versions.httpclient,
"org.clapper" %% "grizzled-slf4j" % versions.grizzled_slf4j,
lazyDependencies.sparkCore,
lazyDependencies.sparkML,
lazyDependencies.sparkSQL,
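
With `versions.spark` now read from the `spark.version` system property (defaulting to `2.1.0`), a build against another supported Spark release can presumably be selected by overriding that property on the sbt command line, for example `sbt -Dspark.version=2.0.2 assembly` (the exact version string here is only illustrative).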
@@ -7,31 +7,22 @@ for PySpark.
Building
----------------------

To build flint jar:
You can build flint by running:

sbt assembly

To build flint python egg:

python setup.py bdist_egg


Building Documentation
----------------------

Docs live in `docs/` and can be built with `setup.py`:

python setup.py build_sphinx
(cd build/sphinx/html; python -m http.server 8080)
make dist

This will create a jar under target/scala-2.11/flint-assembly-0.2.0-SNAPSHOT.jar

Running with PySpark
--------------------

You can use ts-flint with PySpark by:

spark-submit --jars target/scala-2.11/flint-assembly-0.3.1-SNAPSHOT.jar --py-files python/dist/ts_flint.egg ...
pyspark --jars /path/to/flint-assembly-0.2.0-SNAPSHOT.jar --py-files /path/to/flint-assembly-0.2.0-SNAPSHOT.jar

or

spark-submit --jars /path/to/flint-assembly-0.2.0-SNAPSHOT.jar --py-files /path/to/flint-assembly-0.2.0-SNAPSHOT.jar myapp.py
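
The same assembly jar is passed to both `--jars` and `--py-files` because the `make dist` target above zips the Python sources into it. As a rough sketch of what a session might look like once it is launched this way, assuming the `ts.flint` package exposes a `FlintContext` wrapper (an assumption; this commit only touches the package internals):

    # Launched with, for example:
    #   pyspark --jars /path/to/flint-assembly-0.2.0-SNAPSHOT.jar \
    #           --py-files /path/to/flint-assembly-0.2.0-SNAPSHOT.jar
    import ts.flint                      # importable because the jar is on --py-files
    from ts.flint import FlintContext    # assumed entry point, not shown in this commit

    flintContext = FlintContext(sqlContext)  # sqlContext is provided by the pyspark shell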

Bugs
----
@@ -0,0 +1,15 @@
#
# Copyright 2015-2017 TWO SIGMA OPEN SOURCE, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
@@ -24,6 +24,6 @@
__version__ = get_versions()['version']
del get_versions

__author__ = 'Leif Walsh'
__maintainer__ = 'Leif Walsh'
__email__ = 'leif@twosigma.com'
__author__ = 'Li Jin, Leif Walsh'
__maintainer__ = 'Li Jin, Leif Walsh'
__email__ = 'ljin@twosigma.com, leif@twosigma.com'
@@ -275,27 +275,6 @@ def _timedelta_ns(self, varname, timedelta, *, default=None):
raise Exception("{} should be a pandas.Timedelta object or string formattable pandas.Timedelta".format(varname))
return '{}ns'.format(int(timedelta.total_seconds()*1e9))
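
The `_timedelta_ns` conversion above can be exercised standalone; a small sketch follows (the helper name below is illustrative, not part of the library):

    import pandas as pd

    def to_ns_string(td):
        # Accept either a pandas.Timedelta or a string pandas can parse, and
        # render it as an integer number of nanoseconds with an 'ns' suffix,
        # mirroring _timedelta_ns above.
        if isinstance(td, str):
            td = pd.Timedelta(td)
        return '{}ns'.format(int(td.total_seconds() * 1e9))

    assert to_ns_string(pd.Timedelta('1s')) == '1000000000ns'
    assert to_ns_string('5min') == '300000000000ns'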

@staticmethod
def _from_alf(sql_ctx, tsuri, begin, end, *, num_partitions=None, requests_per_partition=None, timeout=None):
sc = sql_ctx._sc
jpkg = java.Packages(sc)
if not num_partitions:
num_partitions = sc.defaultParallelism
if not requests_per_partition:
requests_per_partition = jpkg.alf.defaultAlfRequestsPerPartition()
if not timeout:
timeout = jpkg.WaiterClient.DEFAULT_TIMEOUT()
# This should not be needed but unfortunately alf.timeSeriesRDD requires a timeColumn
timeColumn = "time"
# Convert timeout to millis
if isinstance(timeout, pd.Timedelta):
timeout = int(timeout.total_seconds() * 1000)
tsrdd = jpkg.alf.timeSeriesRDD(utils.jsc(sc), tsuri,
begin, end, timeColumn,
num_partitions, requests_per_partition,
jpkg.alf.defaultWaiterConfig(timeout))
return TimeSeriesDataFrame._from_tsrdd(tsrdd, sql_ctx)

@staticmethod
def _from_tsrdd(tsrdd, sql_ctx):
"""Returns a :class:`TimeSeriesDataFrame` from a Scala ``TimeSeriesRDD``
@@ -17,19 +17,23 @@
package com.twosigma.flint.hadoop

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{ Logging, SparkContext }
import org.apache.spark.SparkContext

import com.twosigma.flint.rdd.Range
import grizzled.slf4j.Logger

object Hadoop {

val logger = Logger(Hadoop.getClass)

object Hadoop extends Logging {
def fileSplits[K1, V1, K: Ordering](
sc: SparkContext,
file: String,
ifConf: InputFormatConf[K1, V1] // TODO consider just straight up making this (K, K) as we CAN get it, it's just a pain.
)(parseKey: (ifConf.KExtract#Extracted, ifConf.VExtract#Extracted) => K): Map[Int, (Range[K], WriSer[ifConf.Split])] = {
val splits = ifConf.makeSplits(new Configuration())
logInfo(s"Total number of splits: ${splits.size}")
splits.foreach { s => logDebug(s.get.toString) }
logger.info(s"Total number of splits: ${splits.size}")
splits.foreach { s => logger.debug(s.get.toString) }
// TODO implement the version which does the more rigorous thing, at least for splits that
// support it
val m = getSplitTimes(sc, ifConf)(parseKey, splits)
@@ -64,7 +68,7 @@ object Hadoop extends Logging {
val recordReader = inputFormat.createRecordReader(split, tac)
recordReader.initialize(split, tac)

logInfo(s"Beginning to read lines from split: $split")
logger.info(s"Beginning to read lines from split: $split")
new Iterator[(ifConf.KExtract#Extracted, ifConf.VExtract#Extracted)] {
var stillMore = false
lazy val init = stillMore = recordReader.nextKeyValue()
@@ -16,10 +16,14 @@

package com.twosigma.flint.rdd

import grizzled.slf4j.Logger

import org.apache.spark.rdd.RDD
import org.apache.spark.{ Logging, Partition, TaskContext }
import org.apache.spark.{ Partition, TaskContext }

protected[flint] object PartitionsIterator {
val logger = Logger(PartitionsIterator.getClass)

def apply[T](
rdd: RDD[T],
partitions: Seq[Partition],
@@ -43,7 +47,9 @@ protected[flint] class PartitionsIterator[T](
partitions: Seq[Partition],
context: TaskContext,
preservesPartitionsOrdering: Boolean = false // FIXME: This is a band-aid which should be fixed.
) extends BufferedIterator[T] with Logging {
) extends BufferedIterator[T] {

val logger = PartitionsIterator.logger

var _partitions = partitions
if (!preservesPartitionsOrdering) {
@@ -57,7 +63,7 @@ protected[flint] class PartitionsIterator[T](
private[this] def nextIter() {
if (curIdx < _partitions.size) {
val part = _partitions(curIdx)
logInfo(s"Opening iterator for partition: ${part.index}")
logger.debug(s"Opening iterator for partition: ${part.index}")
curIter = rdd.iterator(part, context).buffered
curPart = part
curIdx += 1
@@ -69,7 +75,7 @@ protected[flint] class PartitionsIterator[T](
}

lazy val init = {
logInfo(s"Beginning to read partitions: ${_partitions}")
logger.debug(s"Beginning to read partitions: ${_partitions}")
nextIter()
}

@@ -18,11 +18,14 @@ package com.twosigma.flint.rdd.function.summarize

import com.twosigma.flint.rdd.OverlappedOrderedRDD
import com.twosigma.flint.rdd.function.summarize.summarizer.subtractable.LeftSubtractableSummarizer
import org.apache.spark.{ OneToOneDependency, Logging }
import com.twosigma.flint.rdd.{ OrderedRDD, Range, CloseClose }

import org.apache.spark.OneToOneDependency
import com.twosigma.flint.rdd.{ CloseClose, OrderedRDD, Range }
import scala.reflect.ClassTag
import scala.collection.mutable

import com.twosigma.flint.rdd.function.summarize.summarizer.Summarizer
import grizzled.slf4j.Logger

object SummarizeWindows {

@@ -74,7 +77,9 @@ private[rdd] class WindowSummarizerIterator[K, SK, V, U, V2](
windowFn: K => (K, K),
summarizer: Summarizer[V, U, V2],
skFn: V => SK
)(implicit ord: Ordering[K]) extends Iterator[(K, (V, V2))] with Logging {
)(implicit ord: Ordering[K]) extends Iterator[(K, (V, V2))] {

val logger = Logger(this.getClass)

val windows = mutable.Map[SK, Vector[(K, V)]]()
val summarizerStates = mutable.Map[SK, U]()
@@ -87,14 +92,14 @@ private[rdd] class WindowSummarizerIterator[K, SK, V, U, V2](

lazy val rampUp = {
val initWindowRange = getWindowRange(coreRange.begin)
logDebug(s"Initial window range in rampUp: $initWindowRange")
logger.debug(s"Initial window range in rampUp: $initWindowRange")
if (iter.hasNext) {
logDebug(s"rampUp: head: ${iter.head}")
logger.debug(s"rampUp: head: ${iter.head}")
}
while (iter.hasNext && ord.lt(iter.head._1, coreRange.begin)) {
val (k, v) = iter.next
val sk = skFn(v)
logDebug(s"rampUp: reading: ($k, $sk, $v)")
logger.debug(s"rampUp: reading: ($k, $sk, $v)")

if (initWindowRange.contains(k)) {
val window = windows.getOrElseUpdate(sk, Vector.empty)
@@ -113,7 +118,7 @@ private[rdd] class WindowSummarizerIterator[K, SK, V, U, V2](
val (coreK, coreV) = coreRowBuffer.headOption.getOrElse(iter.head)
val coreSk = skFn(coreV)
val windowRange = getWindowRange(coreK)
logDebug(s"Invoking next() with core row: ($coreK, $coreSk, $coreV) and the window of $coreK: $windowRange")
logger.debug(s"Invoking next() with core row: ($coreK, $coreSk, $coreV) and the window of $coreK: $windowRange")

// Drop rows.
val window = windows.getOrElse(coreSk, Vector.empty)
@@ -149,7 +154,7 @@ private[rdd] class WindowSummarizerIterator[K, SK, V, U, V2](
}

val dequed = coreRowBuffer.dequeue
logDebug(s"Invoking next() deque: $dequed")
logger.debug(s"Invoking next() deque: $dequed")

(coreK,
(
@@ -80,6 +80,7 @@ object CSV {
* @param schema The schema for the CSV file. If the schema is given, use it otherwise infer the
* schema from the data itself.
* @param dateFormat The pattern string to parse the date time string under the time column.
* Defaults to "yyyy-MM-dd'T'HH:mm:ss.SSSZZ". For example, "2016-01-01T12:00:00+00:00"
* @param keepOriginTimeColumn The schema of the returned [[TimeSeriesRDD]] will always have a column named "time"
* with LongType. The original time column will not be kept by default.
* @param codec compression codec to use when reading from file. Should be the fully qualified
@@ -111,6 +112,8 @@ object CSV {
codec: String = null
): TimeSeriesRDD = {
// scalastyle:on parameter.number
// TODO: In Spark 2, we should use the following CSV reader instead of the databricks one
// "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat"
val reader = sqlContext.read.format("com.databricks.spark.csv")
.option("header", header.toString)
.option("delimiter", delimiter.toString)
@@ -122,7 +125,11 @@ object CSV {
.option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace.toString)
.option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace.toString)
.option("charset", charset)
.option("dateFormat", dateFormat)

if (dateFormat != null) {
reader.option("dateFormat", dateFormat)
reader.option("timestampFormat", dateFormat)
}

// If the schema is given, use it otherwise infer the schema from the data itself.
if (schema == null) {
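
Roughly the same conditional, expressed against the built-in Spark 2 CSV source that the TODO above mentions, in PySpark terms; everything here (the session, the format string, the path) is illustrative rather than taken from this commit:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master('local[1]').appName('csv-format-demo').getOrCreate()

    date_format = "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"  # same pattern as the documented default above

    reader = spark.read.format('csv').option('header', 'true')
    if date_format is not None:
        # Spark 2 parses timestamp columns via 'timestampFormat', while the older
        # databricks reader used 'dateFormat'; setting both keeps the behaviour aligned.
        reader = reader.option('dateFormat', date_format) \
                       .option('timestampFormat', date_format)

    # df = reader.load('/path/to/data.csv')  # hypothetical path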
@@ -29,7 +29,6 @@ object DFConverter {
def toDataFrame(sqlContext: SQLContext, schema: StructType, rdd: OrderedRDD[Long, InternalRow]): DataFrame = {
val internalRows = rdd.values

val logicalPlan = LogicalRDD(schema.toAttributes, internalRows)(sqlContext)
DataFrame(sqlContext, logicalPlan)
sqlContext.internalCreateDataFrame(internalRows, schema)
}
}
@@ -1,12 +1,12 @@
time
20080102 00:00:00.000
20080102 02:00:00.000
20080102 04:00:00.000
20080102 06:00:00.000
20080102 08:00:00.000
20080102 10:00:00.000
20080102 12:00:00.000
20080102 14:00:00.000
20080102 16:00:00.000
20080102 18:00:00.000
20080102 20:00:00.000
2008-01-02T00:00:00.000+00:00
2008-01-02T02:00:00.000+00:00
2008-01-02T04:00:00.000+00:00
2008-01-02T06:00:00.000+00:00
2008-01-02T08:00:00.000+00:00
2008-01-02T10:00:00.000+00:00
2008-01-02T12:00:00.000+00:00
2008-01-02T14:00:00.000+00:00
2008-01-02T16:00:00.000+00:00
2008-01-02T18:00:00.000+00:00
2008-01-02T20:00:00.000+00:00
