In [1]:
import polars as pl

In [2]:
# load data

inspection = pl.read_parquet("data/clean/inspections.parquet")
violation = pl.read_parquet("data/clean/violations.parquet")
injury = pl.read_parquet("data/clean/injuries.parquet")

In [3]:
# create midwest and nationwide farm datasets

midwest = ['IL', 'IN', 'IA', 'KS', 
           'MI', 'MN', 'MO', 'NE',
           'ND', 'OH', 'SD', 'WI']

farm_inspections  = inspection.filter(
    pl.col("OPEN_DATE").is_between(pl.date(2015,1,1), pl.date(2025,12,12)) &
    (
        pl.col("NAICS_CODE").str.starts_with("111") |
        pl.col("NAICS_CODE").str.starts_with("1151")
    )
).with_columns(
    pl.when(pl.col("SITE_STATE").is_in(midwest))
      .then(pl.lit("MIDWEST"))
      .otherwise(pl.lit("NON_MIDWEST"))
      .alias("REGION_FLAG")
)

# merge inspections and violations

farm_violations = violation.join(
    farm_inspections,
        on="ACTIVITY_NR", 
        how="inner"
    ).with_columns(
    pl.when(pl.col("NR_IN_ESTAB") <= 25) # size flag
        .then(pl.lit("Small"))
    .when((pl.col("NR_IN_ESTAB") > 25) & (pl.col("NR_IN_ESTAB") <= 250))
        .then(pl.lit("Medium"))
    .when(pl.col("NR_IN_ESTAB") > 250)
        .then(pl.lit("Large"))
    .otherwise(None)
    .alias("ESTAB_SIZE")
).filter(
    (pl.col("DELETE_FLAG").is_null()) & # filter out deleted violations
    (pl.col("VIOL_TYPE") == "S") # filter to only serious violations
)

In [4]:
# calculate penalty reductions 

penalty_sum_df = (
    farm_violations
        .group_by("ACTIVITY_NR").agg([ 
            pl.sum("INITIAL_PENALTY").alias("total_initial_penalty"),
            pl.sum("CURRENT_PENALTY").alias("total_current_penalty")])
        ).with_columns([
            (pl.col("total_initial_penalty") - pl.col("total_current_penalty"))
                .alias("penalty_reduction_amount"),
            ((pl.col("total_initial_penalty") - pl.col("total_current_penalty"))
            / pl.col("total_initial_penalty") * 100).alias("penalty_pct_reduction")
    ]).with_columns(
        (pl.col("penalty_reduction_amount") > 0)
            .cast(pl.Int8)
            .alias("any_reduction")
    )

# merge back region flag, open year and establishment size 

penalty_sum = penalty_sum_df.join(
        farm_violations.select(["ACTIVITY_NR", "REGION_FLAG", 
                                "OPEN_YEAR", "ESTAB_SIZE"]).unique(),
        on="ACTIVITY_NR",
        how="left"
)

In [5]:
# Q1.a - how often does OSHA reduce penalties - midwest vs nationwide

penalty_prop_comparison_df = (
    penalty_sum
    .group_by(["OPEN_YEAR", "REGION_FLAG"])
    .agg([
        pl.mean("any_reduction").alias("prop_with_reduction"),
        pl.sum("any_reduction").alias("n_with_reduction"),
        pl.count("ACTIVITY_NR").alias("n_inspections")
    ]).with_columns(
        (pl.col("prop_with_reduction") * 100).alias("pct_with_reduction")
    )
   .pivot(
        index="OPEN_YEAR",
        on="REGION_FLAG",
        values="pct_with_reduction"
    )
    .sort("OPEN_YEAR")
)

In [6]:
# Q1.b - what if midwest had the same size make-up as nationwide? 

national_size_weights = (
    penalty_sum
    .filter(pl.col("REGION_FLAG") == "NON_MIDWEST")
    .group_by(["OPEN_YEAR", "ESTAB_SIZE"])
    .agg(pl.count("ACTIVITY_NR").alias("n"))
    .with_columns(
        (pl.col("n") / pl.sum("n")).over("OPEN_YEAR").alias("weight")
    )
    .select(["OPEN_YEAR", "ESTAB_SIZE", "weight"])
)

midwest_prop_reduction = (
    penalty_sum
    .filter(pl.col("REGION_FLAG") == "MIDWEST")
    .group_by(["OPEN_YEAR", "ESTAB_SIZE"])
    .agg(
        pl.mean("any_reduction").alias("midwest_prop_reduction"),
        pl.count("ACTIVITY_NR").alias("n_inspections")
    ).with_columns(
        (pl.col("midwest_prop_reduction") * 100).alias("midwest_pct_with_reduction")
    )
)

midwest_weighted = (
    midwest_prop_reduction.join(
        national_size_weights.select(["OPEN_YEAR", "ESTAB_SIZE", "weight"]),
        on=["OPEN_YEAR", "ESTAB_SIZE"],
        how="left"
    )
    .with_columns(
        (pl.col("midwest_pct_with_reduction") * pl.col("weight"))
        .alias("weighted_prop")
    ).group_by("OPEN_YEAR")
    .agg([
        pl.sum("weighted_prop").alias("midwest_size_adjusted_prop")
    ])
    .sort("OPEN_YEAR")
)

penalty_prop_comparison = penalty_prop_comparison_df.join(
    midwest_weighted, 
    on="OPEN_YEAR",
    how="left"
)

penalty_prop_comparison.write_csv("output/prop_with_reduction.csv")

In [157]:
# Q2 - among those who received reductions, what was the amount reduced? 

midwest_penalty_by_size = (
    penalty_sum
    .filter((pl.col("REGION_FLAG") == "MIDWEST") & 
            (pl.col("penalty_reduction_amount") > 0))
    .group_by(["OPEN_YEAR", "ESTAB_SIZE"])
    .agg(
        pl.median("penalty_pct_reduction").alias("midwest_median_reduction"),
        pl.count("ACTIVITY_NR").alias("n_inspections")
    )
)

midwest_weighted_penalty = (
    midwest_penalty_by_size.join(
        national_size_weights,
        on=["OPEN_YEAR", "ESTAB_SIZE"],
        how="left"
    )
    .with_columns(
        (pl.col("midwest_median_reduction") * pl.col("weight"))
        .alias("weighted_reduction")
    )
    .group_by("OPEN_YEAR")
    .agg(
        pl.sum("weighted_reduction").alias("MIDWEST_REDISTRIBUTED")
    )
    .sort("OPEN_YEAR")
)

penalty_reduction_comparison = (
    penalty_sum
    .filter(pl.col("penalty_reduction_amount") > 0)
    .group_by(["OPEN_YEAR", "REGION_FLAG"])
    .agg(pl.median("penalty_pct_reduction").alias("median_pct_reduction"))
    .pivot(
        index="OPEN_YEAR",
        on="REGION_FLAG",
        values="median_pct_reduction"
    )
    .sort("OPEN_YEAR")
   ).join( # merge in the redistributed midwest value
            midwest_weighted_penalty,
            on="OPEN_YEAR",
            how="left"
)

penalty_reduction_comparison.write_csv("output/pct_reduction.csv")

In [None]:
# Q3 - fatality cases

fatal_incidents = injury.filter(pl.col("DEGREE_OF_INJ") == 1)

fatal_violations = violation.join(
    fatal_incidents.select(["REL_INSP_NR", "DEGREE_OF_INJ"]).unique(),
    left_on="ACTIVITY_NR",
    right_on="REL_INSP_NR",
    how="inner"  # keep inspections with a fatality
).filter(
    (pl.col("VIOL_TYPE") == "S") & 
    (pl.col("DELETE_FLAG").is_null())
)

# calculate % penalty reduction for fatality cases 

fatal_penalty_summary = (
    fatal_violations
    .group_by("ACTIVITY_NR")
    .agg([
        pl.sum("INITIAL_PENALTY").alias("total_initial_penalty"),
        pl.sum("CURRENT_PENALTY").alias("total_current_penalty")
    ])
    .with_columns([
        (pl.col("total_initial_penalty") - pl.col("total_current_penalty"))
            .alias("penalty_reduction_amount"),
        ((pl.col("total_initial_penalty") - pl.col("total_current_penalty"))
          / pl.col("total_initial_penalty") * 100).alias("penalty_pct_reduction")
    ]).join( 
    farm_inspections,
        on="ACTIVITY_NR", 
        how="inner"
))

fat_penalty_reduction = fatal_penalty_summary.filter(
    (pl.col("REGION_FLAG") == "MIDWEST") & 
    (pl.col("penalty_reduction_amount") > 0)
).select(["total_initial_penalty", "total_current_penalty", "penalty_reduction_amount", "penalty_pct_reduction", "OPEN_YEAR"])

fat_penalty_reduction.write_csv("output/fat_penalty_reduction.csv")