# Flint Python Cookbook

In [None]:
from   ts.flint              import FlintContext
from   ts.flint              import TimeSeriesDataFrame
from   ts.flint              import summarizers, windows

## Basic arithmetic on each row
### Calculate logarithm of a column

In [None]:
# Add a 'logVolume' column holding pyspark_fn.log() of each row's volume.
# NOTE(review): pyspark_fn is presumably pyspark.sql.functions, whose
# one-argument log() is the natural logarithm — confirm the alias.
df = price.withColumn('logVolume', pyspark_fn.log(price.volume))
df.show()

### Raise a column to an exponent

In [None]:
# Add a 'squaredVolume' column: each row's volume raised to the power 2.
df = price.withColumn('squaredVolume', pyspark_fn.pow(price.volume, 2))
df.show()

### Calculate difference between two columns

In [None]:
# Add a 'priceChange' column: closePrice minus openPrice for each row.
df = price.withColumn('priceChange', price.closePrice - price.openPrice)
df.show()

### Calculate percent change between two columns

In [None]:
@ts.flint.udf(DoubleType())
def pricePercentChange(openPrice, closePrice):
    """Return the relative change from openPrice to closePrice.

    The result is a fraction of the open price (0.05 means +5%); it is not
    multiplied by 100.

    Returns None when the change is undefined: either price is missing
    (None) or openPrice is not strictly positive.
    """
    # Missing values are expected to arrive as None; comparing None with 0
    # raises TypeError in Python 3, so reject them before the range check.
    if openPrice is None or closePrice is None:
        return None
    if not openPrice > 0:
        return None
    return (closePrice - openPrice) / openPrice

# Add a 'pricePercentChange' column computed by the UDF above from the
# openPrice and closePrice columns.
df = price.withColumn('pricePercentChange', pricePercentChange(price.openPrice, price.closePrice))
df.show()

### Get the first two characters of a column

In [None]:
@ts.flint.udf(StringType())
def gicsSector(gicsCode):
    """Return the first two characters of gicsCode.

    A missing (None) or empty code yields "" instead of raising TypeError
    on the slice, so rows without a GICS code do not abort the whole job.
    """
    if not gicsCode:
        return ""
    return gicsCode[0:2]

# Add a 'gicsSector' column derived from each row's gicsCode via the UDF above.
df = active_inst.withColumn("gicsSector", gicsSector(active_inst.gicsCode))
df.show()

## Filtering
### Select rows where the price went up

In [None]:
# Keep only rows whose closePrice is strictly greater than their openPrice.
df = price.filter(price.closePrice > price.openPrice)
df.show()

### Filter using a regular expression

In [None]:
# Drop rows with no gicsCode first so the pattern is only matched against real codes.
df = active_inst.dropna(subset=["gicsCode"])
# rlike() matches a regular expression; "^45" keeps codes that begin with "45".
df = df.filter(df.gicsCode.rlike("^45"))
df.show()

### Remove all rows that don't have a value in a particular column

In [None]:
# Drop every row whose gicsCode is missing; other columns are not checked.
df = active_inst.dropna(subset=["gicsCode"])
df.show()

## Joining
### Join trades to quotes

In [None]:
# Left-join quotes onto trades, matching rows on tid with a 1 minute time tolerance.
# NOTE(review): presumably each trade gets the most recent quote at or before
# its timestamp within that tolerance — confirm against Flint's leftJoin docs.
df = trade.leftJoin(quote, tolerance="1min", key="tid")
df.show()

## Time-based Windowing

### Exponential moving average

In [None]:
# Exponential moving average over the last 10 days with a decay factor of 0.9 for IBM

def EMA(decay):
    """Build a UDF that computes a decay-weighted average of closePrice.

    Each row in the window is weighted by ``decay`` raised to the number of
    whole days between that row's time and the current row's time. The UDF
    returns the weighted mean, or 0 when the total weight is not positive
    (for example, when the window is empty).
    """
    @ts.flint.udf(DoubleType())
    def _EMA(time, window):
        from pandas import Timedelta

        weighted_total = 0
        weight_total = 0
        for row in window:
            # The time difference is interpreted as nanoseconds; .days keeps
            # only the whole-day component.
            age = Timedelta(nanoseconds=(time - row.time))
            weight = decay ** age.days
            weighted_total += weight * row.closePrice
            weight_total += weight
        if weight_total > 0:
            return weighted_total / weight_total
        return 0
    return _EMA

# Weight applied per whole day of age: a row one day old counts 0.9, two days 0.81, ...
decay = 0.9

# Attach to each row a window column of the rows from the preceding 10 days;
# the column is read back below as `window_past_10days`.
df = price.addWindows(windows.past_absolute_time("10days"))

# Evaluate the EMA UDF on each row's own time and its 10-day window.
df = df.withColumn("EMA", EMA(decay)(df.time, df.window_past_10days))
df.show()

### Moving average

In [None]:
# Moving average over the last two weeks for IBM

@ts.flint.udf(DoubleType())
def movingAverage(window):
    """Return the arithmetic mean of closePrice over the rows in window.

    An empty window yields 0 rather than dividing by zero.
    """
    count = len(window)
    total = 0
    for row in window:
        total += row.closePrice
    return total / count if count != 0 else 0

# Attach a window column covering the preceding 14 days (read back below as
# `window_past_14days`), then average closePrice over it for every row.
df = price.addWindows(windows.past_absolute_time("14days"))
df = df.withColumn("movingAverage", movingAverage(df.window_past_14days))
df.show()

In [None]:
# Moving average over the last two weeks for all tids in ACTIVE_3000_US

# Same 14-day window as the previous cell, but built with key="tid".
# NOTE(review): presumably the key restricts each window to rows sharing the
# current row's tid — confirm against Flint's addWindows docs.
# NOTE(review): this reads `price`, while the other universe-wide recipes use
# `active_price`; confirm `price` holds the intended universe here.
df = price.addWindows(windows.past_absolute_time("14days"), key="tid")
df = df.withColumn("movingAverage", movingAverage(df.window_past_14days))
df.show()

## Cycles

`TimeSeriesDataFrame.addColumnsForCycle()` can be used to compute a new column based on all rows that share a timestamp.

### Adding universe info

In [None]:
# Add a column containing the number of instruments in the universe on each day

def universeSize(rows):
    """Map every row in rows to the total number of rows.

    Returns a dict keyed by row whose values are all ``len(rows)``.
    """
    return dict.fromkeys(rows, len(rows))

# Add an integer 'universeSize' column: universeSize() receives the rows of a
# cycle (rows sharing a timestamp) and returns the value to store for each row.
df = active_price.addColumnsForCycle(
            {"universeSize": (IntegerType(), universeSize)})
df.show()

In [None]:
# Add a column containing the number of instruments that share a GICS code
# with the current row on each day

# As above, but with key="gicsCode" so that — per the description at the top
# of this cell — the count covers only rows sharing the current row's gicsCode.
df = active_inst.addColumnsForCycle(
            {"universeSize": (IntegerType(), universeSize)},
            key="gicsCode")
df.show()

In [None]:
# Add a column containing the number of instruments that share a GICS sector
# with the current row on each day

@ts.flint.udf(StringType())
def gicsSector(gicsCode):
    """Return the first two characters of gicsCode, or "" when it is missing or empty."""
    if not gicsCode:
        return ""
    return gicsCode[:2]

# Derive the two-character sector prefix first, then use it as the cycle key
# so universeSize counts the rows that share a sector within each cycle.
df = active_inst.withColumn("gicsSector", gicsSector(active_inst.gicsCode))
df = df.addColumnsForCycle(
            {"universeSize": (IntegerType(), universeSize)},
            key="gicsSector")
df.show()

### Z-score

In [None]:
# Compute each row's Z-score relative to the other rows in its cycle (rows sharing a timestamp)

import math

def volumeZScore(rows):
    """Map each row to the Z-score of its volume among rows.

    The Z-score is ``(volume - mean) / stddev``, where mean and stddev are
    the mean and the sample standard deviation (n - 1 denominator) of
    ``volume`` over ``rows``.

    Returns a dict keyed by row. Every row maps to 0.0 when the Z-score is
    undefined: fewer than two rows, or all volumes equal (zero deviation).
    """
    size = len(rows)
    if size <= 1:
        return {row: 0.0 for row in rows}
    mean = sum(row.volume for row in rows) / size
    # Sample variance: divide the summed squared deviations by (n - 1)
    # before taking the square root, not after.
    variance = sum((row.volume - mean) ** 2 for row in rows) / (size - 1)
    stddev = math.sqrt(variance)
    if stddev == 0:
        # All volumes are identical; avoid dividing by zero.
        return {row: 0.0 for row in rows}
    return {row: (row.volume - mean) / stddev for row in rows}

# Add a double 'volumeZScore' column, computed per cycle by volumeZScore().
df = active_price.addColumnsForCycle(
            {"volumeZScore": (DoubleType(), volumeZScore)})
df.show()

### Ranking

In [None]:
# Add a column with fractional rankings in (0.0, 1.0] (rank divided by row count) relative to other rows with the same timestamp

import scipy.stats as stats

def rank_by(column):
    """Build a cycle function that ranks rows by the given column.

    The returned function maps each row to its ``stats.rankdata`` rank of
    ``row[column]`` divided by the number of rows, so the largest value
    maps to 1.0 and tied values share the rank rankdata assigns them.
    """
    def rank(rows):
        values = [row[column] for row in rows]
        ordinals = stats.rankdata(values)
        total = len(ordinals)
        return {row: float(ordinal) / total for row, ordinal in zip(rows, ordinals)}
    return rank

# Add a double 'r' column holding each row's fractional rank by volume within its cycle.
df = active_price.addColumnsForCycle(
            {"r": (DoubleType(), rank_by('volume'))})
df.show()

## Intervalizing
### Volume-weighted average price

In [None]:
# Volume weighted average price for every 30 minute trading interval for IBM

@ts.flint.udf(DoubleType())
def meanPrice(rows):
    """Return the volume-weighted average trade price of rows.

    Computes ``sum(tradePrice * tradeSize) / sum(tradeSize)``. Returns None
    when the total trade size is zero (including when rows is empty): the
    average is undefined there and the division would otherwise raise
    ZeroDivisionError.
    """
    weighted_sum = 0
    total_size = 0
    # Accumulate both sums in one pass so rows may be any iterable, including
    # one that can only be consumed once.
    for row in rows:
        weighted_sum += row.tradePrice * row.tradeSize
        total_size += row.tradeSize
    if total_size == 0:
        return None
    return weighted_sum / total_size

# Group trades into intervals; the grouped rows are read back below from a 'rows' column.
# NOTE(review): `intervals` is defined outside this excerpt — presumably a
# clock of 30 minute boundaries, per the description above; confirm.
df = trade.groupByInterval(intervals)
# Collapse each interval's rows into a single volume-weighted price.
df = df.withColumn("meanPrice", meanPrice(df.rows))
# The per-interval row lists are no longer needed once the price is computed.
df = df.drop("rows")
df.show()

## Aggregating
### Average daily volume

In [None]:
# Average daily volume for all tids in u.ACTIVE_3000_US

# The first moment of volume is its mean, summarized here with key="tid".
# NOTE(review): presumably nth_moment computes the raw (non-central) moment and
# the key yields one result row per tid — confirm in Flint's summarizers docs.
df = active_price.summarize(summarizers.nth_moment("volume", 1), key="tid")
df.show()