# Sony Lifelog: Data science of sleep

Author: Jesper Bitsch, s113730@student.dtu.dk

This is a notebook for converting and cleaning a part of the Sony Lifelog dataset. The notebook should be read in conjuction with the thesis.

Links:
<ul>
<li><a href="http://spark.apache.org/docs/2.0.0/api/python/_modules/pyspark/sql/functions.html">PySpark SQL functions</a></li>
</ul>

# Imports 

In [None]:
import time
import datetime
from dateutil.parser import parse
import operator
import matplotlib as matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import sys
import pickle
import math
from collections import Counter
matplotlib.style.use("ggplot")
%matplotlib inline

In [2]:
#yarn application --list
#yarn application -kill

In [3]:
#from pyspark import SparkConf, SparkContext, StorageLevel
#from pyspark.sql import SQLContext, Row
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, DateType, BooleanType, IntegerType, TimestampType, DoubleType, FloatType, StringType, StructField, StructType

# Variables

In [4]:
pre_path = ""
bucket_path = ""
timebin_interval = 5 #Change to what ever timeinterval we want (In minutes)

In [5]:
#start and end dates for the selected months
start_end_date_times_for_periods = [
    (datetime.datetime(2015, 10, 1,0,0,0),datetime.datetime(2015, 10, 31, 23, 59, 0)),
    (datetime.datetime(2015, 11, 1,0,0,0),datetime.datetime(2015, 11, 30, 23, 59, 0)),
    (datetime.datetime(2015, 12, 1,0,0,0),datetime.datetime(2015, 12, 31, 23, 59, 0)),
    (datetime.datetime(2016, 1, 1,0,0,0),datetime.datetime(2016, 1, 31, 23, 59, 0)),
    (datetime.datetime(2016, 2, 1,0,0,0),datetime.datetime(2016, 2, 29, 23, 59, 0)),
    (datetime.datetime(2016, 3, 1,0,0,0),datetime.datetime(2016, 3, 31, 23, 59, 0)),
    (datetime.datetime(2016, 4, 1,0,0,0),datetime.datetime(2016, 4, 30, 23, 59, 0)),
    (datetime.datetime(2016, 5, 1,0,0,0),datetime.datetime(2016, 5, 31, 23, 59, 0)),
    (datetime.datetime(2016, 6, 1,0,0,0),datetime.datetime(2016, 6, 30, 23, 59, 0)),
    (datetime.datetime(2016, 7, 1,0,0,0),datetime.datetime(2016, 7, 31, 23, 59, 0)),
    (datetime.datetime(2016, 8, 1,0,0,0),datetime.datetime(2016, 8, 31, 23, 59, 0)),
    (datetime.datetime(2016, 9, 1,0,0,0),datetime.datetime(2016, 9, 30, 23, 59, 0)),
    (datetime.datetime(2016, 10, 1,0,0,0),datetime.datetime(2016, 10, 31, 23, 59, 0))
]

#Folders which is used for the study
folders_to_load = [
    "yearmonth=201510", "yearmonth=201511", "yearmonth=201512", "yearmonth=201601", "yearmonth=201602", 
    "yearmonth=201603", "yearmonth=201604", "yearmonth=201605", "yearmonth=201606", "yearmonth=201607",
    "yearmonth=201608", "yearmonth=201609", "yearmonth=201610"
]

#start and end datetimes for the selected months. 
start_end_times_for_periods = [
    (datetime.date(2015, 10, 1),datetime.date(2015, 10, 31)),
    (datetime.date(2015, 11, 1),datetime.date(2015, 11, 30)),
    (datetime.date(2015, 12, 1),datetime.date(2015, 12, 31)),
    (datetime.date(2016, 1, 1),datetime.date(2016, 1, 31)),
    (datetime.date(2016, 2, 1),datetime.date(2016, 2, 29)),
    (datetime.date(2016, 3, 1),datetime.date(2016, 3, 31)),
    (datetime.date(2016, 4, 1),datetime.date(2016, 4, 30)),
    (datetime.date(2016, 5, 1),datetime.date(2016, 5, 31)),
    (datetime.date(2016, 6, 1),datetime.date(2016, 6, 30)),
    (datetime.date(2016, 7, 1),datetime.date(2016, 7, 31)),
    (datetime.date(2016, 8, 1),datetime.date(2016, 8, 31)),
    (datetime.date(2016, 9, 1),datetime.date(2016, 9, 30)),
    (datetime.date(2016, 10, 1),datetime.date(2016, 10, 31))
]

# Format the sleep dataset

Check the total count and what influence it has when removing all SWR10 and users with less than 10 registered sleep entries

In [None]:
arr = []
for folder in folders_to_load:
    arr.append(pre_path+"sleepactivity/"+folder+"/*.avro")
    
df = (sqlContext.read.format("com.databricks.spark.avro").load(arr))

un_all = df.select("useruuid").distinct().collect()
un_all_limit = df.select("useruuid").groupBy("useruuid").count().where("count > 9").collect()  

df_no_swr10 = df.withColumn('device_name', func_get_device_type_udf(F.col('devices'))).where("device_name != 'swr10'")\
        .select("useruuid")
    
un_no_swr10 = df_no_swr10.distinct().collect()
un_no_swr10_limit = select("useruuid").groupBy("useruuid").count().where("count > 9").collect()


print len(un_all)
print len(un_all_limit)
print len(un_no_swr10)
print len(un_no_swr10_limit)

Now we can format and remove the entries we dont want

In [None]:
for i, folder in enumerate(folders_to_load):
    print "########### Starting with ", folder, "###########"
    
    #Defines the period, so we can remove items which has been added to late. 
    times_for_period = start_end_times_for_periods[i]
    
    #Load the dataset for the given period. 
    df_sleep_raw = (sqlContext.read.format("com.databricks.spark.avro")\
      .load(pre_path+"/sleepactivity/"+folder+"/*.avro")).where("deleted_time == ''")\
      .withColumn('device_name', func_get_device_type_udf(F.col('devices'))).where("device_name != 'swr10'")\
      .select("*", F.col("start_time").cast("date").alias("start_date"), F.col("end_time").cast("timestamp").alias("lookup_time"))\
      .where(F.col("start_date") > times_for_period[0])\
      .where(F.col("start_date") < times_for_period[1])
    
    print "Data loaded"
    
    #Make a dataframe with the newest enty for each of the id's 
    newest_ids = df_sleep_raw.select(F.col("id").alias("dummy_id"),"lookup_time")\
        .groupBy(F.col("dummy_id"))\
        .agg(F.max(F.col("lookup_time")))
        
    print "Newest entries found"
    
    #join the two dataframes
    df_sleep_no_doublets = df_sleep_raw.join(newest_ids, 
                                        (df_sleep_raw["lookup_time"] == newest_ids["max(lookup_time)"]) & 
                                        (df_sleep_raw["id"] == newest_ids["dummy_id"]))
    
    print "Removed duplicates"
    
    #Only take the relevant columns. 
    df_sleep_clean = df_sleep_no_doublets.select(F.col("id"),
                                                 F.col("useruuid"),
                                                 F.col("start_time"),
                                                 F.col("end_time"),
                                                 F.col("device_name"),
                                                 F.col("sleep_state"))
    
    #Merge sleep entries togheter (20 minutes). 
    rdd_sleep = df_sleep_clean.rdd\
        .map(lambda item: (item["useruuid"],item))\
        .groupByKey()\
        .mapValues(map_sleep_items)\
        .flatMap(lambda x : x[1])
        
    print "Merging sleep entries"

    #converts to dataframe
    df_sleep_sleep_merged = rdd_sleep.toDF()
    
    print "Merged sleep entries"
    
    #we want at least 10 entries from the user
    df_sleep_sleep_good_ids = df_sleep_sleep_merged\
        .groupBy(F.col("useruuid").alias("dummy_id")).count()\
        .where(F.col("count") >= 10)
    
    #Join the dataframes
    df_sleep_final = df_sleep_sleep_merged.join(df_sleep_sleep_good_ids, 
                (df_sleep_sleep_merged["useruuid"] == df_sleep_sleep_good_ids["dummy_id"]))\
        .drop("dummy_id").drop("count")
        
    print "Removed userers with less than then 10 entries"
        
    df_sleep_final.write.format('com.databricks.spark.avro').save(bucket_path+"sleep_"+folder)
    
    print "Dataset saved to s3"

    print "Done with folder ", folder
    

### Chronotype, social jet lag etc..

In [29]:
schema = StructType([StructField('useruuid', StringType(), True),
                     StructField('avg_sleep_state', FloatType(), True),
                     StructField('avg_start_work', FloatType(), True),
                     StructField('avg_start_free', FloatType(), True),
                     StructField('avg_end_work', FloatType(), True),
                     StructField('avg_end_free', FloatType(), True),
                     StructField('msw', FloatType(), True),
                     StructField('msf', FloatType(), True),
                     StructField('msf_c', FloatType(), True),
                     StructField('msf_c_month', FloatType(), True),
                     StructField('sj_month', FloatType(), True),
                     StructField('sj', FloatType(), True),
                     StructField('std_start_work', FloatType(), True),
                     StructField('std_end_work', FloatType(), True),
                     StructField('std_start_free', FloatType(), True),
                     StructField('std_end_free', FloatType(), True),
                     StructField('std_start', FloatType(), True),
                     StructField('std_end', FloatType(), True)
])

In [None]:
for folder in folders_to_load:
    df_period = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"sleep_"+folder+"/*.avro")) 
    
    print "Loaded for", folder
    df_sleep_user_info = df_period.rdd\
        .map(lambda item: (item["useruuid"],item))\
        .groupByKey()\
        .mapValues(map_users_sleep_items_to_info)\
        .map(lambda (key,value) : value).toDF(schema)
    
    print "Mapped info for", folder
    df_sleep_user_info.write.format('com.databricks.spark.avro').save(bucket_path+"sleep_stats_"+folder)
    print "Info set saved to s3", folder

In [76]:
load_all_files_arr = []
for folder in folders_to_load: 
    load_all_files_arr.append(bucket_path+"sleep_"+folder+"/*.avro")
    
df_total = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(load_all_files_arr)) 
            
df_sleep_user_info = df_total.rdd\
        .map(lambda item: (item["useruuid"],item))\
        .groupByKey()\
        .mapValues(map_users_sleep_items_to_info)\
        .map(lambda (key,value) : value).toDF(schema)

df_sleep_user_info.write.format('com.databricks.spark.avro').save(bucket_path+"sleep_stats_total")

In [12]:
def map_sleep_items(items):
    import math, decimal, datetime
    import numpy as np
    from dateutil.parser import parse

    def data_func_from_string(d):
        date = parse(d)
        unaware_date = date.replace(tzinfo=None)
    
        return unaware_date
    
    items = list(items)
    items.sort(key=lambda item:data_func_from_string(item['start_time']), reverse=False)
    
    sleep_items = []
    #Loop through sub 
    sleepItemsToUpdate = {}
    sleepItemsToDelete = []
    delta_merge = datetime.timedelta(minutes=25) #The interval which is allowed between two sleep entries. 

    #Find sleeps to merge
    for i in range(len(items)):
        item = items[i]
        for j in range(i+1,len(items)):
            sub_item = items[j].asDict()
            if item["id"] != sub_item["id"] and \
                data_func_from_string(item["end_time"]) < data_func_from_string(sub_item["start_time"]) and \
                data_func_from_string(item["end_time"]) > data_func_from_string(sub_item["start_time"]) - delta_merge:
                    sleepItemsToDelete.append(sub_item["id"])
                    sleepItemsToUpdate[item["id"]]  = {
                            "id2" : sub_item["id"],
                            "new_endtime_dt" : data_func_from_string(sub_item["end_time"]),
                            "new_endtime_str" : sub_item["end_time"],
                            "new_states" : sub_item["sleep_state"],
                            "start_time_two" : data_func_from_string(sub_item["start_time"])
                            
                        } 
            else:
                #No sleeps close enough to merge. We know that because they are sorted. 
                break

    #Loop through all items to generates new sleep entries. 
    for i in range(len(items)):
        num_awake = 0 #Times awake during a night
        awake_time = float(0) #Total time in seconds a user is awake during a night. 
        item = items[i].asDict()
        item["end_time_dt"] = data_func_from_string(item["end_time"])
        item["start_time_dt"] = data_func_from_string(item["start_time"])
            
        #The item is a subitem to another, and we will therefore discard it. 
        if item["id"] in sleepItemsToDelete:
            continue
            
        newEndDate = item["end_time_dt"]
        newEndDateStr = item["end_time"]
        lookup_id = item["id"]
        #Find entries to which should be updated. 
        while True:
            if lookup_id in sleepItemsToUpdate:
                num_awake += 1
                lookupItem = sleepItemsToUpdate[lookup_id]
                start_time_two = lookupItem["start_time_two"]
                awake_time += (start_time_two-newEndDate).total_seconds()
                newEndDate = lookupItem["new_endtime_dt"]
                newEndDateStr = lookupItem["new_endtime_str"]
                newStates = lookupItem["new_states"]
                item["sleep_state"] += newStates
            
                lookup_id = lookupItem["id2"]
            else:
                break
        
        date = item["start_time_dt"]
        day = -1
        if date.hour < 4:
            day = date.weekday()
            if day == 0:
                day = 6
            else:
                 day -= 1
        else:
             day = date.weekday()
    
        item["weekday"] = day
    
        def calculate_moon_phase(date):
            """Method which returns and int which defienes the moon phase for the given tiem."""
            dec = decimal.Decimal
            now = date.replace(tzinfo=None)

            diff = now - datetime.datetime(2001, 1, 1)
            days = dec(diff.days) + (dec(diff.seconds) / dec(86400))
            lunations = dec("0.20439731") + (days * dec("0.03386319269"))
    
            pos = lunations % dec(1)
    
            index = (pos * dec(8)) + dec("0.5")
            index = math.floor(index)
            
            return int(index) & 7

        def parse_utc(date):
            """Generates unaware UTC datetime from string"""
            parse_date = parse(date)
            date_utc = parse_date - parse_date.utcoffset()
            unaware_date = date_utc.replace(tzinfo=None)
            return unaware_date
        
        def genrate_key(dt,roundTo):
            seconds = (dt - dt.min).seconds
            rounding = (seconds+roundTo/2) // roundTo * roundTo
            new_dt = dt + datetime.timedelta(0,rounding-seconds,-dt.microsecond)
            
            return str(new_dt.hour) + "-" + str(new_dt.minute)
        
        sleep_states = item["sleep_state"]
        deep_sleep_count = sleep_states.count(2)
        
        adjust_sleep_duration = float(0)
        is_awake = False
        is_start = True
        adjust_start_seconds = 0 
        for state in sleep_states:
            if state == 0:
                adjust_sleep_duration += float(60)
                if is_awake == False:
                    num_awake += 1
                is_awake = True
                if is_start == True:
                    adjust_start_seconds += 60
            else:
                is_start = False
                is_awake = False
        
        item["start_time_dt"] = item["start_time_dt"] + datetime.timedelta(seconds=adjust_start_seconds)
        item["week_number_key_user"] = item["useruuid"]+"_"+str(item["start_time_dt"].isocalendar()[1])
        item["end_time_dt"] = newEndDate
        item["end_time"] = newEndDateStr
        item["adjusted_sleep_duration"] = abs((newEndDate-item["start_time_dt"]).total_seconds()) - abs(awake_time) - adjust_sleep_duration
        item["awake_time"] = awake_time
        delta = (item["end_time_dt"]-item["start_time_dt"]).total_seconds()
        item["sleep_duration"] = delta
        item["time_awake"] = num_awake
        
        item["moon_phase"] = calculate_moon_phase(date)
        item["average_sleep"] = float(np.mean(sleep_states))
        item["start_time_dt_utc"] = parse_utc(item["start_time"]) + datetime.timedelta(seconds=adjust_start_seconds)
        item["end_time_dt_utc"] = parse_utc(item["end_time"]) 
        item["start_key"] = genrate_key(item["start_time_dt"],60*15)
        item["end_key"] = genrate_key(item["end_time_dt"],60*15)
        
        if deep_sleep_count > 0:
            item["p_deep_sleep"] = deep_sleep_count/float(len(sleep_states))
            item["time_before_deep_sleep"] = (sleep_states.index(2)+1)*60 #convert to seconds
        else:
            item["p_deep_sleep"] = float(0)
            item["time_before_deep_sleep"] = float(-1.0)
            
        if item["sleep_duration"] < 50400 and item["sleep_duration"] > 10800:
            sleep_items.append(item)
            
    return sleep_items

In [30]:
def map_users_sleep_items_to_info(items):
    
    import math, decimal, datetime
    import numpy as np
    from dateutil.parser import parse
    
    def adjusted_msf(msf_str,duration_free,duration_work):
        """Functiosn which returns the adjusted MSF: Alogorithm is found in the litereature. See report."""
        adjusted_seconds = 0.5 * ( duration_free - (5*duration_work + 2*duration_free) /float(7))
        delta = datetime.timedelta(seconds=adjusted_seconds)
        msf = datetime.datetime(2015, 9, 1, msf_str[0], msf_str[1],msf_str[2])
        msf -= delta
        
        return (msf.hour,msf.minute, msf.second)

    
    def get_midpoint(date_str,duration):
        """Get midpoint of datetime. Is done by adding half of the duration to the beginning"""
        seconds_to_add = duration / 2
        delta = datetime.timedelta(seconds=seconds_to_add)
    
        date = datetime.datetime(2015, 9, 1, date_str[0], date_str[1],date_str[2])
        date += delta
    
        return (date.hour,date.minute, date.second)

    def data_func_from_string(d):
        """Converts a date string to a unaware local datetime. Used to detect when people are going to sleep"""
        from dateutil.parser import parse
        date = parse(d)
        unaware_date = date.replace(tzinfo=None)
    
        return unaware_date
    
    def get_average_timestamp(times,adjust):
        """
        Function which returns the average time for a list of datetimes. If adjust == True, all dates with hour < 12 
        will be added additionally 24 hours. 
        """
        times = [data_func_from_string(x) for x in times]
        if adjust:
            time_list = map(lambda s : s.second + 60*(s.minute + 60*(s.hour+24)) if s.hour < 12
                else s.second + 60*(s.minute + 60*s.hour),
                times)
        else:
            time_list = map(lambda s : s.second + 60*(s.minute + 60*s.hour),
                times)
            
        average = sum(time_list)/len(time_list)
        bigmins, secs = divmod(average, 60)
        hours, mins = divmod(bigmins, 60)
    
        if hours >= 24:
            hours = hours-24

        return (hours, mins, secs)
    
    def get_std_timestamp(times,adjust):
        times = [data_func_from_string(x) for x in times]
        
        if adjust:
            time_list = map(lambda s : s.second + 60*(s.minute + 60*(s.hour+24)) if s.hour < 12
                else s.second + 60*(s.minute + 60*s.hour),
                times)
        else:
            time_list = map(lambda s : s.second + 60*(s.minute + 60*s.hour),
                times)

        return float(np.std(time_list))
    
    def from_min_hour_to_num(time):
        minutes_per = time[1]/float(60)
        
        return time[0] + minutes_per
    
    def calculate_social_jetlag(msw,msf_c):
        """
        Function which returns the social jetlag for a given user. 
        The social jet lag is defiend as the abs difference between MSW and MSF_C
        """
        
        day_w = 1
        day_f = 1
        if msw[0] < 12:
            day_w = 2
        if msf_c[0] < 12:
            day_f = 2
                
        dtw = datetime.datetime(2015,10,day_w,msw[0],msw[1],msw[2])
        dtf = datetime.datetime(2015,10,day_f,msf_c[0],msf_c[1],msf_c[2])
            
        #diff = abs((dtf-dtw).total_seconds())
        diff = (dtf-dtw).total_seconds()
            
        return diff
    
    #Defines the two arrays: Free and work. The weekends is defined as freedays. 
    arr_free = [x for x in items if x["weekday"] == 4 or x["weekday"] == 5]
    arr_work = [x for x in items if x["weekday"] != 4 and x["weekday"] != 5 ]
    average_sleep_state = float(np.mean([x["average_sleep"] for x in items]))
    
    if len(arr_free) > 4 and len(arr_work) > 8:
        month_dict = {}
        for item in items:
            month = data_func_from_string(item["start_time"]).month
            if month not in month_dict:
                month_dict[month] = []
            
            month_dict[month].append(item)
            
        month_chronotype = []
        month_social_jetlag = []
        
        for month in month_dict:
            month_items = month_dict[month]

            
            arr_free_month = [x for x in month_items if x["weekday"] == 4 or x["weekday"] == 5]
            arr_work_month = [x for x in month_items if x["weekday"] != 4 and x["weekday"] != 5 ]
            
            if len(arr_free_month) == 0 or len(arr_work_month) == 0:
                continue
                
            avg_start_work_month = get_average_timestamp([x["start_time"] for x in arr_work_month],True)
            avg_end_work_month = get_average_timestamp([x["end_time"] for x in arr_work_month],False)
        
            avg_start_free_month = get_average_timestamp([x["start_time"] for x in arr_free_month],True)
            avg_end_free_month = get_average_timestamp([x["end_time"] for x in  arr_free_month],False)
        
            msw_month  = get_midpoint(avg_start_work_month,np.mean([x["sleep_duration"] for x in arr_work_month]))
            msf_month = get_midpoint(avg_start_free_month,np.mean([x["sleep_duration"] for x in arr_free_month]))

            msf_c_month = adjusted_msf(msf_month, np.mean([x["sleep_duration"] for x in arr_free_month]),
                                      np.mean([x["sleep_duration"] for x in arr_work_month]))
            
            month_chronotype.append(from_min_hour_to_num(msf_c_month))
            month_social_jetlag.append(calculate_social_jetlag(msw_month,msf_month))
        
        avg_start_work = get_average_timestamp([x["start_time"] for x in arr_work],True)
        avg_end_work = get_average_timestamp([x["end_time"] for x in arr_work],False)
        
        avg_start_free = get_average_timestamp([x["start_time"] for x in arr_free],True)
        avg_end_free = get_average_timestamp([x["end_time"] for x in  arr_free],False)
        
        msw  = get_midpoint(avg_start_work,np.mean([x["sleep_duration"] for x in arr_work]))
        msf = get_midpoint(avg_start_free,np.mean([x["sleep_duration"] for x in arr_free]))
        
        msf_c = adjusted_msf(msf, np.mean([x["sleep_duration"] for x in arr_free]),
                                  np.mean([x["sleep_duration"] for x in arr_work]))

        sj_month = None
        c_month = None
        
        if len(month_chronotype) > 0:
            sj_month = float(np.mean(month_social_jetlag))
            c_month = float(np.mean(month_chronotype))
            
        return list(items)[0]["useruuid"]\
            ,average_sleep_state\
            ,from_min_hour_to_num(avg_start_work)\
            ,from_min_hour_to_num(avg_start_free)\
            ,from_min_hour_to_num(avg_end_work)\
            ,from_min_hour_to_num(avg_end_free)\
            ,from_min_hour_to_num(msw)\
            ,from_min_hour_to_num(msf)\
            ,from_min_hour_to_num(msf_c)\
            ,c_month\
            ,sj_month\
            ,calculate_social_jetlag(msw,msf)\
            ,get_std_timestamp([x["start_time"] for x in arr_work],True)\
            ,get_std_timestamp([x["end_time"] for x in arr_work],False)\
            ,get_std_timestamp([x["start_time"] for x in arr_free],True)\
            ,get_std_timestamp([x["end_time"] for x in  arr_free],False)\
            ,get_std_timestamp([x["start_time"] for x in items],True)\
            ,get_std_timestamp([x["end_time"] for x in items],False)
    else:
        return list(items)[0]["useruuid"],average_sleep_state, None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None

# Location 

The following takes the locations and divides them into timebins of size 5 mins. If no locations for a given user, the nearest locations will be found. If they are more than 3 hours old, the given timebin will be discarded. 

In [77]:
#Adds the two files sun and place so we can get the amount of daylight to each of the coordinate set. 
sc.addPyFile("sun.py")
sc.addPyFile("place.py")

In [135]:
location_schema = StructType([StructField('time',TimestampType() , True),
                     StructField('time_key', StringType(), True),
                     StructField('useruuid', StringType(), True),
                     StructField('timebin', StringType(), True),
                     StructField('lat', FloatType(), True),
                     StructField('lon', FloatType(), True),
                     StructField('name', StringType(), True),
                     StructField('area', StringType(), True),
                     StructField('country', StringType(), True),
                     StructField('region', StringType(), True),
                     StructField('sunlight', FloatType(), True),
                     StructField('from_rise', FloatType(), True),
                     StructField('from_set', FloatType(), True),
                     StructField('altitude', FloatType(), True),         
])

In [None]:
for i, folder in enumerate(folders_to_load[:1]):
    print "########### Starting with ", folder, "###########"

    #The period for the timebins which is created. 
    times_for_period = start_end_times_for_periods[i]
    
    #Load the dataset for the given period. 
    df_location_raw = (sqlContext.read.format("com.databricks.spark.avro")\
      .load(pre_path+"/location/"+folder+"/*.avro")).where("accuracy > 0").drop("devices")
    
    #Load sleep for the given period so we know which users we want locations from. 
    df_sleep_period = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"sleep_"+folder+"/*.avro")).select(F.col("useruuid").alias("dummy_id")).distinct()
    
    #Join the two dataframes so we only work with the relevant locations
    df_location_raw_filtered = df_location_raw.join(df_sleep_period, 
                (df_location_raw["useruuid"] == df_sleep_period["dummy_id"]))\
        .drop("dummy_id")
        
    #Defines the timebin size in minutes. As a start it is 5 minutes. 
    timebin_interval = 5
    
    #Generates all the bins for the month
    start = start_end_date_times_for_periods[i][0]
    last = start_end_date_times_for_periods[i][1]
    delta = datetime.timedelta(minutes=timebin_interval) 

    times = []
    while start < last:
        times.append(start)
        start += delta
        
    times = sc.broadcast(times)
    
    
    rdd_location = df_location_raw_filtered.rdd\
        .map(lambda item : (item["useruuid"],item))\
        .groupByKey()\
        .mapValues(location_mapper)\
        .flatMap(lambda x : x[1])


    mapped_df = rdd_location.toDF(location_schema)
    print "Mapped data"
    mapped_df.write.format('com.databricks.spark.avro').save(bucket_path+"location_"+folder)
    print "Done with ", folder

### Merging the sleep and location datasets. 

In [None]:
for folder in folders_to_load:
    df_period = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"sleep_"+folder+"/*.avro"))\
                .withColumn('sleep_bin', time_bin_udf(F.col('start_time'), F.col("useruuid")))
    
    df_period_locations = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"location_"+folder+"/*.avro")).drop("time").drop("time_key").drop("useruuid")

    
    df_period_sleep_loc = df_period.join(df_period_locations, 
                (df_period["sleep_bin"] == df_period_locations["timebin"]))\
        .drop("timebin").drop("sleep_bin")
        
    print "Done joining ", folder
    df_period_sleep_loc.write.format('com.databricks.spark.avro').save(bucket_path+"sleep_loc_"+folder)
    print "Done with ", folder    

### Get locations with no country registered

After february 2016 the location dataset did not contain country information. So this is needed to be added manually. To do this i am using a shape file. Se Location notebook for convertion between coordinates and country. 

In [18]:
arr_folders = []
for folder in folders_to_load:
    arr_folders.append(bucket_path+"sleep_loc_"+folder+"/*.avro")
    
df_loc_total = (sqlContext.read.format("com.databricks.spark.avro").load(arr_folders))

In [None]:
#prepare coordinates to file
coordinates_no_country = df_loc_total.select("lon","lat").where("country == ''").collect()
coordinates_all = df_loc_total.select("lon","lat").collect()

First for coordinates without country

In [None]:
coordinates = []
for row in coordinates_no_country:
    coordinates.append(str(row["lon"])+","+str(row["lat"]))
    
print len(coordinates)

f = open('coordinates__no_country.txt', 'w')
for item in coordinates:
    f.write("%s\n" % item)
    
f.close()

Next for all locations

In [None]:
coordinates = []
for row in coordinates_all:
    coordinates.append(str(row["lon"])+","+str(row["lat"]))
    
print len(coordinates)

f = open('coordinates_all.txt', 'w')
for item in coordinates:
    f.write("%s\n" % item)
    
f.close()

Get the countries for each of the location set from dropbox

In [None]:
coordinates_with_country = pickle.load(open("coordinates_no_country_filled.p","rb"))

In [None]:
#Defines a dict with country as key and region as value
countries_region = df_loc_total.select("country","region").distinct().collect()
country_region_dict = {}
for row in countries_region:
    country_region_dict[row["country"]] = row["region"]

In [None]:
#make broadcast values
coordinates_with_country_bc = sc.broadcast(coordinates_with_country)
country_region_dict_bc = sc.broadcast(country_region_dict)
replace_country_dict_bc = sc.broadcast(countries_to_change)

In [None]:
#maps the items and save the df
for folder in folders_to_load: 
    df_period_locations = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"sleep_loc_"+folder+"/*.avro"))
    
    df_period_locations_updated = df_period_locations.rdd.map(no_country_mapper).toDF(df_period_locations.schema)
    print "Saving"
    df_period_locations_updated.write.format('com.databricks.spark.avro').save(bucket_path+"sleep_loc_filled_"+folder)
    print "Done with folder ", folder

In [None]:
def no_country_mapper(item):
    if item["country"] == "": #If no registered country. 
        item = item.asDict()
        key = str(item["lon"]) + "-" + str(item["lat"]) + "\n" 
        if key in coordinates_with_country_bc.value:
            #We have a country for the location. 
            country = coordinates_with_country_bc.value[key]
            
            #If the country name should be converted..
            if country in replace_country_dict_bc.value:
                country = replace_country_dict_bc.value[country]
                
            item["country"] = country
            if country in country_region_dict_bc.value:
                item["region"] = country_region_dict_bc.value[item["country"]]
        return item
    else:
        return item

The following is the location mapper which creates a row for each timebin for each of the user. For each row, the median of the locations beloing to that bin is selected as the coordinates. If there is not coordinates for a given bin, it fnds the closests locations. If the clostest location is more than 3 hours away, the row is disgarded. 

In [31]:
def location_mapper(items):
    
    from sun import Sun
    import place as place

    #Generates an array for each of the timebins. Are going to be filled with locations beloning to that timebin. 
    has_locations = set()
    temp_data = {}
    for time in times.value:
        time_key = str(time.year) + "-" + str(time.month) + "-" + str(time.day) + "-" + str(time.hour) + "-" + str(time.minute)
        temp_data[time_key] = []
    
    def utc_date_func_from_string(d):
        """Converts a date string to a unaware utc datetime. Used for the timebin which is using the utc time."""
        from dateutil.parser import parse
        date = parse(d)
        date_utc = date - date.utcoffset()
        unaware_date = date_utc.replace(tzinfo=None)
    
        return unaware_date
    
    #Loop through each of the locations beloning to the given user. 
    for loc in items:
        start_t = utc_date_func_from_string(loc["start_time"])
        end_t = utc_date_func_from_string(loc["end_time"])     
        
        #Find the timebin which the start time belongs to and adds the location to the given timebin. 
        minute = int(5 * round(float(start_t.minute)/5))
        diff = minute - start_t.minute
        time = start_t + datetime.timedelta(minutes=diff)
        
        delta = datetime.timedelta(minutes=timebin_interval) 
        d_key = str(time.year) + "-" + str(time.month) + "-" + str(time.day) + "-" + str(time.hour) + "-" + str(time.minute)
        if d_key in temp_data:
            has_locations.add(d_key)
            temp_data[d_key].append(loc)
            time += delta
        
        #Loop through each of the timebins between start and enddate and adds them to the given timebin. 
        while time < end_t:
            d_key = str(time.year) + "-" + str(time.month) + "-" + str(time.day) + "-" + str(time.hour) + "-" + str(time.minute)
            if d_key in temp_data:
                has_locations.add(d_key)
                temp_data[d_key].append(loc)
            time += delta
    
    if len(has_locations) == 0:
        return []
    
    #Variables which holds the last found location. Added so we do not need to look up the closests for each run. 
    last_coordinates = None
    last_time = None
    
    rows = []
    has_locations = [datetime.datetime.strptime(x,'%Y-%m-%d-%H-%M') for x in has_locations]
    
    for time in times.value:
        time_key = str(time.year) + "-" + str(time.month) + "-" + str(time.day) + "-" + str(time.hour) + "-" + str(time.minute)
        coordinates = temp_data[time_key]
        row = None
        
        #If length of coordinates is equal to 0, we need to find the closests registered location. 
        if len(coordinates) == 0:
            if last_coordinates == None:
                tt = min(has_locations, key=lambda x: abs(x - time))
                d_key = str(tt.year) + "-" + str(tt.month) + "-" + str(tt.day) + "-" + str(tt.hour) + "-" + str(tt.minute)
                last_time = tt
                last_coordinates = temp_data[d_key]
                diff_last = abs((last_time-time).total_seconds())
                if diff < 36000:
                    coordinates = temp_data[d_key]
                
            else:
                diff_last = abs((last_time-time).total_seconds())
                if diff < 36000: 
                    #Last found coordinates are okay, we are going to use them. 
                    coordinates = last_coordinates 
                else: 
                    #The last found location are to far away. Check to see whether there are any closer. 
                    tt = min(has_locations, key=lambda x: abs(x - time))
                    d_key = str(tt.year) + "-" + str(tt.month) + "-" + str(tt.day) + "-" + str(tt.hour) + "-" + str(tt.minute)
                    last_time = tt
                    last_coordinates = temp_data[d_key]
                        
                    diff_last = abs((last_time-time).total_seconds())
                    if diff < 36000:
                        coordinates = temp_data[d_key]                   
            
        if len(coordinates) > 0:
            time_key = str(time.year) + "-" + str(time.month) + "-" + str(time.day) + "-" + str(time.hour) + "-" + str(time.minute)
            last_coordinates = coordinates
            last_time = time
            lat = np.median([x["latitude"] for x in coordinates])
            lon = np.median([x["longitude"] for x in coordinates])
            
            #Get the risetime, settime and sunlight 
            pl = place.Place("",(lon,lat))
            risetime, settime, hrs, mins  = place.getsuninfo(pl, time)

            return_hrs = float(hrs)
            if mins < 8:
                return_hrs += 0.0
            elif mins < 16:
                 return_hrs += 0.25
            elif mins < 31:
                return_hrs += 0.50
            elif mins < 46:
                return_hrs += 0.75
            else:
                return_hrs += 1.0
                
            altitude = np.mean([x["altitude"] for x in coordinates])
            
            row = [
                time,
                time_key,
                coordinates[0]["useruuid"],
                time_key + "_" + coordinates[0]["useruuid"],
                float(lat),
                float(lon),
                coordinates[0]["name"],
                coordinates[0]["area"],
                coordinates[0]["country"],
                coordinates[0]["region"],
                float(return_hrs),
                (time-risetime).total_seconds(),
                (time-settime).total_seconds(),
                float(altitude)
            ]

        if row != None:
            rows.append(row)
            
    return rows

# App usage

The following code generates a row which is a timebin with length 5 minutes for each of the periods for each of the users. Each row contains information about how much the phone has been used for the last hour. The row has a key which can link the app usage for the last hour with an sleep entry. 

In [None]:
for i, folder in enumerate(folders_to_load):
    print "########### Starting with ", folder, "###########"

    #The period for the timebins which is created. 
    times_for_period = start_end_times_for_periods[i]
    
    #Load the dataset for the given period. 
    df_app_raw = (sqlContext.read.format("com.databricks.spark.avro")\
      .load(pre_path+"/applicationactivity/"+folder+"/*.avro")).where("category != 'N/A'")\
        .where("deleted_time == ''")
    
    #Load sleep for the given period so we know which users we want app activity from. 
    df_user_ids = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"sleep_"+folder+"/*.avro"))\
                .select(F.col("useruuid").alias("dummy_id")).distinct()
                #.withColumn('sleep_bin', time_bin_minute_udf(F.col('start_time'), F.col("useruuid")))
    
    print "Done loading"
    
    #Join the two dataframes so we only work with the relevant app usages
    df_app_raw_filtered = df_app_raw.join(df_user_ids, 
                (df_app_raw["useruuid"] == df_user_ids["dummy_id"]))\
        .drop("dummy_id")
               
    #Make a dataframe with the newest enty for each of the id's
    newest_ids = df_app_raw_filtered.select(F.col("id").alias("dummy_id"),"timestamp_seen")\
        .groupBy(F.col("dummy_id"))\
        .agg(F.max(F.col("timestamp_seen")))
        
    print "Newest entries found"
    
    #join the two dataframes
    df_app_no_doublets = df_app_raw_filtered.join(newest_ids, 
                                (df_app_raw_filtered["timestamp_seen"] == newest_ids["max(timestamp_seen)"]) & 
                                (df_app_raw_filtered["id"] == newest_ids["dummy_id"]))\
                                .drop("max(timestamp_seen)").drop("dummy_id")
        
    print "Removed duplicates, Saving"
    df_app_no_doublets.write.format('com.databricks.spark.avro').save(bucket_path+"app_"+folder)
    
    print "Done with ", folder
    print ""

### Investigate the apps

In [None]:
all_apps_folders = []
for folder in folders_to_load:
    all_apps_folders.append(bucket_path+"app_"+folder+"/*.avro")
    
app_total_df = (sqlContext.read.format("com.databricks.spark.avro").load(all_apps_folders))
print app_total_df.count()

In [None]:
unique_apps = app_total_df.select("application_name").distinct().collect()
print len(unique_apps)

In [None]:
#unique_apps = app_total_df.select("application_name").distinct().collect()

f = open('apps_all.txt', 'w')
for app in unique_apps:
    f.write("%s\n" % app["application_name"])
    
f.close()

### Amount of exposure for a time period

Now we can load in the cleaned app dataframes for each of the periodes and find the amount of exposure for the following time periods: 1 hour, 2 hours and 3 hours

In [7]:
#structure of the generated df
periods_prefix = ["one","two","three"]

arr_app_schema = [
    StructField('timebin', StringType(), True)
]

for pre in periods_prefix:
    arr_app_schema.append(StructField("seconds_"+pre,FloatType(),True))
    arr_app_schema.append(StructField("seconds_per_"+pre,FloatType(),True))
    arr_app_schema.append(StructField("browsing_"+pre,FloatType(),True))
    arr_app_schema.append(StructField("entertainment_"+pre,FloatType(),True))
    arr_app_schema.append(StructField("other_"+pre,FloatType(),True))
    arr_app_schema.append(StructField("books_"+pre,FloatType(),True))
    arr_app_schema.append(StructField("music_"+pre,FloatType(),True))
    arr_app_schema.append(StructField("game_"+pre,FloatType(),True))
    arr_app_schema.append(StructField("camera_"+pre,FloatType(),True))
    arr_app_schema.append(StructField("communication_"+pre,FloatType(),True))
    arr_app_schema.append(StructField("movie_"+pre,FloatType(),True))
    
app_schema = StructType(arr_app_schema)
print len(app_schema)

23


In [None]:
for i, folder in enumerate(folders_to_load):
    print "########### Starting with ", folder, "###########"
    
    #Load the dataset for the given period and only select the attributes we need to decrease the size in memory!!!
    df_app_no_doublets = (sqlContext.read.format("com.databricks.spark.avro")\
      .load(bucket_path+"app_"+folder)).select("start_time","end_time","useruuid","category") 
    
    print "Done loading"
    
    #Defines the timebin size in minutes.
    timebin_interval = 5
    
    #Generates all the bins for the period.
    start = start_end_date_times_for_periods[i][0]
    last = start_end_date_times_for_periods[i][1]
    delta = datetime.timedelta(minutes=timebin_interval) 

    times = []
    while start < last:
        times.append(start)
        start += delta
        
    times = sc.broadcast(times)
    
    rdd_app = df_app_no_doublets.rdd\
        .map(lambda item : (item["useruuid"],item))\
        .groupByKey()\
        .mapValues(application_mapper)\
        .flatMap(lambda x : x[1])
        
    print "Convertig to df"
    mapped_df = rdd_app.toDF(app_schema).drop("useruuid").drop("time_key")
    print "Saving"
    mapped_df.write.format('com.databricks.spark.avro').save(bucket_path+"/app_usage/app_bins_two_three_"+folder)
        
    print "Done with ", folder
    print ""

Now we can combine the sleep and app dataset

In [None]:
for i, folder in enumerate(folders_to_load):
    print "########### Starting with ", folder, "###########"
    
    #App bins for the given period
    df_app = (sqlContext.read.format("com.databricks.spark.avro")\
      .load(bucket_path+"/app_usage/app_bins_one_"+folder))
    
    #Load sleep for the given period so we can merge them togheter. 
    df_sleep_period = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"sleep_"+folder+"/*.avro"))\
                .withColumn('sleep_bin', time_bin_udf(F.col('start_time'), F.col("useruuid")))

    print "Joining sleep and app"
    df_app_sleep = df_sleep_period.join(df_app, 
            (df_sleep_period["sleep_bin"] == df_app["timebin"]))\
            .drop("sleep_bin").drop("timebin")
    
    print "Datasets joined, saving"
    df_app_sleep.write.format('com.databricks.spark.avro').save(bucket_path+"/app_usage/app_sleep_one_"+folder)
    
    print "Done with ", folder
    print ""

Check how many sleep entries which has been removed due to joining

In [14]:
df_app_sleep = df_sleep_period.join(df_app, 
            (df_sleep_period["sleep_bin"] == df_app["timebin"]),"left_outer")\
            .drop("sleep_bin").drop("timebin")
        
users_dropped = df_app_sleep.where(F.col("seconds_one").isNull()).select("useruuid").distinct().collect()
print len(users_dropped)

9696


This is the application mapper which maps and application usage for the last hour to timebins of size 1 minute. 

In [9]:
def application_mapper(items): 
    temp_data = {}
    bin_size = 5
    
    #Generates the timebins. The timebins has an interval of bin size minute. 
    for time in times.value:
        time_key = str(time.year) + "-" + str(time.month) + "-" + str(time.day) + "-" + str(time.hour) + "-" + str(time.minute)
        temp_data[time_key] = {
            "seconds" : 0, "Browsing" : 0, "Entertainment" : 0, "Other" : 0, "Books" : 0, "Music" : 0, "Game" : 0,
            "Camera/Album" : 0, "Communication" : 0, "Movie/TV" : 0       
        }
    
    def utc_date_func_from_string(d):
        """Converts a date string to a unaware utc datetime. Used for the timebin which is using the utc time."""
        from dateutil.parser import parse
        date = parse(d)
        date_utc = date - date.utcoffset()
        unaware_date = date_utc.replace(tzinfo=None)
    
        return unaware_date
    
    for application in items:
        #Generates the start and end time for the app usage
        start_t = utc_date_func_from_string(application["start_time"])
        end_t = utc_date_func_from_string(application["end_time"]) 
        
        #Is the delta we increment the time with. 
        delta = datetime.timedelta(seconds=1)
        
        #Fill the corresponding timebins. 
        while start_t <= end_t:  
            
            #Round to the corresponding timebin (bin_size minute sizes)
            minute = int(bin_size * round(float(start_t.minute)/bin_size))
            diff = minute - start_t.minute
            time = start_t + datetime.timedelta(minutes=diff)
        
            #Generates           
            time_key = str(time.year) + "-" + str(time.month) + "-" + str(time.day) + "-" + str(time.hour) + "-" + str(time.minute)
            
            if time_key in temp_data:
                temp_data[time_key]["seconds"] += 1
                temp_data[time_key][application["category"]] += 1
            
            start_t += delta
            
    rows = []
    useruuid = list(items)[0]["useruuid"]
    periods_num = [61,121,181]
    periods_prefix = ["one","two","three"]
    
    
    #Defines the keys for each of the categories. 
    keys = ["Browsing", "Entertainment","Other","Books","Music","Game","Camera/Album","Communication","Movie/TV"]
    
    #Loop through each timebin
    for key in temp_data:
        incomplete = False
        #Get data for current timbin
        item = temp_data[key]
        total_dict = {}
        #Defines the dict which holds the information from the current time and 1,2 and 3 hours back
        for j,pre in enumerate(periods_prefix):
            total_dict["seconds_"+pre] = item["seconds"]
            for k in keys:
                total_dict[k+"_"+pre] = item[k]

        time = datetime.datetime.strptime(key,'%Y-%m-%d-%H-%M')
        for j, num in enumerate(periods_num):
            pre = periods_prefix[j]
            for i in range(1,num/bin_size+1): 
                #Substract i*bin_size minutes from the current key
                formatted_time = time - datetime.timedelta(minutes=bin_size*i)
                time_key = str(formatted_time.year) + "-" + str(formatted_time.month) + "-" + str(formatted_time.day) + "-" + str(formatted_time.hour) + "-" + str(formatted_time.minute)
                if time_key not in temp_data:
                    incomplete = True
                    continue
                    
                new_item = temp_data[time_key]
            
                total_dict["seconds_"+pre] += new_item["seconds"]
                for k in keys:
                    total_dict[k+"_"+pre] += new_item[k]
     
        row = [
            key  + "_" + useruuid
        ]
        
        for j,pre in enumerate(periods_prefix):
            total = total_dict["seconds_"+pre]
            row.append(float(total))
            if total > 0:
                row.append(float(total) / float(60*(periods_num[j]-1)))
            else:
                row.append(float(0))
            
            #Define the total for each of the categories. 
            for k in keys:
                if total_dict[k+"_"+pre] > 0:
                    row.append(float(total_dict[k+"_"+pre]))
                else:
                    row.append(float(0))
        if incomplete == False:
            rows.append(row)
        
    return rows

### Last time using a mobile device before sleep

Now we want to find the intervall from the users looked at their phone last to they god to bed

In [None]:
apps_alarms = set()
with open("apps_alarm.txt","rb") as f:
    lines = f.readlines()
    for line in lines:
        app = line.split("\n")[0]
        if app != "":
            apps_alarms.add(app)
apps_alarms_bc = sc.broadcast(apps_alarms)

In [7]:
schema_app = StructType([StructField('sleep_id', StringType(), True),
                    # StructField('alarm_used', BooleanType(), True),
                     StructField('last_app_time_since', FloatType(), True),
                     StructField('last_app_date', StringType(), True),
                     StructField('last_app_category', StringType(), True),
                     StructField('last_app_name', StringType(), True),
                     #StructField('first_app_time_since', FloatType(), True),
                    # StructField('first_app_date', StringType(), True),
                    # StructField('first_app_category', StringType(), True),
                    # StructField('first_app_name', StringType(), True),
])

In [None]:
for i, folder in enumerate(folders_to_load):
    print "########### Starting with ", folder, "###########"
   
    df_app_usage_by_user = (sqlContext.read.format("com.databricks.spark.avro")\
      .load(bucket_path+"app_"+folder)).select("end_time",F.col("useruuid").alias("uid"),"application_name","category")\
    .select("*", F.format_string("%sg#g%sg#g%s",F.col("end_time"),F.col("application_name"),F.col("category")).alias("app_key"))\
      .groupby("uid").agg(F.collect_list("app_key")).drop("end_time")

    print "Apps loaded, loading sleep"

    #Load sleep for the given period and add the new columns
    df_sleep_period = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"sleep_"+folder+"/*.avro")).select("start_time","id","useruuid")
    
    print "Sleep loaded, joining"
    df_sleep_period = df_sleep_period.join(df_app_usage_by_user, 
                                    (df_sleep_period["useruuid"] == df_app_usage_by_user["uid"]),"left_outer")\
                                    .drop("uid")
        
    print "Joined"
    
    df_sleep_period = df_sleep_period.rdd.map(app_usage_mapper).toDF(schema_app)
        
    print "Column added, saving"
    
    df_sleep_period.write.format('com.databricks.spark.avro').save(bucket_path+"/app_usage/app_usage_before_bed_3"+folder)
    print "Saved..."
    print ""

In [13]:
def app_usage_mapper(item):
    
    target_date = parse(item["start_time"]).replace(tzinfo=None)

    last_app_date = None
    last_app_time_since = None
    last_app_name = None
    last_app_category = None
   
    apps_for_user = item["collect_list(app_key)"]
    
    if apps_for_user == None:
        return item["id"],None,None,None,None 
    
    for row in apps_for_user:
        row_date_str, row_name, row_category = row.split("g#g")
        
        row_date = parse(row_date_str).replace(tzinfo=None)
          
        if row_date < target_date:        
            delta = (target_date-row_date).total_seconds()
            if last_app_time_since == None:
                last_app_time_since = float(delta)
                last_app_date = row_date_str
                last_app_category = row_category
                last_app_name = row_name
                
            elif delta < last_app_time_since:
                last_app_time_since = float(delta)
                last_app_date = row_date_str
                last_app_category = row_category
                last_app_name = row_name
            
    return [item["id"],last_app_time_since, last_app_date, last_app_category,last_app_name] 

In [None]:
def app_usage_mapper(item):
    
    target_date = parse(item["start_time"]).replace(tzinfo=None)
    target_date_end = parse(item["end_time"]).replace(tzinfo=None)
    
    #Defines intervals for the wake up period. 
    target_end_interval_lower = target_date_end - datetime.timedelta(minutes=15)
    target_end_interval_upper = target_date_end + datetime.timedelta(minutes=15)

    last_app_date = None
    last_app_time_since = None
    last_app_name = None
    last_app_category = None
    
    first_app_date = None
    first_app_time_since = None
    first_app_name = None
    first_app_category = None
    
    alarm_used = False
    alarm_app = None
    
    apps_for_user = item["collect_list(app_key)"]
    
    if apps_for_user == None:
        return item["id"], None,None,None,None,None,None,None,None,None,None 
    
    for row in apps_for_user:
        row_date_str, row_name, row_category = row.split("#")
        
        row_date = parse(row_date_str).replace(tzinfo=None)
        
        if row_date > target_end_interval_lower and row_date < target_end_interval_upper:
            #App usage in the wakeup interval. Check to see whether it is an alarm app. 
            if row_name in apps_alarms_bc.value:
                alarm_used = True
                alarm_app = row_name

        if row_date > target_date_end:
            delta = (target_date_end-row_date).total_seconds()
            if first_app_time_since == None:
                first_app_time_since = float(delta)
                first_app_date = row_date_str
                first_app_category = row_category
                first_app_name = row_name
            elif delta < first_app_time_since:
                first_app_time_since = float(delta)
                first_app_date = row_date_str            
                first_app_category = row_category
                first_app_name = row_name         
        elif row_date < target_date:        
            delta = (target_date-row_date).total_seconds()
            if last_app_time_since == None:
                last_app_time_since = float(delta)
                last_app_date = row_date_str
                last_app_category = row_category
                last_app_name = row_name
                
            elif delta < last_app_time_since:
                last_app_time_since = float(delta)
                last_app_date = row_date_str
                last_app_category = row_category
                last_app_name = row_name
            
    return [item["id"],alarm_used,alarm_app,last_app_time_since, last_app_date, last_app_category,last_app_name,
            first_app_time_since,first_app_date,first_app_category,first_app_name] 

# Users

In [None]:
load_all_files_sleep_arr = []
for folder in folders_to_load: 
    load_all_files_sleep_arr.append(bucket_path+"sleep_"+folder+"/*.avro")

In [None]:
user_folders = []
for folder in folders_to_load:
    user_folders.append(pre_path+"/user/"+folder+"/*.avro")

In [None]:
df_total = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(load_all_files_sleep_arr)) 

user_ids_to_load = df_total.select(F.col("useruuid").alias("dummy_id")).distinct()
print user_ids_to_load.count()

#Load users from the given period
users_df_raw = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(user_folders))

#Remove users with no sleep
df_users_raw_filtered = users_df_raw.join(user_ids_to_load, 
                (users_df_raw["useruuid"] == user_ids_to_load["dummy_id"]))\
        .drop("dummy_id")
    
    
    
#Make a dataframe with the newest enty for each of the users's
newest_ids = df_users_raw_filtered.select(F.col("useruuid").alias("dummy_id"),"timestamp_modified")\
        .groupBy(F.col("dummy_id"))\
        .agg(F.max(F.col("timestamp_modified")))
        
    
#join the two dataframes
df_users_no_doublets = df_users_raw_filtered.join(newest_ids, 
                                    (df_users_raw_filtered["timestamp_modified"] == newest_ids["max(timestamp_modified)"]) & 
                                    (df_users_raw_filtered["useruuid"] == newest_ids["dummy_id"]))\
                                    .drop("dummy_id").drop("max(timestamp_modified)")
    

df_user_final = df_users_no_doublets.withColumn('age', udf_age_of_user(F.col('birthday')))

print df_user_final.count()

df_user_final.write.format('com.databricks.spark.avro').save(bucket_path+"users")

Generate a users nationality based on the most common country

In [None]:
arr_folders = []
for folder in folders_to_load:
    arr_folders.append(bucket_path+"sleep_loc_filled_"+folder+"/*.avro")
    
df_loc_total = (sqlContext.read.format("com.databricks.spark.avro").load(arr_folders))

In [None]:
schema = StructType([StructField('user', StringType(), True),StructField('country', StringType(), True)])
        
        
def users_nationality_mapper(items):
    from collections import Counter
    
    locations = []
    for item in items:
        if item["country"] != "":
            locations.append(item["country"])
    
    if len(locations) == 0:
        return None
    
    counts = Counter(locations)
    return counts.most_common()[:1][0][0]

df_user_nationality = df_loc_total.select("useruuid","country").rdd\
        .map(lambda item: (item["useruuid"],item))\
        .groupByKey()\
        .mapValues(users_nationality_mapper).toDF(schema)
        
df_user_nationality.write.format('com.databricks.spark.avro').save(bucket_path+"user_nationality")

### Simpple check to see which informations that has been updated
The purpose of this, is to see how  many of the entries is actually updated, and whether i would make sense to only use the newest entry from all the users. 

In [None]:
test_rdd_users = df_users_raw_filtered.select("useruuid","weight","height","gender","birthday").rdd\
    .map(lambda item: (item['useruuid'],item)) \
    .groupByKey()\
    .mapValues(check_updaste_of_user_info)\

In [None]:
def check_updaste_of_user_info(items):
    items = list(items)
    weight_to_check = items[0]["weight"]
    height_to_check = items[0]["height"]
    gender_to_check = items[0]["gender"]
    birtday_to_check = items[0]["birthday"]
    
    result = []
    for item in items:
        if item["weight"] != weight_to_check:
            result.append(1)
        elif item["height"] != height_to_check:
            result.append(2)
        elif item["gender"] != gender_to_check:
            result.append(3)
        elif item["birthday"] != birtday_to_check:
            result.append(4)
        
    return result

In [None]:
print "No updates: ", test_rdd_users.filter(lambda x : len(x[1]) == 0).count()
print "Weight updated: ",test_rdd_users.filter(lambda x : 1 in x[1] ).count()
print "Height updated: ", test_rdd_users.filter(lambda x : 2 in x[1]).count()
print "Gender updated: ", test_rdd_users.filter(lambda x : 3 in x[1]).count()
print "Birtday updated: ",test_rdd_users.filter(lambda x : 4 in x[1]).count()

# Motion acitvity

The following combines the users total activity for each day. This information will be merged with a sleep entry based on its starttime. If the hour of the startime for the sleep entry is below 12, the day will subtracted with 1. 

In [17]:
schema = StructType([StructField('day_key', StringType(), True),
                     StructField('total_distance', FloatType(), True),
                     StructField('total_steps', FloatType(), True),
                     StructField('tota_walk', FloatType(), True),
                     StructField('total_run', FloatType(), True)
])

In [10]:
df_motion_raw = (sqlContext.read.format("com.databricks.spark.avro")\
      .load(pre_path+"/motionactivity/"+folders_to_load[11]+"/*.avro")).where("deleted_time != ''")\
      .where(F.col("type").isin({"WALKING","RUNNING"}))\
      .select('start_time','type','distance','id','step_count','timestamp_seen','useruuid')\
        

In [None]:
df_motion_raw.select("distance","step_count").describe().show()

In [None]:
#Types of activities: OTHER, TRANSPORTATION, WALKING, RUNNING.
# For this study we will only look at WALKING AND RUNNING. 
# The distances is defined in mm. 

for folder in folders_to_load:
    #take the activity dataset for the given month. Sort out all other types than walking and running. 
    df_motion_raw = (sqlContext.read.format("com.databricks.spark.avro")\
      .load(pre_path+"/motionactivity/"+folder+"/*.avro")).where("deleted_time != ''")\
      .where(F.col("type").isin({"WALKING","RUNNING"}))\
      .select('start_time','type','distance','id','step_count','timestamp_seen','useruuid')\
        
    #Make a dataframe with the newest enty for each of the id's 
    newest_ids = df_motion_raw\
        .select(F.col("id").alias("dummy_id"),"timestamp_seen")\
        .groupBy(F.col("dummy_id"))\
        .agg(F.max(F.col("timestamp_seen")))
        
    print "Newest entries found"

    #join the two dataframes
    df_activity_no_doublets = df_motion_raw.join(newest_ids, 
                                    (df_motion_raw["timestamp_seen"] == newest_ids["max(timestamp_seen)"]) & 
                                    (df_motion_raw["id"] == newest_ids["dummy_id"]))\
                                    .drop("dummy_id").drop("max(timestamp_seen)")\
                                    .withColumn("day_key",generate_day_key_udf(F.col("start_time"),F.col("useruuid")))
            
    df_activities_day =  df_activity_no_doublets.rdd\
        .map(lambda item: (item['day_key'],item)) \
        .groupByKey()\
        .mapValues(motion_activity_mapper)\
        .map(lambda (key,value) : value).toDF(schema)

    df_activities_day.write.format('com.databricks.spark.avro').save(bucket_path+"activities/"+ folder + "_by_day")
    print "Done with ", folder
    print ""

In [None]:
for folder in folders_to_load:
        dd = (sqlContext.read.format("com.databricks.spark.avro")\
          .load(bucket_path+"activities/"+ folder + "_by_day"+"/*.avro"))
        print "Loaded"
        dd.select("total_distance","total_steps","tota_walk","total_run").describe().show()
        print ""

In [15]:
def motion_activity_mapper(items):
    total_distance = 0 
    total_steps = 0
    tota_walk = 0
    total_run = 0
    key = None
    
    
    for item in items:
        key = item["day_key"]
        if item["distance"] < 0:
            continue
        if item["step_count"] < 0:
            continue
            
        if item["type"] == "WALKING":
            tota_walk += item["distance"]
        if item["type"] == "RUNNING":
            total_run += item["distance"]
        
        total_distance += item["distance"]
        for step in item["step_count"]:
            total_steps += step
            
    return [key,float(total_distance),float(total_steps),float(tota_walk),float(total_run)]

# Heart rate

The heart rate dataset contains a users heart rate at a given time. 

In [5]:
schema_bpm = StructType([StructField('sleep_id', StringType(), True),
                     StructField('bpm_during', FloatType(), True),
                     StructField('bpm_before', FloatType(), True)
])

In [None]:
for folder in folders_to_load:
    print "Loading for folder ", folder
    df_heart_raw = (sqlContext.read.format("com.databricks.spark.avro")\
      .load(pre_path+"/heartrate/"+folder+"/*.avro"))\
      .select('useruuid','time','bpm')
    
    #df_heart_raw_filtered.select("time",F.substring(F.col("time"), 1, 16).alias('time_str')).show(truncate=False)
    
    #Load sleep for the given period so we know which users we want heart rate from. 
    df_sleep_period = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"sleep_"+folder+"/*.avro")).select(F.col("useruuid").alias("dummy_id")).distinct()

    #Join the two dataframes so we only work with the relevant heart rates. 
    #Generate key like this heartRate#Time. Used so we only send one value to the udf and we can group all entries 
    #by user. 
    
    df_heart_raw_filtered = df_heart_raw.join(df_sleep_period, 
                (df_heart_raw["useruuid"] == df_sleep_period["dummy_id"]))\
        .drop("dummy_id")\
        .select("useruuid","time","bpm", F.format_string("%d#%s",F.col("bpm"),F.col("time")).alias("pbm_key"))

    #group all heart rates for each user togheter. 
    df_heart_by_user = df_heart_raw_filtered.groupby("useruuid").agg(F.collect_list("pbm_key"))

    print "Data filetered!!"
    
    #Load the sleep entries
    df_sleep_period_with_bpm = (sqlContext.read.format("com.databricks.spark.avro")\
           .load(bucket_path+"sleep_"+folder+"/*.avro"))\
           .select("id",F.col("useruuid").alias("uid"),"start_time","end_time")
    
    
    #Join the sleep entries with the list of heart rates. No left or right join here since we are only interested to 
    #loop through entries with bpm registered for that user. 
    df_sleep_period_with_bpm = df_sleep_period_with_bpm.join(df_heart_by_user, 
                                    (df_sleep_period_with_bpm["uid"] == df_heart_by_user["useruuid"]))\
                                    .drop("uid")
               
    df_sleep_period_with_bpm = df_sleep_period_with_bpm.rdd.map(bpm_mapper).toDF(schema_bpm)
            
    df_sleep_period_with_bpm.drop("collect_list(pbm_key)")\
        .write.format('com.databricks.spark.avro').save(bucket_path+"bpm/"+ "combined_" + folder)
    
    print "Saved, done with ", folder
    print ""

In [36]:
def bpm_mapper(item):
    from dateutil.parser import parse
    
    date_end = parse(item["end_time"]).replace(tzinfo=None)
    date_start = parse(item["start_time"]).replace(tzinfo=None)
    interval_before_start = date_start - datetime.timedelta(hours=5)
    
    bpm_for_user = item["collect_list(pbm_key)"]
    
    if bpm_for_user == None:
        return item["id"], None, None
    
    bpms_arr_before = []    
    bpms_arr = []
    
    for row in bpm_for_user:
        bpm, time_str = row.split("#")
        time = parse(time_str).replace(tzinfo=None)
        
        if time > date_start and time < date_end:
            bpms_arr.append(float(bpm))
            
        if time > interval_before_start and time < date_start:
            bpms_arr_before.append(float(bpm))
            
    bpm_during = None
    bpm_before = None

    if len(bpms_arr) > 0:
        bpm_during = sum(bpms_arr) / float(len(bpms_arr))
    if len(bpms_arr_before) > 0:
        bpm_before = sum(bpms_arr_before) / float(len(bpms_arr_before))
    
    return item["id"], bpm_during , bpm_before 

# Generating final datasets
This section combines all the sub datasets which has been generated throughout this notebook to the following datasets:
<ul>
<li>Dataset were sleep entries is the rows. Additional metadata has been added such as forexample location, activity, app usage, ect..
</li>
<li>Dataset where the unique user is the row. Each row will contain information such as age, gender, nationality, chronotype, social jetlag, etc</li>
</ul>

In [14]:
df_user_nationality = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"user_nationality")) 
print df_user_nationality.printSchema()

root
 |-- user: string (nullable = true)
 |-- country: string (nullable = true)

None


In [None]:
df_nation_count = df_user_nationality.select("country").groupBy("country").count().orderBy(F.desc("count"))

In [15]:
#Get the users dataset, and generate a new row id which corresponds to the row. 
#This is the new user id from now on so the dataset can be take home also. 
df_users = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"users"))\
                .withColumn('bmi', udf_bmi_of_user(F.col('weight'),F.col("height")))\
                #.withColumn("user_id", F.monotonically_increasing_id())
        


In [16]:
#Make a df with the new and the old id, 
#which can be used to join with the other datasets and then remove the old id column

#df_user_new_id = df_users.select(F.col("useruuid").alias("old_id"),"user_id").cache()
#df_user_new_id.write.format('com.databricks.spark.avro').save(bucket_path+"final/"+ "user_new_ids")

df_user_new_id = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"final/"+ "user_new_ids")).cache()

df_users = df_users.join(df_user_new_id, 
            (df_users["useruuid"] == df_user_new_id["old_id"]))\
            .drop("useruuid").drop("old_id")
    

In [None]:
#The dataset which contains information about, social jetlag, chronotype etc...
df_users_sleep_stats = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"sleep_stats_total"))

df_users_sleep_stats = df_users_sleep_stats.join(df_user_new_id, 
            (df_users_sleep_stats["useruuid"] == df_user_new_id["old_id"]))\
           .drop("useruuid").drop("old_id").select("*",F.col("user_id").alias("uid")).drop("user_id")
    
print df_users_sleep_stats

In [None]:
df_users.select("height","weight","age","bmi").describe().show()

### New sleep dataset with extra metadata
What is missing: 
<ul>
    <li>Light polution</li>
</ul>

What will not be included:
<ul>
    <li>Babies</li>
    <li>BPM</li>
</ul>

Light polution, alarm?, Babies?, Screen recency, chronotype(Average - average by month), BPM

In [11]:
#All sleep entries
load_all_files_sleep_arr = []
for folder in folders_to_load: 
    load_all_files_sleep_arr.append(bucket_path+"sleep_"+folder+"/*.avro")
    
#All sleep entries with location. 
load_all_files_location_arr = []
for folder in folders_to_load: 
    load_all_files_location_arr.append(bucket_path+"sleep_loc_filled_"+folder+"/*.avro")
    
#All app bins for one hour
load_all_files_app_sleep_arr = []
for folder in folders_to_load: 
    load_all_files_app_sleep_arr.append(bucket_path+"app_usage/app_bins_one_"+folder+"/*.avro")
    
#All app bins for two and three hours. 
load_all_files_app_sleep_rest_arr = []
for folder in folders_to_load: 
    load_all_files_app_sleep_rest_arr.append(bucket_path+"app_usage/app_bins_two_three_"+folder+"/*.avro")
    
#App recency and is wake up by alarm
load_all_recency = []
for folder in folders_to_load:
    load_all_recency.append(bucket_path+"/app_usage/app_usage_before_bed_"+folder+"/*.avro")
        
#All physical activities by day.
load_all_activities_by_date = []
for folder in folders_to_load:
    load_all_activities_by_date.append(bucket_path+"activities/"+folder+"_by_day/*.avro")
    
#Load bpm all sleep entries
load_bpm_arr = []
for folder in folders_to_load:
    load_bpm_arr.append(bucket_path+"bpm/"+ folder+"/*.avro")

In [None]:
#Load all sleep entries
sleeps_total_df = (sqlContext.read.format("com.databricks.spark.avro").load(load_all_files_sleep_arr))\
    .withColumn('sleep_bin', time_bin_udf(F.col('start_time'), F.col("useruuid")))\
    .withColumn("day_key_sleep",generate_day_key_adjusted_udf(F.col("start_time"),F.col("useruuid")))\
    .drop("week_number_key_user").drop("end_time_dt").drop("end_time_dt_utc")\
    .drop("start_time_dt").drop("start_time_dt_utc")\
    .select("*",F.weekofyear(F.col("start_time")).alias("weeknumber"))
    

sleeps_total_df = sleeps_total_df.join(df_user_new_id, 
            (sleeps_total_df["useruuid"] == df_user_new_id["old_id"]))\
           .drop("useruuid").drop("old_id")

print sleeps_total_df.printSchema()

In [None]:
#Get the populations from dropbox. 
transferData.get_file("/population.p","population.p")
populations = pickle.load(open("population.p","rb"))
populations_bc = sc.broadcast(populations)

In [43]:
def population_by_loc(lon,lat):
    key = str(lon) + "-" + str(lat) + "\n" #stupid mistake, so add \n to the end...
    if key in populations_bc.value:
        value = populations_bc.value[key]

        if value == None:
            return None
        else:
            return float(value)
    
    return None

population_by_loc_udf = F.udf(population_by_loc,FloatType())

In [44]:
#Get all locations and join with the sleep dataset
loc_total_df = (sqlContext.read.format("com.databricks.spark.avro").load(load_all_files_location_arr))\
    .select("lon","lat","country","region","altitude",
           F.col("sunlight").alias("daylight"),F.col("id").alias("location_sleep_id"))\
    .withColumn('population', population_by_loc_udf(F.col('lon'),F.col('lat')))

print loc_total_df.printSchema()

root
 |-- lon: float (nullable = true)
 |-- lat: float (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- altitude: float (nullable = true)
 |-- daylight: float (nullable = true)
 |-- location_sleep_id: string (nullable = true)
 |-- population: float (nullable = true)

None


In [45]:
sleeps_total_df = sleeps_total_df.join(loc_total_df, 
            (sleeps_total_df["id"] == loc_total_df["location_sleep_id"]),"left_outer")\
           .drop("location_sleep_id")
    
#print "Entries with no location ", sleeps_total_df.where(F.col("lat").isNull()).count()

In [35]:
sleeps_total_df.write.format('com.databricks.spark.avro').save(bucket_path+"final/"+ "sleep_loc")

In [46]:
df_user_meta = df_users.select(F.col("useruuid"), "age","gender","bmi","weight","height")

df_user_meta = df_user_meta.join(df_user_new_id, 
            (df_user_meta["useruuid"] == df_user_new_id["old_id"]))\
           .drop("useruuid").drop("old_id").select("*",F.col("user_id").alias("uid")).drop("user_id")

sleeps_total_df = sleeps_total_df.join(df_user_meta, 
                (df_user_meta["uid"] == sleeps_total_df["user_id"])).drop("uid").drop("old_id")

#print "Entries with no user information", sleeps_total_df.where(F.col("gender").isNull()).count()

In [47]:
sleeps_total_df.write.format('com.databricks.spark.avro').save(bucket_path+"final/"+ "sleep_loc_user")

In [52]:
# Sleep stats 
sleeps_total_df = sleeps_total_df.join(df_users_sleep_stats, 
            (sleeps_total_df["user_id"] == df_users_sleep_stats["uid"]),"left_outer")\
            .drop("uid")

In [53]:
sleeps_total_df.write.format('com.databricks.spark.avro').save(bucket_path+"final/"+ "sleep_loc_user_stats")

In [14]:
sleeps_total_df = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"final/"+ "sleep_loc_user_stats_one_rec_ac"))

In [12]:
df_total_app = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(load_all_files_app_sleep_arr))

sleeps_total_df = sleeps_total_df.join(df_total_app, 
            (sleeps_total_df["sleep_bin"] == df_total_app["timebin"]),"left_outer")\
            .drop("timebin")
    

#.where(F.col("seconds_one").isNotNull())
#print "Entries with no app info one hour", sleeps_total_df.where(F.col("seconds_one").isNull()).count()

In [22]:
sleeps_total_df.write.format('com.databricks.spark.avro').save(bucket_path+"final/"+ "sleep_loc_user_stats_one")

In [15]:
df_total_app = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(load_all_files_app_sleep_rest_arr))

sleeps_total_df = sleeps_total_df.join(df_total_app, 
            (sleeps_total_df["sleep_bin"] == df_total_app["timebin"]),"left_outer")\
            .drop("timebin")
    
#print "Entries with no app info two hours", sleeps_total_df.where(F.col("seconds_two").isNull()).count()
#print "Entries with no app info three hours", sleeps_total_df.where(F.col("seconds_three").isNull()).count()

In [16]:
sleeps_total_df.write.format('com.databricks.spark.avro').save(bucket_path+"final/"+ "sleep_loc_user_stats_one_rec_ac_rest")

In [26]:
#App recency.. 
app_recency_total = (sqlContext.read.format("com.databricks.spark.avro").load(load_all_recency))
print app_recency_total.printSchema()

root
 |-- sleep_id: string (nullable = true)
 |-- alarm_used: boolean (nullable = true)
 |-- alarm_app: string (nullable = true)
 |-- last_app_time_since: float (nullable = true)
 |-- last_app_date: string (nullable = true)
 |-- last_app_category: string (nullable = true)
 |-- last_app_name: string (nullable = true)
 |-- first_app_time_since: float (nullable = true)
 |-- first_app_date: string (nullable = true)
 |-- first_app_category: string (nullable = true)
 |-- first_app_name: string (nullable = true)

None


In [27]:
sleeps_total_df = sleeps_total_df.join(app_recency_total, 
            (sleeps_total_df["id"] == app_recency_total["sleep_id"]),"left_outer")\
           .drop("sleep_id")

In [28]:
sleeps_total_df.write.format('com.databricks.spark.avro').save(bucket_path+"final/"+ "sleep_loc_user_stats_one_rec_ac")

In [23]:
#Physiacal activity
phy_ac_total = (sqlContext.read.format("com.databricks.spark.avro").load(load_all_activities_by_date))

sleeps_total_df = sleeps_total_df.join(phy_ac_total, 
            (sleeps_total_df["day_key_sleep"] == phy_ac_total["day_key"]),"left_outer")\
            .drop("day_key")
#print "Entries with no activity", sleeps_total_df.where(F.col("seconds_two").isNull()).count()

In [24]:
sleeps_total_df.write.format('com.databricks.spark.avro').save(bucket_path+"final/"+ "sleep_loc_user_stats_one_ac")

### New User dataset

This is the features we want for each of the rows:
<ul>
    <li>**Country**</li>
    <li>Region</li>
    <li>Population density (Average)</li>
    <li>Screen exposure: Recency (Average)</li>
    <li>Screen exposure: Amount (Average)</li>
    <li>Physical activity (Average)</li>
    <li>**Age**</li>
    <li>**Gender**</li>
    <li>**BMI**</li>
    <li>**Weight**</li>
    <li>**Height**</li>
</ul>

In [None]:
df_sleep_stats_total = (sqlContext.read.format("com.databricks.spark.avro")\
               .load(bucket_path+"sleep_stats_total"))


df_sleep_stats_total = df_sleep_stats_total.join(df_user_nationality, 
            (df_sleep_stats_total["useruuid"] == df_user_nationality["user"])).drop("user")

print df_sleep_stats_total.printSchema()    

In [24]:
df_sleep_stats_total = df_sleep_stats_total.join(df_user_new_id, 
            (df_sleep_stats_total["useruuid"] == df_user_new_id["old_id"]))\
           .drop("old_id").select("*",F.col("user_id").alias("uid")).drop("user_id")

In [26]:
df_user_meta = df_users.select(F.col("user_id"), "age","gender","bmi","weight","height")

df_sleep_stats_total = df_sleep_stats_total.join(df_user_meta, 
                (df_sleep_stats_total["uid"] == df_user_meta["user_id"])).drop("user_id")

In [27]:
df_sleep_stats_total.write.format('com.databricks.spark.avro').save(bucket_path+"final/"+ "stat_user")

In [38]:
df_sleep_stats_total = (sqlContext.read.format("com.databricks.spark.avro")\
               .load(bucket_path+"final/"+ "stat_user"))

In [39]:
total_last = sleeps_total_df.select("user_id","last_app_time_since").groupBy("user_id").agg(F.avg("last_app_time_since"))
total_amount = sleeps_total_df.select("user_id","seconds_one").groupBy("user_id").agg(F.avg("seconds_one"))
total_distance = sleeps_total_df.select("user_id","total_distance").groupBy("user_id").agg(F.avg("total_distance"))
total_population = sleeps_total_df.select("user_id","population").groupBy("user_id").agg(F.avg("population"))

In [40]:
df_sleep_stats_total = df_sleep_stats_total.join(total_last, 
                (df_sleep_stats_total["uid"] == total_last["user_id"])).drop("user_id")

df_sleep_stats_total = df_sleep_stats_total.join(total_amount, 
                (df_sleep_stats_total["uid"] == total_amount["user_id"])).drop("user_id")

df_sleep_stats_total = df_sleep_stats_total.join(total_distance, 
                (df_sleep_stats_total["uid"] == total_distance["user_id"])).drop("user_id")

df_sleep_stats_total = df_sleep_stats_total.join(total_population, 
                (df_sleep_stats_total["uid"] == total_population["user_id"])).drop("user_id")

In [18]:
#df_sleep_stats_total.printSchema()

In [43]:
df_sleep_stats_total = df_sleep_stats_total.select("*",
                                                   F.col("avg(last_app_time_since)").alias("avg_last_app_time_since"),
                                                   F.col("avg(seconds_one)").alias("avg_seconds_one"),
                                                   F.col("avg(total_distance)").alias("avg_total_distance"),
                                                   F.col("avg(population)").alias("avg_population")
                                                  ).drop("avg(last_app_time_since)").drop("avg(seconds_one)")\
                                                    .drop("avg(total_distance)").drop("avg(population)")


In [None]:
df_sleep_stats_total.write.format('com.databricks.spark.avro').save(bucket_path+"final/"+ "stat_user_extra")

In [None]:
df_user_meta = df_users.select(F.col("lifelog_useruuid").alias("user"), "age","gender","bmi","weight","height")

for folder in folders_to_load:
    df_sleep_stats_period = (sqlContext.read.format("com.databricks.spark.avro")\
                .load(bucket_path+"sleep_stats_"+folder+"/*.avro")) 
    
    df_sleep_stats_period = df_sleep_stats_period.join(df_user_meta, 
                (df_sleep_stats_period["useruuid"] == df_user_meta["user"])).drop("user")
    
    df_sleep_stats_period = df_sleep_stats_period.join(df_user_nationality, 
            (df_sleep_stats_period["useruuid"] == df_user_nationality["user"])).drop("user")

    df_sleep_stats_period = df_sleep_stats_period.join(df_user_new_id, 
            (df_sleep_stats_period["useruuid"] == df_user_new_id["old_id"]))\
           .drop("old_id").select("*",F.col("user_id").alias("uid")).drop("user_id")
    
    df_sleep_stats_period.write.format('com.databricks.spark.avro').save(
        bucket_path+"final/"+ "sleep_stats_"+folder)
    
    print "Done with", folder


# UDF

The following is the user defined functions which is used in this project. It has been tried to keep the number of UDF's at a minimum since there are very performance heavy compared to native spark sql functions. 

In [13]:
def find_difference_time(end_str, start_str):
    from dateutil.parser import parse
    
    if start_str != None and end_str != None:
        date_end = parse(end_str).replace(tzinfo=None)
        date_start = parse(start_str).replace(tzinfo=None)
    
        delta = (date_end-date_start).total_seconds()
    
        return float(delta)
    
    return None
        
def generate_day_key_adjusted(date_str,useruuid):
    from dateutil.parser import parse
    date = parse(date_str)
    unaware_date = date.replace(tzinfo=None)
    if unaware_date.hour < 12:
        unaware_date = unaware_date - datetime.timedelta(days=1)

    key = str(unaware_date.year) + "-" + str(unaware_date.month) + "-" + str(unaware_date.day) + "_" + useruuid
    return key

def generate_day_key(date_str,useruuid):
    from dateutil.parser import parse
    date = parse(date_str)
    unaware_date = date.replace(tzinfo=None)

    key = str(unaware_date.year) + "-" + str(unaware_date.month) + "-" + str(unaware_date.day) + "_" + useruuid
    return key

def generate_time_bin(date_str, useruuid):
    
    from dateutil.parser import parse
    date = parse(date_str)
    date_utc = date - date.utcoffset()
    unaware_date = date_utc.replace(tzinfo=None)

    minute = int(5 * round(float(unaware_date.minute)/5))
    diff = minute - unaware_date.minute
    time = unaware_date + datetime.timedelta(minutes=diff)

    key = str(time.year) + "-" + str(time.month) + "-" + str(time.day) + "-" + str(time.hour) + "-" + str(time.minute) + "_" + useruuid
    return key

def generate_time_bin_minute(date_str, useruuid):
    
    from dateutil.parser import parse
    date = parse(date_str)
    date_utc = date - date.utcoffset()
    unaware_date = date_utc.replace(tzinfo=None)

    minute = int(1 * round(float(unaware_date.minute)/1))
    diff = minute - unaware_date.minute
    time = unaware_date + datetime.timedelta(minutes=diff)

    key = str(time.year) + "-" + str(time.month) + "-" + str(time.day) + "-" + str(time.hour) + "-" + str(time.minute) + "_" + useruuid
    return key

def func_get_device_type(devices):
    return devices[0]["name"]

def calculate_age_of_user(x):
    from datetime import datetime
    try:
        born = parse(x)
        return (datetime.today() - born).days/365
    except Exception:
        return 0
    
def calculate_BMI(weight,height):
    #Weight in  kg, height in meters
    height_m = float(height/float(1000))
    weight_kg = weight/1000
    bmi = weight_kg/(height_m*height_m)

    if bmi <= 18.5:
        return 0
        #print('Your BMI is', bmi,'which means you are underweight.')
    elif bmi > 18.5 and bmi < 25:
        return 1
        #print('Your BMI is', bmi,'which means you are normal.')
    elif bmi > 25 and bmi < 30:
        return 2
        #print('your BMI is', bmi,'overweight.')
    elif bmi > 30:
        return 3
        #print('Your BMI is', bmi,'which means you are obese.')


udf_age_of_user = F.udf(calculate_age_of_user, IntegerType())
udf_bmi_of_user = F.udf(calculate_BMI, IntegerType())

find_difference_time_udf = F.udf(find_difference_time,FloatType())
func_get_device_type_udf = F.udf(func_get_device_type,StringType())
time_bin_udf = F.udf(generate_time_bin, StringType())
time_bin_minute_udf = F.udf(generate_time_bin_minute, StringType())
generate_day_key_udf = F.udf(generate_day_key,StringType())
generate_day_key_adjusted_udf = F.udf(generate_day_key_adjusted,StringType())

# Daylight code implementation

In [6]:
%%file place.py

import datetime
import math
from sun import Sun

class Place(object):
    def __init__(self, name, coords):
        self.name = name        # string
        self.coords = coords    # tuple (E/W long, N/S lat)

def _hoursmins(hours):
    """Convert floating point decimal time in hours to integer hrs,mins"""
    frac,h = math.modf(hours)
    m = round(frac*60, 0)
    if m == 60: # rounded up to next hour
        h += 1; m = 0
    return int(h),int(m)

def _ymd(date):
    """Return y,m,d from datetime object as tuple"""
    return date.timetuple()[:3]

def getsuninfo(location, date=None):
    """Return utc datetime of sunrise, sunset, and length of day in hrs,mins)"""
    if date == None:
        querydate = datetime.date.today()
    else: # date given should be datetime instance
        querydate = date

    args = _ymd(querydate) + location.coords
    utcrise, utcset = Sun().sunRiseSet(*args)
    daylength = Sun().dayLength(*args)
    hrs,mins = _hoursmins(daylength)

    risehour, risemin = _hoursmins(utcrise)
    sethour, setmin   = _hoursmins(utcset)

    # convert times to timedelta values (ie from midnight utc of the date)
    midnight = datetime.datetime(*_ymd(querydate))
    deltarise = datetime.timedelta(hours=risehour, minutes=risemin)
    utcdatetimerise = midnight+deltarise
    deltaset = datetime.timedelta(hours=sethour, minutes=setmin)
    utcdatetimeset  = midnight+deltaset

    # convert results from UTC time to local time of location
    #localrise = utcdatetimerise.astimezone(location.tz)
    #localset  = utcdatetimeset.astimezone(location.tz)

    return utcdatetimerise, utcdatetimeset, hrs, mins

Writing place.py


In [7]:
%%file sun.py
# -*- coding: iso-8859-1 -*-
SUN_PY_VERSION = 1.5
 
import math
from math import pi
 
import calendar
 
class Sun:
   
    def __init__(self):
        """"""
       
        # Some conversion factors between radians and degrees
        self.RADEG = 180.0 / pi
        self.DEGRAD = pi / 180.0
        self.INV360 = 1.0 / 360.0
       
 
    def daysSince2000Jan0(self, y, m, d):
        """A macro to compute the number of days elapsed since 2000 Jan 0.0
           (which is equal to 1999 Dec 31, 0h UT)"""
        return (367*(y)-((7*((y)+(((m)+9)/12)))/4)+((275*(m))/9)+(d)-730530)
   
 
    # The trigonometric functions in degrees
    def sind(self, x):
        """Returns the sin in degrees"""
        return math.sin(x * self.DEGRAD)
 
    def cosd(self, x):
        """Returns the cos in degrees"""
        return math.cos(x * self.DEGRAD)
 
    def tand(self, x):
        """Returns the tan in degrees"""
        return math.tan(x * self.DEGRAD)
 
    def atand(self, x):
        """Returns the arc tan in degrees"""
        return math.atan(x) * self.RADEG
   
    def asind(self, x):
        """Returns the arc sin in degrees"""
        return math.asin(x) * self.RADEG
 
    def acosd(self, x):
        """Returns the arc cos in degrees"""
        return math.acos(x) * self.RADEG
 
    def atan2d(self, y, x):
        """Returns the atan2 in degrees"""
        return math.atan2(y, x) * self.RADEG
 
    # Following are some macros around the "workhorse" function __daylen__
    # They mainly fill in the desired values for the reference altitude    
    # below the horizon, and also selects whether this altitude should    
    # refer to the Sun's center or its upper limb.                        
 
    def dayLength(self, year, month, day, lon, lat):
        """
        This macro computes the length of the day, from sunrise to sunset.
        Sunrise/set is considered to occur when the Sun's upper limb is
        35 arc minutes below the horizon (this accounts for the refraction
        of the Earth's atmosphere).
        """
        return self.__daylen__(year, month, day, lon, lat, -35.0/60.0, 1)
   
    def dayCivilTwilightLength(self, year, month, day, lon, lat):
        """
        This macro computes the length of the day, including civil twilight.
        Civil twilight starts/ends when the Sun's center is 6 degrees below
        the horizon.
        """
        return self.__daylen__(year, month, day, lon, lat, -6.0, 0)
 
    def dayNauticalTwilightLength(self, year, month, day, lon, lat):
        """
        This macro computes the length of the day, incl. nautical twilight.
        Nautical twilight starts/ends when the Sun's center is 12 degrees
        below the horizon.
        """
        return self.__daylen__(year, month, day, lon, lat, -12.0, 0)
 
    def dayAstronomicalTwilightLength(self, year, month, day, lon, lat):
        """
        This macro computes the length of the day, incl. astronomical twilight.
        Astronomical twilight starts/ends when the Sun's center is 18 degrees
        below the horizon.
        """
        return self.__daylen__(year, month, day, lon, lat, -18.0, 0)
   
    def sunRiseSet(self, year, month, day, lon, lat):
        """
        This macro computes times for sunrise/sunset.
        Sunrise/set is considered to occur when the Sun's upper limb is
        35 arc minutes below the horizon (this accounts for the refraction
        of the Earth's atmosphere).
        """
        return self.__sunriset__(year, month, day, lon, lat, -35.0/60.0, 1)
 
   
    def civilTwilight(self, year, month, day, lon, lat):
        """
        This macro computes the start and end times of civil twilight.
        Civil twilight starts/ends when the Sun's center is 6 degrees below
        the horizon.
        """
        return self.__sunriset__(year, month, day, lon, lat, -6.0, 0)
   
    def nauticalTwilight(self, year, month, day, lon, lat):
        """
        This macro computes the start and end times of nautical twilight.
        Nautical twilight starts/ends when the Sun's center is 12 degrees
        below the horizon.
        """
        return self.__sunriset__(year, month, day, lon, lat, -12.0, 0)
   
    def astronomicalTwilight(self, year, month, day, lon, lat):
        """
        This macro computes the start and end times of astronomical twilight.
        Astronomical twilight starts/ends when the Sun's center is 18 degrees
        below the horizon.
        """
        return self.__sunriset__(year, month, day, lon, lat, -18.0, 0)
   
    # The "workhorse" function for sun rise/set times
    def __sunriset__(self, year, month, day, lon, lat, altit, upper_limb):
        """
        Note: year,month,date = calendar date, 1801-2099 only.
              Eastern longitude positive, Western longitude negative
             Northern latitude positive, Southern latitude negative
              The longitude value IS critical in this function!
              altit = the altitude which the Sun should cross
                      Set to -35/60 degrees for rise/set, -6 degrees
                      for civil, -12 degrees for nautical and -18
                      degrees for astronomical twilight.
                upper_limb: non-zero -> upper limb, zero -> center
                      Set to non-zero (e.g. 1) when computing rise/set
                      times, and to zero when computing start/end of
                      twilight.
              *rise = where to store the rise time
              *set  = where to store the set  time
                      Both times are relative to the specified altitude,
                      and thus this function can be used to compute
                      various twilight times, as well as rise/set times
        Return value:  0 = sun rises/sets this day, times stored at
                          *trise and *tset.
                      +1 = sun above the specified 'horizon' 24 hours.
                           *trise set to time when the sun is at south,
                           minus 12 hours while *tset is set to the south
                           time plus 12 hours. 'Day' length = 24 hours
                      -1 = sun is below the specified 'horizon' 24 hours
                           'Day' length = 0 hours, *trise and *tset are
                            both set to the time when the sun is at south.
        """
        # Compute d of 12h local mean solar time
        d = self.daysSince2000Jan0(year,month,day) + 0.5 - (lon/360.0)
       
        # Compute local sidereal time of this moment
        sidtime = self.revolution(self.GMST0(d) + 180.0 + lon)
       
        # Compute Sun's RA + Decl at this moment
        res = self.sunRADec(d)
        sRA = res[0]
        sdec = res[1]
        sr = res[2]
       
        # Compute time when Sun is at south - in hours UT
        tsouth = 12.0 - self.rev180(sidtime - sRA)/15.0;
       
        # Compute the Sun's apparent radius, degrees
        sradius = 0.2666 / sr;
       
        # Do correction to upper limb, if necessary
        if upper_limb:
            altit = altit - sradius
       
        # Compute the diurnal arc that the Sun traverses to reach
        # the specified altitude altit:
       
        cost = (self.sind(altit) - self.sind(lat) * self.sind(sdec))/\
               (self.cosd(lat) * self.cosd(sdec))
 
        if cost >= 1.0:
            rc = -1
            t = 0.0           # Sun always below altit
           
        elif cost <= -1.0:
            rc = +1
            t = 12.0;         # Sun always above altit
 
        else:
            t = self.acosd(cost)/15.0   # The diurnal arc, hours
 
       
        # Store rise and set times - in hours UT
        return (tsouth-t, tsouth+t)
 
 
    def __daylen__(self, year, month, day, lon, lat, altit, upper_limb):
        """
        Note: year,month,date = calendar date, 1801-2099 only.            
              Eastern longitude positive, Western longitude negative      
              Northern latitude positive, Southern latitude negative      
              The longitude value is not critical. Set it to the correct  
              longitude if you're picky, otherwise set to, say, 0.0    
              The latitude however IS critical - be sure to get it correct
              altit = the altitude which the Sun should cross              
                      Set to -35/60 degrees for rise/set, -6 degrees      
                      for civil, -12 degrees for nautical and -18          
                      degrees for astronomical twilight.                  
                upper_limb: non-zero -> upper limb, zero -> center        
                      Set to non-zero (e.g. 1) when computing day length  
                      and to zero when computing day+twilight length.      
                                                       
        """
       
        # Compute d of 12h local mean solar time
        d = self.daysSince2000Jan0(year,month,day) + 0.5 - (lon/360.0)
                 
        # Compute obliquity of ecliptic (inclination of Earth's axis)
        obl_ecl = 23.4393 - 3.563E-7 * d
       
        # Compute Sun's position
        res = self.sunpos(d)
        slon = res[0]
        sr = res[1]
       
        # Compute sine and cosine of Sun's declination
        sin_sdecl = self.sind(obl_ecl) * self.sind(slon)
        cos_sdecl = math.sqrt(1.0 - sin_sdecl * sin_sdecl)
       
        # Compute the Sun's apparent radius, degrees
        sradius = 0.2666 / sr
       
        # Do correction to upper limb, if necessary
        if upper_limb:
            altit = altit - sradius
 
           
        cost = (self.sind(altit) - self.sind(lat) * sin_sdecl) / \
               (self.cosd(lat) * cos_sdecl)
        if cost >= 1.0:
            t = 0.0             # Sun always below altit
       
        elif cost <= -1.0:
            t = 24.0      # Sun always above altit
       
        else:
            t = (2.0/15.0) * self.acosd(cost);     # The diurnal arc, hours
           
        return t
 
   
    def sunpos(self, d):
        """
        Computes the Sun's ecliptic longitude and distance
        at an instant given in d, number of days since    
        2000 Jan 0.0.  The Sun's ecliptic latitude is not  
        computed, since it's always very near 0.          
        """
 
        # Compute mean elements
        M = self.revolution(356.0470 + 0.9856002585 * d)
        w = 282.9404 + 4.70935E-5 * d
        e = 0.016709 - 1.151E-9 * d
       
        # Compute true longitude and radius vector
        E = M + e * self.RADEG * self.sind(M) * (1.0 + e * self.cosd(M))
        x = self.cosd(E) - e
        y = math.sqrt(1.0 - e*e) * self.sind(E)
        r = math.sqrt(x*x + y*y)              #Solar distance
        v = self.atan2d(y, x)                 # True anomaly
        lon = v + w                        # True solar longitude
        if lon >= 360.0:
            lon = lon - 360.0   # Make it 0..360 degrees
           
        return (lon,r)
   
 
    def sunRADec(self, d):
        """
       Returns the angle of the Sun (RA)
       the declination (dec) and the distance of the Sun (r)
       for a given day d.
       """
       
        # Compute Sun's ecliptical coordinates
        res = self.sunpos(d)
        lon = res[0]  # True solar longitude
        r = res[1]    # Solar distance
       
        # Compute ecliptic rectangular coordinates (z=0)
        x = r * self.cosd(lon)
        y = r * self.sind(lon)
       
        # Compute obliquity of ecliptic (inclination of Earth's axis)
        obl_ecl = 23.4393 - 3.563E-7 * d
       
        # Convert to equatorial rectangular coordinates - x is unchanged
        z = y * self.sind(obl_ecl)
        y = y * self.cosd(obl_ecl)
 
        # Convert to spherical coordinates
        RA = self.atan2d(y, x)
        dec = self.atan2d(z, math.sqrt(x*x + y*y))
 
        return (RA, dec, r)
 
 
    def revolution(self, x):
        """
        This function reduces any angle to within the first revolution
        by subtracting or adding even multiples of 360.0 until the    
        result is >= 0.0 and < 360.0
       
        Reduce angle to within 0..360 degrees
        """
        return (x - 360.0 * math.floor(x * self.INV360))
   
    def rev180(self, x):
        """
        Reduce angle to within +180..+180 degrees
        """
        return (x - 360.0 * math.floor(x * self.INV360 + 0.5))
 
    def GMST0(self, d):
        """
        This function computes GMST0, the Greenwich Mean Sidereal Time  
        at 0h UT (i.e. the sidereal time at the Greenwhich meridian at  
        0h UT).  GMST is then the sidereal time at Greenwich at any    
        time of the day.  I've generalized GMST0 as well, and define it
        as:  GMST0 = GMST - UT  --  this allows GMST0 to be computed at
        other times than 0h UT as well.  While this sounds somewhat    
        contradictory, it is very practical:  instead of computing      
        GMST like:                                                      
                                                                       
         GMST = (GMST0) + UT * (366.2422/365.2422)                      
                                                                       
        where (GMST0) is the GMST last time UT was 0 hours, one simply  
        computes:                                                      
                                                                       
         GMST = GMST0 + UT                                              
                                                                       
        where GMST0 is the GMST "at 0h UT" but at the current moment!  
        Defined in this way, GMST0 will increase with about 4 min a    
        day.  It also happens that GMST0 (in degrees, 1 hr = 15 degr)  
        is equal to the Sun's mean longitude plus/minus 180 degrees!    
        (if we neglect aberration, which amounts to 20 seconds of arc  
        or 1.33 seconds of time)
        """
        # Sidtime at 0h UT = L (Sun's mean longitude) + 180.0 degr  
        # L = M + w, as defined in sunpos().  Since I'm too lazy to
        # add these numbers, I'll let the C compiler do it for me.  
        # Any decent C compiler will add the constants at compile  
        # time, imposing no runtime or code overhead.              
                                               
        sidtim0 = self.revolution((180.0 + 356.0470 + 282.9404) +
                                     (0.9856002585 + 4.70935E-5) * d)
        return sidtim0;
 
    def solar_altitude(self, latitude, year, month, day):
        """
       Compute the altitude of the sun. No atmospherical refraction taken
       in account.
       Altitude of the southern hemisphere are given relative to
       true north.
       Altitude of the northern hemisphere are given relative to
       true south.
       Declination is between 23.5° North and 23.5° South depending
       on the period of the year.
       Source of formula for altitude is PhysicalGeography.net
       http://www.physicalgeography.net/fundamentals/6h.html
       """
        # Compute declination
        N = self.daysSince2000Jan0(year, month, day)
        res =  self.sunRADec(N)
        declination = res[1]
        sr = res[2]
 
        # Compute the altitude
        altitude = 90.0 - latitude  + declination
 
        # In the tropical and  in extreme latitude, values over 90 may occurs.
        if altitude > 90:
            altitude = 90 - (altitude-90)
 
        if altitude < 0:
            altitude = 0
 
        return altitude
 
    def get_max_solar_flux(self, latitude, year, month, day):
        """
       Compute the maximal solar flux to reach the ground for this date and
       latitude.
       Originaly comes from Environment Canada weather forecast model.
       Information was of the public domain before release by Environment Canada
       Output is in W/M^2.
       """
 
        (fEot, fR0r, tDeclsc) = self.equation_of_time(year, month, day, latitude)
        fSF = (tDeclsc[0]+tDeclsc[1])*fR0r
 
        # In the case of a negative declinaison, solar flux is null
        if fSF < 0:
            fCoeff = 0
        else:
            fCoeff =  -1.56e-12*fSF**4 + 5.972e-9*fSF**3 -\
                     8.364e-6*fSF**2  + 5.183e-3*fSF - 0.435
       
        fSFT = fSF * fCoeff
 
        if fSFT < 0:
            fSFT=0
 
        return fSFT
 
    def equation_of_time(self, year, month, day, latitude):
        """
       Description: Subroutine computing the part of the equation of time
                    needed in the computing of the theoritical solar flux
                    Correction originating of the CMC GEM model.
                   
       Parameters:  int nTime : cTime for the correction of the time.
 
       Returns: tuple (double fEot, double fR0r, tuple tDeclsc)
                dEot: Correction for the equation of time
                dR0r: Corrected solar constant for the equation of time
                tDeclsc: Declinaison
       """
        # Julian date
        nJulianDate = self.Julian(year, month, day)
        # Check if it is a leap year
        if(calendar.isleap(year)):
            fDivide = 366.0
        else:
            fDivide = 365.0
        # Correction for "equation of time"
        fA = nJulianDate/fDivide*2*pi
        fR0r = self.__Solcons(fA)*0.1367e4
        fRdecl = 0.412*math.cos((nJulianDate+10.0)*2.0*pi/fDivide-pi)
        fDeclsc1 = self.sind(latitude)*math.sin(fRdecl)
        fDeclsc2 = self.cosd(latitude)*math.cos(fRdecl)
        tDeclsc = (fDeclsc1, fDeclsc2)
        # in minutes
        fEot = 0.002733 -7.343*math.sin(fA)+ .5519*math.cos(fA) \
               - 9.47*math.sin(2.0*fA) - 3.02*math.cos(2.0*fA) \
               - 0.3289*math.sin(3.*fA) -0.07581*math.cos(3.0*fA) \
               -0.1935*math.sin(4.0*fA) -0.1245*math.cos(4.0*fA)
        # Express in fraction of hour
        fEot = fEot/60.0
        # Express in radians
        fEot = fEot*15*pi/180.0
 
        return (fEot, fR0r, tDeclsc)
 
    def __Solcons(self, dAlf):
        """
       Name: __Solcons
       
       Parameters: [I] double dAlf : Solar constant to correct the excentricity
       
       Returns: double dVar : Variation of the solar constant
 
       Functions Called: cos, sin
       
       Description:  Statement function that calculates the variation of the
         solar constant as a function of the julian day. (dAlf, in radians)
       
       Notes: Comes from the
       
       Revision History:
        Author          Date            Reason
       Miguel Tremblay      June 30th 2004
       """
       
        dVar = 1.0/(1.0-9.464e-4*math.sin(dAlf)-0.01671*math.cos(dAlf)- \
                    + 1.489e-4*math.cos(2.0*dAlf)-2.917e-5*math.sin(3.0*dAlf)- \
                    + 3.438e-4*math.cos(4.0*dAlf))**2
        return dVar
 
 
    def Julian(self, year, month, day):
        """
       Return julian day.
       """
        if calendar.isleap(year): # Bissextil year, 366 days
            lMonth = [0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 366]
        else: # Normal year, 365 days
            lMonth = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334, 365]
 
        nJulian = lMonth[month-1] + day
        return nJulian

Writing sun.py
