In [1]:
from cassandra.cluster import Cluster, ConsistencyLevel, ResultSet
from cassandra.query import SimpleStatement

import random
import uuid
from datetime import datetime, timezone

# Named Cassandra contact points: node name -> (host, port).
# All three entries point at localhost and differ only by port.
nodes = dict(
    cassandra1=('127.0.0.1', 9042),
    cassandra2=('127.0.0.1', 9043),
    cassandra3=('127.0.0.1', 9044),
)

def get_current_datetime():
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now

def format_datetime(datetime):
    """Render a datetime as ``YYYY-MM-DDTHH:MM:SS.mmm+0000``.

    Microseconds are truncated (not rounded) to milliseconds. The ``+0000``
    suffix is appended literally; the value's own tzinfo is not consulted, so
    the result is only meaningful for values already expressed in UTC.
    """
    # NOTE: the parameter name shadows the imported ``datetime`` class inside
    # this function; only instance methods are used here, so that is harmless.
    seconds_part = datetime.strftime('%Y-%m-%dT%H:%M:%S')
    millis_part = datetime.strftime('%f')[:3]
    return f"{seconds_part}.{millis_part}+0000"

def get_current_datetime_str():
    """Return the current UTC time rendered by ``format_datetime``."""
    now = get_current_datetime()
    return format_datetime(now)

def parse_datetime_str(datetime_str: str):
    """Parse a timestamp string into a ``datetime``.

    Two layouts are accepted, tried in this order:

    * ``YYYY-MM-DD HH:MM:SS.ffffff`` -- returns a naive ``datetime``.
    * ``YYYY-MM-DDTHH:MM:SS.fff+0000`` (the layout ``format_datetime`` emits) --
      returns a timezone-aware ``datetime`` carrying the parsed offset.

    Raises:
        ValueError: if the string matches neither layout.
    """
    accepted_formats = (
        '%Y-%m-%d %H:%M:%S.%f',
        '%Y-%m-%dT%H:%M:%S.%f%z',
    )
    for fmt in accepted_formats:
        try:
            return datetime.strptime(datetime_str, fmt)
        except ValueError:
            # Not this layout; fall through to the next candidate.
            continue
    raise ValueError(
        f"time data {datetime_str!r} does not match any of the formats {accepted_formats!r}"
    )

def random_node():
    """Pick one of the node names configured in ``nodes`` at random."""
    node_names = list(nodes)
    return random.choice(node_names)

def connect_to_cassandra(node):
    """Open a session against the named node.

    ``node`` must be a key of ``nodes``; its host and port are used as the
    single contact point of a new ``Cluster``. Returns whatever
    ``Cluster.connect()`` returns.
    """
    host = nodes[node][0]
    port = nodes[node][1]
    cluster = Cluster([host], port=port)
    return cluster.connect()

def statement(cql, is_idempotent=True):
    """Wrap a CQL string in a ``SimpleStatement`` executed at ``LOCAL_QUORUM``.

    Args:
        cql: The CQL text (may contain ``%s`` placeholders bound at execute time).
        is_idempotent: Flag forwarded to the driver telling it whether the
            statement is safe to run more than once. Defaults to ``True``, which
            matches the previously hard-coded value. Pass ``False`` for
            statements whose outcome changes on re-execution -- for example
            conditional writes (``IF NOT EXISTS`` / ``IF <condition>``), where a
            second execution reports a different ``[applied]`` result.

    Returns:
        A ``SimpleStatement`` ready to be passed to ``session.execute``.
    """
    return SimpleStatement(
        query_string=cql,
        consistency_level=ConsistencyLevel.LOCAL_QUORUM,
        is_idempotent=is_idempotent,
    )

def create_keyspace():
    """Create the ``statements`` keyspace and everything inside it.

    Connects to a randomly chosen node and creates, in order: the keyspace, the
    ``property_statements`` table, the ``increment`` user-defined function, and
    secondary indexes on ``observation_id`` and ``property_name``. Every CREATE
    statement uses ``IF NOT EXISTS``, so the function can be re-run against an
    already-initialised cluster. The connection is shut down before returning.
    """
    session = connect_to_cassandra(random_node())
    try:
        ### Create keyspace
        create_keyspace_cql = """
create keyspace if not exists statements with
    replication = {'class': 'NetworkTopologyStrategy', 'DC1': 3}
AND
    durable_writes = true;
"""
        session.execute(create_keyspace_cql)

        ### Create tables
        session.execute("USE statements")

        # One typed ``property_value_<type>`` column per supported value type;
        # ``property_type`` records which of them is populated for a given row.
        create_property_statements_table = """
create table if not exists property_statements (
    id varchar,
    observation_id varchar,
    created_at timestamp,
    updated_at timestamp,
    property_name varchar,
    property_type varchar,
    property_value_varchar varchar,
    property_value_blob blob,
    property_value_bigint bigint,
    property_value_double double,
    property_value_boolean boolean,
    property_value_timestamp timestamp,
    property_value_uuid uuid,
    version bigint,
    primary key (id)
) WITH compaction = {
    'class' : 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy',
    'only_purge_repaired_tombstones' : 'true'
};
"""
        session.execute(create_property_statements_table)

        ## Custom function to increment version (a null input yields 1)
        create_increment_function = """
    CREATE FUNCTION IF NOT EXISTS increment(version bigint)
    CALLED ON NULL INPUT
    RETURNS bigint
    LANGUAGE java
    AS $$
        if (version == null) return 1L;
        else return version + 1;
    $$;
    """
        session.execute(create_increment_function)

        ## Create observation_id index
        # ``if not exists`` keeps a second call from failing on an existing index.
        create_property_statements_observation_id_index = """
create index if not exists property_statements_observation_id_index on
property_statements (observation_id);
"""
        session.execute(create_property_statements_observation_id_index)

        ## Create property_name index
        create_property_statements_property_name_index = """
create index if not exists property_statements_property_name_index on property_statements (property_name);
"""
        session.execute(create_property_statements_property_name_index)
    finally:
        # The session is local to this function, so release the driver's
        # connections here instead of leaking them on every call.
        session.cluster.shutdown()

def nuke_keyspace():
    """Drop the ``statements`` keyspace, if it exists.

    Connects to a randomly chosen node, issues the drop, and shuts the
    connection down before returning.
    """
    session = connect_to_cassandra(random_node())
    try:
        session.execute("drop keyspace if exists statements;")
    finally:
        # The session never leaves this function, so release it here rather
        # than leaking a Cluster on every call.
        session.cluster.shutdown()

# Suffixes of the ``property_value_<type>`` columns declared on ``property_statements``.
_PROPERTY_VALUE_TYPES = frozenset(
    {'varchar', 'blob', 'bigint', 'double', 'boolean', 'timestamp', 'uuid'}
)


def insert_property_statement(id: str, observation_id: str, created_at: str, property_name: str, property_type: str, property_value: object, version: int):
    """Build a conditional INSERT for one row of ``property_statements``.

    Nothing is executed here; the caller passes the returned pair to
    ``session.execute``.

    Args:
        id: Primary key of the new row.
        observation_id: Value for the ``observation_id`` column.
        created_at: Timestamp written to both ``created_at`` and ``updated_at``.
        property_name: Value for the ``property_name`` column.
        property_type: Selects the ``property_value_<property_type>`` column that
            receives ``property_value``; must be one of the typed value columns.
        property_value: The value to store in that column.
        version: Value for the ``version`` column.

    Returns:
        A ``(cql, params)`` tuple: the CQL text with eight ``%s`` placeholders
        and the matching parameter tuple.

    Raises:
        ValueError: if ``property_type`` does not name a known value column.
    """
    # ``property_type`` is spliced into the column name and cannot be bound as a
    # parameter, so only known column suffixes may reach the query text.
    if property_type not in _PROPERTY_VALUE_TYPES:
        raise ValueError(
            f"unsupported property_type {property_type!r}; "
            f"expected one of {sorted(_PROPERTY_VALUE_TYPES)}"
        )

    return (f"""
        INSERT INTO property_statements (id, observation_id, created_at, updated_at, property_name, property_type, property_value_{property_type}, version)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s) IF NOT EXISTS;
    """,
    (id, observation_id, created_at, created_at, property_name, property_type, property_value, version))

# Create the keyspace

In [2]:
# Sets up the `statements` keyspace, the `property_statements` table, the
# `increment` function and the two secondary indexes used by the cells below.
create_keyspace()

# Nuke the keyspace

In [None]:
# DESTRUCTIVE: drops the `statements` keyspace that the cells below `USE`.
# Run this cell only for cleanup; executing it as part of "Run All" removes the
# keyspace before the following cells get to use it.
nuke_keyspace()

# Insert some random data

In [4]:
session = connect_to_cassandra(random_node())

# Loop-invariant fixtures: one (column type, sample value) pair per property name.
property_types_and_values = {
    'first_name': ('varchar', 'John'),
    'last_name': ('varchar', 'Doe'),
    'email': ('varchar', 'john@someemail.com'),
    'phone_number': ('varchar', '123-456-7890'),
    'address': ('varchar', '123 Main St'),
    'city': ('varchar', 'Springfield'),
    'state': ('varchar', 'IL'),
    'zip_code': ('varchar', '62701'),
    'country': ('varchar', 'USA'),
    'age': ('bigint', 30),
    'date_of_birth': ('timestamp', '1978-11-22T00:00:00.000+0000'),
    'is_active': ('boolean', True),
    'trust_score': ('double', 0.95)
}
# Derived from the dict so the two can never drift apart (insertion order is kept).
property_names = list(property_types_and_values)

try:
    session.execute("USE statements;")

    for _ in range(1000):
        formatted_datetime = get_current_datetime_str()

        property_name = random.choice(property_names)
        property_type, property_value = property_types_and_values[property_name]
        version = 1
        statement_id = f"{uuid.uuid4()}"
        observation_id = f"{uuid.uuid4()}"

        # Conditional insert: the first column of the result row is the [applied] flag.
        cql, params = insert_property_statement(statement_id, observation_id, formatted_datetime, property_name, property_type, property_value, version)
        results = session.execute(statement(cql), params)
        assert results.one()[0], f"insert of {statement_id} was not applied"

        # Read the row back and check every column round-tripped.
        cql = f"select observation_id, created_at, updated_at, property_name, property_type, property_value_{property_type}, version from property_statements where id = %s;"
        results = session.execute(statement(cql), (statement_id,))
        row = results.one()

        assert row.observation_id == observation_id
        assert format_datetime(row.created_at) == formatted_datetime
        assert format_datetime(row.updated_at) == formatted_datetime
        assert row.property_name == property_name
        assert row.property_type == property_type
        assert row.version == version

        # Index 5 is the property_value_<type> column in the select list above.
        stored_value = row[5]
        if property_type == 'timestamp':
            # Timestamps come back as datetime objects; compare in the string form that was written.
            assert format_datetime(stored_value) == property_value
        else:
            assert stored_value == property_value
finally:
    # Each run of this cell opens a fresh connection; release it so re-running
    # the cell does not accumulate open clusters in the kernel.
    session.cluster.shutdown()


# CAS Example

In [3]:
session = connect_to_cassandra(random_node())

try:
    session.execute("USE statements;")

    # Insert a property statement with an int value
    formatted_datetime = get_current_datetime_str()
    property_name = 'amount'
    property_type = 'bigint'
    property_value = 100
    version = 1
    statement_id = f"{uuid.uuid4()}"

    cql, params = insert_property_statement(statement_id, f"{uuid.uuid4()}", formatted_datetime, property_name, property_type, property_value, version)

    results = session.execute(statement(cql), params)
    # The update below is only meaningful if the row was actually created.
    assert results.one()[0], "initial insert was not applied"

    # Compare-and-set: the write is applied only while the stored version still
    # equals the version we read; the new version is computed by the increment UDF.
    cql = "update property_statements set property_value_bigint = 200, version = increment(%s), updated_at = %s where id = %s IF version = %s;"

    results = session.execute(statement(cql), (version, formatted_datetime, statement_id, version))
    assert results.one()[0], "conditional update was not applied"

    cql = "select property_value_bigint, version, updated_at from property_statements where id = %s;"

    results = session.execute(statement(cql), (statement_id,))
    row = results.one()
    assert row[0] == 200
    assert row[1] == version + 1
    assert format_datetime(row[2]) == formatted_datetime
finally:
    # Each run of this cell opens a fresh connection; release it so re-running
    # the cell does not accumulate open clusters in the kernel.
    session.cluster.shutdown()