# Flint Cookbook with Spark UI

In [1]:
%classpath config resolver jitpack.io https://jitpack.io

Added new repo: jitpack.io


In [2]:
%%classpath add mvn
com.github.twosigma flint master-SNAPSHOT
org.apache.spark spark-sql_2.11 2.2.1
org.apache.spark spark-mllib_2.11 2.2.1

Added jars: [jcl-over-slf4j-1.7.16.jar, jersey-container-servlet-core-2.22.2.jar, spark-launcher_2.11-2.2.1.jar, aopalliance-repackaged-2.4.0-b34.jar, activation-1.1.1.jar, shapeless_2.11-2.3.2.jar, curator-framework-2.6.0.jar, hadoop-mapreduce-client-jobclient-2.6.5.jar, hadoop-yarn-api-2.6.5.jar, xercesImpl-2.9.1.jar, xbean-asm5-shaded-4.4.jar, spark-streaming_2.11-2.2.1.jar, hadoop-yarn-server-common-2.6.5.jar, hadoop-mapreduce-client-common-2.6.5.jar, commons-compress-1.4.1.jar, spire_2.11-0.13.0.jar, java-xmlbuilder-1.0.jar, jetty-util-6.1.26.jar, parquet-format-2.3.1.jar, xmlenc-0.52.jar, parquet-hadoop-1.8.2.jar, validation-api-1.1.0.Final.jar, commons-io-2.4.jar, breeze-macros_2.11-0.13.2.jar, univocity-parsers-2.2.1.jar, pmml-model-1.2.15.jar, play-json_2.11-2.3.10.jar, slf4j-log4j12-1.7.16.jar, core-1.1.2.jar, commons-net-2.2.jar, hadoop-mapreduce-client-core-2.6.5.jar, commons-lang-2.6.jar, avro-ipc-1.7.7.jar, metrics-json-3.1.2.jar, jackson-databind-2.6.5.jar, janino-3.0.0.

In [3]:
// Date bounds in yyyyMMdd form.
// NOTE(review): neither value is referenced by any later cell in this notebook — confirm they are still needed.
val (begin, end) = ("20150101", "20160101")

20160101

In [4]:
%%spark
// BeakerX %%spark cell: the cell's value is a SparkSession builder.
// NOTE(review): later cells use `spark`, presumably bound by this magic from the builder — confirm against BeakerX docs.
import org.apache.spark.sql.SparkSession

// "local[100]" requests an in-process master with 100 worker threads (Spark master-URL convention).
SparkSession.builder().appName("Simple Application").master("local[100]")

In [5]:
// Shortcut to the session's SparkContext (used below by Clocks.uniform).
val sc: org.apache.spark.SparkContext = spark.sparkContext

org.apache.spark.SparkContext@3a4bb56f

In [6]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//import com.twosigma.flint.timeseries.io.read
import com.twosigma.flint.timeseries.Windows
import com.twosigma.flint.timeseries.Summarizers
import scala.concurrent.duration._
//import com.twosigma.flint.timeseries.implicits._
import com.twosigma.flint.timeseries._

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import com.twosigma.flint.timeseries.Windows
import com.twosigma.flint.timeseries.Summarizers
import scala.concurrent.duration._
import com.twosigma.flint.timeseries._


In [7]:
/** Renders the first `num` rows of a TimeSeriesRDD as a BeakerX TableDisplay.
  *
  * @param tsRdd the time series to preview
  * @param num   number of rows to take from the underlying DataFrame (default 5)
  * @return a table with one entry per taken row, mapping column name to value
  */
def preview(tsRdd: TimeSeriesRDD, num: Integer = 5) : TableDisplay = {
    import scala.collection.immutable.ListMap

    val cols = tsRdd.schema.fieldNames
    val rows = tsRdd.toDF.take(num)
    // `toMap` on more than four pairs builds a HashMap, which loses the schema's column order.
    // ListMap keeps the columns in schema order.
    val listOfMaps = rows.map { r => ListMap(cols.zip(r.toSeq): _*): Map[String, Any] }
    new TableDisplay(listOfMaps)
}

preview: (tsRdd: com.twosigma.flint.timeseries.TimeSeriesRDD, num: Integer)com.twosigma.beakerx.scala.table.TableDisplay


## Read Local Files

In [31]:
import com.twosigma.flint.timeseries.CSV

// Load prices.csv from https://www.kaggle.com/dgawlik/nyse into a TimeSeriesRDD.
// The CSV's "date" column becomes the series' time column (later cells refer to it as "time").
// NOTE(review): confirm dateFormat matches the date strings in your copy of prices.csv.
val pricesCsvPath = "file:///Users/spot/src/beakerx/nyse/prices.csv"
val pricesRdd = CSV.from(
  spark.sqlContext,
  pricesCsvPath,
  header = true,
  timeColumnName = "date",
  dateFormat = "dd/MM/yyyy HH:mm",
  sorted = false
)
preview(pricesRdd)

## Basic Operations

In [9]:
// Cast the "close" column to integers
val priceAsInteger = pricesRdd.cast(("close", IntegerType))
preview(tsRdd = priceAsInteger)

In [10]:
// Keep only the rows whose daily low is above 4.0
val filteredRowsByPrice = pricesRdd.keepRows((row: Row) => row.getAs[Double]("low") > 4.0)
preview(tsRdd = filteredRowsByPrice)

In [11]:
// Project the series down to its "time" column only
val timeColumnOnly = pricesRdd.keepColumns(Seq("time"): _*)
preview(tsRdd = timeColumnOnly)

In [12]:
// Drop the "symbol" column, keeping every other column
val withoutIdColumn = pricesRdd.deleteColumns(Seq("symbol"): _*)
preview(tsRdd = withoutIdColumn)

In [13]:
// Give the symbol and price columns more descriptive names (old name -> new name)
val columnRenames = Seq(
  "symbol" -> "ticker",
  "low"    -> "lowPrice",
  "open"   -> "openPrice",
  "close"  -> "closePrice",
  "high"   -> "highPrice"
)
val renamedColumns = pricesRdd.renameColumns(columnRenames: _*)
preview(renamedColumns)

## Basic arithmetic on each row

In [14]:
// Calculate the natural logarithm of the volume column.
// Rows with zero volume get -Infinity (Math.log(0)).
val logVolumeRdd = pricesRdd.addColumns("logVolume" -> DoubleType -> { row => Math.log(row.getAs[Double]("volume")) })
// Preview the derived RDD so the new logVolume column is actually shown
preview(logVolumeRdd)

In [15]:
// Raise the volume column to the power 2
val squaredVolumeRdd = pricesRdd.addColumns("squaredVolume" -> DoubleType -> { row =>
    val volume = row.getAs[Double]("volume")
    Math.pow(volume, 2)
})
preview(squaredVolumeRdd)

In [16]:
// Intraday price change: close minus open for every row
val priceChangeRdd = pricesRdd.addColumns("priceChange" -> DoubleType -> { row =>
    val (open, close) = (row.getAs[Double]("open"), row.getAs[Double]("close"))
    close - open
})
preview(priceChangeRdd)

In [17]:
// Fractional change from open to close (not multiplied by 100).
// The value is null when the open price is zero, because the ratio is undefined.
val pricePercentChange = pricesRdd.addColumns("pricePercentChange" -> DoubleType -> { row =>
    val open = row.getAs[Double]("open")
    val close = row.getAs[Double]("close")
    if (open == 0) null else (close - open) / open
})
preview(pricePercentChange)

## Filtering

In [18]:
// Keep the rows where the price rose during the day (open below close)
val priceIncreasedRdd = pricesRdd.keepRows(row => row.getAs[Double]("open") < row.getAs[Double]("close"))
preview(priceIncreasedRdd)

In [19]:
// keepRows and deleteRows take a Row => Boolean predicate as the filtering criterion.
// Keep only rows whose symbol starts with 'A'; rows with a null symbol are dropped.
val startsWithARdd = pricesRdd.keepRows { row =>
    Option(row.getAs[String]("symbol")).exists(_.startsWith("A"))
}
preview(startsWithARdd)

In [20]:
// Keep only the rows whose volume is less than 2,000,000 (keepRows retains rows matching the predicate)
val lowVolumeRdd = pricesRdd.keepRows { row =>
    row.getAs[Double]("volume") < 2000000
}
preview(lowVolumeRdd)

## Using History with Windows

In [21]:
// Moving average of the close price over the last two weeks, IBM only.
// After addWindows, each row carries its window rows in the nested "window_past_14days" column.
val ibmPricesRdd = pricesRdd.keepRows { row =>
    row.getAs[String]("symbol") == "IBM"
}
val windowedIbmPricesRdd = ibmPricesRdd.addWindows(Windows.pastAbsoluteTime("14days")).addColumns(
    "movingAverage" -> DoubleType -> { row =>
        val pastRows = row.getAs[Seq[Row]]("window_past_14days")
        pastRows.map(_.getAs[Double]("close")).sum / pastRows.size
    }
)
preview(windowedIbmPricesRdd)

In [22]:
// Moving average of the close price over the last two weeks, with windows built separately per symbol.
val pastWindowRdd = pricesRdd.addWindows(Windows.pastAbsoluteTime("14days"), Seq("symbol")).addColumns(
    "movingAverage" -> DoubleType -> { row =>
        val pastRows = row.getAs[Seq[Row]]("window_past_14days")
        pastRows.map(_.getAs[Double]("close")).sum / pastRows.size
    }
)
preview(pastWindowRdd)

In [23]:
// Per symbol: this row's open minus the previous trading day's close.
// NOTE(review): assumes window rows are ordered oldest first, so pastRows(0) is the previous
// trading day and pastRows(1) the current row — confirm against the Flint windowing docs.
// NOTE(review): the recorded output of this cell is a truncated "<console>" error; confirm that
// Windows.pastTradingTime and the "window_past_trading_1days" column name exist in this Flint build.
val allSymbolsRdd = pricesRdd.addWindows(Windows.pastTradingTime("1days", "US"), Seq("symbol")).addColumns(
    "priceChange" -> DoubleType -> { row =>
        val pastRows = row.getAs[Seq[Row]]("window_past_trading_1days")
        // Defined only when the window holds exactly two rows; otherwise the value is null
        if (pastRows.size == 2) {
            pastRows(1).getAs[Double]("open") - pastRows(0).getAs[Double]("close")
        } else {
            null
        }
    }
)
preview(allSymbolsRdd)

<console>: 93

## Calculating values for a cycle

In [24]:
// addColumnsForCycle applies a closure to all rows that share a timestamp.
// The closure returns a map from each of those rows to its value for the new column.

// Add a "universeSize" column: how many instruments (rows) share each timestamp
val cycleRdd = pricesRdd.addColumnsForCycle("universeSize" -> IntegerType -> { rows: Seq[Row] =>
    val universeSize = rows.size
    rows.map(row => (row, universeSize)).toMap
})
preview(cycleRdd)

In [25]:
// Cross-sectional Z score of volume among all rows that share a timestamp:
// (volume - mean) / sampleStddev.
// The value is null when the Z score is undefined: a single row, or zero dispersion.
val zScoreRdd = pricesRdd.addColumnsForCycle("volumeZScore" -> DoubleType -> { rows: Seq[Row] =>
    // Use the volume column for both the mean and the deviations
    val volumes = rows.map(_.getAs[Double]("volume"))
    val n = volumes.size
    val mean = volumes.sum / n
    // Sample standard deviation: sqrt(sum of squared deviations / (n - 1))
    val stddev = if (n > 1) Math.sqrt(volumes.map(v => Math.pow(v - mean, 2)).sum / (n - 1)) else Double.NaN
    rows.zip(volumes).map { case (row, volume) =>
        val zScore: Any = if (stddev > 0) (volume - mean) / stddev else null
        row -> zScore
    }.toMap
})
preview(zScoreRdd)

In [26]:
// Add a column ranking each row's volume among the rows that share its timestamp
import org.apache.commons.math3.stat.ranking.NaturalRanking

// Column "r": NaturalRanking rank of each row's volume within its cycle
val rankedRdd = pricesRdd.addColumnsForCycle("r" -> DoubleType -> { rows: Seq[Row] =>
    val volumes: Array[Double] = rows.map(row => row.getAs[Double]("volume")).toArray
    val volumeRanks = new NaturalRanking().rank(volumes)
    rows.zip(volumeRanks).toMap
})
preview(rankedRdd)

## Intervalizing

In [27]:
// Volume-weighted average price over every 7-day interval, IBM only.
// Each row's price is taken as the midpoint of its open and close.
// The value is null for an interval whose total volume is zero.
val clock = Clocks.uniform(sc, "7d")
val ibmPricesRdd = pricesRdd.keepRows { row =>
    row.getAs[String]("symbol") == "IBM"
}
val volumeWeightedRdd = ibmPricesRdd.groupByInterval(clock).addColumns("volumeWeightedPrice" -> DoubleType -> { interval =>
    // groupByInterval exposes the rows of each interval in the nested "rows" column
    val rows = interval.getAs[Seq[Row]]("rows")
    val weightedSum = rows.map { r =>
        (r.getAs[Double]("open") + r.getAs[Double]("close")) / 2 * r.getAs[Double]("volume")
    }.sum
    val totalVolume = rows.map(_.getAs[Double]("volume")).sum
    if (totalVolume != 0) weightedSum / totalVolume else null
}).deleteColumns("rows")
preview(volumeWeightedRdd)

## Aggregating

In [28]:
// Average daily volume per symbol: the first moment of "volume", grouped by "symbol"
val volumeSummarizer = Summarizers.nthMoment("volume", 1)
val volumeRdd = pricesRdd.summarize(volumeSummarizer, Seq("symbol"))
preview(volumeRdd)

## Regression with an Open-Source Package

In [30]:
//stat.regression
import breeze.linalg.DenseVector
import org.apache.spark.mllib.random.RandomRDDs
import com.twosigma.flint.math.stats.regression.WeightedLabeledPoint
import com.twosigma.flint.math.stats.regression.OLSMultipleLinearRegression

// Sample data drawn from a linear model with beta = [1.0, 2.0] and intercept = 3.0
val data = WeightedLabeledPoint.generateSampleData(spark.sparkContext, DenseVector(1.0, 2.0), 3.0)

// Ordinary-least-squares fit of the sample data
val model = OLSMultipleLinearRegression.regression(data)

// Estimated regression parameters (beta and intercept)
val denseVector = model.estimateRegressionParameters

// Show the estimates as an index -> value map
denseVector.activeIterator.toMap