In [1]:
import datetime, itertools, os, glob, shutil
from io import BytesIO
import gzip
import urllib.request
import pathlib


class FxcmDownloader:

    def __init__(self, start_year=2022, start_week=1, end_wk_of=datetime.datetime.now(), instrument='EURUSD', base_data_path='/Volumes/vol2/data/fxcm'):

        end_year = end_wk_of.year
        end_week = end_wk_of.isocalendar()[1]
        print("Range: {}-{} ->  {}-{}".format(start_year, start_week, end_year, end_week))
        if start_year > end_year or end_year == start_year and start_week>end_week:
            raise ValueError("Incorrect range: {}-{} ->  {}-{}".format(start_year, start_week, end_year, end_week))

        self.start_year = start_year
        self.start_week = start_week
        self.end_year = end_year
        self.end_week = end_week
        self.instrument = instrument
        self.base_data_path = base_data_path

    @staticmethod
    def ensure_target_dir_exists(target_dir):
        pathlib.Path(target_dir).mkdir(parents=True, exist_ok=True)

    @staticmethod
    def missing_files_for_range(existing_files, potential_to_download):
        return [x for x in filter(lambda x: x['local_csv'] not in existing_files, potential_to_download)]

    @staticmethod
    def year_week_pairs_for_week_range(years, start_wk, end_wk):
        return [x for x in itertools.product(years,range(start_wk, end_wk+1))]

    @staticmethod
    def last_week_of_year(year):
        return 53

    def mapToUrl(self, year,week):
         return "https://tickdata.fxcorporate.com/{}/{}/{}.csv.gz".format(self.instrument, year,week)

    def mapToFile(self, year, week, suffix):
         return os.path.join(self.base_data_path, "{}/{}/{}.{}".format(self.instrument,year,week, suffix))

    @staticmethod
    def year_week_pairs_for_range(start_yr, start_wk, end_yr, end_wk):
        result = []
        #if only 1 year, do yr x start_wk-end_wk
        if start_yr==end_yr:
            result = FxcmDownloader.year_week_pairs_for_week_range([start_yr], start_wk, end_wk)
        else:
            result = FxcmDownloader.year_week_pairs_for_week_range([start_yr], start_wk, FxcmDownloader.last_week_of_year(start_yr))
            result += FxcmDownloader.year_week_pairs_for_week_range(range(start_yr+1, end_yr), 1, 53)
            result += FxcmDownloader.year_week_pairs_for_week_range([end_yr], 1, end_wk)

        return result

    def existing_files(self):
        return glob.glob(os.path.join(self.base_data_path, self.instrument, "*", "*.csv"))

    def construct_files_to_download_with_destinations(self, yr_wk_pairs):
        urls_and_dest = [r for r in map(lambda yr_wk:
                                        {"url": self.mapToUrl(yr_wk[0], yr_wk[1]),
                                         "local_csv": self.mapToFile(yr_wk[0], yr_wk[1], "csv"),
                                         "local_csv_gz": self.mapToFile(yr_wk[0], yr_wk[1], "csv.gz")
                                         },
                        yr_wk_pairs)]

        return urls_and_dest

    def download(self, url, local_target):
        try:
            print("Downloading {} => {}".format(url, local_target))
            req = urllib.request.Request(
                url, 
                data=None, 
                headers={
                    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
                }
            )
            requests = urllib.request.urlopen(req)
            buf = BytesIO(requests.read())
            f = gzip.GzipFile(fileobj=buf)
            self.ensure_target_dir_exists(os.path.basename(local_target))
            with open(local_target, 'wb') as f_out:
                shutil.copyfileobj(f, f_out)
        except Exception as e:
            print("ERROR, problem during {} download".format(url))
            print(e)

    def download_new_files(self):
        yr_wk_pairs = self.year_week_pairs_for_range(self.start_year, self.start_week, self.end_year, self.end_week)
        existing_files = self.existing_files()
        potential_files_to_download = self.construct_files_to_download_with_destinations(yr_wk_pairs)
        files_to_download_with_destinations = self.missing_files_for_range(existing_files, potential_files_to_download)
        for target in files_to_download_with_destinations:
            self.download(target['url'], target['local_csv'])



In [5]:
instrument='EURGBP'
target="/home/dcai/data"
FxcmDownloader(start_year=2019, instrument=instrument, base_data_path=target).download_new_files()

Range: 2019-1 ->  2023-5
Downloading https://tickdata.fxcorporate.com/EURGBP/2019/1.csv.gz => /home/dcai/data/EURGBP/2019/1.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2019/2.csv.gz => /home/dcai/data/EURGBP/2019/2.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2019/3.csv.gz => /home/dcai/data/EURGBP/2019/3.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2019/4.csv.gz => /home/dcai/data/EURGBP/2019/4.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2019/5.csv.gz => /home/dcai/data/EURGBP/2019/5.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2019/6.csv.gz => /home/dcai/data/EURGBP/2019/6.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2019/7.csv.gz => /home/dcai/data/EURGBP/2019/7.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2019/8.csv.gz => /home/dcai/data/EURGBP/2019/8.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2019/9.csv.gz => /home/dcai/data/EURGBP/2019/9.csv
Downloading https://tickdata.fxcorporate.com/EUR

Downloading https://tickdata.fxcorporate.com/EURGBP/2020/26.csv.gz => /home/dcai/data/EURGBP/2020/26.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2020/27.csv.gz => /home/dcai/data/EURGBP/2020/27.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2020/28.csv.gz => /home/dcai/data/EURGBP/2020/28.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2020/29.csv.gz => /home/dcai/data/EURGBP/2020/29.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2020/30.csv.gz => /home/dcai/data/EURGBP/2020/30.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2020/31.csv.gz => /home/dcai/data/EURGBP/2020/31.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2020/32.csv.gz => /home/dcai/data/EURGBP/2020/32.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2020/33.csv.gz => /home/dcai/data/EURGBP/2020/33.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2020/34.csv.gz => /home/dcai/data/EURGBP/2020/34.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/202

Downloading https://tickdata.fxcorporate.com/EURGBP/2021/52.csv.gz => /home/dcai/data/EURGBP/2021/52.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2021/53.csv.gz => /home/dcai/data/EURGBP/2021/53.csv
ERROR, problem during https://tickdata.fxcorporate.com/EURGBP/2021/53.csv.gz download
HTTP Error 404: Not Found
Downloading https://tickdata.fxcorporate.com/EURGBP/2022/1.csv.gz => /home/dcai/data/EURGBP/2022/1.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2022/2.csv.gz => /home/dcai/data/EURGBP/2022/2.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2022/3.csv.gz => /home/dcai/data/EURGBP/2022/3.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2022/4.csv.gz => /home/dcai/data/EURGBP/2022/4.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2022/5.csv.gz => /home/dcai/data/EURGBP/2022/5.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2022/6.csv.gz => /home/dcai/data/EURGBP/2022/6.csv
Downloading https://tickdata.fxcorporate.com/EURGBP/2022/7.c

In [6]:
import os

! pip3  install --user pyarrow



In [12]:
import multiprocessing
import os
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import csv
import time
import threading


def csv_to_parqet(csv_file_path):
   
    parquet_file = csv_file_path.replace('.csv', '.parquet')
    print(f'Doing {parquet_file}')
    
    with open(csv_file_path, 'rb') as csv_file:
        csv_contents = csv_file.read()

    # Remove the null characters from the contents
    csv_contents = csv_contents.replace(b'\0', b'')

    # Parse the contents into rows using the csv module
    reader = csv.reader(csv_contents.decode('utf-8-sig').splitlines())

    # Get the column names from the first row
    column_names = next(reader)


    df = pd.DataFrame(reader, columns=column_names)


    # Convert the datetime column to a datetime type
    df['DateTime'] = pd.to_datetime(df['DateTime'])
    

    # Convert the dataframe to a Parquet file
    pq.write_table(pa.Table.from_pandas(df), parquet_file)
    print(f'Done {parquet_file}')

# This function will recursively traverse a directory tree and convert all CSV files to Parquet format
def convert_csv_to_parquet(root_dir):
    filenames = []
    for root, dirs, files in os.walk(root_dir):
        for file in files:
            # Check if the file is a CSV file
            if file.endswith('.csv'):
                
                # Read the CSV file
                csv_file = os.path.join(root, file)

                # Convert the dataframe to a Parquet file
                filenames.append(csv_file)

                # Create and start 10 threads

    with multiprocessing.Pool() as pool:
        # Process the CSV files concurrently
        pool.map(csv_to_parqet, filenames)
                

# Call the function to convert all CSV files in the directory tree
start_time = time.time()
#convert_csv_to_parquet('/home/dcai/data/EURCHF')
#convert_csv_to_parquet('/home/dcai/data/EURUSD')
#convert_csv_to_parquet('/home/dcai/data/GBPCHF')
convert_csv_to_parquet('/home/dcai/data/EURGBP')
end_time = time.time()

elapsed_time = end_time - start_time
print(f'Execution time: {elapsed_time:.3f} seconds')



Doing /home/dcai/data/EURGBP/2023/3.parquetDoing /home/dcai/data/EURGBP/2020/13.parquetDoing /home/dcai/data/EURGBP/2020/7.parquetDoing /home/dcai/data/EURGBP/2020/38.parquetDoing /home/dcai/data/EURGBP/2023/4.parquetDoing /home/dcai/data/EURGBP/2020/41.parquetDoing /home/dcai/data/EURGBP/2020/4.parquetDoing /home/dcai/data/EURGBP/2020/26.parquetDoing /home/dcai/data/EURGBP/2020/2.parquetDoing /home/dcai/data/EURGBP/2020/29.parquetDoing /home/dcai/data/EURGBP/2020/42.parquetDoing /home/dcai/data/EURGBP/2020/48.parquetDoing /home/dcai/data/EURGBP/2020/40.parquetDoing /home/dcai/data/EURGBP/2020/35.parquetDoing /home/dcai/data/EURGBP/2020/20.parquet





Doing /home/dcai/data/EURGBP/2020/43.parquetDoing /home/dcai/data/EURGBP/2020/24.parquet
Doing /home/dcai/data/EURGBP/2020/6.parquetDoing /home/dcai/data/EURGBP/2020/5.parquet
Doing /home/dcai/data/EURGBP/2020/39.parquet
Doing /home/dcai/data/EURGBP/2020/16.parquetDoing /home/dcai/data/EURGBP/2020/10.parquetDoing /home/dcai/data/EURGBP/2

Done /home/dcai/data/EURGBP/2019/10.parquet
Doing /home/dcai/data/EURGBP/2019/32.parquet
Done /home/dcai/data/EURGBP/2023/1.parquet
Doing /home/dcai/data/EURGBP/2022/46.parquet
Done /home/dcai/data/EURGBP/2019/51.parquet
Doing /home/dcai/data/EURGBP/2022/3.parquet
Done /home/dcai/data/EURGBP/2019/21.parquet
Doing /home/dcai/data/EURGBP/2019/8.parquet
Done /home/dcai/data/EURGBP/2019/50.parquet
Doing /home/dcai/data/EURGBP/2019/25.parquet
Done /home/dcai/data/EURGBP/2019/16.parquet
Doing /home/dcai/data/EURGBP/2019/19.parquet
Done /home/dcai/data/EURGBP/2019/45.parquet
Doing /home/dcai/data/EURGBP/2022/31.parquet
Done /home/dcai/data/EURGBP/2019/46.parquet
Done /home/dcai/data/EURGBP/2019/44.parquet
Doing /home/dcai/data/EURGBP/2022/45.parquet
Doing /home/dcai/data/EURGBP/2022/51.parquet
Done /home/dcai/data/EURGBP/2022/33.parquet
Doing /home/dcai/data/EURGBP/2022/41.parquet
Done /home/dcai/data/EURGBP/2019/22.parquet
Doing /home/dcai/data/EURGBP/2022/15.parquet
Done /home/dcai/data/EUR

Done /home/dcai/data/EURGBP/2022/37.parquet
Doing /home/dcai/data/EURGBP/2021/17.parquet
Done /home/dcai/data/EURGBP/2021/42.parquet
Doing /home/dcai/data/EURGBP/2021/12.parquet
Done /home/dcai/data/EURGBP/2021/32.parquet
Doing /home/dcai/data/EURGBP/2021/6.parquet
Done /home/dcai/data/EURGBP/2022/42.parquet
Done /home/dcai/data/EURGBP/2022/21.parquet
Done /home/dcai/data/EURGBP/2021/30.parquet
Doing /home/dcai/data/EURGBP/2021/10.parquet
Done /home/dcai/data/EURGBP/2022/11.parquet
Done /home/dcai/data/EURGBP/2021/2.parquet
Done /home/dcai/data/EURGBP/2021/47.parquet
Doing /home/dcai/data/EURGBP/2021/39.parquet
Done /home/dcai/data/EURGBP/2021/35.parquet
Done /home/dcai/data/EURGBP/2022/50.parquet
Done /home/dcai/data/EURGBP/2021/22.parquet
Doing /home/dcai/data/EURGBP/2021/16.parquet
Done /home/dcai/data/EURGBP/2022/52.parquet
Done /home/dcai/data/EURGBP/2021/27.parquet
Doing /home/dcai/data/EURGBP/2021/52.parquet
Done /home/dcai/data/EURGBP/2022/10.parquet
Done /home/dcai/data/EURGBP

In [11]:
csv_to_parqet('/home/dcai/data/GBPUSD/2018/20.csv','/home/dcai/data/GBPUSD/2018/20.parquet')

Error: line contains NUL

In [11]:
import pyarrow.parquet as pq
import pandas as pd

# Read the Parquet file
pq_file = '/home/dcai/data/EURGBP/2021/46.parquet'
table = pq.read_table(pq_file)

# Convert the PyArrow table to a Pandas dataframe
df = table.to_pandas()



# Print the dataframe
print(df[0:20])

                         Bid      Ask   spread  spread_pip      mean
DateTime                                                            
1636927260319000000  0.85279  0.85400  0.00121        12.1  3.452000
1636927260338000000  0.85278  0.85400  0.00122        12.2  3.477000
1636927260374000000  0.85277  0.85400  0.00123        12.3  3.502000
1636927261320000000  0.85277  0.85399  0.00122        12.2  3.476995
1636927261372000000  0.85278  0.85399  0.00121        12.1  3.451995
1636927262063000000  0.85278  0.85398  0.00120        12.0  3.426990
1636927264330000000  0.85279  0.85398  0.00119        11.9  3.401990
1636927293358000000  0.85278  0.85398  0.00120        12.0  3.426990
1636927294553000000  0.85279  0.85398  0.00119        11.9  3.401990
1636927320916000000  0.85278  0.85398  0.00120        12.0  3.426990
1636927321649000000  0.85279  0.85398  0.00119        11.9  3.401990
1636927322644000000  0.85278  0.85398  0.00120        12.0  3.426990
1636927323113000000  0.85279  0.85

In [16]:
def set_datetime_index(root_dir):
    filenames = []
    for root, dirs, files in os.walk(root_dir):
        for file in files:
            # Check if the file is a CSV file
            if file.endswith('.parquet'):
                parquet_file = os.path.join(root, file)
                table = pq.read_table(parquet_file)
                # Convert the PyArrow table to a Pandas dataframe
                df = table.to_pandas()
                df.set_index('DateTime', inplace=True)
                pq.write_table(pa.Table.from_pandas(df), parquet_file)
                print(f'Done {parquet_file}')
                
def set_mean(root_dir):
    filenames = []
    for root, dirs, files in os.walk(root_dir):
        for file in files:
            # Check if the file is a CSV file
            if file.endswith('.parquet'):
                parquet_file = os.path.join(root, file)
                table = pq.read_table(parquet_file)
                # Convert the PyArrow table to a Pandas dataframe
                df = table.to_pandas()
                df  = df.apply(pd.to_numeric, errors='ignore')
                df['mean'] = df[['Bid','Ask']].mean(axis=1, numeric_only=True)
                pq.write_table(pa.Table.from_pandas(df), parquet_file)
                print(f'Done {parquet_file}')
                
def set_spread(root_dir):
    filenames = []
    for root, dirs, files in os.walk(root_dir):
        for file in files:
            # Check if the file is a CSV file
            if file.endswith('.parquet'):
                parquet_file = os.path.join(root, file)
                table = pq.read_table(parquet_file)
                # Convert the PyArrow table to a Pandas dataframe
                df = table.to_pandas()
                df  = df.apply(pd.to_numeric, errors='ignore')
                df['spread'] = df['Ask']-df['Bid']
                df['spread_pip'] = df['spread']*10000.0
                pq.write_table(pa.Table.from_pandas(df), parquet_file)
                print(f'Done {parquet_file}')                
                

In [17]:
import datetime, itertools, os, glob, shutil
from io import BytesIO
import gzip
import urllib.request
import pathlib
import os
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import time


start_time = time.time()
#set_spread('/home/dcai/data/EURCHF')
#set_spread('/home/dcai/data/EURUSD')
#set_spread('/home/dcai/data/GBPCHF')
#set_spread('/home/dcai/data/EURGBP')
set_mean('/home/dcai/data/EURGBP')
end_time = time.time()

elapsed_time = end_time - start_time
print(f'Execution time: {elapsed_time:.3f} seconds')

Done /home/dcai/data/EURGBP/2023/1.parquet
Done /home/dcai/data/EURGBP/2023/4.parquet
Done /home/dcai/data/EURGBP/2023/3.parquet
Done /home/dcai/data/EURGBP/2023/2.parquet
Done /home/dcai/data/EURGBP/2020/26.parquet
Done /home/dcai/data/EURGBP/2020/5.parquet
Done /home/dcai/data/EURGBP/2020/17.parquet
Done /home/dcai/data/EURGBP/2020/13.parquet
Done /home/dcai/data/EURGBP/2020/36.parquet
Done /home/dcai/data/EURGBP/2020/22.parquet
Done /home/dcai/data/EURGBP/2020/50.parquet
Done /home/dcai/data/EURGBP/2020/27.parquet
Done /home/dcai/data/EURGBP/2020/45.parquet
Done /home/dcai/data/EURGBP/2020/48.parquet
Done /home/dcai/data/EURGBP/2020/51.parquet
Done /home/dcai/data/EURGBP/2020/19.parquet
Done /home/dcai/data/EURGBP/2020/41.parquet
Done /home/dcai/data/EURGBP/2020/34.parquet
Done /home/dcai/data/EURGBP/2020/35.parquet
Done /home/dcai/data/EURGBP/2020/12.parquet
Done /home/dcai/data/EURGBP/2020/23.parquet
Done /home/dcai/data/EURGBP/2020/44.parquet
Done /home/dcai/data/EURGBP/2020/33.p

Done /home/dcai/data/EURGBP/2021/25.parquet
Done /home/dcai/data/EURGBP/2021/10.parquet
Done /home/dcai/data/EURGBP/2021/30.parquet
Done /home/dcai/data/EURGBP/2021/16.parquet
Done /home/dcai/data/EURGBP/2021/4.parquet
Done /home/dcai/data/EURGBP/2021/6.parquet
Done /home/dcai/data/EURGBP/2021/39.parquet
Done /home/dcai/data/EURGBP/2021/32.parquet
Done /home/dcai/data/EURGBP/2021/9.parquet
Done /home/dcai/data/EURGBP/2021/21.parquet
Done /home/dcai/data/EURGBP/2021/37.parquet
Done /home/dcai/data/EURGBP/2021/8.parquet
Done /home/dcai/data/EURGBP/2021/3.parquet
Done /home/dcai/data/EURGBP/2021/46.parquet
Done /home/dcai/data/EURGBP/2021/15.parquet
Done /home/dcai/data/EURGBP/2021/7.parquet
Done /home/dcai/data/EURGBP/2021/29.parquet
Done /home/dcai/data/EURGBP/2021/47.parquet
Done /home/dcai/data/EURGBP/2021/18.parquet
Done /home/dcai/data/EURGBP/2021/2.parquet
Done /home/dcai/data/EURGBP/2021/42.parquet
Done /home/dcai/data/EURGBP/2021/11.parquet
Done /home/dcai/data/EURGBP/2021/31.par