## Anomaly Detector

This process takes a streaming data of NAM and gives an anomalous score to each row 



### Strategy  for detecting anomaly
The most common approach to detect anomalies in statistics is to evaluate a mean and standard deviation. And the standard deviation equals the square root of the sample variance. An anomalous value is considered when it overseeds/below the mean plus/subtract 3 times of standard deviation.  

Based on this rule, we evaluated each incoming value as normal or anomaly.    

In [14]:
import math
def is_anomalous(value, mean, sampleVarience):
    standard_deviation = math.sqrt(sampleVarience)
    upper_bound = mean + (3 * standard_deviation)
    lower_bound = mean - (3 * standard_deviation)
    if(value > upper_bound or value < lower_bound):
        return True
    else: 
        return False

def update_mean(newValue, count, existingAggregate):   
    count = count + 1
    (mean, M2) = existingAggregate
    delta = newValue - mean
    mean += delta / count
    delta2 = newValue - mean
    M2 += delta * delta2
    return (mean, M2)  

### Strategy for grouping area

The location index from the NAM is latitude and longitude, it would be hard work to distinguish which points are in the same area. An approach is to transform the latitude and longitude to geo-hash. The longer the hashed value, the accurate result we will get. However, we are wondering there is not enough data for a specific point to have a fair mean and standard deviation, so we decided to take 6 bits from the geo-hash. In other words, the position has the first 6 identical geo-hash codes that will be considered as the same area. 

The *gh_features_dict* is a dictionary storing the 6-code geo-hash as key, and the value is a tuple of each column's mean and M2(i.e. squared distance from the mean)

In [15]:
gh_features_dict = {} # wxyz -> (count, pmw, ps, pt, hum, sd, ts, tt, prec, vs, vis) 

### Anomaly  Evaluation

We take 10 columns from the NAM, which are pressure_maximum_wind, pressure_surface, pressure_tropopause, humidity, snow_depth_surface, temperature_surface, temperature_tropopause, precipitation, vegetation_surface, and visibility.   
When one of the value is considered as anomaly, the row recieves 10 anomaly points in terms of anomalous score. Eventullay, it will output the row with the anomalous score. 


In [16]:
import datetime
import geohash

def is_float(value):
    try:
        float(value)
        return True
    except:
        return False

def calcuate_anomaly(line):
    
    # check the if incoming line is valid 
    if line.startswith('1_'): # header
        return ''
    
    variables = line.split("\t")
    
    if(len(variables) < 16): # Could be invalid text
        return ''
    
    # If a value cannot be converted from a stirng no a number, 
    # we give it a negative value so it would be considred as 
    # anomaly but not be counted to the mean
    for i in range(len(variables)):
        if is_float(variables[i]) == False:
            variables[i] = '-999999'
    
    # convert each column
    milliseconds = int(variables[0])
    dt = datetime.datetime.fromtimestamp(milliseconds/1000.0)
    lat = float(variables[1])
    lon = float(variables[2])
    pressure_maximum_wind = float(variables[5])
    pressure_surface = float(variables[6])
    pressure_tropopause = float(variables[7])
    humidity = float(variables[8])
    snow_depth_surface = float(variables[9])
    temperature_surface = float(variables[10])
    temperature_tropopause = float(variables[11])
    precipitation = float(variables[13])
    vegetation_surface = float(variables[14])                          
    visibility = float(variables[15])
    
    # put features to a tuple, the 1 in the first index means count
    newValues = [1, pressure_maximum_wind, pressure_surface, pressure_tropopause, humidity, \
                snow_depth_surface, temperature_surface, temperature_tropopause, precipitation, \
                vegetation_surface, visibility]
    try:
        # get 6-bit geo-hashed 
        gh = geohash.encode(lat, lon)[0:6]
    except:
        return ''
    
    # define the anomalous_score
    anomalous_score = 0
    if gh in gh_features_dict: # 
        features = gh_features_dict[gh]
        cnt = features[0]
        if(cnt >= 3): # To get a fair result, the calculation will not be invoked until an area has at least 3 records 
            for i in range(1, 10):
                (mean, M2) = features[i]
                sampleVariance =  M2 / (cnt - 1)
                anomalous_score += 10 if is_anomalous(newValues[i], mean, sampleVariance) else 0
    else : # if no geo code in the gh_features_dict, we simply give it a empty record
        features = [0, (0,0), (0,0), (0,0), (0,0), (0,0), (0,0), (0,0), (0,0), (0,0), (0,0)]
    
    cnt = features[0]
    features[0] = cnt + 1
    
    # No matter the data is normal or not, it will be counted into the location's mean and M2.
    # However, a negative value will not being counted because the value is invalid(from the pre-process)
    for i in range(1, 10) : 
        if newValues[i] < 0 :
            continue
        features[i]  = update_mean(newValues[i], cnt, features[i])
    gh_features_dict[gh] = features
    
    # In the end, we return the original line with the anomalous score
    return line[0:len(line)-1] + '\t' + str(anomalous_score) + '\n'

### Start processing
Now let's open a socket waiting for the data emitter, and output to a file(anomaly-detector-output.txt).

In [17]:
# clean up exisitng data
output_file_name = "anomaly-detector-output.txt"
f = open(output_file_name, "w")
f.write("")
f.close()

def write_rdd_to_file(rdd):
    f = open(output_file_name, "a")
    for line in rdd:
        if not line or len(line) == 0:
            continue
        f.write(line)
    f.close()


In [18]:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 0.5)
ssc.checkpoint("checkpoint")
lines = ssc.socketTextStream("orion03", 12889)
processed = lines.map(lambda line: calcuate_anomaly(line))
processed.foreachRDD(lambda rdd: write_rdd_to_file(rdd.collect()))


In [19]:
ssc.start()

In [8]:
ssc.stop(stopSparkContext=False)
