In [None]:
import os
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
from pyspark.sql import Window
import re
from pyspark.sql.functions import col, lag, concat_ws
from haversine import haversine, Unit

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import matplotlib.pyplot as plt
import pandas as pd
import pandas as pd
import plotly.express as px
import json
import requests
# Load the county unemployment workbook (first sheet).
file_path = "Unemployment.xlsx"
unemployment = pd.read_excel(file_path, sheet_name=0)

# Years kept for modelling; each year contributes labor force, unemployed count and rate.
unemployment_years = ["2020", "2021", "2022", "2023"]

# The sheet opens with banner rows: skip the first three, promote the next row
# to the header, then drop that header row from the data.
unemployment = unemployment.iloc[3:]
unemployment.columns = unemployment.iloc[0]
unemployment = unemployment.iloc[1:]
# Strip stray whitespace from headers before any column is looked up by name.
unemployment.columns = unemployment.columns.str.strip()

# Keep county rows only: drop state-level rows (FIPS codes of the form SS000)
# and the national total (code 0 / 00000). Parsing the code numerically works
# whether it was read as an int (1000, 0) or as zero-padded text ("01000").
fips_numeric = pd.to_numeric(unemployment["FIPS_Code"], errors="coerce")
unemployment = unemployment[fips_numeric % 1000 != 0]

# Select each year's columns under their real names so every value keeps its
# true year label. If the workbook does not contain one of these years, fail
# loudly instead of silently shifting other years into its place.
id_columns = ["FIPS_Code", "State", "Area_Name"]
metric_prefixes = ["Civilian_labor_force", "Unemployed", "Unemployment_rate"]
selected_columns = id_columns + [
    f"{prefix}_{year}" for year in unemployment_years for prefix in metric_prefixes
]
missing_columns = [c for c in selected_columns if c not in unemployment.columns]
if missing_columns:
    raise KeyError(f"{file_path} is missing expected columns: {missing_columns}")

unemployment = unemployment[selected_columns].reset_index(drop=True)

# Two-letter state abbreviations -> full state names, used to rewrite
# "<county>, <ST>" labels as "<county>, <State Name>".
state_abbreviations = {
    "AL": "Alabama", "AK": "Alaska", "AZ": "Arizona", "AR": "Arkansas", "CA": "California",
    "CO": "Colorado", "CT": "Connecticut", "DE": "Delaware", "FL": "Florida", "GA": "Georgia",
    "HI": "Hawaii", "ID": "Idaho", "IL": "Illinois", "IN": "Indiana", "IA": "Iowa",
    "KS": "Kansas", "KY": "Kentucky", "LA": "Louisiana", "ME": "Maine", "MD": "Maryland",
    "MA": "Massachusetts", "MI": "Michigan", "MN": "Minnesota", "MS": "Mississippi", "MO": "Missouri",
    "MT": "Montana", "NE": "Nebraska", "NV": "Nevada", "NH": "New Hampshire", "NJ": "New Jersey",
    "NM": "New Mexico", "NY": "New York", "NC": "North Carolina", "ND": "North Dakota", "OH": "Ohio",
    "OK": "Oklahoma", "OR": "Oregon", "PA": "Pennsylvania", "RI": "Rhode Island", "SC": "South Carolina",
    "SD": "South Dakota", "TN": "Tennessee", "TX": "Texas", "UT": "Utah", "VT": "Vermont",
    "VA": "Virginia", "WA": "Washington", "WV": "West Virginia", "WI": "Wisconsin", "WY": "Wyoming"
}

def expand_state_name(area_name):
    """Spell out the trailing state abbreviation of an area label.

    ``"Autauga County, AL"`` becomes ``"Autauga County, Alabama"``.

    Args:
        area_name: Area label, normally ``"<county>, <ST>"``. Labels without a
            comma and non-string values (e.g. NaN, None) are returned unchanged.

    Returns:
        ``"<county>, <full state name>"``. An abbreviation that is not in
        ``state_abbreviations`` is kept as-is after the comma.
    """
    if not isinstance(area_name, str) or "," not in area_name:
        return area_name
    # Split on the last comma only (county names may contain commas) and
    # tolerate a missing or extra space around the abbreviation.
    county, state_abbr = area_name.rsplit(",", 1)
    state_abbr = state_abbr.strip()
    state_full = state_abbreviations.get(state_abbr, state_abbr)
    return f"{county}, {state_full}"

# Rewrite "<county>, <ST>" labels with the full state name so they can be
# joined against the GDP table's Area_Name column.
area_names = unemployment["Area_Name"]
unemployment["Area_Name"] = area_names.map(expand_state_name)

unemployment

In [None]:
import pandas as pd

file_path = "lagdp1224.csv"
gdp_column_names = ["Area_Name", "gdp_2020", "gdp_2021", "gdp_2022", "gdp_2023"]

# Skip the two leading non-data lines of the export (presumably title/notes
# rows — confirm against the file) and keep the first five columns.
raw_data = pd.read_csv(file_path, skiprows=2).iloc[:, :len(gdp_column_names)]

# Keep rows that are fully populated and contain no "--" placeholder cells.
complete_rows = raw_data.dropna()
has_placeholder = complete_rows.isin(['--']).any(axis=1)
cleaned_data = complete_rows.loc[~has_placeholder]
cleaned_data.columns = gdp_column_names

cleaned_data

In [None]:

# Both frames already carry these column names from the cells that built them.
# Verify that instead of re-assigning names positionally: a positional rename
# would silently mislabel columns if either frame's layout ever changed.
expected_gdp_columns = ["Area_Name", "gdp_2020", "gdp_2021", "gdp_2022", "gdp_2023"]
expected_unemployment_columns = [
    "FIPS_Code", "State", "Area_Name",
    "Civilian_labor_force_2020", "Unemployed_2020", "Unemployment_rate_2020",
    "Civilian_labor_force_2021", "Unemployed_2021", "Unemployment_rate_2021",
    "Civilian_labor_force_2022", "Unemployed_2022", "Unemployment_rate_2022",
    "Civilian_labor_force_2023", "Unemployed_2023", "Unemployment_rate_2023"
]
assert list(cleaned_data.columns) == expected_gdp_columns, list(cleaned_data.columns)
assert list(unemployment.columns) == expected_unemployment_columns, list(unemployment.columns)

# Join GDP and unemployment on the county label; keep only fully populated rows.
merged_data = pd.merge(cleaned_data, unemployment, on="Area_Name", how="inner").dropna()

merged_data

In [None]:
# Load the county population-estimates workbook (first sheet).
file_path = "PopulationEstimates.xlsx"
population = pd.read_excel(file_path, sheet_name=0)

# Skip the three banner rows, promote the next row to the header, then drop
# that header row from the data (four raw rows removed in total).
population = population.iloc[3:]
population.columns = population.iloc[0]
population = population.iloc[1:]

population.columns = population.columns.str.strip()
population = population.reset_index(drop=True)

# Keep the identifiers plus every yearly population-estimate column. The
# isinstance guard skips non-text headers (e.g. NaN) instead of crashing.
columns_to_keep = ["Area_Name", "FIPStxt"]
population_columns = [
    name for name in population.columns
    if isinstance(name, str) and "POP_ESTIMATE_" in name
]
columns_to_keep += population_columns

# rename() without inplace returns an independent frame, so later column
# assignments on filtered_data do not raise SettingWithCopyWarning.
filtered_data = population[columns_to_keep].rename(
    columns={
        f"POP_ESTIMATE_{year}": f"Population_{year}" for year in ["2020", "2021", "2022", "2023"]
    }
)

filtered_data

In [None]:

def _normalize_fips(codes):
    """Return FIPS codes as zero-padded 5-character strings.

    Codes may arrive as ints (1001), floats (1001.0) or text ("01001"); a plain
    astype(str) would leave "1001" and "01001" unequal and the inner join below
    would silently drop those counties.
    """
    as_text = codes.astype(str).str.strip().str.replace(r"\.0$", "", regex=True)
    return as_text.str.zfill(5)


# assign() builds new frames rather than writing into possibly-sliced ones.
merged_data = merged_data.assign(FIPS_Code=_normalize_fips(merged_data["FIPS_Code"]))
filtered_data = filtered_data.assign(FIPStxt=_normalize_fips(filtered_data["FIPStxt"]))

# Attach population estimates by FIPS code; keep only fully populated rows.
final_data = pd.merge(
    merged_data, filtered_data, left_on="FIPS_Code", right_on="FIPStxt", how="inner"
).dropna()

final_data

In [None]:
# List every column produced by the GDP / unemployment / population merges.
column_names = final_data.columns.to_list()
print(column_names)

In [None]:

# Drop repeated column labels, keeping the first occurrence of each.
df = final_data.loc[:, ~final_data.columns.duplicated()]

# The population merge suffixed the two Area_Name columns with _x/_y: keep the
# left one under its original name, and drop the right one plus the redundant
# FIPStxt join key. rename() and drop(errors="ignore") skip absent labels.
df = (
    df.rename(columns={"Area_Name_x": "Area_Name"})
      .drop(columns=["Area_Name_y", "FIPStxt"], errors="ignore")
)

# Identifier columns first, everything else after in its existing order.
# .copy() makes df independent, so later cells can assign columns into it
# without SettingWithCopyWarning.
leading_columns = ["Area_Name", "FIPS_Code", "State"]
columns_order = leading_columns + [c for c in df.columns if c not in leading_columns]
df = df[columns_order].copy()

print(df.columns.tolist())

In [None]:
# Display the merged county-level frame (Area_Name, FIPS_Code, State first).
df

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace

# Reuse the active Spark session when one exists, otherwise start one.
spark = SparkSession.builder.appName("ConvertColumnsToDouble").getOrCreate()

# Normalize the pandas frame in place before handing it to Spark: int/float
# columns become float and object columns become str (presumably so Spark's
# schema inference sees a single type per column). Later cells rely on these
# converted dtypes in `df`.
for name, dtype in df.dtypes.items():
    if dtype in ("int64", "float64"):
        df[name] = df[name].astype(float)
    elif dtype == "object":
        df[name] = df[name].astype(str)

data = spark.createDataFrame(df)

# Columns that must end up as DoubleType in Spark.
columns_to_convert = [
    'gdp_2020', 'gdp_2021', 'gdp_2022', 'gdp_2023',
    'Civilian_labor_force_2020', 'Unemployed_2020', 'Unemployment_rate_2020',
    'Civilian_labor_force_2021', 'Unemployed_2021', 'Unemployment_rate_2021',
    'Civilian_labor_force_2022', 'Unemployed_2022', 'Unemployment_rate_2022',
    'Civilian_labor_force_2023', 'Unemployed_2023', 'Unemployment_rate_2023',
    'Population_2020', 'Population_2021', 'Population_2022', 'Population_2023'
]

# Strip thousands separators, then cast the cleaned text to double.
for name in columns_to_convert:
    without_commas = regexp_replace(col(name), ",", "")
    data = data.withColumn(name, without_commas.cast("double"))

# Confirm the resulting column types.
data.printSchema()

# Show a sample of the data to verify

In [None]:
# Inspect the pandas dtypes of `df` before the numeric columns are cast to float.
print(df.dtypes)

In [None]:
# List of columns to convert to float
columns_to_cast = [
    "gdp_2020","gdp_2021","gdp_2022","gdp_2023", 
    "Civilian_labor_force_2020", "Unemployed_2020", "Unemployment_rate_2020",
    "Civilian_labor_force_2021", "Unemployed_2021", "Unemployment_rate_2021",
    "Civilian_labor_force_2022", "Unemployed_2022", "Unemployment_rate_2022",
    "Civilian_labor_force_2023", "Unemployed_2023", "Unemployment_rate_2023",
    "Population_2020", "Population_2021", "Population_2022", "Population_2023"
]

# Convert columns to float
for column in columns_to_cast:
    if column in df.columns:
        # Handle missing values, commas, and non-numeric characters
        df[column] = df[column].replace({',': ''}, regex=True).replace('', '0').astype(float)
    else:
        print(f"Warning: Column {column} does not exist in the DataFrame")

df

In [None]:
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# getOrCreate() hands back the already-running session when there is one.
spark = SparkSession.builder \
    .appName("UnemploymentRatePrediction") \
    .getOrCreate()

spark_df = spark.createDataFrame(df)

# Features: 2020-2022 GDP, labor force and population. Label: 2023 unemployment rate.
feature_columns = [
    "gdp_2020", "gdp_2021", "gdp_2022",
    "Civilian_labor_force_2020", "Civilian_labor_force_2021", "Civilian_labor_force_2022",
    "Population_2020", "Population_2021", "Population_2022"
]
label_column = "Unemployment_rate_2023"

# Fill null feature values with that feature's mean within the county's state.
# The label is deliberately NOT imputed: a made-up target would be both trained
# on and scored against, so rows with a null label are dropped instead.
agg_exprs = [F.avg(c).alias(f"avg_{c}") for c in feature_columns]
state_means = spark_df.groupBy("State").agg(*agg_exprs)

spark_df = spark_df.join(state_means, on="State", how="left")
for c in feature_columns:
    spark_df = spark_df.withColumn(c, F.coalesce(F.col(c), F.col(f"avg_{c}")))
# The avg_* helper columns are only needed for the fill above.
spark_df = (
    spark_df
    .drop(*[f"avg_{c}" for c in feature_columns])
    .filter(F.col(label_column).isNotNull())
)

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_assembled = assembler.transform(spark_df)

# 80/20 train/test split with a fixed seed for repeatability.
train_df, test_df = df_assembled.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol="features", labelCol=label_column)
lr_model = lr.fit(train_df)

# Held-out metrics: evaluated on the test split only.
predictions_test = lr_model.transform(test_df)
evaluator = RegressionEvaluator(labelCol=label_column, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions_test)
r2_evaluator = RegressionEvaluator(labelCol=label_column, predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions_test)

print(f"Test RMSE: {rmse}")
print(f"Test R2: {r2}")

# Score every county (training and test rows alike) for inspection and plotting.
all_predictions = lr_model.transform(df_assembled)

all_count = all_predictions.count()
print(f"Number of rows in all_predictions: {all_count}")

all_predictions.select("Area_Name", "FIPS_Code", label_column, "prediction").show(50, truncate=False)

In [None]:
# from pyspark.sql import SparkSession

# # Get the active Spark session
# spark = SparkSession.getActiveSession()

# # Stop the Spark session if it's active
# if spark:
#     spark.stop()
#     print("All Spark sessions have been successfully closed.")
# else:
#     print("No active Spark sessions found.")

In [None]:
from pyspark.sql import functions as F

# Go through the F alias: importing `abs` directly from pyspark.sql.functions
# would shadow Python's builtin abs() for the rest of the notebook.
predictions_with_error = all_predictions.withColumn(
    "absolute_error",
    F.abs(F.col("prediction") - F.col(label_column))
)

predictions_with_error.select("Area_Name", label_column, "prediction", "absolute_error").show(50, truncate=False)

# all_predictions scores every county, training rows included, so its MAE is an
# in-sample figure. The held-out MAE is computed on the test split alone.
mae = predictions_with_error.agg(F.avg("absolute_error").alias("MAE")).collect()[0]["MAE"]
test_mae = (
    predictions_test
    .agg(F.avg(F.abs(F.col("prediction") - F.col(label_column))).alias("MAE"))
    .collect()[0]["MAE"]
)
print(f"Overall MAE (train + test): {mae}")
print(f"Test MAE: {test_mae}")

In [None]:
import pandas as pd
import plotly.express as px

# Bring the actual/predicted pairs to the driver for plotting.
predictions_pdf = all_predictions.select("Area_Name", label_column, "prediction").toPandas()

axis_labels = {
    label_column: "Actual Unemployment Rate (2023)",
    "prediction": "Predicted Unemployment Rate (2023)",
}
fig = px.scatter(
    predictions_pdf,
    x=label_column,
    y="prediction",
    hover_data=["Area_Name"],
    title="Predicted vs Actual Unemployment Rate (2023)",
    labels=axis_labels,
)

# Dotted y = x reference line spanning the observed range of actual rates;
# points on it are perfect predictions.
actual = predictions_pdf[label_column]
low, high = actual.min(), actual.max()
fig.add_shape(
    type="line",
    x0=low, y0=low,
    x1=high, y1=high,
    line={"color": "Red", "dash": "dot"},
    name="Ideal Prediction Line",
)

fig.show()

In [None]:
# !pip install plotly

In [None]:
# pip install --upgrade nbformat

In [None]:
import plotly.express as px
import requests

counties_url = "https://raw.githubusercontent.com/plotly/datasets/master/geojson-counties-fips.json"

predictions_pdf = all_predictions.select("FIPS_Code", "prediction", label_column).toPandas()

# Pad codes to 5 digits (e.g. "1001" -> "01001") before matching them against
# the county GeoJSON.
predictions_pdf["FIPS_Code"] = predictions_pdf["FIPS_Code"].astype(str).str.zfill(5)
# Signed error: positive means the model over-predicted the rate.
predictions_pdf["error"] = predictions_pdf["prediction"] - predictions_pdf[label_column]

# Bounded wait and an explicit status check, so a network problem fails fast
# instead of hanging or handing an HTML error page to the JSON parser.
response = requests.get(counties_url, timeout=30)
response.raise_for_status()
counties = response.json()

fig = px.choropleth(
    predictions_pdf,
    geojson=counties,
    locations='FIPS_Code',
    color='error',
    color_continuous_scale=["#0000FF", "#FFFFFF", "#FF0000"],
    color_continuous_midpoint=0,
    # Errors beyond +/-2 points saturate to full blue/red.
    range_color=(-2, 2),
    scope="usa",
    labels={'error':'Prediction Error'}
)

fig.update_layout(
    title="County-level Unemployment Rate Prediction Error (2023)",
    margin={"r":0,"t":30,"l":0,"b":0},
    width=1200,
    height=800
)

fig.show()