## Config

In [89]:
from pyspark.sql import SparkSession
import pandas as pd
import findspark

# Init a spark session first
findspark.init()
spark = SparkSession.builder \
    .appName("sony_test") \
    .getOrCreate()

# Point to path + read csv + create temp table + cache
people_file_path = "People.csv"  # Update this with your CSV file path
batting_file_path = "Batting.csv"  # Update this with your CSV file path

people_df = spark.read.csv(people_file_path, header=True, inferSchema=True)
batting_df = spark.read.csv(batting_file_path, header=True, inferSchema=True)

people_df.createOrReplaceTempView('people_table')
batting_df.createOrReplaceTempView('batting_table')


spark.sql(f'cache table people_table')
spark.sql(f'cache table batting_table')

DataFrame[]

## Discovery

- Initial questions:
    - Are there any nulls in playerID,yearID,stint,teamID,lgID?
        - answer: no
    - Are there any duplicates?
        - answer: no
    - Does a player have more than 1 row if they swap teams mid season
        - answer: yes
        - Perform group by + aggs to get year total for each player

### Null Check

Checking data types and looking for nulls/errored data.

In [90]:
spark.sql("DESCRIBE TABLE batting_table").show()

print('-------------------')
print('Null Check')

spark.sql("""
  select * 
  from batting_table 
  where yearID is null 
    or playerID is null
    or stint is null
    or teamID is null
    or lgID is null""").show(10)

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|playerID|   string|   NULL|
|  yearID|      int|   NULL|
|   stint|      int|   NULL|
|  teamID|   string|   NULL|
|    lgID|   string|   NULL|
|       G|      int|   NULL|
|      AB|      int|   NULL|
|       R|      int|   NULL|
|       H|      int|   NULL|
|      2B|      int|   NULL|
|      3B|      int|   NULL|
|      HR|      int|   NULL|
|     RBI|      int|   NULL|
|      SB|      int|   NULL|
|      CS|      int|   NULL|
|      BB|      int|   NULL|
|      SO|      int|   NULL|
|     IBB|      int|   NULL|
|     HBP|      int|   NULL|
|      SH|      int|   NULL|
+--------+---------+-------+
only showing top 20 rows

-------------------
Null Check
+--------+------+-----+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+
|playerID|yearID|stint|teamID|lgID|  G| AB|  R|  H| 2B| 3B| HR|RBI| SB| CS| BB| SO|IBB|HBP| SH| SF|GIDP|
+--------+------+-----+------+----+--

In [91]:
# Queries to check for faulty data (e.g. '000000000', ' ' ,etc.)
spark.sql("""
  select 
        lgID
        ,count(*) as lg_count 
  from batting_table 
  group by lgID""").show(10)

spark.sql("""
  select 
        teamID
        ,count(*) as team_count 
  from batting_table 
  group by teamID
  order by teamID asc""").show(10)

+----+--------+
|lgID|lg_count|
+----+--------+
|  UA|     334|
|  NL|   55945|
|  PL|     149|
|  AA|    1893|
|  NA|     737|
|  AL|   50965|
|  FL|     472|
+----+--------+

+------+----------+
|teamID|team_count|
+------+----------+
|   ALT|        18|
|   ANA|       337|
|   ARI|      1128|
|   ATL|      2372|
|   BAL|      2836|
|   BFN|       122|
|   BFP|        26|
|   BL1|        48|
|   BL2|       197|
|   BL3|        36|
+------+----------+
only showing top 10 rows



### Duplicate Check

Using two methods:
1. group by, count, and filter
2. Use distinct

In [92]:
# One way of checking for duplicates
spark.sql("""
select 
    playerID,
    yearID,
    stint,
    teamID,
    lgID,
    count(*) 
from batting_table 
group by playerID,yearID,stint,teamID,lgID 
having count(*) > 1
    """).show(10)

# Get count of table
spark.sql("""
select 
    count(*) 
from batting_table 
    """).show(10)

# Using distinct to be 100% sure
spark.sql("""
select 
    count(distinct yearID, playerID, stint, teamID, lgID)
from batting_table 
    """).show(10)


+--------+------+-----+------+----+--------+
|playerID|yearID|stint|teamID|lgID|count(1)|
+--------+------+-----+------+----+--------+
+--------+------+-----+------+----+--------+

+--------+
|count(1)|
+--------+
|  110495|
+--------+

+-----------------------------------------------------+
|count(DISTINCT yearID, playerID, stint, teamID, lgID)|
+-----------------------------------------------------+
|                                               110495|
+-----------------------------------------------------+



### Multiple Rows For A Single Player In The Same Year
Since there are multiple entries we need to check to see if duplicates exist. If we group by playerID,yearID,stint,teamID and lgID, that should show any issues.
- Multiple rows do exist for the same player on the same year
- stint increases as player hop between teams (always +1)

In [93]:
# Query to check for multiple counts for a single player in a single year
spark.sql("""
with base as (
    select 
        yearId
        ,playerID
        ,count(*) as player_count
    from batting_table
    group by 
          yearId
          ,playerId
)
select *
from base 
where player_count > 1
order by player_count desc
""").show(10)


# Checking specific player for a specific year
spark.sql("""
    select *
    from batting_table
    where yearID = 1892
        and playerID = 'dowseto01'
""").show(10)

+------+---------+------------+
|yearId| playerID|player_count|
+------+---------+------------+
|  1892|dowseto01|           5|
|  1914|chouife01|           5|
|  1904|huelsfr01|           5|
|  2018|drakeol01|           5|
|  1910|donahpa01|           4|
|  1892|kuehnbi01|           4|
|  1915|wistete01|           4|
|  1902|ohageha01|           4|
|  1971|fernafr01|           4|
|  1884|striege01|           4|
+------+---------+------------+
only showing top 10 rows

+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+----+---+---+----+---+----+----+----+
| playerID|yearID|stint|teamID|lgID|  G| AB|  R|  H| 2B| 3B| HR|RBI| SB|  CS| BB| SO| IBB|HBP|  SH|  SF|GIDP|
+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+----+---+---+----+---+----+----+----+
|dowseto01|  1892|    1|   LS3|  NL| 41|145| 10| 21|  2|  0|  0|  7|  1|NULL|  2| 15|NULL|  3|NULL|NULL|NULL|
|dowseto01|  1892|    2|   WAS|  NL|  1|  4|  0|  1|  0|  0|  0|  0|  0|NULL|  0|  0|N

## Questions 1:
**For each year, give the number of MLB players who had a batting average over .300. Only consider players with more than 50 at bats.**

- Notes:
    - With multi rows in mind: group by year, player -> then sum hits + ab -> calc batting average
    - **Rounding to 3 decimals casues the # of players with >.300 avg to change**

In [94]:
# Creating and caching temp table for batting avg for ALL players (table will be used later)
spark.sql(f"""
with base as (
    select 
        yearID
        ,playerID
        ,sum(H) as H
        ,sum(AB) as AB
    from batting_table          
    group by 
          yearID
          ,playerID
)             
select 
    *
    ,round((H / AB),3) as batting_average
from base  
""").createOrReplaceTempView('temp_ba_table')

spark.sql("""cache table temp_ba_table""").collect()


# Testing Logic for group by and aggs
spark.sql("""
select * from temp_ba_table where yearID = 1890 and playerID = 'mcmahsa01'
""").show()

spark.sql("""
select * from batting_table where yearID = 1890 and playerID = 'mcmahsa01'
""").show()

+------+---------+---+---+---------------+
|yearID| playerID|  H| AB|batting_average|
+------+---------+---+---+---------------+
|  1890|mcmahsa01| 44|214|          0.206|
+------+---------+---+---+---------------+

+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+----+---+----+----+---+----+----+----+
| playerID|yearID|stint|teamID|lgID|  G| AB|  R|  H| 2B| 3B| HR|RBI| SB|  CS| BB|  SO| IBB|HBP|  SH|  SF|GIDP|
+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+----+---+----+----+---+----+----+----+
|mcmahsa01|  1890|    1|   PH4|  AA| 49|175| 27| 40|  5|  1|  2| 19|  2|NULL|  7|NULL|NULL|  2|NULL|NULL|NULL|
|mcmahsa01|  1890|    2|   BL3|  AA| 12| 39|  4|  4|  0|  0|  0|  1|  2|NULL|  1|NULL|NULL|  0|NULL|NULL|NULL|
+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+----+---+----+----+---+----+----+----+



In [95]:
spark.sql(f"""
with ba_filter as (
        select *
        from temp_ba_table
        where batting_average > .300
                and AB > 50  
),
player_count_by_year as (
        select 
          yearID
          ,count(*) as ba_over_300
        from ba_filter
        group by yearID
        order by yearID desc
)
select * 
from player_count_by_year 
order by yearID desc
""").show(10)

+------+-----------+
|yearID|ba_over_300|
+------+-----------+
|  2021|         24|
|  2020|         40|
|  2019|         35|
|  2018|         26|
|  2017|         38|
|  2016|         37|
|  2015|         36|
|  2014|         35|
|  2013|         38|
|  2012|         46|
+------+-----------+
only showing top 10 rows



## Question 2
For each year since 1930, give the MLB player with the most home runs from each league. Please include their league, team, and batting average.
- Considerations
    - If the player played on more than 1 team in a season, give the team information for the last team he played for
        - For example, if a player gets traded from the National League to the American league, his home run total would be considered in the American League.
    - If there is a tie, include the player with the higher number of RBI’s
    - If there is still a tie, pick the player with the lower playerID

- Notes:
    - Look out for:
        1. multiple entries
            - rbis
            - hrs
            - stints 
            - batting averages (already have cached table for that)    
        2. Tie scenario give to rbi leader 
            - possilby player with the lower string value
                - solution: order by rbi desc, playerId asc

In [123]:
spark.sql(f"""
--get player stats for each year
with player_year_stats as (
    select 
        yearID
        ,playerID
        ,sum(HR) as total_hrs
        ,sum(RBI) as total_rbis
        ,max(stint) as max_stint --used to check that window func is accurate
    from batting_table
    where yearID >= 1930  
    group by 
        yearID
        ,playerID        
),   
--get the last team/stint for each year/player 
latest_team as (
    select 
        yearID 
        ,playerID
        ,lgID
        ,teamID
        ,stint
        ,dense_rank() OVER (PARTITION BY yearID, playerID ORDER BY stint desc) as stint_rank
    from batting_table
    where yearID >= 1930                       
),
--filter down result from table above to only include latest data
filtered_latest_team AS (
    select
        playerID
        ,yearID
        ,lgID
        ,teamID
        ,stint
    FROM latest_team
    WHERE stint_rank = 1
),
--join first cte with filtered data and batting averages table
join_stats_team as (
    select 
        a.yearID
        ,b.lgID
        ,a.playerId
        ,a.total_hrs
        ,a.total_rbis
        ,c.batting_average
        ,b.teamID
        ,a.max_stint
        ,b.stint
    from player_year_stats a 
    join filtered_latest_team b
        on  a.yearID = b.yearID
        and a.playerID = b.playerID       
    join temp_ba_table c
        on a.yearId = c.yearID
        and a.playerID = c.playerID
),
--for each year & league, rank hr amount -> if tie then rank by rbi or playerid
year_league_rank as (
    select
        yearID
        ,lgID
        ,teamID
        ,playerID
        ,total_hrs
        ,total_rbis
        ,batting_average
        ,stint
        ,dense_rank() over (partition by yearID, lgId order by total_hrs desc, total_rbis desc, playerID asc) as hr_rank
    from join_stats_team           
),
--filter only top player
filter_top_hr as(
    select *
    from year_league_rank
    where hr_rank = 1     
),
--join people_table to get playername
player_name_join as (
    select 
        a.*
        ,concat(b.nameFirst, ' ', b.nameLast) as playername
    from  filter_top_hr a
    join people_table b        
        on a.playerId = b.playerId
)          
          
select 
    yearID
    ,lgID
    ,playername
    ,total_hrs
    ,batting_average
    ,playerID
    ,teamID
from player_name_join
""").show(1000)

+------+----+-----------------+---------+---------------+---------+------+
|yearID|lgID|       playername|total_hrs|batting_average| playerID|teamID|
+------+----+-----------------+---------+---------------+---------+------+
|  1930|  AL|        Babe Ruth|       49|          0.359| ruthba01|   NYA|
|  1930|  NL|      Hack Wilson|       56|          0.356|wilsoha01|   CHN|
|  1931|  AL|       Lou Gehrig|       46|          0.341|gehrilo01|   NYA|
|  1931|  NL|      Chuck Klein|       31|          0.337|kleinch01|   PHI|
|  1932|  AL|      Jimmie Foxx|       58|          0.364| foxxji01|   PHA|
|  1932|  NL|      Chuck Klein|       38|          0.348|kleinch01|   PHI|
|  1933|  AL|      Jimmie Foxx|       48|          0.356| foxxji01|   PHA|
|  1933|  NL|      Chuck Klein|       28|          0.368|kleinch01|   PHI|
|  1934|  AL|       Lou Gehrig|       49|          0.363|gehrilo01|   NYA|
|  1934|  NL|          Mel Ott|       35|          0.326|  ottme01|   NY1|
|  1935|  AL|   Hank Gree

### Spot Checking 
Could not find instance where we needed to rank against playerid.
1. Check : mcgwima01 | 1997 
    - Correct HR + RBI sum
    - Correct Stint -> Team -> lgID
    - Correct batting_average
1. Check : zernigu01 | 1951 
    - Correct HR + RBI sum
    - Correct Stint -> Team -> lgID
    - Correct batting_average   

In [97]:
spark.sql(f"""select * from batting_table where playerID ='mcgwima01' and yearID = 1997""").show()
spark.sql(f"""select * from temp_ba_table where playerID ='mcgwima01' and yearID = 1997""").show()
spark.sql(f"""select yearID,playerID,sum(HR) as HR from batting_table where yearID = 1997 group by yearID,playerID order by HR desc""").show()

+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+
| playerID|yearID|stint|teamID|lgID|  G| AB|  R|  H| 2B| 3B| HR|RBI| SB| CS| BB| SO|IBB|HBP| SH| SF|GIDP|
+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+
|mcgwima01|  1997|    1|   OAK|  AL|105|366| 48|104| 24|  0| 34| 81|  1|  0| 58| 98|  8|  4|  0|  5|   9|
|mcgwima01|  1997|    2|   SLN|  NL| 51|174| 38| 44|  3|  0| 24| 42|  2|  0| 43| 61|  8|  5|  0|  2|   0|
+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+

+------+---------+---+---+---------------+
|yearID| playerID|  H| AB|batting_average|
+------+---------+---+---+---------------+
|  1997|mcgwima01|148|540|          0.274|
+------+---------+---+---+---------------+

+------+---------+---+
|yearID| playerID| HR|
+------+---------+---+
|  1997|mcgwima01| 58|
|  1997|griffke02| 56|
|  1997|walkela01| 49|
|  1997|m

## Question 3
What teams have had the highest slugging percentage? Your output should give the top 10 historical team slugging percentages, the year it occurred, and the team name.

Formula : ((1B) + (2Bx2) + (3Bx3) + (HRx4))/(AB)

- How do we want to handle ties?
    - Possilbe solutions:
        - Dense_rank()
        - rank()
        - row_number()
        - **limit** (simple since nothing is outlined) 
- No 1B attribute

In [99]:
spark.sql(f"""
    select 
        yearID
        ,teamID
        ,round((((sum(H) - sum(2B) - sum(3B) - sum(HR)) + (sum(2B) * 2) + (sum(3B) * 3) + (sum(HR) * 4)) / ((sum(AB)))),3) as slugging_percentage
    from batting_table
    group by 
        yearID 
        ,teamID
    order by slugging_percentage desc
    limit 10
""").show(20)

+------+------+-------------------+
|yearID|teamID|slugging_percentage|
+------+------+-------------------+
|  2019|   HOU|              0.495|
|  2019|   MIN|              0.494|
|  2003|   BOS|              0.491|
|  2019|   NYA|               0.49|
|  1930|   NYA|              0.488|
|  1927|   NYA|              0.488|
|  1997|   SEA|              0.485|
|  1894|   BSN|              0.484|
|  1996|   SEA|              0.484|
|  1994|   CLE|              0.484|
+------+------+-------------------+



### Testing Logic of formula...

In [100]:
spark.sql(f"""
    select 
        *
    from batting_table
    where yearID = 2016
          and teamID in ('BOS','SEA')
    order by AB desc
    limit 5 
""").createOrReplaceTempView('q3_test')

spark.sql(f"""cache table q3_test""").collect()
spark.sql(f"""select * from q3_test""").show()

+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+
| playerID|yearID|stint|teamID|lgID|  G| AB|  R|  H| 2B| 3B| HR|RBI| SB| CS| BB| SO|IBB|HBP| SH| SF|GIDP|
+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+
|bettsmo01|  2016|    1|   BOS|  AL|158|672|122|214| 42|  5| 31|113| 26|  4| 49| 80|  1|  2|  0|  7|  12|
| canoro01|  2016|    1|   SEA|  AL|161|655|107|195| 33|  2| 39|103|  0|  1| 47|100|  8|  8|  0|  5|  18|
|bogaexa01|  2016|    1|   BOS|  AL|157|652|115|192| 34|  1| 21| 89| 13|  4| 58|123|  0|  6|  0|  3|  14|
|pedrodu01|  2016|    1|   BOS|  AL|154|633|105|201| 36|  1| 15| 74|  7|  4| 61| 73|  0|  0|  1|  3|  24|
|seageky01|  2016|    1|   SEA|  AL|158|597| 89|166| 36|  3| 30| 99|  3|  1| 69|108| 10|  8|  0|  2|  18|
+---------+------+-----+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+



In [101]:
spark.sql(f"""
    select 
        yearID
        ,teamID
        ,round((((sum(H) - sum(2B) - sum(3B) - sum(HR)) + (sum(2B) * 2) + (sum(3B) * 3) + (sum(HR) * 4)) / ((sum(AB)))),3) as slugging_percentage
    from q3_test
    group by 
        yearID 
        ,teamID
    order by slugging_percentage desc
    limit 10
""").show(20)

+------+------+-------------------+
|yearID|teamID|slugging_percentage|
+------+------+-------------------+
|  2016|   SEA|              0.517|
|  2016|   BOS|              0.477|
+------+------+-------------------+



# Stop Session

In [None]:
spark.stop()