<h1>Dataframe assignment</h1>
In this assignment, you need to write a user-defined aggregate function (UDAF) that returns the mean and the standard deviation of a column in a dataframe. Your function should have the following characteristics:
<ol>
    <li>name the function <b>meanstd</b></li>
    <li>meanstd should take three columns in the input schema</li>
    <li>meanstd should contain a predicate function that takes two String arguments and returns true or false. The predicate function serves as the "selection criterion" for including data in the mean and standard deviation calculation</li>
    <li>data from the first two columns in the input schema are the input values to the predicate function. If the predicate function returns true for a data row, the third column value for that row is included in the calculation of the mean and std. If the predicate function returns false, that row is skipped</li>
    <li>the predicate function should be replaceable, i.e., you should be able to change the condition for different calls to meanstd</li>
    <li>you need to calculate the mean and the std in one pass, using the algorithm below</li>
</ol>

<h1>Example</h1>

Consider the following data table:
<table>
    <tr><th>col1</th><th>col2</th><th>col3</th></tr>
<tr><td>John</td><td>Cloud</td><td>9</td></tr>
<tr><td>John</td><td>Analytics</td><td>7</td></tr>
<tr><td>Jill</td><td>Cloud</td><td>8</td></tr>
<tr><td>James</td><td>Analytics</td><td>11</td></tr>
<tr><td>Jacinta</td><td>Politics</td><td>8</td></tr>
<tr><td>Jacques</td><td>Cloud</td><td>9</td></tr>
    </table> 
    


A call to

<pre>
df.agg(meanstd(df("col1"), df("col2"), df("col3")))
</pre>

should return a mean of 8.8 and a standard deviation of about 1.3266 when the selection condition is that the student's name (col1) starts with the letter "J" and the quiz (col2) is either Cloud or Analytics.
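As a quick arithmetic check, the plain-Scala sketch below (no Spark involved) applies the selection condition to the six rows of the example table and runs the one-pass formulas over the selected col3 values. The names `rows`, `pred` and `selected` are illustrative only. Like the algorithm given later, it computes the population standard deviation (dividing by n).

```scala
// Rows of the example table: (col1, col2, col3)
val rows = Seq(
  ("John", "Cloud", 9.0), ("John", "Analytics", 7.0), ("Jill", "Cloud", 8.0),
  ("James", "Analytics", 11.0), ("Jacinta", "Politics", 8.0), ("Jacques", "Cloud", 9.0)
)

// Selection criterion: name starts with "J" and the quiz is Cloud or Analytics
val pred: (String, String) => Boolean =
  (name, quiz) => name.startsWith("J") && (quiz == "Cloud" || quiz == "Analytics")

// Keep col3 only for the rows the predicate accepts (Jacinta/Politics is dropped)
val selected = rows.collect { case (a, b, x) if pred(a, b) => x }

// One pass: accumulate count, sum and sum of squares
var n = 0L
var sum = 0.0
var sqSum = 0.0
for (x <- selected) { n += 1; sum += x; sqSum += x * x }

val mean = sum / n
val std = math.sqrt(sqSum / n - mean * mean)
println(s"mean = $mean, std = $std")   // approximately mean = 8.8, std = 1.32665
```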

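A call to

<pre>
df.groupBy("col2").agg(meanstd(df("col1"), df("col2"), df("col3")))
</pre>

with the same condition should return the following. No Politics row passes the condition, so that group has a count of zero and both statistics come out as NaN: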

<pre>
+---------+-----------------+
|     col2|             mean|
+---------+-----------------+
|Analytics|              9.0|
|    Cloud|8.666666666666666|
| Politics|              NaN|
+---------+-----------------+

+---------+------------------+
|     col2|               std|
+---------+------------------+
|Analytics|               2.0|
|    Cloud|0.4714045207910384|
| Politics|               NaN|
+---------+------------------+
</pre>
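The grouped result can be reproduced the same way. In the plain-Scala sketch below (names illustrative), every row still contributes its group key, but only rows accepted by the predicate add to that group's count, sum and sum of squares. Politics therefore ends up with n = 0, and 0.0 / 0 evaluates to NaN, matching the expected output.

```scala
val rows = Seq(
  ("John", "Cloud", 9.0), ("John", "Analytics", 7.0), ("Jill", "Cloud", 8.0),
  ("James", "Analytics", 11.0), ("Jacinta", "Politics", 8.0), ("Jacques", "Cloud", 9.0)
)
val pred: (String, String) => Boolean =
  (name, quiz) => name.startsWith("J") && (quiz == "Cloud" || quiz == "Analytics")

// For each col2 group, fold the rows into (count, sum, sumOfSquares),
// adding a row's col3 value only when the predicate accepts it
val stats: Map[String, (Double, Double)] =
  rows.groupBy(_._2).map { case (quiz, grp) =>
    val (n, sum, sq) = grp.foldLeft((0L, 0.0, 0.0)) { case ((c, s, q), (a, b, x)) =>
      if (pred(a, b)) (c + 1, s + x, q + x * x) else (c, s, q)
    }
    val mean = sum / n                                // NaN when n == 0
    quiz -> (mean, math.sqrt(sq / n - mean * mean))
  }

stats.toSeq.sortBy(_._1).foreach { case (q, (m, s)) => println(s"$q mean=$m std=$s") }
```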



<h1>Deliverables</h1>
<ol>
    <li>The definition of meanstd (see the class outline below). A Scala object cannot be used here: an object is a single instance, but the predicate function must change from one use to the next. A class takes the predicate as a constructor argument, and each instance is created with new.</li>
    <li>use meanstd to get the mean and standard deviation for the fare_amount field in taxi_data.csv</li>
    <li>use meanstd to get the mean and standard deviation for the fare_amount field in taxi_data.csv for all trips that start and end in the same location zone (pickup location id is the same as dropoff location id)</li>
    <li>use meanstd to get the mean and standard deviation for the fare_amount field in taxi_data.csv for all trips that start and end in the same location zone (pickup location id is the same as dropoff location id), grouped by pickup location id</li>
    <li>report the location id for the location with the highest mean from deliverable 4 above</li>
</ol>

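To see why a class is needed, compare the two shapes below in plain Scala. An object is a singleton, so a predicate baked into it is fixed; a class receives the predicate through its constructor, and each instance created with new carries its own. `FixedFilter` and `RowFilter` are hypothetical names used only for this illustration; meanstd follows the class pattern, with UserDefinedAggregateFunction as its parent.

```scala
// An object is a singleton: its condition is fixed when the code is written
object FixedFilter {
  def keep(a: String, b: String): Boolean = a == b
}

// A class takes the condition as a constructor argument, so every
// instance created with `new` can use a different selection criterion
class RowFilter(p: (String, String) => Boolean) {
  def keep(a: String, b: String): Boolean = p(a, b)
}

val rows = Seq(("1", "1"), ("1", "2"), ("7", "7"))

val sameZone = new RowFilter((a, b) => a == b)   // pickup == dropoff
val everything = new RowFilter((a, b) => true)   // unconditional

println(rows.count { case (a, b) => sameZone.keep(a, b) })    // 2
println(rows.count { case (a, b) => everything.keep(a, b) })  // 3
println(rows.count { case (a, b) => FixedFilter.keep(a, b) }) // 2, and cannot be changed per call
```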
<h2>One pass algorithm for the mean and standard deviation</h2>
<pre>
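#include <math.h>

/* One pass over a[0..n-1]: accumulate the sum and the sum of squares,
   then derive the population mean and standard deviation. */
void mean_std(const double *a, int n, double *mean, double *stdev) {
    double sum = 0.0, sq_sum = 0.0;
    if (n == 0) {                  /* no data: both statistics are undefined (NaN, as in the example output) */
        *mean = *stdev = NAN;
        return;
    }
    for (int i = 0; i < n; ++i) {
        sum    += a[i];
        sq_sum += a[i] * a[i];
    }
    double m = sum / n;
    *mean  = m;
    *stdev = sqrt(sq_sum / n - m * m);
}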
</pre>
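A direct Scala transcription of this algorithm is sketched below, with one assumption added: the variance term `sq_sum / n - mean * mean` is clamped at zero before the square root. Mathematically it is never negative, but floating-point rounding can push it slightly below zero for (near-)constant data, and `math.sqrt` of a negative number is NaN. The function name `meanStd` is illustrative. It computes the population standard deviation (dividing by n, not n - 1) and returns NaN for both values on empty input.

```scala
// Single pass over xs: accumulate the sum and the sum of squares,
// then derive the population mean and standard deviation
def meanStd(xs: Seq[Double]): (Double, Double) = {
  var n = 0L
  var sum = 0.0
  var sqSum = 0.0
  for (x <- xs) {
    n += 1
    sum += x
    sqSum += x * x
  }
  if (n == 0) (Double.NaN, Double.NaN)          // no data: both undefined
  else {
    val mean = sum / n
    // Clamp tiny negative values caused by rounding before taking the root
    val variance = math.max(0.0, sqSum / n - mean * mean)
    (mean, math.sqrt(variance))
  }
}

// The values selected in the example above
println(meanStd(Seq(9.0, 7.0, 8.0, 11.0, 9.0)))   // approximately (8.8, 1.32665)
```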

In [3]:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

// A UDAF that returns (mean, std) of the third input column, counting only the
// rows for which the predicate p accepts the first two columns. Because p is a
// constructor argument, each `new meanstd(...)` can use its own condition.
class meanstd(p: (String, String) => Boolean) extends UserDefinedAggregateFunction {
    // Input: two String columns for the predicate, one Double column to aggregate
    def inputSchema: StructType = StructType(
        StructField("inputColumn1", StringType) ::
        StructField("inputColumn2", StringType) ::
        StructField("inputColumn3", DoubleType) :: Nil)

    // Selection criterion supplied by the caller
    def predicate(x: String, y: String): Boolean = p(x, y)

    // Aggregation buffer: running sum, running sum of squares, row count
    def bufferSchema: StructType = StructType(
        StructField("sum", DoubleType) ::
        StructField("sq_sum", DoubleType) ::
        StructField("count", LongType) :: Nil)

    // Result: a struct with a mean field and a std field
    def dataType: DataType = StructType(
        StructField("mean", DoubleType) ::
        StructField("std", DoubleType) :: Nil)

    def deterministic: Boolean = true

    def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0.0
        buffer(1) = 0.0
        buffer(2) = 0L
    }

    // Called once per input row: skip rows with nulls or rejected by the predicate
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0) && !input.isNullAt(1) && !input.isNullAt(2) &&
            predicate(input.getString(0), input.getString(1))) {
            val x = input.getDouble(2)
            buffer(0) = buffer.getDouble(0) + x
            buffer(1) = buffer.getDouble(1) + x * x
            buffer(2) = buffer.getLong(2) + 1
        }
    }

    // Combine two partial buffers (for example, from different partitions)
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
        buffer1(1) = buffer1.getDouble(1) + buffer2.getDouble(1)
        buffer1(2) = buffer1.getLong(2) + buffer2.getLong(2)
    }

    // One-pass formulas: mean = sum / n, std = sqrt(sq_sum / n - mean^2).
    // With n == 0 both are NaN (0.0 / 0). The variance is clamped at 0 because
    // rounding can make it slightly negative, and sqrt of a negative is NaN.
    def evaluate(buffer: Row): Any = {
        val n = buffer.getLong(2)
        val mean = buffer.getDouble(0) / n
        val variance = math.max(0.0, buffer.getDouble(1) / n - mean * mean)
        Row(mean, math.sqrt(variance))
    }
}

// p has no default value: it must be passed to the constructor.
// For each new predicate, create a new meanstd instance and register it under its own name.


import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
defined class meanstd
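Roughly speaking, Spark never hands the whole column to meanstd at once. It creates a buffer with initialize (about one per group in each partition), feeds rows to update, combines partial buffers with merge, and calls evaluate once per group at the end. The plain-Scala sketch below imitates that call sequence with a hypothetical `Buffer` class and two hand-made partitions of toy (PULocationID, DOLocationID, fare_amount) rows. It is not Spark code, only a model of why storing (sum, sq_sum, count) is enough to get the same answer however the rows are split.

```scala
// Mutable stand-in for Spark's MutableAggregationBuffer: (sum, sq_sum, count)
final class Buffer(var sum: Double, var sqSum: Double, var count: Long)

def initialize(): Buffer = new Buffer(0.0, 0.0, 0L)

// Add the third value only when the predicate accepts the first two
def update(p: (String, String) => Boolean)(buf: Buffer, row: (String, String, Double)): Unit =
  if (p(row._1, row._2)) {
    buf.sum += row._3
    buf.sqSum += row._3 * row._3
    buf.count += 1
  }

// Combine two partial buffers field by field
def merge(b1: Buffer, b2: Buffer): Unit = {
  b1.sum += b2.sum
  b1.sqSum += b2.sqSum
  b1.count += b2.count
}

def evaluate(buf: Buffer): (Double, Double) = {
  val mean = buf.sum / buf.count
  (mean, math.sqrt(math.max(0.0, buf.sqSum / buf.count - mean * mean)))
}

val sameZone: (String, String) => Boolean = (a, b) => a == b

// Toy rows split across two hypothetical partitions
val part1 = Seq(("7", "7", 6.0), ("7", "12", 20.0), ("42", "42", 8.0))
val part2 = Seq(("7", "7", 10.0), ("42", "1", 15.0))

// Partial aggregation per partition, then a final merge
val partials = Seq(part1, part2).map { part =>
  val buf = initialize()
  part.foreach(row => update(sameZone)(buf, row))
  buf
}
val total = initialize()
partials.foreach(b => merge(total, b))
println(evaluate(total))   // selected fares 6, 8, 10: mean 8.0, std about 1.633
```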


<h2>Unconditional mean and std</h2>

In [6]:
val taxi = spark.read.format("csv").option("header", "true").load("C://Users//vaish//Downloads//taxi_data.csv")
val taxi_double = taxi.withColumn("fare_amount", taxi("fare_amount").cast(DoubleType))
val meanstd_uc = new meanstd((a,b) => true)
spark.udf.register("meanstd_uc", meanstd_uc)

taxi: org.apache.spark.sql.DataFrame = [VendorID: string, tpep_pickup_datetime: string ... 15 more fields]
taxi_double: org.apache.spark.sql.DataFrame = [VendorID: string, tpep_pickup_datetime: string ... 15 more fields]
meanstd_uc: meanstd = meanstd@7e356894
res4: org.apache.spark.sql.expressions.UserDefinedAggregateFunction = meanstd@7e356894


In [7]:
val meanstd_uc_df = taxi_double.agg(meanstd_uc(taxi_double("PULocationID"), taxi_double("DOLocationID"), taxi_double("fare_amount")))

meanstd_uc_df: org.apache.spark.sql.DataFrame = [meanstd(PULocationID, DOLocationID, fare_amount): struct<mean: double, std: double>]


In [8]:
meanstd_uc_df.select($"meanstd(PULocationID, DOLocationID, fare_amount).mean").show()
meanstd_uc_df.select($"meanstd(PULocationID, DOLocationID, fare_amount).std").show()

+------------------+
|              mean|
+------------------+
|12.404401066500117|
+------------------+

+------------------+
|               std|
+------------------+
|225.19866345385836|
+------------------+



<h2>The mean is 12.404401066500117 and the standard deviation is 225.19866345385836</h2>

<h2>Conditional mean and std</h2>

In [22]:
val meanstd_cond = new meanstd((a, b) => a == b)
spark.udf.register("meanstd_cond", meanstd_cond)
val meanstd_cond_df = taxi_double.agg(meanstd_cond(taxi_double("PULocationID"), taxi_double("DOLocationID"), taxi_double("fare_amount")))

meanstd_cond: meanstd = meanstd@61a4d858
meanstd_cond_df: org.apache.spark.sql.DataFrame = [meanstd(PULocationID, DOLocationID, fare_amount): struct<mean: double, std: double>]



In [24]:
meanstd_cond_df.select($"meanstd(PULocationID, DOLocationID, fare_amount).mean").show()
meanstd_cond_df.select($"meanstd(PULocationID, DOLocationID, fare_amount).std").show()

+-----------------+
|             mean|
+-----------------+
|8.203262706753465|
+-----------------+

+-----------------+
|              std|
+-----------------+
|18.67995256537117|
+-----------------+



<h2>The mean is 8.203262706753465 and the standard deviation is 18.67995256537117</h2>

<h2>Grouped mean and std</h2>

In [25]:
val meanstd_grouped_df = taxi_double.groupBy("PULocationID").agg(meanstd_cond(taxi_double("PULocationID"), taxi_double("DOLocationID"), taxi_double("fare_amount")))
meanstd_grouped_df.select($"PULocationID",$"meanstd(PULocationID, DOLocationID, fare_amount).mean").show()
meanstd_grouped_df.select($"PULocationID", $"meanstd(PULocationID, DOLocationID, fare_amount).std").show()

+------------+------------------+
|PULocationID|              mean|
+------------+------------------+
|         125| 9.023267326732674|
|           7| 6.142718658892129|
|          51|15.236170212765956|
|         124|24.608571428571427|
|         169|14.773611111111112|
|         205|30.300000000000004|
|         234| 5.923206497093623|
|         232| 9.423110403397027|
|          54|11.783170731707317|
|          15|            22.685|
|         155|30.257741935483875|
|         132|24.806696083838936|
|         154|              15.7|
|         200|16.532222222222224|
|         101| 17.73076923076923|
|          11|              43.3|
|         138|14.307208465303773|
|          69|10.537942857142857|
|          29|24.921756097560976|
|          42| 6.432658066860465|
+------------+------------------+
only showing top 20 rows

+------------+------------------+
|PULocationID|               std|
+------------+------------------+
|         125| 13.79772025205827|
|           7|  8.6512

meanstd_grouped_df: org.apache.spark.sql.DataFrame = [PULocationID: string, meanstd(PULocationID, DOLocationID, fare_amount): struct<mean: double, std: double>]


<h2>Highest mean location id</h2>

In [18]:
val highest_mean_df = meanstd_grouped_df.select($"PULocationID",$"meanstd(PULocationID, DOLocationID, fare_amount).mean")
val highest_std_df = meanstd_grouped_df.select($"PULocationID", $"meanstd(PULocationID, DOLocationID, fare_amount).std")

highest_mean_df: org.apache.spark.sql.DataFrame = [PULocationID: string, mean: double]
highest_std_df: org.apache.spark.sql.DataFrame = [PULocationID: string, std: double]


In [19]:
import org.apache.spark.sql.functions.desc

val highest_mean = highest_mean_df.orderBy(desc("mean"))
val highest_std = highest_std_df.orderBy(desc("std"))

highest_mean: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [PULocationID: string, mean: double]
highest_std: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [PULocationID: string, std: double]


In [26]:
highest_mean.na.drop.show()
//highest_std.na.drop.show()

+------------+------------------+
|PULocationID|              mean|
+------------+------------------+
|           1| 73.59195372750645|
|         172|              72.4|
|          84|             71.56|
|         265| 68.54885736294776|
|         251|60.592000000000006|
|          46|              52.0|
|         118|              51.0|
|         204|             47.71|
|         214|44.720909090909096|
|          59|              43.5|
|          11|              43.3|
|         201| 40.25454545454546|
|         117|          38.57875|
|          31|             38.25|
|         203| 37.50411764705882|
|         245|           36.5425|
|         175| 35.62307692307692|
|          58|             34.68|
|           5|             33.75|
|         122| 33.36666666666667|
+------------+------------------+
only showing top 20 rows



<h2>So the location with the highest mean is PULocationID 1, with a mean fare_amount of 73.59195372750645</h2>
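The na.drop step matters because locations with no same-zone trips evaluate to NaN, and Spark SQL orders NaN above every other double, so a descending sort puts those groups first. The plain-Scala sketch below mimics that ordering with `java.lang.Double.compare` (which also treats NaN as the largest value) on a few made-up (PULocationID, mean) pairs, then shows the NaN-free way to pick the maximum.

```scala
// Hypothetical (PULocationID, mean) pairs; "20" stands for a location with no same-zone trips
val means = Seq(("10", 41.5), ("20", Double.NaN), ("30", 55.0), ("40", 12.25))

// Descending sort where NaN counts as the largest value, like orderBy(desc("mean"))
val sortedDesc = means.sortWith((x, y) => java.lang.Double.compare(x._2, y._2) > 0)
println(sortedDesc.head)   // (20,NaN): the empty group comes first

// Drop NaN means before looking for the maximum
val best = means.filterNot(_._2.isNaN).maxBy(_._2)
println(best)              // (30,55.0)
```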

<h1>Test data</h1>

In [16]:
val df = spark.read.format("csv").option("header", "true").load("C://Users//vaish//Downloads//test_data.csv")
val testdata = new meanstd((a, b) => a.startsWith("J") && (b == "Cloud" || b == "Analytics"))
spark.udf.register("testdata", testdata)

df.agg(testdata(df("col1"),df("col2"),df("col3"))).select("meanstd(col1, col2, col3).mean").show
df.agg(testdata(df("col1"),df("col2"),df("col3"))).select("meanstd(col1, col2, col3).std").show

val testresult = df.groupBy("col2").agg(testdata(df("col1"),df("col2"),df("col3")))
testresult.select($"col2",$"meanstd(col1, col2, col3).mean").show
testresult.select($"col2",$"meanstd(col1, col2, col3).std").show

+----+
|mean|
+----+
| 8.8|
+----+

+------------------+
|               std|
+------------------+
|1.3266499161421565|
+------------------+

+---------+-----------------+
|     col2|             mean|
+---------+-----------------+
|Analytics|              9.0|
|    Cloud|8.666666666666666|
| Politics|              NaN|
+---------+-----------------+

+---------+------------------+
|     col2|               std|
+---------+------------------+
|Analytics|               2.0|
|    Cloud|0.4714045207910384|
| Politics|               NaN|
+---------+------------------+



df: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]
testdata: meanstd = meanstd@886a14
testresult: org.apache.spark.sql.DataFrame = [col2: string, meanstd(col1, col2, col3): struct<mean: double, std: double>]
