In [1]:
import os
import sys
import pickle
import numpy as np
import pandas as pd
from glob import glob
from tqdm import tqdm

import dask
from dask.distributed import wait
from dask.distributed import Client, wait, LocalCluster

In [2]:
# Record types that are parsed as (timestamp, x, y, z) rows.
SENSORS = [
    'acce', 'acce_uncali',
    'gyro', 'gyro_uncali',
    'magn', 'magn_uncali',
    'ahrs',
]

# Column names shared by every record type listed in SENSORS.
ACOLS = ['timestamp', 'x', 'y', 'z']

# Per-record-type count; presumably the number of feature columns, but it is not
# referenced anywhere in the visible notebook -- TODO confirm against its consumer.
NFEAS = {
    **{sensor: 3 for sensor in SENSORS},
    'wifi': 1,
    'ibeacon': 1,
    'waypoint': 3,
}

# Column names used by to_frame() when it wraps a parsed array in a DataFrame.
FIELDS = {
    **{sensor: ACOLS for sensor in SENSORS},
    'wifi': ['timestamp', 'ssid', 'bssid', 'rssi', 'last_timestamp'],
    'ibeacon': ['timestamp', 'code', 'rssi', 'last_timestamp'],
    'waypoint': ['timestamp', 'x', 'y'],
}

def to_frame(data, col):
    """Wrap one parsed record array in a DataFrame named after ``FIELDS[col]``.

    Parameters
    ----------
    data : object exposing ``.shape``
        Rows of a single record type (e.g. one attribute of ``ReadData``).
    col : str
        Key into ``FIELDS`` selecting the column names.

    Returns
    -------
    (pandas.DataFrame, bool)
        The frame, and a flag that is True when ``data`` had no rows and the
        one-row placeholder from ``create_dummy_df`` was substituted.
    """
    columns = FIELDS[col]
    is_dummy = not (data.shape[0] > 0)
    frame = create_dummy_df(columns) if is_dummy else pd.DataFrame(data, columns=columns)

    # Every column whose name contains 'timestamp' (this also matches
    # 'last_timestamp') is cast to int64.
    for name in [c for c in frame.columns if 'timestamp' in c]:
        frame[name] = frame[name].astype('int64')
    return frame, is_dummy

def create_dummy_df(cols):
    """Return a one-row placeholder frame with the given column names.

    Every column holds the integer 0, except ``ssid`` and ``bssid`` which hold
    the string ``'0'``.
    """
    text_columns = ('ssid', 'bssid')
    placeholder = {
        name: ['0' if name in text_columns else 0]
        for name in cols
    }
    return pd.DataFrame(placeholder)

In [3]:
from dataclasses import dataclass

import numpy as np


@dataclass
class ReadData:
    """Container for the per-record-type arrays returned by ``read_data_file``.

    Each attribute is ``np.array(rows)`` for one record type; a type that never
    occurred in the file is the array built from an empty list.
    """
    # TYPE_ACCELEROMETER rows: [timestamp, x, y, z]
    acce: np.ndarray
    # TYPE_ACCELEROMETER_UNCALIBRATED rows: [timestamp, x, y, z]
    acce_uncali: np.ndarray
    # TYPE_GYROSCOPE rows: [timestamp, x, y, z]
    gyro: np.ndarray
    # TYPE_GYROSCOPE_UNCALIBRATED rows: [timestamp, x, y, z]
    gyro_uncali: np.ndarray
    # TYPE_MAGNETIC_FIELD rows: [timestamp, x, y, z]
    magn: np.ndarray
    # TYPE_MAGNETIC_FIELD_UNCALIBRATED rows: [timestamp, x, y, z]
    magn_uncali: np.ndarray
    # TYPE_ROTATION_VECTOR rows: [timestamp, x, y, z]; lines with fewer than five fields are dropped
    ahrs: np.ndarray
    # TYPE_WIFI rows, kept as strings: [timestamp, ssid, bssid, rssi, last-seen timestamp]
    wifi: np.ndarray
    # TYPE_BEACON rows, kept as strings: [timestamp, 'uuid_major_minor', rssi, last field of the line]
    ibeacon: np.ndarray
    # TYPE_WAYPOINT rows: [timestamp, x, y]
    waypoint: np.ndarray


def read_data_file(data_filename):
    """Parse one tab-separated path log into a ``ReadData`` of per-type arrays.

    Blank lines and lines starting with ``#`` are skipped. For every other line
    the second tab-separated field is the record tag (``f`` is the field list):

    * accelerometer / gyroscope / magnetic-field tags, calibrated or
      uncalibrated -> ``[int(f[0]), float(f[2]), float(f[3]), float(f[4])]``
    * ``TYPE_ROTATION_VECTOR`` -> same layout, but a line with fewer than five
      fields is dropped instead of parsed
    * ``TYPE_WIFI`` -> fields 0, 2, 3, 4 and 6, kept as strings
    * ``TYPE_BEACON`` -> ``[f[0], f[2] + '_' + f[3] + '_' + f[4], f[6], f[-1]]``,
      kept as strings
    * ``TYPE_WAYPOINT`` -> ``[int(f[0]), float(f[2]), float(f[3])]``

    Lines carrying any other tag are ignored. Each list of rows is converted
    with ``np.array`` before being stored on the returned ``ReadData``.
    """
    # Record tag -> ReadData field, for every type stored as (timestamp, x, y, z).
    xyz_tags = {
        'TYPE_ACCELEROMETER': 'acce',
        'TYPE_ACCELEROMETER_UNCALIBRATED': 'acce_uncali',
        'TYPE_GYROSCOPE': 'gyro',
        'TYPE_GYROSCOPE_UNCALIBRATED': 'gyro_uncali',
        'TYPE_MAGNETIC_FIELD': 'magn',
        'TYPE_MAGNETIC_FIELD_UNCALIBRATED': 'magn_uncali',
        'TYPE_ROTATION_VECTOR': 'ahrs',
    }
    # Positional order of the ReadData fields.
    field_order = ('acce', 'acce_uncali', 'gyro', 'gyro_uncali', 'magn',
                   'magn_uncali', 'ahrs', 'wifi', 'ibeacon', 'waypoint')
    rows = {key: [] for key in field_order}

    with open(data_filename, 'r', encoding='utf-8') as handle:
        raw_lines = handle.readlines()

    for raw in raw_lines:
        text = raw.strip()
        if not text or text.startswith('#'):
            continue

        f = text.split('\t')
        tag = f[1]

        if tag in xyz_tags:
            # Only the rotation-vector branch tolerates short lines (by skipping them).
            if tag == 'TYPE_ROTATION_VECTOR' and len(f) < 5:
                continue
            rows[xyz_tags[tag]].append([int(f[0]), float(f[2]), float(f[3]), float(f[4])])
        elif tag == 'TYPE_WIFI':
            # system timestamp, ssid, bssid, rssi, last-seen timestamp
            rows['wifi'].append([f[0], f[2], f[3], f[4], f[6]])
        elif tag == 'TYPE_BEACON':
            # timestamp, uuid_major_minor, rssi, last field of the line
            rows['ibeacon'].append([f[0], '_'.join(f[2:5]), f[6], f[-1]])
        elif tag == 'TYPE_WAYPOINT':
            rows['waypoint'].append([int(f[0]), float(f[2]), float(f[3])])

    return ReadData(*(np.array(rows[key]) for key in field_order))

The main changes made are these two lines:
```
lastts = line_data[-1] # last timestamp
ibeacon_data = [ts, '_'.join([uuid, major, minor]), rssi, lastts]
```

In [4]:
def get_test_df(PATH):
    """Read ``{PATH}/sample_submission.csv`` and return it sorted by path, then timestamp.

    ``site_path_timestamp`` is split on ``'_'`` into three extra columns:
    ``building`` (first piece), ``path`` (second piece) and ``timestamp``
    (third piece, cast to int64). The returned frame has a fresh 0..n-1 index.
    """
    submission = pd.read_csv(f'{PATH}/sample_submission.csv')

    # Split each key once and fan the pieces out into their own columns.
    pieces = [key.split('_') for key in submission['site_path_timestamp']]
    submission['building'] = [p[0] for p in pieces]
    submission['path'] = [p[1] for p in pieces]
    submission['timestamp'] = [p[2] for p in pieces]
    submission['timestamp'] = submission['timestamp'].astype('int64')

    ordered = submission.sort_values(['path', 'timestamp'])
    return ordered.reset_index(drop=True)


def get_test_dfs(PATH, test_files):
    """
    Split the submission frame into one frame per test path file.

    Parameters
    ----------
    PATH : str
        Directory containing ``sample_submission.csv`` (passed to ``get_test_df``).
    test_files : iterable of str
        Paths of the test path files; the path id is the file's base name up to
        its first ``'.'``.

    Returns
    -------
    dict
        key = entry of ``test_files`` (unchanged), value = the submission rows
        whose ``path`` equals that file's path id, with a fresh 0..n-1 index.
        A file with no matching rows maps to an empty frame.
    """
    dtest = get_test_df(PATH)
    wanted = ['timestamp', 'x', 'y', 'floor', 'building', 'site_path_timestamp']
    dws = {}
    for fname in tqdm(test_files):
        # os.path.basename honours the platform's separators, so paths that glob
        # returns with backslashes on Windows still yield the bare file name.
        path = os.path.basename(fname).split('.')[0]
        mask = dtest['path'] == path
        dws[fname] = dtest.loc[mask, wanted].copy().reset_index(drop=True)
    return dws


def get_time_gap(name):
    """
    Read a path file and return the offset that maps its fake timestamps to
    actual timestamps.

    If the file has ibeacon rows, the offset is ``last_timestamp - timestamp``,
    which must be the same for every ibeacon row. Otherwise it is estimated
    from the wifi rows as the largest ``last column - first column`` difference.

    Parameters
    ----------
    name : str
        Path of the path file to parse with ``read_data_file``.

    Returns
    -------
    (gap, no_ibeacon)
        ``gap`` is the integer offset; ``no_ibeacon`` is True when the file had
        no ibeacon rows and the wifi estimate was used.

    Raises
    ------
    AssertionError
        If the ibeacon rows disagree on the offset.
    ValueError
        If the file has neither ibeacon nor wifi rows. Previously this case fell
        off the end of the function and returned None, which broke callers that
        unpack the result.
    """
    data = read_data_file(name)
    db, no_ibeacon = to_frame(data.ibeacon, 'ibeacon')

    # ibeacon data is present in the given path file
    if not no_ibeacon:
        gap = db['last_timestamp'] - db['timestamp']
        assert gap.unique().shape[0] == 1, f'ibeacon rows of {name} disagree on the time gap'
        return gap.values[0], no_ibeacon

    # ibeacon data not present: fall back to the wifi rows
    if data.wifi.shape[0] == 0:
        raise ValueError(f'{name} has neither ibeacon nor wifi rows; cannot determine the time gap')

    # int64 explicitly: the platform default 'int' can be 32-bit, which overflows
    # on 13-digit millisecond timestamps.
    wifi_times = data.wifi[:, [0, -1]].astype('int64')
    estimated_gap = (wifi_times[:, 1] - wifi_times[:, 0]).max()
    # Report the real flag (True here) rather than a hard-coded False, so callers
    # can tell an estimated gap from an exact one.
    return estimated_gap, no_ibeacon

def fix_timestamp_test(df, gap):
    """Add a ``real_timestamp`` column equal to ``timestamp + gap``.

    The column is written onto ``df`` itself (in place) and that same object is
    returned.
    """
    shifted = df['timestamp'].add(gap)
    df['real_timestamp'] = shifted
    return df

In [5]:
# Start a dask client with 2 single-threaded workers.
# n_workers is hard-coded here; set it to the number of cores on your machine.
client = Client(n_workers=2, threads_per_worker=1)
client

0,1
Client  Scheduler: tcp://127.0.0.1:41831  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 2  Memory: 7.23 GiB


### Read data

In [6]:
# Root directory: sample_submission.csv, train/ and test/ are all looked up under it.
PATH = '.'
# Unused alternative that globs every site's training files:
#train_files = glob(f'{PATH}/train/*/*/*.txt')
dtest = get_test_df(PATH)
dtest.head(3)

Unnamed: 0,site_path_timestamp,floor,x,y,building,path,timestamp
0,5da1389e4db8ce0c98bd0547_00ff0c9a71cc37a2ebdd0...,0,75.0,75.0,5da1389e4db8ce0c98bd0547,00ff0c9a71cc37a2ebdd0f05,10
1,5da1389e4db8ce0c98bd0547_00ff0c9a71cc37a2ebdd0...,0,75.0,75.0,5da1389e4db8ce0c98bd0547,00ff0c9a71cc37a2ebdd0f05,4048
2,5da1389e4db8ce0c98bd0547_00ff0c9a71cc37a2ebdd0...,0,75.0,75.0,5da1389e4db8ce0c98bd0547,00ff0c9a71cc37a2ebdd0f05,12526


In [7]:
# Collect training files only for the sites (buildings) that occur in the submission frame.
test_sites = dtest['building'].unique()
train_files = [
    txt_path
    for site in test_sites
    for txt_path in glob(f'{PATH}/train/{site}/*/*.txt')
]

In [8]:
# One entry per test path file: file path -> that path's rows of the submission frame.
test_files = glob(f'{PATH}/test/*.txt')
test_dfs = get_test_dfs(PATH, test_files)

100%|██████████| 626/626 [00:01<00:00, 451.74it/s]


In [9]:
# Sanity check: test_dfs should have exactly one key per test file.
print(len(train_files), len(test_files), len(list(test_dfs.keys())))

10877 626 626


`test_dfs` is a dictionary which maps the file path to its waypoint dataframe.

### How to recover the real timestamp

In the [webinar](https://youtu.be/xt3OzMC-XMU?t=690), the host mentioned that for `ibeacon` records, `timestamp` and `last_timestamp` hold the same value. We can verify this claim by checking the training ibeacon data.

In [10]:
# Parse one training file and compare its two ibeacon timestamp columns.
fname = train_files[4]
# (not the last expression of the cell, so this line displays nothing)
fname
data = read_data_file(fname)
db,no_ibeacon = to_frame(data.ibeacon,'ibeacon')
print(db.head())
# NOTE(review): when `no_ibeacon` is True, `db` is the all-zero placeholder row from
# create_dummy_df, so the comparison below is True trivially and says nothing about
# the file. Check `no_ibeacon` before trusting the result.
print((db['timestamp']==db['last_timestamp']).all())

   timestamp  code  rssi  last_timestamp
0          0     0     0               0
True


In [11]:
# True means to_frame substituted the placeholder row, i.e. the file had no ibeacon rows.
no_ibeacon

True

In [12]:
# Same file, wifi records this time; no_wifiData is True only if the file had no wifi rows.
wifi_db, no_wifiData = to_frame(data.wifi, 'wifi')

In [13]:
# (rows, columns) of the wifi frame.
wifi_db.shape

(363, 5)

In [14]:
# First wifi rows; columns follow FIELDS['wifi'].
wifi_db.head()

Unnamed: 0,timestamp,ssid,bssid,rssi,last_timestamp
0,1573292530396,89ca3c80b5f8d85b205ba7ec76a654db26c553af,f2472445a8662d81689a985992de065e61a47db5,-63,1573292508052
1,1573292530396,89ca3c80b5f8d85b205ba7ec76a654db26c553af,f910bdb300a735f7d475b31cc192d8aef54c4ede,-65,1573292523371
2,1573292530396,70bdd3ed8f22721d8347797977d6b6d7377fe00d,732433e46154dc8e3ec32650a8004c5e52bbae67,-67,1573292508064
3,1573292530396,3c1e7602176e050694e3a5cf8ba5f6f725e3ec51,889bfa434d66eed8c386ccbc90f445932c43f8dd,-69,1573292528847
4,1573292530396,d0af9d9c2709796ee07a0432de0e26298a64e3e8,11567178cc5ca582a37c4733207c77739e1bf5fd,-69,1573292523065


I also checked all the other train files, and the claim holds for every one of them. Next, let's look at the ibeacon data of one test file.

In [15]:
# Same parsing steps as for the training file, now on the first test file.
fname = test_files[0]
data = read_data_file(fname)
db,no_ibeacon = to_frame(data.ibeacon,'ibeacon')
db.head()

Unnamed: 0,timestamp,code,rssi,last_timestamp
0,40672,d7a0f935b19a856d78cbc83a2d3ccc5cacc01891_0716d...,-97,1573385706700
1,70666,d7a0f935b19a856d78cbc83a2d3ccc5cacc01891_0716d...,-91,1573385736694
2,72673,d7a0f935b19a856d78cbc83a2d3ccc5cacc01891_0716d...,-97,1573385738701


The `timestamp` and the `last_timestamp` are obviously different. But if we look closely, the gap between them is actually constant.

In [16]:
# Per-row offset between the two ibeacon timestamp columns; a single unique value
# means the offset is constant across the file.
db['gap'] = db['last_timestamp'] - db['timestamp']
db['gap'].unique()

array([1573385666028])

Hence, an intuitive guess is that this `gap` was artificially introduced when the test data was prepared, and that we can use it to fix the timestamps of `waypoints`, `wifi`, etc.

#### Fix one test waypoint

In [17]:
# Recover the offset for one test file and apply it to that file's submission rows.
fname = test_files[0]
gap,no_ibeacon = get_time_gap(fname)
# fix_timestamp_test writes `real_timestamp` onto test_dfs[fname] itself and returns it.
df = fix_timestamp_test(test_dfs[fname], gap)
df[['timestamp','real_timestamp','site_path_timestamp']]

Unnamed: 0,timestamp,real_timestamp,site_path_timestamp
0,13,1573385666041,5d2709e003f801723c32d896_ad733a1791d3fdd215326...
1,7202,1573385673230,5d2709e003f801723c32d896_ad733a1791d3fdd215326...
2,17599,1573385683627,5d2709e003f801723c32d896_ad733a1791d3fdd215326...
3,28262,1573385694290,5d2709e003f801723c32d896_ad733a1791d3fdd215326...
4,43292,1573385709320,5d2709e003f801723c32d896_ad733a1791d3fdd215326...
5,50504,1573385716532,5d2709e003f801723c32d896_ad733a1791d3fdd215326...
6,57090,1573385723118,5d2709e003f801723c32d896_ad733a1791d3fdd215326...
7,66144,1573385732172,5d2709e003f801723c32d896_ad733a1791d3fdd215326...
8,76110,1573385742138,5d2709e003f801723c32d896_ad733a1791d3fdd215326...
9,85614,1573385751642,5d2709e003f801723c32d896_ad733a1791d3fdd215326...


### Fix all test waypoints using DASK

In [18]:
%%time
"""
for each test path file, find the time gap based on ibeacon or wifi data
futures list holds the gap,ibeacon data for each test path file
"""
# Stage 1: submit one get_time_gap task per test path file.
# Each future resolves to (gap, no_ibeacon) for its file.
futures = []
for fname in tqdm(test_files, total=len(test_files)):
    f = client.submit(get_time_gap,fname)
    futures.append(f)

# Stage 2: as each gap arrives, record it and submit the timestamp fix for that file.
# futures2 holds one future per file, each resolving to the corrected frame.
futures2 = []
no_ibeacon_list = []
testFileTimeGap = {}
for f,fname in tqdm(zip(futures, test_files), total=len(test_files)):
    gap,no_ibeacon = f.result()
    # splitext removes exactly the extension. str.rstrip('.txt') removes any run of
    # trailing '.', 't' and 'x' characters, which would also eat the end of a file
    # name that happens to finish with one of them.
    path_id = os.path.splitext(os.path.basename(fname))[0]
    testFileTimeGap[path_id] = gap
    no_ibeacon_list.append(no_ibeacon)
    f = client.submit(fix_timestamp_test, test_dfs[fname], gap)
    futures2.append(f)

# Stage 3: collect the corrected frames, keyed by test file path.
fixed_test_dfs = {}
for f,fname in tqdm(zip(futures2, test_files), total=len(test_files)):
    fixed_test_dfs[fname] = f.result()

fix_summary = pd.DataFrame({'file':test_files, 'no_ibeacon':no_ibeacon_list})
fix_summary.head()

100%|██████████| 626/626 [00:00<00:00, 2653.73it/s]
100%|██████████| 626/626 [01:09<00:00,  9.06it/s]
100%|██████████| 626/626 [00:01<00:00, 315.16it/s]

CPU times: user 7.62 s, sys: 498 ms, total: 8.12 s
Wall time: 1min 11s





Unnamed: 0,file,no_ibeacon
0,./test/ad733a1791d3fdd21532677b.txt,False
1,./test/a1a32c28854aeef857ef79a5.txt,False
2,./test/29781f56d67d950acd783d29.txt,False
3,./test/2bdf91b83e265e197ac7b117.txt,False
4,./test/cbe53b25f9fd9c9acea1e6d1.txt,False


In [19]:
# Fraction of test files for which get_time_gap reported no ibeacon data.
fix_summary['no_ibeacon'].mean()

0.0

**Before fix**

In [20]:
# Second test file, as stored in test_dfs.
fname = test_files[1]
test_dfs[fname].head()[['timestamp','site_path_timestamp']]

Unnamed: 0,timestamp,site_path_timestamp
0,15,5da138b74db8ce0c98bd4774_a1a32c28854aeef857ef7...
1,5922,5da138b74db8ce0c98bd4774_a1a32c28854aeef857ef7...
2,9719,5da138b74db8ce0c98bd4774_a1a32c28854aeef857ef7...
3,12942,5da138b74db8ce0c98bd4774_a1a32c28854aeef857ef7...
4,22163,5da138b74db8ce0c98bd4774_a1a32c28854aeef857ef7...


**After fix**

In [21]:
# Same file, from fixed_test_dfs: real_timestamp is timestamp plus that file's gap.
fixed_test_dfs[fname].head()[['timestamp','real_timestamp','site_path_timestamp']]

Unnamed: 0,timestamp,real_timestamp,site_path_timestamp
0,15,1571884299608,5da138b74db8ce0c98bd4774_a1a32c28854aeef857ef7...
1,5922,1571884305515,5da138b74db8ce0c98bd4774_a1a32c28854aeef857ef7...
2,9719,1571884309312,5da138b74db8ce0c98bd4774_a1a32c28854aeef857ef7...
3,12942,1571884312535,5da138b74db8ce0c98bd4774_a1a32c28854aeef857ef7...
4,22163,1571884321756,5da138b74db8ce0c98bd4774_a1a32c28854aeef857ef7...


You can use the same method to fix test data `wifi` dataframes.

In [22]:
# Keys are the test file names without directory or extension.
list(testFileTimeGap.keys())[0:5]

['ad733a1791d3fdd21532677b',
 'a1a32c28854aeef857ef79a5',
 '29781f56d67d950acd783d29',
 '2bdf91b83e265e197ac7b117',
 'cbe53b25f9fd9c9acea1e6d1']

In [24]:
# Write the time gap of every test path file to disk.
# The with-block closes the file even if pickle.dump raises.
with open("testFileTimeStampGap.pkl", "wb") as outputFile:
    pickle.dump(testFileTimeGap, outputFile)