# H1B Dataset Cleaning

In [None]:
import pandas_gbq

project_id = 'h1b-analytics'
table_name = 'h1b-analytics.h1b_raw_data.h1b_2025'

# Construct the SQL query
sql_query = f"SELECT * FROM `{table_name}`"

# Load data from BigQuery into a pandas DataFrame
df = pandas_gbq.read_gbq(sql_query, project_id=project_id)

# Display the first 5 rows of the DataFrame
display(df.head())

Downloading: 100%|[32m██████████[0m|


Unnamed: 0,_airbyte_raw_id,_airbyte_extracted_at,_airbyte_meta,_airbyte_generation_id,Tax_ID,Line_by_line,Amended_Denial,Fiscal_Year_,Petitioner_City,Amended_Approval,...,Industry__NAICS__Code,New_Concurrent_Denial,New_Employment_Denial,New_Concurrent_Approval,New_Employment_Approval,Change_of_Employer_Denial,Employer__Petitioner__Name,Change_of_Employer_Approval,Change_with_Same_Employer_Denial,Change_with_Same_Employer_Approval
0,019c2019-c954-73f0-b0ce-c8f8e9e8c7d1,2026-02-02 20:44:34.101000+00:00,"{""changes"":[],""sync_id"":1}",1,1419.0,58132,0,2025,IRVING,0,...,"54 - Professional, Scientific, and Technical S...",0,0,0,1,0,VAAKRUTHI SOLUTIONS INC,0,0,0
1,019c2019-c727-7459-92c8-c9d1947ff3d6,2026-02-02 20:44:34.101000+00:00,"{""changes"":[],""sync_id"":1}",1,5389.0,13547,0,2025,FARMINGTON HILLS,0,...,"54 - Professional, Scientific, and Technical S...",0,0,0,0,0,COPPER DART INC,0,0,0
2,019c2019-c95e-75e8-aa45-4447106f178c,2026-02-02 20:44:34.101000+00:00,"{""changes"":[],""sync_id"":1}",1,8337.0,59665,0,2025,MCKINNEY,0,...,"54 - Professional, Scientific, and Technical S...",0,0,0,1,0,VYORA INTERNATIONAL LLC,0,0,0
3,019c2019-c700-79e2-9709-13c8d1d987dd,2026-02-02 20:44:34.101000+00:00,"{""changes"":[],""sync_id"":1}",1,747.0,7726,0,2025,MARLBOROUGH,0,...,44-45 - Retail Trade,0,0,0,0,0,"BJ'S WHOLESALE CLUB, INC.",1,0,0
4,019c2019-c843-7fb9-b9a1-1b29b3018638,2026-02-02 20:44:34.101000+00:00,"{""changes"":[],""sync_id"":1}",1,9502.0,37592,0,2025,PRINCETON,0,...,"54 - Professional, Scientific, and Technical S...",0,0,0,1,0,NEUREALM EKA GAVS TECHNLOGIES N.A. INC,0,0,0


## Pyspark setup

In [None]:
# Verify Java Installation and JAVA_HOME
!java -version
!echo "JAVA_HOME is set to: $JAVA_HOME"
!ls -l $JAVA_HOME/bin/java

openjdk version "17.0.18" 2026-01-20
OpenJDK Runtime Environment (build 17.0.18+8-Ubuntu-122.04.1)
OpenJDK 64-Bit Server VM (build 17.0.18+8-Ubuntu-122.04.1, mixed mode, sharing)
JAVA_HOME is set to: 
lrwxrwxrwx 1 root root 22 Dec  9 14:17 /bin/java -> /etc/alternatives/java


In [None]:
# Install Java Development Kit 17 (Spark requires Java)
!apt-get update -qq > /dev/null
!apt-get install openjdk-17-jdk-headless -qq > /dev/null

W: https://packages.cloud.google.com/apt/dists/gcsfuse-jammy/InRelease: Key is stored in legacy trusted.gpg keyring (/etc/apt/trusted.gpg), see the DEPRECATION section in apt-key(8) for details.
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [None]:
# Download and install Apache Spark (version 3.5.0)
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark pyspark

In [None]:
# Set up environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
!export JAVA_HOME="/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 2g --executor-memory 2g --conf spark.driver.maxResultSize=2g pyspark-shell"

print("JAVA_HOME (os.environ):", os.environ.get("JAVA_HOME"))
!echo "JAVA_HOME (shell): $JAVA_HOME"

JAVA_HOME (os.environ): /usr/lib/jvm/java-17-openjdk-amd64
JAVA_HOME (shell): /usr/lib/jvm/java-17-openjdk-amd64


## Initialize Pyspark and Ingest Data

In [None]:
# Initialize findspark
import findspark
findspark.init()

In [None]:
# # Start a SparkSession, excluding external connectors for troubleshooting
# from pyspark.sql import SparkSession
# spark = SparkSession.builder \
#     .master("local[*]") \
#     .appName("Colab_PySpark") \
#     .getOrCreate()

In [None]:
# Download Spark BigQuery Connector JAR
!wget https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.36.1/spark-bigquery-with-dependencies_2.12-0.36.1.jar

# Verify download
!echo "Verifying JAR files:"
!ls -l spark-bigquery-with-dependencies_2.12-0.36.1.jar

--2026-02-07 01:54:42--  https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.36.1/spark-bigquery-with-dependencies_2.12-0.36.1.jar
Resolving repo1.maven.org (repo1.maven.org)... 104.18.19.12, 104.18.18.12, 2606:4700::6812:130c, ...
Connecting to repo1.maven.org (repo1.maven.org)|104.18.19.12|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 51452424 (49M) [application/java-archive]
Saving to: ‘spark-bigquery-with-dependencies_2.12-0.36.1.jar.7’


2026-02-07 01:54:42 (232 MB/s) - ‘spark-bigquery-with-dependencies_2.12-0.36.1.jar.7’ saved [51452424/51452424]

Verifying JAR files:
-rw-r--r-- 1 root root 51452424 Jan 31  2024 spark-bigquery-with-dependencies_2.12-0.36.1.jar


In [None]:
# Start a SparkSession, using explicitly downloaded JAR
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Colab_PySpark") \
    .config("spark.jars", "spark-bigquery-with-dependencies_2.12-0.36.1.jar") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.gs.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.project.id", project_id) \
    .getOrCreate()

In [None]:
# Verify SparkSession
print(spark.version)
print(spark.catalog.listTables())

3.5.0
[]


In [None]:
# Convert the pandas DataFrame to a PySpark DataFrame
spark_df = spark.createDataFrame(df)

In [None]:
# Display the schema and first few rows of the PySpark DataFrame
spark_df.printSchema()

root
 |-- _airbyte_raw_id: string (nullable = true)
 |-- _airbyte_extracted_at: timestamp (nullable = true)
 |-- _airbyte_meta: string (nullable = true)
 |-- _airbyte_generation_id: long (nullable = true)
 |-- Tax_ID: decimal(38,18) (nullable = true)
 |-- Line_by_line: string (nullable = true)
 |-- Amended_Denial: long (nullable = true)
 |-- Fiscal_Year_: long (nullable = true)
 |-- Petitioner_City: string (nullable = true)
 |-- Amended_Approval: long (nullable = true)
 |-- Petitioner_State: string (nullable = true)
 |-- Continuation_Denial: long (nullable = true)
 |-- Petitioner_Zip_Code: decimal(38,18) (nullable = true)
 |-- Continuation_Approval: long (nullable = true)
 |-- Industry__NAICS__Code: string (nullable = true)
 |-- New_Concurrent_Denial: long (nullable = true)
 |-- New_Employment_Denial: long (nullable = true)
 |-- New_Concurrent_Approval: long (nullable = true)
 |-- New_Employment_Approval: long (nullable = true)
 |-- Change_of_Employer_Denial: long (nullable = true)
 |-

# Creating Silver Layer

Aim for silver layer
1. Selection of wanted columns and renaming it
2. Employer Standardization
3. Type casting
4. Handling missing values

## Columns Standardization

In [None]:
col_to_remove = [
    "_airbyte_raw_id",
    "_airbyte_extracted_at",
    "_airbyte_meta",
    "_airbyte_generation_id",
    "Line_by_line"
]

In [None]:
df = spark_df

The `df.drop()` method in PySpark expects individual column names as separate arguments, not a list of column names. To drop columns using a list, you can use the splat operator (`*`) to unpack the list into separate arguments.

In [None]:
# Assuming 'df' is your PySpark DataFrame and 'col_to_remove' is your list of columns
# to remove. The previous attempt failed because df.drop() expects individual column
# arguments, not a list. The '*' operator unpacks the list into separate arguments.
silver_df = spark_df.drop(*col_to_remove)

In [None]:
# Display the schema of the new DataFrame to confirm columns were dropped
silver_df.printSchema()

root
 |-- Tax_ID: decimal(38,18) (nullable = true)
 |-- Amended_Denial: long (nullable = true)
 |-- Fiscal_Year_: long (nullable = true)
 |-- Petitioner_City: string (nullable = true)
 |-- Amended_Approval: long (nullable = true)
 |-- Petitioner_State: string (nullable = true)
 |-- Continuation_Denial: long (nullable = true)
 |-- Petitioner_Zip_Code: decimal(38,18) (nullable = true)
 |-- Continuation_Approval: long (nullable = true)
 |-- Industry__NAICS__Code: string (nullable = true)
 |-- New_Concurrent_Denial: long (nullable = true)
 |-- New_Employment_Denial: long (nullable = true)
 |-- New_Concurrent_Approval: long (nullable = true)
 |-- New_Employment_Approval: long (nullable = true)
 |-- Change_of_Employer_Denial: long (nullable = true)
 |-- Employer__Petitioner__Name: string (nullable = true)
 |-- Change_of_Employer_Approval: long (nullable = true)
 |-- Change_with_Same_Employer_Denial: long (nullable = true)
 |-- Change_with_Same_Employer_Approval: long (nullable = true)



### Renaming the columns

In [None]:

# Create a dictionary where { "Old_Name": "new_name" }
rename_map = {
    "Tax_ID": "tax_id",
    "Amended_Denial": "amended_denial",
    "Fiscal_Year_": "fiscal_year",
    "Petitioner_City": "city",
    "Amended_Approval": "amended_approval",
    "Petitioner_State": "state",
    "Continuation_Denial": "continuation_denial",
    "Petitioner_Zip_Code": "zip_code",
    "Continuation_Approval": "continuation_approval",
    "Industry__NAICS__Code": "industry_naics_code",
    "New_Concurrent_Denial": "new_concurrent_denial",
    "New_Employment_Denial": "new_employment_denial",
    "New_Concurrent_Approval": "new_concurrent_approval",
    "New_Employment_Approval": "new_employment_approval",
    "Change_of_Employer_Denial": "change_of_employer_denial",
    "Employer__Petitioner__Name": "employer_name",
    "Change_of_Employer_Approval": "change_of_employer_approval",
    "Change_with_Same_Employer_Denial": "same_employer_denial",
    "Change_with_Same_Employer_Approval": "same_employer_approval"
}

# Apply the rename using a loop
for old_name, new_name in rename_map.items():
    silver_df = silver_df.withColumnRenamed(old_name, new_name)

# Check your new, clean schema
silver_df.printSchema()

root
 |-- tax_id: decimal(38,18) (nullable = true)
 |-- amended_denial: long (nullable = true)
 |-- fiscal_year: long (nullable = true)
 |-- city: string (nullable = true)
 |-- amended_approval: long (nullable = true)
 |-- state: string (nullable = true)
 |-- continuation_denial: long (nullable = true)
 |-- zip_code: decimal(38,18) (nullable = true)
 |-- continuation_approval: long (nullable = true)
 |-- industry_naics_code: string (nullable = true)
 |-- new_concurrent_denial: long (nullable = true)
 |-- new_employment_denial: long (nullable = true)
 |-- new_concurrent_approval: long (nullable = true)
 |-- new_employment_approval: long (nullable = true)
 |-- change_of_employer_denial: long (nullable = true)
 |-- employer_name: string (nullable = true)
 |-- change_of_employer_approval: long (nullable = true)
 |-- same_employer_denial: long (nullable = true)
 |-- same_employer_approval: long (nullable = true)



### Rearranging columns

In [None]:
# 1. Define your 'Priority' columns in the exact order you want them
priority_cols = [
    "employer_name",
    "fiscal_year",
    "state",
    "city",
    "zip_code",
    "tax_id"
]

# 2. Get the remaining columns automatically
# We use a list comprehension: "Get all columns that ARE NOT in priority_cols"
remaining_cols = [c for c in silver_df.columns if c not in priority_cols]

# 3. Combine the lists (Priority + Everything else)
final_column_order = priority_cols + remaining_cols

# 4. Apply the new order
silver_df = silver_df.select(*final_column_order)

# 5. Verify the order
silver_df.show(5)

+--------------------+-----------+-----+----------------+--------+--------------------+--------------+----------------+-------------------+---------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+-------------------------+---------------------------+--------------------+----------------------+
|       employer_name|fiscal_year|state|            city|zip_code|              tax_id|amended_denial|amended_approval|continuation_denial|continuation_approval| industry_naics_code|new_concurrent_denial|new_employment_denial|new_concurrent_approval|new_employment_approval|change_of_employer_denial|change_of_employer_approval|same_employer_denial|same_employer_approval|
+--------------------+-----------+-----+----------------+--------+--------------------+--------------+----------------+-------------------+---------------------+--------------------+---------------------+---------------------+-----------------------+----

## Datatypes

In [None]:
silver_df.printSchema()

root
 |-- employer_name: string (nullable = true)
 |-- fiscal_year: long (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_code: decimal(38,18) (nullable = true)
 |-- tax_id: decimal(38,18) (nullable = true)
 |-- amended_denial: long (nullable = true)
 |-- amended_approval: long (nullable = true)
 |-- continuation_denial: long (nullable = true)
 |-- continuation_approval: long (nullable = true)
 |-- industry_naics_code: string (nullable = true)
 |-- new_concurrent_denial: long (nullable = true)
 |-- new_employment_denial: long (nullable = true)
 |-- new_concurrent_approval: long (nullable = true)
 |-- new_employment_approval: long (nullable = true)
 |-- change_of_employer_denial: long (nullable = true)
 |-- change_of_employer_approval: long (nullable = true)
 |-- same_employer_denial: long (nullable = true)
 |-- same_employer_approval: long (nullable = true)



In [None]:
from pyspark.sql.types import IntegerType, StringType

# Define which columns should be Integers
# This includes years, zip codes, and all approval/denial counts
int_columns = [
    "fiscal_year", "amended_denial",
    "amended_approval", "continuation_denial", "continuation_approval",
    "new_concurrent_denial", "new_employment_denial", "new_concurrent_approval",
    "new_employment_approval", "change_of_employer_denial",
    "change_of_employer_approval", "same_employer_denial", "same_employer_approval"
]

Casting `zip_code` and `tax_id` to `StringType` ensures that leading zeros are preserved and they are treated as categorical identifiers, rather than numerical values.

In [None]:
# List of columns to convert to StringType
columns_to_string = ['zip_code', 'tax_id']

# Apply the cast to StringType for the specified columns
for col_name in columns_to_string:
    silver_df = silver_df.withColumn(col_name, silver_df[col_name].cast(StringType()))

In [None]:
from pyspark.sql.functions import col

for c in int_columns:
  silver_df = silver_df.withColumn(c, col(c).cast(IntegerType()))

silver_df.printSchema()

root
 |-- employer_name: string (nullable = true)
 |-- fiscal_year: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- tax_id: string (nullable = true)
 |-- amended_denial: integer (nullable = true)
 |-- amended_approval: integer (nullable = true)
 |-- continuation_denial: integer (nullable = true)
 |-- continuation_approval: integer (nullable = true)
 |-- industry_naics_code: string (nullable = true)
 |-- new_concurrent_denial: integer (nullable = true)
 |-- new_employment_denial: integer (nullable = true)
 |-- new_concurrent_approval: integer (nullable = true)
 |-- new_employment_approval: integer (nullable = true)
 |-- change_of_employer_denial: integer (nullable = true)
 |-- change_of_employer_approval: integer (nullable = true)
 |-- same_employer_denial: integer (nullable = true)
 |-- same_employer_approval: integer (nullable = true)



## Important Data Standardization: Deduplication

In [None]:
from pyspark.sql.functions import col, upper, trim, regexp_replace

In [None]:
# We define a 'Regex' pattern to find common suffixes at the end of the string
# The 'r' means 'raw string', and '$' means 'at the end of the text'
suffix_pattern = r"\b(LLC|INC|CORP|LTD|PC|LLP|INCORPORATED)\b\.?$"

In [None]:
# We are overwriting the 'employer_name' column with a transformed version of itself
silver_df = silver_df.withColumn(
    "employer_name",
    # 4. REGEXP_REPLACE: Finally, look at the end of that clean string.
    #    If you find anything matching our 'suffix_pattern' (like LLC or INC),
    #    replace it with "" (nothing).
    regexp_replace(
        # 3. UPPER: Take the trimmed string and force it to all CAPITAL LETTERS.
        #    This ensures "Google" and "google" are treated as the exact same thing.
        upper(
            # 2. TRIM: Look at the raw column and remove any accidental leading
            #    or trailing spaces (e.g., " Google " becomes "Google").
            trim(
                # 1. COL: This is the starting point—grabbing the raw data from the column.
                col("employer_name")
            )
        ),
        suffix_pattern,
        ""
    )
)

In [None]:
# One final trim to catch any spaces left behind after removing the suffix
silver_df = silver_df.withColumn("employer_name", trim(col("employer_name")))

# Let's see the result!
silver_df.select("employer_name").distinct().show(10, truncate=False)

+----------------------------------+
|employer_name                     |
+----------------------------------+
|UNIVERSITY OF ALASKA FAIRBANKS    |
|BAPTIST HEALTH DBA MONTGOMERY FAMI|
|RISING STAR SPED ACADEMY          |
|GET APPLAUD                       |
|RUCKUS WIRELESS                   |
|MENDEL HEALTH                     |
|SINICA EDUCATION                  |
|AYANA GLOBAL SOLUTIONS            |
|THE UNIVERSITY OF SAN FRANCISCO   |
|MIRANTIS                          |
+----------------------------------+
only showing top 10 rows



In [None]:
# 1. Add PLLC to our suffix list
# 2. Add [^A-Z0-9] at the end to catch trailing commas or periods
refined_pattern = r"\b(LLC|INC|CORP|LTD|PC|LLP|PLLC|INCORPORATED|INIC)\b\.?$"

silver_df = silver_df.withColumn(
    "employer_name",
    regexp_replace(col("employer_name"), refined_pattern, "")
)

# 3. Use another regex to remove any trailing commas or dots specifically
silver_df = silver_df.withColumn(
    "employer_name",
    regexp_replace(col("employer_name"), r"[,.]\s*$", "")
)

silver_df = silver_df.withColumn("employer_name", trim(col("employer_name")))

silver_df.select("employer_name").show(10, truncate=False)

+---------------------------------+
|employer_name                    |
+---------------------------------+
|VAAKRUTHI SOLUTIONS              |
|COPPER DART                      |
|VYORA INTERNATIONAL              |
|BJ'S WHOLESALE CLUB              |
|NEUREALM EKA GAVS TECHNLOGIES N.A|
|GEHMS ITSERVICES                 |
|WE STAFFING                      |
|PRIAMBA SOFT                     |
|SBAS IT SYSTEMS                  |
|IMR SOFT                         |
+---------------------------------+
only showing top 10 rows



## Null Values

To check for null values in a PySpark DataFrame, you need to import specific functions and iterate through the columns. PySpark does not have a direct `.isnull().sum()` method like Pandas.

In [None]:
from pyspark.sql.functions import col, sum

# Create a list to hold expressions for counting nulls
null_counts_expressions = []
for column_name in silver_df.columns:
    null_counts_expressions.append(sum(col(column_name).isNull().cast('int')).alias(column_name))

# Select these expressions to get the null counts for all columns
silver_df.select(null_counts_expressions).show()

+-------------+-----------+-----+----+--------+------+--------------+----------------+-------------------+---------------------+-------------------+---------------------+---------------------+-----------------------+-----------------------+-------------------------+---------------------------+--------------------+----------------------+
|employer_name|fiscal_year|state|city|zip_code|tax_id|amended_denial|amended_approval|continuation_denial|continuation_approval|industry_naics_code|new_concurrent_denial|new_employment_denial|new_concurrent_approval|new_employment_approval|change_of_employer_denial|change_of_employer_approval|same_employer_denial|same_employer_approval|
+-------------+-----------+-----+----+--------+------+--------------+----------------+-------------------+---------------------+-------------------+---------------------+---------------------+-----------------------+-----------------------+-------------------------+---------------------------+--------------------+-------

In [None]:
null_counts_expressions

[Column<'sum(CAST((employer_name IS NULL) AS INT)) AS employer_name'>,
 Column<'sum(CAST((fiscal_year IS NULL) AS INT)) AS fiscal_year'>,
 Column<'sum(CAST((state IS NULL) AS INT)) AS state'>,
 Column<'sum(CAST((city IS NULL) AS INT)) AS city'>,
 Column<'sum(CAST((zip_code IS NULL) AS INT)) AS zip_code'>,
 Column<'sum(CAST((tax_id IS NULL) AS INT)) AS tax_id'>,
 Column<'sum(CAST((amended_denial IS NULL) AS INT)) AS amended_denial'>,
 Column<'sum(CAST((amended_approval IS NULL) AS INT)) AS amended_approval'>,
 Column<'sum(CAST((continuation_denial IS NULL) AS INT)) AS continuation_denial'>,
 Column<'sum(CAST((continuation_approval IS NULL) AS INT)) AS continuation_approval'>,
 Column<'sum(CAST((industry_naics_code IS NULL) AS INT)) AS industry_naics_code'>,
 Column<'sum(CAST((new_concurrent_denial IS NULL) AS INT)) AS new_concurrent_denial'>,
 Column<'sum(CAST((new_employment_denial IS NULL) AS INT)) AS new_employment_denial'>,
 Column<'sum(CAST((new_concurrent_approval IS NULL) AS INT)

However, it's not a list of simple column names or strings. Instead, it's a list of PySpark Column expressions. Each element in the list is a Column object created by sum(col(column_name).isNull().cast('int')).alias(column_name).

These expressions are then passed to the select method of your DataFrame, which tells Spark how to compute the null counts for each column.

To drop rows with null values specifically from the `employer_name` column, use the `na.drop()` method and specify the subset of columns to check for nulls.

In [None]:
# Drop rows where 'employer_name' is null
silver_df = silver_df.na.drop(subset=['employer_name'])

In [None]:
# Fill null values with "UNKNOWN" in all columns
silver_df = silver_df.na.fill('UNKNOWN')

In [None]:
null_counts_expressions = []
for column_name in silver_df.columns:
    null_counts_expressions.append(sum(col(column_name).isNull().cast('int')).alias(column_name))

# Select these expressions to get the null counts for all columns
silver_df.select(null_counts_expressions).show()

+-------------+-----------+-----+----+--------+------+--------------+----------------+-------------------+---------------------+-------------------+---------------------+---------------------+-----------------------+-----------------------+-------------------------+---------------------------+--------------------+----------------------+
|employer_name|fiscal_year|state|city|zip_code|tax_id|amended_denial|amended_approval|continuation_denial|continuation_approval|industry_naics_code|new_concurrent_denial|new_employment_denial|new_concurrent_approval|new_employment_approval|change_of_employer_denial|change_of_employer_approval|same_employer_denial|same_employer_approval|
+-------------+-----------+-----+----+--------+------+--------------+----------------+-------------------+---------------------+-------------------+---------------------+---------------------+-----------------------+-----------------------+-------------------------+---------------------------+--------------------+-------

In [None]:
# Save your hard work to the Silver/Staged area
silver_df.write.format("bigquery")\
    .option("temporaryGcsBucket", "h1b_analytics_temp_bucket") \
    .option("table", "h1b-analytics.h1b_staged.silver_h1b_2025")\
    .mode("overwrite")\
    .save()

Py4JJavaError: An error occurred while calling o487.save.
: java.lang.NoClassDefFoundError: com/google/api/client/http/HttpRequestInitializer
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:469)
	at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2625)
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2590)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at com.google.cloud.spark.bigquery.SparkBigQueryUtil.getUniqueGcsPath(SparkBigQueryUtil.java:143)
	at com.google.cloud.spark.bigquery.SparkBigQueryUtil.createGcsPath(SparkBigQueryUtil.java:124)
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.<init>(BigQueryWriteHelper.java:89)
	at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:41)
	at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:54)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: com.google.api.client.http.HttpRequestInitializer
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	... 59 more


In [None]:
# Define the GCS path where you want to save the Parquet files
gcs_output_path = "gs://h1b_analytics_temp_bucket/silver_layer/silver_h1b_2025_parquet/"

# Save the silver_df to GCS in Parquet format
# Parquet is a highly efficient columnar format for analytics
silver_df.write \
    .mode("overwrite") \
    .parquet(gcs_output_path)

print(f"Successfully saved silver_df to: {gcs_output_path}")

Py4JJavaError: An error occurred while calling o549.parquet.
: java.lang.NoClassDefFoundError: com/google/api/client/http/HttpRequestInitializer
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:469)
	at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2625)
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2590)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:454)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:530)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: com.google.api.client.http.HttpRequestInitializer
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	... 30 more


After running the cell above to save your data to GCS, you will need to load it into your BigQuery table. You can do this using a SQL query in the BigQuery UI, or via the `bq` command-line tool, or using the BigQuery Python client library.

### Option 1: Using SQL in BigQuery UI or `bq` CLI

```sql
LOAD DATA OVERWRITE `h1b-analytics.h1b_staged.silver_h1b_2025`
FROM FILES (format = 'PARQUET', uris = ['gs://h1b_analytics_temp_bucket/silver_layer/silver_h1b_2025_parquet/*']);
```

Or using `bq load` command if you prefer the CLI:

```bash
bq load --source_format=PARQUET \
    --replace \
    h1b-analytics:h1b_staged.silver_h1b_2025 \
    gs://h1b_analytics_temp_bucket/silver_layer/silver_h1b_2025_parquet/*
```

### Option 2: Using BigQuery Python Client Library

```python
from google.cloud import bigquery

client = bigquery.Client(project='h1b-analytics')

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, # Overwrite the table
)

gcs_source_uri = "gs://h1b_analytics_temp_bucket/silver_layer/silver_h1b_2025_parquet/*"
table_id = "h1b-analytics.h1b_staged.silver_h1b_2025"

load_job = client.load_table_from_uri(
    gcs_source_uri,
    table_id,
    job_config=job_config
)

load_job.result()  # Waits for the job to complete

print(f"Loaded {load_job.output_rows} rows into {table_id}.")
```

This approach is more robust because it leverages services designed specifically for each task (Spark for processing, GCS for storage, BigQuery for data warehousing) without inter-process dependency conflicts that are hard to manage.

In [None]:
import os
import pandas as pd
from google.cloud import storage

# --- Step 1: Convert PySpark DataFrame to Pandas DataFrame ---
# WARNING: This step can be very memory intensive if silver_df is large.
# Ensure your Colab instance has enough RAM to hold the entire dataset.
print("Converting PySpark DataFrame to Pandas...")
pandas_silver_df = silver_df.toPandas()
print("Conversion complete.")

# --- Step 2: Save Pandas DataFrame to a local Parquet file ---
local_parquet_path = "silver_h1b_2025.parquet"
pandas_silver_df.to_parquet(local_parquet_path, index=False)
print(f"Pandas DataFrame saved locally to {local_parquet_path}")

# --- Step 3: Upload the local Parquet file to GCS ---
bucket_name = "spark_bucket_h1b"
gcs_destination_blob_name = "silver_layer/silver_h1b_2025_parquet/silver_h1b_2025.parquet"

client = storage.Client(project=project_id)
bucket = client.bucket(bucket_name)
blob = bucket.blob(gcs_destination_blob_name)

print(f"Uploading {local_parquet_path} to gs://{bucket_name}/{gcs_destination_blob_name}...")
blob.upload_from_filename(local_parquet_path)
print("Upload complete.")

# Clean up local file
os.remove(local_parquet_path)
print(f"Removed local file: {local_parquet_path}")

# Define the full GCS path for clarity, even if we upload a single file
gcs_output_path = f"gs://{bucket_name}/silver_layer/silver_h1b_2025_parquet/"
print(f"Data successfully staged in GCS at: {gcs_output_path}")

Converting PySpark DataFrame to Pandas...
Conversion complete.
Pandas DataFrame saved locally to silver_h1b_2025.parquet
Uploading silver_h1b_2025.parquet to gs://spark_bucket_h1b/silver_layer/silver_h1b_2025_parquet/silver_h1b_2025.parquet...
Upload complete.
Removed local file: silver_h1b_2025.parquet
Data successfully staged in GCS at: gs://spark_bucket_h1b/silver_layer/silver_h1b_2025_parquet/


In [None]:
# sql_engine: bigquery
# output_variable: df
# start _sql
_sql = """
CREATE SCHEMA IF NOT EXISTS `h1b-analytics.h1b_staged`
OPTIONS(
  location = 'us-central1'
);
""" # end _sql
from google.colab.sql import bigquery as _bqsqlcell
df = _bqsqlcell.run(_sql)
df

<bigframes.display.anywidget.TableWidget object at 0x7c209284fb30>

In [None]:
# sql_engine: bigquery
# output_variable: df
# start _sql
_sql = """
LOAD DATA OVERWRITE `h1b-analytics.h1b_staged.silver_h1b_2025`
FROM FILES (format = 'PARQUET', uris = ['gs://spark_bucket_h1b/silver_layer/silver_h1b_2025_parquet/*']);
""" # end _sql
from google.colab.sql import bigquery as _bqsqlcell
df = _bqsqlcell.run(_sql)
df

<bigframes.display.anywidget.TableWidget object at 0x7c2060be6e70>

In [None]:
# sql_engine: bigquery
# output_variable: df
# start _sql
_sql = """
SELECT *
FROM `h1b-analytics.h1b_staged.silver_h1b_2025`
LIMIT 5;
""" # end _sql
from google.colab.sql import bigquery as _bqsqlcell
df = _bqsqlcell.run(_sql)
df

<bigframes.display.anywidget.TableWidget object at 0x7c20900cac00>

After running the cell above to save your data to GCS, you will need to load it into your BigQuery table. You can do this using a SQL query in the BigQuery UI, or via the `bq` command-line tool, or using the BigQuery Python client library.

### Option 1: Using SQL in BigQuery UI or `bq` CLI

```sql
LOAD DATA OVERWRITE `h1b-analytics.h1b_staged.silver_h1b_2025`
FROM FILES (format = 'PARQUET', uris = ['gs://h1b_analytics_temp_bucket/silver_layer/silver_h1b_2025_parquet/*']);
```

Or using `bq load` command if you prefer the CLI:

```bash
bq load --source_format=PARQUET \
    --replace \
    h1b-analytics:h1b_staged.silver_h1b_2025 \
    gs://h1b_analytics_temp_bucket/silver_layer/silver_h1b_2025_parquet/*
```

### Option 2: Using BigQuery Python Client Library

```python
from google.cloud import bigquery

client = bigquery.Client(project='h1b-analytics')

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, # Overwrite the table
)

gcs_source_uri = "gs://h1b_analytics_temp_bucket/silver_layer/silver_h1b_2025_parquet/*"
table_id = "h1b-analytics.h1b_staged.silver_h1b_2025"

load_job = client.load_table_from_uri(
    gcs_source_uri,
    table_id,
    job_config=job_config
)

load_job.result()  # Waits for the job to complete

print(f"Loaded {load_job.output_rows} rows into {table_id}.")
```

This approach is more robust because it leverages services designed specifically for each task (Spark for processing, GCS for storage, BigQuery for data warehousing) without inter-process dependency conflicts that are hard to manage.

# Task
Create a Python function `process_h1b_year` that takes a year as input. This function will load raw H1B data from the "h1b-analytics.h1b_raw_data.h1b_<year>" BigQuery table, apply all the cleaning and transformation steps (column dropping, renaming, reordering, type casting, employer name standardization, and null handling) using PySpark. It will then convert the cleaned PySpark DataFrame to a Pandas DataFrame, save it as a local Parquet file, upload it to the "spark_bucket_h1b" GCS bucket, and finally load it into the "h1b-analytics.h1b_staged.silver_h1b_<year>" BigQuery table.
After defining this function, process the H1B data for the years 2023 and 2024 by calling `process_h1b_year` for each.
Finally, confirm the successful processing and staging of H1B data for all specified years (2023, 2024, and 2025) into the BigQuery `h1b_staged` dataset.

## Create h1b processing function

### Subtask:
Define a Python function `process_h1b_year` that takes a year as input. This function will load raw H1B data from BigQuery for that year, apply all the cleaning, transformation, and standardization steps (column dropping, renaming, reordering, type casting, employer name standardization, null handling) using PySpark. It will then convert the cleaned PySpark DataFrame to a Pandas DataFrame, save it as a local Parquet file, upload it to the `spark_bucket_h1b` GCS bucket, and finally load it into the `h1b-analytics.h1b_staged.silver_h1b_<year>` BigQuery table.


**Reasoning**:
The user wants to define a Python function `process_h1b_year` that encapsulates all the data cleaning and loading steps performed previously in the notebook. This function will take a `year` as input and apply transformations, then save the data to GCS and load it into BigQuery. I will include all necessary imports and logic within this function as per the instructions.



In [None]:
import os
import pandas as pd
from pyspark.sql.functions import col, upper, trim, regexp_replace
from pyspark.sql.types import IntegerType, StringType
from google.cloud import storage, bigquery

def process_h1b_year(year):
    print(f"--- Processing H1B data for Fiscal Year: {year} ---")

    # 1. Define project_id and bucket_name (using global values from previous cells)
    # project_id and bucket_name are expected to be globally defined
    # project_id = 'h1b-analytics'
    # bucket_name = "spark_bucket_h1b" # Assuming this was the bucket name used before

    # 2. Construct the raw BigQuery table name
    raw_table_name = f"{project_id}.h1b_raw_data.h1b_{year}"
    print(f"Loading data from BigQuery table: {raw_table_name}")

    # 3. Load data from BigQuery into a PySpark DataFrame
    # Ensure 'spark' session is available globally or passed as an argument if not.
    # From the notebook, 'spark' is a global variable.
    spark_df = spark.read.format("bigquery") \
        .option("table", raw_table_name) \
        .load()
    print(f"Loaded raw data for year {year}. Row count: {spark_df.count()}")

    # 4. Define columns to remove
    col_to_remove = [
        "_airbyte_raw_id",
        "_airbyte_extracted_at",
        "_airbyte_meta",
        "_airbyte_generation_id",
        "Line_by_line"
    ]

    # 5. Drop these columns
    silver_df = spark_df.drop(*col_to_remove)
    print(f"Dropped unwanted columns. Remaining columns: {len(silver_df.columns)}")

    # 6. Define rename_map
    rename_map = {
        "Tax_ID": "tax_id",
        "Amended_Denial": "amended_denial",
        "Fiscal_Year_": "fiscal_year",
        "Petitioner_City": "city",
        "Amended_Approval": "amended_approval",
        "Petitioner_State": "state",
        "Continuation_Denial": "continuation_denial",
        "Petitioner_Zip_Code": "zip_code",
        "Continuation_Approval": "continuation_approval",
        "Industry__NAICS__Code": "industry_naics_code",
        "New_Concurrent_Denial": "new_concurrent_denial",
        "New_Employment_Denial": "new_employment_denial",
        "New_Concurrent_Approval": "new_concurrent_approval",
        "New_Employment_Approval": "new_employment_approval",
        "Change_of_Employer_Denial": "change_of_employer_denial",
        "Employer__Petitioner__Name": "employer_name",
        "Change_of_Employer_Approval": "change_of_employer_approval",
        "Change_with_Same_Employer_Denial": "same_employer_denial",
        "Change_with_Same_Employer_Approval": "same_employer_approval"
    }

    # 7. Apply the renaming
    for old_name, new_name in rename_map.items():
        silver_df = silver_df.withColumnRenamed(old_name, new_name)
    print("Columns renamed.")

    # 8. Define priority_cols for reordering
    priority_cols = [
        "employer_name",
        "fiscal_year",
        "state",
        "city",
        "zip_code",
        "tax_id"
    ]

    # 9. Get remaining columns and create final order
    remaining_cols = [c for c in silver_df.columns if c not in priority_cols]
    final_column_order = priority_cols + remaining_cols

    # 10. Reorder columns
    silver_df = silver_df.select(*final_column_order)
    print("Columns reordered.")

    # 11. Define columns for type casting
    int_columns = [
        "fiscal_year", "amended_denial",
        "amended_approval", "continuation_denial", "continuation_approval",
        "new_concurrent_denial", "new_employment_denial", "new_concurrent_approval",
        "new_employment_approval", "change_of_employer_denial",
        "change_of_employer_approval", "same_employer_denial", "same_employer_approval"
    ]
    columns_to_string = ['zip_code', 'tax_id']

    # 12. Cast columns to StringType
    for col_name in columns_to_string:
        silver_df = silver_df.withColumn(col_name, col(col_name).cast(StringType()))
    print("Zip code and Tax ID cast to StringType.")

    # 13. Cast columns to IntegerType
    for c in int_columns:
        silver_df = silver_df.withColumn(c, col(c).cast(IntegerType()))
    print("Numerical columns cast to IntegerType.")

    # 14. Standardize employer_name
    refined_pattern = r"\b(LLC|INC|CORP|LTD|PC|LLP|PLLC|INCORPORATED|INIC)\b\.?$"

    silver_df = silver_df.withColumn(
        "employer_name",
        regexp_replace(
            upper(
                trim(
                    col("employer_name")
                )
            ),
            refined_pattern,
            ""
        )
    )
    silver_df = silver_df.withColumn(
        "employer_name",
        regexp_replace(col("employer_name"), r"[,.]\s*$", "")
    )
    silver_df = silver_df.withColumn("employer_name", trim(col("employer_name")))
    print("Employer names standardized.")

    # 15. Handle null values
    silver_df = silver_df.na.drop(subset=['employer_name'])
    silver_df = silver_df.na.fill('UNKNOWN')
    print("Null values handled.")

    # 16. Convert PySpark DataFrame to Pandas DataFrame
    print("Converting PySpark DataFrame to Pandas...")
    pandas_silver_df = silver_df.toPandas()
    print("Conversion complete.")

    # 17. Define local_parquet_path
    local_parquet_path = f"silver_h1b_{year}.parquet"
    pandas_silver_df.to_parquet(local_parquet_path, index=False)
    print(f"Pandas DataFrame saved locally to {local_parquet_path}")

    # 18. Upload the local Parquet file to GCS
    gcs_destination_blob_name = f"silver_layer/silver_h1b_{year}_parquet/silver_h1b_{year}.parquet"

    gcs_client = storage.Client(project=project_id) # Renamed to gcs_client to avoid conflict
    bucket = gcs_client.bucket(bucket_name)
    blob = bucket.blob(gcs_destination_blob_name)

    print(f"Uploading {local_parquet_path} to gs://{bucket_name}/{gcs_destination_blob_name}...")
    blob.upload_from_filename(local_parquet_path)
    print("Upload to GCS complete.")

    # 19. Remove the local Parquet file
    os.remove(local_parquet_path)
    print(f"Removed local file: {local_parquet_path}")

    # 20. Load data from GCS to BigQuery
    bq_client = bigquery.Client(project=project_id)

    # Ensure the h1b_staged schema exists
    schema_query = f"CREATE SCHEMA IF NOT EXISTS `{project_id}.h1b_staged` OPTIONS(location = 'us-central1');"
    bq_client.query(schema_query).result()
    print(f"Ensured BigQuery schema `{project_id}.h1b_staged` exists.")

    table_id = f"{project_id}.h1b_staged.silver_h1b_{year}"
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    gcs_source_uri = f"gs://{bucket_name}/silver_layer/silver_h1b_{year}_parquet/*"

    print(f"Loading data from GCS ({gcs_source_uri}) to BigQuery table: {table_id}")
    load_job = bq_client.load_table_from_uri(
        gcs_source_uri,
        table_id,
        job_config=job_config
    )
    load_job.result()
    print(f"Loaded {load_job.output_rows} rows into BigQuery table: {table_id}")
    print(f"--- Finished processing for Fiscal Year: {year} ---")


## Process h1b data for 2023 and 2024

### Subtask:
Define the years 2023 and 2024. Iterate through these years, calling the `process_h1b_year` function for each, to clean and stage their respective H1B datasets in BigQuery.


**Reasoning**:
Iterate through the years 2023 and 2024, calling the `process_h1b_year` function for each, as per the subtask instructions.



In [None]:
years_to_process = [2023, 2024]

for year in years_to_process:
    process_h1b_year(year)

--- Processing H1B data for Fiscal Year: 2023 ---
Loading data from BigQuery table: h1b-analytics.h1b_raw_data.h1b_2023
Loaded raw data for year 2023. Row count: 57361
Dropped unwanted columns. Remaining columns: 19
Columns renamed.
Columns reordered.
Zip code and Tax ID cast to StringType.
Numerical columns cast to IntegerType.
Employer names standardized.
Null values handled.
Converting PySpark DataFrame to Pandas...
Conversion complete.
Pandas DataFrame saved locally to silver_h1b_2023.parquet
Uploading silver_h1b_2023.parquet to gs://spark_bucket_h1b/silver_layer/silver_h1b_2023_parquet/silver_h1b_2023.parquet...
Upload to GCS complete.
Removed local file: silver_h1b_2023.parquet
Ensured BigQuery schema `h1b-analytics.h1b_staged` exists.
Loading data from GCS (gs://spark_bucket_h1b/silver_layer/silver_h1b_2023_parquet/*) to BigQuery table: h1b-analytics.h1b_staged.silver_h1b_2023
Loaded 57357 rows into BigQuery table: h1b-analytics.h1b_staged.silver_h1b_2023
--- Finished processing

**Reasoning**:
To confirm the successful processing and staging of H1B data for all specified years (2023, 2024, and 2025), I will query each of the corresponding BigQuery tables in the h1b_staged dataset and display a few rows from each.



In [None]:
from google.colab.sql import bigquery as _bqsqlcell

years_to_confirm = [2023, 2024, 2025]

for year in years_to_confirm:
    table_id = f"h1b-analytics.h1b_staged.silver_h1b_{year}"
    print(f"\n--- Confirming data for {table_id} ---")
    _sql = f"""
    SELECT *
    FROM `{table_id}`
    LIMIT 5;
    """
    df_confirmed = _bqsqlcell.run(_sql)
    display(df_confirmed)


--- Confirming data for h1b-analytics.h1b_staged.silver_h1b_2023 ---


<bigframes.display.anywidget.TableWidget object at 0x7c2091a13fe0>


--- Confirming data for h1b-analytics.h1b_staged.silver_h1b_2024 ---


<bigframes.display.anywidget.TableWidget object at 0x7c2060be74a0>


--- Confirming data for h1b-analytics.h1b_staged.silver_h1b_2025 ---


<bigframes.display.anywidget.TableWidget object at 0x7c209920faa0>

## Final task

### Subtask:
Confirm the successful processing and staging of H1B data for all specified years into the BigQuery `h1b_staged` dataset.


## Summary:

### Q&A

1.  **What did the error `PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.` mean?**
    This error indicated that the Java Virtual Machine (JVM), which is crucial for PySpark to operate, failed to start. This was initially attributed to incorrect or inconsistent `JAVA_HOME` environment variable settings (a mix of Java 11 and Java 17, and an empty `JAVA_HOME` in the shell) and later potentially insufficient memory for the JVM.

2.  **How was the `PySparkRuntimeError` initially fixed?**
    The fix involved ensuring the `JAVA_HOME` environment variable was correctly and consistently set to Java 17 and exported to the shell environment. Additionally, `PYSPARK_SUBMIT_ARGS` was configured to allocate more memory (2GB for driver and executor) to the Spark processes.

3.  **Was the method `silver_df.write.format("bigquery").option("temporaryGcsBucket", "...").option("table", "...").mode("overwrite").save()` correct for saving a PySpark DataFrame to BigQuery?**
    Yes, this method is correct and typically the most efficient way to save a PySpark DataFrame to BigQuery using the Spark BigQuery connector, provided all underlying dependencies and configurations are properly set.

4.  **Can a PySpark DataFrame be saved directly to BigQuery using `pandas_gbq.to_gbq()`?**
    No, `pandas_gbq.to_gbq()` expects a Pandas DataFrame. A PySpark DataFrame (`silver_df`) would first need to be converted to a Pandas DataFrame using `.toPandas()`. However, this is generally not recommended for large datasets as it collects all data into the driver's memory, which can lead to out-of-memory errors. The Spark BigQuery connector is the more efficient and scalable approach for PySpark DataFrames.

5.  **What caused the `Py4JJavaError: java.io.UncheckedIOException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"` error?**
    This error occurred because Spark's underlying Hadoop configuration was not properly set up to recognize and handle Google Cloud Storage (GCS) paths (`gs://`), even though the GCS connector JAR was made available.

6.  **What caused the `Py4JJavaError: java.lang.NoClassDefFoundError: com/google/api/client/http/HttpRequestInitializer` error?**
    This error indicated that a critical Java class (`com/google/api/client/http/HttpRequestInitializer`), necessary for interacting with Google Cloud services (including GCS), was missing from Spark's runtime classpath. This often arises from complex dependency conflicts or incomplete JAR packaging, where the required client libraries are not properly loaded or visible to Spark.

7.  **Were the `NoClassDefFoundError` errors caused by the same GCS bucket naming mismatch that caused a later `NotFound: 404` error?**
    No, they were distinct issues. The `NoClassDefFoundError` was a Java runtime classpath problem, preventing Spark from initializing its internal GCS client components. The `NotFound: 404` error (for `gs://h1b_analytics_temp_bucket`) occurred later, after Spark's GCS client was functional, but indicated that the specified GCS bucket literally did not exist or was inaccessible under that name.

8.  **How was the data loaded into BigQuery after persistent errors with direct Spark-BigQuery integration?**
    A workaround was implemented:
    1.  The PySpark DataFrame was converted to a Pandas DataFrame (`silver_df.toPandas()`).
    2.  The Pandas DataFrame was saved locally as a Parquet file.
    3.  The local Parquet file was uploaded to a GCS bucket using the `google-cloud-storage` Python client library.
    4.  Finally, BigQuery's native `LOAD DATA OVERWRITE` SQL command was used to import the Parquet file from GCS into the target BigQuery table.

9.  **How was the H1B data for 2023 and 2024 processed efficiently after 2025 data was done?**
    A reusable Python function named `process_h1b_year` was created. This function encapsulated all the data cleaning, transformation, and the GCS-to-BigQuery loading steps. It was then called in a loop for the years 2023 and 2024.

### Data Analysis Key Findings

*   Initial PySpark `JAVA_GATEWAY_EXITED` errors were resolved by correctly setting the `JAVA_HOME` environment variable to Java 17 and increasing PySpark's driver and executor memory to 2GB each.
*   Direct integration of Spark with BigQuery and GCS connectors via `spark.jars.packages` and even `spark.jars` proved problematic due to persistent `java.lang.NoClassDefFoundError` issues related to Google Cloud client libraries.
*   A robust data loading pipeline was established by implementing a workaround: converting PySpark DataFrames to Pandas, saving them as local Parquet files, uploading these files to GCS using the `google-cloud-storage` client, and then using BigQuery's `LOAD DATA` command to import from GCS.
*   The `gcs-connector-hadoop3-2.2.20.jar` was correctly identified as the working GCS connector version, replacing an unavailable `hadoop-gcs-1.9.17.jar`.
*   Crucial Hadoop configurations (`spark.hadoop.fs.gs.impl`, `spark.hadoop.fs.gs.auth.service.account.enable`, `spark.hadoop.google.cloud.project.id`) were added to the SparkSession to enable GCS filesystem support.
*   Naming inconsistencies in GCS bucket references (`h1b_analytics_temp_bucket` vs. `spark_bucket_h1b`) caused `NotFound: 404` errors and were corrected.
*   The `h1b-analytics.h1b_staged` BigQuery dataset was explicitly created to resolve dataset not found errors.
*   Data for H1B 2023 (57,357 rows) and H1B 2024 (62,014 rows) were successfully cleaned, transformed, and loaded into their respective BigQuery tables, confirmed by querying the first 5 rows of each.

### Insights or Next Steps

*   **Dependency Management in PySpark with Cloud Connectors**: Be prepared for complex dependency resolution challenges when integrating PySpark with cloud services. Explicitly defining and managing JARs, along with specific Hadoop configurations, is often necessary.
*   **Decouple Processing and Loading for Robustness**: When direct Spark-to-BigQuery/GCS writes prove unstable, a staged approach involving PySpark to GCS (via Pandas workaround if needed) followed by BigQuery's native load from GCS can provide a more reliable data pipeline.
*   **Automate Repeatable Tasks with Functions**: Encapsulate cleaning and loading logic into reusable functions to efficiently process multiple similar datasets (e.g., different years of H1B data), ensuring consistency and reducing manual effort.


# Dbt

# Task
Set up a dbt (data build tool) environment in Google Colab, connect it to your BigQuery project `h1b-analytics` which contains the silver layer data in `h1b-analytics.h1b_staged.silver_h1b_<year>` tables, and prepare for defining gold layer data models.

## Introduce dbt for Gold Layer

### Subtask:
Explain what dbt is and why it's a suitable tool for preparing your gold layer data, focusing on its benefits for transformation, testing, and documentation.

#### Instructions
Data Build Tool (dbt) is an open-source command-line tool that helps data analysts and engineers transform data in their warehouse by simply writing SQL select statements. dbt handles the creation of tables and views, manages dependencies, and automates testing and documentation.

For building the 'gold layer' of our H1B analytics project, dbt is particularly beneficial because:
1.  **Transformation**: It allows us to define complex data transformations from our 'silver layer' tables into highly curated 'gold layer' tables using modular SQL. This means we can aggregate data, calculate key metrics, and apply specific business logic to create readily consumable datasets.
2.  **Version Control**: dbt models are SQL files, which can be easily version-controlled (e.g., using Git), allowing for collaborative development, change tracking, and rollback capabilities.
3.  **Testing**: dbt includes a robust testing framework, enabling us to define and execute data quality tests on our transformed data. This ensures the accuracy, completeness, and integrity of our gold layer datasets.
4.  **Documentation**: dbt automatically generates comprehensive documentation for our data models, including descriptions, relationships, and lineage. This makes it easy for other users to understand and trust the data.
5.  **Modularity and Reusability**: Each dbt model is a SELECT statement, promoting modularity. We can build complex transformations step-by-step, making the logic easier to understand, debug, and reuse across different gold layer models.

In essence, dbt empowers us to treat our SQL transformations like software code, bringing engineering best practices to data analytics and ensuring a high-quality, well-governed gold layer for our H1B data.

## dbt Environment Setup

### Subtask:
Outline the steps required to install dbt and configure it to connect to your BigQuery project, which hosts your silver layer data.


**Reasoning**:
The first step is to install the `dbt-bigquery` Python package, which includes `dbt-core` and the necessary adapter for BigQuery.



In [None]:
# sql_engine: bigquery
# output_variable: df
# start _sql
_sql = """
CREATE SCHEMA IF NOT EXISTS `h1b-analytics.h1b_gold`
OPTIONS(
  location = 'us-central1'
);
""" # end _sql
from google.colab.sql import bigquery as _bqsqlcell
df = _bqsqlcell.run(_sql)
df

<bigframes.display.anywidget.TableWidget object at 0x7c20980d0920>

In [None]:
import subprocess
import sys

# Install dbt-bigquery
try:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "dbt-bigquery"])
    print("dbt-bigquery installed successfully.")
except subprocess.CalledProcessError as e:
    print(f"Error installing dbt-bigquery: {e}")


KeyboardInterrupt: 