In [6]:
import sys
import operator
import csv
from os import path
from copy import copy
from tqdm import *
import numpy as np
import pandas as pd

In [7]:
# Root directory holding the SNIA trace checkouts. Change this one line to run
# the notebook on another machine instead of editing every path below.
trace_root = '/home/tiratatp/Repositories/snia_traces'

# (name, path) of the one-hour trace file simulated for each workload
traces = [
    ('T1' , path.join(trace_root, 'T1/LiveMapsBackEnd/Combined/disk1_filtered_1hrs.txt')),
    ('T2' , path.join(trace_root, 'T2/DisplayAdsDataServer/Combined/disk0_1hrs.txt')),
    ('T3' , path.join(trace_root, 'T3/DisplayAdsPayload/Combined/disk0_1hrs.txt')),
    ('T4' , path.join(trace_root, 'T4/Exchange-Server-Traces/Combined/disk8_1hrs.txt')),
    ('T5' , path.join(trace_root, 'T5/MSNStorageCFS/Combined/disk6_filtered_1hrs.txt')),
    ('T6' , path.join(trace_root, 'T6/MSNStorageFileServer/Combined/disk5_filtered_1hrs.txt')),
    ('T7' , path.join(trace_root, 'T7/BuildServer/Combined/disk0_filtered_1hrs.txt')),
    ('T8' , path.join(trace_root, 'T8/DevelopmentToolsRelease/Combined/disk6_filtered_1hrs.txt')),
    ('T9' , path.join(trace_root, 'T9/RadiusAuthentication/Combined/disk0_filtered_1hrs.txt')),
    ('T10', path.join(trace_root, 'T10/RadiusBackEndSQLServer/Combined/disk4_filtered_1hrs.txt')),
]

# NOTE(review): not referenced in this part of the notebook; 1953525168 sectors
# of 512 bytes is a 1 TB drive, so this is presumably its last sector -- confirm.
last_block=1953525167

# cutoff size in KB: requests no larger than this are candidates for sheltering
policy_size = 32

# size of each preallocated shelter, and of the disk band that owns one shelter (MB)
sheltersize = 10
shelterrange = 100

# swap shelter contents with a less crowded shelter when this one is full
swap = True

# track sheltered requests so they can be cleaned up (read back) later
#cleanup = False
cleanup = True


In [8]:
# from Nora
"""
Sheltering Simulator.
Writes <= `policy_size` KB are sheltered.
"""
# All geometry below is counted in sectors; 2 * 1024 sectors make one MB
# (i.e. 512-byte sectors).
one_MB = 1024 * 2

# x: length of one shelter; y: length of the disk band that owns one shelter.
x = one_MB * sheltersize
y = one_MB * shelterrange
# a shelter has to be strictly shorter than the band that contains it
assert y > x

#FUNCTIONS
def round_down(num, divisor):
    """Return the largest multiple of `divisor` that is <= `num`.

    Uses Python's floor-style `%`, so negative inputs also round toward -inf.
    """
    remainder = num % divisor
    return num - remainder

def get_next_shelter(block_num):
    """Return the first block of the shelter in `block_num`'s band.

    The disk is divided into bands of `y` sectors, and the shelter of each band
    is its last `x` sectors. A shelter therefore starts `y - x` sectors into the
    band and ends exactly on the next band boundary, which is what
    `Shelter.__init__` asserts.

    The returned block is at or after `block_num`, except when `block_num` lies
    inside the shelter itself. In that case the shelter's start (< block_num)
    is returned. That is fine for callers, which only want the beginning of the
    shelter (a sheltered previous write leaves the tail inside it).
    """
    # `y - x` rather than a hard-coded `x * 9`: the two agree only for the
    # default 10 MB / 100 MB geometry.
    return round_down(block_num, y) + (y - x)

def dodge_shelters(req):
    """Split request `req` into pieces that never overlap a shelter.

    Whenever a piece would run into the next shelter, it is cut at the shelter's
    start and the remainder continues right after the shelter (`x` sectors
    later). If `req` already fits it is returned as-is (same object).
    Otherwise, the pieces are shallow copies of the request with column 2
    (start block) and column 3 (size) adjusted.

    The list is returned last piece first; callers reverse it to get disk
    order.
    """
    pieces = []
    current = req
    while True:
        begin, length = current[2], current[3]
        boundary = get_next_shelter(begin)
        if begin + length <= boundary:
            pieces.append(current)
            break
        spill = begin + length - boundary
        # part that ends right where the shelter begins
        head = copy(current)
        head[3] = length - spill
        pieces.append(head)
        # part that resumes just past the shelter
        remainder = copy(current)
        remainder[2] = boundary + x
        remainder[3] = spill
        current = remainder
    pieces.reverse()
    return pieces

#CLASSES
class Shelter:
    """Track one preallocated shelter: how full it is and what must be cleaned.

    A shelter spans `x` sectors starting at `blk` and must end on a multiple of
    `y`. `tail` is the next free sector inside it. When `cleanup` is on,
    `reqs` holds one `(original_blkno, total_size)` tuple per batch of
    sheltered requests, so the data can later be read back and written home.
    """
    def __init__(self, blk):
        self.start = blk
        self.tail = blk
        self.end = self.start + x
        # shelters are laid out so that each one ends exactly on a band boundary
        assert(self.end % y == 0)
        # (blknum, blksize) tuples; only populated when `cleanup` is True
        self.reqs = []

    def enough_space(self, reqs):
        """Return True if the requests `reqs` fit between `tail` and `end`.

        Without `cleanup`, nothing is ever read back and the shelter is used as
        a wrap-around buffer, so there is always "enough" space.
        """
        if not cleanup:
            return True
        total_size = sum(row[3] for row in reqs)
        return total_size <= (self.end - self.tail)

    def shelter_writes(self, current_reqs):
        """Redirect every request in `current_reqs` into this shelter.

        Each request is marked as sheltered (column 5 set to 1), and its start
        block (column 2) is rewritten to the shelter's tail.

        If `swap` is on and this shelter lacks room, its contents are swapped
        with the least crowded shelter. Crowding is judged by a moving average
        of fill levels over `window` neighbouring shelters. The global `window`
        is halved after each swap that does not free enough space.

        Raises:
            ValueError: if swapping cannot free enough space, or a single
                request is larger than a whole shelter.
        """
        global window
        if swap:
            while not self.enough_space(current_reqs.requests):
                # fill level of every shelter, in disk order
                sorted_shelters = sorted(shelters.items(), key=operator.itemgetter(0))
                shelter_by_size = [sum(bcount for _, bcount in shelter.reqs)
                                   for _, shelter in sorted_shelters]
                # np.convolve(mode='same') returns max(len(a), len(v)) points, so a
                # window wider than the shelter list would let argmin point past
                # the end of `sorted_shelters`; clamp the window to the list.
                effective_window = max(1, min(int(window), len(sorted_shelters)))
                kernel = np.ones(effective_window) / effective_window
                moving_average = np.convolve(shelter_by_size, kernel, mode='same')
                selected = sorted_shelters[int(np.argmin(moving_average))][1]

                # swap the recorded requests ...
                selected.reqs, self.reqs = self.reqs, selected.reqs

                # ... and the fill levels (tail offsets) that go with them
                selected_size = selected.tail - selected.start
                current_size = self.tail - self.start
                selected.tail = selected.start + current_size
                self.tail = self.start + selected_size

                if not self.enough_space(current_reqs.requests):
                    # give up once the search is already as narrow as it can get,
                    # or when shelters are on average at least 90% full
                    if effective_window == 1 or np.average(shelter_by_size) >= (x * 0.9):
                        raise ValueError("Shelters are really full!")
                    window //= 2

        # If we need to clean up later, consolidate this list of requests (which
        # we know are sequential) into one record, remembering the block where
        # it originally belonged (taken before the requests are redirected).
        if cleanup:
            cleanup_req_blk = current_reqs.requests[0][2]
            cleanup_req_size = 0
        for req in current_reqs.requests:
            size = req[3]
            req[5] = 1  # mark as sheltered
            # a single request larger than the whole shelter can never fit
            if size > x:
                raise ValueError("request %r is larger than a whole shelter (%d sectors)" % (req, x))
            if self.tail + size > self.end:
                # out of space: wrap around to the beginning of the shelter
                self.tail = self.start
            # redirect the request to the tail of the shelter
            req[2] = self.tail
            self.tail += size
            if cleanup:
                cleanup_req_size += size
        if cleanup:
            self.reqs.append((cleanup_req_blk, cleanup_req_size))

class CurrentReq:
    """Hold one (already merged) request and the pieces it is turned into.

    A request is a list whose columns are used as:
    [2] start block, [3] size in sectors, [4] read/write flag,
    [5] should-shelter flag (precomputed with pandas by the main loop).
    """
    def __init__(self, request):
        # total size of the request, in sectors
        self.size = request[3]
        # read/write flag of the request
        self.flag = request[4]
        # whether to shelter it; computed with pandas before the replay loop
        self._should_shelter = request[5]
        # the request pieces themselves; shift_requests() may split them
        self.requests = [request]

    def get_next_blk(self):
        """Return the block just past the last piece.

        Only the last piece is consulted: after shifting, the pieces are no
        longer necessarily contiguous.
        """
        last = self.requests[-1]
        return last[2] + last[3]

    def should_shelter(self):
        """Return the precomputed should-shelter flag."""
        return self._should_shelter

    def shift_requests(self):
        """Move every piece to its shifted location and split it around shelters.

        Each piece is shifted forward by one shelter (`x` sectors) per full band
        of `y` sectors before its start block. If that lands inside a shelter,
        it is pushed just past the shelter. It is then split by dodge_shelters()
        so that no piece overlaps a shelter. `self.requests` is replaced by the
        resulting pieces in disk order.

        NOTE(review): the shift counts bands in logical `y`-sector units even
        though only `y - x` sectors of each physical band hold data. For bands
        k >= 1 this appears to map two logical ranges onto the same physical
        sectors. With the default geometry, logical [368640, 389120) and
        [389120, 409600) both land on [409600, 430080). Shifting by
        `(start_blk // (y - x)) * x` would avoid that. It is left unchanged
        because the main loop sizes the shelter table with the same formula --
        confirm intent.
        """
        new_reqs = []
        for req in self.requests:
            start_blk = req[2]
            # shelters that END behind the FIRST block of the request
            # (floor division: must stay integral under Python 3 as well)
            num_shelters = start_blk // y
            new_start = start_blk + num_shelters * x
            if new_start >= get_next_shelter(new_start):
                # new_start is inside a shelter: move it to just after the shelter
                new_start += x
            req[2] = new_start
            # dodge_shelters returns the pieces last-first; restore disk order
            split_reqs = dodge_shelters(req)
            split_reqs.reverse()
            new_reqs.extend(split_reqs)
        self.requests = new_reqs

#HERE IS ANOTHER FUNCTION    
            
def shelter_writes (current_requests):
    """Redirect `current_requests` into the shelter found from the global `tail`.

    `tail` is the block just past the previously processed request. A negative
    `tail` means this is the first request for the disk, so there is nothing
    to shelter behind and the requests are left unchanged.

    Raises:
        RuntimeError: if `tail` lies inside a shelter although the previous
            request was not sheltered. Shifted requests are split so they
            never end inside a shelter, so this indicates an internal error.
        ValueError: if the needed shelter was never allocated.
    """
    if tail < 0:
        return
    shelter_blk = get_next_shelter(tail)
    if shelter_blk < tail and not last_sheltered:
        # The tail is inside a shelter. That could only happen without
        # shifting (old policy "B", which jumped to the next shelter); this
        # implementation always shifts, so stop loudly (sys.exit(0) reported
        # success).
        raise RuntimeError("tail %d is inside the shelter starting at block %d, "
                           "but the previous request was not sheltered" % (tail, shelter_blk))
    if shelter_blk not in shelters:
        raise ValueError("This should not happen! Try increase `shelter_count` (shelter_blk: %d, shelter#: %d)" % (shelter_blk, (shelter_blk - (y - x)) // y,))
    shelters[shelter_blk].shelter_writes(current_requests)

In [9]:
#MAIN PROCESSING LOOP
# Heavily modified to use pandas.
# For every trace: merge sequential IOs, decide which ones to shelter, replay
# them through the shelter simulator, and write two files next to the input:
#   <stem>_sheltered.txt     - the rewritten request stream
#   <stem>_shelter_read.txt  - one read request per non-empty shelter
for trace_name, trace in traces:
    # one pre-formatted string, so Python 2 and 3 print the same line
    print("%s %s" % (trace_name, trace))

    # read trace into memory (first five space-separated columns)
    t = pd.read_csv(trace, delimiter=' ', usecols=[0, 1, 2, 3, 4],
                    header=None, names=['offset', 'dskno', 'blkno', 'blkcount', 'flag'],
                    dtype={'offset': np.str_, 'dskno': np.int_, 'blkno': np.int_,
                           'blkcount': np.int_, 'flag': np.int_},
                    na_filter=False, engine='c')

    # 1. Merge sequential IOs. A row is sequential when it has the same flag as
    #    the previous row and starts exactly where the previous row ended.
    t['is_seq'] = (t['flag'].shift(1) == t['flag']) & ((t['blkno'] + t['blkcount']).shift(1) == t['blkno'])
    # every non-sequential row starts a new IO, so the running count is a group id
    t['io_num'] = (~t['is_seq']).astype(int).cumsum()
    t = t.groupby(['io_num'], as_index=False).agg({
            'offset': 'first',
            'dskno': 'first',
            'blkno': 'min',
            'blkcount': 'sum',
            'flag': 'first',
        }).reset_index()

    # 2. Size the shelter table from the highest shifted end block. Floor
    #    division keeps this integral on Python 3 too (Python 2 already floored).
    t['tail'] = t['blkno'] + ((t['blkno'] // y) * x) + t['blkcount']
    shelter_count = int(t['tail'].max() // y)

    # 3. Mark the IOs to shelter: flag 0 (presumably writes; shelter reads are
    #    emitted with flag 1 below) of at most `policy_size` KB (2 sectors/KB).
    t['is_shltr'] = ((t['blkcount'] <= policy_size * 2) & (t['flag'] == 0))

    # 4. Keep only the columns the simulator expects, in its order.
    t = t[['offset', 'dskno', 'blkno', 'blkcount', 'flag', 'is_shltr']]

    # full_shelter_search_window: moving-average width used to pick a swap
    # target; Shelter.shelter_writes halves it when a swap does not free space.
    window = 100

    #INITIALIZE
    # shelters: look up shelter by its first block. Band i covers
    # [i*y, (i+1)*y) and its shelter is the last x sectors of that band.
    shelters = {}
    for i in range(shelter_count + 1):
        shelter_blk = y * i + (y - x)
        shelters[shelter_blk] = Shelter(shelter_blk)
    # a negative tail tells shelter_writes() no request has been processed yet
    tail = -1
    current_reqs = None

    #remember whether last write was sheltered;
    #this affects how we shelter a write when the current tail is inside a shelter
    last_sheltered = False

    # output file names, next to the input trace
    trace_dir, trace_file = path.split(trace)
    trace_stem = trace_file.split('.')[0]
    outtrace = path.join(trace_dir, "%s_sheltered.txt" % trace_stem)
    outshelter = path.join(trace_dir, "%s_shelter_read.txt" % trace_stem)

    with open(outtrace, "w") as output:
        writer = csv.writer(output, delimiter=' ')
        # Convert the dataframe into plain lists so that Nora's code still works.
        # The loop variable is deliberately not `x`: Python 2 list comprehensions
        # leak their variable and would overwrite the global shelter size.
        rows = [[rec[0], int(rec[1]), int(rec[2]), int(rec[3]), int(rec[4]), int(rec[5])]
                for rec in t.values.tolist()]

        for row in tqdm(rows):
            current_reqs = CurrentReq(row)
            if current_reqs.should_shelter():
                shelter_writes(current_reqs)
                last_sheltered = True
            else:
                current_reqs.shift_requests()
                last_sheltered = False

            # now that requests have been modified as needed, write them out
            for req in current_reqs.requests:
                writer.writerow(req)

            # update tail
            tail = current_reqs.get_next_blk()

    # Done! Emit one read request per non-empty shelter, covering everything
    # recorded in it for cleanup.
    sorted_shelters = sorted(shelters.items(), key=operator.itemgetter(0))

    with open(outshelter, "w") as out:
        for block, shelter in sorted_shelters:
            if not shelter.reqs:
                continue
            read_req = {
                "time": 0,
                "devno": 0,
                "blkno": block,
                "bcount": sum(bcount for _, bcount in shelter.reqs),
                "flags": 1,  # read
            }
            out.write("%s %d %d %d %d\n" % ("{0:.3f}".format(read_req['time']), read_req['devno'],
                                             read_req['blkno'], read_req['bcount'], read_req['flags']))

T1 /home/tiratatp/Repositories/snia_traces/T1/LiveMapsBackEnd/Combined/disk1_filtered_1hrs.txt
T2 /home/tiratatp/Repositories/snia_traces/T2/DisplayAdsDataServer/Combined/disk0_1hrs.txt
T3 /home/tiratatp/Repositories/snia_traces/T3/DisplayAdsPayload/Combined/disk0_1hrs.txt
T4 /home/tiratatp/Repositories/snia_traces/T4/Exchange-Server-Traces/Combined/disk8_1hrs.txt
T5 /home/tiratatp/Repositories/snia_traces/T5/MSNStorageCFS/Combined/disk6_filtered_1hrs.txt
T6 /home/tiratatp/Repositories/snia_traces/T6/MSNStorageFileServer/Combined/disk5_filtered_1hrs.txt
T7 /home/tiratatp/Repositories/snia_traces/T7/BuildServer/Combined/disk0_filtered_1hrs.txt
T8 /home/tiratatp/Repositories/snia_traces/T8/DevelopmentToolsRelease/Combined/disk6_filtered_1hrs.txt
T9 /home/tiratatp/Repositories/snia_traces/T9/RadiusAuthentication/Combined/disk0_filtered_1hrs.txt
T10 /home/tiratatp/Repositories/snia_traces/T10/RadiusBackEndSQLServer/Combined/disk4_filtered_1hrs.txt
