# Test Processing for NDT Data

This notebook tests the data processing proceedure for the NDT data generated by the RaspberryPi test program `test_data_generator.py`.

In [None]:
## Install packages as needed
#install.packages("mvoutlier", repos='http://cran.us.r-project.org')

In [1]:
# Load libraries and set the plot preferences for R in ipython on Windows.
options(jupyter.plot_mimetypes = 'image/png')
library(plyr)

We want to do three types of cumulative calculations to find outliers:
1. Find the cumulative average for the avgrtt number for each client subnet
2. Find the cumulative distribution size (standard deviation) for the avgrtt measurement for each client subnet
3. Determine if the current value (in the time series) is an outlier based on the current cumulative average and standard deviation.

We have three helper functions that calculate these.

We also calculate the data frequency for each client subnet: how many queries per day, on average, has the client subnet made?

In [24]:
cumavg <- function( x ){

    lx <- length(x)
    updatevals <- c( 2:lx )
    nmean <-1
    nmean[1] <- x[1]

    for(k in updatevals){
        nmean[k] <- (nmean[k-1] * (k-1) + x[k])/k
    }

    return(nmean)

    invisible()
}

cumsigma <- function( x ){
    lx <- length(x)
    updatevals <- c( 2:lx )
    nvar <- nmean <-1
    nvar[1]  <- 0.0
    nmean[1] <- x[1]

    for(k in updatevals){
        nmean[k] <- (nmean[k-1] * (k-1) + x[k])/k
        nvar[k] <- ((nvar[k-1] + nmean[k-1]*nmean[k-1])*(k-1) + x[k]*x[k] )/k - nmean[k]*nmean[k]   
    }

    return(sqrt(nvar))

    invisible()

}

outliertest <- function( x, epsilon=0.75 ){
    #epsilon is the threshold above which we consider a point an outlier. Typically 0.75
    #we only consider outliers if the point is above the average (not below) since we are interested in high network traffic
    lx <- length(x)
    updatevals <- c( 2:lx )
    nvar <- nmean <- outliers <-1
    nvar[1]  <- 0.0
    nmean[1] <- x[1]
    outliers[1] <- 0 #by definition the first one is not an outlier

    for(k in updatevals){

        nmean[k] <- (nmean[k-1] * (k-1) + x[k])/k
        nvar[k] <- ((nvar[k-1] + nmean[k-1]*nmean[k-1])*(k-1) + x[k]*x[k] )/k - nmean[k]*nmean[k]
        if (k==2){
            outliers[k] <- 0
        } else {
            if (nvar[k] > 0){
             outliers[k] <- (abs(x[k] - nmean[k])/sqrt(nvar[k])) > epsilon && x[k] > nmean[k]
            }else{
             outliers[k] <- 0
            }

            if (outliers[k] > 0){
                nvar[k] <- nvar[k-1]
            }
        }
     
    }

    return(outliers)

    invisible()

}

calcnfreq <- function( x, interval=365){
    #count the number of events in the last year
    nfreq <- 1
    nfreq[1] <- 0
    npts <- length(x)
    for (k in c(2:npts)){
        nfreq[k] <- (nfreq[k-1] + 1/as.numeric(difftime(x[k],x[k-1],units='secs')))/2
    }
    # convert seconds to days for the frequency
    return(nfreq*86400)
    invisible()
}



In order to test that these functions worked properly, I created a manual set of data and tested the cumulative sums and averages against the functions. They work as they should.

In [26]:
#Testing these against manual calculations
cumavg(c(1,8,654,132,4654,8,464))
cumsigma(c(1,8,654,132,4654,8,464))
outliertest(c(1,8,654,132,4654,8,464))

I next load in the data and convert the daytime and dataday.

In [27]:
df <- read.csv('tempdata.log',stringsAsFactors=FALSE, header=FALSE, 
               col.names=c("daytime","serverip","clientip","clientport","clientlat","clientlon","sumrtt","countrtt",
                           "avgrtt","clientsub"))
df<-df[with(df, order(clientsub,daytime)), ]

# I need an initial temporary count of each entry to caclculate the number of queries per clientsub
df$tempcount<-1

df$asdate <- as.POSIXct(strptime(df$daytime, "%Y-%m-%dT%H:%M:%S+00:00"));
df$dataday<-format(df$asdate, "%Y%m%d")
str(df)

'data.frame':	1000 obs. of  13 variables:
 $ daytime   : chr  "2015-01-08T16:46:15+00:00" "2015-01-14T15:37:47+00:00" "2015-01-30T22:20:35+00:00" "2015-02-01T01:48:01+00:00" ...
 $ serverip  : chr  "serverip" "serverip" "serverip" "serverip" ...
 $ clientip  : chr  "OVTSW" "GKHTB" "UUNXV" "ZLDSO" ...
 $ clientport: int  23867 88927 69359 87973 80445 21236 50135 63079 11727 58854 ...
 $ clientlat : num  -21.87 38.51 79.45 -53.63 3.14 ...
 $ clientlon : num  175.13 80.11 -8.02 55.96 -146.82 ...
 $ sumrtt    : int  414 43 596 763 984 990 540 674 330 723 ...
 $ countrtt  : int  32065 28382 81681 94393 36793 61314 44812 97033 92090 72752 ...
 $ avgrtt    : num  77.5 660 137 123.7 37.4 ...
 $ clientsub : chr  "A" "A" "A" "A" ...
 $ tempcount : num  1 1 1 1 1 1 1 1 1 1 ...
 $ asdate    : POSIXct, format: "2015-01-08 16:46:15" "2015-01-14 15:37:47" ...
 $ dataday   : chr  "20150108" "20150114" "20150130" "20150201" ...


Now I use the `ddply` function to apply each of the transforms to groups of data with the same clientsub, then recombine them back into the base dataframe.

In [30]:
df<-ddply(df,c("clientsub"),
      transform,
      ninsub = cumsum(tempcount))
df<-ddply(df,c("clientsub"),
          transform,
          avginsub = cumavg(avgrtt))
df<-ddply(df,c("clientsub"),
          transform,
          sigmainsub = cumsigma(avgrtt))
df<-ddply(df,c("clientsub"),
          transform,
          outlier = outliertest(avgrtt))

df<-ddply(df,c("clientsub"),
          transform,
          datafreq = calcnfreq(asdate))
head(df[with(df, order(clientsub,daytime)), ],3)


daytime,serverip,clientip,clientport,clientlat,clientlon,sumrtt,countrtt,avgrtt,clientsub,tempcount,asdate,dataday,ninsub,avginsub,sigmainsub,outlier,datafreq
2015-01-08T16:46:15+00:00,serverip,OVTSW,23867,-21.8695,175.126212,414,32065,77.45169,A,1,2015-01-08 16:46:15,20150108,1,77.45169,0.0,0,0.0
2015-01-14T15:37:47+00:00,serverip,GKHTB,88927,38.50504,80.113544,43,28382,660.04651,A,1,2015-01-14 15:37:47,20150114,2,368.7491,291.2974,0,0.08399897
2015-01-30T22:20:35+00:00,serverip,UUNXV,69359,79.44545,-8.021539,596,81681,137.04866,A,1,2015-01-30 22:20:35,20150130,3,291.51562,261.724,0,0.07271254


I then select the columns that I need for the HIVE schema. This will create a base of learning for each clientsub to be used to measure future outliers.

In [31]:
dfout <- df[,c('daytime','serverip','clientip','clientport','clientlat','clientlon',
  'sumrtt','countrtt','avgrtt','ninsub','avginsub','sigmainsub','datafreq',
  'outlier','dataday','clientsub')]
dfout<-dfout[with(dfout, order(dataday)), ]
head(dfout,3)

Unnamed: 0,daytime,serverip,clientip,clientport,clientlat,clientlon,sumrtt,countrtt,avgrtt,ninsub,avginsub,sigmainsub,datafreq,outlier,dataday,clientsub
382,2015-01-01T08:45:17+00:00,serverip,QHXUW,89910,12.835674,-137.29536,68,62347,916.86765,1,916.86765,0,0,0,20150101,K
878,2015-01-01T19:24:43+00:00,serverip,TVPSH,93285,-37.492239,10.66631,725,32741,45.16,1,45.16,0,0,0,20150101,X
268,2015-01-02T16:45:09+00:00,serverip,QAJRE,97349,9.915895,92.48581,965,94065,97.47668,1,97.47668,0,0,0,20150102,H


Finally, I write the data out to a file for transfer to Hadoop and Hive.

In [32]:
write.table(dfout,file='processed_tempdata.log.csv',row.names=FALSE, col.names=FALSE,quote=FALSE,sep=",")