# Window functions with astronauts dataset using PyStarburst and Galaxy

## Getting started

### Prepare the environment

First run the following in your terminal. You can find more information about Python Virtual Environments in the [Python docs](https://docs.python.org/3/library/venv.html).

*Windows cmd.exe*

`C:\> python -m venv venv`

`C:\> venv\Scripts\activate.bat`

*MacOS/Linux bash/zsh*

`$ python -m venv venv`

`$ source venv/bin/activate`

### Sign up for a Galaxy account & set up the sample catalog

You'll need a [Starburst Galaxy](https://www.starburst.io/platform/starburst-galaxy/start/) account with the sample catalog [set up](https://docs.starburst.io/starburst-galaxy/catalogs/sample.html).

In [None]:
# Install the library

%pip install pystarburst

In [None]:
# Define Connection Properties
# You can get the host and other information from the Partner Connect -> PyStarburst section in Galaxy

import getpass

host = input("Host name: ")
username = input("User name: ")
password = getpass.getpass("Password: ")
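If you rerun this notebook often, you may prefer not to type the credentials every time. Below is a minimal sketch that reads them from environment variables first and only prompts for whatever is missing. The variable names `GALAXY_HOST`, `GALAXY_USER`, and `GALAXY_PASSWORD` and the helper `read_connection_settings` are hypothetical; neither Galaxy nor PyStarburst defines them.

```python
import getpass
import os


def read_connection_settings():
    """Return (host, username, password), preferring environment variables.

    GALAXY_HOST, GALAXY_USER and GALAXY_PASSWORD are hypothetical names
    chosen for this sketch; anything not set is prompted for interactively.
    """
    host = os.environ.get("GALAXY_HOST") or input("Host name: ")
    username = os.environ.get("GALAXY_USER") or input("User name: ")
    # getpass hides the password while it is typed
    password = os.environ.get("GALAXY_PASSWORD") or getpass.getpass("Password: ")
    return host, username, password


# Usage: host, username, password = read_connection_settings()
```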

In [None]:
# Import dependencies

from pystarburst import Session
from pystarburst import functions as F
# note: this round() is PyStarburst's column function and shadows Python's built-in round()
from pystarburst.functions import col, lag, lead, row_number, round
from pystarburst.window import Window as W

import trino

session_properties = {
    "host": host,
    "port": 443,
    # Needed for https secured clusters
    "http_scheme": "https",
    # Set up authentication with a username and password, or any other supported authentication method
    # See docs: https://github.com/trinodb/trino-python-client#authentication-mechanisms
    "auth": trino.auth.BasicAuthentication(username, password)
}

session = Session.builder.configs(session_properties).create()

In [None]:
# Validate connectivity to the cluster

session.sql("select 1 as b").collect()

In [None]:
#
# Let's understand the astronauts dataset
#

a = session.table("sample.demo.astronauts")

# Show the columns
print(a.schema)

# Show the data
a.show()

In [None]:
#
# That was pretty busy, let's try that again...
#

# Show the columns
for field in a.schema.fields:
    print(f"{field.name} , {field.datatype}")
    
# Show a highly projected and filtered query's results
li = ["Nicollier, Claude", "Ross, Jerry L."]
 
twoAs = a.select("name", "nationality", \
                 "year_of_mission", "hours_mission") \
    .rename("year_of_mission", "m_yr") \
    .rename("hours_mission", "m_hrs") \
    .filter(a.name.isin(li)) \
    .sort("name", "m_yr")
 
twoAs.show(20)

### What is the data above?

This table is NOT a list of astronauts. It is a table of astronauts’ trips to space. These two fellas are represented multiple times; Jerry went to space 7 times and Claude went 4 times.

## Window function examples

The remainder of this notebook's code was originally published at https://lestermartin.wordpress.com/2023/10/19/viewing-astronauts-thru-windows-more-pystarburst-examples/ and you can find SQL examples there to compare each of the Python code blocks against.

### Single window for all rows

Let’s see how each mission compares with an OVERALL average across ALL missions in the DataFrame (the 11 trips by these two astronauts, not every row in the table). Create a single window that encompasses all input rows and calculates one average that appears on every output row.

Call the over() function with no arguments to create a single window that is used for all input rows.
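Conceptually, an empty `over()` treats the whole DataFrame as one window, so every output row carries the same aggregate while no rows are collapsed. The plain-Python sketch below models those semantics only; it is not how the engine executes the query. The rows are hypothetical stand-ins for `aDF`, and `with_overall_avg` is a hypothetical helper.

```python
from statistics import mean

# Hypothetical rows (name, m_yr, m_hrs) standing in for the aDF DataFrame
rows = [
    ("Astronaut A", 1993, 100.0),
    ("Astronaut A", 1995, 200.0),
    ("Astronaut A", 1998, 300.0),
    ("Astronaut B", 1993, 50.0),
    ("Astronaut B", 1996, 150.0),
]


def with_overall_avg(rows):
    """Model of AVG(m_hrs) OVER (): a single window containing every row."""
    overall = mean(hrs for _, _, hrs in rows)
    # Unlike GROUP BY, no rows are collapsed; each row gains the same value
    return [(name, yr, hrs, overall) for name, yr, hrs in rows]


for row in with_overall_avg(rows):
    print(row)
```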

In [None]:
# drop the nationality column
aDF = twoAs.drop("nationality")

# call over() with no arguments so the window spans every row
aDF.withColumn("avg_all_m_hrs", F.avg("m_hrs").over()) \
    .sort("name", "m_yr").show(20)

### Window for each distinct value

Keep all the rows, but calculate an aggregate over a separate window for each astronaut, where each window holds all input rows with the same name.

Create a WindowSpec object with the Window.partition_by() factory method.
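A PARTITION BY window computes one aggregate per distinct key and attaches it to every row with that key, without collapsing rows the way GROUP BY would. The plain-Python model below is only a sketch of those semantics; the rows and the `with_partition_sum` helper are hypothetical.

```python
from collections import defaultdict

# Hypothetical rows (name, m_yr, m_hrs) standing in for the aDF DataFrame
rows = [
    ("Astronaut A", 1993, 100.0),
    ("Astronaut A", 1995, 200.0),
    ("Astronaut A", 1998, 300.0),
    ("Astronaut B", 1993, 50.0),
    ("Astronaut B", 1996, 150.0),
]


def with_partition_sum(rows):
    """Model of SUM(m_hrs) OVER (PARTITION BY name)."""
    totals = defaultdict(float)
    # First pass: aggregate each partition (one per distinct name)
    for name, _, hrs in rows:
        totals[name] += hrs
    # Second pass: attach the partition's total to every row in it
    return [(name, yr, hrs, totals[name]) for name, yr, hrs in rows]


for row in with_partition_sum(rows):
    print(row)
```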

In [None]:
# define the window specification
w2 = W.partition_by("name")
 
aDF.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
    .sort("name", "m_yr").show(20)

### Multiple aggregations on the same window

In [None]:
# You can calculate multiple aggregations on the same window specification.

# chain another withColumn method
aDF.withColumn("avg_m_hrs", F.avg("m_hrs").over(w2)) \
    .withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
    .sort("name", "m_yr").show(20)

# You can also use the values returned by the window’s aggregate functions in further expressions.
aDF.withColumn("percent_of_tot", 
        round(aDF.m_hrs / F.sum("m_hrs").over(w2) * 100, 
            2)) \
    .withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
    .sort("name", "m_yr").show(20)
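Both window columns in the cell above share `w2`, and the percentage is ordinary arithmetic on a window result. The sketch below models that in plain Python on hypothetical rows: each row's share of its astronaut's total hours, rounded to two decimals. `with_partition_stats` is a hypothetical helper, and Python's built-in `round()` rounds half to even, which may differ from the SQL `round()` on exact ties.

```python
from collections import defaultdict

# Hypothetical rows (name, m_yr, m_hrs) standing in for the aDF DataFrame
rows = [
    ("Astronaut A", 1993, 100.0),
    ("Astronaut A", 1995, 200.0),
    ("Astronaut A", 1998, 300.0),
    ("Astronaut B", 1993, 50.0),
    ("Astronaut B", 1996, 150.0),
]


def with_partition_stats(rows):
    """Model of AVG and SUM over PARTITION BY name, plus a derived percentage."""
    groups = defaultdict(list)
    for name, _, hrs in rows:
        groups[name].append(hrs)
    out = []
    for name, yr, hrs in rows:
        total = sum(groups[name])
        avg = total / len(groups[name])
        # hours divided by the partition total, times 100, rounded to 2 places
        pct = round(hrs / total * 100, 2)
        out.append((name, yr, hrs, avg, total, pct))
    return out


for row in with_partition_stats(rows):
    print(row)
```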

### Using different columns to create multiple windows

You can define more than one window over the same rows. In the example below, we add a second window that groups all input records sharing the current row's year_of_mission value (renamed m_yr).

Only 1993 had more than one space flight in this two-astronaut dataset; both astronauts flew that year, so the 1993 row for each of them shows a count of 2.
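Each window is computed independently, so one row can sit in a "name" partition and a "year" partition at the same time. This plain-Python sketch models both windows on hypothetical rows in which both astronauts share one mission year; `with_two_windows` is a hypothetical helper.

```python
from collections import Counter, defaultdict

# Hypothetical rows (name, m_yr, m_hrs); both astronauts flew in 1993
rows = [
    ("Astronaut A", 1993, 100.0),
    ("Astronaut A", 1995, 200.0),
    ("Astronaut A", 1998, 300.0),
    ("Astronaut B", 1993, 50.0),
    ("Astronaut B", 1996, 150.0),
]


def with_two_windows(rows):
    """Model of SUM(m_hrs) OVER (PARTITION BY name) and
    COUNT(m_yr) OVER (PARTITION BY m_yr) on the same rows."""
    tot_by_name = defaultdict(float)
    for name, _, hrs in rows:
        tot_by_name[name] += hrs
    # the second window partitions by year instead of by name
    count_by_year = Counter(yr for _, yr, _ in rows)
    return [
        (name, yr, hrs, tot_by_name[name], count_by_year[yr])
        for name, yr, hrs in rows
    ]


for row in with_two_windows(rows):
    print(row)
```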

In [None]:
# define another window specification
w3 = W.partition_by("m_yr")
 
aDF.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
    .withColumn("tot_m_yr_for_all", 
        F.count("m_yr").over(w3)) \
    .sort("name", "m_yr").show(20)

### Order the window's contents

By adding an ORDER BY clause to the window definition, we can perform additional calculations based on the position of the current input row within its window. This example uses row_number() to number each astronaut's missions in chronological order.
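The numbering restarts in every partition and follows the window's sort order. Here is a plain-Python sketch of that behavior on hypothetical rows; `with_row_number` is a hypothetical helper, and it does not try to model how ties in the ORDER BY column are broken.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical rows (name, m_yr, m_hrs), deliberately not pre-sorted
rows = [
    ("Astronaut A", 1998, 300.0),
    ("Astronaut B", 1996, 150.0),
    ("Astronaut A", 1993, 100.0),
    ("Astronaut B", 1993, 50.0),
    ("Astronaut A", 1995, 200.0),
]


def with_row_number(rows):
    """Model of ROW_NUMBER() OVER (PARTITION BY name ORDER BY m_yr)."""
    out = []
    # sort by the partition key, then by the window's ORDER BY column
    ordered = sorted(rows, key=itemgetter(0, 1))
    for _, partition in groupby(ordered, key=itemgetter(0)):
        # numbering restarts at 1 in every partition
        for n, (name, yr, hrs) in enumerate(partition, start=1):
            out.append((name, yr, hrs, n))
    return out


for row in with_row_number(rows):
    print(row)
```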

In [None]:
w4 = W.partition_by("name").order_by("m_yr")
 
aDF.withColumn("m_nbr", row_number().over(w4)) \
    .sort("name", "m_yr").show(20)

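In addition to ranking functions such as row_number(), you can use lag() and lead() to look backward or forward one or more rows in the window and copy a value from that row into a new column. When no such row exists in the partition, the result is NULL.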
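A plain-Python sketch of the same idea follows, on hypothetical rows, with `None` standing in for SQL NULL at the edges of each partition. `with_lag_lead` is a hypothetical helper that models `lag(m_hrs, offset)` and `lead(m_hrs, offset)` over a window partitioned by name and ordered by m_yr.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical rows (name, m_yr, m_hrs)
rows = [
    ("Astronaut A", 1993, 100.0),
    ("Astronaut A", 1995, 200.0),
    ("Astronaut A", 1998, 300.0),
    ("Astronaut B", 1993, 50.0),
    ("Astronaut B", 1996, 150.0),
]


def with_lag_lead(rows, offset=1):
    """Model of LAG(m_hrs, offset) and LEAD(m_hrs, offset)
    OVER (PARTITION BY name ORDER BY m_yr); None plays the role of NULL."""
    out = []
    ordered = sorted(rows, key=itemgetter(0, 1))
    for _, group in groupby(ordered, key=itemgetter(0)):
        part = list(group)
        for i, (name, yr, hrs) in enumerate(part):
            # look back/forward only within the current partition
            prev_hrs = part[i - offset][2] if i - offset >= 0 else None
            next_hrs = part[i + offset][2] if i + offset < len(part) else None
            out.append((name, yr, hrs, prev_hrs, next_hrs))
    return out


for row in with_lag_lead(rows):
    print(row)
```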

In [None]:
# lag and lead functions
aDF.withColumn("prev_m_hrs", 
        lag(aDF.m_hrs, 1).over(w4)) \
    .withColumn("next_m_hrs", 
        lead(aDF.m_hrs, 1).over(w4)) \
    .sort("name", "m_yr").show(20)

### Create rolling windows

You can bound the window frame using boundaries such as UNBOUNDED PRECEDING, n PRECEDING, CURRENT ROW, n FOLLOWING, and UNBOUNDED FOLLOWING. The example below uses a frame that includes the current input row and all previous rows in the window's sort order, which produces a running total.

YES, this is a good time to look at window functions explained at https://lestermartin.wordpress.com/2023/10/04/sql-window-functions-explained-transparently-as-possible/
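With a frame from UNBOUNDED PRECEDING to CURRENT ROW, the frame grows by one row at a time, so SUM becomes a running total within each partition. This plain-Python sketch models that for a single partition, using the four Claude Nicollier mission-hour values this notebook quotes in its rolling-window examples, in mission order. `running_total` is a hypothetical helper.

```python
from itertools import accumulate

# Claude Nicollier's mission hours in m_yr order, as quoted in this notebook
claude_hrs = [191.25, 259.97, 541.67, 191.18]


def running_total(values):
    """Model of SUM(x) OVER (ORDER BY ... ROWS BETWEEN UNBOUNDED PRECEDING
    AND CURRENT ROW) for a single partition that is already sorted."""
    # accumulate yields the sum of every element up to and including each one
    return [round(total, 2) for total in accumulate(values)]


print(running_total(claude_hrs))
```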

In [None]:
w5 = W.partition_by("name").order_by("m_yr") \
    .rows_between(W.UNBOUNDED_PRECEDING, W.CURRENT_ROW)
 
aDF.withColumn("running_tot_m_hrs", 
        F.sum("m_hrs").over(w5)) \
    .sort("name", "m_yr").show(20)

Here is another example: a rolling average over the current row and up to two rows before it.

In [None]:
w6 = W.partition_by("name").order_by("m_yr") \
    .rows_between(-2, W.CURRENT_ROW)
     
# rolling avg over the current row and the previous two
aDF.withColumn("avg_this_and_last2", 
        round(F.avg("m_hrs").over(w6), 2)) \
    .sort("name", "m_yr").show(20)

To call out an example from above, look at Jerry’s 1998 mission. The avg_this_and_last2 value of 217 is the average of 283, 129, and 239.

Another example is Claude’s 1993 entry. The 225.61 average is only based on 259.97 and 191.25 as there is only one row preceding it.
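The clipping at the start of the partition is easy to model. This plain-Python sketch of a ROWS frame from 2 PRECEDING to CURRENT ROW uses Claude Nicollier's four mission-hour values as quoted in this notebook, in mission order, and reproduces the 225.61 (1993) and 330.94 (1999) averages discussed here. `rolling_avg` is a hypothetical helper.

```python
from statistics import mean

# Claude Nicollier's mission hours in m_yr order, as quoted in this notebook
claude_hrs = [191.25, 259.97, 541.67, 191.18]


def rolling_avg(values, preceding=2):
    """Model of AVG(x) OVER (ORDER BY ... ROWS BETWEEN <preceding> PRECEDING
    AND CURRENT ROW) for one already-sorted partition."""
    result = []
    for i in range(len(values)):
        # the frame is clipped at the partition start, so early rows average fewer values
        frame = values[max(0, i - preceding): i + 1]
        result.append(round(mean(frame), 2))
    return result


print(rolling_avg(claude_hrs))
```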

### Sophisticated rolling windows

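In the prior section, the n in n PRECEDING and n FOLLOWING is a specific number of rows, because the frame was defined with ROWS. When the window is sorted on a numeric or date column and the RANGE keyword replaces ROWS, n is instead an offset from the current row's value of the sorting column. For example, a RANGE frame from 2 PRECEDING to CURRENT ROW ordered by m_yr covers every row in the partition whose m_yr falls between the current m_yr minus 2 and the current m_yr, however many rows that is.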

Another good time to visit https://lestermartin.wordpress.com/2023/10/04/sql-window-functions-explained-transparently-as-possible/ for some visuals.
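To see how ROWS and RANGE frames can diverge, here is a plain-Python model for a single, already-sorted partition. The (m_yr, m_hrs) pairs are hypothetical and chosen so that there is a gap between mission years; the helper names are hypothetical too. Across the gap, the ROWS frame still reaches back two rows, while the RANGE frame only reaches back two years.

```python
from statistics import mean

# Hypothetical (m_yr, m_hrs) pairs for one astronaut, already sorted by m_yr
missions = [(2001, 100.0), (2002, 200.0), (2005, 300.0), (2006, 400.0)]


def avg_rows_frame(missions, preceding=2):
    """ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW: counts physical rows."""
    out = []
    for i in range(len(missions)):
        frame = missions[max(0, i - preceding): i + 1]
        out.append(mean(hrs for _, hrs in frame))
    return out


def avg_range_frame(missions, preceding=2):
    """RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW: compares m_yr values."""
    out = []
    for yr, _ in missions:
        # every row whose m_yr lies in [yr - preceding, yr], which also
        # includes any peers sharing the current row's m_yr
        frame = [hrs for other_yr, hrs in missions if yr - preceding <= other_yr <= yr]
        out.append(mean(frame))
    return out


print(avg_rows_frame(missions))
print(avg_range_frame(missions))
```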

In [None]:
w7 = W.partition_by("name").order_by("m_yr") \
    .range_between(-2, W.CURRENT_ROW)
     
aDF.withColumn("avg_last2_YEARS", 
        round(F.avg("m_hrs").over(w7), 2)) \
    .withColumn("avg_last2_ROWS", 
        round(F.avg("m_hrs").over(w6), 2)) \
    .sort("name", "m_yr").show(20)

To call out an example from above, look at Claude’s 1999 mission. The avg_last2_rows value of 330.94 is the average of 191.18, 541.67, and 259.97 (the current row and the two rows before it). Conversely, his avg_last2_years value is only the current value of 191.18, because he flew no other missions from 1997 through 1999.

## Or... just run some SQL

While the programmer in me likes the method-chaining code you see in this notebook, you can also use the Session object’s sql() function to just write some SQL.

### Start with SQL...

You could start off with some SQL to get an initial DataFrame and then perform some more method chaining for additional transformations.

In [None]:
twoAs_fromSQL = session.sql(
    "SELECT name, "\
    "       year_of_mission AS m_yr, "\
    "       hours_mission AS m_hrs "\
    "  FROM sample.demo.astronauts "\
    " WHERE name IN ('Nicollier, Claude', "\
    "                'Ross, Jerry L.')")
 
w8 = W.partition_by("name")
 
# apply the w8 window specification defined above
twoAs_fromSQL.withColumn("tot_m_hrs", 
        F.sum("m_hrs").over(w8)) \
    .sort("name", "m_yr").show(20)

### Do it all with SQL...

Or just put the whole SQL statement inside the sql() call.
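If you want to experiment with this window SQL locally without a cluster, Python's built-in `sqlite3` module can run the same `SUM(...) OVER (PARTITION BY name)` pattern (SQLite supports window functions from version 3.25). This is a stand-in, not Starburst or Trino: the in-memory `astronauts` table and its rows are hypothetical, and dialect details can differ between SQLite and Trino.

```python
import sqlite3

# In-memory SQLite database used only as a local stand-in for Galaxy;
# the table and its rows are hypothetical. Window functions need SQLite 3.25+.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE astronauts (name TEXT, year_of_mission INTEGER, hours_mission REAL)"
)
conn.executemany(
    "INSERT INTO astronauts VALUES (?, ?, ?)",
    [
        ("Astronaut A", 1993, 100.0),
        ("Astronaut A", 1995, 200.0),
        ("Astronaut A", 1998, 300.0),
        ("Astronaut B", 1993, 50.0),
        ("Astronaut B", 1996, 150.0),
    ],
)

# Same shape as the Galaxy query above, minus the catalog prefix and WHERE clause
query = """
    SELECT name,
           year_of_mission AS m_yr,
           hours_mission AS m_hrs,
           SUM(hours_mission) OVER (PARTITION BY name) AS tot_m_hrs
      FROM astronauts
     ORDER BY name, m_yr
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
conn.close()
```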

In [None]:
ALL_fromSQL = session.sql(
    "SELECT name, "\
    "       year_of_mission AS m_yr, "\
    "       hours_mission AS m_hrs, "\
    "       SUM(hours_mission) "\
    "          OVER (PARTITION BY name) "\
    "       AS tot_m_hrs "\
    "  FROM sample.demo.astronauts "\
    " WHERE name IN ('Nicollier, Claude', "\
    "                'Ross, Jerry L.')"\
    " ORDER BY name, m_yr ")
 
ALL_fromSQL.show(20)

You definitely have some **optionality** with the DataFrame API.

Reminder: this notebook's code was originally published at https://lestermartin.wordpress.com/2023/10/19/viewing-astronauts-thru-windows-more-pystarburst-examples/ and you can find SQL examples there to compare each of the Python code blocks against.