# RedGrease Demos

Quick demonstration of how to create and run Redis Gears functions, using RedGrease.

## [Demos](#Demos):
1. [The Basics](#1.-The-Basics)
2. [Complex Query](#2.-Complex-Query)
3. [Transaction Stream Processing](#3.-Transaction-Stream-Processing)
4. [Custom Command](#4.-Custom-Command)


# Preparations
Before running the demos, make sure that the prerequisites are met and that the preparation steps have successfully been executed. 
Some preparation steps, particularly the downloads, may take quite some time. 

## 1. Prerequisites
- Python 3.7
- Pip
- Docker
- Jupyter

Run the cell below to validate your prerequisites.

In [1]:
# Run cell to test your environment requirements

import sys
import re
pyver = !{sys.executable} --version  # type: ignore 
pipver = !{sys.executable} -m pip --version  # type: ignore
dockver = !docker --version  # type: ignore

if not re.match("Python 3.7", pyver[0]):
    raise SystemExit(f"This demo only supports Python 3.7. You are running {pyver[0]}.")

if not re.match(r".*\(python 3.7\)", pipver[0]):
    raise SystemExit("Please install Pip for your Python 3.7 environment.")

if not re.match("Docker version", dockver[0]):
    raise SystemExit("Please install Docker")

print("Requirements all look good!")

Requirements all look good!
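
The interpreter check can also be done without shelling out, by comparing `sys.version_info` directly. The following is a minimal sketch; `is_supported` is a hypothetical helper name and not part of the notebook.

```python
import sys


def is_supported(version_info, required=(3, 7)):
    """Return True if the (major, minor) part of the version equals `required`."""
    return tuple(version_info[:2]) == tuple(required)


# Check the interpreter that is running this code.
print(is_supported(sys.version_info))
```

Comparing tuples avoids the pitfalls of matching version strings with regular expressions.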


## 2. Python Requirements

Install the Python packages required for the demo:

- `redgrease[client]` - The RedGrease client library for Redis Gears. This is what is being demonstrated.

- `ipywidgets` - Jupyter notebook extension, for displaying widgets, e.g. buttons, in this notebook.
- `requests` - For downloading content.

Run the cell below to install the requirements.

In [None]:
# %%capture reqs_install_output
!{sys.executable} -m pip install redgrease[client] ipywidgets requests
!jupyter nbextension enable --py widgetsnbextension

## 3. Download Datasets
Some of the demos require a portion of the [COCO Dataset](https://cocodataset.org) to be uploaded into the Redis Gears Cluster.
The COCO Dataset (Common Objects in Context) is a fairly large set of images (~247,000) with corresponding annotations of what they depict.

### Example:
<img src="coco_example.jpg" > [COCO Example](coco_example.jpg)

```
a man riding a snowboard down a ski slope.
a snowboarder sailing down a snowy hillside on a mountain.
a man is snowboarding past blue markers on a mountain.
a man on a snowboard in the snow.
a man snow boarding in the snow on a slope. 
```


For the demo we will only pre-download the annotations (json), not the images (jpeg), but it is still between 250 - 500 MB of data, depending on which portions you choose.

There are two annotation packages to choose from. 
- **COCO Train/Val 2014** - Annotations for 124,000 images (241 MB)
- **COCO Train/Val 2017** - Annotations for 123,000 images (241 MB)

Either or both may be used. 
Run the cell below and select using the buttons which dataset(s) to download.

In [None]:
# This code is just for preparation of the demo.
# It is NOT part of the demo itself
#
# Download COCO Annotations 
# Run the cell, then:
# - Validate or modify the Download directory
# - Click the button, or buttons for the annotations to download

import ipywidgets as widgets
import os
import requests

coco_annotations_url = "http://images.cocodataset.org/annotations"
annotations_file_pattern = "annotations_trainval{}.zip"

layout = widgets.Layout(width="30%")
output = widgets.Output()

def get_download_path():
    download_dir = "."
    if os.name == 'nt':
        import winreg
        sub_key = r'SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders'
        downloads_guid = '{374DE290-123F-4565-9164-39C4925E467B}'
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, sub_key) as key:
            download_dir = winreg.QueryValueEx(key, downloads_guid)[0]
    else:
        download_dir = os.path.join(os.path.expanduser('~'), 'Downloads')

    return os.path.join(download_dir, "COCO")

download_location = widgets.Text(
    value=get_download_path(),
    placeholder="Download directory",
    description="Directory to download annotations to.",
    layout=layout,
)
display(download_location)

def dl_state(button, downloading=None):
    year = button.value
    annotations_file_name = annotations_file_pattern.format(year)
    destination = os.path.join(download_location.value, annotations_file_name)
    is_downloaded = os.path.isfile(destination)
    button.disabled = is_downloaded or downloading is not None
    if downloading is not None:
        button.description=f"Downloading COCO {year} annotations (241 MB): {downloading}%. Please wait!"
    elif is_downloaded:
        button.description=f"Congrats! COCO {year} annotations are downloaded!"
    else:
        button.description=f"Download COCO {year} annotations (241 MB)"
    return is_downloaded, annotations_file_name, destination


def download_button_pressed(btn):
    downloaded, file_name, destination = dl_state(btn)
    if downloaded:
        return
    if not os.path.isdir(download_location.value):
        os.mkdir(download_location.value)
    try:
        response = requests.get(
            f"{coco_annotations_url}/{file_name}",
            stream=True
        )
        total_length = response.headers.get('content-length')
        with open(destination, "wb") as f:
            if total_length is None: # no content length header
                dl_state(btn, "???")
                f.write(response.content)
                return
            total_length = int(total_length)
            dl = 0
            for data in response.iter_content(chunk_size=4096):
                dl += len(data)
                f.write(data)
                dl_state(btn, int(100*(dl/total_length)))

    except Exception:
        try:
            os.remove(destination)
        except Exception:
            pass
    finally:
        dl_state(btn)

for year in ["2014", "2017"]:
    download_button = widgets.Button(
        tooltip='Start download of selected datasets into the selected download directory.',
        layout=layout
    )
    download_button.value = year
    dl_state(download_button)
    download_button.on_click(download_button_pressed)
    display(download_button)

display(output)

## 4. Download and run Redis Gears Cluster Docker image
Run the cell below to download a Redis Gears Cluster Docker image (~605 MB), if not already present, and run it. 

In [None]:
redis_gears_cluster_image = "redislabs/rgcluster:1.2.1"
redis_gears_cluster_container_name = "demo_gears_cluster"

redis_gears_single_image = "redislabs/redisgears:1.2.1"
redis_gears_single_container_name = "demo_gears_single"

# Get the correct Redis Gears Images
!docker pull {redis_gears_single_image}
!docker pull {redis_gears_cluster_image}

# Check if the single container is already running.
container_info = !docker container inspect {redis_gears_single_container_name}
if container_info[0] == "[]":
    print("Starting Redis Gears single instance")
    !docker run --name {redis_gears_single_container_name} --rm -d -p 6379:6379 {redis_gears_single_image}


# Check if the cluster container is already running.
container_info = !docker container inspect {redis_gears_cluster_container_name}
if container_info[0] == "[]":
    print("Starting Redis Gears cluster instance")
    !docker run --name {redis_gears_cluster_container_name} --rm -d -p 30001:30001 -p 30002:30002 -p 30003:30003 {redis_gears_cluster_image}

print("Redis Gears containers are running!")

## 5. Load Annotation Data into Redis cluster
By running the cell below, the COCO annotations downloaded above will be loaded into the Redis Cluster.

In [None]:
import glob
import itertools
import json
import os
import re
import redgrease
import zipfile

annotation_archive_files = os.path.join(download_location.value, "annotations_trainval*.zip")
annotation_archives = glob.glob(annotation_archive_files)

if not annotation_archives:
    print("no archives")
    raise SystemExit("Please download either or both COCO annotations as per instructions above.")

r = redgrease.RedisGears(host="localhost", port=30001)

annotation_json_pattern = re.compile(r"annotations/(\w+)_([a-zA-Z]+)([0-9]+)\.json")

annotation_types = ["instances"]  #, "person_keypoints", "captions"]
years = ["2014", "2017"]
purpose = ["val", "train"]

output = widgets.Output()
progress = widgets.Text("", layout=layout)

def load_annotation_info(base_key, info):
    annotation_info_key = f"{base_key}/info"
    r.hset(annotation_info_key, mapping=info)
    return annotation_info_key

def load_license_info(base_key, license):
    license_key = f"{base_key}/license/{license['id']}"
    if not r.exists(license_key):
        r.hset(license_key, mapping=license)
    return license_key

def load_image_info(base_key, image_info):
    img_info_key = f"{base_key}/image/{image_info['id']}/info"
    if not r.exists(img_info_key):
        r.hset(img_info_key, mapping=image_info)
    return img_info_key

def load_keypoint_names(base_key, keypoints):
    keypoints_key = f"{base_key}/keypoints"
    r.lpush(keypoints_key, *keypoints)
    return keypoints_key

def load_list_of_str(base_key, sequence):
    list_key = f"{base_key}/skeleton"
    r.lpush(list_key, *map(str, sequence))
    return list_key

def load_category(base_key, category):
    category_key = f"{base_key}/category/{category['id']}"

    if "keypoints" in category:
        category["keypoints"] = load_keypoint_names(category_key, category["keypoints"])
    if "skeleton" in category:
        category["skeleton"] = load_list_of_str(category_key, category["skeleton"])

    r.hset(category_key, mapping=category)
    return category_key

def load_segmentation(base_key, segmentation):
    segmentation_key = f"{base_key}/segmentation"
    if not r.exists(segmentation_key):
        for i, segment in enumerate(segmentation):
            segment_key = f"{segmentation_key}/{i}"
            r.lpush(segment_key, *segment)
            r.rpush(segmentation_key, segment_key)
    return segmentation_key

def load_annotation(base_key, annotation):
    
    annotation_key = f"{base_key}/image/{annotation['image_id']}/annotation/{annotation['id']}"
    
    if not r.exists(annotation_key):
        if "segmentation" in annotation:
            # Replace the 'segmentation' list-of-lists, with a key with a list of keys, that in turn point to the inner lists :)
            annotation["segmentation"] = load_segmentation(annotation_key, annotation["segmentation"])
        
        if "bbox" in annotation:
            # Replace the 'bbox' with a string representation.
            annotation["bbox"] = str(annotation["bbox"])

        if "keypoints" in annotation:
            annotation["keypoints"] = load_list_of_str(annotation_key, annotation["keypoints"])

        r.hset(annotation_key, mapping=annotation)
    return annotation_key

def load_annotation_jsons_from_zip(zip_file):
    with zipfile.ZipFile(zip_file) as archive:
        for file_name in archive.namelist():
            is_annotation_file = annotation_json_pattern.match(file_name)
            if not is_annotation_file:
                continue
            
            annotation_type = is_annotation_file.group(1)
            dataset_purpose = is_annotation_file.group(2)
            dataset_year = is_annotation_file.group(3)

            if annotation_type not in annotation_types:
                continue
            
            if dataset_purpose not in purpose:
                continue
                
            if dataset_year not in years:
                continue

            with archive.open(file_name) as json_file:
                contents = json.load(json_file)
    
            base_key = f"/dataset/coco/{dataset_year}"
            info_key = f"{base_key}/general/{annotation_type}/{dataset_purpose}"
                
            # info
            if "info" in contents:
                progress.value = f"Loading info for {dataset_purpose} {dataset_year} {annotation_type}"
                load_annotation_info(info_key, contents["info"])

            # licenses
            if "licenses" in contents:
                progress.value = f"Loading licenses for {dataset_purpose} {dataset_year} {annotation_type}"
                for lic in contents["licenses"]:
                    load_license_info(info_key, lic)
                    
            # images
            if "images" in contents:
                progress.value = f"Loading images for {dataset_purpose} {dataset_year} {annotation_type}"
                for image_info in contents["images"]:
                    load_image_info(base_key, image_info)

            # annotations
            if "annotations" in contents:
                progress.value = f"Loading annotations for {dataset_purpose} {dataset_year} {annotation_type}"
                for annotation in contents["annotations"]:
                    load_annotation(base_key, annotation)

            # categories (for "instances" and "person_keypoints")
            if "categories" in contents:
                progress.value = f"Loading categories for {dataset_purpose} {dataset_year} {annotation_type}"
                for category in contents["categories"]:
                    load_category(base_key, category)

            
display(progress)
for archive in annotation_archives:
    progress.value = f"Unzipping {archive}"
    load_annotation_jsons_from_zip(archive)
progress.value = "Done!"
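
The loader derives the annotation type, purpose and year from each archive member name, and stores every annotation under a key built from the year, image id and annotation id. The sketch below isolates those two steps so the key layout is easy to see. The helper names `parse_annotation_file` and `annotation_key` are hypothetical, and the member names are only examples of the naming scheme.

```python
import re

# Pattern for annotation files, written as a raw string with the dot escaped.
ANNOTATION_JSON = re.compile(r"annotations/(\w+)_([a-zA-Z]+)([0-9]+)\.json")


def parse_annotation_file(file_name):
    """Split an archive member name into (annotation_type, purpose, year)."""
    match = ANNOTATION_JSON.match(file_name)
    return match.groups() if match else None


def annotation_key(year, image_id, annotation_id):
    """Redis key under which one annotation hash is stored."""
    return f"/dataset/coco/{year}/image/{image_id}/annotation/{annotation_id}"


print(parse_annotation_file("annotations/instances_val2017.json"))
print(parse_annotation_file("annotations/person_keypoints_train2014.json"))
print(annotation_key("2017", 22222, 2027787))
```

Because image and annotation ids are part of the key, glob patterns such as `/dataset/coco/*/image/*/annotation/*` can later select all annotations across years.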


# Demos
This is the actual Demo section. Everything above is just preparations.

1. [The Basics](#1.-The-Basics)
2. [Complex Query](#2.-Complex-Query)
3. [Transaction Stream Processing](#3.-Transaction-Stream-Processing)
4. [Custom Command](#4.-Custom-Command)


<a id="demm-basics"></a>
## 1. The Basics
Showcasing some of the basic features and commands of the redgrease package.

Instantiation of client / connection to Redis Gears engines

In [2]:
import redgrease
import redgrease.utils
from IPython.display import Image
from IPython.core.display import HTML

# Create connection / client for single instance Redis
single = redgrease.RedisGears() 

# Create connection / client for Redis cluster 
cluster = redgrease.RedisGears(port=30001)

# Create using existing Redis Connection
import redis
r = redis.Redis()
gears = redgrease.Gears(r)

print(f"single: {single.ping()}")
print(f"cluster:\n{cluster.ping()}")
print(f"gears: {gears.pystats()}")

single: True
cluster:
{'172.17.0.3:30001': True, '172.17.0.3:30003': True, '172.17.0.3:30002': True}
gears: PyStats(TotalAllocated=1254607075, PeakAllocated=18437092, CurrAllocated=17635809)


Redis v6 commands are accessible

In [3]:
a = single.flushall()
b = single.set("Foo", 21)
c = single.hset("Bar", mapping={"spam":"eggs", "meaning":8})
d = single.hincrby("Bar", "meaning", 34)
e = single.xadd("clogs::0", {"msg":"START", "from":0, "to":0, "amount":0})

a, b, c, d, e

(True, True, 2, 42, b'1629165817483-0')

Gears-specific commands can be accessed through the `gears` property.

Examples:

In [4]:
cluster_pystats = cluster.gears.pystats()

print(f"Cluster Redis - Python Stats:\n{cluster_pystats}\n")

Cluster Redis - Python Stats:
{'172.17.0.3:30001': PyStats(TotalAllocated=90801744305, PeakAllocated=64222599, CurrAllocated=9490314), '172.17.0.3:30003': PyStats(TotalAllocated=90566502099, PeakAllocated=70399261, CurrAllocated=9513372), '172.17.0.3:30002': PyStats(TotalAllocated=91089844267, PeakAllocated=61443787, CurrAllocated=9174986)}



In [5]:
cluster_info = cluster.gears.infocluster()

print(f"Cluster Redis - Cluster Info:\n{cluster_info}\n")
print(f"Number of shards: {len(cluster_info.shards)}")

Cluster Redis - Cluster Info:
ClusterInfo(my_id='22cda3202c7ab2fb9575ceacc304127bd975466a', my_run_id='f9255947c8347fc9f4bdbd35cd33e08b654d7b23', shards=[ShardInfo(id='6752eaa8ceb1a6635385f80cba773ee5bdd7e66a', ip='172.17.0.3', port=30001, unixSocket='None', runid='6bec5be7c3b6cddb8d1e6b74c4559cf198ba3daa', minHslot=0, maxHslot=5460, pendingMessages=0), ShardInfo(id='678476b5bd2421f8a8b8edb32f848b7fad7fd2e9', ip='172.17.0.3', port=30003, unixSocket='None', runid='ff683468255dc12a175acd1de5ed24f0e7413b19', minHslot=10923, maxHslot=16383, pendingMessages=0), ShardInfo(id='22cda3202c7ab2fb9575ceacc304127bd975466a', ip='172.17.0.3', port=30002, unixSocket='None', runid='f9255947c8347fc9f4bdbd35cd33e08b654d7b23', minHslot=5461, maxHslot=10922, pendingMessages=0)])

Number of shards: 3
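
The `minHslot` and `maxHslot` fields show which range of the 16384 Redis Cluster hash slots each shard owns. As a rough illustration of how a key ends up on a particular shard, the sketch below computes a key's slot as CRC16 (XMODEM variant) modulo 16384, including the hash tag rule. The slot ranges are copied from the output above and will differ in other clusters; `key_slot` and `shard_port` are hypothetical helper names, not RedGrease functions.

```python
import binascii

HASH_SLOTS = 16384  # number of hash slots in a Redis Cluster


def key_slot(key):
    """Hash slot of a key: CRC16 (XMODEM) of the key, modulo 16384."""
    data = key.encode("utf-8")
    # Hash tags: if the key contains a non-empty {...} section,
    # only the text between the first '{' and the next '}' is hashed.
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end > start + 1:
            data = data[start + 1:end]
    return binascii.crc_hqx(data, 0) % HASH_SLOTS


# Slot ranges copied from the ClusterInfo output above: port -> (min, max).
shard_slots = {30001: (0, 5460), 30002: (5461, 10922), 30003: (10923, 16383)}


def shard_port(key):
    """Port of the shard whose slot range contains the key's slot."""
    slot = key_slot(key)
    return next(port for port, (lo, hi) in shard_slots.items() if lo <= slot <= hi)


print(key_slot("foo"), shard_port("foo"))
```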


In [6]:
cluster_refreshed = cluster.gears.refreshcluster()

print(f"Cluster Redis - Cluster Refresh Response:\n{cluster_refreshed}\n")

Cluster Redis - Cluster Refresh Response:
True



Gear functions can be invoked as strings

In [7]:
#> Iterate through all Redis key-value records, and return all record data.
all_records_gear = single.gears.pyexecute("GearsBuilder().run()")

print("Single-node Redis - All-records gear:")
for result in all_records_gear:
    print(f"  {result}")

Single-node Redis - All-records gear:
  b"{'event': None, 'key': 'Bar', 'type': 'hash', 'value': {'meaning': '42', 'spam': 'eggs'}}"
  b"{'event': None, 'key': 'Foo', 'type': 'string', 'value': '21'}"
  b"{'event': None, 'key': 'clogs::0', 'type': 'unknown', 'value': None}"


In [8]:
#> Iterate through all Redis key-value records, and return just the key and type
key_type_gear = single.gears.pyexecute(
    "GearsBuilder().map(lambda record:(record['key'], record['type'])).run()"
)

print("Single-node Redis - Key-types gear:")
for result in key_type_gear:
    print(f"  {result}")


Single-node Redis - Key-types gear:
  b"('Bar', 'hash')"
  b"('Foo', 'string')"
  b"('clogs::0', 'unknown')"


In [9]:
#> Count the total number of keys / records
single_record_count = single.gears.pyexecute("GearsBuilder().count().run()")

print(f"Single-node Redis - Record count: {int(single_record_count)}")

Single-node Redis - Record count: 3
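
The results in the cells above arrive as bytes containing the `repr` of a Python value. When the value is a plain literal (tuple, dict, string, number), it can be turned back into a Python object with the standard library, as sketched below. This is an illustration using the standard library only, not a RedGrease feature, and `decode_result` is a hypothetical helper name.

```python
import ast


def decode_result(raw):
    """Parse one result that is the bytes of a Python literal's repr."""
    return ast.literal_eval(raw.decode("utf-8"))


# Sample values copied from the outputs above.
raw_results = [
    b"('Bar', 'hash')",
    b"{'event': None, 'key': 'Foo', 'type': 'string', 'value': '21'}",
]

for raw in raw_results:
    decoded = decode_result(raw)
    print(type(decoded).__name__, decoded)
```

`ast.literal_eval` only accepts literals, so unlike `eval` it does not execute arbitrary code from the result.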


### GearFunction objects
RedGrease allows for construction of GearFunction objects instead of function strings.

In [10]:
### Programmatic / dynamic definition of Gears functions
record_count = redgrease.KeysOnlyReader().count().run()

cluster_record_count = cluster.gears.pyexecute(record_count)

print(f"Cluster Redis - Total records: {cluster_record_count}")

Cluster Redis - Total records: 2946308


Open RedGrease GearFunction objects can be composed and reused

In [11]:
#>
images = redgrease.KeysReader("/dataset/coco/*/image/*/info").values()

image_count = images.count()
square_images = images.filter(lambda img: img['height'] == img['width'])

some_square_image_urls = (
    square_images
    .collect()
    .limit(4)
    .map(lambda record: record['coco_url'])
)

type(images), type(image_count), type(square_images), type(some_square_image_urls)


(redgrease.gears.OpenGearFunction,
 redgrease.gears.OpenGearFunction,
 redgrease.gears.OpenGearFunction,
 redgrease.gears.OpenGearFunction)
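
Composition works because each operation returns a new open function and leaves the one it was called on unchanged, which is why `images` can serve as the base for both `image_count` and `square_images`. The toy class below imitates that behaviour in plain Python. It is only a sketch of the idea under that assumption; `OpenPipeline` is a hypothetical name and says nothing about how RedGrease is implemented.

```python
class OpenPipeline:
    """Toy stand-in for an 'open' function: an immutable chain of steps."""

    def __init__(self, steps=()):
        self.steps = tuple(steps)

    def _extend(self, kind, fn=None):
        # Return a NEW pipeline; the receiver is left untouched.
        return OpenPipeline(self.steps + ((kind, fn),))

    def filter(self, predicate):
        return self._extend("filter", predicate)

    def map(self, fn):
        return self._extend("map", fn)

    def count(self):
        return self._extend("count")

    def run(self, records):
        """'Close' the pipeline by evaluating it over local records."""
        data = list(records)
        for kind, fn in self.steps:
            if kind == "filter":
                data = [r for r in data if fn(r)]
            elif kind == "map":
                data = [fn(r) for r in data]
            elif kind == "count":
                data = [len(data)]
        return data


images = OpenPipeline()
image_count = images.count()
square_images = images.filter(lambda img: img["height"] == img["width"])

sample = [{"height": 4, "width": 4}, {"height": 3, "width": 5}]
print(image_count.run(sample))
print(square_images.run(sample))
```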

You can dynamically create parameterized open RedGrease GearFunctions in normal functions

In [12]:
#> Normal functions can create parameterized Gear Functions
def instance_annotations(year="*"):
    return redgrease.KeysReader(
        f"/dataset/coco/{year}/image/*/annotation/*"
    ).values(type="hash")

Gear functions can be executed in a number of different ways

In [13]:
### The "textbook" way
img_cnt = cluster.gears.pyexecute(image_count.run())

print(f"Total number of images: {img_cnt}\n")

Total number of images: 123287



In [14]:
### As an open function, i.e. without a closing 'run' or 'register' (Run is inferred)
annotation_cnt = cluster.gears.pyexecute(instance_annotations().count())

print(f"Total number of annotations: {annotation_cnt}\n")

Total number of annotations: 896782



In [15]:
#> Directly in the closing `run` or `register` operation, using the `on` argument
img_urls = some_square_image_urls.run(on=cluster)

print(f"Some square images")
for img_url in img_urls:
    display(Image(url=img_url))

Some square images


<a id="demo-query"></a>
## 2. Complex Query 

Let's construct a query GearFunction that takes a number of annotation category names, each with an optional min and max count, 
and then finds the images that fit those constraints.

Firstly, notice how annotations are stored

In [16]:
a1 = cluster.hgetall("/dataset/coco/2017/image/22222/annotation/2027787")
a2 = cluster.hgetall("/dataset/coco/2017/image/22222/annotation/1727529")
a1, a2

({b'segmentation': b'/dataset/coco/2017/image/22222/annotation/2027787/segmentation',
  b'area': b'160.06269999999992',
  b'iscrowd': b'0',
  b'image_id': b'22222',
  b'bbox': b'[57.33, 86.94, 15.06, 16.69]',
  b'category_id': b'1',
  b'id': b'2027787'},
 {b'segmentation': b'/dataset/coco/2017/image/22222/annotation/1727529/segmentation',
  b'area': b'2565.190249999999',
  b'iscrowd': b'0',
  b'image_id': b'22222',
  b'bbox': b'[519.53, 83.45, 42.88, 118.36]',
  b'category_id': b'1',
  b'id': b'1727529'})

Let's create some lookup tables for the annotation categories and their IDs

In [17]:
#> Merging dictionaries
def dict_merge(d1, d2):
    return {**d1, **d2}

# Lookup from category name to category id
category_id_lookup = (
    redgrease.KeysReader("/dataset/*/category/*")
    .values(type="hash")
    .map(lambda annotation: {annotation['name']:annotation['id']})
    .aggregate({},dict_merge, dict_merge)
    .run(on=cluster)
)

# Lookup from category id to category name
category_name_lookup = {cat_id:cat_name for cat_name, cat_id in category_id_lookup.items() }

print(f"Number of categories: {len(category_id_lookup)}")
print(f"Errors: {category_id_lookup.errors}")
print()
print(f"Lookup id by name:\n{category_id_lookup}")

Number of categories: 80
Errors: []

Lookup id by name:
{'bowl': '51', 'keyboard': '76', 'refrigerator': '82', 'cell phone': '77', 'spoon': '50', 'car': '3', 'tv': '72', 'parking meter': '14', 'surfboard': '42', 'hot dog': '58', 'orange': '55', 'tie': '32', 'cup': '47', 'tennis racket': '43', 'bed': '65', 'bus': '6', 'pizza': '59', 'giraffe': '25', 'train': '7', 'laptop': '73', 'scissors': '87', 'bicycle': '2', 'vase': '86', 'cow': '21', 'wine glass': '46', 'dog': '18', 'snowboard': '36', 'cake': '61', 'traffic light': '10', 'toothbrush': '90', 'sandwich': '54', 'bear': '23', 'motorcycle': '4', 'backpack': '27', 'bird': '16', 'baseball bat': '39', 'chair': '62', 'baseball glove': '40', 'toaster': '80', 'teddy bear': '88', 'truck': '8', 'remote': '75', 'elephant': '22', 'handbag': '31', 'apple': '53', 'bottle': '44', 'oven': '79', 'fork': '48', 'skis': '35', 'stop sign': '13', 'cat': '17', 'book': '84', 'carrot': '57', 'knife': '49', 'bench': '15', 'suitcase': '33', 'broccoli': '56', 'f

Collect the number of annotations of each category per image

In [18]:
### For each annotation, add one to the accumulator for the image, under the category of the annotation
def accumulate_categories(image_id, accumulator, annotation):
    if 'category_id' in annotation:
        annotation_category_id = annotation['category_id']
        accumulator[annotation_category_id] = accumulator.get(annotation_category_id, 0) + 1
    # Always return the accumulator, even for annotations without a category
    return accumulator

    
# Add the previously accumulated counts from each shard to a global accumulator for the image
def accumulate_category_counts(image_id, accumulator, category_count):
    for category, count in category_count.items():
        accumulator[category] = accumulator.get(category, 0) + count
    return accumulator


# Just renaming fields so it's clearer what they contain
def format_img_stats(img_stats):
    return {
        'image_id': img_stats['key'],
        'instances': img_stats['value']
    }


# GearFunction that counts the number of annotations of each category in each image
category_count_by_image = instance_annotations(2017).aggregateby(
    extractor = lambda annotation : annotation.get('image_id', -1),  # Group the annotations by image_id
    zero = {},  # For each group we use a dict to accumulate the counts of each category of annotation
    seqOp = accumulate_categories,  # Accumulate/reduce the category counts locally on each shard
    combOp = accumulate_category_counts  # Accumulate/reduce the local results globally
).map(format_img_stats)



# Run the GearFunction, but limit to 10 results per shard (for sanity)
cats_by_img = category_count_by_image.limit(10).run(on=cluster)
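
The two-phase aggregation can be tried out without a cluster. The sketch below imitates it in plain Python: each "shard" is a list of annotation dicts, a local operation reduces each group within a shard, and a combining operation merges the per-shard partial results. `aggregate_by`, `local_counts` and `merge_counts` are hypothetical names for this illustration, and the sample annotations are made up.

```python
from collections import defaultdict
from functools import reduce


def local_counts(image_id, accumulator, annotation):
    """Local step: add one annotation to the shard-local counts of its image."""
    category = annotation["category_id"]
    accumulator[category] = accumulator.get(category, 0) + 1
    return accumulator


def merge_counts(image_id, accumulator, shard_counts):
    """Combining step: fold one shard's counts into the global counts."""
    for category, count in shard_counts.items():
        accumulator[category] = accumulator.get(category, 0) + count
    return accumulator


def aggregate_by(shards, extractor, zero, seq_op, comb_op):
    """Plain-Python imitation of a two-phase grouped aggregation."""
    partials = []
    for shard in shards:
        groups = defaultdict(list)
        for record in shard:
            groups[extractor(record)].append(record)
        # Phase 1: reduce each group locally, starting from a copy of `zero`.
        partials.append({
            key: reduce(lambda acc, rec: seq_op(key, acc, rec), records, dict(zero))
            for key, records in groups.items()
        })
    # Phase 2: combine the per-shard partial results for each key.
    result = {}
    for partial in partials:
        for key, counts in partial.items():
            result[key] = comb_op(key, result.get(key, dict(zero)), counts)
    return result


shards = [
    [{"image_id": "1", "category_id": "1"}, {"image_id": "1", "category_id": "52"}],
    [{"image_id": "1", "category_id": "1"}, {"image_id": "2", "category_id": "8"}],
]
totals = aggregate_by(shards, lambda a: a["image_id"], {}, local_counts, merge_counts)
print(totals)
```

Annotations of image `"1"` sit on both sample shards, which is the situation the combining step exists for.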

In [19]:
#>
print(f"Number of results: {len(cats_by_img)}")
print(f"Errors: {cats_by_img.errors}")
print()
for img in cats_by_img:
    inst = { category_name_lookup.get(cid, cid):cnt for cid, cnt in img['instances'].items() }
    print(f"Image #{img['image_id']} instances: {inst}")
print("...")
print("and so on")

Number of results: 30
Errors: []

Image #483381 instances: {'teddy bear': 1, 'chair': 1, 'person': 1}
Image #237137 instances: {'person': 2, 'cake': 8}
Image #17267 instances: {'car': 3, 'person': 3, 'traffic light': 2, 'bicycle': 2, 'truck': 1, 'handbag': 1}
Image #197756 instances: {'cell phone': 2, 'person': 3, 'bottle': 1}
Image #451278 instances: {'bed': 2, 'teddy bear': 1, 'chair': 2, 'person': 1}
Image #193332 instances: {'person': 2, 'broccoli': 1, 'cake': 1, 'tie': 1, 'dining table': 1, 'knife': 1}
Image #422878 instances: {'person': 2, 'knife': 1, 'horse': 1}
Image #475564 instances: {'orange': 1}
Image #247368 instances: {'zebra': 2, 'bird': 1}
Image #543468 instances: {'vase': 1}
Image #131174 instances: {'airplane': 1, 'person': 2}
Image #253219 instances: {'book': 12, 'cat': 1, 'tv': 1}
Image #361316 instances: {'wine glass': 2, 'fork': 1, 'person': 1, 'spoon': 1, 'dining table': 1, 'sandwich': 2}
Image #575461 instances: {'person': 2, 'cell phone': 1, 'tie': 1, 'handbag'

In [20]:
### Querying 

def contstrain(constraints):
    # Return a predicate that checks whether the instance counts of a set of categories
    # fall within the given bounds.
    # `constraints` is a dict from category name to a tuple of (min_count, max_count)
    
    id_constraints = { category_id_lookup[cat_name]:x for cat_name, x in constraints.items()}
    
    def predicate(record):
        instances = record['instances']
        
        # iterate through each of the constraints, to check if any fails
        
        for cat_id, constraint in id_constraints.items():
            min_count, max_count = constraint
            
            if cat_id not in instances:
                if min_count is not ... and min_count > 0:
                    return False
                continue
            
            if min_count is not ... and instances[cat_id] < min_count:
                return False
            
            if max_count is not ... and instances[cat_id] > max_count:
                return False
            
        return True

    return predicate


# Our query params is a dict of category name to a min and max count tuple (... meaning any)
query_params = {
    'truck': (1, 2),
    'banana': (5, ...),
    'person': (1, 1),
    'bottle': (..., 0),
}


query = category_count_by_image.filter(contstrain(query_params))

query_result = query.limit(30).run(on=cluster)

query_result



[{'image_id': '454762',
  'instances': {'52': 13, '3': 1, '8': 1, '1': 1, '53': 2, '2': 1}},
 {'image_id': '182497', 'instances': {'52': 13, '8': 2, '1': 1}},
 {'image_id': '415880', 'instances': {'52': 14, '8': 1, '1': 1}},
 {'image_id': '527553', 'instances': {'52': 11, '3': 1, '1': 1, '8': 1}}]
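
The use of `...` (Ellipsis) as an "any" bound can be checked locally on hand-written records. The sketch below is a simplified variant of the predicate: it keys constraints by category name instead of id and treats a missing category as a count of zero, which gives the same result for non-negative bounds. `within` and `matches` are hypothetical names.

```python
def within(count, bounds):
    """Check a count against (min, max), where ... means 'no bound'."""
    lower, upper = bounds
    if lower is not ... and count < lower:
        return False
    if upper is not ... and count > upper:
        return False
    return True


def matches(instances, constraints):
    """True if every constrained category count is within its bounds."""
    # A category that is missing from the image counts as zero instances.
    return all(
        within(instances.get(category, 0), bounds)
        for category, bounds in constraints.items()
    )


constraints = {
    "truck": (1, 2),
    "banana": (5, ...),
    "person": (1, 1),
    "bottle": (..., 0),
}

print(matches({"banana": 13, "truck": 2, "person": 1}, constraints))
print(matches({"banana": 13, "truck": 2, "person": 1, "bottle": 1}, constraints))
```

The identity test `is not ...` works because `Ellipsis` is a singleton, so it cannot be confused with a numeric bound such as `0`.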

In [21]:
#> Show the resulting images
for image in query_result:
    image_url = cluster.hget(
        f"/dataset/coco/2017/image/{image['image_id']}/info", 
        "coco_url",
    )
    display(Image(url=image_url.decode())) 

<a id="demo-stream"></a>
## 3. Transaction Stream Processing

In [38]:
import random
import datetime

user_count = 5
min_start_balance = 100
max_start_balance = 1000


# Create some 'user' accounts with some existing balance
for user_id in range(user_count):
    start_balance = random.randint(min_start_balance, max_start_balance)
    single.hset(
        f"/user/{user_id}",
        mapping={
            "id": user_id, 
            "balance": start_balance,
            "start_balance": start_balance,
        }
    )
    

# Helper function for sending transaction requests.
def attempt_random_transaction(channel, max_amount=100, message="This is a random transaction",):
    single.xadd(
        f"transactions:{channel}", 
        {
            "msg": message, 
            "from": random.randint(0, user_count-1), 
            "to": random.randint(0, user_count-1), 
            "amount": random.randint(1, max_amount),
        }
    )


# Print a summary balance sheet for all users 
def balance_sheet():
    sum_balance = 0
    for user_id in range(user_count):
        current_balance, start_balance = map(
            int, 
            single.hmget(f"/user/{user_id}", "balance", "start_balance")
        )
        print(f"User {user_id} balance: {current_balance}  ({current_balance-start_balance})")
        sum_balance += current_balance
    print("----------------------------")
    print(f"Total balance : {sum_balance}")
    return sum_balance

start_total_balance = balance_sheet()

User 0 balance: 935  (0)
User 1 balance: 893  (0)
User 2 balance: 473  (0)
User 3 balance: 202  (0)
User 4 balance: 382  (0)
----------------------------
Total balance : 2885


In [39]:

# Transform a key-space event to a transaction
def initialize_transaction(event):
    transaction = event['value']
    transaction['timestamp'] = datetime.datetime.utcnow().isoformat()
    transaction['channel'] = event['key']
    transaction['id'] = event['id']
    transaction['status'] = "pending"
    return transaction

    
# Handle the transaction safely
def handle_transaction(transaction):
    
    # Log the transaction event to the Redis engine log
    redgrease.log(f"Processing transaction {transaction['id']}: {transaction}")
    
    sender = transaction['from']
    recipient = transaction['to']
    
    # Perform a sequence of commands atomically
    with redgrease.atomic():
        
        # Check if the 'sender' has sufficient balance
        sender_balance = redgrease.cmd.hget(
                f"/user/{sender}",
                "balance"
            )
        amount = int(transaction.get('amount', 0))
        
        if not sender_balance or amount > int(sender_balance):
            # If balance is not sufficient, the transaction is marked as failed.
            transaction['status'] = f"FAILED: Missing {amount - int(sender_balance or 0)}"
            
        else:                      
            # If there is sufficient balance, 
            # remove the amount from sender and add it to the recipient
            # and mark as successful
            redgrease.cmd.hincrby(
                f"/user/{sender}",
                "balance",
                -amount
            )
            redgrease.cmd.hincrby(
                f"/user/{recipient}",
                "balance",
                amount
            )
            transaction['status'] = "successful"
            
            # If successful, add the transaction to the statement of the recipient
            redgrease.cmd.xadd(f"/user/{recipient}/statement", transaction)
        
        # Regardless of status, add the transaction to the statement of the sender
        redgrease.cmd.xadd(f"/user/{sender}/statement", transaction)
        
    redgrease.log(f"Done processing transaction {transaction['id']}: {transaction['status']}")
    return transaction

    
# Transaction processing pipeline
transsaction_pipe = (
    redgrease.StreamReader()  # Listen to streams
    .map(initialize_transaction)  # Map stream events to a 'transaction' dict, and add defaults
    .map(handle_transaction)  # Execute the transaction
    .register(prefix="transactions:*", batch=10, duration=30) # Listen to transaction stream and use batching
)


# Register the processing pipeline
transsaction_pipe.on(single)


ExecutionResult[bool](True)
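
The balance check and transfer rule inside `handle_transaction` can be reasoned about in isolation. The sketch below applies the same rule to an in-memory dict instead of Redis hashes: a transfer either moves the full amount or changes nothing, so the total balance is preserved. `transfer` is a hypothetical helper, and a plain dict has none of the concurrency concerns that `redgrease.atomic()` addresses in the notebook.

```python
def transfer(balances, sender, recipient, amount):
    """Move `amount` between accounts if the sender can cover it."""
    available = balances.get(sender)
    if available is None or amount > available:
        # Report how much is missing; balances stay untouched.
        shortfall = amount - (available or 0)
        return f"FAILED: Missing {shortfall}"
    balances[sender] -= amount
    balances[recipient] = balances.get(recipient, 0) + amount
    return "successful"


balances = {0: 100, 1: 50}
total_before = sum(balances.values())

first = transfer(balances, 0, 1, 30)    # sender has enough funds
second = transfer(balances, 1, 0, 500)  # sender does not have enough funds

print(first, second, balances)
print(sum(balances.values()) == total_before)
```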

In [40]:
for registration in single.gears.dumpregistrations():
    print(
        f"Registered Gear function {registration.id} has been "
        f"triggered {registration.RegistrationData.numTriggered} times."
    )

Registered Gear function 0000000000000000000000000000000000000000-138 has been triggered 0 times.


In [41]:
attempt_random_transaction("sample")

In [42]:
balance_sheet()

User 0 balance: 935  (0)
User 1 balance: 852  (-41)
User 2 balance: 473  (0)
User 3 balance: 243  (41)
User 4 balance: 382  (0)
----------------------------
Total balance : 2885


2885

In [43]:
from concurrent.futures import ThreadPoolExecutor

# Run a bunch of transactions in parallel
parallel_transaction_job_count = 100
sequential_transactions_count = 100
max_transaction_amount = 500


def sequential_transactions(channel="foo"):
    # Create a job that attempts a series of transactions, one after another, on one channel
    def attempt_transactions():
        for transaction_id in range(sequential_transactions_count):
            attempt_random_transaction(
                channel,
                max_amount=max_transaction_amount,
                message=f"This is a transaction #{transaction_id} on channel {channel}",
            )
    return attempt_transactions


def run_in_parallel(jobs):
    # Submit all jobs to a thread pool; leaving the `with` block waits for them to finish
    with ThreadPoolExecutor() as worker:
        tasks = [worker.submit(job) for job in jobs]
    # Re-raise any exception that occurred inside a job
    for task in tasks:
        task.result()


run_in_parallel(
    [sequential_transactions(nm) for nm in range(parallel_transaction_job_count)]
)
        

In [45]:
end_total_balance = balance_sheet()
print(f"Total difference: {start_total_balance - end_total_balance}")
print()
for registration in single.gears.dumpregistrations():
    print(
        f"Registered Gear function {registration.id} has been "
        f"triggered {registration.RegistrationData.numTriggered} times."
    )

User 0 balance: 689  (-246)
User 1 balance: 617  (-276)
User 2 balance: 1310  (837)
User 3 balance: 202  (0)
User 4 balance: 67  (-315)
----------------------------
Total balance : 2885
Total difference: 0

Registered Gear function 0000000000000000000000000000000000000000-138 has been triggered 1697 times.


In [46]:
# Fetch the first 5 entries of the statement stream of user 3
statement = single.xrange("/user/3/statement", "-", "+", 5)
statement

[(b'1629166525493-0',
  {b'to': b'3',
   b'from': b'1',
   b'msg': b'This is a random transaction',
   b'amount': b'41',
   b'timestamp': b'2021-08-17T02:15:25.493316',
   b'channel': b'transactions:sample',
   b'id': b'1629166525492-0',
   b'status': b'successful'}),
 (b'1629166559785-0',
  {b'to': b'3',
   b'from': b'1',
   b'msg': b'This is a transaction #0 on channel 0',
   b'amount': b'85',
   b'timestamp': b'2021-08-17T02:15:59.785167',
   b'channel': b'transactions:0',
   b'id': b'1629166559784-0',
   b'status': b'successful'}),
 (b'1629166559787-0',
  {b'to': b'3',
   b'from': b'4',
   b'msg': b'This is a transaction #1 on channel 2',
   b'amount': b'116',
   b'timestamp': b'2021-08-17T02:15:59.787150',
   b'channel': b'transactions:2',
   b'id': b'1629166559786-0',
   b'status': b'successful'}),
 (b'1629166559791-0',
  {b'to': b'1',
   b'from': b'3',
   b'msg': b'This is a transaction #2 on channel 0',
   b'amount': b'101',
   b'timestamp': b'2021-08-17T02:15:59.791478',
   ...
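
The statement entries come back as bytes. If plain strings are easier to work with, they can be decoded with a small helper. This is just a sketch: `decode_entry` is a hypothetical helper, not part of RedGrease, and the sample entry is an abbreviated copy of the first entry in the output above.

```python
def decode_entry(entry):
    """Turn one (id, fields) pair with bytes keys and values into plain strings."""
    entry_id, fields = entry
    return entry_id.decode(), {key.decode(): value.decode() for key, value in fields.items()}


# Abbreviated copy of the first statement entry shown above
sample = [
    (
        b"1629166525493-0",
        {b"to": b"3", b"from": b"1", b"amount": b"41", b"status": b"successful"},
    ),
]

decoded = [decode_entry(entry) for entry in sample]
print(decoded)
```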

### Cleanup

In [37]:
# Unregister all registrations
for reg in single.gears.dumpregistrations():
    single.gears.unregister(reg.id)

# Remove all executions
for exe in single.gears.dumpexecutions():
    single.gears.dropexecution(str(exe.executionId))

# Clear all keys
single.flushall()

# Check that there are no keys
single.keys()


[]

<a id="demo-command"></a>
## 4. Custom Command

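A simple image cache, exposed as a custom Redis command. The Gear function returns the image stored under the URL key if there is one; otherwise it downloads the image, stores it, and returns it.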

In [None]:
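import requests

def cache_get(url):
    # Return the cached image if the URL is already a key in Redis
    if redgrease.cmd.exists(url):
        return bytes(redgrease.cmd.get(url))

    # Otherwise download the image
    response = requests.get(url)

    # On a failed download, return empty data and cache nothing
    if response.status_code != 200:
        return bytes()

    # Store the image under its URL before returning it
    response_data = bytes(response.content)
    redgrease.cmd.set(url, response_data)

    return response_data


get_image = (
    redgrease.CommandReader()
    .map(lambda trigger: trigger[1])  # trigger[0] is the command name, trigger[1] its first argument (the URL)
    .map(cache_get, requirements=["requests"])
    .register(trigger="cache_get", on=single, convertToStr=False)
)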
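
The caching logic itself does not depend on Redis. The cell below is a minimal sketch of the same look-up-then-fetch pattern, assuming a plain dict as the store and a hypothetical `fake_fetch` function in place of `requests.get`; none of these names are part of RedGrease.

```python
def make_cache_get(store, fetch):
    """Build a cache lookup on top of a dict-like store and a fetch function."""
    def cache_get(url):
        # Cache hit: return the stored data without fetching
        if url in store:
            return store[url]
        # Cache miss: fetch, and store only successful results
        data = fetch(url)
        if data is None:
            return b""
        store[url] = data
        return data
    return cache_get


calls = []  # records every URL that actually gets fetched


def fake_fetch(url):
    # Hypothetical downloader: only URLs ending in .jpg "exist"
    calls.append(url)
    return b"image-bytes" if url.endswith(".jpg") else None


store = {}
cache_get_local = make_cache_get(store, fake_fetch)

first = cache_get_local("http://example.com/a.jpg")       # fetched and stored
second = cache_get_local("http://example.com/a.jpg")      # served from the store
missing = cache_get_local("http://example.com/missing")   # failed fetch, not stored
print(len(calls), list(store))
```

Only the first request for a URL reaches the fetch function, and failed downloads are not stored, so they are retried on the next request.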


In [None]:
%%time

from IPython.display import Image, display

image_urls_1 = [
    "http://images.cocodataset.org/train2017/000000246070.jpg",
    "http://images.cocodataset.org/train2017/000000167133.jpg",
    "http://images.cocodataset.org/train2017/000000559366.jpg",
    "http://images.cocodataset.org/train2017/000000156242.jpg",
    "http://images.cocodataset.org/train2017/000000169188.jpg",
    "http://images.cocodataset.org/train2017/000000135016.jpg",
    "http://images.cocodataset.org/train2017/000000248334.jpg",
    "http://images.cocodataset.org/train2017/000000445906.jpg",
    "http://images.cocodataset.org/train2017/000000318733.jpg",
    "http://images.cocodataset.org/train2017/000000316672.jpg", 
]

for image_url in image_urls_1:
    # Trigger the custom command on the server; the image bytes are in the result's value
    image_data = single.gears.trigger("cache_get", image_url)
    display(Image(data=image_data.value))
    
for registration in single.gears.dumpregistrations():
    print(
        f"Registered Gear function {registration.id} has been "
        f"triggered {registration.RegistrationData.numTriggered} times."
    )

### An even shorter version
Only the command function is needed, registered with a function decorator instead of an explicit `CommandReader` pipeline.

In [None]:
@redgrease.trigger(on=single, convertToStr=False, requirements=["requests"], replace=True)
def cache_get(url):
    if redgrease.cmd.exists(url):
        return bytes(redgrease.cmd.get(url))
    
    response = requests.get(url)
    
    if response.status_code != 200:
        return bytes()
    
    response_data = bytes(response.content)
    redgrease.cmd.set(url, response_data)
    
    return response_data
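
As a rough illustration of the idea behind such a decorator, the cell below registers a function under its own name and then dispatches calls through that registration. It is a plain-Python sketch with a hypothetical in-process `registry` and `command` decorator, and it does not show how RedGrease implements `redgrease.trigger`.

```python
import functools

registry = {}  # hypothetical stand-in for the server-side table of commands


def command(fn):
    """Register `fn` under its own name and return a caller that goes through the registry."""
    registry[fn.__name__] = fn

    @functools.wraps(fn)
    def call_by_name(*args):
        # Look the command up by name, as a client triggering it would
        return registry[fn.__name__](*args)

    return call_by_name


@command
def shout(text):
    return text.upper()


result = shout("hello")
print(result, list(registry))
```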

In [None]:
%%time

image_urls_2 = [
    "http://images.cocodataset.org/train2017/000000483381.jpg",
    "http://images.cocodataset.org/train2017/000000237137.jpg",
    "http://images.cocodataset.org/train2017/000000017267.jpg",
    "http://images.cocodataset.org/train2017/000000197756.jpg",
    "http://images.cocodataset.org/train2017/000000451278.jpg",
    "http://images.cocodataset.org/train2017/000000193332.jpg",
    "http://images.cocodataset.org/train2017/000000475564.jpg",
    "http://images.cocodataset.org/train2017/000000247368.jpg",
] 

for image_url in image_urls_2:
    
    image_data = cache_get(image_url)
    
    display(Image(data=image_data.value))
    
for registration in single.gears.dumpregistrations():
    print(
        f"Registered Gear function {registration.id} has been "
        f"triggered {registration.RegistrationData.numTriggered} times."
    )