In [1]:
# Creating Version

In [2]:
# Store file by version

# Get files
How we want to get the files greatly determines the accessing schemes.

## Accessing Schemes

### By Query Key

Here we want to find a model by query key. If our query is:

```
{
    "entity": "storage",
    "category": "model",
    "subcategory": {"info": "...", "goes": "...", "here": "..."},
    "name": "name goes here"
}
```

We would want to seek to find such a model inside of our data-store. This is the most basic of uses.

### By Version

We want to be able to find our models by version as well. That's so we can play a game of version control over all of our models. So for example, if we want to find all models with the above key, and say we want version `1.0.4`, we could immediately find that version.

### By Checksum

We want to be able to find models by checksum as well. That's so we can delete all related information later and update models. To prevent the same file from being stored to multiple query keys we'll append both the query key and version number to the checksum information.

## Required fields for all files:

1. **Global checksum set** - here we'll store all the existing checksums
2. **Checksum information** - here we'll store all relavent information for the given checksum. Information includes:
    * File size
    * Query Key
    * Latest Version Number
3. **Sorted version set** - Here we'll store all existing versions inside of a sorted set. We can get the latest version for any given query key by simply getting the latest version.
    * We could end up running a global sorting algorithm by finding the highest ranking.
    * We could create a score per version if it comes to that
4. **Save file by `querykey:version`** - here we'll actually store the model. The reason we'll run this type of query is so we can store the file only in a single place. If we're running a replace operation by checksum we can also run a checksum lookup (if the checksum exist) and simply create a key that will replace the given file/blob.

In [3]:
from redis import Redis
from jamboree.utils.core import consistent_hash
from jamboree.utils.support import serialize, deserialize, create_checksum
from jamboree.utils.core import consistent_hash, consistent_unhash

  from pandas.util.testing import assert_frame_equal
  from pandas import DataFrame, Series, Panel


In [4]:
import hashlib
import hmac

In [5]:
redconnection = Redis()
test_query_key = {
    "entity": "storage",
    "category": "model",
    "subcategory": {
        "info": "...", 
        "goes": "...", 
        "here": "..."
    },
    "name": "name_goes_here"
}
def to_ser(func):
    sblob = serialize(func)
    csum = create_checksum(sblob)
    return sblob, csum

In [6]:
# Example Function and file information
example_function = lambda x: x + 1
ser, csum = to_ser(example_function)
json_key_hash = consistent_hash(test_query_key)
init_version = "0.0.1"

## Global Checksum list

We're going to start with all of the basic operations. Then actually get to the real meat of the global checksum list. That would be using it in everyday operations. 

* Saving the checksum inside of a global set
* Storing a serialized file to a query key and version
* creating a hash function and serializing the file to be used for both above

After we do that we'll be doing the following in order:

1. Checking to see if we've already stored a file inside of the database
2. Getting the latest version by file
3. Preventing storage again if the file already exist
4. Removing existing files
5. Correcting the information about the given file (version and query information) by checksum (if it exist)
6. Allowing overwritting a file if we allow it
7. Try loading a file that's not inside of our checksum set and make sure it doesn't load
8. Updating the version number if we already have a version number
9. Getting the latest version inside of a given list of versions for a file

## Save Operations

In [7]:
def save_checksum(s_sum:str, q_hash:str, f_version:str):
    """ Gets checksum information and saves it to a global table """
    checksumlistname = "globalchecksumset"
    current_checksum_key = f"{s_sum}:sum_info"
    redconnection.sadd(checksumlistname, s_sum)
    sum_info = {
        "version": f_version,
        "query_key": q_hash
    }
    redconnection.hmset(current_checksum_key, sum_info)

In [8]:
def file_store(item, q_hash, version):
    """Stores a file by query_hash and version number"""
    qkey = f"{q_hash}:{version}"
    redconnection.set(qkey, item)

In [9]:
def save_blob(query, item):
    """ Carries out all operations pertaining saving a single item """
    ser, csum = to_ser(item)
    json_key_hash = consistent_hash(query)
    init_version = "0.0.1"
    save_checksum(csum, json_key_hash, init_version)
    file_store(ser, json_key_hash, init_version)

In [10]:
save_blob(test_query_key, example_function)

## Boolean Operations
We'll use these operations to determine if something already exist. Some other things to consider:

1. Is there a version set that's not empty?
2. Is there a checksum that has associated information included?
3. Is the latest version information correct? Does the information from the checksum match up?


We might want to separate everything about the data here into a set of classes:

1. SuperStorage - The class that everything will be inside. Checksums, version control, syncing data at varying levels.
1. Versioning - Controls everything about the version control. Such as getting the latest version, checking if a version exist for a given query key, etc.
2. Checksums - A way of determining everything there is about the checksum. 
3. Synchronization - A way of tying everything together using coherent logic.

In [11]:
def is_checksum(csum):
    checksumlistname = "globalchecksumset"
    global_exist = bool(redconnection.exists(checksumlistname))
    if global_exist is False:
        print("Global Checklist Doesn't Exist")
        return False
    redconnection.sadd(checksumlistname, csum)
    file_exist = bool(redconnection.sismember(checksumlistname, csum))
    if file_exist is False:
        print("The file doesn't exist inside of the system")
        return False
    
    current_checksum_key = f"{checksumlistname}:sum_info"
    file_info_exist = bool(redconnection.exists(current_checksum_key))
    if file_info_exist is False:
        print("The file info doesn't exist inside of the system")
        return False
    return True

In [12]:
is_checksum(csum)

The file info doesn't exist inside of the system


False

In [328]:
from typing import Any
import maya
from redis.client import Pipeline

In [406]:
import cloudpickle
import dill
import lz4.frame
import version_query
from version_query import VersionComponent 

In [407]:
from loguru import logger

In [330]:
class RedisFileProcessor(object):
    def __init__(self, *args, **kwargs):
        self._pipe = None
        self._conn = None
    
    @property
    def conn(self) -> Redis:
        if self._conn is None:
            raise AttributeError("Pipe hasn't been set")
        return self._conn
    @conn.setter
    def conn(self, _pipe:Redis):
        self._conn = _pipe
    
    @property
    def pipe(self) -> Pipeline:
        if self._pipe is None:
            raise AttributeError("Pipe hasn't been set")
        return self._pipe
    @pipe.setter
    def pipe(self, _pipe:Pipeline):
        self._pipe = _pipe
    
    
    def reset(self):
        self.pipe = None
        self.conn = None

In [331]:
class RedisVersioning(RedisFileProcessor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._current_checksum = None
        self._query_hash = None
        self._current_version = None
        self.is_update = False
        
    def index(self):
        """Get the current index of the versions"""
        pass
    
    @property
    def vkey(self):
        version_key = f"{self.qhash}:version_set"
        return version_key
    
    @property
    def version(self):
        if self._current_version is None:
            self._current_version = self.latest
        return self._current_version
    
    @property
    def qhash(self):
        if self._query_hash is None:
            raise AttributeError("Checksum hasn't been loaded yet")
        return self._query_hash
    
    @qhash.setter
    def qhash(self, qhash:str):
        self._query_hash = qhash
    
    
    @property
    def exist(self) -> bool:
        is_exist = bool(self.conn.exists(self.vkey))
        version_count = self.conn.zcard(self.vkey)
        above_zero = (version_count >= 0)
        if is_exist and above_zero:
            self.is_update = True
            return True
        return False
    
    @property
    def latest(self):
        """Get the latest version given a hash"""
        if self.exist is False:
            self.conn.zadd(self.vkey, {"0.0.1": maya.now()._epoch})
            return "0.0.1"
        return (self.conn.zrange(self.vkey, -1, -1)[0]).decode()
        
    @property
    def updated(self):
        current = self.version
        if self.is_update is False:
            return current
        vs = version_query.Version.from_str(current)
        
        new_vs = vs.increment(VersionComponent.Patch)
        new_vs_str = new_vs.to_str()
        
        self.conn.zadd(self.vkey, {new_vs_str: maya.now()._epoch})
        return new_vs_str
        
    def reset(self):
        super().reset()
        self.is_update = False
        self._current_version = None
        print("Resetting versioning variables")

In [332]:
class RedisChecksum(RedisFileProcessor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._current_checksum = None
        self.master = "f35e91e082494a278d64221230cbaae7:master"
    
    
    
    @property
    def checksum(self):
        if self._current_checksum is None:
            raise AttributeError("Checksum hasn't been loaded yet")
        return self._current_checksum
    
    @checksum.setter
    def checksum(self, _checksum:str):
        self._current_checksum = _checksum
    
    @property
    def incheck(self):
        return f"{self.checksum}:sum_info"
    
    @property
    def exists(self) -> bool:
        """Does the checksum exist? """
#         self.watch_all()
        
        global_exist = bool(self.conn.exists(self.master))
        file_exist = bool(self.conn.sismember(self.master, self.checksum))
        file_info_exist = bool(self.conn.exists(self.incheck))
        # Check to see if the global set exist
        if global_exist is False:
            print("Global checksum store doesn't exist")
            
        if file_exist is False:
            print("File doesn't exist")
            return False
        
        if file_info_exist is False:
            print("File info doesn't exist")
            return False
        return True
    
    def watch_all(self):
        """Watch all of the given variables """
        self.pipe.watch(self.master)
        self.pipe.watch(self.incheck)
        
    @property
    def info(self):
#         self.watch_all()
        if not self.exists:
            raise AttributeError("Information given the checksum doesn't exist")
        
        _info = self.conn.hmgetall(self.incheck)
        return _info
    
    def add(self, version:str, qkey:str):
#         self.watch_all()
        """
            Add the checksum into the database with the given query key.
            
            We're not dynamically adding version information into the database
        """
        info = {
            "version": version,
            "query_key": qkey
        }
        self.conn.hmset(self.incheck, info)
        self.conn.sadd(self.master, self.checksum)
        print(self.conn.smembers(self.master))
    
    def remove(self):
        """
            Remove everything related to the checksum from the database
        """
        if self.exists:
            self.conn.delete(self.incheck)
            self.conn.srem(self.master, self.checksum)            
        
    
    def reset(self):
        super().reset()

In [637]:
class RedisSynchronizer(RedisFileProcessor):
    """
        Runs all of the logic about the given object:
        
        1. Appropiately saves information for the file and related to the file
        2. Deletes all related information for a given file 
        3. Checks for consistency between bits of information
        4. Has rules to save information related to the given bit of data
            * If we're going to just replace the current version
            * If we're going to increment the given version
            * Has deletion rules as well (deleting large swabs of information)
    """
    
    def add(self, qhash, version, pickled_file):
        vkey = f"{qhash}:{version}"
        self.conn.set(vkey, pickled_file)
    
    def load(self, qhash:str, version:str):
        vkey = f"{qhash}:{version}"
        is_exist = bool(self.conn.exists(vkey))
        if is_exist == False:
            raise AttributeError("File doesn't exist")
        pickled_file = self.conn.get(vkey)
        return pickled_file
    
    """This will """
    def reset(self):
        super().reset()
        print("Reset all syncing information")

In [666]:
class RedisStorage(RedisFileProcessor):
    def __init__(self):
        self.versioning = RedisVersioning()
        self.checksums = RedisChecksum()
        self.sync = RedisSynchronizer()
        self._connection = None
        self._pipe = None
        
        # Current Placeholder Values
        self.current_query = None
        self.current_hash = None
        self.current_pickle = None
        self.current_checksum = None
    
    @property
    def connection(self) -> Redis:
        if self._connection is None:
            raise AttributeError("Missing a redis connection")
        return self._connection
    
    @connection.setter
    def connection(self, _connection):
        self._connection = _connection
    
    @property
    def query(self):
        if self.current_query is None:
            raise AttributeError("Current Query Doesn't Exist")
        return self.current_query
    
    @property
    def chash(self):
        if self.current_hash is None:
            self.current_hash = consistent_hash(self.query)
        return self.current_hash
    
    def process(self, obj:Any):
        """Completely process and object so you can use it elsewhere"""
        
        self.current_pickle = serialize(obj)
        return self.current_pickle
    
    def save(self, query:dict, obj:Any, is_version_update=True):
        """ Save object at query """
        self.reset()
        self.current_query = query
        self.process(obj)
        self.load_subs()
        prior_existing = self.versioning.exist
        
        # If we're updating the version
        if is_version_update and prior_existing:
            updated = self.versioning.updated
            self.sync.add(self.chash, updated, self.current_pickle)
            logger.info(f"Version: {updated}")
            return self
        
        version = self.versioning.version
        logger.info(f"Version: {version}")
        self.sync.add(self.chash, version, self.current_pickle)
        return self
    
    def load(self, query:dict):
        """ Load the latest model """
        
        self.reset()
        self.current_query = query
        self.load_subs()
        prior_existing = self.versioning.exist
#         logger.info(prior_existing)
        if prior_existing == False:
            return None
        version = self.versioning.version
        _file = self.sync.load(self.chash, version)
        n_file = deserialize(_file)
        return n_file
        
        
    def load_subs(self):
        """
            # Load Subs
            Load relavent information into the subclasses. 
            This is so we can check for the integrity of the checksums and
        """
#         self.checksums.checksum = self.current_checksum
        self.versioning.qhash = self.chash
        
    
    def reset_placeholder_values(self):
        self.current_query = None
        self.current_hash = None
    
    def reset_subclasses(self):
        self.versioning.reset()
        self.checksums.reset()
        self.sync.reset()
    
    def reset_pipes(self):
        self.versioning.conn = self.connection
        self.checksums.conn = self.connection
        self.sync.conn = self.connection        
        
    
    def reset(self):
        """Set a new pipe. Reset all of the other variables """
        super().reset()
        self.reset_placeholder_values()
        self.reset_subclasses()
        self.reset_pipes()
        return self

In [667]:
rstore = RedisStorage()
rstore.connection = redconnection

In [668]:
class SampleObject:
    def __init__(self):
        self.one = "two"

In [669]:
# !pip install dill

In [670]:
# sobj = SampleObject()

In [671]:
# pickle1 = dill.dumps(sobj, ) # sobj.__reduce__()

In [672]:
# sobj2  = SampleObject()

In [673]:
# pickle2 = dill.dumps(sobj2) # sobj.__reduce__()

In [674]:
# pickle1 == pickle2

In [675]:
"""
    cpickle1 = cloudpickle.dumps(sobj)
    cpickle2 = cloudpickle.dumps(sobj2)
    compressed1 = lz4.frame.compress(cpickle1)
    compressed2 = lz4.frame.compress(cpickle2)
    csum1 = create_checksum(compressed1)
    csum2 = create_checksum(compressed2)
"""

'\n    cpickle1 = cloudpickle.dumps(sobj)\n    cpickle2 = cloudpickle.dumps(sobj2)\n    compressed1 = lz4.frame.compress(cpickle1)\n    compressed2 = lz4.frame.compress(cpickle2)\n    csum1 = create_checksum(compressed1)\n    csum2 = create_checksum(compressed2)\n'

In [676]:
# csum1 == csum2

In [677]:
# new_digest = hmac.new('shared-key', pickled_data, hashlib.sha1).hexdigest()

In [678]:
sobj3  = SampleObject()

In [682]:
(rstore.reset()
.save({'one': 'twosss'}, sobj3, is_version_update=False).load({'one': 'twosss'}))

Resetting versioning variables
Reset all syncing information
Resetting versioning variables
Reset all syncing information


2020-03-23 22:30:12.562 | INFO     | __main__:save:59 - Version: 0.0.1


Resetting versioning variables
Reset all syncing information


<__main__.SampleObject at 0x7f710e1bad68>

In [327]:
import uuid

In [199]:
item = uuid.uuid4().hex

In [200]:
redconnection.zadd(f"{item}:version_set", {"0.0.1": maya.now()._epoch})

1

In [114]:
redconnection.zrange(f"{item}:version_set", -1, -1)

[b'0.0.1']