In [1]:
#!/usr/bin/env python
import os
import sys
import warnings
import shutil
import time
import datetime
import calendar
import schedule
import pandas as pd
import numpy as np
import mysql.connector as mysql
from fastparquet import ParquetFile, write
import pickle
import boto3
import logging

In [2]:
# --- S3 destination -------------------------------------------------------
# `bucket` is the handle used for uploads; switch `s3bucketName` to change
# which bucket the archive files are written to.
s3 = boto3.resource('s3')
#s3bucketName = 'legio-data'
s3bucketName = 'legio-test'
bucket = s3.Bucket(name=s3bucketName)

# NOTE(review): not referenced by the cells visible in this notebook section;
# kept because other cells may read it.
#s3_StorageLocation = "database_data/"
s3_StorageLocation = "Legio/data/"

# --- Local scratch space --------------------------------------------------
# Directory where each pickle is written before it is uploaded. The value is
# concatenated directly with a file name, so it must end with a slash.
localStorage = '/general/legiotrader/MarketData/'
#localStorage = '/root/MarketDataTemp/'

# --- MySQL connection settings -------------------------------------------
dbHostContainer = "ec2-3-104-151-3.ap-southeast-2.compute.amazonaws.com"
dbHostPort = "31330"
# Credentials can be supplied through the environment so they do not have to
# live in the notebook. The fallbacks are the previous literal values, so
# existing runs behave exactly as before.
# FIXME: remove the literal fallbacks once the environment variables are set
# wherever this notebook runs.
user = os.environ.get("LEGIO_DB_USER", "root")
passwd = os.environ.get("LEGIO_DB_PASSWD", "root")

# Key prefix under which the archive files are stored inside the bucket.
prefix = "databasestorage/"

In [3]:
# Per-symbol configuration table; the archive loop reads its 'Symbol' column.
baseConfigData = pd.read_csv(
    filepath_or_buffer="/general/legiotrader/DockerFiles/BaseConfigInfo.csv"
)

In [None]:
def _elapsed_minutes(since):
    """Return the minutes elapsed between `since` and now as a float."""
    return (datetime.datetime.now()-since).total_seconds()/60


def _fetch_table(baseCode, tableSuffix, columns):
    """Read a whole ``<baseCode><tableSuffix>`` table into a DataFrame.

    Parameters
    ----------
    baseCode : str
        Symbol; used both as the database name and as the table-name stem.
    tableSuffix : str
        Table suffix, e.g. ``"_Min"`` or ``"_Hour"``.
    columns : list of str
        Column labels to give the fetched rows; must include ``"Timestamp"``.

    Returns
    -------
    pandas.DataFrame
        Rows with duplicate ``Timestamp`` values removed (first occurrence
        kept), sorted by ``Timestamp`` as stored in the database.

    The connection and cursor are always closed, including when the query
    fails, so the loop does not accumulate open connections.
    """
    database = mysql.connect(host=dbHostContainer,port=dbHostPort,user=user,passwd=passwd,
                             connect_timeout=30,database=baseCode)
    try:
        dbConnection = database.cursor()
        try:
            # Table names cannot be bound as query parameters; `baseCode`
            # comes from the local config CSV, not from user input.
            dbConnection.execute("SELECT * FROM "+baseCode+tableSuffix)
            rows = dbConnection.fetchall()
        finally:
            dbConnection.close()
    finally:
        database.close()

    frame = pd.DataFrame(rows, columns=columns)
    return frame.drop_duplicates('Timestamp',keep='first').sort_values('Timestamp')


def _archive_by_month(frame, baseCode, tag):
    """Pickle `frame` into one file per calendar month and upload each to S3.

    Parameters
    ----------
    frame : pandas.DataFrame
        Data with a datetime64 ``Timestamp`` column.
    baseCode : str
        Symbol used as the file-name stem.
    tag : str
        Granularity marker placed in the file name (``'M'`` or ``'H'``).

    Files are named ``<baseCode>_<tag>_<YYYY><MM>.pkl``, written under
    `localStorage`, uploaded under `prefix`, and the local copy is removed
    only once the object is confirmed to exist in the bucket.

    Rows are grouped by calendar month, so every timestamp falls in exactly
    one file. Comparing against ``> first 00:00:00`` and ``< last 23:59:59``
    would leave out rows stamped exactly on those boundaries. Rows whose
    timestamp is NaT have no month and are not archived.
    """
    months = frame['Timestamp'].dt.to_period('M')
    for month, monthly in frame.groupby(months):
        filename = "{}_{}_{}{:02d}.pkl".format(baseCode, tag, month.year, month.month)
        localPath = localStorage+filename
        key = prefix+filename

        with open(localPath,"wb") as f:
            pickle.dump(monthly, f)

        bucket.upload_file(localPath,key)

        # Confirm the upload by listing only this key; listing the whole
        # prefix after every file gets slower as the archive grows.
        if any(obj.key == key for obj in bucket.objects.filter(Prefix=key)):
            os.remove(localPath)


starttime = datetime.datetime.now()
print(starttime)
#for index, row in baseConfigData.loc[baseConfigData['ActiveTrading']==True].iterrows():
# NOTE(review): the slice skips the first 12 config rows — presumably to
# resume an interrupted run; confirm before running from scratch.
for index, row in baseConfigData.iloc[12:].iterrows():

    baseCode = row['Symbol']
    print("Processing: "+baseCode)

    ## Process Minutes
    minsDF = _fetch_table(baseCode, "_Min", ["Timestamp", "Open", "High", "Low", "Close", "Vol"])

    print("\t-Minute data pulled and duplicates dropped")
    print(_elapsed_minutes(starttime))

    ## Save to Cloud
    minsDF['Timestamp'] = pd.to_datetime(minsDF['Timestamp'])
    _archive_by_month(minsDF, baseCode, 'M')

    print("\t-Minutes saved to cloud")
    print(_elapsed_minutes(starttime))

    ## Drop & Recreate Table
#     database = mysql.connect(host=dbHostContainer,port=dbHostPort,user=user,passwd=passwd,
#                              connect_timeout=30,database=baseCode)
#     dbConnection = database.cursor()
#     dbConnection.execute("DROP TABLE "+baseCode+"_Min")

#     print("\t-Minute database table dropped")
#     print((datetime.datetime.now()-starttime).total_seconds()/60)

#     database = mysql.connect(host=dbHostContainer,port=dbHostPort,user=user,passwd=passwd,
#                              connect_timeout=30,database=baseCode)
#     dbConnection = database.cursor()
#     dbConnection.execute("CREATE TABLE "+baseCode+"_Min (Timestamp VARCHAR(255), Open VARCHAR(10), High VARCHAR(10), Low VARCHAR(10), Close VARCHAR(10), Vol VARCHAR(255))")

#     print("\t-Minute database table recreated")
#     print((datetime.datetime.now()-starttime).total_seconds()/60)

    ## Recommit DB
#     for commitData in list(np.array_split(minsDF,25000)):
#         vals = [tuple(x) for x in commitData.values]
#         sql = "INSERT INTO "+baseCode+"_Min (Timestamp, Open, High, Low, Close, Vol) VALUES (%s, %s, %s, %s, %s, %s)"
#         database = mysql.connect(host=dbHostContainer,port=dbHostPort,user=user,passwd=passwd,
#                                  connect_timeout=30,database=baseCode)
#         dbConnection = database.cursor()
#         dbConnection.executemany(sql, vals)
#         database.commit()

    print("\t-Minute data processed")
    print(_elapsed_minutes(starttime))

    ## Process Hours
    hoursDF = _fetch_table(baseCode, "_Hour", ["Timestamp", "Open", "High", "Low", "Close"])

    print("\t-Hour data pulled and duplicates dropped")
    print(_elapsed_minutes(starttime))

    ## Save to Cloud
    hoursDF['Timestamp'] = pd.to_datetime(hoursDF['Timestamp'])
    _archive_by_month(hoursDF, baseCode, 'H')

    print("\t-Hours saved to cloud")
    print(_elapsed_minutes(starttime))

    ## Drop & Recreate Table
#     database = mysql.connect(host=dbHostContainer,port=dbHostPort,user=user,passwd=passwd,
#                              connect_timeout=30,database=baseCode)
#     dbConnection = database.cursor()
#     dbConnection.execute("DROP TABLE "+baseCode+"_Hour")

#     print("\t-Hours database table dropped")
#     print((datetime.datetime.now()-starttime).total_seconds()/60)

#     database = mysql.connect(host=dbHostContainer,port=dbHostPort,user=user,passwd=passwd,
#                              connect_timeout=30,database=baseCode)
#     dbConnection = database.cursor()
#     dbConnection.execute("CREATE TABLE "+baseCode+"_Hour (Timestamp VARCHAR(255), Open VARCHAR(10), High VARCHAR(10), Low VARCHAR(10), Close VARCHAR(10))")

#     print("\t-Hours database table recreated")
#     print((datetime.datetime.now()-starttime).total_seconds()/60)

    ## Recommit DB
#     for commitData in list(np.array_split(hoursDF,1000)):
#         vals = [tuple(x) for x in commitData.values]
#         sql = "INSERT INTO "+baseCode+"_Hour (Timestamp, Open, High, Low, Close) VALUES (%s, %s, %s, %s, %s)"
#         database = mysql.connect(host=dbHostContainer,port=dbHostPort,user=user,passwd=passwd,
#                                  connect_timeout=30,database=baseCode)
#         dbConnection = database.cursor()
#         dbConnection.executemany(sql, vals)
#         database.commit()

    print("\t-Hours data processed")
    print(_elapsed_minutes(starttime))

2019-04-22 23:07:40.777888
Processing: CADJPY
	-Minute data pulled and duplicates dropped
0.36965086666666663
	-Minutes saved to cloud
2.7338457000000003
	-Minute data processed
2.7338666333333332
	-Hour data pulled and duplicates dropped
2.7405418833333335
	-Hours saved to cloud
3.5291626
	-Hours data processed
3.52916365
Processing: CHFCNH
	-Minute data pulled and duplicates dropped
3.9327127833333337
	-Minutes saved to cloud
5.7990212166666675
	-Minute data processed
5.799037666666666
	-Hour data pulled and duplicates dropped
5.806113966666667
	-Hours saved to cloud
6.688114283333333
	-Hours data processed
6.688131283333333
Processing: CHFCZK
	-Minute data pulled and duplicates dropped
6.824756216666667
	-Minutes saved to cloud
7.4395277
	-Minute data processed
7.439529216666667
	-Hour data pulled and duplicates dropped
7.44506925
	-Hours saved to cloud
7.742713666666666
	-Hours data processed
7.742727583333333
Processing: CHFDKK
	-Minute data pulled and duplicates dropped
8.1579724

	-Minutes saved to cloud
104.02657331666666
	-Minute data processed
104.02657573333333
	-Hour data pulled and duplicates dropped
104.03845226666667
	-Hours saved to cloud
107.28658528333334
	-Hours data processed
107.28659491666667
Processing: EURMXN
	-Minute data pulled and duplicates dropped
107.64314445
	-Minutes saved to cloud
111.05801856666668
	-Minute data processed
111.0580304
	-Hour data pulled and duplicates dropped
111.067421
