In [1]:
from src.data_examples.ex2_data_loader_1m import ExampleDataLoader

import logging
import pandas as pd
import numpy as np
from pathlib import Path
from src.emulator import UnitProcess, TaskWrapper

pd.options.mode.chained_assignment = None

log_root_path = Path('./log')
log_root_path.mkdir(exist_ok=True)

In [2]:
data_loader = ExampleDataLoader()
data_loader.download().load().remap()

using cached file cache\datasets\movielens\ml-1m.zip
extracting zip file content:
file already exists: data\ex2_1m\ml-1m
file already exists: data\ex2_1m\ml-1m\movies.dat
file already exists: data\ex2_1m\ml-1m\ratings.dat
file already exists: data\ex2_1m\ml-1m\README
file already exists: data\ex2_1m\ml-1m\users.dat


  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(


<src.data_examples.ex2_data_loader_1m.ExampleDataLoader at 0x2e22e2230d0>

In [3]:
def init_logger(name, log_path=None):
  formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

  logger = logging.getLogger(name)
  logger.setLevel(logging.DEBUG)

  if not logger.hasHandlers():
    if log_path is None:
      ch = logging.StreamHandler()
    else:
      ch = logging.FileHandler(log_path)
    ch.setFormatter(formatter)
    logger.addHandler(ch)

  return logger


In [4]:
# fixed size units

movie_size = 10e6

num_cl = 100
num_ue = data_loader.df_users['user_id'].nunique()

df_movies = data_loader.df_movies
df_movies['sizeof'] = movie_size
df_movies['movie_id'] = df_movies['movie_id'].astype('int')
df_movies.set_index('movie_id', drop=False, inplace=True)

In [5]:
df_movies

Unnamed: 0_level_0,movie_id,title,genres,sizeof
movie_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,0,Toy Story (1995),Animation|Children's|Comedy,10000000.0
1,1,Jumanji (1995),Adventure|Children's|Fantasy,10000000.0
2,2,Grumpier Old Men (1995),Comedy|Romance,10000000.0
3,3,Waiting to Exhale (1995),Comedy|Drama,10000000.0
4,4,Father of the Bride Part II (1995),Comedy,10000000.0
...,...,...,...,...
3947,3947,Meet the Parents (2000),Comedy,10000000.0
3948,3948,Requiem for a Dream (2000),Drama,10000000.0
3949,3949,Tigerland (2000),Drama,10000000.0
3950,3950,Two Family House (2000),Drama,10000000.0


In [6]:
def handle_cache_overflow(tw, host, local_vars):
  tw.log_event('invoked: handle_cache_overflow')
  host.run_task('handle_disk_usage_calc')
  while local_vars.get('used') > local_vars.get('alloc'):
    host.run_task('handle_clean_cache')

def handle_disk_usage_calc(tw, host, local_vars):
  tw.log_event('invoked: handle_disk_usage_calc')
  disk = local_vars.get('disk')
  local_vars['used'] = sum([i['sizeof'] for i in disk])

def handle_clean_cache_lifo(tw, host, local_vars):
  tw.log_event('invoked: handle_clean_cache_lifo')
  local_vars.get('disk').pop(0)
  host.run_task('handle_disk_usage_calc')

def handle_clean_cache_fifo(tw, host, local_vars):
  tw.log_event('invoked: handle_clean_cache_fifo')
  local_vars.get('disk').pop(-1)
  host.run_task('handle_disk_usage_calc')

def handle_request(tw, host, local_vars, client_name, movie_id):
  tw.log_event(f'invoked: handle_request: {host.name} -> {client_name} | {movie_id}')
  client = host.get_child_by_name(client_name)
  if client is None:
    return

  disk_movie_ids = [i['movie_id'] for i in local_vars['disk']]
  content = None
  if movie_id in disk_movie_ids:
    tw.log_event(f'event: {host.name} <- {client_name} cache hit')
    content = local_vars['disk'].pop(disk_movie_ids.index(movie_id))
  else:
    tw.log_event(f'event: {host.name} <- {client_name} cache missed')
    parent = host
    db = None
    while not parent is None:
      db = parent.local_vars.get('movie_db')
      if db is None:
        parent = parent.parent
      else:
        content = db.loc[movie_id]
        break
  
  local_vars['disk'].append(content)
  host.run_task('handle_cache_overflow')

  client.local_vars['content_buf'] = content

def invoke_content_request(tw, host, local_vars, movie_id):
  tw.log_event(f'invoked: invoke_content_request: {host.name} | {movie_id}')
  content = None
  disk_movie_ids = [i['movie_id'] for i in local_vars['disk']]
  if movie_id in disk_movie_ids:
    tw.log_event(f'event: {host.name} cache hit')
    content = local_vars['disk'].pop(disk_movie_ids.index(movie_id))
    local_vars['content_buf'] = content
  else:
    tw.log_event(f'event: {host.name} cache missed')
    parent_server = host.parent
    if parent_server is None: return
    parent_server.run_task('handle_request', client_name=host.name, movie_id=movie_id)
    content = local_vars['content_buf']

  local_vars['disk'].append(content)
  host.run_task('handle_cache_overflow')


In [7]:
def handle_logging_shutdown(tw, host, local_vars):
  tw.log_event(f'invoked: handle_logging_shutdown')
  for handler in tw.host_logger.handlers[:]:
    handler.close()

In [8]:
bs = UnitProcess('BaseServer')
bs.set_logger(init_logger('BaseServer', log_path='log.log'))
bs.local_vars['movie_db'] = df_movies
bs.local_vars.update({
  'alloc': movie_size * 1000,
  'used': 0,
  'disk': list(),
  'content_buf': None,
})

bs.create_task('handle_cache_overflow').set_task(handle_cache_overflow)
bs.create_task('handle_used_storage_calc').set_task(handle_disk_usage_calc)
bs.create_task('handle_clean_cache').set_task(handle_clean_cache_lifo)
bs.create_task('handle_request').set_task(handle_request)
bs.create_task('handle_logging_shutdown').set_task(handle_logging_shutdown)

In [9]:
for i in range(num_cl):
  name = f'ClientServer [{i:03}]'
  print(name)
  cl = bs.spawn_child(name)
  cl.set_logger(init_logger(name, log_path='log.log'))
  cl.local_vars.update({
    'alloc': movie_size * 10,
    'used': 0,
    'disk': list(),
    'content_buf': None,
  })

  cl.create_task('handle_cache_overflow').set_task(handle_cache_overflow)
  cl.create_task('handle_used_storage_calc').set_task(handle_disk_usage_calc)
  cl.create_task('handle_clean_cache').set_task(handle_clean_cache_lifo)
  cl.create_task('handle_request').set_task(handle_request)
  cl.create_task('invoke_content_request').set_task(invoke_content_request)
  cl.create_task('handle_logging_shutdown').set_task(handle_logging_shutdown)



ClientServer [000]
ClientServer [001]
ClientServer [002]
ClientServer [003]
ClientServer [004]
ClientServer [005]
ClientServer [006]
ClientServer [007]
ClientServer [008]
ClientServer [009]
ClientServer [010]
ClientServer [011]
ClientServer [012]
ClientServer [013]
ClientServer [014]
ClientServer [015]
ClientServer [016]
ClientServer [017]
ClientServer [018]
ClientServer [019]
ClientServer [020]
ClientServer [021]
ClientServer [022]
ClientServer [023]
ClientServer [024]
ClientServer [025]
ClientServer [026]
ClientServer [027]
ClientServer [028]
ClientServer [029]
ClientServer [030]
ClientServer [031]
ClientServer [032]
ClientServer [033]
ClientServer [034]
ClientServer [035]
ClientServer [036]
ClientServer [037]
ClientServer [038]
ClientServer [039]
ClientServer [040]
ClientServer [041]
ClientServer [042]
ClientServer [043]
ClientServer [044]
ClientServer [045]
ClientServer [046]
ClientServer [047]
ClientServer [048]
ClientServer [049]
ClientServer [050]
ClientServer [051]
ClientServer

In [10]:
cl = bs.children[1]
cl.run_task('invoke_content_request', movie_id=420)
cl.run_task('invoke_content_request', movie_id=123)
cl.run_task('invoke_content_request', movie_id=123)
cl.run_task('invoke_content_request', movie_id=420)
cl.local_vars['content_buf']

movie_id                     420
title        Black Beauty (1994)
genres      Adventure|Children's
sizeof                10000000.0
Name: 420, dtype: object

In [11]:
cl.local_vars['disk']

[movie_id                                               123
 title       Star Maker, The (Uomo delle stelle, L') (1995)
 genres                                               Drama
 sizeof                                          10000000.0
 Name: 123, dtype: object,
 movie_id                     420
 title        Black Beauty (1994)
 genres      Adventure|Children's
 sizeof                10000000.0
 Name: 420, dtype: object]

In [12]:
cl = bs.children[-1]
cl.run_task('invoke_content_request', movie_id=234)
cl.run_task('invoke_content_request', movie_id=345)


bs.local_vars['disk']

[movie_id                     420
 title        Black Beauty (1994)
 genres      Adventure|Children's
 sizeof                10000000.0
 Name: 420, dtype: object,
 movie_id                                               123
 title       Star Maker, The (Uomo delle stelle, L') (1995)
 genres                                               Drama
 sizeof                                          10000000.0
 Name: 123, dtype: object,
 movie_id               234
 title       Ed Wood (1994)
 genres        Comedy|Drama
 sizeof          10000000.0
 Name: 234, dtype: object,
 movie_id                345
 title       Backbeat (1993)
 genres        Drama|Musical
 sizeof           10000000.0
 Name: 345, dtype: object]