In [2]:
import json
import os
import random
from datetime import datetime, timedelta

import ipywidgets as widgets
import numpy as np
import pandas as pd
# from IPython.display import display
# import cufflinks as cf
from plotly.offline import iplot, init_notebook_mode
# import plotly.graph_objs as go
# import plotly.figure_factory as ff

from cerebralcortex import Kernel
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata, DataDescriptor, ModuleMetadata
from cerebralcortex.core.util.spark_helper import get_or_create_sc


# Keep DataFrame reprs short: pandas display is capped at 5 rows.
pd.set_option('display.max_rows',5)

# Initialise plotly's offline notebook mode so figures can render inline.
init_notebook_mode(connected=True)

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [4]:
# Names of the right-wrist accelerometer and gyroscope streams read by the
# export loop further down.
gyroscope_right_wrist_stream_name = 'gyroscope--org.md2k.motionsense--motion_sense_hrv--right_wrist'
accelerometer_right_wrist_stream_name = 'accelerometer--org.md2k.motionsense--motion_sense_hrv--right_wrist'

# CerebralCortex kernel built from the local configuration directory for the
# 'mperf' study.
# NOTE(review): the configuration path is an absolute, machine-specific path.
CC = Kernel("/home/jupyter/cc3_conf/", study_name='mperf')

# User records of the study; later cells read the 'user_id' and 'username'
# keys of each record.
all_users = CC.list_users()


In [5]:
def is_correct_accel_data(data):
    """Flag whether a window of accelerometer samples has a plausible scale.

    Args:
        data: iterable of samples; each sample is passed to
            ``np.linalg.norm`` (callers in this notebook pass rows of
            ``[x, y, z]``).

    Returns:
        int: 0 when the mean sample magnitude is below 0.8, otherwise 1.
    """
    magnitudes = list(map(np.linalg.norm, data))
    # A mean magnitude under 0.8 marks the window as "not correct"
    # (presumably recorded at half scale, since callers double it -- TODO confirm).
    return 0 if np.mean(magnitudes) < 0.8 else 1
# Spark column UDF wrapper around is_correct_accel_data; yields an integer flag.
udf_is_correct_accel_data = F.udf(f=is_correct_accel_data, returnType=IntegerType())

def correct_accel(data, is_correct):
    """Return the samples unchanged when flagged correct, otherwise doubled.

    Args:
        data: sequence of numeric values (or nested sequences) for one window.
        is_correct: flag; the value 1 means the data is returned as-is.

    Returns:
        ``data`` itself when ``is_correct == 1``; otherwise a new list holding
        every value multiplied by 2.
    """
    if is_correct != 1:
        doubled = 2 * np.array(data)
        return doubled.tolist()
    return data
# Spark column UDF wrapper around correct_accel; yields an array of doubles.
udf_correct_accel = F.udf(f=correct_accel, returnType=ArrayType(DoubleType()))


In [6]:
schema = StructType([StructField("count", IntegerType())])
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def get_data_count_udf(user_data):
    """Count the windows of one group that contain movement.

    Args:
        user_data: pandas DataFrame for one Spark group. Each row is one
            window whose 'accelerometer_x', 'accelerometer_y' and
            'accelerometer_z' cells hold the per-sample values of that window.

    Returns:
        pandas.DataFrame: a single row with column 'count', the number of
        windows classified as movement.

    A window is classified as movement when:
      * it holds at least 400 accelerometer samples, and
      * the standard deviation of its per-sample magnitude is >= 0.21,
        computed after doubling the samples of windows that
        ``is_correct_accel_data`` rejects.
    """
    movement_windows = 0
    for _, row in user_data.iterrows():
        # Original note: 25 * 20 * 60% (frequency * window_size * threshold%).
        # NOTE(review): that product is 300, while the cutoff used here is 400
        # (80% of 25 * 20) -- confirm which one is intended.
        if len(row['accelerometer_x']) < 400:
            continue

        # One row per sample, one column per axis: shape (n_samples, 3).
        accel_data = np.column_stack((
            row['accelerometer_x'],
            row['accelerometer_y'],
            row['accelerometer_z'],
        ))

        if not is_correct_accel_data(accel_data):
            # Double the values element-wise, matching correct_accel().
            # The previous `2*np.array(...).tolist()` repeated the Python list
            # (operator precedence) instead of scaling the samples.
            accel_data = accel_data * 2

        magnitudes = np.linalg.norm(accel_data, axis=1)
        if np.std(magnitudes) >= 0.21:
            movement_windows += 1

    return pd.DataFrame(data={'count': [movement_windows]})

In [7]:
schema = StructType([StructField("timestamp", TimestampType()), StructField("accel", ArrayType(ArrayType(DoubleType()))), StructField("gyro", ArrayType(ArrayType(DoubleType())))])
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def get_data_udf(user_data):
    """Extract a share of the movement windows of one group.

    Args:
        user_data: pandas DataFrame for one Spark group. Each row is one
            window with a 'timestamp', per-sample value sequences in
            'accelerometer_{x,y,z}' and 'gyroscope_{x,y,z}', and an
            'Available' column (read from the first row) holding the total
            number of movement windows, as set by the caller.

    Returns:
        pandas.DataFrame with columns 'timestamp', 'accel' and 'gyro'.
        'accel' and 'gyro' hold one ``[x, y, z]`` list per sample. The frame
        is empty when the group has no movement window.

    Movement detection is the same as in ``get_data_count_udf``: at least 400
    accelerometer samples and a magnitude standard deviation >= 0.21, after
    doubling the samples of windows that ``is_correct_accel_data`` rejects.
    Only the first ``ceil(n_movement * 600 / Available)`` movement windows of
    the group are returned.
    """
    def stack_axes(row, sensor):
        # One row per sample, one column per axis: shape (n_samples, 3).
        return np.column_stack((row[sensor + '_x'], row[sensor + '_y'], row[sensor + '_z']))

    # Pass 1: classify every window.
    is_movement = []
    correct_data = []
    for _, row in user_data.iterrows():
        # Original note: 25 * 20 * 60% (frequency * window_size * threshold%).
        # NOTE(review): that product is 300, while the cutoff used here is 400
        # (80% of 25 * 20) -- confirm which one is intended.
        if len(row['accelerometer_x']) < 400:
            is_movement.append(0)
            correct_data.append(1)
            continue

        accel_data = stack_axes(row, 'accelerometer')
        is_correct = 1 if is_correct_accel_data(accel_data) else 0
        if not is_correct:
            # Double the values element-wise, matching correct_accel().
            # The previous `2*np.array(...).tolist()` repeated the Python list
            # (operator precedence), which produced six columns per sample.
            accel_data = accel_data * 2
        correct_data.append(is_correct)

        magnitudes = np.linalg.norm(accel_data, axis=1)
        is_movement.append(1 if np.std(magnitudes) >= 0.21 else 0)

    user_data['is_movement'] = is_movement
    user_data['correct'] = correct_data
    data_movement = user_data[user_data['is_movement']==1]

    if len(data_movement.index) == 0:
        return pd.DataFrame([], columns=['timestamp', 'accel', 'gyro'])

    # Share of this group's movement windows to keep, proportional to the
    # group's part of all 'Available' movement windows (target: 600 overall).
    tot = int(np.ceil(len(data_movement.index)*600/user_data['Available'].iloc[0]))

    # Pass 2: materialise the first `tot` movement windows. max(..., 1) keeps
    # the earlier behaviour of always emitting at least one window.
    ts = []
    adt = []
    gdt = []
    for _, row in data_movement.head(max(tot, 1)).iterrows():
        accel_data = stack_axes(row, 'accelerometer')
        if row['correct'] != 1:
            accel_data = accel_data * 2
        gyro_data = stack_axes(row, 'gyroscope')

        ts.append(row['timestamp'])
        adt.append(accel_data.tolist())
        gdt.append(gyro_data.tolist())
    return pd.DataFrame(data={'timestamp': ts, 'accel': adt, 'gyro': gdt})

In [8]:
def write_log(msg):
    """Append ``msg`` to calc_log.txt, prefixed with the current local time.

    Args:
        msg (str): text to record; one line is written per call.
    """
    with open('calc_log.txt', 'a+') as log_file:
        entry = "".join((str(datetime.now()), " => ", msg, "\n"))
        log_file.write(entry)

In [9]:
import pickle
# Load the collection of already-processed user names saved by an earlier run.
# The next cell iterates it and parses the text after the first six characters
# of each entry as an integer id (presumably names like 'mperf_1234' -- confirm).
# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# load a 'done_users.p' file that you produced yourself.
with open('done_users.p', 'rb') as f:
    done_users = pickle.load(f)

In [10]:
# Highest already-processed numeric user id in each id range:
#   mx1 -> ids below 5000, mx2 -> ids 5000..8999, mx3 -> ids 9000 and above.
# Each stays 0 when no processed user falls into its range.
mx1 = mx2 = mx3 = 0
for user in done_users:
    # The numeric id is whatever follows the first six characters of the name.
    user_id = int(user[6:])
    if user_id >= 9000:
        mx3 = max(mx3, user_id)
    elif user_id >= 5000:
        mx2 = max(mx2, user_id)
    else:
        mx1 = max(mx1, user_id)

In [None]:
# Export the movement windows of every eligible user to data/<username>.csv.gz.
#
# A user is processed only when data_old/<username>.csv.gz exists and
# data/<username>.csv.gz does not, so re-running the cell resumes where a
# previous run stopped. Progress and errors are appended via write_log().
# (The commented-out resume filter based on mx1/mx2/mx3 was removed;
# presumably it was replaced by the file-existence checks -- confirm.)

# Make sure the output directory exists before any expensive Spark work runs.
os.makedirs('data', exist_ok=True)

# Group keys shared by both grouped-map UDF calls: user, version and a
# one-day tumbling window over the window start timestamp.
windowDuration = "1 day"
win = F.window("timestamp", windowDuration=windowDuration)
groupbycols = ["user", "version", win]

for user in all_users:
    user_id = user['user_id']
    user_name = user['username']
    if not os.path.exists('data_old/'+user_name+'.csv.gz'):
        continue
    if os.path.exists('data/'+user_name+'.csv.gz'):
        continue
    write_log('Starting for '+user_name)
    try:
        # Drop samples outside the accepted value ranges before windowing.
        accel_ds = CC.get_stream(accelerometer_right_wrist_stream_name, user_id=user_id).filter("accelerometer_x >= -4 AND accelerometer_x <= 4 AND accelerometer_y >= -4 AND accelerometer_y <= 4 AND accelerometer_z >= -4 AND accelerometer_z <= 4")
        gyro_ds = CC.get_stream(gyroscope_right_wrist_stream_name, user_id=user_id).filter("gyroscope_x >= -250 AND gyroscope_x <= 250 AND gyroscope_y >= -250 AND gyroscope_y <= 250 AND gyroscope_z >= -250 AND gyroscope_z <= 250")

        # Window both streams with windowDuration=20 and align them on the
        # shared window.
        accel_w = accel_ds.sort(F.asc('timestamp')).window(windowDuration=20)
        gyro_w = gyro_ds.sort(F.asc('timestamp')).window(windowDuration=20)
        # Remove the columns that the accelerometer side already provides.
        gyro_w = gyro_w.drop("version", "user")
        common_ds = accel_w.join(gyro_w, on="window")
        # Replace the window struct by its first field as 'timestamp'.
        common_ds = common_ds.withColumn('timestamp', F.udf(lambda w: w[0], TimestampType())('window')).drop('window')

        # First pass: count movement windows over all of the user's groups.
        count_data = common_ds._data.groupBy(groupbycols).apply(get_data_count_udf)
        total = count_data.agg(F.sum("count")).collect()[0][0]
        # The sum is None when the aggregation saw no rows. Without this guard
        # `None < 600` raised TypeError and the user was logged as an error.
        if total is None or total < 600:
            write_log("No data for "+user_name)
            continue

        # get_data_udf reads 'Available' to decide how many windows each
        # group contributes.
        common_ds = common_ds.withColumn('Available', F.lit(total))

        # Second pass: extract the selected windows and write them out.
        data = common_ds._data.groupBy(groupbycols).apply(get_data_udf)
        pdf = data.toPandas()
        if len(pdf.index) > 0:
            pdf.to_csv('data/'+user_name+".csv.gz", compression='gzip', index=False)
            write_log('End for '+user_name)
        else:
            write_log("No data for "+user_name)
    except Exception as e:
        # Best effort: record the failure and move on to the next user.
        write_log('Error for '+user_name)
        write_log(str(e))