In [0]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import when, col
import pandas as pd

def extract_spark():
    url = "https://raw.githubusercontent.com/fivethirtyeight/data/refs/heads/master/college-majors/grad-students.csv"
    FILESTORE_PATH = "dbfs:/FileStore/mini_project11/"  # Use /tmp as an alternative
    dbfs_file_path = FILESTORE_PATH + "/grad-students.csv"
    
    # Initialize Spark session
    spark = SparkSession.builder.appName("grade_student").getOrCreate()
    # Fetch the data
    df = pd.read_csv(url)
    
    # Save the file locally in the cluster
    local_path = "/tmp/grad-students.csv"
    df.to_csv(local_path, index=False)
    print(f"CSV saved locally at {local_path}")
    # Create the DBFS directory if it doesn't exist
    dbutils.fs.mkdirs(FILESTORE_PATH)
    # Copy the local file to DBFS
    dbutils.fs.cp(f"file:{local_path}", dbfs_file_path)
    print(f"CSV successfully saved to {dbfs_file_path}")
    
    # Convert Pandas DataFrame to Spark DataFrame
    spark_df = spark.createDataFrame(df)
    
    # To avoid mismatch between the schema of the existing Delta table and the DataFrame I am trying to append
    spark.sql("DROP TABLE IF EXISTS grade_student_delta")
    # Write to Delta table (no need for explicit directory creation)
    spark_df.write.format("delta").mode("append").saveAsTable("grade_student_delta")
    print("Data successfully written to Delta table 'grade_student_delta'")
    
    return dbfs_file_path



In [0]:
extract_spark()


CSV saved locally at /tmp/grad-students.csv
CSV successfully saved to dbfs:/FileStore/mini_project11//grad-students.csv
Data successfully written to Delta table 'grade_student_delta'


'dbfs:/FileStore/mini_project11//grad-students.csv'

In [0]:
display(dbutils.fs.ls("dbfs:/FileStore/mini_project11/"))

path,name,size,modificationTime
dbfs:/FileStore/mini_project11/Impact_of_Remote_Work_on_Mental_Health.csv,Impact_of_Remote_Work_on_Mental_Health.csv,598566,1731686709000
dbfs:/FileStore/mini_project11/WRRankingsWeek5.csv,WRRankingsWeek5.csv,10025,1731723705000
dbfs:/FileStore/mini_project11/ds_salaries.csv,ds_salaries.csv,210076,1731709340000
dbfs:/FileStore/mini_project11/event_times.csv,event_times.csv,6185,1731522970000
dbfs:/FileStore/mini_project11/extract_test_dbfs,extract_test_dbfs,17,1731721636000
dbfs:/FileStore/mini_project11/grad-students.csv,grad-students.csv,33249,1731726214000
dbfs:/FileStore/mini_project11/match_data_vg157.csv,match_data_vg157.csv,21445,1731725165000
dbfs:/FileStore/mini_project11/serve_times.csv,serve_times.csv,6472,1731522969000


In [0]:
from pyspark.sql import SparkSession

def load(dataset="dbfs:/FileStore/mini_project11/grad-students.csv"):
    # Initialize Spark session
    spark = SparkSession.builder.appName("grade_student").getOrCreate()

    # Read the dataset from DBFS
    spark_df = spark.read.csv(dataset, header=True, inferSchema=True)
    print(f"Dataset loaded from {dataset}")

    # Drop the Delta table if it exists, when I don't use "overwrite"
    spark.sql("DROP TABLE IF EXISTS grade_student_delta")

    # Write the Spark DataFrame to a Delta table
    spark_df.write.format("delta").mode("overwrite").saveAsTable("grade_student_delta")
    print("Data successfully written to Delta table 'grade_student_delta'")

    if spark.catalog.tableExists("grade_student_delta"):
      print("Table exists. Proceeding with overwrite.")
    else:
      print("Table does not exist. Creating a new one.")
      
    # Print the number of rows in the DataFrame
    nrows = spark_df.count()
    print(f"Number of rows in the dataset: {nrows}")



In [0]:
load()

Dataset loaded from dbfs:/FileStore/mini_project11/grad-students.csv
Data successfully written to Delta table 'grade_student_delta'
Table exists. Proceeding with overwrite.
Number of rows in the dataset: 173


In [0]:
def data_transform(table="grade_student_delta"):
    # Initialize Spark session
    spark = SparkSession.builder.appName("grade_student").getOrCreate()

    # Define STEM categories
    core_STEM = [
        'Engineering',
        'Computers & Mathematics',
        'Biology & Life Science',
        'Physical Sciences'
    ]

    other_STEM = [
        'Agriculture & Natural Resources',
        'Health',
        'Interdisciplinary'
    ]
    
    # Load the table into a DataFrame
    sparktable = spark.table(table)
    
    # Add the "STEM_major" column based on conditions
    sparktable = sparktable.withColumn(
        "STEM_major",
        when(col("Major_category").isin(core_STEM), "core_STEM")
        .when(col("Major_category").isin(other_STEM), "other_STEM")
        .otherwise("Other")
    )

    # Overwrite the Delta table with schema evolution
    sparktable.write.format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .saveAsTable(table)
    print(f"Table '{table}' updated successfully.")

# Call the function
data_transform()


Table 'grade_student_delta' updated successfully.


In [0]:
data_transform()

Table 'grade_student_delta' updated successfully.


In [0]:
spark = SparkSession.builder.appName("grade_student").getOrCreate()

query_sample ="""
            SELECT 
                Major_category,
                SUM(Nongrad_employed) AS Total_Nongrad_employed,
                SUM(Grad_employed) AS Total_Grad_employed,
                SUM(Grad_unemployed) AS Total_Grad_unemployed,
                SUM(Nongrad_unemployed) AS Total_Nongrad_unemployed,
                SUM(Grad_total) AS Total_Grad_total,
                SUM(Nongrad_total) AS Total_Nongrad_total
            FROM grade_student_delta
            GROUP BY Major_category
            HAVING Total_Grad_employed + Total_Nongrad_employed > 10000
            ORDER BY Total_Grad_employed + Total_Nongrad_employed DESC
        """

query_result=spark.sql(query_sample)

query_result.show()

+--------------------+----------------------+-------------------+---------------------+------------------------+----------------+-------------------+
|      Major_category|Total_Nongrad_employed|Total_Grad_employed|Total_Grad_unemployed|Total_Nongrad_unemployed|Total_Grad_total|Total_Nongrad_total|
+--------------------+----------------------+-------------------+---------------------+------------------------+----------------+-------------------+
|            Business|               7123852|            2124495|               101994|                  393222|         2718897|            9345634|
|           Education|               2659824|            2437166|                66938|                  111875|         3945300|            4488291|
|Humanities & Libe...|               2289696|            1986572|                85033|                  154239|         2825975|            3448921|
|         Engineering|               2483802|            1634563|                65073|             

In [0]:
# Write a SQL query to the file
with open("sample_query.sql", "w") as file:
    file.write("SELECT * FROM grade_student_delta")

# Read the query from the file and execute it
with open("sample_query.sql", "r") as file:
    sql_query = file.read()

query_result = spark.sql(sql_query)
query_result.show()


+----------+--------------------+--------------------+----------+----------------+-------------+-------------------------+---------------+----------------------+-----------+--------+--------+-------------+----------------+----------------------------+------------------+-------------------------+--------------+-----------+-----------+-----------+------------+----------+
|Major_code|               Major|      Major_category|Grad_total|Grad_sample_size|Grad_employed|Grad_full_time_year_round|Grad_unemployed|Grad_unemployment_rate|Grad_median|Grad_P25|Grad_P75|Nongrad_total|Nongrad_employed|Nongrad_full_time_year_round|Nongrad_unemployed|Nongrad_unemployment_rate|Nongrad_median|Nongrad_P25|Nongrad_P75| Grad_share|Grad_premium|STEM_major|
+----------+--------------------+--------------------+----------+----------------+-------------+-------------------------+---------------+----------------------+-----------+--------+--------+-------------+----------------+----------------------------+-----