
#### Colab Prep

Execute the following code cells whenever you open or restart the notebook in Google Colab.

In [1]:
!wget https://github.com/WSU-DataScience/dsci_325_module6_basic_data_management_in_python/raw/main/sample_data.zip

--2025-11-18 15:03:50--  https://github.com/WSU-DataScience/dsci_325_module6_basic_data_management_in_python/raw/main/sample_data.zip
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/WSU-DataScience/dsci_325_module6_basic_data_management_in_python/main/sample_data.zip [following]
--2025-11-18 15:03:50--  https://raw.githubusercontent.com/WSU-DataScience/dsci_325_module6_basic_data_management_in_python/main/sample_data.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12511348 (12M) [application/zip]
Saving to: ‘sample_data.zip’


2025-11-18 15:03:50 (111 MB/s) - ‘sample_data.zip’ saved [1251134

In [2]:
!unzip -o ./sample_data.zip   # -o: overwrite existing files without prompting, so the cell can be re-run

# Overview of PySpark data management

In this notebook, we will illustrate how various data verbs are implemented in `pyspark`.

## `polars` $\approx$ `pyspark`

You will see many similarities between `polars` and `pyspark`:

1. Lazy evaluation and column expressions,
2. Parallel processing out-of-the-box,
3. Dot-chained queries, and
4. Data verbs related to `SQL` and/or `dplyr`.

## Data verbs in `pyspark`

In this lecture, we will look at how the common data verbs are implemented in `pyspark`.  Luckily, the API is similar to that of `polars`, so the transition should be relatively painless.

### Overview of Basic Data Verbs in `polars` and `pyspark`

Verb/Function | `polars` | `pyspark` |
--------------|----------|-----------|
Column expr.  | `pl.col('name') ...` | `col('name') ...`|
SELECT | `.select(...)` | `.select(...)` |
FILTER | `.filter(...)` | `.where(...)` or `.filter(...)` |
MUTATE | `.with_columns(...)` | `.withColumn(...)` or `.withColumns({...})` |
GROUPBY | `.group_by(...)` | `.groupBy(...)`|
AGGREGATE | `.agg(...)` | `.agg(...)` |
JOIN | `l_tbl.join(r_tbl,...)` | `l_tbl.join(r_tbl,...)`|
UNION | `pl.concat` or SQL | `t1.union(t2)` |
STACK COLUMNS | `.unpivot(...)` | `.unpivot(...)`|
UNSTACK COLUMNS | `.pivot(...)` | `.groupBy(...).pivot(...).<aggfunc>(...)`|

In [4]:
!curl https://raw.githubusercontent.com/DSCI-326/DSCI326_module_6_lazy_operations/refs/heads/main/data/heroes_information.csv > ./sample_data/heroes_information.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100 45542  100 45542    0     0   498k      0 --:--:-- --:--:-- --:--:--  499k


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean

spark = SparkSession.builder.appName('Ops').getOrCreate()
heroes = spark.read.csv('./sample_data/heroes_information.csv', inferSchema=True, header=True, nullValue='-')

heroes.limit(5).toPandas()

Unnamed: 0,ID,name,Gender,Eye color,Race,Hair color,Height,Publisher,Skin color,Alignment,Weight
0,0,A-Bomb,Male,yellow,Human,No Hair,203.0,Marvel Comics,,good,441
1,1,Abe Sapien,Male,blue,Icthyo Sapien,No Hair,191.0,Dark Horse Comics,blue,good,65
2,2,Abin Sur,Male,blue,Ungaran,No Hair,185.0,DC Comics,red,good,90
3,3,Abomination,Male,green,Human / Radiation,No Hair,203.0,Marvel Comics,,bad,441
4,4,Abraxas,Male,blue,Cosmic Entity,Black,-99.0,Marvel Comics,,bad,-99


In [6]:
heroes.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Eye color: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Hair color: string (nullable = true)
 |-- Height: double (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Skin color: string (nullable = true)
 |-- Alignment: string (nullable = true)
 |-- Weight: integer (nullable = true)



## Selecting Columns

The first verb, `select`,

* filters the *columns*
* is at the core of every `SQL` query (`SELECT ...`)

In [8]:
from pyspark.sql.functions import col

(select_query :=
 heroes
 .select(heroes.name,      # Column via dataframe.name
         heroes.Gender,
      #   col('Gender'),    # Column expression (lazy)
         'Weight')         # String
).limit(5).toPandas()      # <-- outside the saved query

Unnamed: 0,name,Gender,Weight
0,A-Bomb,Male,441
1,Abe Sapien,Male,65
2,Abin Sur,Male,90
3,Abomination,Male,441
4,Abraxas,Male,-99


In [9]:
select_query  # <-- lazy query

DataFrame[name: string, Gender: string, Weight: int]

## Filtering Rows

The next verb, `filter`

* filters the *rows*
* is related to the `SQL` `WHERE` clause
* `pyspark`: Use the `where` method (`filter` is an alias for it)

In [10]:
heroes.Gender == 'Male' # <-- Lazy column expression

Column<'(Gender = Male)'>

In [11]:
col('Gender') == 'Male' # <-- Lazy column expression

Column<'(Gender = Male)'>

In [None]:
(heroes
 .where(col('Gender') == 'Male')
).limit(5).toPandas()

Unnamed: 0,_c0,name,Gender,Eye color,Race,Hair color,Height,Publisher,Skin color,Alignment,Weight
0,0,A-Bomb,Male,yellow,Human,No Hair,203.0,Marvel Comics,,good,441.0
1,1,Abe Sapien,Male,blue,Icthyo Sapien,No Hair,191.0,Dark Horse Comics,blue,good,65.0
2,2,Abin Sur,Male,blue,Ungaran,No Hair,185.0,DC Comics,red,good,90.0
3,3,Abomination,Male,green,Human / Radiation,No Hair,203.0,Marvel Comics,,bad,441.0
4,4,Abraxas,Male,blue,Cosmic Entity,Black,-99.0,Marvel Comics,,bad,-99.0


## Chaining Data Verbs

* Processing a DataFrame usually means applying several data verbs in sequence
* Accomplished through dot-chains, as in `polars`

In [12]:
(heroes
 .where(col('Gender') == 'Male')
 .select('name',
         'Gender',
         'Weight')
).limit(5).toPandas()

Unnamed: 0,name,Gender,Weight
0,A-Bomb,Male,441
1,Abe Sapien,Male,65
2,Abin Sur,Male,90
3,Abomination,Male,441
4,Abraxas,Male,-99


## Constructing New Columns

The third verb, `mutate`

* Creates new columns
* Changes existing columns
* `pyspark`: Use the `withColumn` method (or `withColumns` to add several columns at once)

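### Example - Converting weight to kilograms

This conversion assumes `Weight` is recorded in pounds (1 kg ≈ 2.2046 lb). Check the units in the data source before relying on the result.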

In [13]:
(heroes
 .select('name',
         'Gender',
         'Weight')
 .withColumn('Weight_kg', col('Weight')/2.2046)
).limit(5).toPandas()

Unnamed: 0,name,Gender,Weight,Weight_kg
0,A-Bomb,Male,441,200.036288
1,Abe Sapien,Male,65,29.483807
2,Abin Sur,Male,90,40.823732
3,Abomination,Male,441,200.036288
4,Abraxas,Male,-99,-44.906105


## Referencing a new column

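To reference a column created earlier in the chain, use the `col` function with the name given to `withColumn`. Attribute access such as `heroes.Weight_kg` would not work, because `Weight_kg` does not exist in the original `heroes` DataFrame.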

In [14]:
(new_col_result :=
 heroes
 .select('name',
         'Gender',
         'Weight')
 .withColumn('Weight_kg', col('Weight')/2.2046)
 .where(col('Weight_kg') < 100)  # <-- one reason we need lazy expressions
).limit(5).toPandas()

Unnamed: 0,name,Gender,Weight,Weight_kg
0,Abe Sapien,Male,65,29.483807
1,Abin Sur,Male,90,40.823732
2,Abraxas,Male,-99,-44.906105
3,Absorbing Man,Male,122,55.338837
4,Adam Monroe,Male,-99,-44.906105


## Simple and Grouped Aggregation

### Simple Aggregation

A **simple aggregation** collapses all rows into one row.

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/simple_aggregation.png?raw=1" width=800>

In [15]:
from pyspark.sql.functions import mean, std, max, min   # note: max/min shadow Python's built-ins

(heroes
  .agg(mean('Height').alias('Mean(Height)'),
       std('Height').alias('SD(Height)'),
       max('Weight').alias('Max(Weight)'),
       min('Weight').alias('Min(Weight)'))
).toPandas()

Unnamed: 0,Mean(Height),SD(Height),Max(Weight),Min(Weight)
0,102.254087,139.624543,900,-99


### Group and Aggregate

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/group_and_aggregate.png?raw=1" width=800>

In [16]:
(heroes
  .where(col('Publisher').isin(['Marvel Comics', 'DC Comics']))
  .groupBy('Publisher')
  .agg(mean('Height').alias('Mean(Height)'),
       std('Height').alias('SD(Height)'),
       max('Weight').alias('Max(Weight)'),
       min('Weight').alias('Min(Weight)'))
).toPandas()


Unnamed: 0,Publisher,Mean(Height),SD(Height),Max(Weight),Min(Weight)
0,Marvel Comics,142.756443,125.404393,900,-99
1,DC Comics,91.072093,132.63077,817,-99


### Grouping by more than one category

* `groupBy` accepts multiple columns
* One group is formed for each combination of values that appears in the data

In [None]:
(heroes
  .where(col('Publisher').isin(['Marvel Comics', 'DC Comics']))
  .where(col('Gender').isNotNull())
  .groupBy('Publisher', 'Gender')
  .agg(mean('Height').alias('Mean(Height)'),
       std('Height').alias('SD(Height)'),
       max('Weight').alias('Max(Weight)'),
       min('Weight').alias('Min(Weight)'))
).toPandas()


Unnamed: 0,Publisher,Gender,Mean(Height),SD(Height),Max(Weight),Min(Weight)
0,DC Comics,Male,94.281046,134.892865,817.0,-99.0
1,Marvel Comics,Male,151.712302,132.75239,900.0,-99.0
2,Marvel Comics,Female,141.432432,94.143085,495.0,-99.0
3,DC Comics,Female,86.139344,126.45586,630.0,-99.0


## Joins in `pyspark`

Performed with `df_left.join(df_right, on=condition, how=type_str)`, where `how` is a string such as `'inner'`, `'left'`, `'right'`, or `'outer'`.

In [17]:
! curl https://raw.githubusercontent.com/DSCI-326/DSCI326_module_6_lazy_operations/refs/heads/main/data/department.csv > ./sample_data/department.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100    65  100    65    0     0    898      0 --:--:-- --:--:-- --:--:--   902


In [18]:
!curl https://raw.githubusercontent.com/DSCI-326/DSCI326_module_6_lazy_operations/refs/heads/main/data/employee.csv > ./sample_data/employee.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100    82  100    82    0     0   1148      0 --:--:-- --:--:-- --:--:--  1154


In [19]:
(dept :=
 spark.read.csv("./sample_data/department.csv",  header=True, inferSchema=True)
).toPandas()

Unnamed: 0,DeptID,DeptName
0,31,Sales
1,33,Engineering
2,34,Clerical
3,35,Marketing


In [20]:
(empl :=
 spark.read.csv("./sample_data/employee.csv",  header=True, inferSchema=True)
).toPandas()

Unnamed: 0,LastName,DeptID
0,Rafferty,31.0
1,Jones,33.0
2,Heisenberg,33.0
3,Robinson,34.0
4,Smith,34.0
5,Williams,


#### Inner join

In [21]:
(empl.join(dept, empl.DeptID == dept.DeptID, how='inner')
).toPandas()

Unnamed: 0,LastName,DeptID,DeptID.1,DeptName
0,Rafferty,31,31,Sales
1,Jones,33,33,Engineering
2,Heisenberg,33,33,Engineering
3,Robinson,34,34,Clerical
4,Smith,34,34,Clerical


#### Left join

In [22]:
(empl.join(dept, empl.DeptID == dept.DeptID, how='left')
).toPandas()

Unnamed: 0,LastName,DeptID,DeptID.1,DeptName
0,Rafferty,31.0,31.0,Sales
1,Jones,33.0,33.0,Engineering
2,Heisenberg,33.0,33.0,Engineering
3,Robinson,34.0,34.0,Clerical
4,Smith,34.0,34.0,Clerical
5,Williams,,,


#### Right join

In [23]:
(empl.join(dept, empl.DeptID == dept.DeptID, how='right')
).toPandas()

Unnamed: 0,LastName,DeptID,DeptID.1,DeptName
0,Rafferty,31.0,31,Sales
1,Heisenberg,33.0,33,Engineering
2,Jones,33.0,33,Engineering
3,Smith,34.0,34,Clerical
4,Robinson,34.0,34,Clerical
5,,,35,Marketing


#### Outer join

In [24]:
(empl.join(dept, empl.DeptID == dept.DeptID, how='outer')
).toPandas()

Unnamed: 0,LastName,DeptID,DeptID.1,DeptName
0,Williams,,,
1,Rafferty,31.0,31.0,Sales
2,Jones,33.0,33.0,Engineering
3,Heisenberg,33.0,33.0,Engineering
4,Robinson,34.0,34.0,Clerical
5,Smith,34.0,34.0,Clerical
6,,,35.0,Marketing


## Joining on multiple keys

Next, we will look at table joins that require matching multiple keys.

### Example -- Total Hits and Runs Allowed in 2010

To illustrate joining on multiple keys, let's

1. Compute the totals for H (hits) and R (runs) allowed in 2010 for each team from the `Pitching` table.
2. Add each team's name and park from the `Teams` table, matching on `teamID` and `yearID`.

This is a good example because team information (such as the name and park) can change over the years, so we need to match on both `teamID` and `yearID`.

#### Step 1. Read and process the pitching table

In [25]:
!curl https://raw.githubusercontent.com/DSCI-326/DSCI326_module_6_lazy_operations/refs/heads/main/data/baseball/core/Pitching.csv > ./sample_data/Pitching.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 4110k  100 4110k    0     0  8859k      0 --:--:-- --:--:-- --:--:-- 8858k


In [26]:
!curl https://raw.githubusercontent.com/DSCI-326/DSCI326_module_6_lazy_operations/refs/heads/main/data/baseball/core/Teams.csv > ./sample_data/Teams.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  565k  100  565k    0     0  2754k      0 --:--:-- --:--:-- --:--:-- 2747k


In [27]:
(pitching :=
 spark.read.csv("./sample_data/Pitching.csv", header=True, inferSchema=True)
).limit(5).toPandas()

Unnamed: 0,playerID,yearID,stint,teamID,lgID,W,L,G,GS,CG,...,IBB,WP,HBP,BK,BFP,GF,R,SH,SF,GIDP
0,bechtge01,1871,1,PH1,,1,2,3,3,2,...,,7,,0,146,0,42,,,
1,brainas01,1871,1,WS3,,12,15,30,30,30,...,,7,,0,1291,0,292,,,
2,fergubo01,1871,1,NY2,,0,0,1,0,0,...,,2,,0,14,0,9,,,
3,fishech01,1871,1,RC1,,4,16,24,24,22,...,,20,,0,1080,1,257,,,
4,fleetfr01,1871,1,NY2,,0,1,1,1,1,...,,0,,0,57,0,21,,,


In [29]:
pitching.printSchema()

root
 |-- playerID: string (nullable = true)
 |-- yearID: integer (nullable = true)
 |-- stint: integer (nullable = true)
 |-- teamID: string (nullable = true)
 |-- lgID: string (nullable = true)
 |-- W: integer (nullable = true)
 |-- L: integer (nullable = true)
 |-- G: integer (nullable = true)
 |-- GS: integer (nullable = true)
 |-- CG: integer (nullable = true)
 |-- SHO: integer (nullable = true)
 |-- SV: integer (nullable = true)
 |-- IPouts: integer (nullable = true)
 |-- H: integer (nullable = true)
 |-- ER: integer (nullable = true)
 |-- HR: integer (nullable = true)
 |-- BB: integer (nullable = true)
 |-- SO: integer (nullable = true)
 |-- BAOpp: double (nullable = true)
 |-- ERA: double (nullable = true)
 |-- IBB: integer (nullable = true)
 |-- WP: integer (nullable = true)
 |-- HBP: integer (nullable = true)
 |-- BK: integer (nullable = true)
 |-- BFP: integer (nullable = true)
 |-- GF: integer (nullable = true)
 |-- R: integer (nullable = true)
 |-- SH: integer (nullable = 

In [30]:
(teams :=
 spark.read.csv("./sample_data/Teams.csv", header=True, inferSchema=True)
).limit(5).toPandas()

Unnamed: 0,yearID,lgID,teamID,franchID,divID,Rank,G,Ghome,W,L,...,DP,FP,name,park,attendance,BPF,PPF,teamIDBR,teamIDlahman45,teamIDretro
0,1871,,BS1,BNA,,3,31,,20,10,...,24,0.834,Boston Red Stockings,South End Grounds I,,103,98,BOS,BS1,BS1
1,1871,,CH1,CNA,,2,28,,19,9,...,16,0.829,Chicago White Stockings,Union Base-Ball Grounds,,104,102,CHI,CH1,CH1
2,1871,,CL1,CFC,,8,29,,10,19,...,15,0.818,Cleveland Forest Citys,National Association Grounds,,96,100,CLE,CL1,CL1
3,1871,,FW1,KEK,,7,19,,7,12,...,8,0.803,Fort Wayne Kekiongas,Hamilton Field,,101,107,KEK,FW1,FW1
4,1871,,NY2,NNA,,5,33,,16,17,...,14,0.84,New York Mutuals,Union Grounds (Brooklyn),,90,88,NYU,NY2,NY2


In [31]:
teams.printSchema()

root
 |-- yearID: integer (nullable = true)
 |-- lgID: string (nullable = true)
 |-- teamID: string (nullable = true)
 |-- franchID: string (nullable = true)
 |-- divID: string (nullable = true)
 |-- Rank: integer (nullable = true)
 |-- G: integer (nullable = true)
 |-- Ghome: integer (nullable = true)
 |-- W: integer (nullable = true)
 |-- L: integer (nullable = true)
 |-- DivWin: string (nullable = true)
 |-- WCWin: string (nullable = true)
 |-- LgWin: string (nullable = true)
 |-- WSWin: string (nullable = true)
 |-- R: integer (nullable = true)
 |-- AB: integer (nullable = true)
 |-- H: integer (nullable = true)
 |-- 2B: integer (nullable = true)
 |-- 3B: integer (nullable = true)
 |-- HR: integer (nullable = true)
 |-- BB: integer (nullable = true)
 |-- SO: integer (nullable = true)
 |-- SB: integer (nullable = true)
 |-- CS: integer (nullable = true)
 |-- HBP: integer (nullable = true)
 |-- SF: integer (nullable = true)
 |-- RA: integer (nullable = true)
 |-- ER: integer (nullabl

In [32]:
from pyspark.sql.functions import sum   # note: shadows Python's built-in sum

(pitching_totals_2010 :=
 pitching
 .select('teamID', 'yearID', 'R', 'H')
 .where(col('yearID') == 2010)
 .groupBy('teamID', 'yearID')
 .agg(sum('R').alias('Total Runs'),
      sum('H').alias('Total Hits'))
).limit(5).toPandas()

Unnamed: 0,teamID,yearID,Total Runs,Total Hits
0,MIN,2010,671,1493
1,CHA,2010,704,1471
2,TOR,2010,728,1407
3,FLO,2010,717,1433
4,TBA,2010,649,1347


#### Step 2. Read and process the teams table

In [33]:
(team_name_and_park :=
 teams
 .select('yearID', 'teamID', col('name').alias('Team Name'), 'park')
).limit(5).toPandas()

Unnamed: 0,yearID,teamID,Team Name,park
0,1871,BS1,Boston Red Stockings,South End Grounds I
1,1871,CH1,Chicago White Stockings,Union Base-Ball Grounds
2,1871,CL1,Cleveland Forest Citys,National Association Grounds
3,1871,FW1,Fort Wayne Kekiongas,Hamilton Field
4,1871,NY2,New York Mutuals,Union Grounds (Brooklyn)


#### Step 3. Perform a left join

We want to keep every row of the totals table and add team information only where it is available. So we perform a left join with the totals table on the left.

Notice that the `on` argument is now a `list` of column expressions, one for each matching rule. Spark combines them with a logical AND.

In [34]:
(pitching_totals_2010
 .join(team_name_and_park,
       on = [pitching_totals_2010.yearID == team_name_and_park.yearID,
             pitching_totals_2010.teamID == team_name_and_park.teamID],
       how='left')
).limit(5).toPandas()

Unnamed: 0,teamID,yearID,Total Runs,Total Hits,yearID.1,teamID.1,Team Name,park
0,MIN,2010,671,1493,2010,MIN,Minnesota Twins,Target Field
1,CHA,2010,704,1471,2010,CHA,Chicago White Sox,U.S. Cellular Field
2,TOR,2010,728,1407,2010,TOR,Toronto Blue Jays,Rogers Centre
3,FLO,2010,717,1433,2010,FLO,Florida Marlins,Dolphin Stadium
4,TBA,2010,649,1347,2010,TBA,Tampa Bay Rays,Tropicana Field


## Concatenating Tables with Set-Like Operations in `pyspark`

Now let's look at combining tables with the set-like operations `union`, `intersect`, and `exceptAll` in `pyspark`.

In [35]:
!curl https://raw.githubusercontent.com/DSCI-326/DSCI326_module_6_lazy_operations/refs/heads/main/data/auto_sales_apr.csv > ./sample_data/auto_sales_apr.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100   118  100   118    0     0   1606      0 --:--:-- --:--:-- --:--:--  1616


In [36]:
!curl https://raw.githubusercontent.com/DSCI-326/DSCI326_module_6_lazy_operations/refs/heads/main/data/auto_sales_may.csv > ./sample_data/auto_sales_may.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100   118  100   118    0     0   1633      0 --:--:-- --:--:-- --:--:--  1638


In [37]:
(sales_apr :=
 spark.read.csv("./sample_data/auto_sales_apr.csv",  header=True, inferSchema=True)
).toPandas()

Unnamed: 0,ID,Salesperson,Compact,Sedan,SUV,Truck
0,0,Ann,22,18,15,12
1,1,Bob,19,12,17,20
2,2,Yolanda,19,8,32,15
3,3,Xerxes,12,23,18,9


In [38]:
(sales_may :=
 spark.read.csv("./sample_data/auto_sales_may.csv",  header=True, inferSchema=True)
).toPandas()

Unnamed: 0,ID,Salesperson,Compact,Sedan,SUV,Truck
0,0,Ann,22,18,15,12
1,1,Bob,20,14,6,24
2,2,Yolanda,19,10,28,17
3,3,Xerxes,11,27,17,9


#### UNION and UNION DISTINCT

In [39]:
(combined_sales :=
 sales_apr
 .union(sales_may)
).toPandas()

Unnamed: 0,ID,Salesperson,Compact,Sedan,SUV,Truck
0,0,Ann,22,18,15,12
1,1,Bob,19,12,17,20
2,2,Yolanda,19,8,32,15
3,3,Xerxes,12,23,18,9
4,0,Ann,22,18,15,12
5,1,Bob,20,14,6,24
6,2,Yolanda,19,10,28,17
7,3,Xerxes,11,27,17,9


In [40]:
(sales_apr
 .union(sales_may)
 .distinct()
).toPandas()

Unnamed: 0,ID,Salesperson,Compact,Sedan,SUV,Truck
0,3,Xerxes,12,23,18,9
1,2,Yolanda,19,8,32,15
2,1,Bob,19,12,17,20
3,0,Ann,22,18,15,12
4,1,Bob,20,14,6,24
5,3,Xerxes,11,27,17,9
6,2,Yolanda,19,10,28,17


#### Including information from the file name

In [41]:
from pyspark.sql.functions import lit

(combined_sales :=
 sales_apr
 .drop('ID')
 .withColumn('Month', lit('Apr'))  # <-- `lit` wraps a Python value as a constant column (similar to pl.lit in polars)
 .union(sales_may
        .drop('ID')
        .withColumn('Month', lit('May'))
       )
).toPandas()


Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck,Month
0,Ann,22,18,15,12,Apr
1,Bob,19,12,17,20,Apr
2,Yolanda,19,8,32,15,Apr
3,Xerxes,12,23,18,9,Apr
4,Ann,22,18,15,12,May
5,Bob,20,14,6,24,May
6,Yolanda,19,10,28,17,May
7,Xerxes,11,27,17,9,May


#### INTERSECTION

In [42]:
(sales_apr
 .intersect(sales_may)
).toPandas()

Unnamed: 0,ID,Salesperson,Compact,Sedan,SUV,Truck
0,0,Ann,22,18,15,12


#### DIFFERENCE

In [43]:
(sales_apr
 .exceptAll(sales_may)
).toPandas()

Unnamed: 0,ID,Salesperson,Compact,Sedan,SUV,Truck
0,1,Bob,19,12,17,20
1,2,Yolanda,19,8,32,15
2,3,Xerxes,12,23,18,9


## Reshaping tables

#### Stacking columns with `unpivot`

In [44]:
(combined_sales
 .unpivot(ids = ['Salesperson', 'Month'],
          values = ['Compact','Sedan','SUV','Truck'],
          variableColumnName='Type',
          valueColumnName='Sales'
         )
).toPandas()

Unnamed: 0,Salesperson,Month,Type,Sales
0,Ann,Apr,Compact,22
1,Ann,Apr,Sedan,18
2,Ann,Apr,SUV,15
3,Ann,Apr,Truck,12
4,Bob,Apr,Compact,19
5,Bob,Apr,Sedan,12
6,Bob,Apr,SUV,17
7,Bob,Apr,Truck,20
8,Yolanda,Apr,Compact,19
9,Yolanda,Apr,Sedan,8


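#### Unstacking columns with GROUPBY + PIVOT + SUMMARY METHOD

The query groups only by `Salesperson`, so `sum` adds the April and May sales together. For example, Ann's Compact total is 22 + 22 = 44. No pivot values are listed, so Spark computes the distinct `Type` values and sorts them. That is why `SUV` comes before `Sedan`.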

In [45]:
(combined_sales
 .unpivot(ids = ['Salesperson', 'Month'],
          values = ['Compact','Sedan','SUV','Truck'],
          variableColumnName='Type',
          valueColumnName='Sales'
         )
 .groupBy('Salesperson')
 .pivot('Type')
 .sum('Sales')
).toPandas()

Unnamed: 0,Salesperson,Compact,SUV,Sedan,Truck
0,Yolanda,38,60,18,32
1,Xerxes,23,35,50,18
2,Ann,44,30,36,24
3,Bob,39,23,26,44


## Review of Basic Data Verbs in `polars` and `pyspark`

Verb/Function | `polars` | `pyspark` |
--------------|----------|-----------|
Column expr.  | `pl.col('name') ...` | `col('name') ...`|
SELECT | `.select(...)` | `.select(...)` |
FILTER | `.filter(...)` | `.where(...)` or `.filter(...)` |
MUTATE | `.with_columns(...)` | `.withColumn(...)` or `.withColumns({...})` |
GROUPBY | `.group_by(...)` | `.groupBy(...)`|
AGGREGATE | `.agg(...)` | `.agg(...)` |
JOIN | `l_tbl.join(r_tbl,...)` | `l_tbl.join(r_tbl,...)`|
UNION | `pl.concat` or SQL | `t1.union(t2)` |
STACK COLUMNS | `.unpivot(...)` | `.unpivot(...)`|
UNSTACK COLUMNS | `.pivot(...)` | `.groupBy(...).pivot(...).<aggfunc>(...)`|

## <font color="red"> Exercise 6.3 </font>

Determine all the players who have hit more than 40 home runs in a season in the modern era (here, since 1946).  The final table should include each player's proper name, as well as the team name.

**Tasks.**

1. Use `curl` to download the tables from https://github.com/DSCI-326/DSCI326_module_6_lazy_operations/tree/main/data.  Be sure to copy the "Raw" link!
2. Select columns and filter rows as early as possible,
3. Be sure to aggregate across the stints to compute total HR for each player-year,
4. Remove any leftover key columns after joining the proper names, and
5. Sort the results by year (outer; ascending) and total HR (inner; descending).

**Hint:** You will need to join the files listed below.  To get credit for this exercise, use the `pyspark` join methods presented above.

In [None]:
# Your code here