In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

spark = SparkSession.builder \
    .appName("StackOverflow Survey Analysis") \
    .getOrCreate()

def filter_columns(df, year):
    """
    Filter and rename columns to keep only the relevant ones:
    - industry
    - employment
    - gender
    - education
    - salary
    - experience
    - technologies/languages
    
    Returns a dataframe with standardized column names.
    """
    # First, print all available columns to see what we have
    print(f"Available columns for year 20{year}:")
    for col_name in df.columns:
        print(f"  - {col_name}")
    print("\n")
    
    # Different naming conventions across years
    column_mappings = {
        # 2020
        20: {
            'industry': 'Industry', 
            'employment': 'DevType',
            'gender': 'Gender',
            'education': 'EdLevel',
            'salary': 'ConvertedComp',
            'experience': 'YearsCode',
            'technologies': 'LanguageWorkedWith'
        },
        # 2021
        21: {
            'industry': 'Industry', 
            'employment': 'DevType',
            'gender': 'Gender',
            'education': 'EdLevel',
            'salary': 'ConvertedComp',
            'experience': 'YearsCode',
            'technologies': 'LanguageWorkedWith'
        },
        # 2022
        22: {
            'industry': 'Industry', 
            'employment': 'DevType',
            'gender': 'Gender',
            'education': 'EdLevel',
            'salary': 'ConvertedComp',
            'experience': 'YearsCode',
            'technologies': 'LanguageHaveWorkedWith'
        },
        # 2023
        23: {
            'industry': 'Industry', 
            'employment': 'DevType',
            'gender': 'Gender',
            'education': 'EdLevel',
            'salary': 'ConvertedCompYearly',
            'experience': 'YearsCode',
            'technologies': 'LanguageHaveWorkedWith'
        },
        # 2024
        24: {
            'industry': 'Industry', 
            'employment': 'DevType',
            'gender': 'Gender',
            'education': 'EdLevel',
            'salary': 'ConvertedCompYearly',
            'experience': 'YearsCode',
            'technologies': 'LanguageHaveWorkedWith'
        }
    }
    
    if year in column_mappings:
        mapping = column_mappings[year]
        columns_to_select = []
        
        for std_name, orig_name in mapping.items():
            if orig_name in df.columns:
                columns_to_select.append(col(orig_name).alias(std_name))
            else:
                print(f"Warning: Column {orig_name} not found for year 20{year}")
        
        # Add year as a column
        columns_to_select.append(lit(f"20{year}").alias("survey_year"))
        
        if columns_to_select:
            return df.select(columns_to_select)
        else:
            print(f"No matching columns found for year 20{year}")
            return df
    else:
        print(f"No column mapping defined for year 20{year}")
        return df

def load_csvs():
    """Load and process StackOverflow survey data CSV files"""
    dfs = {}
    
    for i in range(20, 25):
        # Check if file exists before trying to load it
        file_path = f'../bronze/stack_overflow_data/survey_results_public{i}.csv'
        
        try:
            # Print the path we're trying to load
            print(f"Attempting to load file: {file_path}")
            
            # Read the CSV file
            df = spark.read.option("header", "true") \
                          .option("inferSchema", "true") \
                          .csv(file_path)
            
            # Filter and standardize columns
            df_filtered = filter_columns(df, i)
            
            # Store in our dictionary
            dfs[f'df_{i}'] = df_filtered
            
            print(f'Successfully processed data for year 20{i}')
            print(f'Number of rows and columns in processed df_{i}: ({df_filtered.count()}, {len(df_filtered.columns)})')
            print(f'Selected columns:')
            for col_name in df_filtered.columns:
                print(f'  - {col_name}')
            print(f'Preview of processed data:')
            df_filtered.show(5, truncate=False)
            print('\n' + '-'*50 + '\n')
            
        except FileNotFoundError:
            print(f'Warning: File {file_path} not found')
        except Exception as e:
            # Print more detailed error information
            import traceback
            print(f'Error processing file {file_path}:')
            print(f'Error type: {type(e).__name__}')
            print(f'Error message: {str(e)}')
            print('Traceback:')
            traceback.print_exc()
            print('\n' + '-'*50 + '\n')
    
    return dfs


In [18]:
load_csvs()

Attempting to load file: ../bronze/stack_overflow_data/survey_results_public20.csv
Available columns for year 2020:
  - Respondent
  - MainBranch
  - Hobbyist
  - Age
  - Age1stCode
  - CompFreq
  - CompTotal
  - ConvertedComp
  - Country
  - CurrencyDesc
  - CurrencySymbol
  - DatabaseDesireNextYear
  - DatabaseWorkedWith
  - DevType
  - EdLevel
  - Employment
  - Ethnicity
  - Gender
  - JobFactors
  - JobSat
  - JobSeek
  - LanguageDesireNextYear
  - LanguageWorkedWith
  - MiscTechDesireNextYear
  - MiscTechWorkedWith
  - NEWCollabToolsDesireNextYear
  - NEWCollabToolsWorkedWith
  - NEWDevOps
  - NEWDevOpsImpt
  - NEWEdImpt
  - NEWJobHunt
  - NEWJobHuntResearch
  - NEWLearn
  - NEWOffTopic
  - NEWOnboardGood
  - NEWOtherComms
  - NEWOvertime
  - NEWPurchaseResearch
  - NEWPurpleLink
  - NEWSOSites
  - NEWStuck
  - OpSys
  - OrgSize
  - PlatformDesireNextYear
  - PlatformWorkedWith
  - PurchaseWhat
  - Sexuality
  - SOAccount
  - SOComm
  - SOPartFreq
  - SOVisitFreq
  - SurveyEase
 

{'df_20': DataFrame[employment: string, gender: string, education: string, salary: string, experience: string, technologies: string, survey_year: string],
 'df_21': DataFrame[employment: string, gender: string, education: string, experience: string, survey_year: string],
 'df_22': DataFrame[employment: string, gender: string, education: string, experience: string, technologies: string, survey_year: string],
 'df_23': DataFrame[industry: string, employment: string, education: string, salary: string, experience: string, technologies: string, survey_year: string],
 'df_24': DataFrame[industry: string, employment: string, education: string, salary: string, experience: string, technologies: string, survey_year: string]}