<a href="https://colab.research.google.com/github/t-fuchi/RedisMMap/blob/main/RedisMMap.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Redisでmmapしたファイルから数値を読み込むモジュールをGoogle Colabで作ってみた

In [None]:
!wget https://download.redis.io/redis-stable.tar.gz

--2023-06-15 16:48:55--  https://download.redis.io/redis-stable.tar.gz
Resolving download.redis.io (download.redis.io)... 45.60.121.1
Connecting to download.redis.io (download.redis.io)|45.60.121.1|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3068843 (2.9M) [application/octet-stream]
Saving to: ‘redis-stable.tar.gz’


2023-06-15 16:48:55 (32.4 MB/s) - ‘redis-stable.tar.gz’ saved [3068843/3068843]



In [None]:
!tar -xzf redis-stable.tar.gz
%cd /content/redis-stable/src/modules

必要なヘッダーのインクルードです。ちょっとした便利マクロも定義します。

In [None]:
%%writefile fmmap.c

#pragma GCC diagnostic ignored "-Wtypedef-redefinition"

#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdbool.h>
#include <stdint.h>

#include "../redismodule.h"
#include "../sds.h"
#include "../server.h"
#include "../zmalloc.h"

static inline int mstringcmp(const RedisModuleString *rs1, const char *s2)
{
  return strcasecmp(RedisModule_StringPtrLen(rs1, NULL), s2);
}

Overwriting fmmap.c


MMapObjectを定義します。mmapに必要な情報を詰め込みました。sdsはRedis内で使われる文字列型です。

In [None]:
%%writefile -a fmmap.c

typedef struct _MMapObject
{
  sds file_path;
  int fd;
  void *mmap;
  size_t file_size;
} MMapObject;

In [None]:
%%writefile -a fmmap.c

RedisModuleType *MMapType = NULL;

MMapObject *MCreateObject(void)
{
  return (MMapObject *)zcalloc(sizeof(MMapObject));
}

In [None]:
%%writefile -a fmmap.c

void MFree(void *value)
{
  if (value == NULL) return;
  const MMapObject *obj_ptr = value;
  if (obj_ptr->mmap != NULL) munmap(obj_ptr->mmap, obj_ptr->file_size);
  if (obj_ptr->fd != -1) close(obj_ptr->fd);
  sdsfree(obj_ptr->file_path);
  zfree(value);
}

In [None]:
%%writefile -a fmmap.c

// MMAP key file_path
int MMap_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
  RedisModule_AutoMemory(ctx); /* Use automatic memory management. */

  RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);

  int type = RedisModule_KeyType(key);

  /* Create an empty value object if the key is currently empty. */
  MMapObject *obj_ptr;
  if (type == REDISMODULE_KEYTYPE_EMPTY) {
    obj_ptr = MCreateObject();
    obj_ptr->file_path = sdsnew(RedisModule_StringPtrLen(argv[2], NULL));
    obj_ptr->fd = open(obj_ptr->file_path, O_CREAT | O_RDWR);

    struct stat sb;
    fstat(obj_ptr->fd, &sb);
    obj_ptr->file_size = sb.st_size;
    obj_ptr->mmap = mmap(NULL, sb.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, obj_ptr->fd, 0);
    RedisModule_ModuleTypeSetValue(key, MMapType, obj_ptr);
  }
  else {
    obj_ptr = RedisModule_ModuleTypeGetValue(key);
  }

  return REDISMODULE_OK;
}


Appending to fmmap.c


In [None]:
%%writefile -a fmmap.c

// MGET key index
int MGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
  RedisModule_AutoMemory(ctx); /* Use automatic memory management. */

  RedisModuleKey *key =
    RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);
  int type = RedisModule_KeyType(key);

  MMapObject *obj_ptr = RedisModule_ModuleTypeGetValue(key);

  if (obj_ptr->file_size < (size_t)index * sizeof(float) || index < 0) {
    RedisModule_ReplyWithNull(ctx);
  }
  else {
    RedisModule_ReplyWithDouble(ctx, ((float*)obj_ptr->mmap)[index]);
  }
  return REDISMODULE_OK;
}

In [None]:
%%writefile -a fmmap.c

// MMGET key index [index ...]
int MMGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
  RedisModule_AutoMemory(ctx); /* Use automatic memory management. */

  RedisModuleKey *key =
      RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);
  int type = RedisModule_KeyType(key);

  MMapObject *obj_ptr = RedisModule_ModuleTypeGetValue(key);

  RedisModule_ReplyWithArray(ctx, argc - 2);
  for (int i = 2; i < argc; i++) {
    RedisModule_StringToLongLong(argv[i], &index);
    if (obj_ptr->file_size < (size_t)index * sizeof(float) || index < 0) {
      RedisModule_ReplyWithNull(ctx);
    }
    else {
      RedisModule_ReplyWithDouble(ctx, ((float*)obj_ptr->mmap)[index]);
    }
  }
  return REDISMODULE_OK;
}

In [None]:
%%writefile -a fmmap.c

// MSET key index value [index value ...]
int MSet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
  RedisModule_AutoMemory(ctx); /* Use automatic memory management. */

  RedisModuleKey *key =
      RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);
  int type = RedisModule_KeyType(key);

  MMapObject *obj_ptr = RedisModule_ModuleTypeGetValue(key);

  long long index;
  double value;
  for (int i = 3; i < argc; i += 2) {
    if (RedisModule_StringToDouble(argv[i], &value) == REDISMODULE_ERR) {
      return RedisModule_ReplyWithError(ctx, "value must be float.");
    }
  }
  for (int i = 2; i < argc; i += 2) {
    RedisModule_StringToLongLong(argv[i], &index);
    RedisModule_StringToDouble(argv[i + 1], &value);
    *((float*)obj_ptr->mmap + index) = (float)value;
  }

  return RedisModule_ReplyWithLongLong(ctx, (argc - 2) / 2);
}


In [None]:
%%writefile -a fmmap.c

// MADD key value [value ...]
int MAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
  RedisModule_AutoMemory(ctx); /* Use automatic memory management. */

  RedisModuleKey *key =
      RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);
  int type = RedisModule_KeyType(key);

  MMapObject *obj_ptr = RedisModule_ModuleTypeGetValue(key);
  double value;
  for (int i = 2; i < argc; ++i) {
    if (RedisModule_StringToDouble(argv[i], &value) == REDISMODULE_ERR) {
      return RedisModule_ReplyWithError(ctx, "value must be float.");
    }
  }
  size_t new_size = obj_ptr->file_size + obj_ptr->value_size * (argc - 2);
  ftruncate(obj_ptr->fd, new_size);
  munmap(obj_ptr->mmap, obj_ptr->file_size);
  obj_ptr->mmap = mmap(NULL, new_size, PROT_READ | PROT_WRITE, MAP_SHARED, obj_ptr->fd, 0);
  for (int i = 2; i < argc; ++i) {
    size_t index = obj_ptr->file_size / obj_ptr->value_size;
    RedisModule_StringToDouble(argv[i], &value);
    obj_ptr->file_size += obj_ptr->value_size;
    *((float*)obj_ptr->mmap + index) = (float)value;
  }

  return RedisModule_ReplyWithLongLong(ctx, argc - 2);
}

In [None]:
%%writefile -a fmmap.c

// MSIZE key
int MSize_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
  RedisModule_AutoMemory(ctx); /* Use automatic memory management. */

  RedisModuleKey *key =
      RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);
  int type = RedisModule_KeyType(key);

  MMapObject *obj_ptr = RedisModule_ModuleTypeGetValue(key);

  return RedisModule_ReplyWithLongLong(ctx, obj_ptr->file_size / sizeof(float));
}

In [None]:
%%writefile -a fmmap.c

// MCLEAR key
int MClear_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
  RedisModule_AutoMemory(ctx); /* Use automatic memory management. */

  RedisModuleKey *key =
      RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);
  int type = RedisModule_KeyType(key);

  MMapObject *obj_ptr = RedisModule_ModuleTypeGetValue(key);

  munmap(obj_ptr->mmap, obj_ptr->file_size);
  ftruncate(obj_ptr->fd, 0);
  obj_ptr->mmap = mmap(NULL, 0, PROT_READ | PROT_WRITE, MAP_SHARED, obj_ptr->fd, 0);
  obj_ptr->file_size = 0;

  return REDISMODULE_OK;
}

In [None]:
%%writefile -a fmmap.c

// MPOP key
int MPop_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
  RedisModule_AutoMemory(ctx); /* Use automatic memory management. */

  RedisModuleKey *key =
      RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);
  int type = RedisModule_KeyType(key);

  MMapObject *obj_ptr = RedisModule_ModuleTypeGetValue(key);

  if (obj_ptr->file_size == 0) {
    RedisModule_ReplyWithNull(ctx);
  }
  else {
    size_t index = obj_ptr->file_size / sizeof(float);
    RedisModule_ReplyWithDouble(ctx, ((float*)obj_ptr->mmap)[index]);
    munmap(obj_ptr->mmap, obj_ptr->file_size);
    obj_ptr->file_size -= obj_ptr->value_size;
    ftruncate(obj_ptr->fd, obj_ptr->file_size);
    obj_ptr->mmap = mmap(NULL, obj_ptr->file_size, PROT_READ | PROT_WRITE, MAP_SHARED, obj_ptr->fd, 0);
  }
  return REDISMODULE_OK;
}

In [None]:
%%writefile -a fmmap.c

void MRdbSave(RedisModuleIO *rdb, void *value)
{
  MMapObject *obj_ptr = value;
  RedisModule_SaveStringBuffer(rdb, obj_ptr->file_path, sdslen(obj_ptr->file_path));
  msync(obj_ptr->mmap, obj_ptr->file_size, MS_ASYNC);
}

In [None]:
%%writefile -a fmmap.c

void *MRdbLoad(RedisModuleIO *rdb, int encver)
{
  MMapObject *obj_ptr = MCreateObject();
  obj_ptr->file_path = sdsnew(RedisModule_StringPtrLen(RedisModule_LoadString(rdb), NULL));
  obj_ptr->fd = open(obj_ptr->file_path, O_CREAT | O_RDWR);

  struct stat sb;
  fstat(obj_ptr->fd, &sb);
  obj_ptr->file_size = sb.st_size;
  obj_ptr->mmap = mmap(NULL, sb.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, obj_ptr->fd, 0);

  return obj_ptr;
}

In [None]:
%%writefile -a fmmap.c

void MAofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value)
{
  char buffer[0x200];
  MMapObject *obj_ptr = (MMapObject*)value;
  RedisModule_EmitAOF(aof, "MMAP", "scclc",
                      key,
                      obj_ptr->file_path,
                      obj_ptr->value_type,
                      obj_ptr->value_size,
                      "writable");
  RedisModule_EmitAOF(aof, "MCLEAR", "ss", key, obj_ptr->file_path);
  for (size_t i = 0; i < obj_ptr->file_size; i += sizeof(float)) {
    float value = *(float *)((uint8_t*)obj_ptr->mmap + i);
    sprintf(buffer, "%.16f", value);
    RedisModule_EmitAOF(aof, "MADD", "sbc", key, buffer);
  }
}

In [None]:
%%writefile -a fmmap.c

size_t MMemUsage(const void *value)
{
  const MMapObject *obj_ptr = value;
  return obj_ptr->file_size;
}

void MDigest(RedisModuleDigest *md, void *value)
{
  REDISMODULE_NOT_USED(md);
  REDISMODULE_NOT_USED(value);
}

In [None]:
%%writefile -a fmmap.c

#define CREATE_CMD(name, tgt, attr, key_pos, key_last)                     \
  do {                                                                     \
    if (RedisModule_CreateCommand(ctx, name, tgt, attr, key_pos, key_last, \
                                  1) != REDISMODULE_OK) {                  \
      return REDISMODULE_ERR;                                              \
    }                                                                      \
  } while (0);


In [None]:
%%writefile -a fmmap.c

int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
  REDISMODULE_NOT_USED(argv);
  REDISMODULE_NOT_USED(argc);

  RedisModule_Init(ctx, "FuchiMMap", 1, REDISMODULE_APIVER_1);

  RedisModuleTypeMethods tm = {.version = REDISMODULE_TYPE_METHOD_VERSION,
                               .rdb_load = MRdbLoad,
                               .rdb_save = MRdbSave,
                               .aof_rewrite = MAofRewrite,
                               .mem_usage = MMemUsage,
                               .free = MFree,
                               .digest = MDigest};

  MMapType = RedisModule_CreateDataType(ctx, "FuchiMMap", 0, &tm);

  // MMAP key file_path
  CREATE_CMD("MMAP", MMap_RedisCommand, "write fast", 1, 1);

  // MCLEAR key
  CREATE_CMD("MCLEAR", MClear_RedisCommand, "write fast", 1, 1);

  // MADD key value [value ...]
  CREATE_CMD("MADD", MAdd_RedisCommand, "write fast", 1, 1);

  // MGET key index
  CREATE_CMD("MGET", MGet_RedisCommand, "readonly fast", 1, 1);

  // MGET key index [index ...]
  CREATE_CMD("MMGET", MMGet_RedisCommand, "readonly fast", 1, 1);

  // MSET key index value [index value ...]
  CREATE_CMD("MSET", MSet_RedisCommand, "write fast", 1, 1);

  // MSIZE key
  CREATE_CMD("MSIZE", MSize_RedisCommand, "readonly fast", 1, 1);

  // MPOP key
  CREATE_CMD("MPOP", MPop_RedisCommand, "write fast", 1, 1);

  return REDISMODULE_OK;
}


In [None]:
%%writefile Makefile.fmmap

SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c99 -O2
SHOBJ_LDFLAGS ?= -shared

.SUFFIXES: .c .so .xo .o

all: fmmap.so

CFLAGS := -march=native

.c.xo:
	$(CC) -I. -I../../deps/lua/src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@

fmmap.xo: ../redismodule.h

fmmap.so: fmmap.xo
	$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc

clean:
	rm -rf *.xo *.so


In [None]:
!make -f Makefile.fmmap

In [None]:
!cp redis.conf redis_mmap.conf

In [None]:
%%writefile -a redis_mmap.conf
loadmodule /content/redis-stable/src/modules/fmmap.so

In [None]:
!sudo apt-get install redis

Reading package lists... Done
Building dependency tree       
Reading state information... Done
redis is already the newest version (5:5.0.7-2ubuntu0.1).
0 upgraded, 0 newly installed, 0 to remove and 21 not upgraded.


In [None]:
!nohup redis-server redis_mmap.conf &

/content/redis-stable/src/modules


In [None]:
%%writefile command
MMAP file.mmap

In [None]:
!redis-cli < command