In [1]:
import torch
import numpy as np
from sklearn.metrics import accuracy_score
import os
import argparse
import numpy as np
import scipy.io as scio
from sklearn.metrics import accuracy_score
import os
import numpy as np
import pandas as pd
!pip install ptflops
# Run on the first CUDA GPU when PyTorch reports one as available; otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

Collecting ptflops
  Downloading ptflops-0.7.4-py3-none-any.whl.metadata (9.4 kB)
Downloading ptflops-0.7.4-py3-none-any.whl (19 kB)
Installing collected packages: ptflops
Successfully installed ptflops-0.7.4


In [2]:

# Experiment configuration read by the data-loading and label-encoding helpers below.
preset = {
    ## define model
    ## one of "ST-RF", "MLP", "LSTM", "CNN-1D", "CNN-2D", "CLSTM", "ABLSTM", "THAT"
    "model": "LSTM",
    ## define task
    ## one of "identity", "activity", "location"
    "task": "location",
    ## number of repeated experiments
    "repeat": 3,
    ## path of data
    "path": {
        "data_x": "/kaggle/input/wimans/wifi_csi/amp",      # directory of CSI amplitude files
        "data_y": "/kaggle/input/wimans/annotation.csv",    # path of annotation file
        "save": "result_-.json",                            # path to save results (plain string: no placeholders to format)
    },
    ## data selection for experiments
    "data": {
        "num_users": ["0", "1", "2", "3", "4", "5"],        # select number(s) of users, (e.g., ["0", "1"], ["2", "3", "4", "5"])
        "wifi_band": ["2.4"],                               # select WiFi band(s) (e.g., ["2.4"], ["5"], ["2.4", "5"])
        "environment": ["classroom"],                       # select environment(s) (e.g., ["classroom"], ["meeting_room"], ["empty_room"])
        "length": 3000,                                     # default length of CSI
    },
    ## hyperparameters of models
    "nn": {
        "lr": 1e-3,                                         # learning rate
        "epoch": 300,                                       # number of epochs
        "batch_size": 128,                                  # batch size
        "threshold": 0.5,                                   # threshold to binarize sigmoid outputs
    },
    ## encoding of activities and locations
    ## "nan" (no label for that user slot) maps to the all-zero vector
    "encoding": {
        "activity": {                                       # encoding of different activities
            "nan":      [0, 0, 0, 0, 0, 0, 0, 0, 0],
            "nothing":  [1, 0, 0, 0, 0, 0, 0, 0, 0],
            "walk":     [0, 1, 0, 0, 0, 0, 0, 0, 0],
            "rotation": [0, 0, 1, 0, 0, 0, 0, 0, 0],
            "jump":     [0, 0, 0, 1, 0, 0, 0, 0, 0],
            "wave":     [0, 0, 0, 0, 1, 0, 0, 0, 0],
            "lie_down": [0, 0, 0, 0, 0, 1, 0, 0, 0],
            "pick_up":  [0, 0, 0, 0, 0, 0, 1, 0, 0],
            "sit_down": [0, 0, 0, 0, 0, 0, 0, 1, 0],
            "stand_up": [0, 0, 0, 0, 0, 0, 0, 0, 1],
        },
        "location": {                                       # encoding of different locations
            "nan":  [0, 0, 0, 0, 0],
            "a":    [1, 0, 0, 0, 0],
            "b":    [0, 1, 0, 0, 0],
            "c":    [0, 0, 1, 0, 0],
            "d":    [0, 0, 0, 1, 0],
            "e":    [0, 0, 0, 0, 1],
        },
    },
}

def mat_to_amp(data_mat):
    """
    [description]
    : turn the raw CSI records of one *.mat file into an amplitude array
    [parameter]
    : data_mat: dict, loaded *.mat content; data_mat["trace"] holds one record per index along its first axis
    [return]
    : data_csi_amp: numpy array (float32), absolute value of the CSI entry of every record, stacked in record order
    """
    var_trace = data_mat["trace"]
    var_amp_list = []
    #
    for var_index in range(var_trace.shape[0]):
        ## the CSI values are the last field of the innermost record structure
        var_csi = var_trace[var_index][0][0][0][-1]
        var_amp_list.append(abs(var_csi))
    #
    return np.array(var_amp_list, dtype = np.float32)

#
##
def extract_csi_amp(var_dir_mat, 
                    var_dir_amp):
    """
    [description]
    : read raw WiFi CSI files (*.mat), calculate CSI amplitude, and save amplitude (*.npy)
    : entries of var_dir_mat whose extension is not ".mat" are skipped
    : var_dir_amp is created when it does not exist yet
    [parameter]
    : var_dir_mat: string, directory to read raw WiFi CSI files (*.mat)
    : var_dir_amp: string, directory to save WiFi CSI amplitude (*.npy)
    """
    #
    ## make sure the destination exists before the first file is written
    os.makedirs(var_dir_amp, exist_ok = True)
    #
    ## sorted() gives a deterministic processing order across platforms
    for var_name in sorted(os.listdir(var_dir_mat)):
        #
        var_stem, var_extension = os.path.splitext(var_name)
        #
        ## ignore anything that is not a raw CSI file (e.g. notes, hidden files)
        if var_extension != ".mat":
            continue
        #
        data_mat = scio.loadmat(os.path.join(var_dir_mat, var_name))
        #
        data_csi_amp = mat_to_amp(data_mat)
        #
        ## only the extension is swapped; a ".mat" inside the stem is left untouched
        var_path_save = os.path.join(var_dir_amp, var_stem + ".npy")
        #
        with open(var_path_save, "wb") as var_file:
            np.save(var_file, data_csi_amp)

#
##
def parse_args(var_argv = None):
    """
    [description]
    : parse arguments from input
    : unknown arguments are ignored instead of aborting, so the function also works
    : when the process was started with extra launcher arguments (e.g. a notebook kernel)
    [parameter]
    : var_argv: list of strings or None, arguments to parse; None means sys.argv[1:]
    [return]
    : argparse.Namespace with the attributes dir_mat and dir_amp
    """
    #
    ##
    var_parser = argparse.ArgumentParser()
    #
    var_parser.add_argument("--dir_mat", default = "/kaggle/input/wimans/wifi_csi/mat", type = str)
    var_parser.add_argument("--dir_amp", default = "/kaggle/input/wimans/wifi_csi/amp", type = str)
    #
    ## parse_known_args returns (namespace, leftover arguments); the leftovers are dropped
    var_args, _ = var_parser.parse_known_args(var_argv)
    #
    return var_args



In [3]:

def load_data_y(var_path_data_y,
                var_environment = None, 
                var_wifi_band = None, 
                var_num_users = None):
    """
    [description]
    : read the annotation file (*.csv) into a pandas dataframe with every column as string
    : and keep only the rows matching the requested environment(s), WiFi band(s), and number(s) of users
    : a selection left as None applies no filter on that column
    [parameter]
    : var_path_data_y: string, path of annotation file
    : var_environment: list, selected environment(s), e.g., ["classroom"]
    : var_wifi_band: list, selected WiFi band(s), e.g., ["2.4"]
    : var_num_users: list, selected number(s) of users, e.g., ["0", "1", "2"]
    [return]
    : data_pd_y: pandas dataframe, labels of selected data
    """
    data_pd_y = pd.read_csv(var_path_data_y, dtype = str)
    #
    ## (column name, allowed values) pairs, applied one after another
    var_filters = (
        ("environment", var_environment),
        ("wifi_band", var_wifi_band),
        ("number_of_users", var_num_users),
    )
    #
    for var_column, var_allowed in var_filters:
        if var_allowed is None:
            continue
        var_mask = data_pd_y[var_column].isin(var_allowed)
        data_pd_y = data_pd_y[var_mask]
    #
    return data_pd_y

#
##
def load_data_x(var_path_data_x, 
                var_label_list,
                var_length = None):
    """
    [description]
    : load CSI amplitude (*.npy)
    : according to a label list of selected data
    : every sample is brought to the same length along its first axis:
    : shorter samples are zero-padded at the front, longer samples keep only their last var_length rows
    [parameter]
    : var_path_data_x: string, directory of CSI amplitude files
    : var_label_list: list, selected labels (file names without the ".npy" extension)
    : var_length: int or None, target length of the first axis; None uses preset["data"]["length"]
    [return]
    : data_x: numpy array, CSI amplitude, one entry per label in the given order
    """
    #
    ## fall back to the notebook-wide configuration when no length is given
    if var_length is None:
        var_length = preset["data"]["length"]
    #
    data_x = []
    #
    for var_label in var_label_list:
        #
        data_csi = np.load(os.path.join(var_path_data_x, var_label + ".npy"))
        #
        ## a negative pad width would make np.pad raise, so over-long samples are cut first;
        ## the tail is kept because padding also aligns samples at their end
        var_excess = data_csi.shape[0] - var_length
        if var_excess > 0:
            data_csi = data_csi[var_excess:]
        #
        var_pad_length = var_length - data_csi.shape[0]
        #
        ## pad only the first axis, whatever the number of trailing axes is
        var_pad_width = [(var_pad_length, 0)] + [(0, 0)] * (data_csi.ndim - 1)
        #
        data_x.append(np.pad(data_csi, var_pad_width))
    #
    data_x = np.array(data_x)
    #
    return data_x

#
##
def encode_data_y(data_pd_y, 
                  var_task):
    """
    [description]
    : encode labels according to specific task
    [parameter]
    : data_pd_y: pandas dataframe, labels of different tasks
    : var_task: string, indicate task: "identity", "activity", or "location"
    [return]
    : data_y: numpy array, label encoding of task
    [raise]
    : ValueError: var_task is not one of the supported tasks
    """
    #
    ##
    if var_task == "identity":
        #
        data_y = encode_identity(data_pd_y)
    #
    elif var_task == "activity":
        #
        data_y = encode_activity(data_pd_y, preset["encoding"]["activity"])
    #
    elif var_task == "location":
        #
        data_y = encode_location(data_pd_y, preset["encoding"]["location"])
    #
    else:
        ## fail with a clear message instead of an UnboundLocalError at the return below
        raise ValueError(f'unknown task "{var_task}"; expected "identity", "activity", or "location"')
    #
    return data_y

#
##
def encode_identity(data_pd_y):
    """
    [description]
    : derive identity labels from the six per-user location columns of the annotation dataframe
    : a user slot is marked 1 when its location, converted to string, is anything but "nan", else 0
    [parameter]
    : data_pd_y: pandas dataframe, labels of different tasks
    [return]
    : data_identity_onehot_y: numpy array (int8), one 0/1 flag per user column for every row
    """
    var_columns = ["user_1_location", "user_2_location", 
                   "user_3_location", "user_4_location", 
                   "user_5_location", "user_6_location"]
    #
    var_location_text = data_pd_y[var_columns].to_numpy(copy = True).astype(str)
    #
    ## boolean presence mask -> 0/1 flags
    var_is_present = var_location_text != "nan"
    #
    return var_is_present.astype("int8")

#
##
def encode_activity(data_pd_y, 
                    var_encoding):
    """
    [description]
    : map the six per-user activity columns of the annotation dataframe to their encoding vectors
    : labels are converted to strings first, so a missing value is looked up as "nan"
    [parameter]
    : data_pd_y: pandas dataframe, labels of different tasks
    : var_encoding: dict, encoding of different activities (label string -> vector)
    [return]
    : data_activity_onehot_y: numpy array, one encoding vector per user column for every row
    """
    var_columns = ["user_{}_activity".format(var_user) for var_user in range(1, 7)]
    #
    var_activity_text = data_pd_y[var_columns].to_numpy(copy = True).astype(str)
    #
    var_encoded_rows = []
    #
    for var_sample in var_activity_text:
        ## a label missing from var_encoding raises KeyError
        var_encoded_rows.append([var_encoding[var_label] for var_label in var_sample])
    #
    return np.array(var_encoded_rows)

#
##
def encode_location(data_pd_y, 
                    var_encoding):
    """
    [description]
    : map the six per-user location columns of the annotation dataframe to their encoding vectors
    : labels are converted to strings first, so a missing value is looked up as "nan"
    [parameter]
    : data_pd_y: pandas dataframe, labels of different tasks
    : var_encoding: dict, encoding of different locations (label string -> vector)
    [return]
    : data_location_onehot_y: numpy array, one encoding vector per user column for every row
    """
    var_columns = ["user_1_location", "user_2_location", 
                   "user_3_location", "user_4_location", 
                   "user_5_location", "user_6_location"]
    #
    var_location_text = data_pd_y[var_columns].to_numpy(copy = True).astype(str)
    #
    ## look every label up in the encoding table, row by row (unknown labels raise KeyError)
    var_encoded_rows = [list(map(var_encoding.__getitem__, var_sample)) 
                        for var_sample in var_location_text]
    #
    return np.array(var_encoded_rows)

#
##
def test_load_data_y():
    """
    [description]
    : test load_data_y() function
    : NOTE: currently an empty placeholder; the body performs no calls and no checks
    """
    #
    ##


#
##
def test_load_data_x():
    """
    [description]
    : smoke test for load_data_x(): select the 2.4 GHz meeting-room annotations
    : and load the CSI amplitude files of their labels; the result is not inspected
    """
    var_annotation = load_data_y(preset["path"]["data_y"],
                                 var_environment = ["meeting_room"], 
                                 var_wifi_band = ["2.4"], 
                                 var_num_users = None)
    #
    load_data_x(preset["path"]["data_x"], var_annotation["label"].to_list())
  

#
##
def test_encode_identity():
    """
    [description]
    : smoke test for encode_identity(): encode the complete annotation file
    : the result is not inspected
    """
    var_annotation = pd.read_csv(preset["path"]["data_y"], dtype = str)
    #
    encode_identity(var_annotation)


#
##
def test_encode_activity():
    """
    [description]
    : smoke test for encode_activity(): encode the complete annotation file
    : with the activity encoding from preset; the result is not inspected
    """
    var_annotation = pd.read_csv(preset["path"]["data_y"], dtype = str)
    var_encoding = preset["encoding"]["activity"]
    #
    encode_activity(var_annotation, var_encoding)

#
##
def test_encode_location():
    """
    [description]
    : smoke test for encode_location(): encode the complete annotation file
    : with the location encoding from preset; the result is not inspected
    """
    var_annotation = pd.read_csv(preset["path"]["data_y"], dtype = str)
    var_encoding = preset["encoding"]["location"]
    #
    encode_location(var_annotation, var_encoding)
   
"""
[file]          run.py
[description]   Run WiFi-based models
"""

import json
import argparse
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch._dynamo
torch._dynamo.config.suppress_errors = True

from torchvision import models

# Settings for this run.
# NOTE(review): these repeat entries that also exist in `preset` ("task", "model", "repeat");
# var_repeat (2) differs from preset["repeat"] (3) — confirm which value is intended.
var_task = "location"
var_model = "LSTM"
var_repeat = 2

# Load annotation file as labels, restricted to the environment / band / user counts in preset
data_pd_y = load_data_y(
    preset["path"]["data_y"],
    var_environment=preset["data"]["environment"],
    var_wifi_band=preset["data"]["wifi_band"],
    var_num_users=preset["data"]["num_users"]
)
# The "label" column provides the file stems of the CSI amplitude files to load
var_label_list = data_pd_y["label"].to_list()
data_x = load_data_x(preset["path"]["data_x"], var_label_list)
# Encode the labels for the task selected above
data_y = encode_data_y(data_pd_y, var_task)

In [4]:
import numpy as np
shapee = data_x.shape
print(shapee)
# Flatten every axis after the second into one feature axis.
# reshape (unlike np.resize) never repeats or drops values: the element count is preserved
# and the feature width is inferred, e.g. (1881, 3000, 3, 3, 30) -> (1881, 3000, 270).
data_x_1 = data_x.reshape(shapee[0], shapee[1], -1)
print(data_x_1.shape)

print(data_y.shape)

(1881, 3000, 3, 3, 30)
(1881, 3000, 270)
(1881, 6, 5)


In [5]:
import torch
import torchvision.models as models
from torch.utils.data import TensorDataset, DataLoader

# Define the LSTM model
class LSTMM(torch.nn.Module):
    """LSTM classifier: batch-norm over features, average-pool over time, LSTM, linear head.

    Args:
        var_x_shape: shape of the input data; only its last entry (feature width) is used.
        var_y_shape: shape of the labels; only its last entry (output width) is used.
    """

    def __init__(self, var_x_shape, var_y_shape):
        super().__init__()
        num_features = var_x_shape[-1]   # original note: should be 270
        num_outputs = var_y_shape[-1]    # original note: should be 30
        # Attribute names are part of the state-dict keys and must stay as they are.
        self.layer_norm = torch.nn.BatchNorm1d(num_features)
        # Averages every 10 consecutive time steps into one.
        self.layer_pooling = torch.nn.AvgPool1d(kernel_size=10, stride=10)
        self.layer_lstm = torch.nn.LSTM(input_size=num_features,
                                        hidden_size=512,
                                        batch_first=True)
        self.layer_linear = torch.nn.Linear(512, num_outputs)

    def forward(self, var_input):
        """Return the linear-head output for input shaped (batch, time, features)."""
        # BatchNorm1d / AvgPool1d work on (batch, features, time)
        channels_first = var_input.transpose(1, 2)
        pooled = self.layer_pooling(self.layer_norm(channels_first))
        # back to (batch, pooled_time, features) for the batch-first LSTM
        sequence, _ = self.layer_lstm(pooled.transpose(1, 2))
        # hidden state of the last time step -> (batch, hidden)
        last_step = sequence[:, -1]
        return self.layer_linear(last_step)

# Define the ResNet model
def build_resnet(var_y_shape):
    """Build an untrained ResNet-18 for single-channel input.

    Args:
        var_y_shape: shape of the labels; only its last entry is used, as the
            width of the final fully connected layer (original note: 30).

    Returns:
        The torchvision ResNet-18 with `conv1` and `fc` replaced.
    """
    # 'weights' is used instead of the older 'pretrained' argument; None means random init
    model_resnet = models.resnet18(weights=None)

    # Replace the first convolution so the network accepts 1 input channel
    model_resnet.conv1 = torch.nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)

    # Replace the final fully connected layer to emit the requested number of outputs
    # (the incoming width is read from the existing layer; original note: usually 512)
    model_resnet.fc = torch.nn.Linear(model_resnet.fc.in_features, var_y_shape[-1])

    return model_resnet

# Helper that strips a leading 'module.' or '_orig_mod.' prefix from state-dict keys
def remove_module_prefix(state_dict):
    """Return a copy of `state_dict` with one wrapper prefix removed from each key.

    A key starting with 'module.' loses that prefix; otherwise a key starting
    with '_orig_mod.' loses that one. At most one prefix is removed per key,
    and all other keys are copied unchanged. Values are not copied.
    """
    cleaned = {}
    for key, value in state_dict.items():
        for prefix in ('module.', '_orig_mod.'):
            if key.startswith(prefix):
                key = key[len(prefix):]
                break  # strip a single prefix only
        cleaned[key] = value
    return cleaned


def compute_accuracy_batch(model, dataloader, device, var_threshold=0.5):
    """
    Compute exact-match accuracy batch by batch, without storing all predictions and labels.

    Model outputs are passed through a sigmoid and binarized with `var_threshold`.
    Predictions and labels are then both reshaped to rows of width `labels.shape[-1]`;
    a row counts as correct only when every entry matches. The result is the number
    of correct rows divided by the total number of rows over the whole dataloader, so
    a smaller final batch is weighted by its size rather than counted as a full batch.

    Args:
        model (torch.nn.Module): model to evaluate; it is switched to eval mode.
        dataloader: iterable of (inputs, labels) batches.
        device (torch.device): device the batches are moved to before the forward pass.
        var_threshold (float): threshold that turns probabilities into binary values.

    Returns:
        float: accuracy over all rows, or 0.0 when the dataloader yields no rows.

    Raises:
        ValueError: when predictions and labels of a batch do not contain the same
            number of rows after reshaping.
    """
    model.eval()
    num_correct = 0
    num_rows = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            # The LSTMM variant defined later in this notebook returns (features, logits);
            # the logits are the last element.
            if isinstance(outputs, (tuple, list)):
                outputs = outputs[-1]

            # sigmoid -> probabilities, threshold -> binary predictions
            preds = torch.sigmoid(outputs) > var_threshold

            # compare row-wise, one row per group of `labels.shape[-1]` values
            row_width = labels.shape[-1]
            preds = preds.reshape(-1, row_width).long()
            targets = labels.reshape(-1, row_width).long()
            if preds.shape != targets.shape:
                raise ValueError(
                    f"predictions {tuple(preds.shape)} and labels {tuple(targets.shape)} "
                    "do not line up after reshaping"
                )

            num_correct += int((preds == targets).all(dim=1).sum().item())
            num_rows += targets.shape[0]

    if num_rows == 0:
        return 0.0
    return num_correct / num_rows


In [6]:
import torch
import torchvision.models as models
from torch.utils.data import TensorDataset, DataLoader
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import numpy as np

class LSTMM(torch.nn.Module):
    """LSTM classifier that also exposes its last hidden state as a feature vector.

    Pipeline: batch-norm over features, average-pool over time, LSTM, linear head.

    Args:
        var_x_shape: shape of the input data; only its last entry (feature width) is used.
        var_y_shape: shape of the labels; only its last entry (output width) is used.
    """

    def __init__(self, var_x_shape, var_y_shape):
        super().__init__()
        num_features = var_x_shape[-1]   # original note: should be 270
        num_outputs = var_y_shape[-1]    # original note: should be 30
        # Attribute names are part of the state-dict keys and must stay as they are.
        self.layer_norm = torch.nn.BatchNorm1d(num_features)
        # Averages every 10 consecutive time steps into one.
        self.layer_pooling = torch.nn.AvgPool1d(kernel_size=10, stride=10)
        self.layer_lstm = torch.nn.LSTM(input_size=num_features,
                                        hidden_size=512,
                                        batch_first=True)
        self.layer_linear = torch.nn.Linear(512, num_outputs)

    def forward(self, var_input):
        """Return (features, output) for input shaped (batch, time, features).

        `features` is the LSTM output at the last time step, shaped (batch, 512);
        `output` is the linear head applied to those features.
        """
        # BatchNorm1d / AvgPool1d work on (batch, features, time)
        channels_first = var_input.transpose(1, 2)
        pooled = self.layer_pooling(self.layer_norm(channels_first))
        # back to (batch, pooled_time, features) for the batch-first LSTM
        sequence, _ = self.layer_lstm(pooled.transpose(1, 2))
        var_features = sequence[:, -1]
        return var_features, self.layer_linear(var_features)
from torchvision import models



def extract_features(model, dataloader, device, model_type='lstm'):
    """Run `model` over `dataloader` and collect flattened features and labels.

    Args:
        model: torch module; it is switched to eval mode and run without gradients.
        dataloader: iterable of (inputs, labels) batches.
        device: device the batches are moved to.
        model_type: with 'lstm' the model is expected to return a 2-tuple whose first
            element holds the features; with any other value the model output itself
            is used as the features.

    Returns:
        (features, labels): numpy arrays concatenated along the first axis in
        dataloader order; features are flattened to (samples, -1).
    """
    model.eval()
    feature_chunks = []
    label_chunks = []
    unpack_pair = model_type == 'lstm'

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            if unpack_pair:
                var_features, _ = outputs  # (features, head output)
            else:
                var_features = outputs

            # one flat feature vector per sample
            flat = var_features.view(var_features.size(0), -1)
            feature_chunks.append(flat.cpu().numpy())
            label_chunks.append(labels.cpu().numpy())

    return np.concatenate(feature_chunks, axis=0), np.concatenate(label_chunks, axis=0)

# Wrap the flattened CSI array and the encoded labels as tensors (float inputs, integer labels).
var_x = torch.from_numpy(data_x_1).float()
var_y = torch.from_numpy(data_y).long()

# Move the complete dataset to the selected device up front.
var_x = var_x.to(device)
var_y = var_y.to(device)

var_x_shape = var_x.shape
var_y_shape = var_y.shape


batch_size = 8  
dataset_lstm = TensorDataset(var_x, var_y)
# shuffle=False keeps the sample order fixed; the later label-equality assert relies on it.
dataloader_lstm = DataLoader(dataset_lstm, batch_size=batch_size, shuffle=False)
print('extract_features(model_lstm)')
# Extract features from the LSTM
# NOTE(review): torch.load unpickles the file — only load checkpoints from a trusted source.
checkpoint = torch.load("/kaggle/input/models/lstm_mr.pth", map_location=device)
# The weights sit under the 'best_weight' key; wrapper prefixes are stripped from their names.
checkpoint = remove_module_prefix(checkpoint['best_weight'])
# LSTMM reads only the last entry of each shape tuple, so the linear head gets 30 outputs.
model_lstm = LSTMM(var_x_shape, (var_y_shape[0],30)).to(device)
model_lstm.load_state_dict(checkpoint)  
features_lstm, labels_lstm = extract_features(model_lstm, dataloader_lstm, device, model_type='lstm')




extract_features(model_lstm)


  checkpoint = torch.load("/kaggle/input/models/lstm_mr.pth", map_location=device)


In [7]:
!pip install tqdm
from tqdm import tqdm

def build_resnet(var_y_shape):
    """Build an untrained ResNet-18 for single-channel input.

    Args:
        var_y_shape: shape of the labels; only its last entry is used, as the
            width of the final fully connected layer (original note: 30).

    Returns:
        The torchvision ResNet-18 with `conv1` and `fc` replaced.
    """
    # 'weights' is used instead of the older 'pretrained' argument; None means random init
    model_resnet = models.resnet18(weights=None)

    # Replace the first convolution so the network accepts 1 input channel
    model_resnet.conv1 = torch.nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)

    # Replace the final fully connected layer to emit the requested number of outputs
    # (the incoming width is read from the existing layer; original note: usually 512)
    model_resnet.fc = torch.nn.Linear(model_resnet.fc.in_features, var_y_shape[-1])

    return model_resnet

def extract_features(model, dataloader, device, model_type='resnet'):
    """Run `model` over `dataloader` (with a progress bar) and collect features and labels.

    This definition replaces the earlier `extract_features` in the notebook namespace,
    so it honours `model_type` the same way: with 'lstm' the model is expected to
    return a 2-tuple whose first element holds the features; with any other value
    (the default 'resnet') the model output itself is used as the features.

    Args:
        model: torch module; it is switched to eval mode and run without gradients.
        dataloader: iterable of (inputs, labels) batches.
        device: device the batches are moved to.
        model_type: 'lstm' to unpack a (features, output) pair, anything else to use
            the raw model output.

    Returns:
        (features, labels): numpy arrays concatenated along the first axis in
        dataloader order; features are flattened to (samples, -1).
    """
    model.eval()
    features = []
    labels_list = []

    with torch.no_grad():
        for batch in tqdm(dataloader):
            inputs, labels = batch
            inputs = inputs.to(device)
            labels = labels.to(device)

            if model_type == 'lstm':
                var_features, _ = model(inputs)  # (features, head output)
            else:
                # With the final layer replaced by Identity the output is the feature vector
                # (original note: 512-dimensional)
                var_features = model(inputs)

            var_features = var_features.view(var_features.size(0), -1)  # Flatten
            features.append(var_features.cpu().numpy())
            labels_list.append(labels.cpu().numpy())

    features = np.concatenate(features, axis=0)
    labels = np.concatenate(labels_list, axis=0)
    return features, labels

# Wrap the flattened CSI array and the encoded labels as tensors (float inputs, integer labels).
var_x = torch.from_numpy(data_x_1).float()
var_y = torch.from_numpy(data_y).long()

# Move the complete dataset to the selected device up front.
var_x = var_x.to(device)
var_y = var_y.to(device)

var_x_shape = var_x.shape
var_y_shape = var_y.shape
batch_size = 8
print('extract_features(model_resnet)')
# NOTE(review): torch.load unpickles the file — only load checkpoints from a trusted source.
checkpoint1 = torch.load("/kaggle/input/models/resnet_mr.pth", map_location=device)
# Unlike the LSTM checkpoint, the loaded object is used directly as the state dict.
checkpoint1 = remove_module_prefix(checkpoint1)
# build_resnet reads only the last entry of the tuple, so the fc layer is built with 30 outputs.
model_resnet = build_resnet((var_y_shape[0], 30)).to(device)
model_resnet.load_state_dict(checkpoint1)

# Replace the FC layer with Identity (after loading the weights) so the model emits
# the features that fed the FC layer (original note: 512-dimensional)
model_resnet.fc = nn.Identity()

var_x_resnet = var_x.unsqueeze(1)  # add the channel dimension
dataset_resnet = TensorDataset(var_x_resnet, var_y)
# shuffle=False keeps the sample order fixed; the later label-equality assert relies on it.
dataloader_resnet = DataLoader(dataset_resnet, batch_size=batch_size, shuffle=False)

# Extract features from the ResNet
features_resnet, labels_resnet = extract_features(model_resnet, dataloader_resnet, device, model_type='resnet')


extract_features(model_resnet)


  checkpoint1 = torch.load("/kaggle/input/models/resnet_mr.pth", map_location=device)
100%|██████████| 236/236 [20:19<00:00,  5.17s/it]


In [8]:

# Make sure both extractors produced their labels in the same order
assert np.array_equal(labels_lstm, labels_resnet), "برچسب‌ها با هم همخوانی ندارند"
print('feature concate')
# Combine the features: LSTM and ResNet features side by side for every sample
features_combined = np.concatenate([features_lstm, features_resnet], axis=1)

# Prepare the data for the Random Forest
X = features_combined
y = labels_lstm  # or labels_resnet

# Split the data into training and test sets (optional)
from sklearn.model_selection import train_test_split
print('test train splite')
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Flatten the per-sample label block into one row per sample.
# -1 infers the width (30 for the current labels) instead of hard-coding it,
# so label sets of another width do not break the reshape.
y_train = y_train.reshape((y_train.shape[0], -1))
y_test = y_test.reshape((y_test.shape[0], -1))
print('X_train',X_train.shape)
print('X_test',X_test.shape)
print('y_train',y_train.shape)
print('y_test',y_test.shape)

print('random forest start')
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
rf_classifier.fit(X_train, y_train)

# Predict and evaluate the model.
# With 2-D binary targets accuracy_score counts a sample as correct only when its
# complete label row matches.
y_pred = rf_classifier.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy of Random Forest: {accuracy:.4f}")


feature concate
test train splite
X_train (1504, 1024)
X_test (377, 1024)
y_train (1504, 30)
y_test (377, 30)
random forest start
Accuracy of Random Forest: 0.2016
