Requirements
==

You need to process, in Python 2, a set of client-provided trades that they will be providing in CSV format. Each trade will consist of an instrument, a price, a quantity and a timestamp - together with a number of optional columns such as the trade reference, instrument type, underlying asset, client reference. They may provide one or more files, each consisting of zero or more trades, into a common directory

Assume we need to store the trades somewhere for further processing as well provide an in-memory representation to work with them. Specifically we want to enrich the in-memory representation with the following derived information:

- For each trade we need the market value, i.e. the price multiplied by the quantity
- For each instrument we need the total market value, the closing value, and average price per day
- For each trade reference we need the constituent trades
- For each day we need to the total traded value, closing value, and the closing position

Your code should output the enriched data to the console, a file or some other store

You can submit the working code, including any relevant supporting files and specifically a README with instructions to run it, as a zip file or as a link to a GitHub repo

Please state any assumptions and describe your engineering choices




Solution
==

First of all we need to break down the whole proces into few parts:
- Generate random sample trades lists and save as `csv` file in common directory
- Read the data from all the files from the common directory into Pandas
- Store data to the database (SQlite3)
- Enrich the in-memory representation
- Output the enriched data to the file
- Display enriched data

### Importing required libraries

In [207]:
from datetime import datetime
import pandas as pd
import sqlite3 as db
import random
import time
import os

print(pd.__version__)

0.19.2


### Generating random `csv` trades lists


First of all we need to create a list of elements to appear in our dataset.
- instruments with ratios
- trade references
- instrument types
- underlying assets
- client references
- generate random timestamps

Let's create list of possible instruments from currencies array:

In [66]:
def createInstruments(currencies = {
    'USD': 1, 'EUR': 0.85, 'JPY': 110.79, 'GBP': 0.76,
    'CHF': 0.97, 'CAD': 1.24, 'AUD': 1.25, 'NZD': 1.33, 'ZAR': 13.03
    }):
    instruments = dict()
    for currency_1 in currencies.keys():
        for currency_2 in currencies.keys():
            if currency_1 != currency_2:
                ratio = currencies[currency_2] / currencies[currency_1]
                instruments[currency_1 + currency_2] = ratio
    return instruments

instruments = createInstruments()

Create references list

In [39]:
def createReferences(total):
    references = []
    for i in range(total):
        references.append('Reference_' + str(i))
    return references

totalReferences = 5

references = createReferences(totalReferences)

Create instrument types list

In [40]:
def createInstrumentTypes():
    return ['SELL', 'BUY']

instrumentTypes = createInstrumentTypes()

As the trade references, client references and underlying assets lists have the same structure, let's create one universal method to create them.

In [42]:
def createCustomList(name, total=5):
    elements = []
    for i in range(total):
        elements.append(name + '_' + str(i+1))
    return elements

tradeReferences = createCustomList('TradeReference', 5)
underlyingAssets = createCustomList('UnderlyingAsset', 10)
clientReferences = createCustomList('ClientReference', 2)

We need one more method to populate our dataset. The method to generate random timestamp when start and end dates are given.

In [65]:
def generateTimestamp(start_date, end_date):
    start_timestamp = time.mktime(time.strptime(start_date, '%b %d %Y %I:%M:%S'))
    end_timestamp = time.mktime(time.strptime(end_date, '%b %d %Y %I:%M:%S'))
    return random.randrange(start_timestamp,end_timestamp)

date = generateTimestamp('Jun 1 2010  01:33:00', 'Jun 5 2010  01:33:00')

OK so now we have all the methods to create our sample files. Let's create a method with the optional inputs of:
- output file directory path
- `csv` files count
- max trades per file
- currencies list
- trade references list
- instrument types
- underlying assets list
- client references list
- start date
- end date

In [274]:
def priceTreshhold(instrument, key):
    treshhold = 0.1 # 10%
    start = instrument[key] - instrument[key]*treshhold
    stop = instrument[key] + instrument[key]*treshhold
    return round(random.uniform(start, stop), 4)


def deleteFilesInFolder(folder):
    if os.path.isdir(folder):
        for the_file in os.listdir(folder):
            file_path = os.path.join(folder, the_file)
            try:
                if os.path.isfile(file_path):
                    os.unlink(file_path)
            except Exception as e:
                print(e)


def optional(value):
    return random.choice([value, ''])
            

def createSampleDataCSV(path='common_directory/',
                        filesCount=3, 
                        maxTrades=1000,
                        instruments=createInstruments(),
                        tradeReferences=createCustomList('TR', 5),
                        instrumentTypes=createInstrumentTypes(),
                        underlyingAssets=createCustomList('UA', 10),
                        clientReferences=createCustomList('CR', 3),
                        startDate='Jul 1 2017  08:30:00',
                        endDate='Jul 5 2017  01:33:00',
                        deletePrevious=True
                        ):
    if deletePrevious:
        deleteFilesInFolder(path)
        
    columns = ['Instrument','Price','Quantity',
               'Timestamp', 'Trade Reference', 
               'Instrument Type','Underlying Asset',
               'Client Reference']
    
    for i in range(filesCount):
        fileTrades = {title : [] for title in columns }
        for k in range(random.randint(0, maxTrades)):
            currentInstrument = random.choice(instruments.keys())
            fileTrades[columns[0]].append(currentInstrument)
            fileTrades[columns[1]].append(priceTreshhold(instruments, currentInstrument))
            fileTrades[columns[2]].append(random.randint(1,10000))
            fileTrades[columns[3]].append(generateTimestamp(startDate, endDate))
            fileTrades[columns[4]].append(optional(random.choice(tradeReferences)))
            fileTrades[columns[5]].append(optional(random.choice(instrumentTypes)))
            fileTrades[columns[6]].append(optional(random.choice(underlyingAssets)))
            fileTrades[columns[7]].append(optional(random.choice(clientReferences)))
            
        trades = pd.DataFrame(fileTrades, columns=columns)
        trades.to_csv(path + 'trades_list_' + str(i) + '.csv', sep=',', encoding='utf-8', index=False)  


Generate default sample cvs dataset files

In [283]:
createSampleDataCSV()

### Read data from common folder files into pandas data frame

In [284]:
def readCSVFromFolder(folder, deleteFiles=True):
    tradesList = []
    for filename in os.listdir(folder):
        if filename != '.ipynb_checkpoints' and filename.endswith('.csv'):
            trades = pd.read_csv(folder + '/' + filename, sep=',')
            tradesList.append(trades.fillna(''))
    
    if deleteFiles:
        deleteFilesInFolder(folder)
    
    return pd.concat(tradesList, ignore_index=True) if len(tradesList) > 0 else pd.DataFrame()

allTrades = readCSVFromFolder(folder='common_directory', deleteFiles=False)

allTrades.tail()


Unnamed: 0,Instrument,Price,Quantity,Timestamp,Trade Reference,Instrument Type,Underlying Asset,Client Reference
1020,ZARNZD,0.0923,9021,1499173617,TR_5,BUY,,CR_3
1021,JPYUSD,0.0086,419,1499054679,TR_2,BUY,UA_10,CR_3
1022,EURGBP,0.8338,6123,1498990134,,,UA_8,
1023,AUDGBP,0.6049,8310,1498972453,TR_2,,UA_5,
1024,CADEUR,0.626,16,1499155805,TR_2,SELL,UA_7,


### Store data to SQLite3 database

In [198]:
def saveDataFrameToDb(dataframe, table_name='trades', if_exists='append', db_name='database.db'):
    con = db.connect(db_name)
    normalizedToDB = dataframe.copy()
    normalizedToDB.columns = [x.lower().replace(" ", "_") for x in normalizedToDB.columns]
    normalizedToDB.to_sql(table_name, con, index=False, if_exists=if_exists)
    pass
    
saveDataFrameToDb(allTrades, if_exists='replace')

### Enriching data with derived information
1. For each trade we need the market value, i.e. the price multiplied by the quantity
2. For each instrument we need the total market value, the closing value, and average price per day
3. For each trade reference we need the constituent trades
4. For each day we need to the total traded value, closing value, and the closing position


#### 1. For each trade we need the market value
Add to pandas DataFrame additional column `Market Value` which is equal to `Price` * `Quantity`

In [203]:
def addMarketValueColumn(dataframe):
    dataframe['Market Value'] = dataframe['Price'] * dataframe['Quantity']
    return dataframe

allTrades = addMarketValueColumn(allTrades)
allTrades.head()

Unnamed: 0,Instrument,Price,Quantity,Timestamp,Trade Reference,Instrument Type,Underlying Asset,Client Reference,Market Value
0,EURCAD,1.4221,6703,1498974387,,,UA_2,CR_2,9532.3363
1,NZDJPY,75.7404,5385,1499144484,TR_1,,,,407862.054
2,CADGBP,0.6036,3816,1499023995,,SELL,UA_9,,2303.3376
3,USDNZD,1.2228,5356,1499040037,TR_5,SELL,,,6549.3168
4,GBPUSD,1.281,7555,1498937891,TR_4,,UA_8,CR_1,9677.955


#### 2. For each instrument we need the total market value, the closing value, and average price per day

Let's create a dictionary where keys are instruments and values are DataFrames containing columns `total market value`, `closing value` and `average price` listed by day

In [262]:
def getInstrumentsDailyStats(trades):
    instrumentsDailyStats = dict()
    columns = ['Day', 'Total Market Value', 'Closing Value', 'Average Price']
    for instrument in trades['Instrument'].unique():
        instrumentData = trades.loc[trades['Instrument'] == instrument]
        instrumentData = instrumentData.assign(Date = pd.Series([datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d') for timestamp in instrumentData['Timestamp']], index=instrumentData.index))
        dailyData = {title: [] for title in columns}
        
        for day in instrumentData['Date'].unique():
            currentDay = instrumentData.loc[instrumentData['Date'] == day]
            # Date
            dailyData[columns[0]].append(day)
            # Sum of Market Value column values
            dailyData[columns[1]].append(currentDay['Market Value'].sum())
            # Get price where Timestamp is max
            dailyData[columns[2]].append(currentDay.loc[currentDay['Timestamp'].idxmax()]['Price'])
            # Calculate mean of Price column
            dailyData[columns[3]].append(currentDay['Price'].mean())
            
        dailyDF = pd.DataFrame(dailyData)
        instrumentsDailyStats[instrument] = dailyDF

    return instrumentsDailyStats

instrumentsDailyStats = getInstrumentsDailyStats(allTrades)

instrument = random.choice(instrumentsDailyStats.keys())
print(instrument)
instrumentsDailyStats[instrument]

USDAUD


Unnamed: 0,Average Price,Closing Value,Day,Total Market Value
0,1.2218,1.1525,2017-07-03,35956.0975
1,1.2985,1.3282,2017-07-01,9116.9506
2,1.226033,1.1378,2017-07-04,31366.7186
3,1.236425,1.2233,2017-07-02,22550.4894


#### 3. For each trade reference we need the constituent trades
Let's create a dictionary where keys are equal to trade references (TR) and values are DataFrames containing all constituent trades

In [261]:
def getTradeReferencesConstituents(trades):
    TrCt = dict()
    for tr in trades['Trade Reference'].unique():
        if tr != '':
            TrCt[tr] = trades.loc[trades['Trade Reference'] == tr]
    return TrCt

TRConsistTrades = getTradeReferencesConstituents(allTrades)

TR = random.choice(TRConsistTrades.keys())
print(TR)
TRConsistTrades[TR]

TR_1


Unnamed: 0,Instrument,Price,Quantity,Timestamp,Trade Reference,Instrument Type,Underlying Asset,Client Reference,Market Value
1,NZDJPY,75.7404,5385,1499144484,TR_1,,,,4.078621e+05
22,EURAUD,1.5939,9435,1499011213,TR_1,,,CR_3,1.503845e+04
55,EURNZD,1.6826,5342,1499019600,TR_1,,UA_9,CR_3,8.988449e+03
56,GBPJPY,153.8194,4616,1499005527,TR_1,,,,7.100304e+05
60,ZARCHF,0.0752,7158,1498919544,TR_1,,UA_7,,5.382816e+02
61,CADCHF,0.8389,7104,1498952246,TR_1,BUY,,,5.959546e+03
69,AUDGBP,0.6017,5760,1498904144,TR_1,SELL,UA_5,CR_2,3.465792e+03
72,EURJPY,132.7310,6186,1498946538,TR_1,,UA_6,CR_2,8.210740e+05
109,AUDJPY,83.9013,3011,1499143942,TR_1,,UA_2,CR_2,2.526268e+05
124,USDJPY,106.9472,3324,1499195586,TR_1,,,CR_3,3.554925e+05


#### 4. For each day we need to the total traded value, closing value, and the closing position

I am not sure it I unterstand correctly this task as I am a little confused with the terminology.

- total traded value - total value of instrument trades in a stock market (sum the trades market values for each instrument by day)
- closing value - profit/loss by the end of the day (for each instrument calculate SELL Market Values minus BUY Market Values)
- closing position - the last each instrument type for the day (get the last instrument type for each instrument)

In [271]:
def getClosingPosition(currentInstrument):
    instrumentType = ''
    typeSet = currentInstrument.loc[currentInstrument['Instrument Type'] != '']
    if len(typeSet) > 0:
        instrumentType = currentInstrument.loc[typeSet['Timestamp'].idxmax()]['Instrument Type']
    return instrumentType

def getDailyStats(trades):
    dailyStats = dict()
    columns = ['Instrument', 'Total Traded Value', 'Closing Value', 'Closing Position']
    trades = trades.assign(Date = pd.Series([datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d') for timestamp in trades['Timestamp']], index=trades.index))
    for day in trades['Date'].unique():
        tradesByDay = trades.loc[trades['Date'] == day]
        dailyInstruments = {title: [] for title in columns}
        for instrument in tradesByDay['Instrument'].unique():
            currentInstrument = tradesByDay.loc[tradesByDay['Instrument'] == instrument]
            dailyInstruments[columns[0]].append(instrument)
            dailyInstruments[columns[1]].append(currentInstrument['Market Value'].sum())
            dailyInstruments[columns[2]].append(
                currentInstrument.loc[currentInstrument['Instrument Type'] == 'SELL']['Market Value'].sum() - 
                currentInstrument.loc[currentInstrument['Instrument Type'] == 'BUY']['Market Value'].sum())
            dailyInstruments[columns[3]].append(getClosingPosition(currentInstrument))
        dailyInstrumentsDF = pd.DataFrame(dailyInstruments)
        dailyStats[day] = dailyInstrumentsDF
    return dailyStats

dailyStats = getDailyStats(allTrades)

day = random.choice(dailyStats.keys())
print(day)
dailyStats[day]

2017-07-02


Unnamed: 0,Closing Position,Closing Value,Instrument,Total Traded Value
0,,0.0000,EURCAD,1.790173e+04
1,SELL,1826.5797,CADGBP,5.413246e+03
2,BUY,22145.8899,GBPCHF,7.506156e+04
3,SELL,-10019.3058,CADAUD,3.555437e+04
4,BUY,-7051.9251,NZDGBP,2.727411e+04
5,BUY,196.8299,AUDCAD,3.864815e+04
6,SELL,-8196.7910,EURAUD,3.914245e+04
7,SELL,-2028.6102,AUDNZD,6.857863e+04
8,BUY,6098.1727,CHFNZD,6.221410e+04
9,SELL,4770.9952,CADCHF,3.056538e+04


## Creating a function to output the data to csv files
Create folder `output` with 4 folders inside each for the different enriched data task.

In [280]:
def saveAsCsv(path, title, data, folder):
    fullPath = path + '/' + folder + '/'
    if not os.path.isdir(fullPath):
        os.makedirs(fullPath)
    data.to_csv(fullPath + title + '.csv',
                sep=',',encoding='utf-8', index=False)
    pass


def outputDataToFiles(trades, instDaily, trConsist, dailyStats, path='output', deleteBefore=True):
    if deleteBefore:
        deleteFilesInFolder(path)
    saveAsCsv(path, 'added_market_value', trades, 'Task1')
    for instrument, data in instDaily.items():
        saveAsCsv(path, instrument, data, 'Task2')
    for tr, data in trConsist.items():
        saveAsCsv(path, tr, data, 'Task3')
    for day, data in dailyStats.items():
        saveAsCsv(path, day, data, 'Task4')
        
outputDataToFiles(allTrades, instrumentsDailyStats,
                 TRConsistTrades, dailyStats)