In [0]:
# Load the 2020 county income CSV. The first row supplies column names and
# Spark samples the data to infer column types.
df_agi_raw_2020 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/ets_thriventcohort/default/census/20incyallnoagi.csv")
)
display(df_agi_raw_2020)

In [0]:
# Keep the location keys plus A00100, then label A00100 with its year.
# `selected_columns` is reused by the 2021 and 2022 cells.
selected_columns = ["STATE", "COUNTYNAME", "A00100"]
filtered_df_2020 = df_agi_raw_2020.select(selected_columns)
renamed_df_2020 = filtered_df_2020.withColumnRenamed(existing="A00100", new="AGI2020")
display(renamed_df_2020)

In [0]:
# Load the 2021 county income CSV with header row and inferred column types.
df_agi_raw_2021 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/ets_thriventcohort/default/census/21incyallnoagi.csv")
)
display(df_agi_raw_2021)

In [0]:
# Same projection as 2020 (STATE, COUNTYNAME, A00100); A00100 becomes AGI2021.
filtered_df_2021 = df_agi_raw_2021.select(selected_columns)
renamed_df_2021 = filtered_df_2021.withColumnRenamed(existing="A00100", new="AGI2021")
display(renamed_df_2021)

In [0]:
# Load the 2022 county income CSV with header row and inferred column types.
df_agi_raw_2022 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/ets_thriventcohort/default/census/22incyallnoagi.csv")
)
display(df_agi_raw_2022)

In [0]:
# Same projection as 2020 (STATE, COUNTYNAME, A00100); A00100 becomes AGI2022.
filtered_df_2022 = df_agi_raw_2022.select(selected_columns)
renamed_df_2022 = filtered_df_2022.withColumnRenamed(existing="A00100", new="AGI2022")
display(renamed_df_2022)

In [0]:
# Combine the three yearly extracts side by side. Inner joins mean only
# (STATE, COUNTYNAME) pairs present in all three years are kept.
join_keys = ["STATE", "COUNTYNAME"]
dftemp = renamed_df_2020.join(renamed_df_2021, join_keys, "inner")
merged_df = dftemp.join(renamed_df_2022, join_keys, "inner")
display(merged_df)


In [0]:
from pyspark.sql.functions import col

# The 50 US state names (District of Columbia is not listed). Rows whose
# COUNTYNAME equals one of these are presumably state-level total rows rather
# than counties, so they are removed. Reused later for the population file.
states = [
    "Alabama", "Alaska", "Arizona", "Arkansas", "California",
    "Colorado", "Connecticut", "Delaware", "Florida", "Georgia",
    "Hawaii", "Idaho", "Illinois", "Indiana", "Iowa",
    "Kansas", "Kentucky", "Louisiana", "Maine", "Maryland",
    "Massachusetts", "Michigan", "Minnesota", "Mississippi", "Missouri",
    "Montana", "Nebraska", "Nevada", "New Hampshire", "New Jersey",
    "New Mexico", "New York", "North Carolina", "North Dakota", "Ohio",
    "Oklahoma", "Oregon", "Pennsylvania", "Rhode Island", "South Carolina",
    "South Dakota", "Tennessee", "Texas", "Utah", "Vermont",
    "Virginia", "Washington", "West Virginia", "Wisconsin", "Wyoming",
]

is_state_row = col("COUNTYNAME").isin(states)
stateless_df = merged_df.filter(~is_state_row)
display(stateless_df)


In [0]:
from itertools import chain

from pyspark.sql.functions import col, create_map, lit

# Two-letter STATE code -> full state name (the 50 states; "DC" is not a key).
state_mapping = {
    "AL": "Alabama", "AK": "Alaska", "AZ": "Arizona", "AR": "Arkansas", "CA": "California",
    "CO": "Colorado", "CT": "Connecticut", "DE": "Delaware", "FL": "Florida", "GA": "Georgia",
    "HI": "Hawaii", "ID": "Idaho", "IL": "Illinois", "IN": "Indiana", "IA": "Iowa",
    "KS": "Kansas", "KY": "Kentucky", "LA": "Louisiana", "ME": "Maine", "MD": "Maryland",
    "MA": "Massachusetts", "MI": "Michigan", "MN": "Minnesota", "MS": "Mississippi", "MO": "Missouri",
    "MT": "Montana", "NE": "Nebraska", "NV": "Nevada", "NH": "New Hampshire", "NJ": "New Jersey",
    "NM": "New Mexico", "NY": "New York", "NC": "North Carolina", "ND": "North Dakota", "OH": "Ohio",
    "OK": "Oklahoma", "OR": "Oregon", "PA": "Pennsylvania", "RI": "Rhode Island", "SC": "South Carolina",
    "SD": "South Dakota", "TN": "Tennessee", "TX": "Texas", "UT": "Utah", "VT": "Vermont",
    "VA": "Virginia", "WA": "Washington", "WV": "West Virginia", "WI": "Wisconsin", "WY": "Wyoming"
}

# create_map takes alternating key/value literals: lit("AL"), lit("Alabama"), ...
# Reused by the Minnesota cells further down.
mapping_expr = create_map([lit(x) for x in chain(*state_mapping.items())])

# Look up each row's STATE code. Codes absent from state_mapping get a null
# FullStateName (Spark's default map-lookup behaviour).
df_with_full_names = stateless_df.withColumn("FullStateName", mapping_expr[col("STATE")])

# Drop the code column by its exact name. "State" only matched "STATE" because
# Spark resolves names case-insensitively by default. With
# spark.sql.caseSensitive=true, drop() silently ignores unknown names and STATE
# would leak into every downstream frame.
df_final = df_with_full_names.drop("STATE")

display(df_final)


In [0]:
# Exclude Minnesota; it is analysed separately via mn_agi below.
# Note: `null != "Minnesota"` evaluates to null, so this filter also drops rows
# whose FullStateName is null (STATE codes not in state_mapping).
df_final = df_final.where(df_final.FullStateName != "Minnesota")
display(df_final)

In [0]:
# Minnesota county rows only (still keyed by the two-letter STATE code).
mn_agi = stateless_df.where(stateless_df.STATE == "MN")
display(mn_agi)

In [0]:
# Attach the full state name to the Minnesota rows using the same STATE-code map.
mn_agi = mn_agi.withColumn("FullStateName", mapping_expr[col("STATE")])

# Drop the code column by its exact name ("STATE", not "State"). Otherwise the
# drop depends on Spark's default case-insensitive resolution and becomes a
# silent no-op when case sensitivity is enabled.
mn_agi = mn_agi.drop("STATE")

display(mn_agi)

In [0]:
# Align column names with the population file (CTYNAME / STNAME) for the join.
for old_name, new_name in (("COUNTYNAME", "CTYNAME"), ("FullStateName", "STNAME")):
    mn_agi = mn_agi.withColumnRenamed(old_name, new_name)
display(mn_agi)

In [0]:
import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Reshape to long format: one (COUNTYNAME, FullStateName, Year, AGI) row per
# observed year, with Year as an integer so it can be used as a regressor.
long_df = df_final.selectExpr(
    "COUNTYNAME", "FullStateName",
    "stack(3, '2020', AGI2020, '2021', AGI2021, '2022', AGI2022) as (Year, AGI)"
).withColumn("Year", col("Year").cast("int"))

# Output schema of the per-county forecast.
schema = StructType([
    StructField("COUNTYNAME", StringType()),
    StructField("FullStateName", StringType()),
    StructField("Predicted_AGI2027", DoubleType())
])


def forecast_agi(pdf: pd.DataFrame) -> pd.DataFrame:
    """Fit AGI ~ Year for one county and extrapolate to 2027.

    Args:
        pdf: all long-format rows for a single (COUNTYNAME, FullStateName) group,
            with integer ``Year`` and numeric ``AGI`` columns.

    Returns:
        A one-row frame with columns COUNTYNAME, FullStateName, Predicted_AGI2027.
        If every AGI value is identical, the prediction is that value
        (their mean), which skips fitting a degenerate regression.
    """
    from sklearn.linear_model import LinearRegression

    name = pdf["COUNTYNAME"].iloc[0]
    state = pdf["FullStateName"].iloc[0]

    X = pdf["Year"].to_numpy().reshape(-1, 1)
    y = pdf["AGI"].to_numpy()

    if len(set(y)) > 1:
        model = LinearRegression().fit(X, y)
        # predict() returns a 1-element array. Index it explicitly, because
        # float() on an ndim>0 array is deprecated since NumPy 1.25.
        pred = float(model.predict([[2027]])[0])
    else:
        pred = float(y.mean())

    # Name the columns so Spark matches them to `schema` by name, not position.
    return pd.DataFrame(
        [[name, state, pred]],
        columns=["COUNTYNAME", "FullStateName", "Predicted_AGI2027"],
    )


# applyInPandas replaces the deprecated PandasUDFType.GROUPED_MAP API. Only the
# columns the forecast needs are shipped to pandas; the previous VectorAssembler
# "features" column was unused by the forecast.
forecast_df = long_df.groupBy("COUNTYNAME", "FullStateName").applyInPandas(
    forecast_agi, schema=schema
)

# Attach the forecast to the wide per-county table for comparison.
result_df = df_final.join(forecast_df, on=["COUNTYNAME", "FullStateName"], how="left")

display(result_df)


In [0]:
from pyspark.sql.functions import floor

# Round the 2027 AGI forecast down to a whole number.
agi_forecast_floor = floor(result_df["Predicted_AGI2027"])
truncated_df = result_df.withColumn("Predicted_AGI2027", agi_forecast_floor)
display(truncated_df)

In [0]:
# Import the functions module under an alias instead of `from ... import round`,
# which would shadow Python's builtin round() for the rest of the session.
from pyspark.sql import functions as F

# Absolute and percentage change from observed 2022 AGI to the 2027 forecast,
# both rounded to 2 decimals. A zero AGI2022 yields a null percentage
# (or an error under ANSI mode).
agi_delta = F.col("Predicted_AGI2027") - F.col("AGI2022")

truncated_df = (
    truncated_df
    .withColumn("AGI_Diff_2022_to_2027", F.round(agi_delta, 2))
    .withColumn(
        "AGI_PercChange_2022_to_2027",
        F.round(agi_delta / F.col("AGI2022") * 100, 2),
    )
)
display(truncated_df)


In [0]:
# Use the population file's key names (CTYNAME / STNAME) so the two can be joined.
renamed_df = truncated_df.withColumnRenamed("COUNTYNAME", "CTYNAME")
renamed_df = renamed_df.withColumnRenamed("FullStateName", "STNAME")
display(renamed_df)


In [0]:
# Load the 2020-2024 county population estimates (header row, inferred types).
df_pop_20_24 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/ets_thriventcohort/default/census/co-est2024-alldata.csv")
)
display(df_pop_20_24)

In [0]:
# Location keys, the 2020 base estimate, and the yearly population estimates.
# A dedicated name is used instead of reassigning `selected_columns`. The
# 2020-2022 AGI cells read that list, and re-running them after this cell
# would otherwise silently select population columns.
pop_columns = [
    "STNAME", "CTYNAME", "ESTIMATESBASE2020",
    "POPESTIMATE2020", "POPESTIMATE2021", "POPESTIMATE2022",
    "POPESTIMATE2023", "POPESTIMATE2024",
]
filtered_df_pop = df_pop_20_24.select(*pop_columns)
display(filtered_df_pop)

In [0]:
# Remove rows whose CTYNAME is a state name (presumably state totals), keeping counties.
state_total_rows = col("CTYNAME").isin(states)
stateless_df_pop = filtered_df_pop.filter(~state_total_rows)
display(stateless_df_pop)

In [0]:
# Minnesota counties, most populous (2024 estimate) first.
mn_pop = (
    stateless_df_pop
    .where(stateless_df_pop.STNAME == "Minnesota")
    .orderBy(col("POPESTIMATE2024").desc())
)
display(mn_pop)

In [0]:
# Pair Minnesota population estimates with Minnesota AGI by county; the 2020
# base estimate is not needed downstream.
combined_mn = (
    mn_pop
    .join(mn_agi, on=["STNAME", "CTYNAME"], how="inner")
    .drop("ESTIMATESBASE2020")
)
display(combined_mn)

In [0]:
# Alias the functions module rather than importing `round`, which would shadow
# Python's builtin round().
from pyspark.sql import functions as F

# 2022 AGI per resident, rounded to 2 decimals.
# NOTE(review): confirm the unit of A00100 (source of AGI2022). If it is
# reported in thousands of dollars, this ratio is thousands of dollars per person.
combined_mn = combined_mn.withColumn(
    "MN_AGI_per_capita_2022",
    F.round(F.col("AGI2022") / F.col("POPESTIMATE2022"), 2),
)
# Highest per-capita AGI first (display order only; a table write does not keep it).
combined_mn = combined_mn.orderBy(F.col("MN_AGI_per_capita_2022").desc())
display(combined_mn)

In [0]:
# Non-Minnesota county populations without the 2020 base estimate.
new_pop = (
    stateless_df_pop
    .drop("ESTIMATESBASE2020")
    .where(col("STNAME") != "Minnesota")
)
display(new_pop)

In [0]:
from pyspark.sql.functions import expr

# Unpivot the five POPESTIMATE columns into (year, population) rows per county.
# Note: this reuses the name `long_df`, replacing the AGI long frame.
stack_expr = "stack(5, '2020', POPESTIMATE2020, '2021', POPESTIMATE2021, '2022', POPESTIMATE2022, '2023', POPESTIMATE2023, '2024', POPESTIMATE2024) as (year, population)"
long_df = new_pop.selectExpr("STNAME", "CTYNAME", stack_expr)
display(long_df)


In [0]:
from pyspark.sql.functions import col

# Regressors for the quadratic trend: year offset from 2020 (2020 -> 0) and its square.
long_df = (
    long_df
    .withColumn("year_index", col("year").cast("int") - 2020)
    .withColumn("year_squared", col("year_index") ** 2)
)
display(long_df)


In [0]:
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from sklearn.linear_model import LinearRegression

# Output schema: one row per county per forecast year offset.
schema = StructType([
    StructField("STNAME", StringType()),
    StructField("CTYNAME", StringType()),
    StructField("year_index", IntegerType()),
    StructField("predicted_population", DoubleType())
])


def forecast_poly(pdf: pd.DataFrame) -> pd.DataFrame:
    """Fit population ~ year_index + year_index**2 for one county and extrapolate.

    Args:
        pdf: all rows for a single (STNAME, CTYNAME) group with columns
            ``year_index``, ``year_squared`` and ``population``.

    Returns:
        Three rows (year_index 5, 6, 7, i.e. 2025-2027) with columns STNAME,
        CTYNAME, year_index, predicted_population.

    Note: a quadratic extrapolated from five points can swing sharply, and for
    small counties it may even go negative.
    """
    model = LinearRegression()
    model.fit(pdf[["year_index", "year_squared"]], pdf["population"])

    future = pd.DataFrame({"year_index": [5, 6, 7]})
    future["year_squared"] = future["year_index"] ** 2
    future["predicted_population"] = model.predict(future[["year_index", "year_squared"]])
    future["STNAME"] = pdf["STNAME"].iloc[0]
    future["CTYNAME"] = pdf["CTYNAME"].iloc[0]

    return future[["STNAME", "CTYNAME", "year_index", "predicted_population"]]


# applyInPandas replaces the deprecated PandasUDFType.GROUPED_MAP API.
forecast_df = long_df.groupBy("STNAME", "CTYNAME").applyInPandas(forecast_poly, schema=schema)
display(forecast_df)

In [0]:
from pyspark.sql.functions import col

# Convert the 0-based year offset back to a calendar year.
forecast_df = forecast_df.withColumn("year", col("year_index") + 2020)

# One row per county with one column per forecast year. Listing the pivot
# values avoids the extra job Spark otherwise runs to discover the distinct
# years. It also pins the output columns to "2025", "2026", "2027", which later
# cells reference by name.
pivot_df = (
    forecast_df
    .groupBy("STNAME", "CTYNAME")
    .pivot("year", [2025, 2026, 2027])
    .agg({"predicted_population": "first"})
)
display(pivot_df)

In [0]:
# Alphabetical by state, then county (ascending is Spark's default sort order).
sorted_df = pivot_df.sort("STNAME", "CTYNAME")
display(sorted_df)

In [0]:
from pyspark.sql.functions import floor

# Round each forecast-year population down to a whole number.
# Note: reuses the name `truncated_df` (the AGI frame was already consumed by renamed_df).
truncated_df = sorted_df
for year_col in ("2025", "2026", "2027"):
    truncated_df = truncated_df.withColumn(year_col, floor(year_col))

display(truncated_df)

In [0]:
# Observed 2020-2024 estimates alongside the 2025-2027 forecasts, per county.
new_filtered_df = new_pop.join(truncated_df, on=["STNAME", "CTYNAME"], how="inner")
display(new_filtered_df)


In [0]:
# Alias the functions module rather than importing `round`, which would shadow
# Python's builtin round().
from pyspark.sql import functions as F

# Absolute and percentage population change from the 2022 estimate to the
# 2027 forecast, rounded to 2 decimals.
pop_delta = F.col("2027") - F.col("POPESTIMATE2022")

new_filtered_df = (
    new_filtered_df
    .withColumn("Pop_Diff_2022_to_2027", F.round(pop_delta, 2))
    .withColumn(
        "Pop_PercChange_2022_to_2027",
        F.round(pop_delta / F.col("POPESTIMATE2022") * 100, 2),
    )
)
display(new_filtered_df)


In [0]:
# Merge the population forecast table with the AGI forecast table on location.
combined_df = new_filtered_df.join(renamed_df, on=["STNAME", "CTYNAME"], how="inner")
display(combined_df)

In [0]:
# Keep only the 2027 population forecast and give it a descriptive name.
final_df = (
    combined_df
    .drop("2025", "2026")
    .withColumnRenamed("2027", "PREDICTEDPOP2027")
)
display(final_df)

In [0]:
# Alias the functions module rather than importing `round`, which would shadow
# Python's builtin round().
from pyspark.sql import functions as F

# Forecast 2027 AGI per forecast 2027 resident, rounded to 2 decimals. Both
# inputs were floored to whole numbers in earlier cells.
final_df = final_df.withColumn(
    "AGI_per_capita_2027",
    F.round(F.col("Predicted_AGI2027") / F.col("PREDICTEDPOP2027"), 2),
)
display(final_df)

In [0]:
from pyspark.sql.functions import concat_ws

# Human-readable "County, State" label; concat_ws accepts column names directly.
final_df = final_df.withColumn("Location", concat_ws(", ", "CTYNAME", "STNAME"))


In [0]:
final_df.write.saveAsTable("County_Population_AGI", format="delta", mode="overwrite")

In [0]:
from pyspark.sql.functions import concat_ws

# Human-readable "County, State" label for the Minnesota table.
combined_mn = combined_mn.withColumn("Location", concat_ws(", ", "CTYNAME", "STNAME"))

In [0]:
combined_mn.write.saveAsTable("MN_County_Population_AGI", format="delta", mode="overwrite")