In [1]:
using Dates, CSV, JuliaDB, Plots, PlotThemes, Interact, 
    HTTP, SingularSpectrumAnalysis, ProgressMeter
using OnlineStats
theme(:bright)

┌ Info: Precompiling CSV [336ed68f-0bac-5ca0-87d4-7b16caf5d00b]
└ @ Base loading.jl:1273
┌ Info: Precompiling JuliaDB [a93385a2-3734-596a-9a66-3cfbb77141e6]
└ @ Base loading.jl:1273
ERROR: LoadError: LoadError: UndefVarError: Split not defined
Stacktrace:
 [1] top-level scope at /Users/joshday/.julia/dev/OnlineStats/src/ml/trees.jl:36
 [2] include at ./boot.jl:328 [inlined]
 [3] include_relative(::Module, ::String) at ./loading.jl:1105
 [4] include at ./Base.jl:31 [inlined]
 [5] include(::String) at /Users/joshday/.julia/dev/OnlineStats/src/OnlineStats.jl:1
 [6] top-level scope at /Users/joshday/.julia/dev/OnlineStats/src/OnlineStats.jl:102
 [7] include at ./boot.jl:328 [inlined]
 [8] include_relative(::Module, ::String) at ./loading.jl:1105
 [9] include(::Module, ::String) at ./Base.jl:31
 [10] top-level scope at none:2
 [11] eval at ./boot.jl:330 [inlined]
 [12] eval(::Expr) at ./client.jl:425
 [13] top-level scope at ./none:3
in expression starting at /Users/joshday/.julia/dev/Onlin

ErrorException: Failed to precompile JuliaDB [a93385a2-3734-596a-9a66-3cfbb77141e6] to /Users/joshday/.julia/compiled/v1.3/JuliaDB/4FA8g_wM9Rk.ji.

# OnlineStats and Big Data Viz

## `Partition` and `IndexedPartition`

- Plot any-sized dataset.
- Here are two plots, each built from **100 million observations**:

In [None]:
# Stream a 10^8-step random walk (running sum of standard-normal increments) into a
# `Partition` whose segments each track a `Mean` and `Extrema`, then plot the summary.
o = Partition(Series(Mean(), Extrema()))

# Starting point of the walk; kept as a notebook global so the loop can update it.
y = randn()

@showprogress for _ in 1:10^8
    # Advance the walk by one step, then feed the new position to the partition.
    global y += randn()
    fit!(o, y)
end

plot(o)

In [None]:
# Summarize 10^8 two-element observations, each a pair of independent standard-normal
# draws. The partition is declared with a `Float64` index type and summarizes the
# remaining value with `KHist(10)`.
o = IndexedPartition(Float64, KHist(10))

@showprogress for _ in 1:10^8
    # Each observation is passed as a 2-tuple; the draws are never stored.
    fit!(o,  (randn(), randn()))
end

plot(o)

<br><br><br><br><br><br><br><br><br><br><br><br><br><br>

# Kaggle's Huge Stock Market Dataset

- Source: https://www.kaggle.com/borismarjanovic/price-volume-data-for-all-us-stocks-etfs
- OHLC data for each stock/ETF (each is a separate CSV) traded in the US
- Just over 700MB

In [None]:
# Directory of per-ticker files; later cells read "<ticker>.us.txt" from here.
# Note the trailing slash: later cells interpolate "$path/...", giving a double slash.
path = "/Users/joshday/datasets/price-volume-data-for-all-us-stocks-etfs/Stocks/"

# List the directory contents.
readdir(path)

In [None]:
;head "$path/aapl.us.txt"

## Statistics/Plots Directly From CSV

- Via **`CSV.File`**

In [None]:
"""
    plot_high_low(stock; kw...)

Plot the low/high price range over time for the ticker `stock`.

The file `<path>/<stock>.us.txt` (where `path` is the notebook-level global) is read
with `CSV.File`. Every row contributes two observations, `Date => Low` and
`Date => High`, to an `IndexedPartition(Date, Extrema(), 500)`. Extra keyword arguments
are forwarded to `plot`.
"""
function plot_high_low(stock; kw...)
    extrema_by_date = IndexedPartition(Date, Extrema(), 500)
    for row in CSV.File("$path/$stock.us.txt")
        day = row.Date
        # Both prices share the row's date, so each row counts as two observations.
        fit!(extrema_by_date, [day => row.Low, day => row.High])
    end
    heading = uppercase(stock) * " (nobs = $(nobs(extrema_by_date)))"
    return plot(extrema_by_date; xlab="Date", title=heading, legend=false, kw...)
end

In [None]:
plot_high_low("aapl")

<br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br>

## ...But this loads the entire CSV!

Even though we are creating the plots with OnlineStats, the entire file is loaded into memory.

### `CSV.Rows`

- `CSV.Rows` lets you lazily read from a CSV file
    - **Minimal memory footprint**
    - At the cost of **no type inference** (everything is treated as `String`)

In [None]:
"""
    plot_high_low2(stock; kw...)

Plot the low/high price range over time for the ticker `stock`, reading the file
`<path>/<stock>.us.txt` lazily with `CSV.Rows` (`path` is the notebook-level global).

`CSV.Rows` yields every field as a string, so the date and prices are parsed by hand.
Every row contributes two observations, `date => low` and `date => high`, to an
`IndexedPartition(Date, Extrema(), 500)`. Extra keyword arguments are forwarded to
`plot`.
"""
function plot_high_low2(stock; kw...)
    # Build the DateFormat once. `Date(str, "yyyy-mm-dd")` would construct a new
    # DateFormat from the format string on every row.
    fmt = dateformat"yyyy-mm-dd"
    o = IndexedPartition(Date, Extrema(), 500)
    for row in CSV.Rows("$path/$stock.us.txt")
        dt = Date(row.Date, fmt)
        low = parse(Float64, row.Low)
        hi = parse(Float64, row.High)
        # Both prices share the row's date, so each row counts as two observations.
        fit!(o, [dt => low, dt => hi])
    end
    t = uppercase(stock) * " (nobs = $(nobs(o)))"
    return plot(o; xlab="Date", title=t, legend=false, kw...)
end

In [None]:
plot_high_low2("aapl")

In [None]:
# Stack two high/low charts vertically (2 rows, 1 column) with linked x-axes.
# NOTE(review): both panels plot "aapl"; presumably the second was meant to be a
# different ticker for comparison — confirm.
plot(plot_high_low2("aapl"), plot_high_low2("aapl"), layout=(2,1), link=:x)

<br><br><br><br><br><br><br><br><br><br><br><br><br>

## Loading Multiple Datasets at Once

- Working with only one CSV at a time limits what kinds of analyses we can do.

In [None]:
# Load the files under `path` into a single table. `filenamecol = :Stock` names the
# column that identifies each row's source file; later cells match it against values
# such as "aapl.us.txt".
t = loadtable(path, filenamecol = :Stock)

In [None]:
# Show `t` keyed on its first two columns (presumably Stock and Date — confirm).
# The result is only displayed; it is not assigned back to `t`.
table(t, pkey=(1,2))

In [None]:
# Fit a covariance matrix between the Open and High columns of the combined table.
# `fit!` returns the stat it updated, so `o` is bound to the fitted `CovMatrix`.
o = fit!(CovMatrix(), select(t, (:Open, :High)))

In [None]:
plot(o)

### Passing an `OnlineStat` as a reducer

In [None]:
# Mean of (Open - Close) over every row. The selector pairs the two columns with a
# function that splats each (Open, Close) row into binary `-`.
reduce(Mean(), t, select=(:Open, :Close) => x -> -(x...))

### `groupreduce`

- Like `groupby`, but **much** more memory efficient!

In [None]:
# Per-stock mean of `Open + 1`, reduced group by group with a `Mean` stat.
temp = groupreduce(Mean(), t, :Stock, select=:Open => x -> x + 1)
# Keep column 1 as-is and apply `value` to column 2, presumably to unwrap each fitted
# stat into its numeric value — confirm.
select(temp, (1, 2 => value))

### Make All Plots at Once

In [None]:
# For each stock, reduce its (Date, Close) rows with `IndexedPartition(Date, Extrema())`.
# The cell below reads the fitted stats back through the `IndexedPartition` field.
allplots = groupreduce(IndexedPartition(Date, Extrema()), t, :Stock, select=(:Date, :Close))

In [None]:
# Interactive picker: choose a ticker and show its precomputed partition plot.
@manipulate for s in ["aapl", "msft", "ibm", "googl", "nflx"]
    # The Stock column holds file names, so match on "<ticker>.us.txt".
    matching = filter(==("$s.us.txt"), allplots, select=:Stock)
    partition = matching[1].IndexedPartition
    plot(partition, title=s)
end

<br><br><br><br><br><br><br><br><br><br><br><br><br>

## Time Series Analysis Tools

### Singular Spectrum Analysis

- Extract seasonal components out of a time series
- Computed "offline" (on the full series rather than incrementally), but still a useful tool

In [None]:
# Restrict the combined table to Apple's rows (the Stock column holds file names).
temp = filter(==("aapl.us.txt"), t, select=:Stock)

# Decompose the closing prices into two series, plotted as "Trend" and "Season".
# The 90 is presumably the SSA window length — confirm against the
# SingularSpectrumAnalysis docs.
yt, ys = analyze(select(temp, :Close), 90)
plot(yt, lab="Trend")
plot!(ys, lab="Season")

### Autocorrelation

In [None]:
# Restrict the combined table to Apple's rows (the Stock column holds file names).
temp = filter(==("aapl.us.txt"), t, select=:Stock)


# Fit `AutoCov(365)` on the closing prices and plot it; `fit!` returns the fitted stat.
# The 365 is presumably the maximum lag — confirm against the OnlineStats docs.
plot(fit!(AutoCov(365), select(temp, :Close)))

<br><br><br><br><br><br><br><br><br><br><br><br><br><br>
# TrueFX API

In [None]:
"""
    get_data(q = "")

Request the TrueFX web-rates endpoint in CSV format and parse the response body.

`q` is appended verbatim to the URL's query string (for example `"c=EUR/USD"`). The
response is parsed with `CSV.read`, using the fixed column names below as the header
and `footerskip=1`.
"""
function get_data(q = "")
    # Column names passed to CSV.read as the header.
    columns = [:pair, :utc, :big_bid_figure, :bid_points, :offer_bid_figure,
               :offer_points, :high, :low, :open]
    response = HTTP.get("https://webrates.truefx.com/rates/connect.html?f=csv&$q")
    return CSV.read(response.body; header=columns, footerskip=1)
end

In [None]:
get_data("c=EUR/USD")

In [None]:
# Live chart: fit (utc, bid_points) observations from the USD/JPY feed into an
# `IndexedPartition(Int, Extrema(), 10)` and redraw after every fetch.
o = IndexedPartition(Int, Extrema(), 10)

# NOTE(review): this loop has no exit condition and no delay between requests, so it
# polls the endpoint as fast as it responds until the cell is interrupted.
while true
    # Clear the cell's previous output so each new plot replaces the last one.
    IJulia.clear_output(true)
    df = get_data("c=USD/JPY")
    # Only the first row of the response is used.
    fit!(o, (df[1, :utc], df[1, :bid_points]))
    display(plot(o, xlab="UTC Time", ylab="Bid Points", title="nobs = $(nobs(o))"))
end

<br><br><br><br><br><br><br><br><br><br>
## NYC Yellow Taxi Data (2018)

- Source: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
- Data on every single yellow taxi trip in 2018!

In [None]:
# Directory holding the 2018 yellow-taxi trip files.
datapath = "/Users/joshday/datasets/nyc_yellow_taxi_2018"

# Combined size of every file in the directory, converted from bytes to GiB.
sum(filesize("$datapath/$file") for file in readdir(datapath)) / 1024^3

In [None]:
;head "$datapath/yellow_tripdata_2018-01.csv"

In [None]:
# Count passenger numbers per trip across every file, partitioned in arrival order.
# The 365 is presumably the number of partition segments — confirm against the
# OnlineStats docs.
passenger_counts = Partition(CountMap(Int), 365)
# Rows are read lazily with CSV.Rows, so each field arrives as a string and is parsed.
# `skipto=3` starts reading data at line 3 of each file (presumably because line 2 is
# blank — confirm).
@showprogress for file in readdir(datapath), row in CSV.Rows("$datapath/$file", skipto=3)
    fit!(passenger_counts, parse(Int, row.passenger_count))
end
plot(passenger_counts)

### In Parallel

In [None]:
# Start worker processes (`addprocs()` with no arguments uses the default count).
using Distributed
addprocs()

# Define the data directory and load the required packages on every process, so the
# distributed loop body below can run on the workers.
@everywhere begin 
    datapath = "/Users/joshday/datasets/nyc_yellow_taxi_2018"
    using Dates, OnlineStats, CSV
end 

In [None]:
# Build one per-day passenger-count summary per file in parallel, then combine the
# per-file results with `merge`. The trailing `nothing` suppresses the cell's output.
@time begin
    gb = @distributed merge for file in readdir(datapath)
        daily_counts = GroupBy(Date, CountMap(Int))
        for row in CSV.Rows("$datapath/$file", skipto=3)
            # The first 10 characters of the pickup timestamp are passed to `Date`.
            pickup_day = Date(row.tpep_pickup_datetime[1:10])
            passengers = parse(Int, row.passenger_count)
            fit!(daily_counts, pickup_day => passengers)
        end
        # Last expression of the body: the value handed to the `merge` reducer.
        daily_counts
    end
end
nothing

In [None]:
# Interactive picker: choose a day in 2018 and plot that day's passenger-count stat.
@manipulate for day in Date(2018,1,1):Day(1):Date(2018,12,31)
    counts_for_day = gb.value[day]
    plot(counts_for_day, title=string(day))
end