In [2]:
import os, subprocess, json, time, pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction import DictVectorizer
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import cross_val_score
from collections import Counter

In [3]:
# All artefacts live next to the notebook: one "<label>.txt" file per location
# inside data/, and the pickled pipeline in model.pkl.
# Paths are built with os.path.join instead of "\data" / "\model.pkl" literals:
# "\d" and "\m" are invalid escape sequences and the backslash is Windows-only.
folder_path = os.getcwd()
data_path = os.path.join(folder_path, "data")
model_file = os.path.join(folder_path, "model.pkl")

def parse_output(output):
    """Extract access points from ``netsh wlan show networks mode=bssid`` text.

    The text is scanned line by line:

    * a line starting with ``SSID`` sets the current network name (everything
      after the third whitespace-separated token); an empty name becomes the
      string ``'None'``;
    * a line starting with ``BSSID`` sets the current access-point address
      (everything after the first ``:``);
    * the line immediately following a ``BSSID`` line is taken to be the
      signal line, and the value after its first ``:`` (with ``%`` removed)
      is parsed as an integer quality.

    Args:
        output: The command output as a single string.

    Returns:
        A list of ``{"ssid": ..., "bssid": ..., "quality": ...}`` dicts, one
        per BSSID whose signal line could be parsed. Entries whose signal
        value is not an integer are skipped instead of aborting the scan.
    """
    ssid = bssid = None
    bssid_line = -100  # sentinel: no BSSID line has been seen yet
    results = []
    for num, line in enumerate(output.split("\n")):
        line = line.strip()
        if line.startswith("SSID"):
            # "SSID <n> : <name>" -> tokens from index 3 onwards are the name.
            ssid = " ".join(line.split()[3:]).strip()
            if ssid == '':
                ssid = 'None'
        elif line.startswith("BSSID"):
            # The address itself contains ':' so re-join everything after the
            # first separator.
            bssid = ":".join(line.split(":")[1:]).strip()
            bssid_line = num
        elif num == bssid_line + 1:
            raw_quality = ":".join(line.split(":")[1:]).strip().replace("%", "")
            try:
                quality = int(raw_quality)
            except ValueError:
                # Unparseable signal line: drop this access point only.
                continue
            results.append({"ssid": ssid, "bssid": bssid, "quality": quality})
    return results

def make_str(output):
    """Return ``output`` as text.

    Objects with a ``decode`` method (e.g. ``bytes``) are decoded as UTF-8
    with undecodable bytes dropped. Anything without ``decode`` (e.g. an
    existing ``str`` or ``None``) is returned unchanged.

    The earlier UTF-16 fallback was removed: with ``errors='ignore'`` the
    UTF-8 decode never raises ``UnicodeDecodeError``, so that branch could
    never run.
    """
    try:
        return output.decode("utf8", errors='ignore')
    except AttributeError:
        # Not a bytes-like object; pass it through untouched.
        return output

def get_sample():
    """Run one ``netsh`` Wi-Fi scan and return it as a feature dict.

    Returns:
        A dict mapping ``"<ssid> <bssid>"`` to the parsed signal quality for
        every access point reported by ``parse_output``.
    """
    command = "netsh wlan show networks mode=bssid"
    process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
    raw_output = process.communicate()[0]
    sample = {}
    for access_point in parse_output(make_str(raw_output)):
        feature_name = access_point['ssid'] + " " + access_point['bssid']
        sample[feature_name] = access_point['quality']
    return sample



def get_pipeline(clf=None):
    """Build the feature-vectorising classification pipeline.

    Args:
        clf: Estimator to use as the final step. When omitted, a new
            ``RandomForestClassifier(n_estimators=100,
            class_weight="balanced")`` is created for this call.

    Returns:
        A pipeline of ``DictVectorizer(sparse=False)`` followed by ``clf``.
    """
    # Build the default inside the function: a default argument is evaluated
    # once at definition time, so every pipeline would otherwise share (and
    # refit) the very same classifier instance.
    if clf is None:
        clf = RandomForestClassifier(n_estimators=100, class_weight="balanced")
    return make_pipeline(DictVectorizer(sparse=False), clf)

def get_train_data(folder=None):
    """Load every recorded sample from the ``*.txt`` files in ``folder``.

    Each file holds one JSON object per line; the file name without its
    ``.txt`` extension is the label for all samples in that file.

    Args:
        folder: Directory to read. ``None`` means the current directory,
            matching what ``os.listdir(None)`` lists.

    Returns:
        A tuple ``(X, y)`` where ``X`` is the list of decoded samples and
        ``y`` the list of labels, aligned by index.
    """
    if folder is None:
        # os.listdir accepts None but os.path.join does not.
        folder = os.curdir
    suffix = ".txt"
    X = []
    y = []
    for file_name in os.listdir(folder):
        if not file_name.endswith(suffix):
            continue
        # Slice the extension off: str.rstrip(".txt") strips any trailing
        # '.', 't' or 'x' characters and would turn "loft.txt" into "lof".
        label = file_name[:-len(suffix)]
        with open(os.path.join(folder, file_name)) as f:
            # Skip blank lines so a stray newline does not break json.loads.
            data = [json.loads(line) for line in f if line.strip()]
        X.extend(data)
        y.extend([label] * len(data))
    return X, y

def get_model():
    """Load the pickled pipeline from ``model_file``.

    Reads the same module-level ``model_file`` path that ``train_model``
    writes, rather than rebuilding the path locally.

    Returns:
        The unpickled model object.

    Raises:
        ValueError: If the model file does not exist, or if it exists but
            cannot be read or unpickled. The original exception is chained
            as the cause.
    """
    try:
        with open(model_file, "rb") as f:
            # NOTE: pickle.load can execute arbitrary code; only load model
            # files that were produced by this notebook.
            return pickle.load(f)
    except FileNotFoundError as exc:
        raise ValueError("Can not find model file!") from exc
    except Exception as exc:
        # Catch Exception, not a bare except, so KeyboardInterrupt and
        # SystemExit still propagate; report load failures distinctly.
        raise ValueError("Can not load model file!") from exc

def train_model():
    """Fit a new pipeline on all recorded samples and pickle it.

    Reads the training data from ``data_path``, fits the pipeline returned
    by ``get_pipeline`` and writes it to ``model_file``.

    Raises:
        ValueError: If no training samples were found.
    """
    samples, labels = get_train_data(data_path)
    if not samples:
        raise ValueError("Can not find any trained locations!")
    pipeline = get_pipeline()
    pipeline.fit(samples, labels)
    with open(model_file, "wb") as handle:
        pickle.dump(pipeline, handle)

def learn(label, n=1):
    """Record one Wi-Fi scan for ``label`` and retrain the model.

    The scan is appended to ``<data_path>/<label>.txt`` when it contains at
    least one access point. Recording is best-effort: a failure is reported
    on stdout and training still runs on the data already on disk.

    Args:
        label: Location name; also the data file's base name.
        n: Currently unused; kept so existing callers keep working.

    Raises:
        ValueError: Propagated from ``train_model`` when there is no
            training data at all.
    """
    label_path = os.path.join(data_path, label + ".txt")
    try:
        # Create the data directory on first use so the append can succeed.
        os.makedirs(data_path, exist_ok=True)
        new_sample = get_sample()
        if new_sample:
            write_data(label_path, new_sample)
            print("Done, number of measurement of", locations(data_path, label))
    except Exception as exc:
        # Report the failure instead of hiding it, but keep going so the
        # model is still retrained.
        print("Could not record a measurement for {}: {}".format(label, exc))
    train_model()
    
def write_data(label_path, data):
    """Append ``data`` to ``label_path`` as a single JSON line."""
    record = json.dumps(data) + "\n"
    with open(label_path, "a") as handle:
        handle.write(record)


def locations(path=None, loc=None):
    """Report how many samples are stored per location.

    Args:
        path: Directory passed on to ``get_train_data``.
        loc: Optional location name to look up.

    Returns:
        When ``loc`` is truthy, the string ``"<loc>: <count>"`` if that
        location has samples, otherwise ``None``. When ``loc`` is falsy,
        every ``"<location>: <count>"`` line is printed and ``None`` is
        returned.

    Raises:
        ValueError: If no samples were found at all.
    """
    _, labels = get_train_data(path)
    if len(labels) == 0:
        raise ValueError("Can not find any trained locations!")

    counts = Counter(labels)
    if not loc:
        for name, count in counts.items():
            print("{}: {}".format(name, count))
        return None

    # First (and only) entry whose name matches, or None when absent.
    matches = (
        "{}: {}".format(name, count)
        for name, count in counts.items()
        if name == loc
    )
    return next(matches, None)

def predict_proba():
    """Scan once and print the per-class probabilities as a JSON object.

    Loads the saved model, takes a fresh sample and prints a JSON mapping of
    each entry in the model's ``classes_`` to its predicted probability.
    """
    model = get_model()
    sample = get_sample()
    class_names = model.classes_
    probabilities = model.predict_proba(sample)[0]
    by_location = dict(zip(class_names, probabilities))
    print(json.dumps(by_location))

def predict():
    """Scan once and return the saved model's predicted label for it."""
    model = get_model()
    sample = get_sample()
    predictions = model.predict(sample)
    return predictions[0]


def crossval(clf=None, X=None, y=None, folds=10, n=5):
    """Run repeated k-fold cross-validation and print the scores.

    Args:
        clf: Estimator to evaluate. When ``None`` the saved model from
            ``get_model`` is used.
        X: Training samples; loaded from ``data_path`` together with ``y``
            when either of them is ``None``.
        y: Training labels; see ``X``.
        folds: Value passed to ``cross_val_score`` as ``cv``.
        n: Number of times the cross-validation is repeated.

    Raises:
        ValueError: If there are fewer samples than ``folds``.
    """
    if X is None or y is None:
        X, y = get_train_data(data_path)
    if len(X) < folds:
        raise ValueError('There are not enough samples ({}). Need at least {}.'.format(len(X), folds))
    # Compare against None explicitly: `clf or get_model()` evaluates the
    # estimator's truthiness, which goes through __len__ for ensemble
    # estimators and fails on an unfitted one.
    if clf is None:
        clf = get_model()
    tot = 0
    print("KFold folds={}, running {} times".format(folds, n))
    for i in range(n):
        res = cross_val_score(clf, X, y, cv=folds).mean()
        tot += res
        print("{}/{}: {}".format(i + 1, n, res))
    print("-------- total --------")
    print(tot / n)

In [4]:
# Print the number of recorded samples for every location found in data_path.
locations(data_path)

home_bedroom: 69
home_kitchen: 30
home_livingroom: 142
home_loby: 30
office_desk: 81
office_loby: 25
office_meetingroom: 45
office_view: 10


In [5]:
# Take one Wi-Fi scan, append it to the "office_desk" data file and retrain.
learn("office_desk")

Done, number of measurement of office_desk: 82


In [6]:
# Scan once and print the saved model's per-location probabilities as JSON.
predict_proba()

{"home_bedroom": 0.0, "home_kitchen": 0.0, "home_livingroom": 0.0, "home_loby": 0.0, "office_desk": 0.98, "office_loby": 0.0, "office_meetingroom": 0.02, "office_view": 0.0}
