In [1]:
 %tensorflow_version 2.x
from __future__ import absolute_import, division, print_function, unicode_literals
import pandas as pd
import tensorflow as tf
from tensorflow.keras import Model
import numpy as np
from google.colab import drive
from datetime import datetime
import ast
import time

# Rebind `tf` to the TF2 compatibility module when it can be imported.
# Any failure is swallowed on purpose, leaving the plain
# `import tensorflow as tf` binding from the import block in place.
try:
  import tensorflow.compat.v2 as tf
except Exception:
  pass

# Divisor that turns a sample index into the 'time' column value
# (presumably samples per second -- confirm against the capture setup).
sample_rate = 100000
# Number of recordings/classes stacked into the dataset arrays (noise and FM).
signals_count = 2
# Number of consecutive rows in one capture window.
capture_size = 2048
# Number of windows taken per recording for training.
train_size = 800
# Number of windows taken per recording for testing.
test_size = 200

# Path *prefix* (not a directory): file names such as "nothing.iq" are
# concatenated directly after the trailing "test-".
dir_base_model = "/content/gdrive/My Drive/DeepLearning/sigid/test-"
# NOTE(review): not referenced anywhere in the visible part of the notebook.
model_name = "best-noise-vs-fm-info-best"

# Saved model passed to tf.keras.models.load_model by the load_last_model* helpers.
model_file = "/content/gdrive/My Drive/DeepLearning/sigid/test-best-noise-vs-fm-test-checkpoints2-090.model"
# Weights file passed to model.load_weights in load_last_model1.
model_weights = "/content/gdrive/My Drive/DeepLearning/sigid/test-best-noise-vs-fm-test-checkpoints2-090.cpk"

# Show which TensorFlow version the runtime actually provides.
print(tf.__version__)

2.2.0-rc2


In [2]:
# Mount Google Drive so the recordings under dir_base_model can be read
drive.mount('/content/gdrive')

# Record layout of the .iq files: two float32 fields named 'i' and 'q'
dt = np.dtype([('i', 'f4'), ('q', 'f4')])

# Noise recording: every row is labelled "BLANK" and gets a time value of
# (row index / sample_rate)
data = np.fromfile(dir_base_model + "nothing.iq", dtype=dt)
npArray = np.array(range(len(data)), dtype='f4')
dfa = pd.DataFrame(data).assign(label="BLANK", time=npArray / sample_rate)

# FM recording: same treatment, labelled "FM"
data = np.fromfile(dir_base_model + "999.iq", dtype=dt)
npArray = np.array(range(len(data)), dtype='f4')
dfb = pd.DataFrame(data).assign(label="FM", time=npArray / sample_rate)

# Both recordings stacked one after the other (original row indices are kept)
frames = [dfa, dfb]
result = pd.concat(frames)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
train = []
test = []

# Build datasets for train: train_size consecutive, non-overlapping windows of
# capture_size rows from each recording (noise first, then FM).
# range() is required here -- iterating the tuple (0, train_size) would visit
# only the two values 0 and train_size, yielding two windows per recording.
for i in range(0, train_size):
  ii = i * capture_size
  train.append(dfa[ii:ii + capture_size].values)
for i in range(0, train_size):
  ii = i * capture_size
  train.append(dfb[ii:ii + capture_size].values)

# Build datasets for test: test_size windows per recording, same layout.
# NOTE(review): these windows also start at row 0, so they are the same rows
# as the first test_size train windows -- confirm whether the test windows
# should instead start after the train windows.
for i in range(0, test_size):
  ii = i * capture_size
  test.append(dfa[ii:ii + capture_size].values)
for i in range(0, test_size):
  ii = i * capture_size
  test.append(dfb[ii:ii + capture_size].values)

In [4]:
import plotly.express as px
import plotly.graph_objects as go
# One figure per recording, showing the 'i' and 'q' columns over 'time'
for plot_title, recording in (("NOISE", dfa), ("FM", dfb)):
  # Rows 1 .. capture_size-1 of the recording (row 0 is not plotted)
  window = recording[1:capture_size]
  fig = go.Figure()
  for channel in ('i', 'q'):
    fig.add_trace(go.Scatter(x=window['time'], y=window[channel], mode='lines', name=channel))
  fig.update_layout(
      title=plot_title,
      xaxis_title="time",
      yaxis_title="value",
      font=dict(family="Courier New, monospace", size=18, color="#7f7f7f"),
  )
  fig.show()

In [0]:
def get_signal_dataframes(df):
  """Cut one labelled recording into overlapping train and test windows.

  The 'i'/'q' columns are multiplied by 10000.0; the 'label' column is used
  unchanged. Window k spans rows [2*k, 2*k + capture_size). The train set
  holds train_size such windows starting at row 0; the test set holds
  test_size windows starting at row train_size*2 + capture_size, which lies
  past the end of the last train window.

  Args:
    df: DataFrame with 'i', 'q' and 'label' columns.

  Returns:
    Tuple (train_x, train_y, test_x, test_y), each built by handing the list
    of window slices to pd.DataFrame.
    NOTE(review): what pd.DataFrame does with a list of DataFrames depends
    on the installed pandas/numpy versions -- confirm the resulting layout
    matches what recreate_np_array_x / recreate_np_array_y index into.
  """
  stride = 2
  samples = df[['i', 'q']] * 10000.0
  labels = df[['label']]

  def cut(frame, first_row, count):
    # `count` windows of capture_size rows, each shifted by `stride` rows.
    return [
        frame[stride * k + first_row:stride * k + capture_size + first_row]
        for k in range(count)
    ]

  # First row of the test region: just beyond everything the train windows cover.
  test_start = train_size * stride + capture_size

  train_windows_x = cut(samples, 0, train_size)
  train_windows_y = cut(labels, 0, train_size)
  test_windows_x = cut(samples, test_start, test_size)
  test_windows_y = cut(labels, test_start, test_size)

  # Convert lists to dataframes
  return (
      pd.DataFrame(train_windows_x),
      pd.DataFrame(train_windows_y),
      pd.DataFrame(test_windows_x),
      pd.DataFrame(test_windows_y),
  )

def add_dataframe_for_signal(df, train_x, train_y, test_x, test_y):
  """Window one recording and append each resulting piece to its accumulator.

  Args:
    df: recording passed straight to get_signal_dataframes.
    train_x, train_y, test_x, test_y: containers (lists at the call sites)
      that receive, via .append, the corresponding element of the 4-tuple
      returned by get_signal_dataframes. They are modified in place.
  """
  pieces = get_signal_dataframes(df)
  accumulators = (train_x, train_y, test_x, test_y)
  for accumulator, piece in zip(accumulators, pieces):
    accumulator.append(piece)

# Per-split accumulators; each recording appends one entry to every list
train_x, train_y, test_x, test_y = [], [], [], []
for signal_df in (dfa, dfb):
  add_dataframe_for_signal(signal_df, train_x, train_y, test_x, test_y)

# Build dataframes
train_x, train_y = pd.concat(train_x), pd.concat(train_y)
test_x, test_y = pd.concat(test_x), pd.concat(test_y)

# Shuffle data: one permutation per split, applied to x and y alike so each
# sample stays paired with its label. The RNG is not seeded here, so the
# order differs between runs.
permutations = np.random.permutation(len(train_y))
train_x, train_y = train_x.iloc[permutations], train_y.iloc[permutations]
permutations = np.random.permutation(len(test_x))
test_x, test_y = test_x.iloc[permutations], test_y.iloc[permutations]

In [0]:
def recreate_np_array_x(d, size):
  """Unpack the per-window frames stored in `d` into one dense float array.

  Args:
    d: DataFrame whose cell d.values[i][0] exposes `.values` as a 2-D array
      with the 'i' samples in column 0 and the 'q' samples in column 1.
      NOTE(review): this is the layout get_signal_dataframes is expected to
      produce -- confirm on the installed pandas version.
    size: number of windows per recording (train_size or test_size).

  Returns:
    Array of shape (signals_count * size, 2, capture_size), dtype float;
    axis 1 is (i, q). Rows that `d` does not provide stay zero.
  """
  # np.zeros instead of the raw np.ndarray constructor: anything the loop
  # does not write is then zero rather than uninitialised memory.
  new = np.zeros(shape=(signals_count * size, 2, capture_size), dtype=float)
  # Materialise the object array once instead of on every element access.
  rows = d.values
  # train or test size i
  for i in range(len(rows)):
    window = np.asarray(rows[i][0].values)
    # Number of samples copied, as before: the length of row i of d.values.
    count = len(rows[i])
    new[i, 0, :count] = window[:count, 0]
    new[i, 1, :count] = window[:count, 1]
  return new

def recreate_np_array_y(d, size):
  """Turn the per-window label frames stored in `d` into integer class ids.

  The label of a window is read from the first row of the frame held in
  d.values[i][0]; 'FM' maps to 1 and every other value maps to 0.

  Args:
    d: DataFrame whose cell d.values[i][0] exposes `.values` as a 2-D array
      of labels. NOTE(review): layout as expected from
      get_signal_dataframes -- confirm on the installed pandas version.
    size: number of windows per recording (train_size or test_size).

  Returns:
    1-D int16 array of length signals_count * size. Entries that `d` does
    not provide stay 0.
  """
  # np.zeros instead of the raw np.ndarray constructor so unfilled entries
  # are 0 rather than uninitialised memory.
  new = np.zeros(shape=(signals_count * size), dtype=np.int16)
  # Materialise the object array once instead of once per window.
  rows = d.values
  # train or test size i
  for i in range(len(rows)):
    label = rows[i][0].values[0][0]
    new[i] = 1 if label == 'FM' else 0
  return new

# Replace each split's DataFrame with its dense NumPy form (train first, then
# test). The names are rebound, so this cell cannot be run twice in a row.
train_x, train_y = (
    recreate_np_array_x(train_x, train_size),
    recreate_np_array_y(train_y, train_size),
)
test_x, test_y = (
    recreate_np_array_x(test_x, test_size),
    recreate_np_array_y(test_y, test_size),
)

In [0]:
# Prepare a few consecutive test samples to predict
element = 0          # index of the first test sample to use
samples_to_show = 5  # number of consecutive samples taken from there

# A bare slice would silently return fewer rows when the test set is too
# short, so keep the failure explicit.
if element + samples_to_show > len(test_x):
  raise IndexError("test_x holds fewer than element + samples_to_show samples")

# Slice copies replace the element-by-element fill of an uninitialised buffer.
# td: (samples_to_show, 2, capture_size) inputs; ld: their integer labels.
td = np.array(test_x[element:element + samples_to_show, :2, :capture_size], dtype=float)
ld = np.array(test_y[element:element + samples_to_show], dtype=int)

def label_from_column(max_column):
  """Translate a predicted class index into its display name.

  Args:
    max_column: class index; compared with == against 1 and then 0.

  Returns:
    "FM" for 1, "Noise" for 0 and "Error" for anything else.
  """
  # Checked in this order: 1 first, then 0.
  for column, name in ((1, "FM"), (0, "Noise")):
    if max_column == column:
      return name
  return "Error"

In [0]:
def load_last_model1():
  """Load the saved Keras model, then load weights from `model_weights` into it.

  Returns:
    The model obtained from tf.keras.models.load_model(model_file) after
    load_weights(model_weights) has been called on it.
  """
  restored = tf.keras.models.load_model(model_file)
  restored.load_weights(model_weights)
  return restored

In [0]:
def load_last_model2():
  """Load the saved Keras model and restore a tf.train.Checkpoint onto it.

  The checkpoint tracks the model together with its optimizer and is restored
  from a fixed path on the mounted drive.

  Returns:
    The model obtained from tf.keras.models.load_model(model_file).
  """
  checkpoint_path = '/content/gdrive/My Drive/DeepLearning/sigid/test-best-noise-vs-fm-test-checkpoints2alternative-090-1'
  restored = tf.keras.models.load_model(model_file)
  checkpoint = tf.train.Checkpoint(optimizer=restored.optimizer, model=restored)
  # The status object returned by restore() is not inspected.
  checkpoint.restore(checkpoint_path)
  return restored

In [0]:
def load_last_model3():
  """Rebuild the model from its JSON description and load HDF5 weights into it.

  Returns:
    The model created by tf.keras.models.model_from_json with the weights
    from model-90-weights.h5 loaded.
  """
  config_path = '/content/gdrive/My Drive/DeepLearning/sigid/model-90.json'
  weights_path = '/content/gdrive/My Drive/DeepLearning/sigid/model-90-weights.h5'

  # Read the whole JSON description; the file is closed before the model is built.
  with open(config_path) as json_file:
    architecture = json_file.read()

  rebuilt = tf.keras.models.model_from_json(architecture)
  rebuilt.load_weights(weights_path)
  return rebuilt

In [0]:
def load_last_model4():
  """Load the saved Keras model and restore a checkpoint from an explicit path.

  A CheckpointManager is constructed on the same path but the restore itself
  goes through Checkpoint.restore with that path directly.

  Returns:
    The model obtained from tf.keras.models.load_model(model_file).
  """
  checkpoint_path = '/content/gdrive/My Drive/DeepLearning/sigid/test-best-noise-vs-fm-test-checkpoints2alternative-090-1'
  restored = tf.keras.models.load_model(model_file)
  checkpoint = tf.train.Checkpoint(optimizer=restored.optimizer, model=restored)
  # NOTE(review): the manager is never used afterwards; kept because its
  # construction is part of the original behaviour.
  manager = tf.train.CheckpointManager(checkpoint, checkpoint_path, max_to_keep=3)
  # The status object returned by restore() is not inspected.
  checkpoint.restore(checkpoint_path)
  return restored

In [0]:
def load_last_model5():
  """Load the saved Keras model and restore the manager's latest checkpoint.

  A tf.train.Checkpoint tracking the model and its optimizer is restored from
  whatever tf.train.CheckpointManager reports as the latest checkpoint. When
  the manager finds none, nothing is restored and the model keeps the state
  it got from load_model; a notice is printed so this is not silent.

  Returns:
    The model obtained from tf.keras.models.load_model(model_file).
  """
  # NOTE(review): CheckpointManager expects a directory here, but this path
  # looks like a checkpoint prefix -- confirm it is what was intended.
  checkpoint_dir = '/content/gdrive/My Drive/DeepLearning/sigid/test-best-noise-vs-fm-test-checkpoints2alternative-090-1'
  model = tf.keras.models.load_model(model_file)
  ckpt = tf.train.Checkpoint(optimizer=model.optimizer, model=model)
  manager = tf.train.CheckpointManager(ckpt, checkpoint_dir, max_to_keep=3)
  latest = manager.latest_checkpoint
  if latest is None:
    # restore(None) restores no values; report it instead of failing silently.
    print("No checkpoint found in " + checkpoint_dir + "; keeping the state loaded from " + model_file)
  # The status object returned by restore() is not inspected.
  ckpt.restore(latest)
  return model

In [17]:
# Load the model and score the prepared samples
model = load_last_model5()
predictions = model.predict(td)

# Print predicted values
print(predictions)

# Build predicted values: the index of the largest score of each prediction,
# translated into its label name
listx = []
for i, scores in enumerate(predictions):
  result = np.argmax(scores, axis=None)
  print("predicted " + str(result))
  listx.append(label_from_column(result))
print(listx)

# Print original values
print(ld)

[[[8.7446052e-01 1.2553950e-01]]

 [[4.9059317e-03 9.9509406e-01]]

 [[8.7560666e-01 1.2439337e-01]]

 [[8.7199992e-01 1.2800002e-01]]

 [[1.0810088e-04 9.9989188e-01]]]
predicted 0
predicted 1
predicted 0
predicted 0
predicted 1
['Noise', 'FM', 'Noise', 'Noise', 'FM']
[0 1 0 0 1]


In [14]:
# Display the model's weight variables as the cell output.
# NOTE(review): this prints every tensor; consider showing only names/shapes.
model.weights

[<tf.Variable 'conv1d/kernel:0' shape=(16, 2048, 10) dtype=float32, numpy=
 array([[[-7.71527551e-03,  3.98713723e-03, -4.78489231e-03, ...,
          -1.28010102e-03, -1.18473899e-02,  1.31051615e-02],
         [-9.80687048e-03,  1.28255617e-02,  1.03215389e-02, ...,
           7.34695233e-03,  1.29141659e-03, -5.68995625e-03],
         [-1.01522766e-02, -4.94765025e-03,  2.69517116e-03, ...,
          -1.16679706e-02, -8.88768118e-03,  5.28007932e-03],
         ...,
         [-4.33260202e-03,  1.00655910e-02,  7.78834522e-03, ...,
           1.34273935e-02, -1.17983138e-02,  6.38412125e-03],
         [-8.68437439e-03, -9.74799413e-03,  6.03585131e-03, ...,
          -1.19347293e-02, -8.16855021e-03, -3.83586902e-03],
         [-3.67488340e-03, -1.14295324e-02,  6.79171272e-03, ...,
           9.11996886e-03,  3.55853327e-03,  1.14742164e-02]],
 
        [[ 4.94667888e-03,  8.25469010e-03, -5.70236612e-03, ...,
           1.19261686e-02, -8.21413565e-03,  1.24612730e-02],
         [-1

In [15]:
def test_function_printed(df, label):
  """Plot one two-channel sample over the time axis of the FM recording.

  The x values are always the 'time' column of the first capture_size rows
  of the global `dfb`, whatever sample is passed in.

  Args:
    df: indexable sample; df[0] is drawn as trace 'i' and df[1] as trace 'q'.
    label: value inserted into the figure title through str().
  """
  time_axis = dfb[0:capture_size]['time']
  fig = go.Figure()
  for row, channel in enumerate(('i', 'q')):
    fig.add_trace(go.Scatter(x=time_axis, y=df[row], mode='lines', name=channel))
  fig.update_layout(
      title="This should be " + str(label),
      xaxis_title="time",
      yaxis_title="value",
      height=400,
      width=600,
      font=dict(family="Courier New, monospace", size=18, color="#7f7f7f"),
  )
  fig.show()

# Plot each prepared sample in td, titled with its expected label from ld.
for i in range(samples_to_show):
  test_function_printed(td[i], ld[i])

In [0]:
# Display the 'time' column of the first capture_size FM rows, scaled by 10000.0.
# NOTE(review): looks like a leftover inspection cell -- nothing visible uses the result.
dfb[0:capture_size]['time']*10000.0