In [1]:
# findspark.init() is called before the `import pyspark` in the next cell;
# presumably it is what makes the local Spark installation importable here
# (TODO confirm against this environment's setup).
import findspark
findspark.init()

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

# Obtain the notebook's SparkSession, named "Aggregation Functions".
# getOrCreate() hands back an already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("Aggregation Functions")
    .getOrCreate()
)

In [6]:
# Load the CSV data found under the flight-data "csv" directory into a single
# DataFrame. The first line of each file is treated as a header row and the
# column types are inferred from the data.
# NOTE(review): this is an absolute, machine-specific Windows path; it has to
# be adjusted before the notebook can run anywhere else.
flights_df = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("D:\\code\\spark\\spark-basics\\data\\flight-data\\csv")
)

# Expose the DataFrame to Spark SQL under the name "flights".
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; it replaces any existing
# temporary view with the same name.
flights_df.createOrReplaceTempView("flights")

In [7]:
# Total number of rows in flights_df (displayed as the cell's output).
flights_df.count()

1506

In [8]:
# Row count again, this time via Spark SQL against the "flights" view.
spark.sql(
    "select count(*) from flights"
).show()

+--------+
|count(1)|
+--------+
|    1506|
+--------+



In [12]:
from pyspark.sql.functions import count

# Aggregate the whole DataFrame into one row holding count(DEST_COUNTRY_NAME).
(
    flights_df
    .select(count("DEST_COUNTRY_NAME"))
    .show()
)

+------------------------+
|count(DEST_COUNTRY_NAME)|
+------------------------+
|                    1506|
+------------------------+



In [14]:
from pyspark.sql.functions import countDistinct

# Exact number of distinct destination countries, reported in a column
# named "dest_countries".
(
    flights_df
    .select(countDistinct("DEST_COUNTRY_NAME").alias("dest_countries"))
    .show()
)

+--------------+
|dest_countries|
+--------------+
|           170|
+--------------+



In [15]:
# Distinct destination-country count expressed in Spark SQL.
spark.sql(
    """select 
count(distinct DEST_COUNTRY_NAME) as dest_countries
from flights"""
).show()

+--------------+
|dest_countries|
+--------------+
|           170|
+--------------+



In [18]:
from pyspark.sql.functions import approx_count_distinct

# Approximate distinct count of destination countries, using the function's
# default error tolerance (no second argument is passed).
(
    flights_df
    .select(approx_count_distinct("DEST_COUNTRY_NAME").alias("approx countries"))
    .show()
)

+----------------+
|approx countries|
+----------------+
|             164|
+----------------+



In [19]:
# Approximate distinct count with an explicit second argument of 0.15
# (the allowed relative error of the estimate).
(
    flights_df
    .select(approx_count_distinct("DEST_COUNTRY_NAME", 0.15).alias("approx countries"))
    .show()
)

+----------------+
|approx countries|
+----------------+
|             163|
+----------------+



In [21]:
# Same approximate distinct count, with the second argument loosened to 0.25.
(
    flights_df
    .select(approx_count_distinct("DEST_COUNTRY_NAME", 0.25).alias("approx countries"))
    .show()
)

+----------------+
|approx countries|
+----------------+
|             160|
+----------------+



In [28]:
# Approximate distinct count in Spark SQL, with 0.1 as the second argument.
spark.sql(
    """
SELECT approx_count_distinct(DEST_COUNTRY_NAME,0.1)
as approx_countries
FROM flights
"""
).show()

+----------------+
|approx_countries|
+----------------+
|             162|
+----------------+



In [31]:
from pyspark.sql.functions import first, last

# First and last DEST_COUNTRY_NAME values as Spark encounters the rows.
# No ordering is applied here, so the result depends on row order.
# NOTE(review): the recorded output shows last(...) = "b", which does not
# look like a country name - confirm the input directory contains only the
# expected flight CSV files.
(
    flights_df
    .select(first("DEST_COUNTRY_NAME"), last("DEST_COUNTRY_NAME"))
    .show()
)

+------------------------+-----------------------+
|first(DEST_COUNTRY_NAME)|last(DEST_COUNTRY_NAME)|
+------------------------+-----------------------+
|           United States|                      b|
+------------------------+-----------------------+



In [41]:
# Re-load flights_df from the single 2010 summary file. This rebinds the
# flights_df name and replaces the "flights" view, so every cell run after
# this one sees only the 2010 data.
# NOTE(review): this is an absolute, machine-specific Windows path; it has to
# be adjusted before the notebook can run anywhere else.
flights_df = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("D:\\code\\spark\\spark-basics\\data\\flight-data\\csv\\2010-summary.csv")
)

# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; it replaces any existing
# temporary view with the same name.
flights_df.createOrReplaceTempView("flights")

In [42]:
# NOTE: this import rebinds the names `min` and `max` in the notebook
# namespace, hiding Python's built-ins of the same name for any cell run
# afterwards.
from pyspark.sql.functions import min, max

# Smallest and largest value of the "count" column.
(
    flights_df
    .select(min("count"), max("count"))
    .show()
)

+----------+----------+
|min(count)|max(count)|
+----------+----------+
|         1|    348113|
+----------+----------+



In [43]:
# Minimum and maximum of the "count" column via Spark SQL.
spark.sql(
    "select min(count),max(count) from flights"
).show()

+----------+----------+
|min(count)|max(count)|
+----------+----------+
|         1|    348113|
+----------+----------+



In [45]:
# NOTE: this import rebinds the name `sum` in the notebook namespace, hiding
# Python's built-in sum() for any cell run afterwards.
from pyspark.sql.functions import sum

# Total of the "count" column across all rows.
(
    flights_df
    .select(sum("count"))
    .show()
)

+----------+
|sum(count)|
+----------+
|    422269|
+----------+



In [46]:
# Total of the "count" column via Spark SQL.
spark.sql(
    "select sum(count) from flights"
).show()

+----------+
|sum(count)|
+----------+
|    422269|
+----------+



In [47]:
# NOTE(review): sumDistinct is deprecated as of Spark 3.2 in favour of
# sum_distinct; switch once the Spark version in use is confirmed to have it.
from pyspark.sql.functions import sumDistinct

# Sum of the distinct values in the "count" column (duplicates counted once).
(
    flights_df
    .select(sumDistinct("count"))
    .show()
)

+-------------------+
|sum(DISTINCT count)|
+-------------------+
|             419432|
+-------------------+



In [49]:
from pyspark.sql.functions import avg

# Mean of the "count" column across all rows.
(
    flights_df
    .select(avg("count"))
    .show()
)

+-----------------+
|       avg(count)|
+-----------------+
|1655.956862745098|
+-----------------+



In [50]:
# Mean of the "count" column via Spark SQL.
spark.sql(
    "select avg(count) from flights"
).show()

+-----------------+
|       avg(count)|
+-----------------+
|1655.956862745098|
+-----------------+



In [51]:
from pyspark.sql.functions import collect_list, collect_set

# Gather all DEST_COUNTRY_NAME values into a single row: once as a list
# and once as a set.
(
    flights_df
    .select(collect_list("DEST_COUNTRY_NAME"), collect_set("DEST_COUNTRY_NAME"))
    .show()
)

+-------------------------------+------------------------------+
|collect_list(DEST_COUNTRY_NAME)|collect_set(DEST_COUNTRY_NAME)|
+-------------------------------+------------------------------+
|           [United States, U...|          [Italy, Slovakia,...|
+-------------------------------+------------------------------+



In [53]:
# collect_set / collect_list of DEST_COUNTRY_NAME via Spark SQL
# (set first, then list, matching the column order of the query).
spark.sql(
    """select 
collect_Set(DEST_COUNTRY_NAME), 
collect_list(DEST_COUNTRY_NAME) 
from flights"""
).show()

+------------------------------+-------------------------------+
|collect_set(DEST_COUNTRY_NAME)|collect_list(DEST_COUNTRY_NAME)|
+------------------------------+-------------------------------+
|          [Italy, Slovakia,...|           [United States, U...|
+------------------------------+-------------------------------+



In [56]:
# Number of rows per destination country; display five of the groups.
(
    flights_df
    .groupBy("DEST_COUNTRY_NAME")
    .count()
    .show(5)
)

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|         Anguilla|    1|
|           Russia|    1|
|         Paraguay|    1|
|          Senegal|    1|
|           Sweden|    1|
+-----------------+-----+
only showing top 5 rows



In [58]:
# Rows per destination country via Spark SQL, limited to five groups.
# There is no ORDER BY, so which five groups come back is not fixed by the query.
spark.sql(
    """
select DEST_COUNTRY_NAME, count(*) from 
flights group by DEST_COUNTRY_NAME 
limit 5"""
).show()

+-----------------+--------+
|DEST_COUNTRY_NAME|count(1)|
+-----------------+--------+
|         Anguilla|       1|
|           Russia|       1|
|         Paraguay|       1|
|          Senegal|       1|
|           Sweden|       1|
+-----------------+--------+



In [60]:
# Mean of the "count" column per destination country; display five groups.
(
    flights_df
    .groupBy("DEST_COUNTRY_NAME")
    .avg("count")
    .show(5)
)

+-----------------+----------+
|DEST_COUNTRY_NAME|avg(count)|
+-----------------+----------+
|         Anguilla|      21.0|
|           Russia|     152.0|
|         Paraguay|      90.0|
|          Senegal|      29.0|
|           Sweden|      65.0|
+-----------------+----------+
only showing top 5 rows

