## DiskCacheライブラリの使い方
+ インメモリではなく, SSDやHDDにキャッシュを作成する
+ key-valueデータ構造
+ Cacheオブジェクトはスレッドセーフ(プロセスセーフ)
+ Cache操作はすべてアトミック
+ プロセスフォークに対応(Python Pickle Serializerを利用)
+ 読み込み, 書き込み関数はすべてNon-blocking
+ 各関数の引数で`read=True`を指定すると, key-valueをファイルとして読み込めることを示す

### Cache
+ キャッシュディレクトリを指定しない場合, `一時ディレクトリ`が自動作成される

In [21]:
from diskcache import Cache

cachedir = "F:/DiskCacheTest/Cache"
cache = Cache(cachedir) # ディレクトリ指定がない場合, 一時ディレクトリが自動的に作成される.
cache.close()


withステートメントに対応

In [22]:
with Cache(cache.directory) as reference:
    reference.set('key', 'value')

In [23]:
print("cache.get('key')", cache.get('key')) # Automatically opens, but slower.

cache.get('key') value


PythonのDictionary型と同じ操作ができる

In [24]:
cache['my-key'] = 'my-value'
print(cache['my-key'])
del cache['my-key']
print(cache['my-key'])


my-value


KeyError: 'my-key'

時間制限付きキャッシュ

In [25]:
from io import BytesIO
cache.set('my-key', BytesIO(b'my-value'), expire=5, read=True, tag='data') # expire_time : 5sec

result = cache.get('my-key', read=True, expire_time=True, tag=True)
reader, timestamp, tag = result
print(reader.read().decode())
print(type(timestamp).__name__)
print(tag)

my-value
float
data


時間制限を超過した場合

In [26]:
from io import BytesIO
cache.set('my-key', BytesIO(b'my-value'), expire=5, read=True, tag='data') # expire_time : 5sec

from time import sleep

sleep(6) # 6sec

result = cache.get('my-key', read=True, expire_time=True, tag=True) # read=Trueでファイルオブジェクトとして認識する
reader, timestamp, tag = result # タプル
print(reader.read().decode())
print(type(timestamp).__name__)
print(tag)

AttributeError: 'NoneType' object has no attribute 'read'

キャッシュオブジェクトの生存時間を更新するには`touch()`メソッドを使用する

In [27]:
cache.touch('my-key', expire=None)

False

In [28]:
cache.touch('dose-not-exist', expire=1) # 1sec

False

+ PythonのSet型と同じ操作ができる
+ valueの数値は, `SQLite interger column (64bit-singed integers)`

In [29]:
cache.add(b'test', 123)

True

In [30]:
cache[b'test']

123

In [33]:
cache.add(b'test', 456)

False

In [36]:
cache[b'test'] # 123 not 456

123

値の加算increment:`incr()`メソッドと減算decriment:`decr()`メソッド 

In [37]:
cache.incr(b'test') # 123 -> 124

124

In [38]:
cache.decr(b'test', 24) # 124 -> 100

100

`default=None`を指定することで, 存在しないkeyに対して`incr()`と`decr()`でKeyErrorを投げることができる

In [39]:
cache.incr('alice')

1

In [40]:
cache.decr('bob', default=-9) # 下駄=-9

-10

In [41]:
cache.incr('carol', default=None)

KeyError: 'carol'

`delete()`, `get()`, `pop()`操作

In [42]:
cache.pop('alice')

1

In [43]:
cache.pop('dave', default='does not exist')

'does not exist'

In [44]:
cache.set('dave', 0, expire=None, tag='admin')

True

In [45]:
cache.get('dave')

0

In [46]:
cache.set('dave', 0, expire=None, tag='admin')

True

In [47]:
result = cache.pop('dave', expire_time=True, tag=True) # key`dave`のvalueを取得&削除
value, timestampe, tag = result
print(value)
print(timestamp)
print(tag)

0
None
admin


In [48]:
cache.get('dave') # valueがない

削除系 `clear()`, `reset()`, `expire()`, `evict()`

In [49]:
cache.clear()

109

In [50]:
cache.reset('cull_limit', 0) # Disable automatic evictions(立ち退き)

0

In [51]:
cache.get('cull_limit') # まだkey-valueは存在している

In [52]:
for num in range(10):
    _ = cache.set(num, num, expire=1e-9) # Expire immediately
len(cache)

10

In [53]:
list(cache)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

`expire()`関数でexpiredされたkeyをすべて削除する

In [54]:
import time
time.sleep(1) # 1sec
cache.expire()

10

+ `evict()`関数は, タグにマッチしたkey-valueをすべて削除する. defaultタグは`None`
+ タグは, interger, float, string, bytes, Noneがあり得る
+ envict()関数を高速化するために, `tag_index=True`でインデックスを作成できる.

In [55]:
from time import perf_counter_ns

cache.clear()

tp_start = perf_counter_ns()

for num in range(100):
    _ = cache.set(num, num, tag='odd' if num % 2 else 'even')
cache.evict('even')

tp_end = perf_counter_ns()

duration = (tp_end - tp_start) / 1000.0
print("duration[ms]", duration)

duration[ms] 10596.3


In [56]:
cachedir2 = "F:/DiskCacheTest/TagIndexCache"
cache_with_tag_index = Cache(cachedir2, tag_index=True)

cache_with_tag_index.clear()

tp_start = perf_counter_ns()

for num in range(100):
    _ = cache_with_tag_index.set(num, num, tag='odd' if num % 2 else 'even')
cache.evict('even') # 処理速度が上昇する?

tp_end = perf_counter_ns()

duration = (tp_end - tp_start) / 1000.0
print("duration[ms]", duration)

duration[ms] 12111.2


`tag_index`の作成と削除. よくわからない

In [57]:
cache_with_tag_index.drop_tag_index()

In [58]:
cache_with_tag_index.create_tag_index()

`cull()`関数で強制的にキャッシュサイズの上限を決める

In [59]:
cache.clear()

50

In [60]:
cache.reset('size_limit', int(1e6))

1000000

In [61]:
cache.reset('culll_limit', 0)

0

In [62]:
for count in range(1000):
    cache[count] = b'A' * 1000

In [63]:
cache.volume() > int(1e6)

True

In [64]:
cache.volume()

1454080

In [65]:
cache.cull() > 0 # キャシュサイズの上限を決める

True

In [66]:
cache.volume() > int(1e6)

False

In [67]:
cache.volume()

999424

ome users may defer all culling to a cron-like process by setting the cull_limit to zero and manually calling cull to remove items. Like evict and expire, calls to cull will work regardless of the cull_limit.

In [68]:
cache.clear() > 0 # simple remove all items from the cache

True

+ デフォルトのキャッシュ順序は挿入順.
+ `iterkeys`を用いることで, キャッシュ順序をソートできる
+ ソート順序はDB(SQLite)に準じる

In [69]:
for key in 'cab':
    cache[key] = None

In [70]:
list(cache)

['c', 'a', 'b']

In [71]:
list(cache.iterkeys())

['a', 'b', 'c']

`peekitem()`関数で挿入順序で最初と最後に効率よくアクセスできる

In [72]:
cache.peekitem()

('b', None)

In [73]:
cache.peekitem(last=False)

('c', None)

+ キュー型のデータ構造キャッシュの順序ソートに`push()`,`pull()`,`peek()`を利用できる
+ 上記, 3つの関数は, 自動的にキャッシュ範囲にキーを割り当てる

In [74]:
key = cache.push('first')
print(key)

500000000000000


In [75]:
cache[key]

'first'

In [76]:
_ = cache.push('second') # エンキュー side='back'
_ = cache.push('zeroth', side='front') # キューの先頭に挿入
_, value = cache.peek()
value

'zeroth'

In [77]:
key, value = cache.pull() # デキュー
print(key)
print(value)

499999999999999
zeroth


+ prefixパラメータを使用して, 一つのキャッシュ内に複数のキュー型データ構造を保持できる
+ prefixが`None`のとき, 整数keyが使用される.
+ prefixに対してstringを`prefix-integer`フォーマットで使用した場合, 500 trillionまでOK

`volume()`関数は, ディスクにあるキャッシュのトータル予測バイト数を出力する

In [78]:
cache.volume() < int(1e5)

True

+ `stats()`関数は, キャシュヒット率を返す
+ 統計量の計算はデフォルトでFalse

In [79]:
cache.stats(enable=True) # 計測開始

(0, 0)

In [80]:
for num in range(100):
    _ = cache.set(num, num)

In [81]:
for num in range(150):
    _ = cache.get(num)

In [82]:
hits, misses = cache.stats(enable=False, reset=True) # 計測終了
(hits, misses)

(100, 50)

+ `check()`関数は, キャッシュの一貫性を保つ.
+ 一貫性が崩れた状態のキャッシュを立て直すのも`check()`関数が行う

In [83]:
warnings = cache.check()

In [84]:
warnings



キャッシュの基礎となるディレクトリ(key-value)は自動で削除されない. 必ず手動で削除する

In [85]:
cache.close()
cache_with_tag_index.close()
import shutil
try:
    print(cache.directory)
    print(cache_with_tag_index.directory)
    shutil.rmtree(cache.directory)
    shutil.rmtree(cache_with_tag_index.directory)
except OSError:
    pass

F:/DiskCacheTest/Cache
F:/DiskCacheTest/TagIndexCache


## FanoutCache
+ データベースを共有するレイヤー(データベースを水平分割する)を自動的に作成する
+ write処理はblocking, read処理はnon-blocking
+ `shards`パラメータ 分割数 (default=8). write操作の(他のwrite操作に対する)blockingによるoverheadを減らすための策
+ `timeout`パラメータ データベースのトランザクションタイムアウト時間 (トランザクションはwrite処理で発生)
+ timeout時間`(default=0.010)`10msを超過した場合, `diskcache.Timeout`エラーが内部で発行される 
+ Cacheにも同様のtimeoutパラメータが存在する. Cacheで`diskcache.Timeout`が発行される -> pythonの例外が投げられる
+ 一方, FanoutCacheは`diskcache.Timeout`例外を内部で貯める&無視する(`set()`と`delete()`は暗黙的に失敗する)
+ `size_limit`パラメータ`(default=1GB)`は, キャッシュのトータルサイズを決める. 各shardごとの最大キャッシュサイズは`size_limit/shards`になる
+ `size_limit/shards`より大きなキャッシュアイテムは一時的に間引き?(分割)される

In [86]:
# fancache.close()

In [87]:
from diskcache import FanoutCache
cachedir3 = 'F:/DiskCacheTest/FanoutCache'
fancache = FanoutCache(directory=cachedir3, shards=4, timeout=1)

+ `memoizing`デコレータを追加できる. 関数オブジェクト・引数・戻り値をディスクキャッシュできる
+ `functools.lru_cache`&`Cache.set()`と似たような挙動をする(lru_cacheはインメモリ)

In [101]:
# memoizing callable
@fancache.memoize(typed=True, expire=1, tag='fib')
def fibonacci_memoizing(number):
    if number == 0:
        return 0
    elif number == 1:
        return 1
    else:
        return fibonacci_memoizing(number - 1) + fibonacci_memoizing(number - 2)

In [91]:
# non-memoizing callable
def fibonacci(number):
    if number == 0:
        return 0
    elif number == 1:
        return 1
    else:
        return fibonacci(number - 1) + fibonacci(number - 2)

non-memoizing fibonacci

In [93]:
import time
tp_start = time.perf_counter_ns()

# non-memoizing
print(sum(fibonacci(value) for value in range(10)))

tp_end = time.perf_counter_ns()

duration = (tp_end - tp_start) / 1000.0
print("elapsed_time {} [ms]".format(duration))

88
elapsed_time 110.3 [ms]


memoizing fibonacchi

In [121]:
import time
tp_start = time.perf_counter_ns()

print(sum(fibonacci_memoizing(value) for value in range(10))) # 10

tp_end = time.perf_counter_ns()

duration = (tp_end - tp_start) / 1000.0
print("elapsed_time {} [ms]".format(duration))

print(fancache.volume())
print(list(fancache))
fancache.evict('fib') # tag='fib'のkey-valueをすべて消す
print(list(fancache))
print(fancache.volume())


88
elapsed_time 18755.8 [ms]
131072
[('__main__.fibonacci_memoizing', 0, None, <class 'int'>), ('__main__.fibonacci_memoizing', 4, None, <class 'int'>), ('__main__.fibonacci_memoizing', 8, None, <class 'int'>), ('__main__.fibonacci_memoizing', 1, None, <class 'int'>), ('__main__.fibonacci_memoizing', 5, None, <class 'int'>), ('__main__.fibonacci_memoizing', 9, None, <class 'int'>), ('__main__.fibonacci_memoizing', 2, None, <class 'int'>), ('__main__.fibonacci_memoizing', 6, None, <class 'int'>), ('__main__.fibonacci_memoizing', 3, None, <class 'int'>), ('__main__.fibonacci_memoizing', 7, None, <class 'int'>)]
[]
131072


In [107]:
tp_start = time.perf_counter_ns()

print(sum(fibonacci_memoizing(value) for value in range(100))) # 100

tp_end = time.perf_counter_ns()

duration = (tp_end - tp_start) / 1000.0
print("elapsed_time {} [ms]".format(duration))

573147844013817084100
elapsed_time 36698.3 [ms]


In [108]:
print(list(fancache))
print(fancache.volume())
print(fancache.clear())
print(fancache.volume())


[('__main__.fibonacci_memoizing', 0, None, <class 'int'>), ('__main__.fibonacci_memoizing', 4, None, <class 'int'>), ('__main__.fibonacci_memoizing', 8, None, <class 'int'>), ('__main__.fibonacci_memoizing', 12, None, <class 'int'>), ('__main__.fibonacci_memoizing', 16, None, <class 'int'>), ('__main__.fibonacci_memoizing', 20, None, <class 'int'>), ('__main__.fibonacci_memoizing', 24, None, <class 'int'>), ('__main__.fibonacci_memoizing', 28, None, <class 'int'>), ('__main__.fibonacci_memoizing', 32, None, <class 'int'>), ('__main__.fibonacci_memoizing', 36, None, <class 'int'>), ('__main__.fibonacci_memoizing', 40, None, <class 'int'>), ('__main__.fibonacci_memoizing', 44, None, <class 'int'>), ('__main__.fibonacci_memoizing', 48, None, <class 'int'>), ('__main__.fibonacci_memoizing', 52, None, <class 'int'>), ('__main__.fibonacci_memoizing', 56, None, <class 'int'>), ('__main__.fibonacci_memoizing', 60, None, <class 'int'>), ('__main__.fibonacci_memoizing', 64, None, <class 'int'>),

In [109]:
print(fancache.volume())
print(list(fancache))

131072
[]


In [110]:
fancache.close()

## Disk
+ キャッシュに保存されたデータのシリアライズとデシリアライズ機能を持つ
+ keyとvalueでシリアライズの挙動が異なる
+ keyは常にキャッシュのメタデータベースに保存される
+ デフォルトのメタデータベースの型: intergers, floats, string, bytes. その他の型はPickleプロトコルを使用してbytesデータに変換されてメタデータベースに保存される
+ valueは時々ファイルに分割して保存される
+ シリアライズをカスタマイズするには, Diskクラスのサブクラスを作成する(キャッシュの初期化時に使用される)
+ キャッシュにアクセスするすべてのクライアントは, 同じシリアライズ=DiskクラスorDiskクラスのサブクラスを使用すること
+ シリアライズのデフォルト実装はPythonの`Pickle`を使用している

Python と同様に、1 や 1.0 などの整数と浮動小数点はキーとして同等に比較されることに注意してください。

#### Jsonファイルを圧縮ディスクキャッシュするJSONDiskサブクラス

In [117]:
from diskcache import Disk

import json
import zlib

class JSONDisk(Disk):

    def __init__(self, directory, compress_level=1, **kwargs):
        super().__init__(directory, **kwargs)
        self.compless_level = compress_level

    # override
    def put(self, key):
        json_bytes = json.dumps(key).encode('utf-8')
        data = zlib.compress(json_bytes, self.compless_level)
        return super().put(data)
    
    # override
    def get(self, key, raw):
        data = super().get(key, raw)
        return json.loads(zlib.decompress(data).decode('utf-8'))
    
    # override
    def store(self, value, read, key=None):
        if not read:
            json_bytes = json.dumps(value).encode('utf-8')
            value = zlib.compress(json_bytes, self.compless_level)
        return super().store(value, read, key=key)
    
    # override
    def fetch(self, mode, filename, value, read):
        data = super().fetch(mode, filename, value, read)
        if not read:
            data = json.loads(zlib.decompress(data).decode('utf-8'))
        return data
    
    
        

In [118]:
cachedir4 = "F:/DiskCacheTest/JsonDiskCache"
with Cache(directory=cachedir4, disk=JSONDisk, disk_compress_level=6) as jsoncache:
    pass

### GZipを使ったファイルを圧縮ディクスキャッシュするGZipDiskサブクラス

In [119]:
# gzip
import gzip

# diskcache
from diskcache import FanoutCache, Disk
from diskcache.core import MODE_BINARY

# cassandra
from cassandra.cqltypes import BytesType

# io
from io import BytesIO

class GzipDisk(Disk):

    def __init__(self, compress_level=1, **kwargs):
        super().__init__(kwargs)
        self.compress_level = compress_level
        self.chunk_size = 2**30

    # override
    def store(self, value, read, key=None):

        if type(value) is BytesType:
            if read:
                value = value.read()
                read = False
                
            str_io = BytesIO() # stream io
            gz_file = gzip.GzipFile(mode='wb', 
                                    compresslevel=self.compress_level, 
                                    fileobj=str_io)
            
            
            for offset in range(0, len(value), self.chunk_size):
                gz_file.write(value[offset:offset+self.chunk_size])
            gz_file.close()

            value = str_io.getvalue()

        return super().store(value, read)

    # override
    def fetch(self, mode, filename, value, read):
        value = super().fetch(mode, filename, value, read)

        if mode == MODE_BINARY:
            str_io = BytesIO(value) # stream io
            gz_file = gzip.GzipFile(mode='rb', fileobj=str_io)
            read_csio = BytesIO()

            while True:
                uncompressed_data = gz_file.read(self.chunk_size)
                if uncompressed_data:
                    read_csio.write(uncompressed_data)
                else:
                    break

            value = read_csio.getvalue()

        return value


In [132]:
def get_cache(cachedir : str, version : str, scope : str):
    caching_path = cachedir + f"/data-{version}/" + scope
    return FanoutCache(
        directory=caching_path,
        disk=GzipDisk,
        shards=64, # 64 db partition
        timeout=1, # 1sec
        size_limit=3e11, # max total db size
    )

In [133]:
gzip_cachedir = "F:/DiskCacheTest/GZipDiskCache"
version = "unversioned"
scope = "test"
gzip_cache = get_cache(gzip_cachedir, version, scope) # 後で必ずclose()すること!

callableオブジェクトをdbに登録する(memoizing)

In [134]:
# memoizing callable
@gzip_cache.memoize(typed=True, expire=1, tag='fib')
def fibonacci_gzip_cache(number):
    if number == 0:
        return 0
    elif number == 1:
        return 1
    else:
        return fibonacci_gzip_cache(number - 1) + fibonacci_gzip_cache(number - 2)

In [135]:
tp_start = time.perf_counter_ns()

print(sum(fibonacci_gzip_cache(value) for value in range(1000))) # 1000

tp_end = time.perf_counter_ns()

duration = (tp_end - tp_start) / 1000.0
print("elapsed_time {} [ms]".format(duration))

70330367711422815821835254877183549770181269836358732742604905087154537118196933579742249494562611733487750449241765991088186363265450223647106012053374121273867339111198139373125598767690091902245245323403500
elapsed_time 507885.9 [ms]


In [136]:
gzip_cache.close()