Oracle RF Fingerprinting

Paper available at :

 https://arxiv.org/pdf/1812.01124

 https://arxiv.org/pdf/1904.07623.pdf

 

Datasets from: 

https://mailuc-my.sharepoint.com/:f:/g/personal/wang2ba_ucmail_uc_edu/EjXyRTpV0Y5Dn-OjKlxKg8gBZWyq2PIHy5OPgh3bf3g4fg?e=B9oAJc

https://genesys-lab.org/oracle

In [None]:
pip install sigmf

In [48]:
import sigmf
import numpy as np
import pandas as pd
from sigmf import SigMFFile, sigmffile
import os
import glob,json
import re,pickle

import matplotlib.pyplot as plt

#importing the required libraries
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D,ZeroPadding2D
from tensorflow.keras.layers import MaxPool2D
from tensorflow.keras.layers import Flatten,Reshape
from tensorflow.keras.layers import Dropout,Input
from tensorflow.keras.layers import Dense,BatchNormalization
from tensorflow.keras import regularizers,metrics
from tensorflow.keras.callbacks import EarlyStopping

from numpy.fft import fft, ifft,fftshift,ifftshift

from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from scipy.stats import wasserstein_distance #this is EMD
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from scipy.stats import multivariate_normal
from sklearn import preprocessing
from sklearn.preprocessing import MinMaxScaler 
from scipy.signal import butter,filtfilt
from sklearn.model_selection import train_test_split
from scipy import signal
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import LabelBinarizer as LB
from sklearn.preprocessing import normalize 
from sklearn.metrics import accuracy_score

# CNN BASED IQ CLASSIFIER

In [None]:
#downloading data
!apt-get install axel
!axel -n 50 https://repository.library.northeastern.edu/downloads/neu:m044q523j?datastream_id=content #300mb data

In [None]:
#run this if you need to download the 28gb dataset
#!axel -n 100 https://repository.library.northeastern.edu/downloads/neu:m044q5210?datastream_id=content #28gb data

In [None]:
!mv neu_m044q523j.zip iq_data_300mb.zip
!unzip iq_data_300mb.zip
!mv KRI-16IQImbalances-DemodulatedData KRI_16IQIMb_Demod

In [None]:
#data preperation, run once to get rid of errors
files_data = glob.glob("./KRI_16IQIMb_Demod/*.sigmf-meta")
for f in files_data:
  file_handle = open(f,'r+')
  cur_json = json.load(file_handle)
  cur_json.update(cur_json['_metadata'])
  del cur_json['_metadata']
  file_handle.seek(0)
  iq_imbalance = re.search('%s(.*)%s' % ("_IQ#", "_run"), f).group(1)
  cur_json["annotations"][0]["genesys:transmitter"].update({"iq_balance":iq_imbalance})
  json.dump(cur_json,file_handle)
  file_handle.truncate()
  

In [None]:
#loading files
signal_handles = []
files_data = glob.glob("./KRI_16IQIMb_Demod/*.sigmf-meta")
for f in files_data:
  signal_handles.append(sigmffile.fromfile(f))
  

#run as a test to see wether all data were loaded
df = pd.DataFrame({'real':[],'img':[],'blnc':[]})
stdscaler = StandardScaler()

for signal in signal_handles:
  sample_rate = signal.get_global_field(SigMFFile.SAMPLE_RATE_KEY)
  sample_count = signal.sample_count
  signal_duration = sample_count / sample_rate
  signal_sample = signal.read_samples().view(np.complex128).flatten()
  signal_balance = int(signal.get_annotations()[0]["genesys:transmitter"]["iq_balance"])
  df_tim = pd.DataFrame({'real':signal_sample.real,'img':signal_sample.imag,'blnc':np.repeat(signal_balance,len(signal_sample.real))})
  df = df.append(df_tim)

df.blnc.astype(np.int8)
df.real.astype(np.float16)
df.real.astype(np.float16)

       real       img  blnc
0 -0.184742  0.925385   4.0
1  0.708407 -0.771680   4.0
2 -0.437391 -0.940851   4.0
3 -0.473556 -0.430780   4.0
4 -0.449694 -0.945056   4.0


In [None]:
#generate extra data for testing
extra_data = []
handle_index = 0 
for signal in signal_handles:
  signal_sample = signal.read_samples().view(np.complex128).flatten()
  signal_balance = int(signal.get_annotations()[0]["genesys:transmitter"]["iq_balance"])

  for b in range(0,3):

    mu, sigma = 0, b/10
    noise = np.random.normal(mu, sigma, signal_sample.shape) 
    noise2 = np.random.normal(mu, sigma, signal_sample.shape)

    i_emd = 0# wasserstein_distance(signal_sample.real,signal_sample.real + noise)
    q_emd = 0#wasserstein_distance(signal_sample.imag,signal_sample.imag + noise2)
    if (i_emd + q_emd < 0.1):
      print("found sample at:",b)
      
      noisy_data = np.array((signal_sample.real + noise) + 1j*  (signal_sample.imag + noise2))
      df = df.append(pd.DataFrame({'real':noisy_data.real,'img':noisy_data.imag,'blnc':np.repeat(signal_balance,len(noisy_data.real))}))

      extra_data.append({'data':noisy_data,'blnc':signal_balance,'sigma':sigma,'mu':mu})
    else:
      break

  

In [None]:
### THIS CELL IS NOT ACTUALLY NEEDED JUST FOR VISUALISATION OF DATA

#show iq spectrum
for signal in signal_handles:
   signal_sample = signal.read_samples().view(np.complex128).flatten()
   signal_balance = signal.get_annotations()[0]["genesys:transmitter"]["iq_balance"]
   # Plot the spectogram of this data
   plt.scatter(np.real(signal_sample[0:100000]), np.imag(signal_sample[0:100000]))
   plt.title("Constellation of "+ signal_balance ) 
   plt.show() 

# get the locations
X = df.iloc[:, :-1]
y = df.iloc[:, -1]

sm = SMOTE(random_state = 42)

X_oversampled, y_oversampled = sm.fit_resample(X, y)
df_smote = pd.DataFrame(X_oversampled, columns=df.columns)
df_smote.blnc = y_oversampled

#show how much data imbalance is there
equilibre = df.blnc.value_counts()
plt.figure(figsize=(20,10))
my_circle=plt.Circle( (0,0), 0.7, color='white')
plt.pie(equilibre,autopct='%1.1f%%')
p=plt.gcf()
p.gca().add_artist(my_circle)
plt.show()

In [None]:
#Load the data and split it
unique_labels = sorted(df.iloc[:, -1].unique().astype('int16'))
train_ratio = 0.8
validate_ratio = 0.1
test_ratio = 0.1

x_train = np.zeros((1,2,128))
y_train = np.array([],dtype='int16')

x_test = np.zeros((1,2,128))
y_test = np.array([],dtype='int16')

x_val = np.zeros((1,2,128))
y_val = np.array([],dtype='int16')

for blnc,lbl in enumerate(unique_labels):
  data_blnc = df.loc[df["blnc"]==lbl]
  size_blnc = len(data_blnc)
  rem = size_blnc%128
  all_x_blnc = data_blnc.iloc[:,:-1].values[:size_blnc-rem].reshape((size_blnc//128,2,128))

  train_size = int(all_x_blnc.shape[0] * train_ratio)
  x_train = np.concatenate([x_train,all_x_blnc[0:train_size]])
  y_train = np.concatenate([y_train,np.repeat(blnc, train_size)])

  test_size = int(all_x_blnc.shape[0] * test_ratio)
  x_test = np.concatenate([x_test,all_x_blnc[train_size:train_size+test_size]])
  y_test = np.concatenate([y_test,np.repeat(blnc, test_size)])

  valid_size = int(all_x_blnc.shape[0] * validate_ratio)
  x_val = np.concatenate([x_val,all_x_blnc[train_size+test_size:train_size+test_size+valid_size]])
  y_val = np.concatenate([y_val,np.repeat(blnc, valid_size)])


x_train = x_train[1:]
x_test = x_test[1:]
x_val = x_val[1:]

In [None]:
#defining model Architecture
model=Sequential()
#adding convolution layer
model.add(Conv2D(50,(1,7),activation='relu',input_shape=(2,128,1)))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2),strides=1,padding="same"))

model.add(Conv2D(50,(2,7),activation='relu'))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2),strides=1,padding="same"))

model.add(Flatten())

model.add(Dense(256,kernel_regularizer=regularizers.L2(l2=1e-4) ))
model.add(Dropout(0.5))
model.add(Dense(80,kernel_regularizer=regularizers.L2(l2=1e-4) ))
model.add(Dropout(0.5))

#adding output layer
model.add(Dense(16,activation='softmax' ))
#compiling the model
model.compile(loss='sparse_categorical_crossentropy',optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),metrics=['accuracy'])


In [None]:
#fitting the model
model.fit(x_train,y_train,epochs=5,validation_data=(x_test, y_test),)

In [None]:
model.save_weights(os.getcwd()+'/model2.hd5')

In [None]:
def convertiq_stream(iq_stream):
  data_blnc = pd.DataFrame({'real':iq_stream.real,'img':iq_stream.imag})
  size_blnc = len(data_blnc)
  rem = size_blnc%128
  return data_blnc.values[:size_blnc-rem].reshape((size_blnc//128,2,128))


In [None]:
#validating signal
signal = x_val
test_val = convertiq_stream(signal['data'][:1000]) #convertiq_stream(extra_data[0]['data'])
signal_balance = signal['blnc']
print('balance:',signal_balance)
y_pred = model.predict(test_val[:1000])

cm = confusion_matrix(y_val,y_pred)
accuracy = float(cm.diagonal().sum())/len(y_val)
print("\nAccuracy Of SVM For The Given Dataset : ", accuracy)

# SVM BASED IQ CLASSIFIER

In [5]:
#returns iq data for a real frequency signal
def generateTone(fs, toneFreq, numSamples, amplitude):
    #Generates a sinusoidal signal with the specified
    #frequency and amplitude
    
    step = (float(toneFreq) / float(fs)) * 2.0 * np.pi
    phaseArray = np.array(range(0,numSamples)) * step
    
    #Euler's Formular: e^(j*theta) = cos(theta) + j * sin(theta)
    #For a complex sinusoidal theta = 2*pi*f*t where each time step is 1/fs    
    wave = np.exp(1.0j * phaseArray) * amplitude
    return wave


In [None]:
no_sig = 10**6
base_band = generateTone(10**3,10**3,no_sig,1)
base_power = (base_band * np.conj(base_band)).real


finger_array = np.array([])
label_array = np.array([])
for i in range(0,20):
  #Generate data
  target_snr_db = 10
  # Calculate signal power and convert to dB 
  sig_avg_watts = np.mean(base_power)
  sig_avg_db = 10 * np.log10(sig_avg_watts)
  # Calculate noise according to [2] then convert to watts
  noise_avg_db = sig_avg_db - target_snr_db
  noise_avg_watts = 10 ** (noise_avg_db / 10)
  # Generate an sample of white noise
  mean_noise = 0
  noise_volts = np.random.normal(mean_noise, np.sqrt(noise_avg_watts), len(base_power)) + 1j * np.random.normal(mean_noise, np.sqrt(noise_avg_watts), len(base_power))


  var_theta = np.radians(3 + i)
  var_epsilon = 0.1 + i/100

  var_alpha = np.cos(var_theta) + 1j * var_epsilon * np.sin(var_theta)
  var_beta = var_epsilon * np.cos(var_theta) + 1j * np.sin(var_theta)
  r_sig = var_alpha * base_band + var_beta * np.conj(base_band) + noise_volts
  Y_mat = np.array([r_sig,np.conj(r_sig)])


  vctr1 = r_sig * r_sig
  vctr2  = r_sig * np.conj(r_sig)
  
  vctr_size = len(vctr1)
  batch_size = int(no_sig/100)
  leftover = vctr_size % batch_size
  for s_x in range(0,vctr_size-batch_size,batch_size):
    vctr3 = np.mean((vctr1[s_x:s_x+batch_size]))/np.mean((vctr2[s_x:s_x+batch_size]))
    finger_print = (1 + 1/10**( target_snr_db/10))*vctr3
    label_array = np.append(label_array,i)
    finger_array = np.append(finger_array,finger_print)
  
  plt.scatter(finger_array.imag, finger_array.real,label=i)

#You can see the clear seperation of RF data from the graph
plt.legend()
plt.show()

In [None]:
classifier = SVC(kernel='rbf', random_state = 1)
data_class = np.hstack([np.array([finger_array.imag]).reshape(-1,1),np.array([finger_array.real]).reshape(-1,1),np.array([label_array]).reshape(-1,1)])

training_set, test_set = train_test_split(data_class, test_size = 0.2, random_state = 1)
X_train = training_set[:,0:2]
Y_train = training_set[:,2]
X_test = test_set[:,0:2]
Y_test = test_set[:,2]

classifier.fit(X_train,Y_train)
Y_pred = classifier.predict(X_test)

cm = confusion_matrix(Y_test,Y_pred)
accuracy = float(cm.diagonal().sum())/len(Y_test)
print("\nAccuracy Of SVM For The Given Dataset : ", accuracy)

# CNN BASED MODULATION CLASSIFIER

In [None]:
!wget http://opendata.deepsig.io/datasets/2016.10/RML2016.10b.tar.bz2 #you have to actually signup to get the link to download the dataset
!tar jxf RML2016.10b.tar.bz2

In [None]:
data_set = open("RML2016.10b.dat",'rb')
Xd = pickle.load(data_set, encoding = 'bytes')

snrs, mods = map(lambda j: sorted(list(set(map(lambda x: x[j], Xd.keys())))), [1,0])
X = [] 
lbl = []
for mod in mods:
    for snr in snrs:
        X.append(Xd[(mod,snr)])
        for i in range(Xd[(mod,snr)].shape[0]):  lbl.append((mod,snr))
    
X = np.vstack(X)
data_set.close()

In [None]:
X_data = X[:,0], X[:,1]
labels = np.array(lbl)
in_shape = X_data[0].shape
out_shape = tuple([1]) + in_shape

np.random.seed(80)

n_examples = labels.shape[0]
r = np.random.choice(range(n_examples), n_examples, replace = False)

train_examples = r[:n_examples//2]
test_examples =  r[n_examples//2:]
X_train = X_data[train_examples]
X_test = X_data[test_examples]

y_train = LB().fit_transform(labels[train_examples][:,0])
y_test = LB().fit_transform(labels[test_examples][:,0])

snr_train = labels[train_examples][:,1].astype(int)
snr_test = labels[test_examples][:,1].astype(int)

In [None]:
dropout = 0.5
model = Sequential()
model.add(Reshape(out_shape, input_shape = in_shape))
model.add(ZeroPadding2D((0, 2), data_format = 'channels_first'))
model.add(Conv2D(256, (1, 3), padding = 'valid', activation = "relu", name="conv1", kernel_initializer='glorot_uniform', data_format="channels_first"))
model.add(Dropout(dropout))
model.add(ZeroPadding2D((0,2), data_format = 'channels_first'))
model.add(Conv2D(80, (2, 3), activation="relu", name="conv3", padding="valid", kernel_initializer="glorot_uniform", data_format="channels_first"))
model.add(Dropout(dropout))
model.add(Flatten())   
model.add(Dense(256, activation="relu", name="dense1", kernel_initializer="he_normal"))
model.add(Dropout(dropout))
model.add(Dense(10, name="dense3", kernel_initializer="he_normal", activation = 'softmax'))
model.add(Reshape([len(mods)]))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics = ['accuracy'])

In [None]:
model.fit(X_train, y_train, epochs = 80, batch_size = 1024, validation_split = 0.05, callbacks=[EarlyStopping(patience = 15, restore_best_weights = True)]

In [None]:
y_pred = model.predict(x_test)
cm = confusion_matrix(y_test,y_pred)
accuracy = float(cm.diagonal().sum())/len(y_test)
print("\nAccuracy Of SVM For The Given Dataset : ", accuracy)