## Scraping stock market prices and distribution fitting using Dagger

In [1]:
Threads.nthreads()

12

In [3]:
using Dagger

using Distributions, HypothesisTests, DataFrames
using MarketData, Dates, DataFrames, Statistics

In [4]:
"""
    fit_dist(vals, try_dists)

This function finds the best distribution for the given values `vals` among the distributions in `try_dists`.
"""
function fit_dist(vals,
    try_dists = [Exponential, Normal, LogNormal, Laplace, Rayleigh, Uniform])
    df = DataFrame(;dist=Any[], δ=Float64[],pvalue=Float64[], fitdist=Any[])
    for dist in try_dists
        try
            fitdist = fit(dist, vals);
            ks_test = ExactOneSampleKSTest(vals, fitdist)
            append!(df,DataFrame(;dist, δ=ks_test.δ, pvalue=pvalue(ks_test), fitdist))
        catch
            println(stderr, "Error fitting $dist")
        end
    end
    sort!(df, :δ)
    dist = df.fitdist[1]
end

fit_dist

In [5]:
"""
    get_daily_returnsp1(company, months=1)

This function gets the daily returns for the given `company` for the last `months` months.
The data additionally includes the thread id and process id that was used to execute the computation.
"""
function get_daily_returnsp1(company, months=1)
    yo = YahooOpt(period1 = DateTime(now()) - Month(months))
    dat =  yahoo(company, yo) |> DataFrame;
    dat.Close[2:end] ./ dat.Close[1:end-1] .- 1.0 .+ 1.0 # daily returns plus 1
end

get_daily_returnsp1

In [6]:
"""
    get_stock_data_dagger(companies)

Parallize the computations utilizing Dagger.
"""
function get_stock_data_dagger(companies)
    res = DataFrame()
    for company in companies
        daily_returnsp1 = Dagger.@spawn get_daily_returnsp1(company)
        dist = Dagger.@spawn fit_dist(daily_returnsp1)
        push!(res, (;company, daily_returnsp1, dist))
    end
    mapcols!(x -> fetch.(x), res)
    res
end

get_stock_data_dagger

In [7]:
"""
    get_stock_data_threads(companies)

Parallize the computations utilizing threads.
"""
function get_stock_data_threads(companies)
    res = DataFrame(company=String[], daily_returnsp1=Vector{Float64}[], dist=Distribution[])
    rl = ReentrantLock()
    Threads.@threads for company in companies
        daily_returnsp1 = get_daily_returnsp1(company)
        dist = fit_dist(daily_returnsp1)
        lock(rl) do
            push!(res, (;company, daily_returnsp1, dist))
        end
    end
    res
end

get_stock_data_threads

In [8]:
companies = ["MSFT"]
println("THREADS 1 company")
@time get_stock_data_threads(companies);
@time get_stock_data_threads(companies);
@time res_threads = get_stock_data_threads(companies);

THREADS 1 company
 94.067500 seconds (43.20 M allocations: 2.406 GiB, 2.17% gc time, 725.96% compilation time: <1% of which was recompilation)
  0.217833 seconds (2.70 k allocations: 136.727 KiB, 1 lock conflict)
  0.165312 seconds (2.86 k allocations: 273.367 KiB)


In [10]:
println("DAGGER 1 company")
@time get_stock_data_dagger(companies);
@time get_stock_data_dagger(companies);
@time res_dagger = get_stock_data_dagger(companies);
@show res_threads == res_dagger;

DAGGER 1 company
  0.111536 seconds (5.06 k allocations: 253.734 KiB, 4 lock conflicts)
  0.084687 seconds (5.34 k allocations: 271.656 KiB, 9 lock conflicts, 13.56% compilation time)
  0.069304 seconds (5.63 k allocations: 299.773 KiB, 8 lock conflicts)
res_threads == res_dagger = true


In [11]:
companies = ["MSFT", "AAPL", "NVDA", "AMZN", "META", "GOOGL", "GOOG", "LLY", "AVGO","JPM"]
println("THREADS 10 companies")
@time get_stock_data_threads(companies);
@time get_stock_data_threads(companies);
@time res_threads2 = get_stock_data_threads(companies);


THREADS 10 companies
  2.952851 seconds (311.10 k allocations: 16.940 MiB, 14 lock conflicts, 453.22% compilation time)
  0.820077 seconds (25.28 k allocations: 1.465 MiB, 15 lock conflicts)
  0.249381 seconds (24.98 k allocations: 1.326 MiB, 11 lock conflicts)


In [12]:
println("DAGGER 10 companies")
@time get_stock_data_dagger(companies);
@time get_stock_data_dagger(companies);
@time res_dagger2 = get_stock_data_dagger(companies);
@show sort(res_threads2, :company) == sort(res_dagger2, :company);


DAGGER 10 companies
  5.634656 seconds (492.22 k allocations: 25.271 MiB, 0.60% gc time, 67 lock conflicts, 317.25% compilation time: 16% of which was recompilation)
  0.257847 seconds (52.41 k allocations: 3.074 MiB, 95 lock conflicts)
  0.170920 seconds (50.03 k allocations: 2.605 MiB, 73 lock conflicts)
sort(res_threads2, :company) == sort(res_dagger2, :company) = true
