# 1.  
Write a function that will put N doubles into a file. The doubles need to be normally distributed
with mean 0 and standard deviation 1. The function should have two arguments: N
and the full name of the file (i.e., including the path to the file location).  
  
Create a file with 50,000 doubles using the function from problem 1. This file will be used for
the next several problems. It is best if you put the file in the current directory to avoid paths that
do not exist on other machines.

In [1]:
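import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Write N standard-normal doubles (mean 0, standard deviation 1) to `filepath`, one per line
def newFile(N: Int, filepath: String): Unit = {
    val arr1 = Array.fill(N)(scala.util.Random.nextGaussian())
    val avg = arr1.sum / N
    // Sample standard deviation (divides by N - 1)
    val std = scala.math.sqrt(arr1.map(x => scala.math.pow(x - avg, 2)).sum / (N - 1))

    // java.nio needs no Scala compiler classes on the classpath
    Files.write(Paths.get(filepath), arr1.mkString("\n").getBytes(StandardCharsets.UTF_8))

    // Loose sanity checks: the values are random, so these only make sense for large N
    assert(arr1.length == N)
    assert(avg > -0.1 && avg < 0.1)
    assert(std >= 0.9 && std <= 1.1)
    println("mean: " + avg)
    println("stdv: " + std)
}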

In [2]:
newFile(50000, "file1.txt")

mean: -8.260468405144706E-4
stdv: 1.0003023769931492
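For a file that can be regenerated exactly, the generator can be seeded. The sketch below is one way to do that in plain Scala, assuming hypothetical helper names `writeGaussians` and `readDoubles` and an arbitrary seed; it writes the values one per line with `java.nio` and reads them back to confirm the round trip.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.util.Random

// Hypothetical helpers: a sketch, not the assignment's required API.
object GaussianFile {
  // Write n standard-normal doubles, one per line; the same seed always gives the same file.
  def writeGaussians(n: Int, path: String, seed: Long): Array[Double] = {
    val rng = new Random(seed)
    val xs = Array.fill(n)(rng.nextGaussian())
    Files.write(Paths.get(path), xs.mkString("\n").getBytes(StandardCharsets.UTF_8))
    xs
  }

  // Read the doubles back, splitting on any whitespace and skipping empty tokens.
  def readDoubles(path: String): Array[Double] = {
    val text = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8)
    text.split("\\s+").filter(_.nonEmpty).map(_.toDouble)
  }
}
```

Because `Double.toString` prints enough digits to identify each value uniquely, the values read back compare equal to the ones written.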


# 2.  
Read the file created in #1 into an RDD and compute the mean and standard deviation of
the doubles in the file. Work on the RDD; that is, do not convert the RDD to a DataFrame or
Dataset. You are to use Spark code to compute the values, as we want this to run on a
cluster using multiple machines, so the pure Scala code used in a previous assignment will not
work.

In [4]:
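// Read the file on the driver and distribute the values as an RDD.
// (On a cluster, sc.textFile("file1.txt").map(_.toDouble) would let the executors read the data instead.)
val source1 = scala.io.Source.fromFile("file1.txt", "UTF-8")
val tokens1 = try source1.mkString.split("\\s+") finally source1.close()
val nums1 = tokens1.map(_.toDouble)
val rdd = sc.parallelize(nums1)
val mean1 = rdd.mean()
// stdev() is the population standard deviation (divides by n); sampleStdev() divides by n - 1
val stdev1 = rdd.stdev()
println("mean: " + mean1)
println("stdev: " + stdev1)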

mean: -8.26046840514488E-4
stdev: 1.0002923739193694
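`rdd.mean()` and `rdd.stdev()` can run on a cluster because each partition can be summarized independently (count, mean, and sum of squared deviations) and the summaries merged afterwards; Spark's `StatCounter`, returned by `rdd.stats()`, works along these lines. The sketch below illustrates the idea in plain Scala with a hypothetical `Stats` class: Welford's update for single values and the pairwise merge formula of Chan et al. for partial results.

```scala
object RunningStats {
  // Hypothetical summary: count, mean, and M2 (sum of squared deviations from the mean).
  final case class Stats(n: Long, mean: Double, m2: Double) {
    // Welford's update: fold one more value into the summary.
    def add(x: Double): Stats = {
      val n1 = n + 1
      val delta = x - mean
      val mean1 = mean + delta / n1
      Stats(n1, mean1, m2 + delta * (x - mean1))
    }

    // Chan et al.: combine two partial summaries, e.g. from two partitions.
    def merge(o: Stats): Stats =
      if (o.n == 0) this
      else if (n == 0) o
      else {
        val total = n + o.n
        val delta = o.mean - mean
        Stats(total, mean + delta * o.n / total, m2 + o.m2 + delta * delta * n * o.n / total)
      }

    def popStdev: Double = math.sqrt(m2 / n)          // divides by n
    def sampleStdev: Double = math.sqrt(m2 / (n - 1)) // divides by n - 1
  }

  val empty: Stats = Stats(0L, 0.0, 0.0)

  def of(xs: Seq[Double]): Stats = xs.foldLeft(empty)(_ add _)
}
```

Splitting the data at any point and merging the two summaries gives the same mean and M2 as a single pass, up to floating-point rounding.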


# 3.  
Repeat #2, but using a DataFrame instead of an RDD. Here, work on the DataFrame, not an
RDD.

In [29]:
import org.apache.spark.sql.functions._

// spark.read.text yields a single string column named "value"; cast it to double explicitly
val df = spark.read.text("file1.txt").select(col("value").cast("double").as("value"))

df.select(mean(df("value"))).show()
// stddev is the sample standard deviation (stddev_samp)
df.select(stddev(df("value"))).show()

+--------------------+
|          avg(value)|
+--------------------+
|-8.26046840514470...|
+--------------------+

+------------------+
|stddev_samp(value)|
+------------------+
| 1.000302376993155|
+------------------+
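The RDD result in #2 (1.0002923739...) and the `stddev_samp` value here (1.0003023769...) differ only because `stdev()` divides by n while `stddev_samp` divides by n - 1, so the two are related by s = σ·sqrt(n / (n - 1)). A small check in plain Scala, with a hypothetical helper name:

```scala
object StdevCheck {
  // Convert a population standard deviation (divide by n) into a sample one (divide by n - 1).
  def popToSample(popStd: Double, n: Long): Double =
    popStd * math.sqrt(n.toDouble / (n - 1))
}
```

For n = 50,000 the factor is about 1.00001, which accounts for the difference in the fifth decimal place: `StdevCheck.popToSample(1.0002923739193694, 50000L)` agrees with the `stddev_samp` value above to well within 1e-9.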



# 4.  
Using a DataFrame, create a random sample of about 100 elements of the file created in #1
and compute the mean of the sample. 

In [41]:
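// 100/50000 = 0.002; oversample slightly (without replacement) so limit(100) usually has 100 rows to keep
val dfSample = df.sample(withReplacement = false, fraction = 0.0025).limit(100)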
dfSample.count

100

In [43]:
dfSample.select(mean("value")).show()

+-----------------+
|       avg(value)|
+-----------------+
|0.110678406963615|
+-----------------+
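`sample(false, fraction)` keeps each row independently with probability `fraction`, so the sample size is only about N·fraction and changes between runs; with replacement, a row can appear more than once. The plain-Scala sketch below shows the without-replacement (Bernoulli) case, assuming a hypothetical `sample` helper and a fixed seed.

```scala
import scala.util.Random

object BernoulliSample {
  // Keep each element independently with probability `fraction` (no duplicates).
  // The expected size is xs.size * fraction; the actual size varies with the seed.
  def sample[A](xs: Seq[A], fraction: Double, seed: Long): Seq[A] = {
    val rng = new Random(seed)
    xs.filter(_ => rng.nextDouble() < fraction)
  }
}
```

For context on the result above: the mean of about 100 standard-normal values has a standard error of roughly 1/sqrt(100) = 0.1, so a sample mean of 0.11 is well within the expected spread.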



# 5.  
Create a file of 100 normally distributed doubles. Read the doubles from the file into an
RDD. Using the RDD create a sliding window of size 20 and compute the mean of each
window. 

In [196]:
// Generate 100 standard-normal doubles and save them as a single CSV part file
val df100 = spark.range(100).select(randn()).toDF("doubles")

val filePath = "df100.csv"
df100.coalesce(1).write.mode("overwrite").option("header", "false").csv(filePath)

// Spark writes a directory of part files; textFile on the directory reads them all,
// so the generated part-file name need not be hard-coded
val rdd100 = sc.textFile(filePath).map(_.toDouble).cache

// Five non-overlapping (tumbling) windows of 20 values each, selected by index
val pairs = rdd100.zipWithIndex
val slide1 = pairs.filter{case (v,k) => k<20}.keys.mean
val slide2 = pairs.filter{case (v,k) => k>19 && k<40}.keys.mean
val slide3 = pairs.filter{case (v,k) => k>39 && k<60}.keys.mean
val slide4 = pairs.filter{case (v,k) => k>59 && k<80}.keys.mean
val slide5 = pairs.filter{case (v,k) => k>79}.keys.mean

assert(rdd100.count == 100)
assert(pairs.filter{case (v,k) => k<20}.keys.count == 20)
assert(pairs.filter{case (v,k) => k>19 && k<40}.keys.count == 20)
assert(pairs.filter{case (v,k) => k>39 && k<60}.keys.count == 20)
assert(pairs.filter{case (v,k) => k>59 && k<80}.keys.count == 20)
assert(pairs.filter{case (v,k) => k>79}.keys.count == 20)

println("1-20:   " + slide1)
println("21-40:  " + slide2)
println("41-60:  " + slide3)
println("61-80:  " + slide4)
println("81-100: " + slide5)

1-20:   0.08583182707064407
21-40:  -0.23986637832375474
41-60:  -0.22226077556934193
61-80:  -0.0940150257002612
81-100: 0.33477403255179883
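The five filters above compute non-overlapping blocks. A sliding window of size 20 over 100 values instead advances one position at a time (1-20, 2-21, ..., 81-100), giving 81 windows. On RDDs, MLlib's `org.apache.spark.mllib.rdd.RDDFunctions` provides a `sliding(windowSize)` method for this. The difference is sketched below in plain Scala with hypothetical helper names.

```scala
object WindowMeans {
  private def mean(w: Seq[Double]): Double = w.sum / w.size

  // Overlapping windows: one window per starting position (assumes xs.size >= size).
  def slidingMeans(xs: Seq[Double], size: Int): Seq[Double] =
    xs.sliding(size).map(mean).toVector

  // Non-overlapping blocks: positions 1-20, 21-40, and so on.
  def tumblingMeans(xs: Seq[Double], size: Int): Seq[Double] =
    xs.grouped(size).map(mean).toVector
}
```

For the values 1 to 100, `slidingMeans(xs, 20)` returns 81 means from 10.5 to 90.5, while `tumblingMeans(xs, 20)` returns 5.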


# 6.  
The file “multiple-sites.tsv” contains two columns: site and dwell-time. Using Spark, compute
the average dwell time for each site.  

In [227]:
val msDf = spark.read.format("csv").
                option("header", "true").
                option("delimiter", "\t").
                option("inferSchema", "true").
                load("multiple-sites.tsv")

println(msDf.count)

1000


In [223]:
msDf.groupBy("site").agg(mean("dwell-time") as "avg-time").show(10) 

+----+------------------+
|site|          avg-time|
+----+------------------+
|   7|123.36734693877551|
|  15|119.34782608695652|
|  11| 96.98214285714286|
|   3| 97.47916666666667|
|   8| 94.34693877551021|
|  16| 86.74418604651163|
|   0| 79.85106382978724|
|   5|102.33333333333333|
|  18| 94.81481481481481|
|  17|  77.8913043478261|
+----+------------------+
only showing top 10 rows
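`groupBy("site").agg(mean(...))` only needs a running sum and count per key, which partitions can compute separately and then add together; on an RDD the same pattern is often written with `reduceByKey` over `(sum, count)` pairs. A plain-Scala sketch with a hypothetical `averageByKey` helper:

```scala
object GroupAverage {
  // Accumulate (sum, count) per key, then divide: the same two quantities a
  // distributed group-by average combines across partitions.
  def averageByKey[K](rows: Seq[(K, Double)]): Map[K, Double] =
    rows.foldLeft(Map.empty[K, (Double, Long)]) { case (acc, (k, v)) =>
      val (s, c) = acc.getOrElse(k, (0.0, 0L))
      acc.updated(k, (s + v, c + 1))
    }.map { case (k, (s, c)) => k -> s / c }
}
```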



# 7.  
The file “dwell-times.tsv” contains two columns: date and dwell-time. Using Spark, compute  
the following:  
1. The average dwell time each hour  
2. The average dwell time per day of week  
3. The average dwell time on week-days (Monday - Friday)  
4. Average dwell time on the weekend.  

In [235]:
val dtDf = spark.read.format("csv").
                option("header", "true").
                option("delimiter", "\t").
                option("inferSchema", "true").
                load("dwell-times.tsv")

#### 1. Average dwell time each hour

In [232]:
// One tumbling window per calendar hour (e.g. 2015-01-05 08:00 up to 09:00)
dtDf.groupBy(window(dtDf.col("date"),"1 hour")).agg(mean("dwell-time") as "avg-hour").show(10)

+--------------------+-----------------+
|              window|         avg-hour|
+--------------------+-----------------+
|[2015-01-05 08:00...|          83.8125|
|[2015-01-09 12:00...|           81.375|
|[2015-01-15 12:00...|80.52941176470588|
|[2015-01-24 01:00...|             35.0|
|[2015-01-28 04:00...|92.26666666666667|
|[2015-02-06 02:00...|             63.0|
|[2015-02-15 11:00...|95.77777777777777|
|[2015-02-18 14:00...|71.36363636363636|
|[2015-03-01 20:00...|             65.0|
|[2015-03-04 00:00...|          111.125|
+--------------------+-----------------+
only showing top 10 rows



#### 2. Average dwell time per day of week

In [234]:
// Note: a "1 day" window averages per calendar date, not per day of week (Monday, Tuesday, ...)
dtDf.groupBy(window(dtDf.col("date"),"1 day")).agg(mean("dwell-time") as "avg-day").show(10)

+--------------------+------------------+
|              window|           avg-day|
+--------------------+------------------+
|[2015-03-01 00:00...|116.47272727272727|
|[2015-01-24 00:00...|112.73076923076923|
|[2015-01-26 00:00...| 92.41025641025641|
|[2015-02-18 00:00...| 81.37800687285224|
|[2015-03-11 00:00...|  87.7127659574468|
|[2015-05-14 00:00...| 96.94561186650185|
|[2015-02-17 00:00...| 85.50541516245487|
|[2015-03-06 00:00...| 91.01186943620178|
|[2015-01-03 00:00...|            124.66|
|[2015-04-06 00:00...|      88.935546875|
+--------------------+------------------+
only showing top 10 rows
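Parts 1 and 2 group by tumbling time windows, so each result row is one calendar hour or one calendar date. If the questions are read as hour of day (0 to 23) and day of week, records from different dates are pooled under the same key; in Spark this could be done with `hour(col("date"))` or `date_format(col("date"), "EEEE")` as the grouping column. The plain-Scala sketch below shows the pooling, assuming timestamps formatted as `yyyy-MM-dd HH:mm:ss` and hypothetical helper names.

```scala
import java.time.{DayOfWeek, LocalDateTime}
import java.time.format.DateTimeFormatter

object TimeKeyAverages {
  // Assumed timestamp layout; adjust the pattern to match the actual "date" column.
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  def hourOfDay(ts: String): Int = LocalDateTime.parse(ts, fmt).getHour
  def dayOfWeek(ts: String): DayOfWeek = LocalDateTime.parse(ts, fmt).getDayOfWeek

  // Average dwell time per key, pooling every record that maps to the same key
  // (e.g. all Mondays together, regardless of calendar date).
  def averageBy[K](rows: Seq[(String, Double)])(key: String => K): Map[K, Double] =
    rows.groupBy { case (ts, _) => key(ts) }
      .map { case (k, group) => k -> group.map(_._2).sum / group.size }
}
```

For example, `TimeKeyAverages.averageBy[DayOfWeek](rows)(TimeKeyAverages.dayOfWeek)` returns one average per weekday name, like part 3 below.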



#### 3. Average dwell time on week-days

In [313]:
// Average dwell time per day name ("Monday", "Tuesday", ...)
val week = dtDf.withColumn("Day", date_format(col("date"), "EEEE")).
                groupBy("Day").agg(mean("dwell-time") as "avg-day-of-week")
week.where(col("Day").isin("Monday", "Tuesday", "Wednesday", "Thursday", "Friday")).show(5)

+---------+-----------------+
|      Day|  avg-day-of-week|
+---------+-----------------+
|Wednesday|90.74125065685759|
|  Tuesday|88.99967886962106|
|   Friday|88.68935445068163|
| Thursday|91.99423893268647|
|   Monday|90.60352703707639|
+---------+-----------------+



#### 4. Average dwell time on the weekend

In [314]:
week.where(col("Day").isin("Saturday", "Sunday")).show()

+--------+------------------+
|     Day|   avg-day-of-week|
+--------+------------------+
|Saturday|118.96697187704382|
|  Sunday|116.49892933618844|
+--------+------------------+
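Part 3 reports five separate weekday averages. A single weekday (or weekend) figure should pool the underlying records rather than average the daily means, since each day has a different number of records. In Spark one option is a weekend flag derived from `dayofweek(col("date"))`, which numbers days from 1 = Sunday to 7 = Saturday. A plain-Scala sketch of the pooled averages, assuming timestamps formatted as `yyyy-MM-dd HH:mm:ss` and a hypothetical `averages` helper:

```scala
import java.time.{DayOfWeek, LocalDateTime}
import java.time.format.DateTimeFormatter

object WeekdayWeekend {
  // Assumed timestamp layout; adjust to match the actual "date" column.
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  private def isWeekend(ts: String): Boolean = {
    val d = LocalDateTime.parse(ts, fmt).getDayOfWeek
    d == DayOfWeek.SATURDAY || d == DayOfWeek.SUNDAY
  }

  // Returns (weekday average, weekend average), each pooled over all matching records.
  // An empty group yields NaN (0.0 / 0).
  def averages(rows: Seq[(String, Double)]): (Double, Double) = {
    val (weekend, weekday) = rows.partition { case (ts, _) => isWeekend(ts) }
    def avg(xs: Seq[(String, Double)]): Double = xs.map(_._2).sum / xs.size
    (avg(weekday), avg(weekend))
  }
}
```

For example, two Monday records (80 and 100) and one Tuesday record (60) pool to a weekday average of 80, whereas averaging the Monday mean (90) and the Tuesday mean (60) would give 75.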



# 8.  
Do the average dwell times computed in #7 indicate any difference in users' behavior?

**Yes. Average dwell times are clearly higher on weekends (about 116.5 to 119.0) than on weekdays (about 88.7 to 92.0),  
roughly 30% higher. A plausible explanation is that people have more free time on weekends. Saturday has the highest  
average, while Tuesday and Friday have the lowest.**  