In [61]:
import os
import pandas as pd
from deltalake import write_deltalake, DeltaTable
from histdata.api import download_hist_data



In [230]:
output = os.environ.get("FX_DATA_OUTPUT", 'output')
dt = DeltaTable(output)
partitions = pd.DataFrame(dt.partitions())

pairs = partitions.pair.unique()


In [233]:
# REMOVE DUPLICATES
for pair in pairs:
    print(f"Removing duplicates for {pair}")

    df = dt.to_pandas(partitions=[('pair', '=', pair)])
    df2 = df.drop_duplicates(subset=['date', 'pair']).reset_index(drop=True)

    number_of_duplicates = df.shape[0] - df2.shape[0]
    print(f"  Found {number_of_duplicates} duplicates for {pair}.\nShape before: {df.shape}, after: {df2.shape}")

    write_deltalake(
        table_or_uri=dt,
        data=df2,
        mode='overwrite',
        predicate=f"pair = '{pair}'",
        partition_by=['pair', 'year'],
    )

    print(f"  Removed duplicates for {pair}")

Removing duplicates for NZDJPY
  Found 0 duplicates for NZDJPY.
Shape before: (6863573, 7), after: (6863573, 7)
  Removed duplicates for NZDJPY
Removing duplicates for AUDNZD
  Found 360 duplicates for AUDNZD.
Shape before: (6512812, 7), after: (6512452, 7)
  Removed duplicates for AUDNZD
Removing duplicates for USDTRY
  Found 314 duplicates for USDTRY.
Shape before: (4454993, 7), after: (4454679, 7)
  Removed duplicates for USDTRY
Removing duplicates for AUDCAD
  Found 360 duplicates for AUDCAD.
Shape before: (6497688, 7), after: (6497328, 7)
  Removed duplicates for AUDCAD
Removing duplicates for USDNOK
  Found 360 duplicates for USDNOK.
Shape before: (6139575, 7), after: (6139215, 7)
  Removed duplicates for USDNOK
Removing duplicates for GRXEUR
  Found 114 duplicates for GRXEUR.
Shape before: (3380178, 7), after: (3380064, 7)
  Removed duplicates for GRXEUR
Removing duplicates for GBPCAD
  Found 360 duplicates for GBPCAD.
Shape before: (6495779, 7), after: (6495419, 7)
  Removed du

In [234]:
# Verify that there are no duplicates left
for pair in pairs:
    df = dt.to_pandas(partitions=[('pair', '=', pair)])
    duplicates = df.duplicated(subset=['date', 'pair']).sum()
    if duplicates > 0:
        print(f"Error: Found {duplicates} duplicates for {pair} after cleanup!")
    else:
        print(f"No duplicates found for {pair} after cleanup.")

No duplicates found for NZDJPY after cleanup.
No duplicates found for AUDNZD after cleanup.
No duplicates found for USDTRY after cleanup.
No duplicates found for AUDCAD after cleanup.
No duplicates found for USDNOK after cleanup.
No duplicates found for GRXEUR after cleanup.
No duplicates found for GBPCAD after cleanup.
No duplicates found for USDSGD after cleanup.
No duplicates found for USDSEK after cleanup.
No duplicates found for USDHKD after cleanup.
No duplicates found for XAUUSD after cleanup.
No duplicates found for GBPNZD after cleanup.
No duplicates found for GBPJPY after cleanup.
No duplicates found for WTIUSD after cleanup.
No duplicates found for FRXEUR after cleanup.
No duplicates found for AUDUSD after cleanup.
No duplicates found for USDZAR after cleanup.
No duplicates found for USDCHF after cleanup.
No duplicates found for GBPCHF after cleanup.
No duplicates found for EURDKK after cleanup.
No duplicates found for CHFJPY after cleanup.
No duplicates found for GBPAUD aft

KeyboardInterrupt: 

In [235]:
# We could compact in a single call, but I'm not sure hwo much memory it will use
for pair in pairs:
    dt.optimize.compact(partition_filters=[('pair', '=', pair)])
dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=True)
dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)


['pair=AUDCAD/year=2007/part-00001-0c4b4488-04be-4745-960d-c23515a3ed65-c000.snappy.parquet',
 'pair=AUDCAD/year=2007/part-00001-7dcd76d4-cff1-4700-9640-f2ee726c1fbf-c000.snappy.parquet',
 'pair=AUDCAD/year=2008/part-00001-348ca853-f7ab-42d8-ad32-ee79001a138f-c000.snappy.parquet',
 'pair=AUDCAD/year=2008/part-00001-7d2ccf33-6cd2-4bac-91bb-e6f8a98dc0a8-c000.snappy.parquet',
 'pair=AUDCAD/year=2009/part-00001-5944499e-d3a6-483e-9074-29ec30b5c337-c000.snappy.parquet',
 'pair=AUDCAD/year=2009/part-00001-aae2a929-1134-4ef9-a46b-bd3fb84adb85-c000.snappy.parquet',
 'pair=AUDCAD/year=2010/part-00001-168c818c-9487-4f30-b49a-8880a3c08dc9-c000.snappy.parquet',
 'pair=AUDCAD/year=2010/part-00001-8f6a7c2e-8473-4edf-b192-f1e4e5cb6852-c000.snappy.parquet',
 'pair=AUDCAD/year=2011/part-00001-81ec2434-26fb-4bb5-9bfc-db5e0202c260-c000.snappy.parquet',
 'pair=AUDCAD/year=2011/part-00001-bdd41a24-682a-4f20-be6f-abaa8e384b08-c000.snappy.parquet',
 'pair=AUDCAD/year=2012/part-00001-6fe0985b-6b16-4e02-a0f8-3

In [236]:
dt.to_pyarrow_table(partitions=[('pair', '=', 'EURUSD'), ('year', '>=', 2015)])

pyarrow.Table
date: timestamp[us]
open: double
high: double
low: double
close: double
year: int32
pair: string
----
date: [[2018-11-21 09:54:00.000000,2018-01-01 18:15:00.000000,2018-01-01 18:16:00.000000,2018-01-01 18:17:00.000000,2018-01-01 18:18:00.000000,...,2018-05-09 01:22:00.000000,2018-05-09 01:23:00.000000,2018-05-09 01:24:00.000000,2018-05-09 01:25:00.000000,2018-05-09 01:26:00.000000],[2018-05-09 01:27:00.000000,2018-05-09 01:28:00.000000,2018-05-09 01:29:00.000000,2018-05-09 01:30:00.000000,2018-05-09 01:31:00.000000,...,2018-09-13 07:29:00.000000,2018-09-13 07:30:00.000000,2018-09-13 07:31:00.000000,2018-09-13 07:32:00.000000,2018-09-13 07:33:00.000000],...,[2025-06-23 12:03:00.000000,2025-02-19 19:03:00.000000,2025-02-19 19:04:00.000000,2025-02-19 19:05:00.000000,2025-02-19 19:06:00.000000,...,2025-05-28 13:39:00.000000,2025-05-28 13:40:00.000000,2025-05-28 13:41:00.000000,2025-05-28 13:42:00.000000,2025-05-28 13:43:00.000000],[2025-05-28 13:44:00.000000,2025-05-28 13:45: