# GroupBy and Aggregate Functions

In [3]:
# Import PySpark and create a SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Reuse the active SparkSession if there is one; otherwise start a new
# session registered under the application name "ImportData".
spark = (
    SparkSession.builder
    .appName("ImportData")
    .getOrCreate()
)

In [4]:
# Bare expression: shows the SparkSession object as this cell's output,
# a quick check that the session was created.
spark

# Read the file

In [5]:
# Load the customers CSV into a DataFrame. The first line of the file is
# treated as the header (column names) and column types are inferred
# from the data instead of defaulting to strings.
df_customers = spark.read.csv(
    './data/customers_data.csv',
    header=True,
    inferSchema=True,
)

In [7]:
# Preview the DataFrame. truncate=4 caps every rendered cell at 4 characters,
# so longer values (Id, startDate, some tbilled amounts) are cut down and
# displayed with a trailing "..." (e.g. "0...", "2...").
df_customers.show(truncate=4)

+----+---+---+---------+-------+-------+-------+--------+-----+---------+----+---------+------+-------+
|  Id|age|sex|dayswfreq|tbilled|maccess|freeuse|nentries|cfreq|nrenewals|cref|startDate|months|dropout|
+----+---+---+---------+-------+-------+-------+--------+-----+---------+----+---------+------+-------+
|0...| 23|  1|        7|   37.6|   1.35|      0|       6|    7|        0|   0|     2...|     1|      1|
|0...| 34|  1|      328|   2...|   0.54|      0|      39|    7|        2|   0|     2...|    19|      0|
|0...| 24|  0|        3|   1...|    0.8|      0|      28|    7|        0|   0|     2...|     8|      1|
|0...| 20|  1|       41|   71.6|    1.0|      0|      13|    7|        0|   0|     2...|     3|      1|
|0...| 21|  1|       18|   1...|   0.08|      0|       7|    7|        3|   0|     2...|    24|      1|
|0...| 20|  0|       38|   1...|   0.33|      0|      11|    7|        2|   0|     2...|    10|      1|
|0...| 26|  1|      279|   53.2|   0.16|      0|       6|    7| 

In [8]:
# Print each column's name, its data type (inferred while reading the CSV)
# and its nullability.
df_customers.printSchema()

root
 |-- Id: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- dayswfreq: integer (nullable = true)
 |-- tbilled: double (nullable = true)
 |-- maccess: double (nullable = true)
 |-- freeuse: integer (nullable = true)
 |-- nentries: integer (nullable = true)
 |-- cfreq: integer (nullable = true)
 |-- nrenewals: integer (nullable = true)
 |-- cref: integer (nullable = true)
 |-- startDate: timestamp (nullable = true)
 |-- months: integer (nullable = true)
 |-- dropout: integer (nullable = true)



# groupby

In [11]:
# Count the rows in each (age, sex) combination.
# No sort is applied, so the groups come back in arbitrary order;
# show() prints only the first 20 of them.
(
    df_customers
    .groupBy("age", "sex")
    .count()
    .show()
)

+---+---+-----+
|age|sex|count|
+---+---+-----+
| 69|  1|    3|
| 62|  1|    3|
| 30|  0|   60|
| 17|  1|    5|
| 79|  1|    3|
| 58|  1|    2|
| 71|  0|    7|
| 48|  1|   10|
| 83|  0|    1|
| 18|  1|   29|
| 54|  0|   17|
| 93|  0|    1|
| 72|  1|    3|
| 56|  0|   20|
| 42|  0|   14|
| 25|  1|  118|
| 41|  0|   25|
| 64|  0|   10|
| 29|  0|   77|
| 36|  1|   12|
+---+---+-----+
only showing top 20 rows



In [12]:
# Row count per (age, sex) pair, highest ages first; print the top 10 groups.
# `col` comes from the import cell at the top of the notebook, so this cell
# (and the later cells that also use `col`) no longer depends on an import
# buried in the middle of the analysis.
(
    df_customers
    .groupBy("age", "sex")
    .count()
    .sort(col("age").desc())
    .show(10)
)

+---+---+-----+
|age|sex|count|
+---+---+-----+
| 93|  0|    1|
| 87|  1|    1|
| 84|  0|    1|
| 83|  0|    1|
| 83|  1|    1|
| 81|  0|    1|
| 80|  0|    3|
| 79|  1|    3|
| 79|  0|    3|
| 78|  0|    2|
+---+---+-----+
only showing top 10 rows



In [13]:
# Row count per age, lowest ages first; print the top 10 groups.
(
    df_customers
    .groupBy("age")
    .count()
    .sort(col("age").asc())
    .show(10)
)

+---+-----+
|age|count|
+---+-----+
|  0|    1|
|  9|    1|
| 14|    1|
| 15|    5|
| 16|    6|
| 17|    9|
| 18|   69|
| 19|  469|
| 20|  527|
| 21|  657|
+---+-----+
only showing top 10 rows



In [14]:
# Most frequent ages: row count per age, ordered by the count itself
# (largest first, not by age); print the top 10 groups.
(
    df_customers
    .groupBy("age")
    .count()
    .sort(col("count").desc())
    .show(10)
)

+---+-----+
|age|count|
+---+-----+
| 21|  657|
| 22|  639|
| 20|  527|
| 19|  469|
| 23|  422|
| 24|  355|
| 25|  245|
| 26|  160|
| 27|  131|
| 29|  114|
+---+-----+
only showing top 10 rows



# Aggregate

In [15]:
# agg() called directly on the DataFrame (no groupBy) aggregates over the
# whole DataFrame instead of per group; here it counts the non-null
# values of the `age` column.
(
    df_customers
    .agg({'age': 'count'})
    .show()
)

+----------+
|count(age)|
+----------+
|      5216|
+----------+



In [None]:
# Stop the SparkSession now that the analysis is finished.
# NOTE: earlier cells that use `spark` or `df_customers` will need the
# session to be created again before they can be re-run.
spark.stop()