In [3]:
%pip install "pyspark==3.5.7"

Collecting pyspark<4
  Downloading pyspark-3.5.7.tar.gz (317.4 MB)
     -------------------------------------- 317.4/317.4 MB 2.5 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
     -------------------------------------- 200.5/200.5 kB 3.0 MB/s eta 0:00:00
Using legacy 'setup.py install' for pyspark, since package 'wheel' is not installed.
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.9
    Uninstalling py4j-0.10.9.9:
      Successfully uninstalled py4j-0.10.9.9
  Attempting uninstall: pyspark
    Found existing installation: pyspark 4.0.1
    Uninstalling pyspark-4.0.1:
      Successfully uninstalled pyspark-4.0.1
  Running setup.py install for pyspark: started
  Running setup.py install for pyspark: finished with status 'done'
Successfully installed py4j-0.10


[notice] A new release of pip available: 22.2.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import os

os.environ["HADOOP_HOME"] = r"D:\zehro\Desktop\PlaxidityX-Assignment\hadoop"
os.environ["PATH"] += r";D:\zehro\Desktop\PlaxidityX-Assignment\hadoop\bin"

In [None]:
from pyspark.sql import SparkSession, functions as F

spark = (
    SparkSession.builder
    .appName("DEV-BaseballAnalytics")
    .master("local[2]")
    .config("spark.ui.port", "4060")
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.33")
    .getOrCreate()
)

spark.conf.set("spark.sql.shuffle.partitions", "20")

In [3]:
jdbc_url = "jdbc:mysql://localhost:3306/lahman2016"
db_props = {
    "user": "root",
    "password": "root",
    "driver": "com.mysql.cj.jdbc.Driver"
}

def read_mysql_table(table_name: str):
    return (spark.read
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table_name)
        .option("user", db_props["user"])
        .option("password", db_props["password"])
        .option("driver", db_props["driver"])
        .load()
        )
    
TABLES = {
    "salaries": "Salaries",
    "fielding": "Fielding",
    "pitching": "Pitching",
    "pitching_post": "PitchingPost",
    "allstar": "AllstarFull",
    "hallofame": "HallOfFame",
    "teams": "Teams",
}

salaries_df = read_mysql_table(TABLES["salaries"])
fielding_df = read_mysql_table(TABLES["fielding"])
pitching_df = read_mysql_table(TABLES["pitching"])
pitching_post_df = read_mysql_table(TABLES["pitching_post"])
allstar_df = read_mysql_table(TABLES["allstar"])
hof_df = read_mysql_table(TABLES["hallofame"])
teams_df = read_mysql_table(TABLES["teams"])


In [4]:
INFIELD_POSITIONS = ["1B", "2B", "3B", "SS"]


players_positions = fielding_df.withColumn(
        "group",
        F.when(F.col("POS").isin(INFIELD_POSITIONS), "infielder")
         .when(F.col("POS") == "P", "pitcher")
    ).where(
        F.col("group").isNotNull()
    ).select(
        'playerID', 'yearID', 'group'
    )


In [9]:

salary_positions = salaries_df.alias('a').join(
    players_positions.alias('b'), ["playerID", "yearID"],
    ).select(
        'a.salary', 'b.yearID', 'b.group'
    )

avg_salary = (
    salary_positions
    .groupBy("yearID", "group")
    .agg(F.avg("salary").alias("avg_salary"))
    .orderBy("yearID", "group")
).persist()

avg_salary.show(5)

+------+---------+------------------+
|yearID|    group|        avg_salary|
+------+---------+------------------+
|  1985|infielder|          459512.8|
|  1985|  pitcher|462030.31623931625|
|  1986|infielder| 409187.4285714286|
|  1986|  pitcher| 391221.0895953757|
|  1987|infielder| 395295.7682291667|
+------+---------+------------------+
only showing top 5 rows



In [15]:
avg_salary_pivot = (
    avg_salary
    .groupBy("yearid")
    .pivot("group", ["infielder", "pitcher"])
    .agg(F.first("avg_salary"))
    .orderBy("yearid")
)

avg_salary_pivot_formatted = avg_salary_pivot.withColumn(
    "Fielding", F.format_number("infielder", 0)
    ).withColumn(
    "Pitching", F.format_number("pitcher", 0)
    ).select(
        F.col('yearid').alias('Year'), 'Fielding', 'Pitching'
    )
    
avg_salary_pivot_formatted.show()

+----+---------+---------+
|Year| Fielding| Pitching|
+----+---------+---------+
|1985|  459,513|  462,030|
|1986|  409,187|  391,221|
|1987|  395,296|  416,934|
|1988|  418,353|  434,296|
|1989|  414,880|  482,678|
|1990|  510,073|  485,947|
|1991|  729,348|  893,026|
|1992|  863,854|1,081,095|
|1993|  818,053|  943,157|
|1994|  859,466|  958,085|
|1995|  866,347|  833,636|
|1996|  960,244|  817,770|
|1997|1,138,731|1,014,898|
|1998|1,080,729|1,118,212|
|1999|1,191,822|1,386,647|
|2000|1,545,218|1,802,145|
|2001|1,902,014|2,046,170|
|2002|1,889,677|2,138,134|
|2003|1,945,646|2,133,800|
|2004|2,077,279|2,122,075|
+----+---------+---------+
only showing top 20 rows



In [17]:
avg_salary.show()

+------+---------+------------------+
|yearID|    group|        avg_salary|
+------+---------+------------------+
|  1985|infielder|          459512.8|
|  1985|  pitcher|462030.31623931625|
|  1986|infielder| 409187.4285714286|
|  1986|  pitcher| 391221.0895953757|
|  1987|infielder| 395295.7682291667|
|  1987|  pitcher|416934.00340136053|
|  1988|infielder|418353.33007334964|
|  1988|  pitcher|434296.29931972787|
|  1989|infielder|  414879.793911007|
|  1989|  pitcher| 482678.3461538461|
|  1990|infielder| 510072.9520547945|
|  1990|  pitcher| 485947.2487437186|
|  1991|infielder| 729347.8961748633|
|  1991|  pitcher| 893026.4953271028|
|  1992|infielder| 863853.5205811138|
|  1992|  pitcher|1081095.2676056337|
|  1993|infielder| 818052.9783889981|
|  1993|  pitcher| 943156.7373068433|
|  1994|infielder| 859466.1241534989|
|  1994|  pitcher| 958084.8592233009|
+------+---------+------------------+
only showing top 20 rows



In [24]:
hof_players = (
    hof_df
    .filter(F.col("inducted") == "Y")
    .select("playerID", "yearID")
    .withColumnRenamed("yearID", "induction_year")
)

pitching_with_era = pitching_df.select("playerID", "yearID", "ERA")

hof_pitchers = hof_players.join(
    pitching_with_era, 'playerID'
)

hof_pitchers.show(3)


+---------+--------------+------+----+
| playerID|induction_year|yearID| ERA|
+---------+--------------+------+----+
|whitede01|          2013|  1876| 0.0|
|welchmi01|          1973|  1880|2.54|
|welchmi01|          1973|  1881|2.67|
+---------+--------------+------+----+
only showing top 3 rows



In [36]:

# 2. All-star appearances
allstar_appearances = allstar_df.select(
    "playerID", "yearID"
    ).groupBy(
        "playerID", "yearID"
    ).agg(
        F.count("*").alias("allstar_appearances")
    )

hof_pitchers_allstar = hof_pitchers.join(
    allstar_appearances, ['playerID', 'yearID'], 'left'
    ).withColumn(
        "allstar_appearances", F.coalesce(F.col("allstar_appearances"), F.lit(0))
    )

# hof_pitchers_allstar.show(3)
result = (
    hof_pitchers_allstar
    .groupBy("playerID")
    .agg(
        F.min(F.col("induction_year")).alias("Hall of Fame Induction Year"),
        (F.sum(F.when(F.col("allstar_appearances") > 0, F.col("ERA")).otherwise(0)) /
        F.sum(F.when(F.col("allstar_appearances") > 0, 1).otherwise(0))).alias("ERA"),
        F.sum("allstar_appearances").alias("# All Star Appearances")
    )
)

result_formatted = result.withColumnRenamed(
    "playerID", "Player"
).select(
    "Player", "ERA", "# All Star Appearances" , "Hall of Fame Induction Year"
)

result_formatted.show(10)

+---------+------------------+----------------------+---------------------------+
|   Player|               ERA|# All Star Appearances|Hall of Fame Induction Year|
+---------+------------------+----------------------+---------------------------+
|whitede01|              NULL|                     0|                       2013|
|welchmi01|              NULL|                     0|                       1973|
|orourji01|              NULL|                     0|                       1945|
|willivi01|              NULL|                     0|                       1995|
|mathech01|              NULL|                     0|                       1936|
|johnswa01|              NULL|                     0|                       1936|
|speaktr01|              NULL|                     0|                       1937|
|newhoha01|2.4771428571428578|                     7|                       1992|
|gibsobo01|2.5275000000000003|                     9|                       1981|
|seaveto01|2.498

In [None]:
from pyspark.sql import Window

regular_season = (
    pitching_df
    .withColumn("win_loss", F.col("W") / (F.col("W") + F.col("L")))
    .select("playerID", "yearID", "ERA", "win_loss")
)

post_season = (
    pitching_post_df
    .withColumn("post_win_loss", F.col("W") / (F.col("W") + F.col("L")))
    .select("playerID", "yearID", "ERA", "post_win_loss")
    .withColumnRenamed("ERA", "post_ERA")
)

window = (
    Window.partitionBy("yearID")
        .orderBy(F.col("ERA").asc())
)

top_pitchers = (
    regular_season
    .withColumn("rank", F.row_number().over(window))
    .filter(F.col("rank") <= 10)
    .drop("rank")
)

joined = top_pitchers.join(
    post_season, ["playerID", "yearID"], "left"
)

result = (
    joined
    .groupBy("yearID")
    .agg(
        F.round(F.avg("ERA"), 2).alias("avg_regular_ERA"),
        (100*F.round(F.avg("win_loss"), 2)).cast('int').alias("avg_regular_winloss"),
        (F.sum(F.when(F.col("post_ERA").isNotNull(), F.col("post_ERA")).otherwise(0)) /
        F.sum(F.when(F.col("post_ERA").isNotNull(), 1).otherwise(0))).alias("avg_post_ERA"),
        (100*F.sum(F.when(F.col("post_win_loss").isNotNull(), F.col("post_win_loss")).otherwise(0)) /
        F.sum(F.when(F.col("post_win_loss").isNotNull(), 1).otherwise(0))).cast('int').alias("avg_post_winloss"),
    )
    .orderBy("yearID")
)

result_formatted = result.select(
    F.col("yearID").alias("Year"),
    F.col("avg_regular_ERA").alias("Regular Season ERA"),
    F.col("avg_regular_winloss").alias("Regular Season Win/Loss"),
    F.col("avg_post_ERA").alias("Post-season ERA"),
    F.col("avg_post_winloss").alias("Post-season Win/Loss"),
)

result_formatted.show(100, truncate=False)

+----+------------------+-----------------------+------------------+--------------------+
|Year|Regular Season ERA|Regular Season Win/Loss|Post-season ERA   |Post-season Win/Loss|
+----+------------------+-----------------------+------------------+--------------------+
|1871|3.16              |54                     |NULL              |NULL                |
|1872|2.74              |69                     |NULL              |NULL                |
|1873|2.36              |48                     |NULL              |NULL                |
|1874|2.77              |40                     |NULL              |NULL                |
|1875|1.52              |71                     |NULL              |NULL                |
|1876|0.38              |48                     |NULL              |NULL                |
|1877|2.57              |62                     |NULL              |NULL                |
|1878|1.96              |50                     |NULL              |NULL                |
|1879|2.11

+-------+----+----+-------+
|Team ID|Year|Rank|At Bats|
+-------+----+----+-------+
|    PH1|1871|   1|   1281|
|    RC1|1871|   9|   1036|
|    BS1|1872|   1|   2137|
|    WS4|1872|  11|    460|
|    BS1|1873|   1|   2755|
|    BL4|1873|   9|    211|
|    BS1|1874|   1|   3122|
|    BL1|1874|   8|   1781|
|    BS1|1875|   1|   3516|
|    KEO|1875|  13|    449|
|    CHN|1876|   1|   2748|
|    CN1|1876|   8|   2372|
|    BSN|1877|   1|   2368|
|    CN1|1877|   6|   2135|
|    BSN|1878|   1|   2220|
|    ML2|1878|   6|   2212|
|    PRO|1879|   1|   3392|
|    TRN|1879|   8|   2841|
|    CHN|1880|   1|   3135|
|    CN1|1880|   8|   2895|
+-------+----+----+-------+
only showing top 20 rows

