<h1>Spark basics practice</h1>
<li>The code in the next cell extracts covid data from New York State's covid repository</li>
<li>The extracted data is stored in an RDD of (String, String, Int, Int) tuples matching (date, county, positive cases, tests); the data is ordered by time</li>
<li>Use this RDD to answer the questions below</li>
<li>Note: The API for health.data.ny.gov returns 1000 lines for each request. My data starts on 2020-12-04 and ends on 2023-08-30. Yours may be slightly different depending on when you pull the data</li>

In [1]:
//DON'T CHANGE THIS CODE!!!!

// County names used as the value of the County= query parameter.
// Spaces inside a name are written as '+' (e.g. "New+York", "St.+Lawrence")
// so each entry can be appended to the URL as-is.
val counties = Array("Kings","Queens","New+York","Suffolk","Bronx","Nassau","Westchester","Erie",
                     "Monroe","Richmond","Onondaga","Orange","Rockland","Albany","Dutchess",
                     "Saratoga","Oneida","Niagara","Broome","Ulster","Rensselaer","Schenectady",
                     "Chautauqua","Oswego","Jefferson","Ontario","St.+Lawrence","Tompkins",
                     "Putnam","Steuben","Wayne","Chemung","Sullivan","Clinton","Cattaraugus",
                     "Cayuga","Madison","Warren","Columbia","Livingston","Washington","Herkimer",
                     "Otsego","Genesee","Fulton","Montgomery","Greene","Tioga","Franklin","Chenango",
                     "Cortland","Allegany","Delaware","Wyoming","Orleans"
                     ,"Essex","Seneca","Schoharie","Lewis","Yates","Schuyler","Hamilton")

// Endpoint prefix; the county name is concatenated directly after "County=".
val base_url = "https://health.data.ny.gov/resource/xdss-u53e.json?County="
val urls = counties.map(a => base_url+a) //Makes a url for each county

// Fetch the body of every URL on the driver, one request per county.
// results is an array with one entry per county, each entry being that
// county's response as a single JSON string.
// NOTE(review): the scala.io.Source returned by fromURL is never closed here.
val results = urls.map(u => scala.io.Source.fromURL(u).mkString) 

// Build the working RDD (there is a lot of data):
//   1. distribute the JSON strings across 32 slices,
//   2. let Spark parse them into a DataFrame (DataFrames are covered later),
//   3. drop down to the underlying RDD of Rows,
//   4. keep four fields per row, addressed by column position:
//        r(5) -> date, truncated to its first 10 characters
//        r(0) -> county name
//        r(4) -> new cases on that date
//        r(7) -> tests done on that date
// NOTE(review): the positions depend on the column order of the inferred
// schema -- confirm them if the API response ever gains or loses a field.
val data_rdd = spark.read.json(sc.parallelize(results,32))
                    .rdd
                    .map(r => (r(5).toString.slice(0,10), 
                               r(0).toString,
                               r(4).toString.toInt,
                               r(7).toString.toInt))


Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.7.189:4042
SparkContext available as 'sc' (version = 3.4.1, master = local[*], app id = local-1698077633628)
SparkSession available as 'spark'


counties: Array[String] = Array(Kings, Queens, New+York, Suffolk, Bronx, Nassau, Westchester, Erie, Monroe, Richmond, Onondaga, Orange, Rockland, Albany, Dutchess, Saratoga, Oneida, Niagara, Broome, Ulster, Rensselaer, Schenectady, Chautauqua, Oswego, Jefferson, Ontario, St.+Lawrence, Tompkins, Putnam, Steuben, Wayne, Chemung, Sullivan, Clinton, Cattaraugus, Cayuga, Madison, Warren, Columbia, Livingston, Washington, Herkimer, Otsego, Genesee, Fulton, Montgomery, Greene, Tioga, Franklin, Chenango, Cortland, Allegany, Delaware, Wyoming, Orleans, Essex, Seneca, Schoharie, Lewis, Yates, Schuyler, Hamilton)
base_url: String = https://health.data.ny.gov/resource/xdss-u53e.json?County=
urls: Array[String] = Array(https://health.data.ny.gov/resource/xdss-u53e.json?County=Kings, https://health.d...


In [2]:
// First and last date present in the pull, using Kings county as the reference series.
// Dates are yyyy-MM-dd strings, so the lexicographic min/max is also the chronological one.
// RDD.min()/max() reduce on the executors instead of collecting every date to the driver
// just to pick a single value.
val data_start_date = data_rdd.filter(t => t._2 == "Kings").map(_._1).min()
val data_end_date = data_rdd.filter(t => t._2 == "Kings").map(_._1).max()

data_start_date: String = 2020-12-04
data_end_date: String = 2023-08-30


<h1>Question 1</h1>
<li>Using <span style="color:blue">reduce</span> calculate the total number of cases and total number of tests in New York State</li>

In [3]:
// Statewide totals: keep only the (cases, tests) pair of every row and add the
// pairs component-wise with a single reduce.
val total_cases_tests = data_rdd
    .map { case (_, _, cases, tests) => (cases, tests) }
    .reduce { case ((casesA, testsA), (casesB, testsB)) => (casesA + casesB, testsA + testsB) }
// Reference run: (6061104,111330258)
// Yours may be higher or lower but probably not very different

total_cases_tests: (Int, Int) = (6061104,111330258)


<h1>Question 2</h1>
Using <span style="color:blue">reduceByKey</span> calculate the number of cases and total number of tests by county


<pre>
cases_tests_by_county: Array[(String, (Int, Int))] = Array((Oneida,(64776,1294299)), (Tompkins,(26160,2353515)), (Chemung,(23954,419142)), (Schenectady,(41902,723594)), (Cattaraugus,(18425,256914)), (Greene,(10547,155326)), (Wyoming,(9503,132128)), (Columbia,(13210,205032)), (Chenango,(11492,205982)), (Ulster,(40786,753237)), (Clinton,(22917,369415)), (Wayne,(21395,337507)), (Herkimer,(16998,282726)), (Nassau,(497090,7525925)), (Seneca,(7596,124835)), (Lewis,(7070,92362)), (Broome,(54740,1015959)), (Erie,(247285,3575806)), (Livingston,(13948,242701)), (Bronx,(454349,8224338)), (Allegany,(9928,216499)), (Queens,(785941,13639767)), (Jefferson,(27446,383771)), (Orleans,(10148,140539)), (Putnam,(29271,440648)), (Rensselaer,(40735,757294)), (Ontario,(25574,420044)), (Suffolk,(514280,7630714)...
</pre>

In [4]:
// Per-county totals: key each row by county, sum the (cases, tests) pairs
// component-wise per key, then bring the small result back to the driver.
val cases_tests_by_county = data_rdd
    .map { case (_, county, cases, tests) => (county, (cases, tests)) }
    .reduceByKey { case ((casesA, testsA), (casesB, testsB)) => (casesA + casesB, testsA + testsB) }
    .collect

cases_tests_by_county: Array[(String, (Int, Int))] = Array((Oneida,(64776,1294299)), (Tompkins,(26160,2353515)), (Chemung,(23954,419142)), (Schenectady,(41902,723594)), (Cattaraugus,(18425,256914)), (Greene,(10547,155326)), (Wyoming,(9503,132128)), (Columbia,(13210,205032)), (Chenango,(11492,205982)), (Ulster,(40786,753237)), (Clinton,(22917,369415)), (Wayne,(21395,337507)), (Herkimer,(16998,282726)), (Nassau,(497090,7525925)), (Seneca,(7596,124835)), (Lewis,(7070,92362)), (Broome,(54740,1015959)), (Erie,(247285,3575806)), (Livingston,(13948,242701)), (Bronx,(454349,8224338)), (Allegany,(9928,216499)), (Queens,(785941,13639767)), (Jefferson,(27446,383771)), (Orleans,(10148,140539)), (Putnam,(29271,440648)), (Rensselaer,(40735,757294)), (Ontario,(25574,420044)), (Suffolk,(514280,7630714)...


<h1>Question 3</h1>
Using <span style="color:blue">reduceByKey</span> calculate the positivity by month for New York State. Then use <span style="color:blue">Math.round</span>, <span style="color:blue">sortBy</span>, and <span style="color:blue">collect</span> to display your results as a percentage with 2 decimal places. A sample of the expected result:
<pre>
(2020-12,5.73)
(2021-01,6.27)
(2021-02,3.66)
(2021-03,3.35)
(2021-04,2.69)
(2021-05,1.09)
(2021-06,0.42)
(2021-07,1.56)
(2021-08,3.15)
(2021-09,2.9)
(2021-10,2.31)
(2021-11,3.55)
(2021-12,10.54)
(2022-01,15.09)
(2022-02,2.93)
(2022-03,1.95)
(2022-04,5.45)
(2022-05,7.87)
(2022-06,5.88)
(2022-07,9.54)
(2022-08,7.08)
(2022-09,6.94)
(2022-10,6.82)
(2022-11,6.93)
(2022-12,8.07)
(2023-01,7.49)
(2023-02,4.63)
(2023-03,2.62)
(2023-04,1.92)
(2023-05,2.67)
(2023-06,4.89)
(2023-07,8.82)
(2023-08,14.12)
</pre>

In [5]:
// Per-month totals: the first 7 characters of the date ("yyyy-MM") form the key,
// and the (cases, tests) pairs are summed component-wise per month.
val cases_tests_by_month = data_rdd
    .map { case (date, _, cases, tests) => (date.take(7), (cases, tests)) }
    .reduceByKey { case ((casesA, testsA), (casesB, testsB)) => (casesA + casesB, testsA + testsB) }

cases_tests_by_month: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[22] at reduceByKey at <console>:24


In [6]:
// Monthly positivity as a percentage with two decimals, printed in month order.
cases_tests_by_month
    .map { case (month, (cases, tests)) =>
        // Scale the ratio to hundredths of a percent, round to a whole number,
        // then scale back so exactly two decimal places survive.
        val hundredths = Math.round(cases.toDouble / tests * 10000)
        (month, hundredths / 100.0)
    }
    .sortBy(_._1)
    .collect
    .foreach(println)

(2020-12,5.73)
(2021-01,6.27)
(2021-02,3.66)
(2021-03,3.35)
(2021-04,2.69)
(2021-05,1.09)
(2021-06,0.42)
(2021-07,1.56)
(2021-08,3.15)
(2021-09,2.9)
(2021-10,2.31)
(2021-11,3.55)
(2021-12,10.54)
(2022-01,15.09)
(2022-02,2.93)
(2022-03,1.95)
(2022-04,5.45)
(2022-05,7.87)
(2022-06,5.88)
(2022-07,9.54)
(2022-08,7.08)
(2022-09,6.94)
(2022-10,6.82)
(2022-11,6.93)
(2022-12,8.07)
(2023-01,7.49)
(2023-02,4.63)
(2023-03,2.62)
(2023-04,1.92)
(2023-05,2.67)
(2023-06,4.89)
(2023-07,8.82)
(2023-08,14.12)


<h1>Question 4</h1>
Return an RDD of (date,county,positivity) where positivity is the percentage of tests that are positive. For this problem, you must use the Option case class to handle the case where the divisor is zero

<pre>
res0: Array[(String, String, Double)] = Array((2023-08-30,Kings,13.0), (2023-08-29,Kings,11.0), (2023-08-28,Kings,12.0), (2023-08-27,Kings,13.0), (2023-08-26,Kings,10.0), (2023-08-25,Kings,11.0), (2023-08-24,Kings,13.0), (2023-08-23,Kings,12.0), (2023-08-22,Kings,11.0), (2023-08-21,Kings,13.0), (2023-08-20,Kings,12.0), (2023-08-19,Kings,11.0), (2023-08-18,Kings,11.0), (2023-08-17,Kings,11.0), (2023-08-16,Kings,11.0), (2023-08-15,Kings,10.0), (2023-08-14,Kings,11.0), (2023-08-13,Kings,12.0), (2023-08-12,Kings,10.0), (2023-08-11,Kings,9.0), (2023-08-10,Kings,12.0), (2023-08-09,Kings,12.0), (2023-08-08,Kings...
</pre>

In [12]:
/** Positivity (percentage of tests that were positive) for one data row.
  *
  * @param t a (date, county, positive cases, tests) tuple
  * @return Some((date, county, positivity)) when at least one test was recorded,
  *         None when the number of tests is zero
  */
def divide(t: (String,String,Int,Int)): Option[(String,String,Double)] = {
    val (date, county, cases, tests) = t
    // 100.0 * cases / tests is a floating-point division: a zero divisor yields
    // Infinity or NaN instead of throwing, so a try/catch can never detect it.
    // The divisor has to be tested explicitly.
    if (tests == 0) None
    else Some((date, county, 100.0 * cases / tests))
}
// Apply divide to every row and keep only the rows that produced a value;
// flatMap unwraps each Some and drops each None in one step.
val positivity = data_rdd.flatMap(row => divide(row))
positivity.collect

divide: (t: (String, String, Int, Int))Option[(String, String, Double)]
positivity: org.apache.spark.rdd.RDD[(String, String, Double)] = MapPartitionsRDD[39] at flatMap at <console>:33
res5: Array[(String, String, Double)] = Array((2023-08-30,Kings,13.480885311871228), (2023-08-29,Kings,11.209239130434783), (2023-08-28,Kings,12.693798449612403), (2023-08-27,Kings,13.846153846153847), (2023-08-26,Kings,10.393700787401574), (2023-08-25,Kings,11.889596602972398), (2023-08-24,Kings,13.73825018076645), (2023-08-23,Kings,12.794853466761973), (2023-08-22,Kings,11.842105263157896), (2023-08-21,Kings,13.132400430570506), (2023-08-20,Kings,12.35370611183355), (2023-08-19,Kings,11.458333333333334), (2023-08-18,Kings,11.594202898550725), (2023-08-17,Kings,11.892296185489903), (2023-08-16,Kings,11.6...


<h1>Question 5</h1>
Return the tuple (date,county,positivity) where the positivity was the highest (use <span style="color:blue">takeOrdered</span>)

<pre>
Array((2023-07-04,Broome,100.0))
</pre>

In [8]:
// takeOrdered returns the smallest elements under the given ordering, so the rows
// are ordered by negated positivity to make the highest positivity come first.
val highest_positive = positivity.takeOrdered(1)(
    Ordering.by[(String, String, Double), Double](row => -row._3)
)

highest_positive: Array[(String, String, Double)] = Array((2023-07-04,Broome,100.0))


In [9]:
// Sanity check: Broome rows in which every test was positive (cases == tests).
data_rdd.filter(row => row._2 == "Broome" && row._3 == row._4).collect

res2: Array[(String, String, Int, Int)] = Array((2023-07-04,Broome,2,2))
