# Sensor Signal Processing - Prototype

The purpose of this notebook is to prototype the processing of a sensor signal where 5 consecutive values above an upper threshold or 5 consecutive values below a lower threshold result in an event being recorded.

To simulate a signal, this notebook uses an array of random integers. Note that np.random.randint(1, 10, n) excludes its upper bound, so the simulated values range from 1 to 9.
An event is recorded when 5 consecutive signal values lie above the upper threshold or below the lower threshold. Values above the upper threshold produce a positive event; values below the lower threshold produce a negative event.
An event consists of an ID, an event type indicating whether it is a positive or negative event ('upper' or 'lower' in the code), and the array index of the last value in the run of 5.

The assumption for this prototype is that the count restarts after each event. Once 5 consecutive values have produced an event, the next value beyond the same threshold counts as the first occurrence of a possible next event, which is recorded only if 4 more consecutive values beyond that threshold follow.
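
The rule described above can be sketched as a small standalone function. This is a minimal illustration, not the notebook's own implementation: the name `detect_events`, the `run_length` parameter, and the tuple layout `(id, type, index)` are assumptions made for this example. Values equal to a threshold are treated as inside the band and reset the count.

```python
def detect_events(signal, upper, lower, run_length=5):
    """Return (id, type, index) tuples for runs of values beyond a threshold.

    Hypothetical helper: the name, parameters, and tuple layout are
    illustrative assumptions, not part of the notebook.
    """
    events = []
    run = 0  # > 0: consecutive values above upper, < 0: consecutive values below lower
    for index, value in enumerate(signal):
        if value > upper:
            run = run + 1 if run > 0 else 1   # extend or start an upper run
        elif value < lower:
            run = run - 1 if run < 0 else -1  # extend or start a lower run
        else:
            run = 0                           # value inside the band: reset
        if abs(run) == run_length:
            event_type = 'upper' if run > 0 else 'lower'
            events.append((len(events) + 1, event_type, index))
            run = 0                           # restart the count after an event
    return events


print(detect_events([8, 9, 8, 9, 8, 2, 1, 3, 2, 1], upper=7, lower=4))
# → [(1, 'upper', 4), (2, 'lower', 9)]
```

Resetting the count after each event mirrors the assumption stated above: a run of 10 values above the upper threshold yields two events, not six.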

In [1]:
# import required libraries
import numpy as np

In [2]:
signal = np.random.randint(1,10,100000)
upperThreshold = 7
lowerThreshold = 4

In [3]:
test = np.random.randint(1,10,10)
for i in test:
    print(i)

print(len(test))
print(test[0])

1
5
9
9
7
5
5
1
4
7
10
1


In [4]:
eventLog = [] # create empty event log list to collect events
signalCount = 0
index = 0

#check
#print(eventLog)
#print(len(eventLog))
#eventLog.append([len(eventLog)+1, ('upper', 7)])
#print(len(eventLog))
#print(eventLog)

In [5]:
# signalCount > 0 counts consecutive values above the upper threshold,
# signalCount < 0 counts consecutive values below the lower threshold.
# Re-run the previous cell first so that signalCount and eventLog start empty.
for index, value in enumerate(signal): # index is the 0-based array position of value
    if lowerThreshold <= value <= upperThreshold:
        signalCount = 0 # value within the thresholds: reset count of signals
    elif value > upperThreshold and signalCount <= 0:
        signalCount = 1 # first signal of upper event
    elif value < lowerThreshold and signalCount >= 0:
        signalCount = -1 # first signal of lower event
    elif value > upperThreshold and signalCount < 4:
        signalCount = signalCount + 1 # next signal is also over upper threshold, but not yet reached 5 consecutive signals
    elif value < lowerThreshold and signalCount > -4:
        signalCount = signalCount - 1 # next signal is also under lower threshold, but not yet reached 5 consecutive signals
    elif value > upperThreshold and signalCount == 4:
        eventLog.append([len(eventLog)+1, ('upper', index)]) # fifth consecutive signal: insert upper event in event log
        signalCount = 0 # set signal count back to zero for next event
    elif value < lowerThreshold and signalCount == -4:
        eventLog.append([len(eventLog)+1, ('lower', index)]) # fifth consecutive signal: insert lower event in event log
        signalCount = 0 # set signal count back to zero for next event
    else:
        # should be unreachable for numeric input
        print(value)
        print('unexpected result - review code')
        break
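
One way to sanity-check the result of the loop is to count the logged events per type. The sketch below assumes the log has the same shape as eventLog, that is `[id, (type, index)]`. The `sample_log` values and the `summarise_events` name are made up for illustration.

```python
from collections import Counter

# Hypothetical sample with the same shape as eventLog: [id, (type, index)]
sample_log = [[1, ('upper', 12)], [2, ('lower', 40)], [3, ('upper', 57)]]


def summarise_events(event_log):
    """Count events per type ('upper' or 'lower')."""
    return Counter(event_type for _id, (event_type, _index) in event_log)


counts = summarise_events(sample_log)
print(counts['upper'], counts['lower'])
```

Passing the real eventLog instead of `sample_log` gives a quick check that both event types occur in the simulated signal.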
        
        

# Next version to include sensor files


In [6]:
# import required libraries/functions
import datetime
from random import randint

In [7]:
# variables for this program


In [29]:
# create dummy sensor files (the directory sensors/sensor1/ must already exist)

for i in range(10):
    dt = datetime.datetime(2019, 11, 3) + datetime.timedelta(minutes=i)
    timestamp = dt.strftime('%Y%m%d%H%M%S')
    filename = 'sensors/sensor1/sensor1_' + timestamp + '.txt'
    #print(filename)
    # append mode: re-running this cell adds another reading to each file
    with open(filename, 'a') as file:
        # line format: sensorid,datetime,value
        file.write('sensor1,' + timestamp + ',' + str(randint(1, 10)) + '\n')


In [30]:
# read existing sensor files from directory
from os import listdir
from os.path import isfile, join

mypath = 'sensors/sensor1/'

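# listdir returns entries in arbitrary order, so sort the names to process files chronologically
sensorFiles = sorted(f for f in listdir(mypath) if isfile(join(mypath, f)))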
print(sensorFiles)

['sensor1_20191103000000.txt', 'sensor1_20191103000100.txt', 'sensor1_20191103000200.txt', 'sensor1_20191103000300.txt', 'sensor1_20191103000400.txt', 'sensor1_20191103000500.txt', 'sensor1_20191103000600.txt', 'sensor1_20191103000700.txt', 'sensor1_20191103000800.txt', 'sensor1_20191103000900.txt']


In [31]:
#test read files
for f in sensorFiles:
    mf = join(mypath, f)
    with open(mf, 'r') as sf:
        for line in sf: # iterate over the file line by line
            print(line.rstrip())

sensor1,20191103000000,5
sensor1,20191103000100,5
sensor1,20191103000200,2
sensor1,20191103000300,5
sensor1,20191103000400,8
sensor1,20191103000500,10
sensor1,20191103000600,9
sensor1,20191103000700,4
sensor1,20191103000800,4
sensor1,20191103000900,1
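
Each line printed above follows the pattern sensorid,datetime,value. A minimal sketch of turning such a line into typed fields follows. The function name `parse_reading` is an assumption made for this example, and the timestamp format is taken from the strftime pattern used when the dummy files were created.

```python
from datetime import datetime


def parse_reading(line):
    """Split 'sensorid,datetime,value' into (str, datetime, int).

    Hypothetical helper, not part of the notebook.
    """
    sensor_id, timestamp, value = line.strip().split(',')
    return sensor_id, datetime.strptime(timestamp, '%Y%m%d%H%M%S'), int(value)


print(parse_reading('sensor1,20191103000400,8'))
```

Converting the value to int at this point lets it be compared directly against the numeric thresholds later on.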


In [36]:
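#sc.stop()  # note the parentheses: a bare sc.stop only returns the bound method and leaves the context running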

<bound method SparkContext.stop of <SparkContext master=local[4] appName=Sensor Signal Processing Prototype>>

In [37]:
#Create Spark context
from pyspark import SparkContext, SparkConf
import pandas as pd

conf = SparkConf().setAppName("Sensor Signal Processing Prototype").setMaster("local[4]")
# getOrCreate reuses a running SparkContext. Calling SparkContext(conf=conf) while one is
# already running raises "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [38]:
#Read files into an RDD (one element per line of text)

sensorReadings = sc.textFile(r"C:\Users\kiewi\Documents\python\Notebooks\sensors\sensor1\sensor*.txt")
print(sensorReadings)
print(sensorReadings.take(3))

C:\Users\kiewi\Documents\python\Notebooks\sensors\sensor1\sensor*.txt MapPartitionsRDD[8] at textFile at <unknown>:0
['sensor1,20191103000000,5', 'sensor1,20191103000100,5', 'sensor1,20191103000200,2']


In [42]:
#create (key, value) pairs with key (sensor id, timestamp) and sort on key

srSorted = sensorReadings.map(lambda x: x.split(',')).map(lambda x: ((x[0],x[1]),x[2])).sortByKey()

print(srSorted.take(3))

[(('sensor1', '20191103000000'), '5'), (('sensor1', '20191103000100'), '5'), (('sensor1', '20191103000200'), '2')]
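
For readers without a Spark installation, the same key/value shaping can be sketched in plain Python. This is an illustration only, not a replacement for the RDD code: the `to_pair` name and the unordered sample lines are assumptions. Because the timestamps are fixed-width and zero-padded, sorting them as strings also sorts them chronologically.

```python
# Sample lines in the notebook's format, deliberately out of order
lines = [
    'sensor1,20191103000200,2',
    'sensor1,20191103000000,5',
    'sensor1,20191103000100,5',
]


def to_pair(line):
    """Map 'sensorid,datetime,value' to ((sensorid, datetime), value)."""
    sensor_id, timestamp, value = line.split(',')
    return ((sensor_id, timestamp), value)


# Sort on the (sensor id, timestamp) key, as a sort by key would
pairs = sorted((to_pair(line) for line in lines), key=lambda kv: kv[0])
print(pairs)
```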
