## Number Crunching I

This notebook is mostly for experimenting with loading the data,
as well as trimming the columns we care about and reserializing them
in a lighter (\~20% of the size) and faster (\~15x quicker to load) format: gzipped Parquet.

In [1]:
import pandas as pd
import numpy as np

In [2]:
def mkPath(name, direction, trajectory, run, users, radius, ul, dl):
    """Build the path of a raw trace file under ``./out/``.

    The file name is ``<direction><name>_<trajectory>_run<run>_usr<users>_
    rad<radius>_ul<ul>_dl<dl>.txt``. Note that ``direction`` comes *before*
    ``name`` in the file name even though it is the second parameter.
    ``run``, ``users``, ``ul`` and ``dl`` are rendered as integers (so
    booleans become 0/1) and ``radius`` with two decimals.
    """
    template = './out/%s%s_%s_run%d_usr%d_rad%.2f_ul%d_dl%d.txt'
    fields = (direction, name, trajectory, run, users, radius, ul, dl)
    return template % fields
    
def readRlcStats(direction, trajectory, run, users, radius, ul, dl):
    """Load the RLC statistics trace of one run into a DataFrame.

    Parameters
    ----------
    direction : str
        File-name prefix selecting the link direction; the callers in this
        notebook pass ``'Dl'`` or ``'Ul'``.
    trajectory, run, users, radius, ul, dl
        Run identifiers, forwarded unchanged to ``mkPath``.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``start`` (a Timedelta), with ``end`` also converted to a
        Timedelta. The ``RNTI`` and ``LCID`` columns of the file are dropped.
    """
    fname = mkPath('RlcStats', direction, trajectory, run, users, radius, ul, dl)

    frame = pd.read_csv(fname, sep='\t', index_col=False,
                        # Row 0 is the file's own header; it is discarded and
                        # replaced by `names` below. (Was `0L`, a Python 2 long
                        # literal that is a SyntaxError on Python 3.)
                        header=0,
                        names=[
                            'start', 'end',
                            'cellId', 'IMSI', 'RNTI', 'LCID',
                            'nTxPDUs', 'TxBytes', 'nRxPDUs', 'RxBytes',
                            'delay', 'delay.stdDev', 'delay.min', 'delay.max',
                            'PduSize', 'PduSize.stdDev', 'PduSize.min', 'PduSize.max'
                        ],
                        usecols=[
                            'start', 'end',
                            'cellId', 'IMSI', #'RNTI', 'LCID',
                            'nTxPDUs', 'TxBytes', 'nRxPDUs', 'RxBytes',
                            'delay', 'delay.stdDev', 'delay.min', 'delay.max',
                            'PduSize', 'PduSize.stdDev', 'PduSize.min', 'PduSize.max'
                        ],
                        # Narrow dtypes keep the packed output small.
                        # NOTE(review): UInt8 assumes cellId/IMSI never exceed 255 - confirm.
                        dtype={
                            'start': 'float32', 'end': 'float32',

                            'cellId': 'UInt8', 'IMSI': 'UInt8', 'LCID': 'UInt8',

                            'nTxPDUs': 'UInt16', 'TxBytes': 'float32',
                            'nRxPDUs': 'UInt16', 'RxBytes': 'float32',

                            'delay':     'float32', 'delay.stdDev': 'float32',
                            'delay.min': 'float32', 'delay.max':    'float32',

                            'PduSize':     'float32', 'PduSize.stdDev': 'float32',
                            'PduSize.min': 'float32', 'PduSize.max':    'float32',
                        }
                    )

    # The window bounds are read as seconds; store them as Timedeltas.
    frame.start = pd.to_timedelta(frame.start, 's')
    frame.end = pd.to_timedelta(frame.end, 's')

    # Make time column the index
    frame.set_index('start', inplace=True)

    return frame
    
def readDlRsrpSinrStats(trajectory, run, users, radius, ul, dl):
    """Load the downlink RSRP/SINR trace of one run into a DataFrame.

    All parameters are run identifiers forwarded unchanged to ``mkPath``.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``time`` (a Timedelta) with columns ``cellId``, ``IMSI``,
        ``rsrp`` and ``sinr``; the last two are transformed with
        ``10 * log10(x)``. ``RNTI`` and ``ComponentCarrierId`` are dropped.
    """
    fname = mkPath('RsrpSinrStats', 'Dl', trajectory, run, users, radius, ul, dl)

    frame = pd.read_csv(fname, sep='\t', index_col=False,
                        # Row 0 is the file's own header; it is discarded and
                        # replaced by `names` below. (Was `0L`, a Python 2 long
                        # literal that is a SyntaxError on Python 3.)
                        header=0,
                        names=  ['time', 'cellId', 'IMSI', 'RNTI', 'rsrp', 'sinr', 'ComponentCarrierId'],
                        usecols=['time', 'cellId', 'IMSI', 'rsrp', 'sinr'],
                        dtype={
                            'time': 'float32',
                            'cellId': 'UInt8', 'IMSI': 'UInt8', 'RNTI': 'UInt8',
                            'rsrp': 'float32', 'sinr': 'float32',
                            'ComponentCarrierId': 'UInt8'
                        }
                       )

    # Convert SINR, RSRP to dB
    # NOTE(review): assumes the file stores both as linear values - confirm.
    frame.sinr = 10 * np.log10(frame.sinr)
    frame.rsrp = 10 * np.log10(frame.rsrp)

    # Timestamps are read as seconds; store them as Timedeltas.
    frame.time = pd.to_timedelta(frame.time, 's')

    # Make time column the index
    frame.set_index('time', inplace=True)

    return frame

def readUlSinrStats(trajectory, run, users, radius, ul, dl):
    """Load the uplink SINR trace of one run into a DataFrame.

    All parameters are run identifiers forwarded unchanged to ``mkPath``.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``time`` (a Timedelta) with columns ``cellId``, ``IMSI``
        and ``sinr``; ``sinr`` is transformed with ``10 * log10(x)``.
        ``RNTI`` and ``componentCarrierId`` are dropped.
    """
    fname = mkPath('SinrStats', 'Ul', trajectory, run, users, radius, ul, dl)

    frame = pd.read_csv(fname, sep='\t', index_col=False,
                        # Row 0 is the file's own header; it is discarded and
                        # replaced by `names` below. (Was `0L`, a Python 2 long
                        # literal that is a SyntaxError on Python 3.)
                        header=0,
                        names=['time', 'cellId', 'IMSI', 'RNTI', 'sinr', 'componentCarrierId'],
                        usecols=['time', 'cellId', 'IMSI', 'sinr'],
                        dtype={
                            'time': 'float32',
                            'cellId': 'UInt8', 'IMSI': 'UInt8', 'RNTI': 'UInt8',
                            # Key spelled to match `names` above (it used to be
                            # capitalised and so referred to no declared column).
                            'sinr': 'float32', 'componentCarrierId': 'UInt8'
                        }
                       )

    # Convert SINR to dB (this trace has no RSRP column)
    # NOTE(review): assumes the file stores SINR as a linear value - confirm.
    frame.sinr = 10 * np.log10(frame.sinr)

    # Timestamps are read as seconds; store them as Timedeltas.
    frame.time = pd.to_timedelta(frame.time, 's')

    # Make time column the index (IMSI deliberately stays a regular column)
    #frame.set_index(['time', 'IMSI'], inplace=True)
    frame.set_index('time', inplace=True)

    return frame

def readApplicationStats(clientServer, trajectory, run, users, radius, ul, dl):
    """Load per-flow application byte counts of one run in long format.

    Parameters
    ----------
    clientServer : str
        File-name prefix selecting the endpoint; the callers in this notebook
        pass ``'server'`` or ``'client'``.
    trajectory, run, users, radius, ul, dl
        Run identifiers, forwarded unchanged to ``mkPath``.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``time`` (a Timedelta rounded to whole seconds) with one
        row per (time, flow): columns ``flowId`` (uint8) and ``bytes``.
    """
    fname = mkPath('Stats', clientServer, trajectory, run, users, radius, ul, dl)

    frame = pd.read_csv(fname, index_col=False)
    # Whatever the first column is called in the file, treat it as the timestamp.
    frame.rename(columns={ frame.columns[0]: 'time' }, inplace=True)
    frame.time = pd.to_timedelta(frame.time, 'min')
    # To correct our terrible encoding choices: snap the fractional-minute
    # timestamps to whole seconds. Uses the documented `.dt.round` accessor;
    # `Series.round` is specified for a number of decimals and only rounded
    # timedeltas by accident of how it forwards its argument.
    frame.time = frame.time.dt.round('s')
    frame.set_index(['time'], inplace=True)

    # Correct integer overflows in byte counts: anything below -(2**30) is
    # taken to be a wrapped 32-bit value and shifted back up by 2**32.
    frame[frame < -(2**30)] += 2**32

    # Unpivot the flowId into an index
    # (ignore_index=False keeps the time index on every melted row)
    frame = pd.melt(frame, var_name='flowId', value_name='bytes', ignore_index=False)
    # NOTE(review): assumes the remaining column headers are integer flow ids
    # that fit in uint8 - confirm.
    frame.flowId = frame.flowId.astype('uint8')
    #frame.set_index('flowId', append=True, inplace=True)

    return frame


In [3]:
def mkPathPacked(name, trajectory, run, users, radius, ul, dl):
    """Build the path of a packed (gzipped parquet) table under ``./packed2/``.

    ``name`` labels the table; the remaining parameters identify the run and
    are rendered as integers (booleans become 0/1), except ``radius`` which
    is rendered with two decimals.
    """
    template = './packed2/%s_%s_run%d_usr%d_rad%.2f_ul%d_dl%d.parquet.gzip'
    fields = (name, trajectory, run, users, radius, ul, dl)
    return template % fields


def compressRun(trajectory, run, users, radius, ul, dl):
    """Read every raw trace of one run and re-save each as gzipped parquet.

    Six tables are produced (``dlSinr``, ``ulSinr``, ``dlRlc``, ``ulRlc``,
    ``serverStats``, ``clientStats``), each written to the path given by
    ``mkPathPacked`` using the ``fastparquet`` engine. Returns nothing.
    """
    run_key = dict(trajectory=trajectory, run=run, users=users,
                   radius=radius, ul=ul, dl=dl)

    # All inputs are loaded before the first write, so a failing read leaves
    # no output behind for this run.
    tables = {
        'dlSinr': readDlRsrpSinrStats(**run_key),
        'ulSinr': readUlSinrStats(**run_key),
        'dlRlc': readRlcStats('Dl', **run_key),
        'ulRlc': readRlcStats('Ul', **run_key),
        'serverStats': readApplicationStats('server', **run_key),
        'clientStats': readApplicationStats('client', **run_key),
    }

    # Dicts preserve insertion order, so files are written in the order above.
    for label, table in tables.items():
        table.to_parquet(mkPathPacked(label, **run_key),
                         compression='gzip', engine='fastparquet')
    


In [7]:
from tqdm.auto import tqdm
from multiprocessing import Pool
from queue import Queue

# Work list: one (trajectory, run, users, radius, ul, dl) tuple per run to
# compress; each tuple is unpacked into compressRun by doWork.
q = []

def doWork(args):
    """Pool worker: unpack one work item and compress that run.

    ``args`` is an iterable of the positional arguments of ``compressRun``
    (trajectory, run, users, radius, ul, dl). Returns nothing.
    """
    run_parameters = tuple(args)
    compressRun(*run_parameters)
    

# Adjust this for whatever we need to compress
# Each work item is (trajectory, run, users, radius, ul, dl).
for i in [*range(5, 10), *range(15, 20), *range(25, 30)]:
    for traj in ('nm-0Wh-r0', 'nm-500Wh-r0', 'pso-0Wh-r0', 'pso-500Wh-r0'):
        q += [(traj, i, 5, 5, False, True)]

# Fan the runs out over 16 worker processes; wrapping the lazy imap iterator
# in tqdm and draining it with list() drives the progress bar.
with Pool(16) as p:
    r = list(tqdm(p.imap(doWork, q), total=len(q)))

# for i in tqdm(q):
#   doWork(i)

HBox(children=(HTML(value=''), FloatProgress(value=0.0, min=0.0, max=60.0), HTML(value='')))


