In [1]:
%run ./utils.py

import concurrent.futures
from pathlib import Path

import numpy as np
import pandas as pd
import pandas_datareader.data as web
from tqdm import tqdm
max_workers = 3  # presumably the thread-pool size for the parallel steps in main() — confirm


def _get_trade_openings(df_):
    """Attach each trade's opening price on its trade date.

    Parameters
    ----------
    df_ : pandas.DataFrame
        Raw trades for a single day. Must contain ``date`` and ``ticker``
        columns and exactly one distinct ``date``.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df_`` indexed by ``"<ticker>_<date>"`` with an added
        ``Open`` column. Trades whose ticker/date got no price are dropped by
        the inner join, so the result may be empty.

    Raises
    ------
    ValueError
        If ``df_`` does not contain exactly one distinct date.
    """
    df = df_.copy()
    if df["date"].nunique() != 1:
        raise ValueError("number of unique dates should be 1")
    trade_date = df.iloc[0]["date"]

    # One opening-price request per distinct ticker, on ark's trade date.
    open_configs = [
        {"ticker": ticker, "date": trade_date, "on": "Open"}
        for ticker in df["ticker"].unique()
    ]
    open_prices = UtilsFinancial.parallel_fetch_yahoo_daily(open_configs)

    # Join key: "<ticker>_<date>". NOTE(review): this relies on the fetched
    # "Date" column stringifying exactly like the raw "date" column
    # (e.g. "2021-02-18") — confirm against UtilsFinancial.
    open_prices["key1"] = open_prices["ticker"].astype(str) + "_" + open_prices["Date"].astype(str)
    df["key1"] = df["ticker"].astype(str) + "_" + df["date"].astype(str)
    df = df.set_index("key1").join(open_prices.set_index("key1")[["Open"]], how="inner")
    # Presumably guards against duplicate price rows for one key fanning out
    # trades in the join (compares columns only, not the index).
    return df.drop_duplicates()


def _estimate_volume(priced_df):
    """Net one day's priced trades per ticker/fund, assuming fills at the open.

    Parameters
    ----------
    priced_df : pandas.DataFrame
        Output of ``_get_trade_openings``; needs the columns ``fund``,
        ``date``, ``direction``, ``ticker``, ``company``, ``shares``, ``Open``.

    Returns
    -------
    pandas.DataFrame
        Columns ``date, fund, ticker, company, signal, shares, Open, volume,
        abs_volume``; ``shares`` is the signed net (buys minus everything
        else) and ``volume = shares * Open``.
    """
    net = priced_df[["fund", "date", "direction", "ticker", "company", "shares", "Open"]].copy()
    # Any direction other than "Buy" counts as negative shares. Vectorised so
    # it also works on an empty frame (a row-wise apply returns a DataFrame
    # there and the column assignment fails).
    net["shares"] = np.where(net["direction"] == "Buy", net["shares"], -net["shares"])
    net = net.groupby(["ticker", "date", "company", "fund"], as_index=False).agg(
        {"shares": "sum", "Open": "mean"}
    )
    # NOTE(review): a net of exactly 0 shares is labelled "Sell" — confirm intended.
    net["signal"] = np.where(net["shares"] > 0, "Buy", "Sell")
    net["volume"] = net["shares"] * net["Open"]
    net["abs_volume"] = net["volume"].abs()
    return net[["date", "fund", "ticker", "company", "signal", "shares", "Open", "volume", "abs_volume"]]


def _save_processed_df(processed_df):
    """Write one day's netted trades to ./trades/processed/<date>_ARK_TRADES.csv.

    Rows are sorted by ``abs_volume`` descending.

    Raises
    ------
    ValueError
        If ``processed_df`` does not contain exactly one distinct date.
    """
    if processed_df["date"].nunique() != 1:
        raise ValueError("number of unique dates should be 1")
    trade_date = processed_df["date"].iloc[0]
    out_path = f"./trades/processed/{trade_date}_ARK_TRADES.csv"
    processed_df.sort_values("abs_volume", ascending=False).to_csv(out_path, index=False)


def main():
    """Price, net and save every raw ARK trade file not yet processed.

    Steps: read unprocessed CSVs from ./trades/raw/, attach opening prices,
    net shares per ticker/fund (assuming execution at the open), and write
    one CSV per trade date into ./trades/processed/.

    Returns
    -------
    str
        ``"Nothing to Process"`` when there is no unprocessed file (or no
        trade could be priced), otherwise ``"OK"``.
    """
    # A raw file counts as processed when a file with the same name exists in
    # ./trades/processed/. NOTE(review): outputs are named
    # "<date>_ARK_TRADES.csv", so raw files must use that naming for this
    # check to work — confirm.
    processed_names = {path.name for path in Path("./trades/processed/").glob("*.csv")}
    raw_names = {path.name for path in Path("./trades/raw/").glob("*.csv")}
    # Sorted so processing order and the previews below are deterministic.
    unprocessed = sorted(raw_names - processed_names)
    if not unprocessed:
        return "Nothing to Process"
    dfs = UtilsIO.parallel_read_df(files=[f"./trades/raw/{name}" for name in unprocessed], returns="list")
    print(dfs[0].head(3))

    # Opening prices of traded stocks.
    priced_trades = []
    for raw_df in tqdm(dfs):
        priced = _get_trade_openings(raw_df)
        if priced.empty:
            # No price matched any trade: skip instead of crashing in the
            # later steps; nothing is written for this day.
            print(f"Skipping trades dated {raw_df['date'].iloc[0]}: no opening prices found")
            continue
        priced_trades.append(priced)
    if not priced_trades:
        return "Nothing to Process"
    print(priced_trades[0].head(3))

    # Net shares & dollar volume per ticker/fund.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        nets = list(tqdm(executor.map(_estimate_volume, priced_trades), total=len(priced_trades)))
    print(nets[-1].head(3))

    # Save locally; list() drains the iterator so worker exceptions surface here.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        list(tqdm(executor.map(_save_processed_df, nets), total=len(nets)))

    return "OK"
# Run the pipeline; as the cell's last expression, its status string
# ("OK" or "Nothing to Process") is echoed as the cell output.
main()

100%|██████████| 1/1 [00:00<00:00, 4048.56it/s]
  0%|          | 0/1 [00:00<?, ?it/s]

   fund        date direction ticker      cusip             company  shares  \
0  ARKG  2021-02-18       Buy   VEEV  922475108   VEEVA SYSTEMS INC   40818   
1  ARKG  2021-02-18       Buy   SGFY  82671G100  SIGNIFY HEALTH INC   44305   
2  ARKG  2021-02-18       Buy   MASS  65443P102     908 DEVICES INC    6191   

   % of etf  
0    0.1036  
1    0.0135  
2    0.0032  


100%|██████████| 1/1 [00:05<00:00,  5.83s/it]
100%|██████████| 1/1 [00:00<00:00, 49.20it/s]
100%|██████████| 1/1 [00:00<00:00, 102.92it/s]

                 fund        date direction ticker      cusip  \
key1                                                            
ABBV_2021-02-18  ARKG  2021-02-18       Buy   ABBV  00287Y109   
AVAV_2021-02-18  ARKQ  2021-02-18      Sell   AVAV  008073108   
BEAM_2021-02-18  ARKK  2021-02-18       Buy   BEAM  07373V105   

                               company  shares  % of etf        Open  
key1                                                                  
ABBV_2021-02-18             ABBVIE INC  186105    0.1605  105.320000  
AVAV_2021-02-18      AEROVIRONMENT INC     400    0.0013  127.970001  
BEAM_2021-02-18  BEAM THERAPEUTICS INC   59522    0.0225  102.180000  
         date  fund ticker                company signal  shares        Open  \
0  2021-02-18  ARKG   ABBV             ABBVIE INC    Buy  186105  105.320000   
1  2021-02-18  ARKQ   AVAV      AEROVIRONMENT INC   Sell    -400  127.970001   
2  2021-02-18  ARKK   BEAM  BEAM THERAPEUTICS INC    Buy   59522  102.180000   




'OK'