In [1]:
# Spark Imports
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import (StructField,StringType, 
                               IntegerType,StructType)

In [2]:
# Regular Python imports.
from datetime import datetime
import random

In [3]:
# Initialize the seed for the Random Number Generator from the current time.
# `random.seed` only accepts None/int/float/str/bytes/bytearray (a datetime
# raises TypeError on Python 3.11+), so seed from the POSIX timestamp.
# This makes every run different; use a fixed int here for reproducible runs.
# NOTE: this only seeds the driver process. PySpark UDFs execute in separate
# Python worker processes whose `random` state is not affected by this call.
random.seed(datetime.now().timestamp())

In [4]:
# Create the Spark session for this notebook (or reuse one that is already
# running), registered under the application name 'aggs'.
spark = (
    SparkSession.builder
    .appName('aggs')
    .getOrCreate()
)

In [7]:
# Load the sales data from CSV.
#  - header=True: the first row supplies the column names.
#  - inferSchema=True: Spark scans the values to pick column types (an extra
#    pass over the file) instead of reading every column as a string; here it
#    detects Sales as a double.
df = spark.read.csv(
    'Data/sales_info.csv',
    header=True,
    inferSchema=True,
)

In [8]:
# Number of rows we read. `count()` is an action: it runs a Spark job and
# returns the row count as a Python int.
df.count()

12

In [7]:
# Print the column names and types; inferSchema detected Sales as a double.
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [8]:
# Display the rows. `show()` prints at most 20 rows by default, so all 12 rows
# of this dataset are shown.
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [31]:
# Grouping alone computes nothing: it returns a GroupedData object that an
# aggregation must be applied to. (`groupby` is an alias of `groupBy`.)
grouped = df.groupby('Company')

In [32]:
# GroupedData has no `.show()`; apply an aggregation first to get a
# DataFrame back. The mathematical aggregations only consider numeric
# columns, which here is just Sales. `avg()` and `mean()` are aliases and
# both produce the column `avg(Sales)`.
grouped.avg().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [33]:
# For grouped data you can call max, min, count, mean, avg, sum, etc.
# `count()` returns the number of rows in each group; unlike the
# mathematical aggregations it does not need a numeric column.
grouped.count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [163]:
def _random_up_to(value):
    """Return a random integer in [0, int(value)], or None for a null value.

    The Sales column is read as a double. `random.randint` needs integer
    bounds: a float bound is deprecated since Python 3.10 and a TypeError
    since 3.12, and a non-integral float such as 124.5 always fails. The bound
    is therefore truncated with `int()`. Null inputs map to a null output
    instead of crashing the Python worker.
    """
    if value is None:
        return None
    return random.randint(0, int(value))


# Spark treats UDFs as deterministic by default, so the optimizer may drop
# duplicate invocations or call the function more often than it appears in
# the query. This one returns random values, so mark it non-deterministic.
random_udf = f.udf(_random_up_to, IntegerType()).asNondeterministic()

In [180]:
# Add a 'Random' column to every row, holding a random integer between 0 and
# that row's Sales value, then group the result by company. The DataFrame is
# not cached, so each action on it re-runs the UDF and draws new values.
new_grouped = df.withColumn(
    'Random', random_udf(df['Sales'])
).groupBy('Company')

In [181]:
# A single `agg` call can apply a different aggregation to each numeric
# column. The dict maps column name -> aggregation name; the result columns
# are named like `sum(Sales)` and `avg(Random)`.
sales_and_random_aggs = {'Sales': 'sum', 'Random': 'avg'}
new_grouped.agg(sales_and_random_aggs).show()

+-------+----------+------------------+
|Company|sum(Sales)|       avg(Random)|
+-------+----------+------------------+
|   APPL|    1480.0|             137.0|
|   GOOG|     660.0|             134.0|
|     FB|    1220.0|             368.0|
|   MSFT|     967.0|147.66666666666666|
+-------+----------+------------------+



In [173]:
# Persist the DataFrame with `cache()`. Otherwise every action on it re-runs
# the whole plan, including the UDF, and the 'Random' column is re-rolled
# each time. `cache()` is lazy: the values are fixed by the first action that
# materializes the cache and are reused afterwards. Caching is best-effort,
# though: if cached partitions are evicted, Spark recomputes them and those
# values are drawn again.
x = (df
     .withColumn('Random', random_udf(df['Sales']))
     .cache())

In [174]:
# The first action on the cached DataFrame computes the 'Random' values and
# stores them; later actions on `x` reuse them while the cache is retained.
x.show()

+-------+-------+-----+------+
|Company| Person|Sales|Random|
+-------+-------+-----+------+
|   GOOG|    Sam|200.0|    91|
|   GOOG|Charlie|120.0|    24|
|   GOOG|  Frank|340.0|   287|
|   MSFT|   Tina|600.0|   191|
|   MSFT|    Amy|124.0|    58|
|   MSFT|Vanessa|243.0|   194|
|     FB|   Carl|870.0|   490|
|     FB|  Sarah|350.0|   246|
|   APPL|   John|250.0|   169|
|   APPL|  Linda|130.0|    63|
|   APPL|   Mike|750.0|   278|
|   APPL|  Chris|350.0|    38|
+-------+-------+-----+------+



In [183]:
# `select` also accepts the many built-in functions from
# pyspark.sql.functions. `countDistinct` counts the distinct values of a
# column; Sarah and Chris both sold 350.0, so 12 rows give 11.
df.select(f.countDistinct(df['Sales'])).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [190]:
# `stddev` is the sample standard deviation, so Spark names its result
# column `stddev_samp(Sales)`; alias it to the shorter 'STDDEV'.
# The value still carries many digits, so `format_number` then rounds it to
# 2 decimal places (not 2 significant digits) and returns it as a string.
sales_std = (df
             .select(f.stddev('Sales').alias('STDDEV'))
             .select(f.format_number('STDDEV', 2)))
sales_std.show()

+------------------------+
|format_number(STDDEV, 2)|
+------------------------+
|                  250.09|
+------------------------+



In [210]:
# Sort on several keys: the first key takes precedence, and each following
# key only breaks ties left by the keys before it. Keys sort ascending by
# default. To sort a key descending, either pass a Column with `.desc()`
# (e.g. `f.col('Sales').desc()`) or give `orderBy` an `ascending` list with
# one flag per key, as done here.
df.orderBy(['Company', 'Sales', 'Person'],
           ascending=[True, False, True]).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   GOOG|  Frank|340.0|
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   MSFT|   Tina|600.0|
|   MSFT|Vanessa|243.0|
|   MSFT|    Amy|124.0|
+-------+-------+-----+

