<font color=red>
Spark_Version:2.0.0<br/>
Python_Version:Python 3.5.2 | Anaconda4.1.1(64-bit)<br/>
Jupyter_Version:4.2.1<br/>
System:Ubuntu 16.04 LTS(64-bit)
</font>

In [1]:
import platform
print("Spark_Version:",sc.version)
print("Python_Version:",platform.python_version())
print("System:",platform.system())

Spark_Version: 2.0.0
Python_Version: 3.5.2
System: Linux


## SQL operations
SQL operations are most widely used constructs for data manipulation. Some of most used operations are, selecting all or some of the columns, filtering based on one or more conditions, sorting and grouping operations, and computing summary functions such as average on GroupedData. The JOIN operations on multiple data sources and set
operations such as union, intersect and minus are some other operations that are widely performed. Furthermore, data frames are registered as temporary tables and passed traditional SQL statements to perform the aforementioned operations. User-Defined Functions (UDF) are defined and used with and without registration. We'll be focusing on
<strong>window</strong> operations, which have been just introduced in Spark 2.0. They address sliding window operations. For example, if you want to report the average peak temperature every day in the past seven days, then you are operating on a sliding window of seven days until today. Here is an example that computes average sales per month for the past three months.The data file contains 24 observations showing monthly sales for two products, P1 and P2.

<dl>
    <dt>
    <tt >rangeBetween</tt>
    <big>(</big><em>start</em>, <em>end</em><big>)</big>
    </dt>
     <dd><p>Defines the frame boundaries, from <cite>start</cite> (inclusive) to <cite>end</cite> (inclusive).</p>
        <font color=red><p>Both <cite>start</cite> and <cite>end</cite> are relative from the current row. For example,“0” means “current row”, while “-1” means one off before the current row,and “5” means the five off after the current row.</p></font>
        <table>
            <tbody>
            <tr><th>Parameters:</th><td><ul>
            <li><strong>start</strong> – boundary start, inclusive.The frame is unbounded if this is <tt>-sys.maxsize</tt> (or lower).</li>
            <li><strong>end</strong> – boundary end, inclusive.The frame is unbounded if this is <tt>sys.maxsize</tt> (or higher).</li>
            </ul>
            </td>
            </tr>
            </tbody>
        </table>
    </dd>
</dl>

<dl>
<dt>
<tt>pyspark.sql.functions.</tt><tt>bround</tt><big>(</big><em>col</em>, <em>scale=0</em><big>)</big></dt>
<dd><p>Round the given value to <cite>scale</cite> decimal places using HALF_EVEN rounding mode if <cite>scale</cite> &gt;= 0
or at integral part when <cite>scale</cite> &lt; 0.</p>
<pre><span class="gp">&gt;&gt;&gt; </span><span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">([(</span><span class="mf">2.5</span><span class="p">,)],</span> <span class="p">[</span><span class="s">'a'</span><span class="p">])</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n"><span class="highlighted">bround</span></span><span class="p">(</span><span class="s">'a'</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">alias</span><span class="p">(</span><span class="s">'r'</span><span class="p">))</span><span class="o">.</span><span class="n">collect</span><span class="p">()</span>
<span class="go">[Row(r=2.0)]</span>
</pre>
</dd></dl>

In [2]:
from pyspark.sql import Window
import pyspark.sql.functions as func

#Create a DataFrame containing monthly sales data for two products
file_path = "resource/MonthlySales.csv"
monthlySales = spark.read.csv(file_path, header=True, inferSchema=True)
monthlySales.show()

+-------+-----+-----+
|Product|Month|Sales|
+-------+-----+-----+
|     P1|    1|   66|
|     P1|    2|   24|
|     P1|    3|   54|
|     P1|    4|    0|
|     P1|    5|   56|
|     P1|    6|   34|
|     P1|    7|   48|
|     P1|    8|   46|
|     P1|    9|   76|
|     P1|   10|   12|
|     P1|   11|    8|
|     P1|   12|   24|
|     P2|    1|   98|
|     P2|    2|   16|
|     P2|    3|   78|
|     P2|    4|   66|
|     P2|    5|   14|
|     P2|    6|   76|
|     P2|    7|   62|
|     P2|    8|   92|
+-------+-----+-----+
only showing top 20 rows



In [3]:
#Prepare WindowSpec to create a 3 month sliding window for a product
#Negative subscript denotes rows above current row
w = Window\
    .partitionBy(monthlySales["Product"])\
    .orderBy(monthlySales["Month"])\
    .rangeBetween(-2, 0)

In [4]:
w

<pyspark.sql.window.WindowSpec at 0x7f6dfbfd05c0>

In [5]:
f = func.avg(monthlySales["Sales"]).over(w)

In [6]:
f

Column<b'avg(Sales) OVER (PARTITION BY Product ORDER BY Month ASC RANGE BETWEEN 2 PRECEDING AND CURRENT ROW)'>

In [7]:
monthlySales\
.select(monthlySales.Product,monthlySales.Sales,monthlySales.Month,func.bround(f,2).alias("MovingAvg"))\
.orderBy(monthlySales.Product,monthlySales.Month)\
.show(6)

+-------+-----+-----+---------+
|Product|Sales|Month|MovingAvg|
+-------+-----+-----+---------+
|     P1|   66|    1|     66.0|
|     P1|   24|    2|     45.0|
|     P1|   54|    3|     48.0|
|     P1|    0|    4|     26.0|
|     P1|   56|    5|    36.67|
|     P1|   34|    6|     30.0|
+-------+-----+-----+---------+
only showing top 6 rows



## Structured Streaming
Let us look at a simple example. The following example listens to System Activity Report(sar) on Linux on a local machine and computes the average free memory. System Activity Report gives system activity statistics and the current example collects memory usage,reported 20 times at a 2-second interval. The Spark stream reads this streaming output and computes average memory. We use a handy networking utility netcat (nc) to redirect the sar output onto a given port. The options l and k specify that nc should listen for an incoming connection and it has to keep listening for another connection even after its current connection is completed.

```
//Run the following command from one terminal window
sar -r 2 20 | nc -lk 9999
```

In [8]:
#In another window, open pyspark shell and do the following
myStream = spark.readStream.format("socket").\
                option("host", "localhost").\
                option("port", 9999).load()

In [9]:
#Filter out unwanted lines and then extract free memory part as a float
#Drop missing values, if any
myDF = myStream.filter("value rlike 'IST'").\
                    select(func.substring("value", 15, 9).cast("float").\
                          alias("memFree")).na.drop().select("memFree")

In [10]:
avgMemFree = myDF.select(func.avg("memFree"))
query = avgMemFree.writeStream. \
            outputMode("complete"). \
                format("console"). \
                        start()

```
-------------------------------------------
Batch: 0
-------------------------------------------
+------------+ 
|avg(memFree)|
+------------+
|        null|
+------------+
```