In [193]:
#traces
import pandas as pd
import numpy as np
import re
import sys
import nbimporter
#for ordered dict for traces implementation
from collections import OrderedDict

import traces
from datetime import datetime
from datetime import timedelta
#import sources.endomondolib as endo
#import sources.pysparkconvenience as ps
from numpy import array
from math import sqrt

#for timedelta manipulation
from math import fabs

from pyspark.sql.functions import *
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import DataFrameReader
from pyspark.sql import SQLContext
from IPython.display import display, HTML
from pyspark.sql.functions import col
from pyspark.mllib.linalg import Vectors

from pyspark.ml.feature import VectorAssembler

# Disable warnings and set Matplotlib inline plotting (pandas itself is imported above).
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline
# NOTE(review): `display.mpl_style` does not exist in newer pandas releases — confirm the pandas version in use.
pd.options.display.mpl_style = 'default'

#%load_ext autotime
from pyspark.sql import SQLContext
# `sc` is never created in this notebook; presumably it is the SparkContext injected by the pyspark kernel.
sqlContext = SQLContext(sc)

# `from pyspark.sql.functions import *` above binds Spark column functions over the
# builtins `min` and `max`. Deleting those globals makes the names resolve to the
# builtins again (TimeSeriesObject.value_at calls min() on plain Python lists).
# NOTE(review): these `del`s raise NameError if run again without the star import
# re-executing first; other builtins (e.g. abs, sum) may also be shadowed — confirm.
del min
del max

# Input CSVs — absolute local paths, adjust for your machine.
data_run='/Users/momori/dse/maomori/thesis/git/fitness_capstone/data/run_data_with_hr_spd.csv'
data_users='/Users/momori/dse/maomori/thesis/git/fitness_capstone/data/run_with_hr_users.csv'

In [185]:
#create pyspark dataframe from csv
def df_from_csv(csv_file):
    """Build a Spark DataFrame from a comma-separated text file.

    The first line supplies the column names exactly as written, including any
    surrounding double quotes (e.g. '"time"'). Every line that splits to the same
    fields as that header is dropped. Fields are split on plain commas, so quoted
    values containing commas are not handled. Values are not type-converted here.

    Args:
        csv_file: path/URI readable by sc.textFile.

    Returns:
        The DataFrame produced by sqlContext.createDataFrame.
    """
    rows = sc.textFile(csv_file).map(lambda line: line.split(","))
    # first() yields the header row itself; take(1) would wrap it in a list,
    # which never compares equal to a data row.
    header = rows.first()
    return sqlContext.createDataFrame(rows.filter(lambda fields: fields != header), header)

def change_column_names(df, old_names, new_names):
    """Return a DataFrame whose columns are renamed pairwise from old_names to new_names.

    Args:
        df: the DataFrame to rename. It is not reassigned; each withColumnRenamed
            call returns a new DataFrame.
        old_names: current column names, e.g. ['"time"', '"id"'].
        new_names: replacement names, same length and order as old_names.

    Returns:
        The renamed DataFrame. Names in old_names that are not present are left
        alone (withColumnRenamed is a no-op for a missing column).

    Raises:
        ValueError: if old_names and new_names differ in length.
    """
    old_names = list(old_names)
    new_names = list(new_names)
    if len(old_names) != len(new_names):
        raise ValueError('old_names and new_names must have the same length '
                         '(%d != %d)' % (len(old_names), len(new_names)))
    renamed = df
    for old_name, new_name in zip(old_names, new_names):
        renamed = renamed.withColumnRenamed(old_name, new_name)
    return renamed

##input
##   string: 2013-02-22 21:38:45 (double quotes and surrounding whitespace are ignored)
##output:
##   list of ints: [2013, 2, 22, 21, 38, 45]
def datetime_to_trace_time(timestamp):
    """Split a 'YYYY-MM-DD HH:MM:SS' timestamp string into its integer fields.

    For a full timestamp the result can be unpacked into ``datetime(*fields)``.

    Args:
        timestamp: timestamp text, e.g. '"2013-02-22 21:38:45"'. Double quotes
            anywhere and leading/trailing whitespace are removed first.

    Returns:
        list of int: [year, month, day, hour, minute, second].

    Raises:
        ValueError: if a field is not an integer (e.g. fractional seconds).
    """
    # The CSV values are wrapped in literal double quotes; drop them and any
    # surrounding whitespace so the split yields no empty leading/trailing field.
    cleaned = timestamp.replace('"', '').strip()
    # '+' tolerates runs of separators such as a double space between date and time.
    return [int(field) for field in re.split('[- :]+', cleaned)]

In [10]:

df_runs = df_from_csv(data_run)
df_runs.show(2)


+--------------------+----------+------------+-------------+-------------+------------+-----------+--------+
|              "time"|"altitude"|"heart_rate"|   "latitude"|  "longitude"|     "speed"|"workoutid"|    "id"|
+--------------------+----------+------------+-------------+-------------+------------+-----------+--------+
|"2013-02-22 21:38...|          |   147.00000|51.4394768234|-0.8953504544|6.7140000000|  167479013|13856798|
|"2013-02-22 21:38...|          |   148.00000|51.4392022323|-0.8952703234|9.3492000000|  167479013|13856799|
+--------------------+----------+------------+-------------+-------------+------------+-----------+--------+
only showing top 2 rows



In [261]:
#class for creating timeseries objects.
#Allows for computation of unknown data points based on the data around the time asked.
#Allows for recreation of datapoints based on the interval

class TimeSeriesObject():
    """A small series of (datetime, value) samples with interpolation and resampling.

    Attributes:
        data_points: list of (datetime, value) tuples. Sorted by time unless the
            caller passed is_sorted=True for data that is not actually sorted.
        interpolation_type: only 'linear' is implemented.
        time_format: strptime format used to parse the input time strings.
    """
    # Class-level defaults; __init__ always assigns per-instance values, so the
    # mutable list here is never shared between instances.
    data_points = []
    interpolation_type = ''
    time_format = ''

    def __init__(self, d_points=None, i_type='',
                 time_format='', is_sorted=False):
        """Parse and (optionally) sort the input samples.

        Args:
            d_points: iterable of ("time string", value) pairs, e.g.
                [('"2013-02-22 21:38:45"', 5), ('"2013-02-22 21:38:40"', 0)].
                None (the default) means no samples.
            i_type: interpolation type; empty means 'linear'.
            time_format: strptime format of the time strings; empty means
                '"%Y-%m-%d %H:%M:%S"' (the double quotes are part of the format).
            is_sorted: pass True only if d_points is already in time order.
        """
        self.interpolation_type = i_type if i_type else 'linear'
        self.time_format = time_format if time_format else '"%Y-%m-%d %H:%M:%S"'

        if d_points is None:
            d_points = []
        # Parse before sorting: sorting the raw strings is only chronological for
        # zero-padded, most-significant-first formats such as the default one.
        parsed = [(datetime.strptime(point[0], self.time_format), point[1])
                  for point in d_points]
        if not is_sorted:
            parsed.sort(key=lambda point: point[0])
        self.data_points = parsed

    def describe(self):
        """Print the interpolation type, the data points and the time format."""
        print('interpolation type: ' + self.interpolation_type)
        print('data points:  ' + str(self.data_points))
        print('time format: ' + self.time_format)

    def value_at(self, time):
        """Return the series value at `time`.

        Args:
            time: a datetime (not a string); data_points hold datetimes after __init__.

        Returns:
            The stored value when `time` matches a sample exactly. Otherwise, the
            linearly interpolated value between the nearest samples on either side.
            If interpolation is needed and interpolation_type is not 'linear',
            returns the string 'bad interpolation type' (kept for compatibility).

        Raises:
            ValueError: if the series is empty or `time` lies outside the range
                spanned by the samples (no extrapolation is done).
        """
        if not self.data_points:
            raise ValueError('time series has no data points')

        # Samples at-or-before / at-or-after `time`. List order is preserved, so
        # ties resolve to the first matching sample.
        before = [point for point in self.data_points if point[0] <= time]
        after = [point for point in self.data_points if point[0] >= time]
        if not before or not after:
            # Without a sample on both sides there is nothing to interpolate between.
            raise ValueError('time %s is outside the range of the time series' % (time,))
        lower = min(before, key=lambda point: time - point[0])
        upper = min(after, key=lambda point: point[0] - time)

        if lower[0] == time:
            return lower[1]
        if upper[0] == time:
            return upper[1]

        if self.interpolation_type != 'linear':
            return 'bad interpolation type'

        # lower < time < upper here, so span is strictly positive.
        span = (upper[0] - lower[0]).total_seconds()
        offset = (time - lower[0]).total_seconds()
        return lower[1] + (upper[1] - lower[1]) * offset / span

    def slice(self, intervals, replace=True):
        """Resample the series every `intervals` seconds, starting at the first sample.

        Samples are taken at first, first + intervals, ... up to and including the
        last sample's time. If the span is not a multiple of `intervals`, the tail
        after the last full step is dropped.

        Args:
            intervals: step in seconds; must be positive.
            replace: if True, the resampled points replace self.data_points;
                if False, self.data_points is left untouched.

        Returns:
            list of (datetime, value) tuples (empty for an empty series).

        Raises:
            ValueError: if intervals is not a positive duration (the sampling loop
                would otherwise never terminate).
        """
        step = timedelta(seconds=intervals)
        if step <= timedelta(0):
            raise ValueError('intervals must be a positive number of seconds')
        if not self.data_points:
            return []

        start = self.data_points[0][0]
        end = self.data_points[-1][0]

        samples = []
        current = start
        while current <= end:
            samples.append((current, self.value_at(current)))
            current += step

        if replace:
            self.data_points = samples
        return samples

    #TBD:
    #https://pypi.python.org/pypi/python-fastpip/1.1
    #
    def smooth(self):
        pass

In [262]:
# Demo: build a TimeSeriesObject from three unsorted ("time string", value) pairs
# and resample it every `new_interval` seconds.
in_pattern = '"%Y-%m-%d %H:%M:%S"'
# new_time is only used by the commented-out value_at call below.
new_time = datetime.strptime('"2013-02-22 21:38:42"', in_pattern)
new_interval = 2
t1 = '"2013-02-22 21:38:45"'
t2 = 5
t3 = '"2013-02-22 21:38:50"'
t4 = 50
t5 = '"2013-02-22 21:38:40"'
t6 = 0
tup1 = (t1,t2)
tup2 = (t3,t4)
tup3 = (t5, t6)
# Deliberately out of time order, to exercise the sorting in __init__.
ll = [tup1, tup2, tup3]
print ll
obj = TimeSeriesObject(d_points=ll, is_sorted=False)
obj.describe()
print '\n'
#obj.value_at(new_time)
# Last expression: the resampled (datetime, value) list is shown as the cell output.
obj.slice(new_interval, replace = False)

[('"2013-02-22 21:38:45"', 5), ('"2013-02-22 21:38:50"', 50), ('"2013-02-22 21:38:40"', 0)]
interpolation type: linear
data points:  [(datetime.datetime(2013, 2, 22, 21, 38, 40), 0), (datetime.datetime(2013, 2, 22, 21, 38, 45), 5), (datetime.datetime(2013, 2, 22, 21, 38, 50), 50)]
time fomrat: "%Y-%m-%d %H:%M:%S"




[(datetime.datetime(2013, 2, 22, 21, 38, 40), 0),
 (datetime.datetime(2013, 2, 22, 21, 38, 42), 2.0),
 (datetime.datetime(2013, 2, 22, 21, 38, 44), 4.0),
 (datetime.datetime(2013, 2, 22, 21, 38, 46), 14.0),
 (datetime.datetime(2013, 2, 22, 21, 38, 48), 32.0),
 (datetime.datetime(2013, 2, 22, 21, 38, 50), 50)]

In [160]:
# Prototype of linear interpolation between the nearest samples around `new_time`.
in_pattern = '%Y-%m-%d %H:%M:%S'

t1 = datetime.strptime('2013-02-22 21:38:45', in_pattern)
t2 = 5
t3 = datetime.strptime('2013-02-22 21:38:50', in_pattern)
t4 = 10
t5 = datetime.strptime('2013-02-22 21:38:40', in_pattern)
t6 = 4

tup1 = (t1, t2)
tup2 = (t3, t4)
tup3 = (t5, t6)
ll = [tup1, tup2, tup3]
new_time = datetime.strptime('2013-02-22 21:38:48', in_pattern)

# Nearest sample at-or-before / at-or-after new_time. The comparisons are inclusive
# so a query landing exactly on a sample is bracketed by that sample itself
# (strict comparisons would skip it and interpolate between its neighbours).
mintime = min(ll, key=lambda x: fabs((x[0] - new_time).total_seconds()) if new_time >= x[0] else float('inf'))
maxtime = min(ll, key=lambda x: fabs((x[0] - new_time).total_seconds()) if new_time <= x[0] else float('inf'))

td1 = fabs((mintime[0] - new_time).total_seconds())
td2 = fabs((maxtime[0] - new_time).total_seconds())
print('%s %s' % (td1, td2))
#rise
rise = maxtime[1] - mintime[1]
print(rise)
# td1 + td2 == 0 only when new_time is itself a sample; avoid dividing by zero.
interpolated = mintime[1] if td1 + td2 == 0 else mintime[1] + rise * td1 / (td1 + td2)
print(interpolated)

3.0 2.0
5
8.0


In [8]:
# Default-constructed series: no data points, 'linear' interpolation, default time format.
# NOTE(review): the saved output below shows only "linear" although describe()
# prints three lines — it looks stale; re-run the cell.
tso = TimeSeriesObject()
tso.describe()

linear


In [4]:
##example for workoutid 167479013
wid = 167479013
print type(df_runs)
sample_df = df_runs.where(df_runs['"workoutid"'] == 167479013)
time_series = traces.TimeSeries()
sample_df.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+--------------------+----------+------------+-------------+-------------+-------------+-----------+--------+
|              "time"|"altitude"|"heart_rate"|   "latitude"|  "longitude"|      "speed"|"workoutid"|    "id"|
+--------------------+----------+------------+-------------+-------------+-------------+-----------+--------+
|"2013-02-22 21:38...|          |   147.00000|51.4394768234|-0.8953504544| 6.7140000000|  167479013|13856798|
|"2013-02-22 21:38...|          |   148.00000|51.4392022323|-0.8952703234| 9.3492000000|  167479013|13856799|
|"2013-02-22 21:39...|          |   148.00000|51.4390821196|-0.8953502029|10.6308000000|  167479013|13856800|
|"2013-02-22 21:39...|          |   149.00000|51.4388703089|-0.8954045177|13.3740000000|  167479013|13856801|
|"2013-02-22 21:39...|          |   151.00000|51.4386848174|-0.8952745982|12.6468000000|  167479013|13856802|
|"2013-02-22 21:39...|          |   152.00000|51.4385818038|-0.8951149229|11.6

In [138]:
print tt
print datetime_to_trace_time(tt)

"2013-02-22 21:38:45"
<type 'unicode'>
[2013, 2, 22, 21, 38, 45]


In [140]:
tt = sample_df.select('"time"').first()[0]
print tt

#from_pattern = 'MMM d, yyyy h:mm:ss aa'
from_pattern = '"%Y-%m-%d %H:%M:%S"'
in_tt = datetime_to_trace_time(tt)
print 'masa'
print in_tt
time_series = traces.TimeSeries()
time_series[datetime(in_tt[0], in_tt[1], in_tt[2],\
                     in_tt[3], in_tt[4],\
                    in_tt[5])] = 4

"2013-02-22 21:38:45"
<type 'unicode'>
masa
[2013, 2, 22, 21, 38, 45]


In [147]:
# First four "time" rows of the sample workout (a list of Row objects).
# NOTE(review): this reuses the name `ll`, which earlier cells bound to (time, value) tuples.
ll = sample_df.select('"time"').head(4)

In [153]:
time_series = traces.TimeSeries()

for row in ll:
    print row
    in_tt = datetime_to_trace_time(row[0])
    time_series[datetime(in_tt[0], in_tt[1], in_tt[2],\
                     in_tt[3], in_tt[4],\
                    in_tt[5])] = 4


Row("time"=u'"2013-02-22 21:38:45"')
Row("time"=u'"2013-02-22 21:38:57"')
Row("time"=u'"2013-02-22 21:39:01"')
Row("time"=u'"2013-02-22 21:39:07"')


In [None]:
# `distribution` is a method of traces.TimeSeries; without the call the cell would
# only display the bound method. NOTE(review): default start/end/normalized
# arguments are used — confirm they are what this analysis needs.
time_series.distribution()