In [None]:
# python3 -m pip install clickhouse-driver

import os,sys
from clickhouse_driver import connect,Client

sys.path.append("../")

# NormalizeEnv/SetEnv read the credentials from ~/.nsdf/vault/vault.yaml, which must exist
from nsdf.kernel import NormalizeEnv, SetEnv

env=NormalizeEnv({"include-vault": ["s3-cloudbank", "clickhouse"]})
SetEnv(env)

# S3 object storage credentials and endpoint
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ENDPOINT_URL = (
	env["AWS_ACCESS_KEY_ID"],
	env["AWS_SECRET_ACCESS_KEY"],
	env["AWS_ENDPOINT_URL"],
)

# ClickHouse connection settings
CLICKHOUSE_HOST, CLICKHOUSE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD = (
	env["CLICKHOUSE_HOST"],
	env["CLICKHOUSE_PORT"],
	env["CLICKHOUSE_USER"],
	env["CLICKHOUSE_PASSWORD"],
)

# bucket that holds the catalog CSV files
BUCKET = "nsdf-catalog"

# ////////////////////////////////////////////////////////////////////////
def RunShellCommand(cmd):
	"""
	Run `cmd` as a subprocess and print its output.

	The command string is tokenized with `shlex.split` and executed without a
	shell, so shell syntax (pipes, redirections, variable expansion) is not
	interpreted. stderr is merged into stdout. A non-zero exit status does not
	raise (check=False); it is only reported in the printed summary line.

	The full command line is echoed twice, so anything embedded in it
	(including credentials) ends up in the notebook output.

	Returns None.
	"""
	import shlex,subprocess
	print(cmd)
	result=subprocess.run(shlex.split(cmd), shell=False, check=False,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
	print(f"Run {cmd} returncode={result.returncode}")
	# errors='replace': the command has already run at this point, so a stray non UTF-8 byte
	# in its output must not turn into a UnicodeDecodeError that hides the result
	print(result.stdout.decode('utf-8',errors='replace'))

# ////////////////////////////////////////////////////////////////////////
def RunClientCommand(query, prefix_cmd="", allow_errors=0):
	"""
	Run `query` through the `clickhouse-client` command line tool.

	query:        SQL text. It is embedded between double quotes on the command line,
	              so it must not contain '"' or '$' (AssertionError otherwise).
	prefix_cmd:   optional shell fragment placed in front of the client command,
	              e.g. "cat file.csv |" to feed data to the client on stdin.
	allow_errors: when non-zero, forwarded as --input_format_allow_errors_num.

	The command is executed (and echoed, password included) by RunShellCommand.
	"""
	import shlex
	assert '"'  not in query
	assert "$"  not in query

	# quote the connection values: an empty value or one containing spaces/quotes would
	# otherwise shift the following arguments once the command line is tokenized
	host,port,user,password=[shlex.quote(str(it)) for it in (CLICKHOUSE_HOST,CLICKHOUSE_PORT,CLICKHOUSE_USER,CLICKHOUSE_PASSWORD)]
	cmd=f"clickhouse-client  --host {host} --port {port} --secure --user {user} --password {password} --query=\"{query}\""

	if allow_errors:
		cmd=cmd + f" --input_format_allow_errors_num={allow_errors}"

	if prefix_cmd:
		# RunShellCommand tokenizes with shlex.split and runs without a shell, so a pipe in
		# prefix_cmd would be handed to the first program as a literal "|" argument.
		# Run the prefixed pipeline through `sh -c` so the shell syntax is honored.
		# NOTE(review): inside `sh` a backtick or backslash in `query` is also special.
		cmd="sh -c " + shlex.quote(prefix_cmd + " " + cmd)

	RunShellCommand(cmd)
	
# ////////////////////////////////////////////////////////////////////////
def InsertRecordsFromS3(prefix, allow_errors=0):
	"""
	Insert into nsdf.catalog the rows read by ClickHouse's s3() table function.

	prefix:       S3 URL (wildcards such as *.csv are used by the callers) of the CSV files.
	allow_errors: forwarded to RunClientCommand, which turns a non-zero value into
	              --input_format_allow_errors_num.

	The AWS access key and secret key are embedded in the query text.

	https://dzone.com/articles/clickhouse-s3-compatible-object-storage
	"""
	# allow_errors used to be accepted but silently dropped here
	return RunClientCommand(
		f"INSERT INTO nsdf.catalog SELECT * FROM s3('{prefix}','{AWS_ACCESS_KEY_ID}', '{AWS_SECRET_ACCESS_KEY}', 'CSV');",
		allow_errors=allow_errors)

# ////////////////////////////////////////////////////////////////////////
def InsertRecordsFromCSV(filename):
	"""
	Insert the rows of a local CSV file into nsdf.catalog.

	Builds a `cat <filename> |` prefix and hands it, together with an
	`INSERT ... FORMAT CSV` query, to RunClientCommand.
	"""
	import shlex
	# quote the path so that spaces or shell metacharacters in it stay part of one argument
	RunClientCommand("INSERT INTO nsdf.catalog FORMAT CSV", prefix_cmd=f"cat {shlex.quote(str(filename))} |")

# ////////////////////////////////////////////////////////////////////////
def Connect():
	"""
	Build a clickhouse_driver Client for the configured host/port/user/password, with secure=True.
	"""
	settings=dict(
		host=CLICKHOUSE_HOST,
		port=str(CLICKHOUSE_PORT),
		user=CLICKHOUSE_USER,
		password=CLICKHOUSE_PASSWORD,
		secure=True,
	)
	return Client(**settings)

# module-level client used by the query cells of this notebook
client = Connect()
# create the database only when it is missing (IF NOT EXISTS)
client.execute("""CREATE DATABASE IF NOT EXISTS nsdf""")
# client.execute("""SHOW DATABASES""")
# client.execute("""DROP TABLE IF EXISTS nsdf.catalog""")
# create the catalog table only when it is missing:
#   catalog, bucket, name  text columns
#   size                   BIGINT (presumably the object size in bytes - TODO confirm)
#   last_modified, etag    nullable text columns
# rows are stored in a MergeTree ordered by (catalog,bucket)
client.execute("""CREATE TABLE IF NOT EXISTS nsdf.catalog(
		catalog         varchar(64),
		bucket          varchar(64),
		name            varchar(1024),
		size            BIGINT,
		last_modified   varchar(32) NULL,
		etag            varchar(256) NULL
	) 
	ENGINE = MergeTree() 
	ORDER BY(catalog,bucket)
""")

In [None]:
# sanity check that the AWS credentials work: lists every catalog folder of the bucket
# (disabled by default, flip the guard to True to run it)
if False:
	for subdir in (
		"mc/",
		"aws-open-data/",
		"digitalrocksportal/projects/",
		"20220624/csv/mdf/",
		"arecibo/",
		"ranch/",
	):
		RunShellCommand(f"aws s3 --endpoint-url {AWS_ENDPOINT_URL} ls s3://{BUCKET}/{subdir}")

In [None]:
# Examples of SQL INSERT (with statistics)

# 25 seconds, 129,117 files, 5,533,076,354,245 (~5TB)
# InsertRecordsFromS3(f"{AWS_ENDPOINT_URL}/{BUCKET}/mc/*.csv")

# 4m 51 seconds, 49,995,452 files, 10,531,224,818,071,513 (~10PB) TODO: verify, the real number of records is probably higher than this
# InsertRecordsFromS3(f"{AWS_ENDPOINT_URL}/{BUCKET}/aws-open-data/*.csv")

# 42 seconds, 8,638 files, 3,328,980,888,119 (~3TB)
# InsertRecordsFromS3(f"{AWS_ENDPOINT_URL}/{BUCKET}/digitalrocksportal/projects/*.csv")

# 2m 39 seconds, 1,075,706 files, 5,243,595,789,858 (~5TB)
# InsertRecordsFromS3(f"{AWS_ENDPOINT_URL}/{BUCKET}/20220624/csv/mdf/*.csv")

# <2m, 2,045,049 files, 491,912,368,698,644 total size (~500TB)
# InsertRecordsFromS3(f"{AWS_ENDPOINT_URL}/{BUCKET}/arecibo/*.csv")

# <2m,  3,745,760 files, 36,750,124,831,337,770 (~34PB)
# InsertRecordsFromS3(f"{AWS_ENDPOINT_URL}/{BUCKET}/ranch/*.csv", allow_errors=9999999999)

In [None]:
# Per catalog: number of rows (COUNT(size)) and SUM(size) divided by 1024^3
# (i.e. GiB, assuming `size` is stored in bytes - TODO confirm)
client.execute("""
select catalog,COUNT(size),SUM(size)/(1024*1024*1024)
from nsdf.catalog
group by catalog;
""")

In [None]:
import os,sys
from pprint import pprint

# distinct (catalog, bucket) pairs found in the table; only the first ten are printed
buckets=list(client.execute("SELECT DISTINCT catalog,bucket FROM nsdf.catalog"))
print(buckets[:10])

In [None]:
# example of plot of filesize inside a dataset

import pandas as pd
import matplotlib.pyplot as plt

def PlotSizes(filename,sizes):
	"""
	Plot the values of `sizes` in ascending order, save the figure and show it.

	filename: path of the image to write; its parent directory is created when missing.
	sizes:    non-empty iterable of numbers (the title reports the smallest and the
	          largest value, so an empty input raises IndexError).

	The figure is 1024x768 pixels at the current `figure.dpi`.
	"""
	sizes=sorted(sizes)
	ipd = 1/plt.rcParams['figure.dpi']  # inches per pixel
	fig=plt.figure(figsize=(1024*ipd,768*ipd))
	plt.title(f"{filename} #({len(sizes)}) m({sizes[0]}) M({sizes[-1]})")
	# `sizes` is already sorted, no need to sort it a second time
	plt.plot(range(len(sizes)), sizes)
	# a bare file name has an empty dirname, and os.makedirs('') raises FileNotFoundError
	dirname=os.path.dirname(filename)
	if dirname:
		os.makedirs(dirname,exist_ok=True)
	plt.savefig(filename)
	plt.show()
	# release the figure so that plotting many buckets in a loop does not accumulate figures
	plt.close(fig)

for catalog, bucket in buckets:
	# catalog/bucket come from the table itself: pass them as query parameters instead of
	# splicing them into the SQL text, so a name containing a quote cannot break the query
	sizes=[it[0] for it in client.execute(
		"SELECT size FROM nsdf.catalog WHERE catalog=%(catalog)s and bucket=%(bucket)s",
		{"catalog": catalog, "bucket": bucket})]
	if not sizes: continue
	PlotSizes(filename=f"/tmp/plots/{catalog}/{bucket}.png",sizes=sizes)

	# remove the `break` if you want all the plots
	break 

In [None]:
# total records 56M files 44PB
# NOTE: despite its name, TOT_BYTES receives SUM(size)/(1024*1024*1024), not a raw byte count
client = Connect()
TOT_FILES,TOT_BYTES=client.execute(
	"SELECT count(size),SUM(size)/(1024*1024*1024) FROM nsdf.catalog;"
)[0]
print(TOT_FILES,TOT_BYTES)

In [None]:
# total size per catalog/bucket, largest first
client.execute("""SELECT catalog,bucket, SUM(size) as TotSize
FROM nsdf.catalog
GROUP BY catalog,bucket
ORDER BY TotSize DESC;
""")

In [None]:
# total number of objects per catalog/bucket, most populated first
client.execute("""
SELECT catalog,bucket, COUNT(size) As NumObjects
FROM nsdf.catalog
group by catalog,bucket
ORDER BY NumObjects DESC;
""")

In [None]:
# example of looking at a specific catalog/bucket: summed size of 'aws-open-data' / 'noaa-cors-pds'
client.execute("""
SELECT SUM(size) from nsdf.catalog 
WHERE catalog='aws-open-data' and bucket='noaa-cors-pds';
""")


In [None]:
# LIKE query for searching file names: counts the rows whose name contains the letter 'a'
client.execute("""
SELECT count(*) from nsdf.catalog
where name like '%a%'
""")

In [None]:
# summed size per catalog (raw SUM(size), no unit conversion)
client.execute("""
SELECT catalog,SUM(size)
from nsdf.catalog
group by catalog;
""")

In [None]:
# SUM per bucket
# NOTE(review): the rows are ordered by COUNT(size), not by the SUM(size) that is returned;
# presumably so that they line up with the "COUNT per bucket" query - confirm this is intended
client.execute("""
select catalog,bucket,SUM(size)
from nsdf.catalog
group by catalog,bucket
order by COUNT(size) DESC;
""")

In [None]:
# COUNT per bucket: number of rows per catalog/bucket, most populated first
client.execute("""
select catalog,bucket,COUNT(size)
from nsdf.catalog
group by catalog,bucket
order by COUNT(size) DESC;
""")

In [None]:
# file size distribution: every size stored for catalog 'mc' / bucket '102', in ascending order
# (returns one row per matching record, not an aggregate)
client.execute("""
select size
from nsdf.catalog
where catalog='mc' and bucket='102'
order by size ASC;
""")

In [None]:
# file type: number of rows per value of substring_index(name,'.',-1) for catalog 'mc' / bucket '102'
# (presumably the text after the last '.', i.e. the file extension - TODO confirm)
# NOTE(review): the alias FileExtension is attached to the count column, not to the extension column
client.execute("""
select substring_index(name,'.',-1), count(substring_index(name,'.',-1)) as FileExtension 
from nsdf.catalog
where catalog='mc' and bucket='102'
group by substring_index(name,'.',-1);
""")

In [None]:
# delete records
# ALTER TABLE nsdf.catalog DELETE WHERE 1=1;

In [35]:
# scratch check of splitByChar: the expression works on a constant string, not on a table column,
# so each of the 10 returned rows carries the same last '.'-separated piece ('h5' in the saved output)
client.execute("""
SELECT splitByChar('.','giorgio.scorzelli.h5')[-1]
FROM nsdf.catalog             
ORDER BY size DESC
LIMIT 10
""")

!!!!! rw.chcql1nlq37luu78jtmk.at.double.cloud 9440


[('h5',),
 ('h5',),
 ('h5',),
 ('h5',),
 ('h5',),
 ('h5',),
 ('h5',),
 ('h5',),
 ('h5',),
 ('h5',)]