# Spark Tutorial

## 1. WordCount

In [1]:
# Load README.md as an RDD whose items are the lines of the file.
# `sc` is not created anywhere in this notebook; presumably the PySpark
# kernel/shell provides it as the SparkContext -- TODO confirm.
textFile = sc.textFile("README.md")

In [3]:
# Number of items (lines of README.md) in this RDD.
textFile.count()

104

In [4]:
# First item in this RDD, i.e. the first line of README.md.
textFile.first()

'# Apache Spark'

In [5]:
# Keep only the lines that contain the substring "Spark" (case-sensitive `in` test)
# and bind the filtered RDD to a name.
linesWithSpark = textFile.filter(lambda line: "Spark" in line)

In [6]:
# How many lines contain "Spark"?
# The filter is chained straight into count() here; it is the same predicate
# that the previous cell bound to `linesWithSpark`.
textFile.filter(lambda line: "Spark" in line).count()

20

In [9]:
# Largest number of whitespace-separated words found on a single line:
# map every line to its word count, then keep the larger value pairwise.
(
    textFile
    .map(lambda text: len(text.split()))
    .reduce(lambda best, count: best if best > count else count)
)

22

In [10]:
def max(a, b):
    """Return the larger of ``a`` and ``b``.

    When neither is strictly greater (``a > b`` is false), ``b`` is returned.

    Note: defining this function rebinds the name ``max`` in the notebook
    namespace, hiding Python's built-in ``max`` from later cells.
    """
    return a if a > b else b

# Same longest-line computation, this time passing the named two-argument
# `max` function as the reducer instead of an inline lambda.
(
    textFile
    .map(lambda text: len(text.split()))
    .reduce(max)
)

22

In [11]:
# Word count: split each line into whitespace-separated tokens, pair every
# token with 1, then add up the 1s for each distinct token.
wordCounts = (
    textFile
    .flatMap(lambda text: text.split())
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)

In [12]:
# Bring every (word, count) pair back as a Python list and display it.
# NOTE(review): fine for a README-sized input; for a large RDD consider
# showing only a sample (e.g. take(n)) instead of the full result.
wordCounts.collect()

[('guide,', 1),
 ('APIs', 1),
 ('environment', 1),
 ('name', 1),
 ('is', 6),
 ('developing', 1),
 ('["Parallel', 1),
 ('[http://spark.apache.org/developer-tools.html](the', 1),
 ('development', 1),
 ('tools', 1),
 ('system', 1),
 ('-T', 1),
 ('shell:', 2),
 ('Pi', 1),
 ('find', 1),
 ('Developer', 1),
 ('sc.parallelize(1', 1),
 ('Configuration', 1),
 ('run', 7),
 ('computing', 1),
 ('start', 1),
 ('return', 2),
 ('see', 3),
 ('Python,', 2),
 ('directory.', 1),
 ('rich', 1),
 ('processing.', 1),
 ('"local[N]"', 1),
 ('graph', 1),
 ('you', 4),
 ('data', 1),
 ('using:', 1),
 ('package', 1),
 ('Building', 1),
 ('mesos://', 1),
 ('do', 2),
 ('several', 1),
 ('A', 1),
 ('The', 1),
 ('MASTER', 1),
 ('"local"', 1),
 ('by', 1),
 ('programming', 1),
 ('for', 12),
 ('Online', 1),
 ('scala>', 1),
 ('a', 8),
 ('on', 7),
 ('overview', 1),
 ('basic', 1),
 ('optimized', 1),
 ('latest', 1),
 ('package.)', 1),
 ('Hadoop,', 2),
 ('module,', 1),
 ('More', 1),
 ('[project', 1),
 ('Useful', 1),
 ('1000:', 2)

## 2. K-means Clustering

In [13]:
from pyspark.ml.clustering import KMeans

In [14]:
# Read the sample k-means data (LIBSVM format) into a DataFrame.
# `spark` is not created in this notebook; presumably the kernel supplies
# the SparkSession -- TODO confirm.
dataset = (
    spark.read
    .format("libsvm")
    .load("data/mllib/sample_kmeans_data.txt")
)

In [29]:
# Print the loaded rows as a text table (columns: label, features).
dataset.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [15]:
# Train a k-means model with two clusters; the fixed seed keeps the
# centroid initialisation repeatable between runs.
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(dataset)

In [16]:
# Evaluate the clustering by computing the
# Within Set Sum of Squared Errors (WSSSE).
#
# KMeansModel.computeCost was deprecated in Spark 2.4 and removed in
# Spark 3.0, so calling it unconditionally raises AttributeError on newer
# installations. When it is missing, fall back to the training summary's
# cost; that is the cost on the data the model was fitted on, which is this
# same `dataset`.
if hasattr(model, "computeCost"):
    wssse = model.computeCost(dataset)
else:
    wssse = model.summary.trainingCost
print("Within Set Sum of Squared Errors = " + str(wssse))

Within Set Sum of Squared Errors = 0.11999999999994547


In [17]:
# Show the result: fetch the fitted cluster centers from the model and
# print them one per line (two centers are expected because k was set to 2).
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[ 0.1  0.1  0.1]
[ 9.1  9.1  9.1]


## 3. Sokulee Data Analysis

In [30]:
# Read every JSON file matching the *_sleep.json glob into one DataFrame;
# no schema is passed, so it is derived from the JSON documents.
# NOTE(review): the absolute /home/hadoop path is machine-specific.
df_sleep = spark.read.json('/home/hadoop/sokulee/*/*_sleep.json')

In [32]:
# Preview the sleep data; show() prints only the top 20 rows and truncates
# long cell values.
df_sleep.show()

+--------------------+-------------+
|               sleep|      summary|
+--------------------+-------------+
|[[0,0,1,2016-05-0...|[1199,2,1219]|
|[[0,0,22,2016-04-...| [993,3,1082]|
|[[8,58,44,2016-05...| [880,1,1040]|
|[[1,2,24,2016-04-...|  [915,1,962]|
|[[0,0,1,2016-05-0...|  [959,1,960]|
|[[8,15,38,2016-05...|  [859,1,950]|
|[[2,2,45,2016-05-...|  [786,2,902]|
|[[1,4,33,2016-05-...|  [820,1,884]|
|[[5,17,22,2016-05...|  [809,1,873]|
|[[2,4,12,2016-05-...|  [823,2,856]|
|[[2,4,12,2016-05-...|  [823,2,856]|
|[[1,3,25,2016-04-...|  [783,2,848]|
|[[2,5,40,2016-04-...|  [782,1,857]|
|[[4,21,42,2016-05...|  [759,1,851]|
|[[4,21,42,2016-05...|  [759,1,851]|
|[[2,3,40,2016-04-...|  [735,2,839]|
|[[2,2,34,2016-04-...|  [783,1,841]|
|[[3,10,38,2016-04...|  [746,1,837]|
|[[1,1,30,2016-04-...|  [753,2,821]|
|[[1,4,13,2016-05-...|  [763,1,819]|
+--------------------+-------------+
only showing top 20 rows



In [33]:
# Read every JSON file matching the *_steps.json glob into one DataFrame;
# no schema is passed, so it is derived from the JSON documents.
# NOTE(review): the absolute /home/hadoop path is machine-specific.
df_steps = spark.read.json('/home/hadoop/sokulee/*/*_steps.json')

In [34]:
# Preview the steps data as a text table (long cell values are truncated).
df_steps.show()

+--------------------+-------------------------+------+-------+
|    activities-steps|activities-steps-intraday|errors|success|
+--------------------+-------------------------+------+-------+
|[[2016-04-04,42083]]|     [WrappedArray([00...|  null|   null|
|[[2016-05-14,44756]]|     [WrappedArray([00...|  null|   null|
|[[2016-05-14,38608]]|     [WrappedArray([00...|  null|   null|
|[[2016-05-06,42330]]|     [WrappedArray([00...|  null|   null|
|[[2016-04-09,35277]]|     [WrappedArray([00...|  null|   null|
|[[2016-04-08,30282]]|     [WrappedArray([00...|  null|   null|
|[[2016-04-09,33941]]|     [WrappedArray([00...|  null|   null|
|[[2016-04-09,33941]]|     [WrappedArray([00...|  null|   null|
|[[2016-04-12,32075]]|     [WrappedArray([00...|  null|   null|
|[[2016-04-12,32075]]|     [WrappedArray([00...|  null|   null|
|[[2016-04-09,29336]]|     [WrappedArray([00...|  null|   null|
|[[2016-05-01,31228]]|     [WrappedArray([00...|  null|   null|
|[[2016-04-04,31950]]|     [WrappedArray

In [35]:
# Print the nested schema of the sleep DataFrame as a tree.
df_sleep.printSchema()

root
 |-- sleep: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- awakeCount: long (nullable = true)
 |    |    |-- awakeDuration: long (nullable = true)
 |    |    |-- awakeningsCount: long (nullable = true)
 |    |    |-- dateOfSleep: string (nullable = true)
 |    |    |-- duration: long (nullable = true)
 |    |    |-- efficiency: long (nullable = true)
 |    |    |-- isMainSleep: boolean (nullable = true)
 |    |    |-- logId: long (nullable = true)
 |    |    |-- minuteData: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- dateTime: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |    |    |-- minutesAfterWakeup: long (nullable = true)
 |    |    |-- minutesAsleep: long (nullable = true)
 |    |    |-- minutesAwake: long (nullable = true)
 |    |    |-- minutesToFallAsleep: long (nullable = true)
 |    |    |-- restlessCount: long (nullable = true)
 |

In [36]:
# Print the nested schema of the steps DataFrame as a tree.
df_steps.printSchema()

root
 |-- activities-steps: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dateTime: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- activities-steps-intraday: struct (nullable = true)
 |    |-- dataset: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- time: string (nullable = true)
 |    |    |    |-- value: long (nullable = true)
 |    |-- datasetInterval: long (nullable = true)
 |    |-- datasetType: string (nullable = true)
 |-- errors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- errorType: string (nullable = true)
 |    |    |-- message: string (nullable = true)
 |-- success: boolean (nullable = true)



In [37]:
# Project two fields out of the nested `summary` struct column, each into
# its own single-column DataFrame.
# Total minutes asleep per row.
sleep_total_minutes_sleep = df_sleep.select(
                df_sleep['summary']['totalMinutesAsleep'])
# Total time in bed per row (presumably also in minutes -- TODO confirm).
sleep_total_time_in_bed = df_sleep.select(
                df_sleep['summary']['totalTimeInBed'])

In [38]:
# Preview the minutes-asleep column (show() prints only the top 20 rows).
sleep_total_minutes_sleep.show()

+--------------------------+
|summary.totalMinutesAsleep|
+--------------------------+
|                      1199|
|                       993|
|                       880|
|                       915|
|                       959|
|                       859|
|                       786|
|                       820|
|                       809|
|                       823|
|                       823|
|                       783|
|                       782|
|                       759|
|                       759|
|                       735|
|                       783|
|                       746|
|                       753|
|                       763|
+--------------------------+
only showing top 20 rows



In [39]:
# Preview the time-in-bed column (show() prints only the top 20 rows).
sleep_total_time_in_bed.show()

+----------------------+
|summary.totalTimeInBed|
+----------------------+
|                  1219|
|                  1082|
|                  1040|
|                   962|
|                   960|
|                   950|
|                   902|
|                   884|
|                   873|
|                   856|
|                   856|
|                   848|
|                   857|
|                   851|
|                   851|
|                   839|
|                   841|
|                   837|
|                   821|
|                   819|
+----------------------+
only showing top 20 rows



In [40]:
# `F` gives namespaced access to the SQL functions. The second import is kept
# for existing cells, but note that it rebinds `min` and `max` in the notebook
# namespace (hiding the built-ins and the `max` defined in section 1).
from pyspark.sql import functions as F
from pyspark.sql.functions import mean, min, max

In [45]:
# Mean / min / max of summary.totalMinutesAsleep over all rows.
# The aggregates are referenced through the `F` namespace rather than the
# bare names `min`/`max`: in this notebook those names are also bound to the
# Python built-ins and to the two-argument `max` defined in section 1, so the
# result of the bare-name version depends on which cell ran last.
# NOTE(review): the recorded minimum is 0, so rows with zero minutes asleep
# are part of the average -- confirm whether they should be filtered out.
minutes_asleep = df_sleep['summary']['totalMinutesAsleep']
df_sleep.select(
    F.mean(minutes_asleep),
    F.min(minutes_asleep),
    F.max(minutes_asleep),
).show()

+----------------------------------+----------------------------------+----------------------------------+
|avg(summary['totalMinutesAsleep'])|min(summary['totalMinutesAsleep'])|max(summary['totalMinutesAsleep'])|
+----------------------------------+----------------------------------+----------------------------------+
|                306.74880239520957|                                 0|                              1199|
+----------------------------------+----------------------------------+----------------------------------+



In [46]:
# Mean / min / max of summary.totalTimeInBed over all rows.
# The aggregates are referenced through the `F` namespace rather than the
# bare names `min`/`max`: in this notebook those names are also bound to the
# Python built-ins and to the two-argument `max` defined in section 1, so the
# result of the bare-name version depends on which cell ran last.
# NOTE(review): the recorded minimum is 0, so rows with zero time in bed
# are part of the average -- confirm whether they should be filtered out.
time_in_bed = df_sleep['summary']['totalTimeInBed']
df_sleep.select(
    F.mean(time_in_bed),
    F.min(time_in_bed),
    F.max(time_in_bed),
).show()

+------------------------------+------------------------------+------------------------------+
|avg(summary['totalTimeInBed'])|min(summary['totalTimeInBed'])|max(summary['totalTimeInBed'])|
+------------------------------+------------------------------+------------------------------+
|            332.98952095808386|                             0|                          1219|
+------------------------------+------------------------------+------------------------------+

