In [1]:
# Import packages
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.getOrCreate()

In [2]:
# Read csv files
ds = spark.read.csv(r"C:\Users\jeffr\Documents\Google Data Analytics\Case Study 2\Data - Original\Fitabase Data 4.12.16-5.12.16\dailyActivity_merged.csv", header=True)

# Print number of records and columns
print((ds.count(), len(ds.columns)))

(940, 15)


In [3]:
# Show the first 5 rows of data
ds.show(5)

+----------+------------+----------+-------------+---------------+------------------------+------------------+------------------------+-------------------+-----------------------+-----------------+-------------------+--------------------+----------------+--------+
|        Id|ActivityDate|TotalSteps|TotalDistance|TrackerDistance|LoggedActivitiesDistance|VeryActiveDistance|ModeratelyActiveDistance|LightActiveDistance|SedentaryActiveDistance|VeryActiveMinutes|FairlyActiveMinutes|LightlyActiveMinutes|SedentaryMinutes|Calories|
+----------+------------+----------+-------------+---------------+------------------------+------------------+------------------------+-------------------+-----------------------+-----------------+-------------------+--------------------+----------------+--------+
|1503960366|  2016-04-12|     13162|          8.5|            8.5|                       0|       1.879999995|             0.550000012|        6.059999943|                      0|               25|        

In [4]:
# Get list of columns and data types
ds.printSchema()

root
 |-- Id: string (nullable = true)
 |-- ActivityDate: string (nullable = true)
 |-- TotalSteps: string (nullable = true)
 |-- TotalDistance: string (nullable = true)
 |-- TrackerDistance: string (nullable = true)
 |-- LoggedActivitiesDistance: string (nullable = true)
 |-- VeryActiveDistance: string (nullable = true)
 |-- ModeratelyActiveDistance: string (nullable = true)
 |-- LightActiveDistance: string (nullable = true)
 |-- SedentaryActiveDistance: string (nullable = true)
 |-- VeryActiveMinutes: string (nullable = true)
 |-- FairlyActiveMinutes: string (nullable = true)
 |-- LightlyActiveMinutes: string (nullable = true)
 |-- SedentaryMinutes: string (nullable = true)
 |-- Calories: string (nullable = true)



In [5]:
# Format columns to correct data type
ds = ds.withColumn("TotalSteps",col("TotalSteps").cast(IntegerType())) \
    .withColumn("TotalDistance",col("TotalDistance").cast(FloatType())) \
    .withColumn("TrackerDistance",col("TrackerDistance").cast(FloatType())) \
    .withColumn("LoggedActivitiesDistance",col("LoggedActivitiesDistance").cast(FloatType())) \
    .withColumn("VeryActiveDistance",col("VeryActiveDistance").cast(FloatType())) \
    .withColumn("ModeratelyActiveDistance",col("ModeratelyActiveDistance").cast(FloatType())) \
    .withColumn("LightActiveDistance",col("LightActiveDistance").cast(FloatType())) \
    .withColumn("SedentaryActiveDistance",col("SedentaryActiveDistance").cast(FloatType())) \
    .withColumn("VeryActiveMinutes",col("VeryActiveMinutes").cast(IntegerType())) \
    .withColumn("FairlyActiveMinutes",col("FairlyActiveMinutes").cast(IntegerType())) \
    .withColumn("LightlyActiveMinutes",col("LightlyActiveMinutes").cast(IntegerType())) \
    .withColumn("SedentaryMinutes",col("SedentaryMinutes").cast(IntegerType())) \
    .withColumn("Calories",col("Calories").cast(IntegerType()))

# Convert activity date from string to date format
ds = ds.withColumn("date",
                 to_date(unix_timestamp(col("ActivityDate"), "yyyy-MM-dd").cast("timestamp")))

# Verify reformatting was successful
ds.printSchema()

root
 |-- Id: string (nullable = true)
 |-- ActivityDate: string (nullable = true)
 |-- TotalSteps: integer (nullable = true)
 |-- TotalDistance: float (nullable = true)
 |-- TrackerDistance: float (nullable = true)
 |-- LoggedActivitiesDistance: float (nullable = true)
 |-- VeryActiveDistance: float (nullable = true)
 |-- ModeratelyActiveDistance: float (nullable = true)
 |-- LightActiveDistance: float (nullable = true)
 |-- SedentaryActiveDistance: float (nullable = true)
 |-- VeryActiveMinutes: integer (nullable = true)
 |-- FairlyActiveMinutes: integer (nullable = true)
 |-- LightlyActiveMinutes: integer (nullable = true)
 |-- SedentaryMinutes: integer (nullable = true)
 |-- Calories: integer (nullable = true)
 |-- date: date (nullable = true)



In [6]:
# Show first 5 rows
ds.show(5)

+----------+------------+----------+-------------+---------------+------------------------+------------------+------------------------+-------------------+-----------------------+-----------------+-------------------+--------------------+----------------+--------+----------+
|        Id|ActivityDate|TotalSteps|TotalDistance|TrackerDistance|LoggedActivitiesDistance|VeryActiveDistance|ModeratelyActiveDistance|LightActiveDistance|SedentaryActiveDistance|VeryActiveMinutes|FairlyActiveMinutes|LightlyActiveMinutes|SedentaryMinutes|Calories|      date|
+----------+------------+----------+-------------+---------------+------------------------+------------------+------------------------+-------------------+-----------------------+-----------------+-------------------+--------------------+----------------+--------+----------+
|1503960366|  2016-04-12|     13162|          8.5|            8.5|                     0.0|              1.88|                    0.55|               6.06|                 

In [7]:
# Remove duplicate rows and count all rows again
ds.dropDuplicates().count()

940

In [8]:
# Verify no instances of null values
ds.select([count(when(col(c).isNull(), c)).alias(c) for c in ds.columns]).show()

+---+------------+----------+-------------+---------------+------------------------+------------------+------------------------+-------------------+-----------------------+-----------------+-------------------+--------------------+----------------+--------+----+
| Id|ActivityDate|TotalSteps|TotalDistance|TrackerDistance|LoggedActivitiesDistance|VeryActiveDistance|ModeratelyActiveDistance|LightActiveDistance|SedentaryActiveDistance|VeryActiveMinutes|FairlyActiveMinutes|LightlyActiveMinutes|SedentaryMinutes|Calories|date|
+---+------------+----------+-------------+---------------+------------------------+------------------+------------------------+-------------------+-----------------------+-----------------+-------------------+--------------------+----------------+--------+----+
|  0|           0|         0|            0|              0|                       0|                 0|                       0|                  0|                      0|                0|                  0| 

In [9]:
# Count distinct Id to confirm there are only 30
unique_id = ds.select(countDistinct("Id"))
unique_id.show()

+------------------+
|count(DISTINCT Id)|
+------------------+
|                33|
+------------------+



In [10]:
# Create new column for day of week
ds = ds.withColumn("week_day", date_format(col("date"), "EEEE"))

# Confirm calculation was successful
ds.select("date", "week_day").show(10)

+----------+---------+
|      date| week_day|
+----------+---------+
|2016-04-12|  Tuesday|
|2016-04-13|Wednesday|
|2016-04-14| Thursday|
|2016-04-15|   Friday|
|2016-04-16| Saturday|
|2016-04-17|   Sunday|
|2016-04-18|   Monday|
|2016-04-19|  Tuesday|
|2016-04-20|Wednesday|
|2016-04-21| Thursday|
+----------+---------+
only showing top 10 rows



In [12]:
# Create a column for total exercise minutes
import pyspark.sql.functions as F
ds = ds.withColumn("total_exercise_mins", F.col("SedentaryMinutes") + F.col("LightlyActiveMinutes") + F.col("FairlyActiveMinutes") + F.col("VeryActiveMinutes"))

ds.select("Id", "SedentaryMinutes", "LightlyActiveMinutes", "FairlyActiveMinutes", "VeryActiveMinutes", "total_exercise_mins").show(10)

+----------+----------------+--------------------+-------------------+-----------------+-------------------+
|        Id|SedentaryMinutes|LightlyActiveMinutes|FairlyActiveMinutes|VeryActiveMinutes|total_exercise_mins|
+----------+----------------+--------------------+-------------------+-----------------+-------------------+
|1503960366|             728|                 328|                 13|               25|               1094|
|1503960366|             776|                 217|                 19|               21|               1033|
|1503960366|            1218|                 181|                 11|               30|               1440|
|1503960366|             726|                 209|                 34|               29|                998|
|1503960366|             773|                 221|                 10|               36|               1040|
|1503960366|             539|                 164|                 20|               38|                761|
|1503960366|       

In [13]:
# Convert total minutes to total hours
ds = ds.withColumn("total_exercise_hours", round(F.col("total_exercise_mins") / 60))

ds.select("total_exercise_mins", "total_exercise_hours").show(10)

+-------------------+--------------------+
|total_exercise_mins|total_exercise_hours|
+-------------------+--------------------+
|               1094|                18.0|
|               1033|                17.0|
|               1440|                24.0|
|                998|                17.0|
|               1040|                17.0|
|                761|                13.0|
|               1440|                24.0|
|               1120|                19.0|
|               1063|                18.0|
|               1076|                18.0|
+-------------------+--------------------+
only showing top 10 rows



In [14]:
# Frequency distribution of day of the week (null represents total)
ds.cube('week_day').count().show()

+---------+-----+
| week_day|count|
+---------+-----+
|Wednesday|  150|
| Thursday|  147|
|  Tuesday|  152|
|   Monday|  120|
|   Friday|  126|
|   Sunday|  121|
| Saturday|  124|
|     null|  940|
+---------+-----+



In [15]:
# View summary data
ds.select('TotalSteps', 'TotalDistance', 'total_exercise_hours', 'calories').summary().show()

+-------+------------------+-----------------+--------------------+-----------------+
|summary|        TotalSteps|    TotalDistance|total_exercise_hours|         calories|
+-------+------------------+-----------------+--------------------+-----------------+
|  count|               940|              940|                 940|              940|
|   mean|7637.9106382978725|5.489702121915415|  20.319148936170212|2303.609574468085|
| stddev| 5087.150741753412|3.924605908624869|  4.4322200255392525|718.1668621342561|
|    min|                 0|              0.0|                 0.0|                0|
|    25%|              3789|             2.62|                16.0|             1827|
|    50%|              7399|             5.24|                24.0|             2133|
|    75%|             10725|             7.71|                24.0|             2793|
|    max|             36019|            28.03|                24.0|             4900|
+-------+------------------+-----------------+--------

In [16]:
# Rename and drop columns for consistency
df = ds.withColumnRenamed("Id","id") \
    .withColumnRenamed("TotalSteps","total_steps") \
    .withColumnRenamed("TotalDistance","total_dist") \
    .withColumnRenamed("TrackerDistance","tracker_dist") \
    .withColumnRenamed("LoggedActivitiesDistance","logged_act_dist") \
    .withColumnRenamed("VeryActiveDistance","very_act_dist") \
    .withColumnRenamed("ModeratelyActiveDistance","moderately_act_dist") \
    .withColumnRenamed("LightActiveDistance","light_act_dist") \
    .withColumnRenamed("SedentaryActiveDistance","sedentary_act_mins") \
    .withColumnRenamed("VeryActiveMinutes","very_act_mins") \
    .withColumnRenamed("FairlyActiveMinutes","fairly_act_mins") \
    .withColumnRenamed("LightlyActiveMinutes","light_act_mins") \
    .withColumnRenamed("SedentaryMinutes","sedentary_mins") \
    .withColumnRenamed("Calories","calories") \
    .drop("ActivityDate")

In [17]:
from pandas import *

# Export dataset as a .csv file
df.toPandas().to_csv(r"C:\Users\jeffr\Documents\Google Data Analytics\Case Study 2\bellabeat_cleaned.csv")