# Import FXCM data to Parquet for Spark processing in Python

The goal of this project is to explore forex data using Spark. 

Using [fxcmpy](https://github.com/fxcm/fxcmpy), this notebook imports a little over ten years (2008 through 2018) of one-minute candles for the currency pairs listed below into [Parquet](https://parquet.apache.org/) files, a format that Spark reads natively.

pairs :

- EUR/USD (Euro – US dollar)
- EUR/CHF (Euro - Swiss franc )
- EUR/JPY (Euro - Japanese yen)
- USD/JPY (US dollar – Japanese yen)
- USD/CHF (US dollar – Swiss franc)
- USD/CAD (US dollar – Canadian dollar)
- GBP/USD (British pound – US dollar)
- AUD/USD (Australian dollar – US dollar)

This amounts to almost 30 million valid candles out of about 46 million samples (one sample per minute and per pair, 2008 through 2018).

Requirements :

- Install [Spark](https://spark.apache.org/) (version 2.3 is recommended for the later steps).
- free [fxcm](https://www.fxcm.com) demo account.

Create a virtualenv and install the Python packages:

```console
pip install -r requirements.txt
```


## Connect to FXCM

Copy and paste your FXCM demo token into the `token` variable in the cell below.

In [1]:
import os

import fxcmpy

# Take the FXCM demo token from the FXCM_TOKEN environment variable so the
# secret does not have to be saved inside the notebook. When the variable is
# not set, the value falls back to the empty string: paste the token in place
# of that fallback instead.
token = os.environ.get("FXCM_TOKEN", "")

# Connection object used by the download cell (log_level is set to 'error').
con = fxcmpy.fxcmpy(access_token=token, log_level='error')

## Import FXCM minutes Data

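Import the historical one-minute candles from FXCM and store them as Parquet files, one file per instrument and per year.
The download is not parallelised, so it takes a few hours to run.

The results are resampled onto a regular one-minute grid so that the missing minutes can be interpolated later.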

In [None]:
import datetime as dt
import pandas as pd
import os


def fxcm_to_parquet(inst,year,folder):
    """Download one year of one-minute candles for ``inst`` and save it as Parquet.

    The year is requested in ``cut`` consecutive windows through the
    module-level connection ``con``. The candles are then aligned on a regular
    one-minute grid covering the whole year, so minutes without a candle are
    kept as rows of missing values, and the result is written to
    ``<folder>/inst=<INST>/data_<year>.parquet`` where ``<INST>`` is ``inst``
    with the "/" removed.

    Args:
        inst: Instrument name passed to ``con.get_candles``, e.g. "EUR/USD".
        year: Calendar year to download.
        folder: Root output folder; the ``inst=...`` sub-folder is created
            when it does not exist yet.

    Returns:
        None. When no window returns any candle, a message is printed and no
        file is written.
    """
    cut = 20
    year_start = dt.datetime(year=year, month=1, day=1)
    year_end = dt.datetime(year=year, month=12, day=31, hour=23, minute=59)
    delta = (dt.datetime(year=year + 1, month=1, day=1) - year_start) / cut

    # Regular one-minute grid for the whole year ("min" is the non-deprecated
    # spelling of the "T" frequency alias).
    minute_index = pd.date_range(year_start, year_end, freq='min')

    frames = []
    beg = year_start
    for _ in range(cut):
        end = beg + delta
        print(beg, end)
        df = con.get_candles(inst, start=beg, end=end, period='m1')
        if not df.empty:
            # Turn the index into a regular column so the windows can be
            # concatenated and re-indexed on "date" in one step below.
            frames.append(df.reset_index())
        # The next window starts one minute after the end of this one.
        beg = end + dt.timedelta(minutes=1)

    # pd.concat raises on an empty list, so stop here when nothing came back.
    if not frames:
        print("no candles returned for", inst, year, "- nothing written")
        return

    result = pd.concat(frames).set_index("date")
    # Aligning on the minute grid requires unique timestamps; keep the first
    # candle if the same minute was returned more than once.
    result = result[~result.index.duplicated(keep='first')]

    dfsampled = result.reindex(minute_index)
    dfsampled.index.name = "date"

    instnorm = inst.replace("/", "")
    directory = "{}/inst={}/".format(folder, instnorm)
    os.makedirs(directory, exist_ok=True)
    filename = "{}/inst={}/data_{}.parquet".format(folder, instnorm, year)
    dfsampled.to_parquet(filename)

          
# Currency pairs to download.
instlist = [
    'EUR/USD',
    'EUR/CHF',
    'EUR/JPY',
    'USD/JPY',
    'USD/CHF',
    "USD/CAD",
    'GBP/USD',
    'AUD/USD',
]

# Years 2008 through 2018; one Parquet file is written per pair and per year.
years = range(2008, 2019)

for pair in instlist:
    print("inst", pair)
    for year in years:
        fxcm_to_parquet(pair, year, 'data/forexdatam')
    

## Connect to Spark

The findspark package makes the local Spark installation importable from Python.

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext
import pyspark

# Get the Spark session for this notebook, creating it when none exists yet;
# the application is named "Paktolos".
spark = (
    SparkSession.builder
    .appName("Paktolos")
    .getOrCreate()
)

# Handles on the Spark context and on a SQL context built from it.
sc = SparkContext.getOrCreate()
sqlc = SQLContext(sc)

## Create the Spark dataframe

Open all the Parquet files as a single DataFrame; parallel processing and SQL queries are now possible.

In [13]:
import time
# Read every Parquet file under data/forexdatam into one DataFrame ordered by
# date, count its rows and report how long that took.
started = time.time()
candles = spark.read.parquet("data/forexdatam")
df_spark = candles.orderBy("date")
count = df_spark.count()
print("Number of candles ( bid and ask ) :", count)
print("Count time", time.time() - started)
print("Columns saved :", df_spark.columns)

Number of candles ( bid and ask ) : 46287360
Count time 7.417025804519653
Columns saved : ['bidopen', 'bidclose', 'bidhigh', 'bidlow', 'askopen', 'askclose', 'askhigh', 'asklow', 'tickqty', 'date', 'inst']


## Quantiles Computation

Compute the spread quantiles from the difference between the ask and bid open prices of each minute over the whole period.
Display the results using [Plotly](https://plot.ly/). This step is a test of the data manipulation.

In [12]:
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)
import plotly.graph_objs as go


def compute_quantiles(inst,pip=10000):
    """Return spread quantiles for one instrument.

    The spread is ``pip * (askopen - bidopen)``, computed on the rows of the
    module-level ``df_spark`` whose ``inst`` column equals ``inst`` with the
    "/" removed.

    Args:
        inst: Instrument name such as "EUR/USD".
        pip: Factor applied to the ask/bid difference.

    Returns:
        The list returned by ``approxQuantile`` for the probabilities
        0.1, 0.25, 0.5, 0.75 and 0.9 with a relative error of 0.
    """
    code = inst.replace("/", "")
    spread_col = "spread_" + code
    probabilities = [0.1, 0.25, 0.5, 0.75, 0.9]

    rows = df_spark.filter(df_spark["inst"] == code)
    spread = pip * (rows["askopen"] - rows["bidopen"])
    return rows.withColumn(spread_col, spread).approxQuantile(spread_col, probabilities, 0)


# Trick to hand already computed quantiles to Plotly: the box trace is given a
# seven-point sample built from the five quantiles, with the second and fourth
# quantile each repeated once.
def get_boxplot_data(inst,pipf=10000):
    """Build the Plotly box trace of the spread quantiles of ``inst``.

    Args:
        inst: Instrument name such as "EUR/USD"; also used as the trace name.
        pipf: Pip factor forwarded to ``compute_quantiles``.

    Returns:
        A ``go.Box`` trace with ``boxpoints`` disabled.
    """
    quantiles = compute_quantiles(inst, pipf)
    # Quantile positions used for the sample: p10, p25, p25, p50, p75, p75, p90.
    sample = [quantiles[k] for k in (0, 1, 1, 2, 3, 3, 4)]
    return go.Box(y=sample, name=inst, boxpoints=False)

# One box trace per currency pair. Pairs containing JPY use a pip factor of
# 100; every other pair uses 10000.
data = []
instlist = [
    'EUR/USD',
    'EUR/CHF',
    'EUR/JPY',
    'USD/JPY',
    'USD/CHF',
    "USD/CAD",
    'GBP/USD',
    'AUD/USD',
]

for inst in instlist:
    pipf = 100 if 'JPY' in inst else 10000
    data.append(get_boxplot_data(inst, pipf))

# Figure layout: title, axis labels and bar spacing options.
layout = go.Layout(
    title='FXCM Spread distribution based on minutes bid ask differences since 2008 computed using spark (min=p10 max=p90)',
    xaxis={'title': 'Instrument'},
    yaxis={'title': 'Spread'},
    bargap=0.2,
    bargroupgap=0.1,
)

# Assemble the traces and the layout, then render the figure in the notebook.
fig = go.Figure(data=data, layout=layout)
iplot(fig, filename='boxplot spread')


## Get the list of currency pairs 

In [2]:
import datetime as dt
# Collect the distinct values of the "inst" column among the rows dated
# between 3 and 5 January 2018.
df_spark_glob = spark.read.parquet("data/forexdatam")
beg = dt.datetime(2018, 1, 3)
end = dt.datetime(2018, 1, 5)

in_window = df_spark_glob.date.between(beg, end)
distinct_inst = df_spark_glob.filter(in_window).select('inst').distinct()
# Each row holds a single value; flatMap unpacks the rows into a flat list.
listinst = distinct_inst.rdd.flatMap(lambda x: x).collect()

## Interpolate the data

Linearly interpolate the candles (including the tick quantities), filling at most 10 consecutive missing minutes.

In [None]:
import pandas as pd
import os

# Interpolate each yearly file of each instrument and write the result under
# data/forexdataint, using the same inst=<INST>/data_<year>.parquet layout.
for inst in listinst:
    print("inst", inst)
    # Create the output folder once per instrument; exist_ok replaces the
    # separate existence check.
    directory = "data/forexdataint/inst={}".format(inst)
    os.makedirs(directory, exist_ok=True)
    for year in range(2008, 2019):
        filename = "data/forexdatam/inst={}/data_{}.parquet".format(inst, year)
        # A year that was never downloaded must not abort the whole batch.
        if not os.path.exists(filename):
            print("missing", filename, "- skipped")
            continue
        df = pd.read_parquet(filename)
        # Fill at most 10 consecutive missing values per column.
        df = df.interpolate(limit=10)
        filename = "{}/data_{}.parquet".format(directory, year)
        df.to_parquet(filename)

## Join the data

Store the data using this schema:

- Each row is one minute, with the `date` column followed by 40 value columns = 5 (bid open, high, low, close and tick quantity) * 8 (pairs)

In [None]:
import datetime as dt

# Expose the interpolated candles to Spark SQL as the view "fxcmtable".
df_spark_glob = spark.read.parquet("data/forexdataint")
df_spark_glob.createOrReplaceTempView("fxcmtable")
df_list = []

# One DataFrame per instrument: the date plus the bid open/close/high/low and
# the tick quantity, each renamed with the instrument as suffix.
for ins in listinst:
    request = "SELECT date,bidopen as O_{0},bidclose as C_{0} \
    ,bidhigh as H_{0},bidlow as L_{0}, tickqty as tick_{0} \
    FROM fxcmtable WHERE inst='{0}'".format(ins)
    df_list.append(spark.sql(request))

# Join all the per-instrument frames on the date column.
df_first = df_list.pop(0)
for df in df_list:
    df_first = df_first.join(df, on=['date'])

# Sort once, after the last join, instead of after every intermediate join.
df_first = df_first.orderBy('date')

df_first.write.parquet("data/forexdatajoin/", mode='overwrite')
   

## Remove the empty minute intervals 

Drop the rows that contain null values. After this step the sampling interval is no longer constant.

In [None]:
# Reload the joined table, drop its rows with null values and save the result,
# replacing any previous output.
dfjoin = spark.read.parquet("data/forexdatajoin/")
dfclean = dfjoin.dropna()
dfclean.write.parquet("data/forexdatajoinclean/", mode='overwrite')