In [1]:
# Install pyspark
!pip install pyspark
#Install findspark
!pip install findspark



In [2]:
# Import findspark and initialize.
import findspark
findspark.init()

In [3]:
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
import os

# Import packages
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType
from pyspark import SparkFiles

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [4]:
# spark.sparkContext.addFile(url)
data_df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/Project4_Group3/data/Clean_Most-Recent-Cohorts-Institution.csv", header=True, inferSchema=True)

# Show DataFrame
data_df.show()

+---+------+------+-------+-------+--------+-------------+--------------+-----------------+------------------+--------+---------------+--------------+--------------+--------------+--------------+
|_c0|UNITID|STABBR|HIGHDEG|CONTROL|COSTT4_A|TUITIONFEE_IN|TUITIONFEE_OUT|GRAD_DEBT_MDN10YR|GRAD_DEBT_MDN_SUPP|NPT4_PUB|MN_EARN_WNE_P10|MN_EARN_WNE_P6|MN_EARN_WNE_P7|MN_EARN_WNE_P8|MN_EARN_WNE_P9|
+---+------+------+-------+-------+--------+-------------+--------------+-----------------+------------------+--------+---------------+--------------+--------------+--------------+--------------+
|  0|100654|    AL|      4|      1|   21924|        10024|         18634| 328.651592227982|             31000|   13057|          35500|         28400|         29600|         30600|         33800|
|  1|100663|    AL|      4|      1|   26248|         8568|         20400| 236.417113118838|             22300|   16585|          48400|         39400|         40000|         44100|         44500|
|  3|100706|    AL| 

In [5]:
# spark.sparkContext.addFile(url)
field_df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/Project4_Group3/data/clean_Most-Recent-Cohorts-Field-of-Study.csv", header=True, inferSchema=True)

# Show DataFrame
field_df.show()

+---+------+--------------------+-------+-------+--------------------+----------------------+--------------------+---------------------+-----------------------+--------------------------+---------------------------+---------------+---------------+---------------+------------+
|_c0|UNITID|              INSTNM|CIPCODE|CREDLEV|             CIPDESC|DEBT_ALL_STGP_ANY_MEAN|DEBT_ALL_PP_ANY_MEAN|DEBT_ALL_PP_EVAL_MEAN|DEBT_ALL_STGP_EVAL_MEAN|DEBT_ALL_PP_ANY_MDN10YRPAY|DEBT_ALL_PP_EVAL_MDN10YRPAY|EARN_MDN_HI_1YR|EARN_MDN_HI_2YR|EARN_NE_MDN_3YR|EARN_MDN_4YR|
+---+------+--------------------+-------+-------+--------------------+----------------------+--------------------+---------------------+-----------------------+--------------------------+---------------------------+---------------+---------------+---------------+------------+
| 66|100663|University of Ala...|    901|      3|Communication and...|                 26312|             32247.0|              34950.0|                22131.0|         

In [6]:
data_df.createOrReplaceTempView('table1')
field_df.createOrReplaceTempView('table2')

In [7]:
result_df = spark.sql("SELECT * FROM table1 JOIN table2 ON table1.UNITID = table2.UNITID")
result_df.show()


+---+------+------+-------+-------+--------+-------------+--------------+-----------------+------------------+--------+---------------+--------------+--------------+--------------+--------------+---+------+--------------------+-------+-------+--------------------+----------------------+--------------------+---------------------+-----------------------+--------------------------+---------------------------+---------------+---------------+---------------+------------+
|_c0|UNITID|STABBR|HIGHDEG|CONTROL|COSTT4_A|TUITIONFEE_IN|TUITIONFEE_OUT|GRAD_DEBT_MDN10YR|GRAD_DEBT_MDN_SUPP|NPT4_PUB|MN_EARN_WNE_P10|MN_EARN_WNE_P6|MN_EARN_WNE_P7|MN_EARN_WNE_P8|MN_EARN_WNE_P9|_c0|UNITID|              INSTNM|CIPCODE|CREDLEV|             CIPDESC|DEBT_ALL_STGP_ANY_MEAN|DEBT_ALL_PP_ANY_MEAN|DEBT_ALL_PP_EVAL_MEAN|DEBT_ALL_STGP_EVAL_MEAN|DEBT_ALL_PP_ANY_MDN10YRPAY|DEBT_ALL_PP_EVAL_MDN10YRPAY|EARN_MDN_HI_1YR|EARN_MDN_HI_2YR|EARN_NE_MDN_3YR|EARN_MDN_4YR|
+---+------+------+-------+-------+--------+-------------+

In [8]:
result_df.createOrReplaceTempView('colleges')

In [9]:
# Get the number of degrees
spark.sql("""
  SELECT
    CIPDESC,
    count(*) AS number_of_degress
  FROM colleges
  GROUP by CIPDESC
  """).show(30)

+--------------------+-----------------+
|             CIPDESC|number_of_degress|
+--------------------+-----------------+
|         Philosophy.|                1|
|Construction Mana...|                1|
|Computer and Info...|               63|
|  Cognitive Science.|                2|
|Family and Consum...|                7|
|English Language ...|               75|
|Linguistic, Compa...|                2|
|        Criminology.|               24|
|Fine and Studio A...|               40|
|Ecology, Evolutio...|                3|
|Natural Resources...|               33|
|Computer Engineer...|                7|
|          Economics.|               53|
|Radio, Television...|               22|
|Computer/Informat...|                5|
| Education, General.|                5|
|Mental and Social...|                8|
|Agricultural Busi...|                9|
|Ethnic, Cultural ...|                7|
|Mechanical Engine...|                5|
|Marine Transporta...|                3|
|Accounting and 

In [10]:
spark.sql("select count(distinct(CIPDESC)) from colleges").show()

+-----------------------+
|count(DISTINCT CIPDESC)|
+-----------------------+
|                    163|
+-----------------------+



In [11]:
# Get the number of distinct colleges
spark.sql("select count(distinct(INSTNM)) from colleges").show()

+----------------------+
|count(DISTINCT INSTNM)|
+----------------------+
|                   417|
+----------------------+



In [12]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Ensure you have your DataFrame, let's call it result_df
# result_df = ...

# Convert the categorical column to a numerical index
indexer = StringIndexer(inputCol="CIPDESC", outputCol="CIPDESC_index")
indexed = indexer.fit(result_df).transform(result_df)


# One-hot encode the numerical index
encoder = OneHotEncoder(inputCol="CIPDESC_index", outputCol="CIPDESC_vec")
encoded = encoder.fit(indexed).transform(indexed)

# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=["CIPDESC_vec", "COSTT4_A"], outputCol="features")
final_result = assembler.transform(encoded)
final_pd = final_result.toPandas()

IllegalArgumentException: ignored

In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

# Assuming final_result is the DataFrame after all transformations
# and 'income' is the target column
X = final_pd.drop("MN_EARN_WNE_P10", axis=1)
y = final_pd["MN_EARN_WNE_P10"]


# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train the model
rf_regressor = RandomForestRegressor(n_estimators=100, random_state=42)
rf_regressor.fit(X_train, y_train)

# Predict on the test data
y_pred = rf_regressor.predict(X_test)

# Evaluate the model
mse = mean_squared_error(y_test, y_pred)
rmse = mse ** 0.5
print(f"Root Mean Squared Error: {rmse}")

