In [2]:
import findspark
findspark.init("/Users/Jean/spark-2.4.4-bin-hadoop2.7")
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('mifit').getOrCreate()

In [3]:
from pyspark.sql.types import DateType
from pyspark.sql.functions import (hour, minute, unix_timestamp,
                                   from_unixtime, hour,datediff,
                                   mean, month, corr, date_format, 
                                  to_date, max, min)
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType, TimestampType)

In [4]:
df_sleep = spark.read.csv('SLEEP/SLEEP_1568018402172.csv', inferSchema=True, header=True)
df_activity = spark.read.csv('ACTIVITY/ACTIVITY_1568018401826.csv', inferSchema=True, header=True)
df_heart = spark.read.csv('HEARTRATE_AUTO/HEARTRATE_AUTO_1568018402363.csv', inferSchema=True, header=True)

In [5]:
df_sleep.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- lastSyncTime: integer (nullable = true)
 |-- deepSleepTime: integer (nullable = true)
 |-- shallowSleepTime: integer (nullable = true)
 |-- wakeTime: integer (nullable = true)
 |-- start: integer (nullable = true)
 |-- stop: integer (nullable = true)



In [6]:
df_activity.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- lastSyncTime: integer (nullable = true)
 |-- steps: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- runDistance: integer (nullable = true)
 |-- calories: integer (nullable = true)



In [7]:
df_heart.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- time: string (nullable = true)
 |-- heartRate: integer (nullable = true)



In [8]:
df_sleep.head()

Row(date=datetime.datetime(2019, 7, 12, 0, 0), lastSyncTime=1563000976, deepSleepTime=98, shallowSleepTime=304, wakeTime=2, start=1562915520, stop=1562939760)

In [9]:
df_sleep = df_sleep.withColumn("dateStart", from_unixtime(df_sleep["start"]))
df_sleep = df_sleep.withColumn("dateStop", from_unixtime(df_sleep["stop"]))
df_sleep = df_sleep.withColumn("qntHourSleep", (datediff(df_sleep["dateStop"], df_sleep["dateStart"])* 24) +
                   hour(df_sleep["dateStop"]) - hour(df_sleep["dateStart"]))

In [10]:
df_sleep.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- lastSyncTime: integer (nullable = true)
 |-- deepSleepTime: integer (nullable = true)
 |-- shallowSleepTime: integer (nullable = true)
 |-- wakeTime: integer (nullable = true)
 |-- start: integer (nullable = true)
 |-- stop: integer (nullable = true)
 |-- dateStart: string (nullable = true)
 |-- dateStop: string (nullable = true)
 |-- qntHourSleep: integer (nullable = true)



In [11]:
df_sleep.select(mean(df_sleep["qntHourSleep"]).alias("average sleep")).show()

+-----------------+
|    average sleep|
+-----------------+
|7.016949152542373|
+-----------------+



In [12]:
avg_sleep_month = df_sleep.groupBy(month(df_sleep["date"])).mean().select(["month(date)","avg(qntHourSleep)"])
avg_sleep_month.withColumnRenamed("month(date)", "month").withColumnRenamed("avg(qntHourSleep)", "average").show()

+-----+-----------------+
|month|          average|
+-----+-----------------+
|    9|            6.875|
|    8|6.903225806451613|
|    7|             7.25|
+-----+-----------------+



In [13]:
#Merging data with activity data
df_merge = df_sleep.join(df_activity, "date")

In [14]:
df_merge.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- lastSyncTime: integer (nullable = true)
 |-- deepSleepTime: integer (nullable = true)
 |-- shallowSleepTime: integer (nullable = true)
 |-- wakeTime: integer (nullable = true)
 |-- start: integer (nullable = true)
 |-- stop: integer (nullable = true)
 |-- dateStart: string (nullable = true)
 |-- dateStop: string (nullable = true)
 |-- qntHourSleep: integer (nullable = true)
 |-- lastSyncTime: integer (nullable = true)
 |-- steps: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- runDistance: integer (nullable = true)
 |-- calories: integer (nullable = true)



In [15]:
df_merge.head(1)

[Row(date=datetime.datetime(2019, 7, 12, 0, 0), lastSyncTime=1563000976, deepSleepTime=98, shallowSleepTime=304, wakeTime=2, start=1562915520, stop=1562939760, dateStart='2019-07-12 04:12:00', dateStop='2019-07-12 10:56:00', qntHourSleep=6, lastSyncTime=1563000976, steps=6105, distance=4199, runDistance=424, calories=157)]

In [16]:
df_merge.select(corr(df_merge["calories"], df_merge["shallowSleepTime"])).show()

+--------------------------------+
|corr(calories, shallowSleepTime)|
+--------------------------------+
|            -0.27953974073528665|
+--------------------------------+



In [17]:
#Getting the average heart rate per day
average_heart = (df_heart.select(df_heart["date"], date_format(df_heart["date"], "yyyy-MM-dd").alias("day"), df_heart["heartRate"])
 .groupBy("day")
 .mean()
 .select(["day","avg(heartRate)"]))

In [18]:
#The schema now shows the date as a string.
average_heart.printSchema()

root
 |-- day: string (nullable = true)
 |-- avg(heartRate): double (nullable = true)



In [19]:
#Get the date back to the date format
average_heart = average_heart.select(
    average_heart['avg(heartRate)'].alias("AvgHeartRate"), 
    to_date(unix_timestamp('day', 'yyyy-MM-dd').cast("timestamp")).alias('date')
)

In [20]:
#Now the date is in a date format
average_heart.printSchema()

root
 |-- AvgHeartRate: double (nullable = true)
 |-- date: date (nullable = true)



In [21]:
average_heart.show()

+-----------------+----------+
|     AvgHeartRate|      date|
+-----------------+----------+
|88.72995780590718|2019-08-08|
|96.98776758409785|2019-08-31|
|79.87243735763099|2019-07-13|
|77.62081784386618|2019-07-19|
|90.71465968586388|2019-08-29|
|83.92491467576792|2019-07-26|
|85.23397435897436|2019-08-01|
|78.88571428571429|2019-08-04|
|84.48571428571428|2019-08-11|
|85.85664335664336|2019-09-08|
| 79.4171270718232|2019-07-12|
|86.34166666666667|2019-09-03|
| 87.6403785488959|2019-08-27|
|83.30847457627118|2019-07-23|
|83.89240506329114|2019-07-14|
|87.72093023255815|2019-08-28|
|87.31058020477816|2019-07-24|
|83.98692810457516|2019-08-06|
|95.26923076923077|2019-09-01|
| 85.2970711297071|2019-08-07|
+-----------------+----------+
only showing top 20 rows



In [22]:
#Merging data with heart data
df_merge = df_merge.join(average_heart, "date")

In [23]:
df_merge.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- lastSyncTime: integer (nullable = true)
 |-- deepSleepTime: integer (nullable = true)
 |-- shallowSleepTime: integer (nullable = true)
 |-- wakeTime: integer (nullable = true)
 |-- start: integer (nullable = true)
 |-- stop: integer (nullable = true)
 |-- dateStart: string (nullable = true)
 |-- dateStop: string (nullable = true)
 |-- qntHourSleep: integer (nullable = true)
 |-- lastSyncTime: integer (nullable = true)
 |-- steps: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- runDistance: integer (nullable = true)
 |-- calories: integer (nullable = true)
 |-- AvgHeartRate: double (nullable = true)



In [41]:
df_merge.select(corr(df_merge["shallowSleepTime"], df_merge["deepSleepTime"])).show()

+-------------------------------------+
|corr(shallowSleepTime, deepSleepTime)|
+-------------------------------------+
|                 -0.08180222534329716|
+-------------------------------------+



In [25]:
df_merge.select(min("AvgHeartRate")).show()

+-----------------+
|min(AvgHeartRate)|
+-----------------+
|             68.5|
+-----------------+

