In [1]:
import boto3
import boto3.session
from botocore.exceptions import ClientError
import os
from dotenv import load_dotenv

In [2]:
load_dotenv(dotenv_path="/root/work/vhe/env.txt")
ACCESS_KEY_ID=os.getenv("ACCESS_KEY_ID")
SECRET_ACCESS_KEY=os.getenv("SECRET_ACCESS_KEY")

In [3]:
s3 = boto3.session.Session().client(
    service_name='s3',
    region_name='fr-par',
    use_ssl=True,
    endpoint_url='http://s3.fr-par.scw.cloud',
    aws_access_key_id=ACCESS_KEY_ID,
    aws_secret_access_key=SECRET_ACCESS_KEY
)

In [6]:
# List of parquet files to use

prefix = 'crypto/parquet/'
response = s3.list_objects(Bucket='crypto-histo',Prefix=prefix)
parquetFiles = []
for on in [c['Key'] for c in response['Contents']]:
    parquetFiles.append(on)
if 'NextMarker' in response:
    next = response['NextMarker']
else :
    next = ""

while next != '':
    response = s3.list_objects(Bucket='crypto-histo',Prefix=prefix, Marker=next)
    newObjects = [c['Key'] for c in response['Contents']]
    for on in newObjects:
        parquetFiles.append(on)
    
    if 'NextMarker' in response:
        next = response['NextMarker']
    else :
        next = ""

print(len(parquetFiles)) 
print(parquetFiles[:5])

411
['crypto/parquet/1INCHBTC.parquet', 'crypto/parquet/AAVEBTC.parquet', 'crypto/parquet/ACABTC.parquet', 'crypto/parquet/ACHBTC.parquet', 'crypto/parquet/ACMBTC.parquet']


# Read using PYARROW

In [7]:
import pyarrow.parquet as pq
s3.download_file('crypto-histo', 'crypto/parquet/ACABTC.parquet', '/root/work/vhe/tmp/ACABTC.parquet')
tableACABTC = pq.read_table('/root/work/vhe/tmp/ACABTC.parquet')
print(tableACABTC)

pyarrow.Table
OpenTime: timestamp[ms]
Open: float
High: float
Low: float
Close: float
Volume: float
CloseTime: timestamp[ms]
QuoteAssetVolume: float
NumberOfTrade: int32
TakerBuyBaseAssetVolume: float
TakerBuyQuoteAssetVolume: float
Ignore: int32
----
OpenTime: [[2022-01-25 12:00:00.000,2022-01-25 12:01:00.000,2022-01-25 12:02:00.000,2022-01-25 12:03:00.000,2022-01-25 12:04:00.000,...,2022-04-26 12:27:00.000,2022-04-26 12:28:00.000,2022-04-26 12:29:00.000,2022-04-26 12:30:00.000,2022-04-26 12:31:00.000],[2022-04-26 12:32:00.000,2022-04-26 12:33:00.000,2022-04-26 12:34:00.000,2022-04-26 12:35:00.000,2022-04-26 12:36:00.000,...,2022-07-26 12:59:00.000,2022-07-26 13:00:00.000,2022-07-26 13:01:00.000,2022-07-26 13:02:00.000,2022-07-26 13:03:00.000],...,[2022-10-25 13:37:00.000,2022-10-25 13:38:00.000,2022-10-25 13:39:00.000,2022-10-25 13:40:00.000,2022-10-25 13:41:00.000,...,2023-01-24 14:04:00.000,2023-01-24 14:05:00.000,2023-01-24 14:06:00.000,2023-01-24 14:07:00.000,2023-01-24 14:08:00.

In [8]:
parquet_file = pq.ParquetFile('/root/work/vhe/tmp/ACABTC.parquet')
metadata = parquet_file.metadata
metadata

<pyarrow._parquet.FileMetaData object at 0x7f85b7dad210>
  created_by: Arrow2 - Native Rust implementation of Arrow
  num_columns: 12
  num_rows: 568079
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2240

# Use DuckDB !

In [9]:
import duckdb
import pandas

In [10]:
# setup datalake S3 connection 

duckdb.install_extension("httpfs")
duckdb.load_extension("httpfs")
duckdb.sql(f"SET s3_region='fr-par';")
duckdb.sql(f"SET s3_access_key_id='{ACCESS_KEY_ID}';")
duckdb.sql(f"SET s3_secret_access_key='{SECRET_ACCESS_KEY}';")
duckdb.sql(f"SET s3_endpoint='s3.fr-par.scw.cloud';")

S3_BUCKET = os.environ.get("S3_BUCKET", "crypto-histo")
PARQUET_FOLDER = os.environ.get("PARQUET_FOLDER", "parquet")

In [11]:
# Querry on 1 parquet file
parquet_file = f"s3://{S3_BUCKET}/crypto/{PARQUET_FOLDER}/1INCHBTC.parquet"

query = """
SELECT
    s.opentime,
    s.high
FROM
    '%s' s
"""%(parquet_file)


res = duckdb.sql(query).df()
res

Unnamed: 0,OpenTime,High
0,2021-07-01 00:00:00,0.000072
1,2021-07-01 00:01:00,0.000072
2,2021-07-01 00:02:00,0.000072
3,2021-07-01 00:03:00,0.000072
4,2021-07-01 00:04:00,0.000072
...,...,...
867924,2023-02-23 23:55:00,0.000026
867925,2023-02-23 23:56:00,0.000026
867926,2023-02-23 23:57:00,0.000026
867927,2023-02-23 23:58:00,0.000026


In [12]:
# Query on all multiple files (parquet_read)

query = f"""
SELECT
    COUNT(*)
FROM
    read_parquet({[f"s3://{S3_BUCKET}/{f}" for f in parquetFiles]}) s
"""

res = duckdb.sql(query).df()

print(res)

   count_star()
0     257928187


In [14]:
# Get the "filename" column

query = f"""
SELECT
    string_split(string_split(filename, '/')[-1], '.')[1] symbol, COUNT(*) cnt

FROM
    read_parquet({[f"s3://{S3_BUCKET}/{f}" for f in parquetFiles]}, filename=True)
GROUP BY symbol
"""
res = duckdb.sql(query).df()

print(res)

       symbol     cnt
0    1INCHBTC  867929
1      ACABTC  568079
2      ACMBTC  867929
3      ADAEUR  867929
4     AGIXBTC  867929
..        ...     ...
356    SNTBTC  867930
357    VETEUR  867928
358    BLZBTC  867930
359   HARDBTC  867929
360   WBTCBTC  867905

[361 rows x 2 columns]


In [15]:
# Save it to S3 

duckdb.sql(f"COPY (SELECT * FROM res) TO 's3://{S3_BUCKET}/res_count_by_symbol.parquet' (FORMAT PARQUET);")

In [16]:
# Averages 

avgs = duckdb.sql(f"SELECT avg(cnt) FROM read_parquet('s3://{S3_BUCKET}/res_count_by_symbol.parquet') ")
print(avgs)

┌──────────────────┐
│     avg(cnt)     │
│      double      │
├──────────────────┤
│ 714482.512465374 │
└──────────────────┘



In [17]:
# aggregate to have one value per day 
# (on 1 file)

parquet_file = f"s3://{S3_BUCKET}/crypto/{PARQUET_FOLDER}/1INCHBTC.parquet"

query = """
SELECT
    date_trunc('day', s.opentime) d,
    MIN(s.high) min,
    MAX(s.high) max
FROM
    '%s' s
GROUP BY d
"""%(parquet_file)


res = duckdb.sql(query).df()
res

Unnamed: 0,d,min,max
0,2021-07-01,0.000070,0.000072
1,2021-07-02,0.000069,0.000072
2,2021-07-03,0.000069,0.000072
3,2021-07-04,0.000070,0.000075
4,2021-07-05,0.000071,0.000075
...,...,...,...
598,2022-08-23,0.000033,0.000034
599,2022-09-04,0.000033,0.000035
600,2022-09-12,0.000029,0.000030
601,2023-01-08,0.000023,0.000024


In [18]:
# Get one line per day and symbol 

query = f"""
SELECT
    string_split(string_split(filename, '/')[-1], '.')[1] symbol,
    date_trunc('day', opentime) d,
    MIN(high) min,
    MAX(high) max
FROM
    read_parquet({[f"s3://{S3_BUCKET}/{f}" for f in parquetFiles]}, filename=True)
GROUP BY symbol, d
"""

res = duckdb.sql(query)


In [19]:
# It's a lazy execution
res

┌─────────┬────────────┬──────────┬──────────┐
│ symbol  │     d      │   min    │   max    │
│ varchar │    date    │  float   │  float   │
├─────────┼────────────┼──────────┼──────────┤
│ AAVEBTC │ 2021-07-02 │ 0.006595 │  0.00695 │
│ AAVEBTC │ 2021-07-04 │ 0.007218 │ 0.008083 │
│ AAVEBTC │ 2021-07-05 │ 0.007849 │ 0.009336 │
│ AAVEBTC │ 2021-07-06 │ 0.009164 │   0.0101 │
│ AAVEBTC │ 2021-07-07 │  0.00908 │ 0.009571 │
│ AAVEBTC │ 2021-07-08 │ 0.008599 │ 0.009451 │
│ AAVEBTC │ 2021-07-09 │ 0.008548 │ 0.009361 │
│ AAVEBTC │ 2021-07-15 │ 0.008392 │ 0.008733 │
│ AAVEBTC │ 2021-07-16 │ 0.008067 │ 0.008741 │
│ AAVEBTC │ 2021-07-17 │ 0.007873 │ 0.008171 │
│   ·     │     ·      │     ·    │     ·    │
│   ·     │     ·      │     ·    │     ·    │
│   ·     │     ·      │     ·    │     ·    │
│ BRDBTC  │ 2022-03-15 │  6.3e-06 │ 7.09e-06 │
│ BRDBTC  │ 2022-04-19 │ 4.79e-06 │ 5.68e-06 │
│ BRDBTC  │ 2022-05-08 │ 3.51e-06 │ 3.69e-06 │
│ BRDBTC  │ 2022-07-31 │ 3.39e-06 │  4.2e-06 │
│ BTGBTC  │ 2

In [20]:
# save it on s3

duckdb.sql(f"COPY res TO 's3://{S3_BUCKET}/crypto/index.parquet' (FORMAT PARQUET);")

In [25]:
# query that index 

maxValueBySymbol = duckdb.sql(f"""
SELECT symbol, max(max) max
FROM read_parquet('s3://{S3_BUCKET}/crypto/index.parquet')
GROUP BY symbol
""")
print(maxValueBySymbol)

┌───────────┬────────────┐
│  symbol   │    max     │
│  varchar  │   float    │
├───────────┼────────────┤
│ ALGOBTC   │  5.542e-05 │
│ ALPACABTC │  3.916e-05 │
│ ALPINEEUR │      12.37 │
│ AMPBTC    │   1.48e-06 │
│ ANKRBTC   │   3.94e-06 │
│ ANYBTC    │  0.0009368 │
│ API3BTC   │ 0.00026137 │
│ APTEUR    │    18.6728 │
│ ARDRBTC   │   1.43e-05 │
│ ARPABTC   │    4.4e-06 │
│   ·       │       ·    │
│   ·       │       ·    │
│   ·       │       ·    │
│ QIBTC     │   6.48e-06 │
│ RVNBTC    │   3.88e-06 │
│ SANDBTC   │ 0.00014824 │
│ SCBTC     │    5.5e-07 │
│ SFPBTC    │   5.54e-05 │
│ SKLBTC    │  1.055e-05 │
│ SNMBTC    │ 0.00083304 │
│ CTXCBTC   │    2.5e-05 │
│ NKNBTC    │   1.25e-05 │
│ PIVXBTC   │      3e-05 │
├───────────┴────────────┤
│  361 rows (20 shown)   │
└────────────────────────┘



In [24]:
distinctDate = duckdb.sql(f"""
SELECT d, COUNT(symbol)
FROM read_parquet('s3://{S3_BUCKET}/crypto/index.parquet')
GROUP BY d
""")
print(distinctDate)


┌────────────┬───────────────┐
│     d      │ count(symbol) │
│    date    │     int64     │
├────────────┼───────────────┤
│ 2021-07-01 │           274 │
│ 2021-07-02 │           274 │
│ 2021-07-03 │           274 │
│ 2021-07-04 │           274 │
│ 2021-07-05 │           276 │
│ 2021-07-06 │           276 │
│ 2021-07-07 │           276 │
│ 2021-07-08 │           276 │
│ 2021-07-09 │           276 │
│ 2021-07-10 │           276 │
│     ·      │            ·  │
│     ·      │            ·  │
│     ·      │            ·  │
│ 2023-02-14 │           280 │
│ 2023-02-15 │           280 │
│ 2023-02-16 │           280 │
│ 2023-02-17 │           279 │
│ 2023-02-18 │           279 │
│ 2023-02-19 │           279 │
│ 2023-02-20 │           279 │
│ 2023-02-21 │           279 │
│ 2023-02-22 │           280 │
│ 2023-02-23 │           280 │
├────────────┴───────────────┤
│    603 rows (20 shown)     │
└────────────────────────────┘



In [26]:
maxValueBySymbol = duckdb.sql(f"""
SELECT symbol, max(max) max
FROM read_parquet('s3://{S3_BUCKET}/crypto/index.parquet')
GROUP BY symbol
ORDER BY max DESC 
LIMIT 20
""")
print(maxValueBySymbol)

┌──────────┬──────────┐
│  symbol  │   max    │
│ varchar  │  float   │
├──────────┼──────────┤
│ BTCEUR   │ 59744.44 │
│ YFIEUR   │ 37182.52 │
│ ETHEUR   │   4223.0 │
│ BCHEUR   │    679.6 │
│ BNBEUR   │    586.8 │
│ EGLDEUR  │   483.25 │
│ LTCEUR   │    256.8 │
│ SOLEUR   │   226.39 │
│ LAZIOEUR │    200.0 │
│ WAVESEUR │    137.5 │
│ AVAXEUR  │   130.67 │
│ LUNAEUR  │   109.06 │
│ ICPEUR   │    73.55 │
│ ETCEUR   │   66.057 │
│ GALEUR   │     49.0 │
│ DOTEUR   │    47.62 │
│ PORTOEUR │     40.0 │
│ LINKEUR  │    33.22 │
│ ATOMEUR  │    31.37 │
│ UNIEUR   │   26.784 │
├──────────┴──────────┤
│ 20 rows   2 columns │
└─────────────────────┘



In [30]:
data = duckdb.sql(f"""
    SELECT d, max(max) max
    FROM read_parquet('s3://{S3_BUCKET}/crypto/index.parquet')
    WHERE symbol = 'ETHEUR' AND d > '2021-12-31' AND d < '2023-01-01'
    GROUP BY d
    ORDER BY d ASC 
    """).to_df()

In [31]:
data

Unnamed: 0,d,max
0,2022-01-01,3327.399902
1,2022-01-02,3394.110107
2,2022-01-03,3398.129883
3,2022-01-04,3447.860107
4,2022-01-05,3405.000000
...,...,...
360,2022-12-27,1155.750000
361,2022-12-28,1142.339966
362,2022-12-29,1133.050049
363,2022-12-30,1127.469971
