In [2]:
spark

res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@75e776b9


In [3]:
spark.version

res1: String = 2.3.1


# Comprehensive Dataset Load

In [4]:
val input = sc.textFile("Scorecard.csv").map(line =>line.split(","))

input: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:25


In [5]:
val header = sc.parallelize(input.take(1))

header: org.apache.spark.rdd.RDD[Array[String]] = ParallelCollectionRDD[3] at parallelize at <console>:27


In [None]:
header.map(_.mkString(",")).coalesce(1).saveAsTextFile("header")

In [6]:
val headerLoad = sc.textFile("header.csv")

headerLoad: org.apache.spark.rdd.RDD[String] = header.csv MapPartitionsRDD[5] at textFile at <console>:25


In [7]:
val headerList= headerLoad.flatMap(line => line.split(","))

headerList: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:27


In [8]:
val headerIndex = headerList.zipWithIndex

headerIndex: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[7] at zipWithIndex at <console>:29


In [9]:
val idxSat = headerIndex.lookup("SAT_AVG")(0).toInt

idxSat: Int = 60


# 2013 Dataset Load

In [10]:
val input2013 = sc.textFile("MERGED2013_PP.csv").map(line =>line.split(","))

input2013: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[12] at map at <console>:25


In [11]:
val header2013 = sc.parallelize(input2013.take(1))

header2013: org.apache.spark.rdd.RDD[Array[String]] = ParallelCollectionRDD[13] at parallelize at <console>:27


In [66]:
header2013.map(_.mkString(",")).coalesce(1).saveAsTextFile("header2013")

In [12]:
val headerLoad2013 = sc.textFile("header2013.csv")

headerLoad2013: org.apache.spark.rdd.RDD[String] = header2013.csv MapPartitionsRDD[15] at textFile at <console>:25


In [13]:
val headerList2013= headerLoad2013.flatMap(line => line.split(","))

headerList2013: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:27


In [14]:
val headerIndex2013 = headerList2013.zipWithIndex

headerIndex2013: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[17] at zipWithIndex at <console>:29


In [71]:
headerIndex2013.coalesce(1).saveAsTextFile("headerIndex2013")

## Count 

In [15]:
input.count()

res2: Long = 124700


In [16]:
input.map( x => x(1)).distinct().count()

res3: Long = 10708


In [17]:
val year = input.map( x => x(1730))

year: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[22] at map at <console>:27


In [None]:
year.take(1100).foreach(println)

It looks like there are a lot of blank lines in there early on in the dataset. 

In [18]:
val notBlankYear = input.filter(x => x(1730) != "")

notBlankYear: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[23] at filter at <console>:27


In [19]:
notBlankYear.count()

res4: Long = 24630


In [25]:
notBlankYear.map(x => x(1730)).take(20)

res9: Array[String] = Array(Year, 1996, 1996, 1996, 1996, 1996, 1996, 1996, 1996, 1996, 1996, 1996, 1997, 1997, 1997, 1997, 1997, 1997, 1997, 1997)


## The count of rows for the 2013 merged file. 

In [72]:
input2013.count()

res33: Long = 7805


# St Thomas RDD  
This RDD contains all of the St. Thomas results over the years

In [5]:
val ust = input.filter( x => x(4) contains "University of St Thomas").filter(x => x(5) contains "Saint Paul")

ust: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at filter at <console>:27


In [6]:
val ust2013 = input.filter( x => x(4) contains "University of St Thomas").filter(x => x(5) contains "Saint Paul").filter( x => x(1730) contains "2013")

ust2013: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[8] at filter at <console>:27


In [7]:
val ustHeader = header.union(ust)

ustHeader: org.apache.spark.rdd.RDD[Array[String]] = UnionRDD[9] at union at <console>:31


# Create an index of the header 

Really had a tough time figuring out how to get the header loaded as just a string. Loading it from the original source results in type RDD[Array[String]], so I ended up using a CSV file that I outputed from the original input. Ended up just needing a **flatMap** to keep it in the correct format. 

# Minnesota Schools  
This rdd contains all of the Minnesota universities for a statewide comparison

In [12]:
val mnColleges = input.filter( x => x(5) contains "MN").filter( x => x(1730) contains "2013")

mnColleges: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[17] at filter at <console>:27


# Regional Schools  
This RDD contains schools from the upper-midwest (Minnesota, Wisconsin, North Dakota, South Dakota, Iowa, Nebraska). This RDD will be used for regional comparisons. 

In [13]:
val upMidWestColleges = input.filter( x => x(19) contains "Plains")

upMidWestColleges: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[18] at filter at <console>:27


# TextFile Saves

In [None]:
headerIndex.foreach {
    case(field, ind) => println(s"$field is $ind")
}

In [None]:
ustHeader.map(_.mkString(",")).coalesce(1).saveAsTextFile("ustHeader")

In [None]:
headerIndex.map(_.mkString(",")).coalesce(1).saveAsTextFile("headerIndex")

In [None]:
mnColleges.map(_.mkString(",")).coalesce(1).saveAsTextFile("Minnesota")

In [14]:
val regionalHeader = header.union(upMidWestColleges)

regionalHeader: org.apache.spark.rdd.RDD[Array[String]] = UnionRDD[19] at union at <console>:31


In [58]:
regionalHeader.map(_.mkString(",")).coalesce(1).saveAsTextFile("regionalHeader")