## Script to load financial time-series data (per-minute ETF prices) from CSV files into a Pandas DataFrame and a Postgres table
### The Pandas ingestion is also performed in its own performance-test notebook

In [2]:
# Directory holding one CSV per ETF symbol (e.g. `aaxj.csv`).
# Override with the SYMBOLS_DATA_PATH environment variable to run elsewhere.
# A trailing separator is always enforced because file paths are built by
# appending a file name to this string.
import os

data_path = os.path.join(
    os.environ.get('SYMBOLS_DATA_PATH',
                   '/workspace/data/datasets/unianalytica/group/analytics-perf-tests/symbols/'),
    '')

In [4]:
import sys
import os
import csv
import psycopg2
import pandas as pd
import numpy as np
from datetime import datetime
import pytz
import time

### 1. Load all files into one Pandas DataFrame

In [30]:
# Read every per-symbol CSV (in sorted file-name order) and stack them into a
# single DataFrame. The files are read without a header row (column names are
# supplied explicitly), and column 2 (`datetime`) is parsed as datetimes.
# `infer_datetime_format` is omitted: it is deprecated in pandas >= 2.0, where
# consistent-format inference is the default.
column_names = ['symbol_record_id', 'symbol', 'datetime', 'open', 'high', 'low', 'close',
                'volume', 'split_factor', 'earnings', 'dividends']

symbol_dfs_list = []
records_count = 0
symbols_files = sorted(os.listdir(data_path))
for file_name in symbols_files:
    current_symbol_df = pd.read_csv(os.path.join(data_path, file_name), parse_dates=[2],
                                    header=None, names=column_names)
    records_count += len(current_symbol_df)
    symbol_dfs_list.append(current_symbol_df)

print('Now concatenating the DFs...')
# ignore_index=True gives a fresh 0..records_count-1 RangeIndex directly,
# instead of concatenating and then overwriting the index with np.arange.
symbols_df = pd.concat(symbol_dfs_list, ignore_index=True)
assert len(symbols_df) == records_count, 'row count changed during concatenation'
del symbol_dfs_list

Now concatenating the DFs...


#### Adding `symbol_id` column

In [31]:
# Assign each distinct symbol a 1-based integer id in alphabetical order and
# insert it as the first column. Part 2 derives ids from the sorted CSV file
# names; the two agree as long as each file is named `<symbol>.csv`.
symbols_list = sorted(symbols_df.symbol.unique())
dictionary = {symbol: symbol_id for symbol_id, symbol in enumerate(symbols_list, start=1)}

# Drop a previously inserted column so re-running this cell does not raise
# "cannot insert symbol_id, already exists".
if 'symbol_id' in symbols_df.columns:
    symbols_df = symbols_df.drop(columns='symbol_id')
# Series.map performs the lookup without a per-row Python list comprehension.
symbols_df.insert(0, 'symbol_id', symbols_df.symbol.map(dictionary))
symbols_df.head()

Unnamed: 0,symbol_id,symbol_record_id,symbol,datetime,open,high,low,close,volume,split_factor,earnings,dividends
0,1,0,aaxj,2008-08-15 12:44:00,43.07,43.07,43.07,43.07,232.759,1.0,0.0,0.0
1,1,1,aaxj,2008-08-15 16:00:00,43.07,43.07,43.07,43.07,116.379,1.0,0.0,0.0
2,1,2,aaxj,2008-08-18 09:28:00,42.63,42.75,42.63,42.75,10143.6,1.0,0.0,0.0
3,1,3,aaxj,2008-08-18 09:30:00,42.77,42.77,42.77,42.77,24439.7,1.0,0.0,0.0
4,1,4,aaxj,2008-08-18 10:07:00,42.53,42.53,42.53,42.53,2327.59,1.0,0.0,0.0


### 2. Import all files into a single table in the `tests` Postgres database

In [32]:
# Number the CSV files 1..N in sorted-name order; this number is written as
# the `symbol_id` column of public.symbols_minute during the import below.
keys = symbols_files = sorted(os.listdir(data_path))
values = list(range(1, len(keys) + 1))
dictionary = {file_name: file_id for file_name, file_id in zip(keys, values)}

In [5]:
# Open a connection to the Postgres test database and create a cursor.
# Each setting can be overridden with the standard libpq environment variables
# (PGDATABASE, PGUSER, PGHOST, PGPASSWORD, PGPORT); the fallbacks are the
# original local test-DB settings.
# NOTE(review): do not commit real credentials here — prefer setting PGPASSWORD.
try:
    conn = psycopg2.connect(dbname=os.environ.get('PGDATABASE', 'tests'),
                            user=os.environ.get('PGUSER', 'hitchhiker'),
                            host=os.environ.get('PGHOST', 'localhost'),
                            password=os.environ.get('PGPASSWORD', 'freeride'),
                            port=os.environ.get('PGPORT', '9478'))
except psycopg2.Error:
    print('I am unable to connect to the database')
    # Re-raise: without a connection, every later cell would otherwise fail
    # with a misleading NameError on `conn` instead of the real cause.
    raise

cur = conn.cursor()

In [34]:
# Start from a clean slate: drop the target table and the staging table
# (if present) in a single round-trip, then commit.
sqlQuery = (
    '\n'
    'DROP TABLE IF EXISTS public.symbols_minute;\n'
    '\n'
    'DROP TABLE IF EXISTS public.symbols_minute_staging;\n'
)
cur.execute(sqlQuery)
conn.commit()

In [35]:
# DDL for the two tables used by the import:
#  - public.symbols_minute: the final table. It has a leading `symbol_id`
#    column and an index on `datetime`.
#  - public.symbols_minute_staging: receives one CSV at a time via COPY. Its
#    columns match symbols_minute minus `symbol_id`. `datetime` is its PRIMARY
#    KEY, so a file containing duplicate timestamps will fail to load.
# Each statement is executed and committed separately.
create_minute_table_sql = '''
CREATE TABLE public.symbols_minute
(
  symbol_id int,
  symbol_record_id int,
  symbol character varying(4),
  datetime timestamp without time zone NOT NULL,
  open real,
  high real,
  low real,
  close real,
  volume real,
  split_factor real,
  earnings real,
  dividends real
);

CREATE INDEX symbols_minute_datetime_idx ON public.symbols_minute (datetime);
'''

create_staging_table_sql = '''
CREATE TABLE public.symbols_minute_staging
(
  symbol_record_id int,
  symbol character varying(4),
  datetime timestamp without time zone PRIMARY KEY NOT NULL,
  open real,
  high real,
  low real,
  close real,
  volume real,
  split_factor real,
  earnings real,
  dividends real
);
'''

for sqlQuery in (create_minute_table_sql, create_staging_table_sql):
    cur.execute(sqlQuery)
    conn.commit()

In [36]:
# Load every CSV into Postgres. For each file:
#   1. empty the staging table,
#   2. COPY the CSV into it,
#   3. append its rows to public.symbols_minute, prepending the file's id as
#      `symbol_id`.
# All three steps run in one transaction per file, so a failure leaves no
# partial rows behind. After a rollback the loop continues with the next file.
max_files = None  # set to a small int (e.g. 2) for a quick trial run
num_files = 0
for file_name in symbols_files:
    if max_files is not None and num_files >= max_files:
        break
    num_files += 1
    try:
        cur.execute("TRUNCATE TABLE public.symbols_minute_staging;")

        # `with` guarantees the file handle is closed even if COPY fails.
        with open(os.path.join(data_path, file_name), 'r') as f:
            cur.copy_from(f, 'symbols_minute_staging', sep=',')

        cur.execute("SELECT COUNT(*) FROM public.symbols_minute_staging;")
        current_count = cur.fetchone()[0]

        # symbol_id is passed as a bound parameter rather than spliced into the
        # SQL text. The staging columns line up with symbols_minute minus its
        # leading symbol_id, so `SELECT <id>, *` fills the target in order.
        cur.execute(
            "INSERT INTO public.symbols_minute "
            "SELECT %s, * FROM public.symbols_minute_staging;",
            (dictionary[file_name],))
        conn.commit()

        print('{} records from {} are imported'.format(current_count, file_name))
    except Exception as e:
        # Roll back so the connection leaves the aborted-transaction state.
        # Otherwise every subsequent statement (and thus every later file)
        # would fail as well.
        conn.rollback()
        print('Could not import ' + file_name)
        print("Error: %s: %s" % (type(e).__name__, e))

print('Data import finished.')

713747 records from aaxj.csv are imported
740226 records from acwi.csv are imported
1176236 records from agg.csv are imported
744534 records from agq.csv are imported
97408 records from bal.csv are imported
209734 records from bik.csv are imported
724294 records from biv.csv are imported
311684 records from bkf.csv are imported
971145 records from bnd.csv are imported
329759 records from brf.csv are imported
897540 records from bsv.csv are imported
613534 records from bwx.csv are imported
801695 records from csj.csv are imported
151729 records from dag.csv are imported
911223 records from dba.csv are imported
396196 records from dbb.csv are imported
1120318 records from dbc.csv are imported
222477 records from dbe.csv are imported
596675 records from dbo.csv are imported
965754 records from ddm.csv are imported
686974 records from dem.csv are imported
738617 records from dgaz.csv are imported
540873 records from dgp.csv are imported
447393 records from dgs.csv are imported
2205551 reco

In [39]:
# Sanity check: print the total row count of the loaded table, then ten rows
# from it. With no ORDER BY, the row order is not guaranteed.
cur.execute("SELECT count(*) FROM public.symbols_minute;")
(total_rows,) = cur.fetchone()
print(total_rows)

cur.execute("SELECT * FROM public.symbols_minute LIMIT 10;")
for row in cur:
    print(row)

50470570
(1, 0, 'aaxj', datetime.datetime(2008, 8, 15, 12, 44), 43.07, 43.07, 43.07, 43.07, 232.759, 1.0, 0.0, 0.0)
(1, 1, 'aaxj', datetime.datetime(2008, 8, 15, 16, 0), 43.07, 43.07, 43.07, 43.07, 116.379, 1.0, 0.0, 0.0)
(1, 2, 'aaxj', datetime.datetime(2008, 8, 18, 9, 28), 42.63, 42.75, 42.63, 42.75, 10143.6, 1.0, 0.0, 0.0)
(1, 3, 'aaxj', datetime.datetime(2008, 8, 18, 9, 30), 42.77, 42.77, 42.77, 42.77, 24439.7, 1.0, 0.0, 0.0)
(1, 4, 'aaxj', datetime.datetime(2008, 8, 18, 10, 7), 42.53, 42.53, 42.53, 42.53, 2327.59, 1.0, 0.0, 0.0)
(1, 5, 'aaxj', datetime.datetime(2008, 8, 18, 10, 43), 42.4, 42.4, 42.4, 42.4, 2327.59, 1.0, 0.0, 0.0)
(1, 6, 'aaxj', datetime.datetime(2008, 8, 18, 10, 53), 42.4, 42.4, 42.4, 42.4, 2327.59, 1.0, 0.0, 0.0)
(1, 7, 'aaxj', datetime.datetime(2008, 8, 18, 12, 4), 42.24, 42.24, 42.23, 42.23, 232.759, 1.0, 0.0, 0.0)
(1, 8, 'aaxj', datetime.datetime(2008, 8, 18, 12, 44), 42.1, 42.1, 42.1, 42.1, 116.379, 1.0, 0.0, 0.0)
(1, 9, 'aaxj', datetime.datetime(2008, 8, 18,

## License

Copyright (c) 2019, PatternedScience Inc.

This code was originally run on the [UniAnalytica](https://www.unianalytica.com) platform, is published by PatternedScience Inc. on [GitHub](https://github.com/patternedscience/GPU-Analytics-Perf-Tests) and is licensed under the terms of Apache License 2.0; a copy of the license is available in the GitHub repository.