# CPSC 4330 | HW3 | Spark

The file `College_2015_16.csv` (posted on Canvas) contains the following fields:

- `Unique ID`
- `Name`
- `City`
- `State`
- `Zip`
- `Admission rate`
- `Average SAT score`
- `Enrollment`
- `CostA`
- `CostP`

**The last two columns are the costs for public and private universities. If one is non-null, the other should be
null. If both are null, the cost is a missing value. If both are non-null, use either value.**
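
The rule above amounts to taking the first non-null of the two cost fields. A minimal plain-Python sketch, independent of Spark; the helper name `pick_cost` is hypothetical, and `None` stands in for a null field:

```python
from typing import Optional


def pick_cost(cost_a: Optional[int], cost_p: Optional[int]) -> Optional[int]:
    """Return the first non-null cost, or None when both are missing."""
    # Compare against None rather than relying on truthiness,
    # so that a cost of 0 would still be kept.
    return cost_a if cost_a is not None else cost_p


print(pick_cost(22886, None))  # only CostA present
print(pick_cost(None, 31000))  # only CostP present (31000 is a made-up value)
print(pick_cost(None, None))   # both missing
```

In Spark, the same rule can be written with `pyspark.sql.functions.coalesce` over the two cost columns.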

In [8]:
# Imports & Logger Config
# pyspark                   3.5.4              pyhd8ed1ab_0 

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, avg, desc
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

import pyspark

import json

import logging
import os

print("pyspark --v: " + pyspark.__version__)

COLLEGE_2015 = r'College_2015_16.csv'
COLLEGE_2017 = r'College_2017_18.csv'

# --------------------- CONFIG --------------------- #

# make sure the 'logs' directory exists
log_directory = "logs"
os.makedirs(log_directory, exist_ok=True)

def setup_logger(name, log_filename):
    log_file = os.path.join(log_directory, log_filename)
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    # attach the file handler only once, so re-running the cell
    # neither duplicates log lines nor leaves an unused open file handle
    if not logger.handlers:
        file_handler = logging.FileHandler(log_file, mode='w')
        file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logger.addHandler(file_handler)

    return logger


# logger for this notebook's Spark session
spark_logger = setup_logger('current-spark-session (hw3_2.ipynb)', 'spark-hw3-2.log')
# --------------------- CONFIG --------------------- #

pyspark --v: 3.5.4


In [9]:
# Schema

# Line: 100654,Alabama A & M University,Normal,AL,35762,0.9027,929,4824,22886,NULL

# - Unique ID
# - Name
# - City
# - State
# - Zip
# - Admission rate
# - Average SAT score
# - Enrollment
# - CostA
# - CostP

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True),
  StructField("city", StringType(), True),
  StructField("state", StringType(), True),
  StructField("zip", StringType(), True),
  StructField("admissionRate", DoubleType(), True),
  StructField("averageSAT", IntegerType(), True),
  StructField("enrollment", IntegerType(), True),
  StructField("costA", IntegerType(), True),
  StructField("costP", IntegerType(), True)
])

In [10]:
# initialize spark session
spark = SparkSession.builder \
                    .appName("college-data-analysis") \
                    .master("local[*]") \
                    .getOrCreate()

# INFO is verbose; switch to "ERROR" to see only errors in the console
spark.sparkContext.setLogLevel("INFO")

# load data
file_path = "../" + COLLEGE_2015
df = spark.read.csv(file_path, header=False, schema=schema)


## Question 1

Convert each line in the file to a tuple of fields, and keep only these attributes: ID, name, state,
enrollment, and cost, where cost is either costA or costP as described above. If enrollment cannot be
converted to an int, set it to null.

In [None]:
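# keep only id, name, state, enrollment, and cost
# note: select() projects columns; it does not drop any rows
df_filtered = df.select(
    col("id"),
    col("name"),
    col("state"),
    # enrollment is read as IntegerType; under the reader's default PERMISSIVE mode,
    # values that cannot be parsed as int should already be null
    col("enrollment"),
    # cost: costA when present, otherwise costP (null if both are missing)
    when(col("costA").isNotNull(), col("costA")).otherwise(col("costP")).alias("cost")
)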
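
For comparison, the per-line conversion that Question 1 describes can be sketched in plain Python on a single raw line. This is an illustration under stated assumptions, not the notebook's method: the helpers `to_int` and `parse_line` are hypothetical, and the naive `split(",")` assumes no field contains a quoted comma.

```python
from typing import Optional, Tuple

Record = Tuple[int, str, str, Optional[int], Optional[int]]


def to_int(text: str) -> Optional[int]:
    """Convert text to int, returning None when it is not a valid integer."""
    try:
        return int(text)
    except ValueError:
        return None


def parse_line(line: str) -> Optional[Record]:
    """Return (id, name, state, enrollment, cost), or None if the line lacks 10 fields."""
    fields = line.split(",")  # naive split: assumes no quoted commas
    if len(fields) != 10:
        return None
    cost_a, cost_p = to_int(fields[8]), to_int(fields[9])
    cost = cost_a if cost_a is not None else cost_p
    # The ID is assumed to be a valid integer in every well-formed line.
    return (int(fields[0]), fields[1], fields[3], to_int(fields[7]), cost)


sample = "100654,Alabama A & M University,Normal,AL,35762,0.9027,929,4824,22886,NULL"
print(parse_line(sample))
```

Unlike the `select()` above, this version returns `None` for lines with the wrong number of fields, which is the kind of filtering Question 2 asks about.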

## Question 2

Find how many records were filtered out due to an invalid number of fields (the file has 10 fields per record).

In [None]:
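from pyspark.sql.functions import size, split

total_records = df.count()
valid_records = df_filtered.count()  # equals total_records: select() never drops rows

# count malformed lines from the raw text instead
# (naive comma split; assumes no field contains a quoted comma)
raw_lines = spark.read.text(file_path)
filtered_records = raw_lines.filter(size(split(col("value"), ",")) != 10).count()

print("-- records filtered due to invalid fields:", json.dumps(filtered_records, indent=4))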

## Question 3

How many records are from the state of California?

In [None]:
ca_count = df_filtered.filter(col("state") == "CA").count()
print("-- number of records from CA:", json.dumps(ca_count, indent=4))

## Question 4

What percentage of the records have a non-null enrollment?

In [None]:
total_valid_enrollment = df_filtered.filter(col("enrollment").isNotNull()).count()
percentage_enrollment = (total_valid_enrollment / valid_records) * 100
print("-- percentage of records with non-null enrollment:", json.dumps(percentage_enrollment, indent=4))

## Question 5

What is the name and cost of the 5 most expensive universities?

In [None]:
# sort nulls last explicitly so missing costs never rank among the most expensive
df_filtered.orderBy(col("cost").desc_nulls_last()).select("name", "cost").show(5, truncate=False)

## Question 6

Find the number of universities in each state.

In [None]:
df_filtered.groupBy("state").count().show()

## Question 7

Find the total number of enrollments in each state.

In [None]:
df_filtered.groupBy("state").sum("enrollment").show()

## Question 8

Find the average enrollment for each state.

In [None]:
df_filtered.groupBy("state").agg(avg("enrollment")).show()
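
Questions 6 to 8 differ in how they treat null enrollments: `count()` after `groupBy` counts every row, while Spark's `sum` and `avg` aggregates skip nulls, so the average is taken over non-null enrollments only. A plain-Python sketch of that behavior, using made-up `(state, enrollment)` pairs:

```python
from collections import defaultdict

# Hypothetical (state, enrollment) pairs; None marks a missing enrollment.
rows = [("CA", 1000), ("CA", None), ("CA", 3000), ("WA", 500)]

by_state = defaultdict(list)
for state, enrollment in rows:
    by_state[state].append(enrollment)

summary = {}
for state, values in by_state.items():
    present = [v for v in values if v is not None]
    summary[state] = {
        "count": len(values),  # every record counts as a university
        "sum": sum(present) if present else None,
        "avg": sum(present) / len(present) if present else None,
    }

print(summary["CA"])
```

Here `CA` has three universities, but its average is computed over the two that report an enrollment.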

## Question 9

Another file, `College_2017_18.csv` (posted on Canvas), has the college enrollment data for
2017-2018. Write code to calculate the percent change in enrollment from 2015-2016 to
2017-2018.

In [None]:
# assumes the 2017-18 file sits next to the 2015-16 file, one directory up
df_2017 = spark.read.csv("../" + COLLEGE_2017, schema=schema, header=False)
df_2017_filtered = df_2017.select(
    col("id"),
    col("enrollment").alias("enrollment-2017")
)

# Join datasets on ID
df_joined = df_filtered.join(df_2017_filtered, "id", "inner")

# Compute percent change
df_change = df_joined.withColumn("percent-change", ((col("enrollment-2017") - col("enrollment")) / col("enrollment")) * 100)
df_change.select("id", "enrollment", "enrollment-2017", "percent-change").show()
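
The percent-change expression used above is `(new - old) / old * 100`. It is undefined when either enrollment is missing or the 2015-16 enrollment is zero. A small plain-Python sketch of the same formula with those cases made explicit; the helper name `percent_change` is hypothetical and the enrollment figures are made up:

```python
from typing import Optional


def percent_change(old: Optional[int], new: Optional[int]) -> Optional[float]:
    """Percent change from old to new, or None when it is undefined."""
    if old is None or new is None or old == 0:
        return None
    return (new - old) / old * 100


print(percent_change(4000, 5000))  # growth
print(percent_change(2000, 1500))  # decline
print(percent_change(0, 1500))     # undefined: division by zero
```

With Spark's default (non-ANSI) settings, the column expression should behave the same way and yield null for these rows, but that is worth confirming on the actual data.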

