<a href="https://colab.research.google.com/github/marcochisci/Anomaly_detection/blob/main/Anomaly_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Apache Spark 3.0.0

In [1]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.0.0
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
# unzip it
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
# install findspark 
!pip install -q findspark

# Set Environment Variables

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

#Create local spark session


In [3]:
import findspark
findspark.init()
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

Installation Test and pyspark version

In [4]:
#create a test schema
from pyspark.sql.types import *
from pyspark.sql import Row

schema = StructType([StructField('name', StringType()), StructField('age',IntegerType())])
rows = [Row(name='Severin', age=33), Row(name='John', age=48)]
df = spark.createDataFrame(rows, schema)

df.printSchema()
df.show()


# Check the pyspark version
import pyspark
print(pyspark.__version__)


root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Severin| 33|
|   John| 48|
+-------+---+

3.0.0


# Airquino Table Data

In [5]:
import pandas as pd
import psycopg2
import datetime
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib.dates as mdates


conn = psycopg2.connect(host='playground.magentalab.it', port='55432', database='airqino', user='datareader', password='homntLZnlhQd9prtVA9SFezQek')

query = """select * from crosstab(
  'select sd.data_acquired as timestamp, s.sensor_type ,  sd.float_value as value 
from station_data sd join sensor s on s.id = sd.sensor_id
where sd.data_acquired >= to_timestamp(''2021-06-01 00:00:00'', ''YYYY-mm-dd HH24:MI:SS'')
and sd.station_id = 23284701 order by sd.data_acquired asc, 1'
 , $$VALUES ('O3'::varchar), ('PM2.5'::varchar), ('CO2'::varchar), ('AUX1'::varchar), ('Temp. int.'::varchar), ('T'::varchar)
 , ('PM10'::varchar), ('RH'::varchar), ('NO2'::varchar), ('CO'::varchar), ('AUX2'::varchar), ('VOC'::varchar)$$
) AS value ("Dates" timestamp, "O3" float, "PM2.5" float, "CO2" float, "AUX1" float, "Temp. int." float, "T" float, "PM10" float,
"RH" float, "NO2" float, "CO" float, "AUX2" float, "VOC" float);""".format(29510692, 23284701)  #23284701 is a specific station

df = pd.read_sql(query, conn)
df= df.set_index('Dates')

#removing nans
df = df.dropna()

display(df.head(10))

  """)


Unnamed: 0_level_0,O3,PM2.5,CO2,AUX1,Temp. int.,T,PM10,RH,NO2,CO,AUX2,VOC
Dates,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
2021-06-29 10:40:00,466.0,2.0,414.0,1111.0,3896.0,276.0,9.0,512.0,301.0,226.0,91.0,358.0
2021-06-29 10:42:00,470.0,2.0,413.0,1111.0,3983.0,279.0,11.0,513.0,335.0,227.0,91.0,366.0
2021-06-29 10:46:00,480.0,2.0,414.0,1111.0,4147.0,286.0,10.0,487.0,291.0,226.0,93.0,365.0
2021-06-29 10:48:00,476.0,2.0,413.0,1111.0,4179.0,287.0,8.0,477.0,282.0,224.0,93.0,365.0
2021-06-29 11:00:00,458.0,1.0,410.0,1111.0,4291.0,292.0,7.0,419.0,240.0,216.0,95.0,367.0
2021-06-29 11:03:00,453.0,1.0,410.0,1111.0,4301.0,292.0,4.0,394.0,229.0,212.0,95.0,367.0
2021-06-29 11:06:00,447.0,1.0,410.0,1111.0,4315.0,293.0,5.0,393.0,245.0,211.0,95.0,371.0
2021-06-29 11:07:00,450.0,1.0,409.0,1111.0,4323.0,295.0,5.0,395.0,253.0,216.0,95.0,377.0
2021-06-29 11:09:00,454.0,1.0,409.0,1111.0,4337.0,296.0,6.0,400.0,253.0,217.0,94.0,379.0
2021-06-29 11:10:00,457.0,1.0,409.0,1111.0,4348.0,298.0,6.0,407.0,260.0,221.0,95.0,381.0


# Testing stationarity 

Test first 3 colums: O3, PM2.5, CO2

In [6]:
from statsmodels.tsa.stattools import adfuller

def test_stationarity(ts_data, column='', signif=0.05, series=False):
    if series:
        adf_test = adfuller(ts_data, autolag='AIC')
    else:
        adf_test = adfuller(ts_data[column], autolag='AIC')
    p_value = adf_test[1]                   
    if p_value <= signif:
        test_result = "Stationary"
    else:
        test_result = "Non-Stationary"
    return test_result

adf_test_results1 = {
    col: test_stationarity(df, col)
    for col in df[df.columns[0:3]].columns
}
adf_test_results1    

  import pandas.util.testing as tm


{'CO2': 'Stationary', 'O3': 'Stationary', 'PM2.5': 'Stationary'}

Test T and Temp. int.

In [7]:
adf_test_results2 = {
    col: test_stationarity(df, col)
    for col in df[df.columns[4:6]].columns
}
adf_test_results2   

{'T': 'Stationary', 'Temp. int.': 'Stationary'}

Test PM10, RH and NO2

In [8]:
adf_test_results3 = {
    col: test_stationarity(df, col)
    for col in df[df.columns[6:9]].columns
}
adf_test_results3   

{'NO2': 'Stationary', 'PM10': 'Stationary', 'RH': 'Stationary'}

Test CO, AUX2 and VOC

In [9]:
adf_test_results4 = {
    col: test_stationarity(df, col)
    for col in df[df.columns[9:12]].columns
}
adf_test_results4   

{'AUX2': 'Stationary', 'CO': 'Stationary', 'VOC': 'Stationary'}

Testing AUX1 (pvalue= nan -> sceglie sempre else)

In [10]:
from statsmodels.tsa.stattools import kpss

df[['AUX1']]
result= kpss(df['AUX1'])
pvalue= result[1]
if pvalue<=0.05:
  print("Not Stationary")
else:
  print("Stationary")


Stationary


  kpss_stat = eta / s_hat


Converting to stationary with differencing

In [12]:
def differencing(data, column, order):
    differenced_data = data[column].diff(order)
    differenced_data.fillna(differenced_data.mean(), inplace=True)
    return differenced_data
for col in df.columns:
    df[col] = differencing(df, col, 1)

Test for stationarity now

# VAR model

Removing AUX1

In [11]:
df=df.drop(['AUX1'], axis=1)

Finding best lag for VAR model

In [13]:
from statsmodels.tsa.api import VAR
max_lag = 40
var_model = VAR(df)
# select the best lag order
lag_results = var_model.select_order(max_lag)
selected_lag = lag_results.aic
print(selected_lag)



40


Finding anomalies

In [14]:
import numpy as np

def find_anomalies(squared_errors):
    threshold = np.mean(squared_errors) + np.std(squared_errors)
    predictions = (squared_errors >= threshold).astype(int)
    return predictions, threshold
var = VAR(df)
var_fitresults = var.fit(selected_lag)
squared_errors = var_fitresults.resid.sum(axis=1) ** 2
predictions, threshold = find_anomalies(squared_errors)

data = df.iloc[selected_lag:, :]
data['Predictions'] = predictions.values
data

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  del sys.path[0]


Unnamed: 0_level_0,O3,PM2.5,CO2,Temp. int.,T,PM10,RH,NO2,CO,AUX2,VOC,Predictions
Dates,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
2021-06-29 12:46:00,465.0,1.0,407.0,4507.0,314.0,5.0,369.0,216.0,217.0,95.0,385.0,0
2021-06-29 12:48:00,468.0,1.0,407.0,4517.0,315.0,9.0,370.0,219.0,218.0,96.0,386.0,0
2021-06-29 12:51:00,473.0,1.0,408.0,4534.0,317.0,5.0,362.0,228.0,218.0,95.0,389.0,0
2021-06-29 12:54:00,473.0,1.0,408.0,4539.0,316.0,5.0,370.0,214.0,217.0,96.0,386.0,0
2021-06-29 12:55:00,475.0,1.0,408.0,4546.0,315.0,6.0,370.0,225.0,218.0,97.0,388.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...
2021-10-04 08:40:00,427.0,9.0,497.0,3054.0,182.0,16.0,999.0,319.0,245.0,92.0,372.0,0
2021-10-04 08:41:00,430.0,9.0,497.0,3066.0,183.0,15.0,999.0,318.0,245.0,92.0,371.0,0
2021-10-04 08:43:00,428.0,9.0,496.0,3074.0,184.0,14.0,999.0,316.0,245.0,92.0,369.0,0
2021-10-04 08:44:00,429.0,9.0,498.0,3080.0,184.0,14.0,999.0,319.0,245.0,91.0,372.0,0


In [15]:
data['Predictions'].value_counts()

0    85243
1      571
Name: Predictions, dtype: int64