In [13]:
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, lit, log, mean, sum as spark_sum, when
from pyspark.sql.types import NumericType

In [14]:
# Initialize Spark Session
# A single session named for this preprocessing job; getOrCreate() reuses an
# already-running session in this kernel instead of starting a second one.
# NOTE(review): the recorded run failed with JAVA_GATEWAY_EXITED, which generally
# indicates PySpark could not launch a JVM (check that a JDK is installed and
# JAVA_HOME is set) — an environment issue rather than a code issue.
print("[INFO] Initializing Spark Session...")
spark = (
    SparkSession.builder
    .appName("GDP_Prediction_Preprocessing")
    .getOrCreate()
)
print("[INFO] Spark Session created successfully.")


[INFO] Initializing Spark Session...


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
# Define file paths
# Each raw CSV is keyed by a readable dataset name that is reused in log messages.
file_paths = {
    "Capital Stock": "data/raw/CapitalStockData.csv",
    "Human Capital": "data/raw/Human_Capital_Data.csv",
    "Energy Use": "data/raw/energy_use.csv",
    "Labor Force": "data/raw/labor_force.csv",
    "Patents": "data/raw/patents_res_nonres.csv",
    "R&D Expenditure": "data/raw/R&D.csv",
    "Unemployment": "data/raw/unemployed_ilo_estimate.csv",
}

# Load datasets: read each CSV with a header row and inferred column types,
# report its size, print its schema, and keep it under its dataset name.
dfs = {}
for dataset_name in file_paths:
    csv_path = file_paths[dataset_name]
    print(f"[INFO] Loading dataset: {dataset_name} from {csv_path}")
    frame = spark.read.csv(csv_path, header=True, inferSchema=True)
    row_count = frame.count()  # triggers a full pass over the file
    column_count = len(frame.columns)
    print(f"[INFO] {dataset_name} loaded successfully with {row_count} rows and {column_count} columns.")
    frame.printSchema()
    dfs[dataset_name] = frame

In [None]:
# Check for missing values
# `isNull().cast("int")` turns every cell into a 0/1 flag; summing the flags per
# column yields that column's null count. (Applying `summary("count")` to the
# flags would count non-null *flags*, i.e. the row count, and hide missing data.)
for name, df in dfs.items():
    print(f"\n[DEBUG] Checking missing values for: {name}")
    df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

# Standardize column names and handle missing values
# Column names are normalized (spaces -> underscores, lowercase) so that each
# dataset exposes the same "country"/"year" join keys used by the merge. Nulls in
# numeric, non-key columns are then replaced with that column's mean.
impute_excluded = {"country", "year"}  # join keys: imputing them would fabricate keys

for name in dfs:
    print(f"[INFO] Standardizing column names and handling missing values for: {name}")
    df = dfs[name]

    # Rename columns (remove spaces, convert to lowercase)
    df = df.select([col(c).alias(c.replace(" ", "_").lower()) for c in df.columns])

    # Only numeric columns have a meaningful mean. Join keys are skipped because a
    # mean "year" (e.g. 1994.6) can never match a row in another dataset.
    # NOTE(review): a numeric column that was inferred as string (e.g. because the
    # raw CSV uses placeholder tokens for missing values) is left untouched here —
    # confirm the inferred schemas printed above.
    impute_columns = [
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, NumericType) and field.name not in impute_excluded
    ]
    if impute_columns:
        # One aggregation job computes every mean, instead of one collect() per column.
        column_means = df.agg(*[mean(col(c)).alias(c) for c in impute_columns]).first()
        impute_set = set(impute_columns)
        # coalesce keeps existing values and substitutes the mean for nulls; as with
        # the mean itself, integer columns are promoted to double.
        df = df.select([
            coalesce(col(c), lit(column_means[c])).alias(c) if c in impute_set else col(c)
            for c in df.columns
        ])

    dfs[name] = df
    print(f"[INFO] Standardization and missing value handling completed for {name}")

# Merge datasets on 'Country' and 'Year'
# Inner join: only (country, year) pairs present in *every* dataset survive.
print("[INFO] Merging datasets...")
join_keys = ["country", "year"]

# Fail early with a readable message rather than an opaque AnalysisException from
# the join. NOTE(review): raw files laid out wide (one column per year) have no
# "year" column and would need reshaping first — confirm the raw layouts.
for name, df in dfs.items():
    missing_keys = [k for k in join_keys if k not in df.columns]
    if missing_keys:
        raise ValueError(
            f"Dataset '{name}' is missing join key(s) {missing_keys}; columns: {df.columns}"
        )

# Non-key columns that appear in several datasets are duplicated by the join, which
# makes later column references ambiguous and is rejected by Spark's file writers.
column_sources = {}
for name, df in dfs.items():
    for c in df.columns:
        if c not in join_keys:
            column_sources.setdefault(c, []).append(name)
shared_columns = {c: names for c, names in column_sources.items() if len(names) > 1}
if shared_columns:
    print(f"[WARN] Non-key columns present in multiple datasets: {shared_columns}")

merged_df = reduce(lambda left, right: left.join(right, join_keys, "inner"), dfs.values())
print(f"[INFO] Merged dataset has {merged_df.count()} rows and {len(merged_df.columns)} columns.")

# Apply log transformation for Cobb-Douglas function
# Each listed column gets a companion "log_<name>" column.
print("[INFO] Applying log transformation...")
log_columns = ["gdp", "capital_stock", "labor_force", "energy_use", "patents", "r&d_expenditure", "unemployment"]

# Every listed column must exist in the merged data. Note that no GDP file appears
# in `file_paths`, so "gdp" is present only if one of the loaded CSVs carries it.
missing_log_columns = [c for c in log_columns if c not in merged_df.columns]
if missing_log_columns:
    raise ValueError(
        f"Columns required for the log transform are missing: {missing_log_columns}. "
        f"Available columns: {merged_df.columns}"
    )

# Spark's log() yields null for inputs <= 0, so zero/negative values become nulls.
for col_name in log_columns:
    merged_df = merged_df.withColumn(f"log_{col_name}", log(col(col_name)))

# Save preprocessed data
# Spark writes a *directory* at this path containing one part-*.csv file per
# partition (plus marker files), not a single CSV file.
# The former "path_to/..." value was an unfilled placeholder; this mirrors the
# notebook's data/raw/ input layout.
output_path = "data/processed/preprocessed_gdp_data.csv"
print(f"[INFO] Saving preprocessed data to {output_path}...")
merged_df.write.csv(output_path, header=True, mode="overwrite")
print("[SUCCESS] Preprocessing completed successfully!")
