# Overview

- filter
- conditional operation
    - Logical operators (&, |, ~)
- Convert a collected Row into a dict
    - look up a value in the dictionary by its key
    
    
# Aggregation

In [1]:
import findspark

# Tell findspark where the local Spark distribution lives before pyspark is imported.
findspark.init(spark_home='/home/navin/spark-2.4.6-bin-hadoop2.7')

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) the SparkSession named 'ops' that the cells below read data through.
spark = (
    SparkSession.builder
    .appName('ops')
    .getOrCreate()
)

In [5]:
# Load the Apple stock CSV: the first line supplies the column names and
# the column types are inferred from the data.
df = spark.read.csv(
    "file:///home/navin/Documents/Data/appl_stock.csv",
    header=True,
    inferSchema=True,
)

In [9]:
# Print the column names and inferred types of the stock DataFrame.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [11]:
# Fetch the first five rows; the result is a list of Row objects.
df.head(5)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date=datetime.datetime(2010, 1, 6, 0, 0), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004),
 Row(Date=datetime.datetime(2010, 1, 7, 0, 0), Open=211.75, High=212.000006, Low=209.050005, Close=210.58, Volume=119282800, Adj Close=27.28265),
 Row(Date=datetime.datetime(2010, 1, 8, 0, 0), Open=210.299994, High=212.000006, Low=209.06000500000002, Close=211.98000499999998, Volume=111902700, Adj Close=27.464034)]

In [12]:
# head(5) hands back a plain Python list rather than a DataFrame.
type(df.head(5))

list

In [14]:
# The list holds one entry per requested row.
len(df.head(5))

5

In [8]:
# Total number of rows in the stock DataFrame.
df.count()

1762

In [18]:
# Filter with a SQL-style string condition, keep three columns,
# and print the first five matching rows.
(
    df
    .filter('Close<200')
    .select(['Open', 'Close', 'Volume'])
    .show(5)
)

+------------------+----------+---------+
|              Open|     Close|   Volume|
+------------------+----------+---------+
|206.78000600000001|    197.75|220441900|
|        204.930004|199.289995|293375600|
|        201.079996|192.060003|311488100|
|192.36999699999998|194.729998|187469100|
|        195.909998|195.859997|174585600|
+------------------+----------+---------+
only showing top 5 rows



In [24]:
# Multiple condition operation: each comparison is wrapped in its own
# parentheses and the two are combined with the & operator.

(
    df
    .filter((df['Close'] < 200) & (df['Open'] < 200))
    .select(['Open', 'Close', 'Volume'])
    .show(5)
)

+------------------+----------+---------+
|              Open|     Close|   Volume|
+------------------+----------+---------+
|192.36999699999998|194.729998|187469100|
|        195.909998|195.859997|174585600|
|        195.169994|199.229994|153832000|
|        196.730003|192.050003|189413000|
|192.63000300000002|195.460001|212576700|
+------------------+----------+---------+
only showing top 5 rows



# collect

- Save the result as a Python list of Row objects

In [33]:
# Apply both price conditions, keep three columns, and collect the first
# two matching rows into a Python list of Row objects.
result = (
    df
    .filter((df['Close'] < 200) & (df['Open'] < 200))
    .select(['Open', 'Close', 'Volume'])
    .limit(2)
    .collect()
)

In [34]:
# Display the collected rows.
result

[Row(Open=192.36999699999998, Close=194.729998, Volume=187469100),
 Row(Open=195.909998, Close=195.859997, Volume=174585600)]

In [37]:
# Take one Row out of the collected list, convert that Row to a dictionary,
# and look up a value by its column name.
row=result[0]
row.asDict()['Volume']

187469100

# Group by & Aggregate Functions

In [39]:
# Load the sales CSV: the first line supplies the column names and
# the column types are inferred from the data.
sales_df = spark.read.csv(
    "file:///home/navin/Documents/Data/sales_info.csv",
    header=True,
    inferSchema=True,
)

In [41]:
# Print the sales data as a table.
sales_df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [42]:
# Print the column names and inferred types of the sales DataFrame.
sales_df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [46]:
# groupBy on its own only returns a GroupedData object; an aggregation
# (mean, sum, count, agg, ...) must follow to get a DataFrame back.
sales_df.groupBy("Company")

<pyspark.sql.group.GroupedData at 0x7f52af23ea20>

In [49]:
# Average Sales per company.
sales_df.groupBy("Company").mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [50]:
# Total Sales per company.
sales_df.groupBy("Company").sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [51]:
# Number of rows per company.
sales_df.groupBy("Company").count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



## Total (all rows, no grouping)

In [52]:
# Aggregate over the whole DataFrame without grouping; the dict maps a
# column name to the name of the aggregate function to apply.
sales_df.agg({"Sales":"SUM"}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [55]:
# Keep the grouped object in a variable, then apply the same dict-style
# aggregation to it to get one Sales total per company.
group_Company=sales_df.groupBy("Company")
group_Company.agg({"Sales":"SUM"}).show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [60]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [58]:
# Count the distinct Sales values (11 for 12 rows, because 350.0 appears twice).
sales_df.select(countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [59]:
# Average of Sales over all rows; alias() gives the result column a readable name.
sales_df.select(avg('Sales').alias('Avg Sales')).show()

+-----------------+
|        Avg Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [63]:
sales_df.select(stddev('Sales').alias('Avg Sales')).show()

+------------------+
|         Avg Sales|
+------------------+
|250.08742410799007|
+------------------+



Display the result with only two decimal places.

In [86]:
from pyspark.sql.functions import format_number

In [81]:
# Keep the standard deviation of Sales in a one-column DataFrame named 'std'.
sales_std=sales_df.select(stddev('Sales').alias('std'))


In [83]:
# Format the value with two decimal places and keep the column name 'std'.
sales_std.select(format_number('std',2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



In [91]:
# Sort the rows by Sales from highest to lowest.
sales_df.orderBy(sales_df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [92]:
# Stop the SparkSession now that all examples have run.
spark.stop()