# Joins in `pyspark`

Joins are performed with `df_left.join(df_right, on=join_condition, how=type_str)`.

In [2]:
from pyspark.sql import SparkSession
from more_pyspark import to_pandas
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (SparkSession
         .builder
         .appName('Ops')
         .getOrCreate())

# Department table: the first CSV row supplies the column names; types are inferred.
dept = spark.read.csv("./data/department.csv", inferSchema=True, header=True)

# Bring every row back to the driver and pipe the result into `to_pandas` for display.
dept.collect() >> to_pandas

Unnamed: 0,DeptID,DeptName
0,31,Sales
1,33,Engineering
2,34,Clerical
3,35,Marketing


In [3]:
# Employee table: the first CSV row supplies the column names; types are inferred.
empl = spark.read.csv(
    "./data/employee.csv",
    inferSchema=True,
    header=True,
)

# Bring every row back to the driver and pipe the result into `to_pandas` for display.
empl.collect() >> to_pandas

Unnamed: 0,LastName,DeptID
0,Rafferty,31.0
1,Jones,33.0
2,Heisenberg,33.0
3,Robinson,34.0
4,Smith,34.0
5,Williams,


#### Inner join

In [4]:
# Inner join: only employee/department pairs with equal DeptID values are kept.
(empl
 .join(dept, on=(empl.DeptID == dept.DeptID), how='inner')
 .collect()
) >> to_pandas

Unnamed: 0,LastName,DeptID,DeptName
0,Rafferty,31,Sales
1,Jones,33,Engineering
2,Heisenberg,33,Engineering
3,Robinson,34,Clerical
4,Smith,34,Clerical


#### Left join

In [7]:
# Left join: every employee row is kept; department columns are empty when no DeptID matches.
(empl
 .join(dept, on=(empl.DeptID == dept.DeptID), how='left')
 .collect()
) >> to_pandas

Unnamed: 0,LastName,DeptID,DeptName
0,Rafferty,31.0,Sales
1,Jones,33.0,Engineering
2,Heisenberg,33.0,Engineering
3,Robinson,34.0,Clerical
4,Smith,34.0,Clerical
5,Williams,,


#### Right join

In [8]:
# Right join: every department row is kept; employee columns are empty when no DeptID matches.
(empl
 .join(dept, on=(empl.DeptID == dept.DeptID), how='right')
 .collect()
) >> to_pandas

Unnamed: 0,LastName,DeptID,DeptName
0,Rafferty,31,Sales
1,Heisenberg,33,Engineering
2,Jones,33,Engineering
3,Smith,34,Clerical
4,Robinson,34,Clerical
5,,35,Marketing


#### Outer join

In [10]:
# Outer join: rows from both tables are kept, whether or not a DeptID match exists.
(empl
 .join(dept, on=(empl.DeptID == dept.DeptID), how='outer')
 .collect()
) >> to_pandas

Unnamed: 0,LastName,DeptID,DeptName
0,Williams,,
1,Rafferty,31.0,Sales
2,Jones,33.0,Engineering
3,Heisenberg,33.0,Engineering
4,Robinson,34.0,Clerical
5,Smith,34.0,Clerical
6,,35.0,Marketing


## Joining on multiple keys

Next, we will look at table joins that require matching multiple keys.

### Example -- Total At Bats, Hits, and Runs in 2010

To illustrate joining on multiple keys, let's

1. Compute the totals for AB, H, and R in 2010 for each team from the `Batting` table.
2. Join in the team name and park from the `Teams` table.

This is a good example, because team information can change over the years, so we need to match both `teamID` and `yearID`.

#### Step 1. Read and process the batting table

In [42]:
# NOTE: the DataFrame is named `pitching`, but the file read here is Batting.csv.
pitching = spark.read.csv(
    "./data/baseball/core/Batting.csv",
    inferSchema=True,
    header=True,
)

# Preview the first five rows.
pitching.take(5) >> to_pandas

Unnamed: 0,playerID,yearID,stint,teamID,lgID,G,AB,R,H,2B,...,RBI,SB,CS,BB,SO,IBB,HBP,SH,SF,GIDP
0,abercda01,1871,1,TRO,,1,4,0,0,0,...,0,0,0,0,0,,,,,0
1,addybo01,1871,1,RC1,,25,118,30,32,6,...,13,8,1,4,0,,,,,0
2,allisar01,1871,1,CL1,,29,137,28,40,4,...,19,3,1,2,5,,,,,1
3,allisdo01,1871,1,WS3,,27,133,28,44,10,...,27,1,1,0,2,,,,,0
4,ansonca01,1871,1,RC1,,25,120,29,39,11,...,16,6,2,2,1,,,,,0


In [43]:
from pyspark.sql.functions import mean, stddev, col, sum

# Trim to the columns needed for the totals and keep only the 2010 rows.
rows_2010 = (pitching
             .select('teamID', 'yearID', 'AB', 'R', 'H')
             .filter(col('yearID') == 2010))

# Sum at bats, runs, and hits within each (teamID, yearID) group.
# Note: `sum` is the pyspark.sql.functions aggregate imported in this cell,
# not the Python builtin of the same name.
team_totals_2010 = (rows_2010
                    .groupBy('teamID', 'yearID')
                    .agg(sum('AB').alias('Total At Bats'),
                         sum('R').alias('Total Runs'),
                         sum('H').alias('Total Hits')))

team_totals_2010.take(5) >> to_pandas

Unnamed: 0,teamID,yearID,Total At Bats,Total Runs,Total Hits
0,MIN,2010,5568,781,1521
1,CHA,2010,5484,752,1467
2,TOR,2010,5495,755,1364
3,FLO,2010,5531,719,1403
4,TBA,2010,5439,802,1343


#### Step 2. Read and process the teams table

In [40]:
teams = spark.read.csv("./data/baseball/core/Teams.csv", inferSchema=True, header=True)

# Keep the two join keys plus the descriptive columns; `name` is relabelled 'Team Name'.
team_name_and_park = teams.select(
    'yearID',
    'teamID',
    col('name').alias('Team Name'),
    'park',
)

team_name_and_park.take(5) >> to_pandas

Unnamed: 0,yearID,teamID,Team Name,park
0,1871,BS1,Boston Red Stockings,South End Grounds I
1,1871,CH1,Chicago White Stockings,Union Base-Ball Grounds
2,1871,CL1,Cleveland Forest Citys,National Association Grounds
3,1871,FW1,Fort Wayne Kekiongas,Hamilton Field
4,1871,NY2,New York Mutuals,Union Grounds (Brooklyn)


#### Step 3. Perform a left-join.

Since we want to keep all rows in the totals table, and only add the team information when available, we will perform a left join on the totals table.

Notice that the `on` argument (the second argument to `join`) is now a `list` of column expressions, one for each matching rule.

In [44]:
# One equality rule per key; a pair of rows must satisfy both rules to match.
same_year = team_totals_2010.yearID == team_name_and_park.yearID
same_team = team_totals_2010.teamID == team_name_and_park.teamID

# Left join: every 2010 totals row is kept, with name/park filled in when a match exists.
totals_with_names = team_totals_2010.join(
    team_name_and_park,
    on=[same_year, same_team],
    how='left',
)

totals_with_names.take(5) >> to_pandas

Unnamed: 0,teamID,yearID,Total At Bats,Total Runs,Total Hits,Team Name,park
0,MIN,2010,5568,781,1521,Minnesota Twins,Target Field
1,CHA,2010,5484,752,1467,Chicago White Sox,U.S. Cellular Field
2,TOR,2010,5495,755,1364,Toronto Blue Jays,Rogers Centre
3,FLO,2010,5531,719,1403,Florida Marlins,Dolphin Stadium
4,TBA,2010,5439,802,1343,Tampa Bay Rays,Tropicana Field


## <font color="red"> Exercise 2 </font>

Determine all the players that have hit more than 100 home runs in a season. The final table should include the player's proper name, as well as the team name.

**Hint:** You will need to join the files listed below. To get credit for this exercise, use the `pyspark` join methods presented above.

In [7]:
# CSV files to be combined for this exercise: batting stats, player details, team details.
paths = (
    "./data/baseball/core/Batting.csv",
    "./data/baseball/core/People.csv",
    "./data/baseball/core/Teams.csv",
)

In [8]:
# Your code here