# PyStarburst for Data Analysis
This notebook accompanies the course [Use PyStarburst for Data Analysis](https://academy.starburst.io/getting-started-with-pystarburst/192527), which is part of the overall course [Getting Started with PyStarburst](https://academy.starburst.io/getting-started-with-pystarburst) on Starburst Academy.  

Course author: Starburst Academy Team  
Notebook author: Mark Bauer

In [1]:
import os
import trino
from pystarburst import Session
from pystarburst import functions as f
from pystarburst.functions import col, lag, round, row_number
from pystarburst.window import Window

To reproduce the results and follow along, you'll need to set up your environment in Starburst. Setup instructions are in the course: https://academy.starburst.io/getting-started-with-pystarburst/192527.

In [2]:
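# os.environ[...] raises a KeyError if a variable is missing,
#   instead of silently passing the string "None" to the client
host = os.environ["host"]
port = int(os.environ["port"])
http_scheme = os.environ["http_scheme"]
username = os.environ["username"]
password = os.environ["password"]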

db_parameters = {
    "host": host,
    "port": port,
    "http_scheme": http_scheme,
    # Set up authentication with a username and password, or any other supported authentication method
    # See docs: https://github.com/trinodb/trino-python-client#authentication-mechanisms
    "auth": trino.auth.BasicAuthentication(username, password)
}
session = Session.builder.configs(db_parameters).create()

print(type(session))

<class 'pystarburst.session.Session'>


**Never** show your username and password publicly.

Learn how to save [environment variables](https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-environments.html) in conda.

# Analyze Aviation Data

In [3]:
allFs = session.table("tmp_cat.aviation.raw_flight")
print(type(allFs))

<class 'pystarburst.table.Table'>


In [4]:
print(allFs.count())

2056494


In [5]:
# get the whole table, aggregate & sort
mostAs = (
    session 
    .table("tmp_cat.aviation.raw_airport") 
    .group_by("country").count() 
    .sort("count", ascending=False)
)

mostAs.show()

-----------------------------------------
|"country"                    |"count"  |
-----------------------------------------
|USA                          |3363     |
|GA                           |2        |
|NMarianaIslands              |1        |
|country                      |1        |
|Palau                        |1        |
|Thailand                     |1        |
|NY                           |1        |
|OK                           |1        |
|PA                           |1        |
|FederatedStatesofMicronesia  |1        |
-----------------------------------------
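The `group_by(...).count().sort(...)` chain is easiest to reason about through its plain-Python equivalent. The sketch below runs on a few hypothetical rows rather than the real `raw_airport` table, and `group_count_sorted` is an illustrative name, not part of PyStarburst. One difference worth knowing: `Counter.most_common` keeps first-seen order for tied counts, whereas the order of tied rows in a Trino query is unspecified, so the count-1 rows above could come back in a different order on another run.

```python
from collections import Counter


def group_count_sorted(rows, key):
    """Count rows per value of `key`, most frequent first.

    Mirrors table.group_by(key).count().sort("count", ascending=False).
    Ties keep first-seen order here; in Trino their order is unspecified.
    """
    counts = Counter(row[key] for row in rows)
    return counts.most_common()


# Hypothetical sample rows, not the real raw_airport data
rows = [{"country": "USA"}, {"country": "USA"}, {"country": "Palau"},
        {"country": "USA"}, {"country": "GA"}]
print(group_count_sorted(rows, "country"))
```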



In [6]:
# get the whole table, aggregate & sort
mostFs = (
    session 
    .table("tmp_cat.aviation.raw_flight") 
    .group_by("unique_carrier").count() 
    .rename("unique_carrier", "carr") 
    .sort("count", ascending=False)
)

mostFs.show(5)

--------------------
|"carr"  |"count"  |
--------------------
|WN      |356167   |
|AA      |175969   |
|OO      |166445   |
|MQ      |141178   |
|US      |133403   |
--------------------



In [7]:
# get all of the carriers
allCs = session.table("tmp_cat.aviation.raw_carrier")
 
# repurpose mostFs from above (or chain on it) 
#   to join the 2 DFs and sort the results that
#   have already been grouped
top5CarrNm = (
    mostFs 
    .join(allCs, mostFs.carr == allCs.code) 
    .drop("code") 
    .sort("count", ascending=False)
)

top5CarrNm.show(5, 30)

-----------------------------------------------------
|"carr"  |"count"  |"description"                   |
-----------------------------------------------------
|WN      |356167   |Southwest Airlines Co.          |
|AA      |175969   |American Airlines Inc.          |
|OO      |166445   |Skywest Airlines Inc.           |
|MQ      |141178   |American Eagle Airlines Inc.    |
|US      |133403   |US Airways Inc. (Merged wit...  |
-----------------------------------------------------
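Here's a minimal plain-Python sketch of the inner join in the cell above, assuming the grouped counts fit in a list and the carrier table in a dict keyed by `code`. The sample rows reuse two carriers from the output; `ZZ` is a hypothetical code with no match, so the inner join drops it, and `inner_join_counts` is an illustrative name. The final sort mirrors the `.sort("count", ascending=False)` after the join: a join is not guaranteed to preserve the order of its inputs, so the earlier sort in `mostFs` doesn't carry through.

```python
def inner_join_counts(counts, carriers):
    """Attach carrier descriptions to (carr, count) pairs via an inner join.

    counts:   list of (carr, count) tuples, like the grouped flight counts
    carriers: dict mapping carrier code -> description
    Carriers with no matching code are dropped, as in an inner join;
    the code itself isn't repeated in the output, like .drop("code").
    """
    joined = [(carr, n, carriers[carr]) for carr, n in counts if carr in carriers]
    # Re-sort after the join: join output order is not guaranteed
    return sorted(joined, key=lambda r: r[1], reverse=True)


counts = [("WN", 356167), ("ZZ", 10), ("AA", 175969)]  # "ZZ" is hypothetical
carriers = {"WN": "Southwest Airlines Co.", "AA": "American Airlines Inc."}
for row in inner_join_counts(counts, carriers):
    print(row)
```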



In [8]:
# trimFs are flights projected & filtered
trimFs = (
    session.table("tmp_cat.aviation.raw_flight") 
    .rename("tail_number", "tNbr") 
    .select("tNbr", "distance") 
    .filter(col("distance") > 1500) 
)
 
# trimPs is the planes table, projected & filtered
trimPs = (
    session.table("tmp_cat.aviation.raw_plane") 
    .select("tail_number", "model") 
    .filter("model is not null")
)
 
# join, group & sort
q5Answer = (
    trimFs 
    .join(trimPs, trimFs.tNbr == trimPs.tail_number) 
    .drop("tail_number") 
    .group_by("model").count() 
    .sort("count", ascending=False) 
)

q5Answer.show()

----------------------
|"model"   |"count"  |
----------------------
|A320-232  |28926    |
|737-7H4   |21597    |
|757-222   |14609    |
|757-232   |12972    |
|737-824   |10789    |
|737-832   |9393     |
|A319-131  |5881     |
|A321-211  |4921     |
|767-332   |4522     |
|A319-132  |4480     |
----------------------
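The same filter, join, and group steps can be sketched in plain Python. The tail numbers, distances, and models below are hypothetical, and `models_on_long_flights` is an illustrative name. The sketch assumes each tail number appears at most once in the planes data; if `raw_plane` held duplicates, the real join would count a flight once per matching plane row, and this dict-based version would not.

```python
from collections import Counter


def models_on_long_flights(flights, planes, min_distance=1500):
    """Count flights longer than min_distance per aircraft model.

    flights: iterable of (tail_number, distance) tuples
    planes:  iterable of (tail_number, model) tuples; model may be None
    Assumes each tail_number appears at most once in planes.
    """
    # Project and filter each side first, as trimFs and trimPs do
    long_tails = [tail for tail, dist in flights if dist > min_distance]
    model_by_tail = {tail: model for tail, model in planes if model is not None}
    # Inner join on tail number, then group by model and count
    counts = Counter(model_by_tail[t] for t in long_tails if t in model_by_tail)
    return counts.most_common()


# Hypothetical sample data
flights = [("N1", 2000), ("N1", 1800), ("N2", 900), ("N3", 2500), ("N4", 1600)]
planes = [("N1", "737-7H4"), ("N2", "A320-232"), ("N3", "A320-232"), ("N4", None)]
print(models_on_long_flights(flights, planes))
```

Here `N2` is filtered out by distance and `N4` by its null model, so only three flights reach the count.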



In [9]:
# temp DF holds counts for each originating airport 
#   by month
aggFlights = (
    session.table("tmp_cat.aviation.raw_flight") 
    .select("origination", "month") 
    .rename("origination", "orig") 
    .group_by("orig", "month").count() 
    .rename("count", "num_fs")
)

In [10]:
# define a window specification
w1 = Window.partition_by("orig").order_by("month")
 
# add col with the prior month's nbr flights
#   (NULL for each airport's first month)
changeFlights = (
    aggFlights
    .withColumn("num_fs_b4",
        lag("num_fs", 1).over(w1))
)
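`lag` reads a value from an earlier row in the same window partition. The sketch below reproduces `lag("num_fs", 1).over(w1)` in plain Python, using ABE's first three months from the output further down plus a hypothetical `XYZ` airport to show that the lag restarts at each new `orig`, with `None` standing in for SQL `NULL`. `add_lag` is an illustrative name.

```python
from itertools import groupby
from operator import itemgetter


def add_lag(rows, offset=1):
    """Mimic lag("num_fs", offset).over(Window.partition_by("orig").order_by("month")).

    rows: iterable of (orig, month, num_fs) tuples.
    Returns (orig, month, num_fs, num_fs_b4) tuples, where num_fs_b4 is None
    (SQL NULL) for the first `offset` rows of each partition.
    """
    out = []
    # Sort by the partition key, then by the window's ordering column
    ordered = sorted(rows, key=itemgetter(0, 1))
    for _, group in groupby(ordered, key=itemgetter(0)):
        part = list(group)
        for i, (orig, month, num_fs) in enumerate(part):
            # Look back `offset` rows, but never across a partition boundary
            prev = part[i - offset][2] if i >= offset else None
            out.append((orig, month, num_fs, prev))
    return out


rows = [("ABE", 2, 111), ("XYZ", 1, 5), ("ABE", 1, 99), ("ABE", 3, 127), ("XYZ", 2, 7)]
for row in add_lag(rows):
    print(row)
```

Passing `offset=2` would look two months back, analogous to `lag("num_fs", 2)`.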

In [11]:
# add col for the change vs. the prior month as a fraction (0.1 = 10%);
#   the 1.0 factors avoid integer division of the integer counts
q6Answer = (
    changeFlights 
    .withColumn("perc_chg", 
        round((1.0 * (col("num_fs") - col("num_fs_b4")) / 
              (1.0 * col("num_fs_b4"))), 1))
)

q6Answer.show()

----------------------------------------------------------
|"orig"  |"month"  |"num_fs"  |"num_fs_b4"  |"perc_chg"  |
----------------------------------------------------------
|ABE     |1        |99        |NULL         |NULL        |
|ABE     |2        |111       |99           |0.1         |
|ABE     |3        |127       |111          |0.1         |
|ABE     |4        |142       |127          |0.1         |
|ABE     |5        |137       |142          |-0.0        |
|ABE     |6        |116       |137          |-0.2        |
|ABE     |7        |113       |116          |-0.0        |
|ABE     |8        |106       |113          |-0.1        |
|ABE     |9        |94        |106          |-0.1        |
|ABE     |10       |140       |94           |0.5         |
----------------------------------------------------------
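Written out, the `perc_chg` expression computes a fractional change rounded to one decimal place. Checking it against ABE rows from the output above also shows why small decreases display as `-0.0`: a value such as $-0.035$ rounds to a negative zero.

```latex
\begin{aligned}
\text{perc\_chg} &= \operatorname{round}\!\left(\frac{\text{num\_fs} - \text{num\_fs\_b4}}{\text{num\_fs\_b4}},\ 1\right) \\[4pt]
\text{ABE, month 2:}\quad & \frac{111 - 99}{99} = \frac{12}{99} \approx 0.121 \;\rightarrow\; 0.1 \\
\text{ABE, month 5:}\quad & \frac{137 - 142}{142} = \frac{-5}{142} \approx -0.035 \;\rightarrow\; -0.0 \\
\text{ABE, month 10:}\quad & \frac{140 - 94}{94} = \frac{46}{94} \approx 0.489 \;\rightarrow\; 0.5
\end{aligned}
```

Multiplying the fraction by 100 before rounding would give a true percentage, if that's what you want to display.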



In [12]:
# count flights for each orig > dest pair
popularRoutes = (
    session 
    .table("tmp_cat.aviation.raw_flight") 
    .rename("origination", "orig") 
    .rename("destination", "dest") 
    .group_by("orig", "dest").count() 
    .rename("count", "num_fs")
)

In [13]:
# define a window specification
w2 = (
    Window.partition_by("orig") 
    .order_by(col("num_fs").desc())
)
 
# add col holding each row's rank within its orig airport
rankedRoutes = (
    popularRoutes 
    .withColumn("rank", 
        row_number().over(w2))
)
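The window spec, `row_number`, and the `rank <= 3` filter together form a top-N-per-group query. A plain-Python sketch of the same steps, using ABE's and ABI's routes from the results below plus a hypothetical `ZZZ` destination so that ABE has a fourth route to cut; `top_n_per_group` is an illustrative name.

```python
from itertools import groupby
from operator import itemgetter


def top_n_per_group(rows, n=3):
    """Keep the n busiest destinations for each origin.

    rows: iterable of (orig, dest, num_fs) tuples.
    Mirrors row_number() over (partition by orig order by num_fs desc)
    followed by filter(rank <= n).
    Returns (orig, dest, num_fs, rank) tuples sorted by orig, then rank.
    """
    result = []
    # Group by origin (the partition key); groupby needs sorted input
    for orig, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        # Order within the partition by flight count, descending
        ranked = sorted(group, key=itemgetter(2), reverse=True)
        for rank, (_, dest, num_fs) in enumerate(ranked[:n], start=1):
            result.append((orig, dest, num_fs, rank))
    return result


rows = [("ABI", "DFW", 773), ("ABE", "ATL", 247), ("ABE", "ZZZ", 10),
        ("ABE", "ORD", 420), ("ABE", "DTW", 282)]
for row in top_n_per_group(rows):
    print(row)
```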

In [14]:
# keep only the top 3 routes for each orig airport
q7Answer = (
    rankedRoutes 
    .filter(col("rank") <= 3) 
    .sort("orig", "rank")
)

q7Answer.show(17)

---------------------------------------
|"orig"  |"dest"  |"num_fs"  |"rank"  |
---------------------------------------
|ABE     |ORD     |420       |1       |
|ABE     |DTW     |282       |2       |
|ABE     |ATL     |247       |3       |
|ABI     |DFW     |773       |1       |
|ABQ     |PHX     |1619      |1       |
|ABQ     |DEN     |1254      |2       |
|ABQ     |DAL     |951       |3       |
|ABY     |ATL     |338       |1       |
|ACK     |EWR     |62        |1       |
|ACK     |JFK     |58        |2       |
|ACT     |DFW     |567       |1       |
|ACV     |SFO     |705       |1       |
|ACV     |SMF     |175       |2       |
|ACV     |SLC     |134       |3       |
|ACY     |ATL     |34        |1       |
|ACY     |JFK     |1         |2       |
|ACY     |LGA     |1         |3       |
---------------------------------------
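ACY's JFK and LGA routes both have 1 flight, yet `row_number` gives them ranks 2 and 3. With ties, which row gets which number isn't determined by `order_by(col("num_fs").desc())`, so it can vary between runs. SQL also offers `RANK()` and `DENSE_RANK()`, which give tied rows the same number, and adding a secondary sort key such as `dest` to the window's ordering is another way to make `row_number` deterministic. The sketch below compares the three numbering schemes on one partition of hypothetical counts; `number_rows` is an illustrative name.

```python
def number_rows(values):
    """Return (row_number, rank, dense_rank) lists for values sorted descending.

    Mirrors SQL's ROW_NUMBER(), RANK() and DENSE_RANK() over
    ORDER BY value DESC within a single partition.
    """
    ordered = sorted(values, reverse=True)
    row_numbers, ranks, dense_ranks = [], [], []
    dense = 0
    for i, v in enumerate(ordered):
        row_numbers.append(i + 1)  # always unique, even for ties
        if i > 0 and v == ordered[i - 1]:
            # Tie: RANK and DENSE_RANK repeat the previous number
            ranks.append(ranks[-1])
            dense_ranks.append(dense_ranks[-1])
        else:
            ranks.append(i + 1)  # RANK skips past the tied rows
            dense += 1           # DENSE_RANK never skips
            dense_ranks.append(dense)
    return row_numbers, ranks, dense_ranks


# Hypothetical flight counts for one origin airport
print(number_rows([3, 5, 1, 3]))
```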

