# Bonus

## Description

### Data
[Chicago Crime dataset](https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2) - about 6.6 million reported records (6,614,026 rows in the copy used here), 1.45 GB in size

### Format
One text file consisting of lines of records.

1. ID
1. Case number
1. Date: date & time
1. Block: text
1. IUCR
1. Primary type: text
1. Description: text
1. Location description: text
1. Arrest: boolean
1. Domestic
1. Beat
1. District
1. Ward
1. Community area
1. FBI code
1. X coordinate: numeric
1. Y coordinate: numeric
1. Year
1. Updated on
1. Latitude: numeric
1. Longitude: numeric
1. Location
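
The last column, Location, is quoted and contains a comma (see the sample record printed later in the notebook), so a plain `split(",")` yields 23 pieces for a 22-column record. The following is a minimal sketch of a quote-aware splitter in plain Scala. The object and method names (`CsvSplit`, `splitCsvLine`) are hypothetical, and the sketch assumes that quotes are balanced and never escaped inside a field.

```scala
object CsvSplit {
  // Split one CSV line on commas that lie outside double quotes.
  // Assumption: quotes are balanced and not escaped inside a field.
  def splitCsvLine(line: String): Vector[String] = {
    val fields = Vector.newBuilder[String]
    val current = new StringBuilder
    var inQuotes = false
    for (ch <- line) {
      ch match {
        case '"' =>
          inQuotes = !inQuotes          // toggle state, drop the quote itself
        case ',' if !inQuotes =>
          fields += current.toString    // field boundary
          current.clear()
        case other =>
          current.append(other)
      }
    }
    fields += current.toString          // the last field has no trailing comma
    fields.result()
  }
}
```

With this helper, the 22 fields of a record can be addressed by position without the quoted Location value shifting the indices.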

### Task
Three subtasks plus a bonus:
+ (30pt) For each of the attributes ‘Primary type’ and ‘Location description’, output every value with its frequency count, sorted in descending order of the count.
+ (30pt) Output the most frequently occurring ‘Primary type’ for each possible value of ‘Location description’, sorted in descending order of the frequency count.
+ (40pt) Output the most frequently occurring street name in the attribute ‘Block’ for each ‘Primary type’, sorted in descending order of the frequency count. (Remove the numbers from the ‘Block’ address of a street/avenue/boulevard.)
+ (Bonus) From the attribute ‘Date’, extract the hour and output the most frequently occurring hour for each ‘Primary type’ and for each ‘Location description’, each sorted in descending order of the frequency count.

### Output Format
1. two sorted lists of (value, count)
    + Sorted list of ‘Primary type’
    + Sorted list of ‘Location description’
1. n sorted lists of ‘Primary type’ for each ‘Location description’
    + n: # of possible values for ‘Location description’
1. n sorted lists of street names for each ‘Primary type’
    + n: # of possible values for ‘Primary type’
1. two types of sorted lists:
    + Sorted lists of hours for each possible ‘Primary type’
    + Sorted lists of hours for each possible ‘Location description’

## Implementation

In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.hadoop.fs._
import org.apache.hadoop.conf.Configuration

import java.io.{File,PrintWriter}

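  // Print a titled sample; `writer` is accepted for symmetry with the other helpers but unused.
  def printSample(writer: Any, data: Any, title: String = "", format: String = ""): Unit = {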
    println("\n"+title+" Data Sample: " + format)
    println(data+"\n")
  }

  def printSpark(writer: Any, spark: SparkSession): Unit = {
    println("Spark Entity:       " + spark)
    println("Spark version:      " + spark.version)
    println("Spark master:       " + spark.sparkContext.master)
    println("Running 'locally'?: " + spark.sparkContext.isLocal)
    println("")
  }

  // Open a PrintWriter on a Hadoop-compatible path; the caller is responsible for closing it.
  def outputWriter(fileString: String): PrintWriter = {
    val outputPath = new Path(fileString)
    val outputStream = outputPath.getFileSystem(new Configuration()).create(outputPath)
    new PrintWriter(outputStream)
  }

  // List the paths of the files directly under a directory (non-recursive).
  def getFile(fileString: String): Array[String] = {
    val inputPath = new Path(fileString)
    val inputBuffer = scala.collection.mutable.ArrayBuffer.empty[String]
    val iterator = inputPath.getFileSystem(new Configuration()).listFiles(inputPath, false)
    while(iterator.hasNext()){
        val fileStatus = iterator.next()
        if(fileStatus.isFile()){
          inputBuffer += fileStatus.getPath().toString()
        }
    }
    inputBuffer.toArray
  }


val writer = null
printSpark(writer, spark)

Spark Entity:       org.apache.spark.sql.SparkSession@17827ded
Spark version:      2.3.0
Spark master:       local[*]
Running 'locally'?: true



printSample: (writer: Any, data: Any, title: String, format: String)Unit
printSpark: (writer: Any, spark: org.apache.spark.sql.SparkSession)Unit
outputWriter: (fileString: String)java.io.PrintWriter
getFile: (fileString: String)Array[String]
writer: Null = null


In [2]:
val data = spark.sparkContext.textFile("./data/data.csv")
data.take(2).foreach(println)

ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
10000092,HY189866,03/18/2015 07:44:00 PM,047XX W OHIO ST,041A,BATTERY,AGGRAVATED: HANDGUN,STREET,false,false,1111,011,28,25,04B,1144606,1903566,2015,02/10/2018 03:50:01 PM,41.891398861,-87.744384567,"(41.891398861, -87.744384567)"


data = ./data/data.csv MapPartitionsRDD[1] at textFile at <console>:35


./data/data.csv MapPartitionsRDD[1] at textFile at <console>:35

In [3]:
// Remove Header
val header = data.first()
val rows = data.filter(l => l!=header)
println(rows.first())
println("\nCount: "+rows.count+"\n")

10000092,HY189866,03/18/2015 07:44:00 PM,047XX W OHIO ST,041A,BATTERY,AGGRAVATED: HANDGUN,STREET,false,false,1111,011,28,25,04B,1144606,1903566,2015,02/10/2018 03:50:01 PM,41.891398861,-87.744384567,"(41.891398861, -87.744384567)"

Count: 6614026



header = ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
rows = MapPartitionsRDD[2] at filter at <console>:39


MapPartitionsRDD[2] at filter at <console>:39

In [4]:
import scala.collection.mutable.ListBuffer
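// Capture the two-digit hour and the AM/PM marker of a timestamp such as "03/18/2015 07:44:00 PM".
val regex = "\\d{2}\\/\\d{2}\\/\\d{4}\\s(\\d{2}):\\d{2}:\\d{2}\\s(\\wM)".r
// Work on a random sample of 300 rows rather than on the full data set.
val flattenData = spark.sparkContext.parallelize(rows.takeSample(false, 300, System.nanoTime.toInt)).
    map{ dataString =>
        // Naive comma split: columns 2, 3, 5 and 7 are correct only while no earlier field contains a comma.
        val tuple = dataString.split(",").toSeq
        // Concatenate the captured groups, e.g. "07" and "PM" become "07PM".
        val list = ListBuffer[String]()
        for(m <- regex.findAllIn(tuple(2)).matchData;
          e <- m.subgroups)
          if(e!=null) list+=e
        // Drop the leading block number and direction, e.g. "047XX W OHIO ST" becomes "OHIO ST".
        (tuple(5), tuple(7), tuple(3).split("^\\w+\\s*\\w*\\s*")(1), list.toSeq.mkString(""))
    }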
printSample(writer, flattenData.take(10).mkString("\n"), "Flatten Data Sample", "( Primary Type, Location Description, Street, Date)\n")


Flatten Data Sample Data Sample: ( Primary Type, Location Description, Street, Date)

(OTHER OFFENSE,VEHICLE NON-COMMERCIAL,LOREL AVE,01AM)
(THEFT,RESIDENTIAL YARD (FRONT/BACK),WELLS ST,04PM)
(BATTERY,APARTMENT,VINCENNES AVE,12AM)
(DECEPTIVE PRACTICE,BANK,KARLOV AVE,07AM)
(THEFT,RESIDENCE,LAWNDALE AVE,04PM)
(OTHER OFFENSE,RESIDENCE,HENDERSON ST,05PM)
(MOTOR VEHICLE THEFT,GAS STATION,ADDISON ST,04AM)
(BURGLARY,ABANDONED BUILDING,103RD PL,08PM)
(BURGLARY,APARTMENT,WASHTENAW AVE,11AM)
(CRIM SEXUAL ASSAULT,APARTMENT,63RD ST,03PM)



regex = \d{2}\/\d{2}\/\d{4}\s(\d{2}):\d{2}:\d{2}\s(\wM)
flattenData = MapPartitionsRDD[5] at map at <console>:46


MapPartitionsRDD[5] at map at <console>:46
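
The street name above is obtained by splitting `Block` on a leading pattern and indexing the second piece, which can fail when the split leaves fewer than two pieces. As a hedged alternative, the sketch below extracts the street with a capturing regex and returns an `Option`. The names `StreetName` and `streetOf` are hypothetical, and the layout "masked number, direction letter, street name" is an assumption based on the sample record `047XX W OHIO ST`.

```scala
object StreetName {
  // Assumed Block layout: "<masked number> <N|S|E|W> <street name>",
  // e.g. "047XX W OHIO ST". Anything else yields None.
  private val BlockPattern = """^\S+\s+[NSEW]\s+(.+)$""".r

  def streetOf(block: String): Option[String] = block.trim match {
    case BlockPattern(street) => Some(street.trim)
    case _                    => None
  }
}
```

Rows whose `Block` does not follow the assumed layout can then be dropped or counted separately instead of aborting the job.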

### Task 1 - Frequency Count of ‘Primary type’ and ‘Location description’

In [5]:
val primeCount = flattenData.map(tuple => (tuple._1, 1)).reduceByKey{(i, j) => i+j}.sortBy{case(prim, count) => -count}
printSample(writer, primeCount.take(5).mkString("\n"), "Primary Count Sample", "( Primary Type, Count)")
primeCount.saveAsTextFile("./output/task1/primeCount")


Primary Count Sample Data Sample: ( Primary Type, Count)
(THEFT,68)
(BATTERY,55)
(NARCOTICS,37)
(CRIMINAL DAMAGE,27)
(BURGLARY,23)



primeCount = MapPartitionsRDD[12] at sortBy at <console>:47


MapPartitionsRDD[12] at sortBy at <console>:47

In [6]:
val locCount = flattenData.map(tuple => (tuple._2, 1)).reduceByKey{(i, j) => i+j}.sortBy{case(loc, count) => -count}
printSample(writer, locCount.take(5).mkString("\n"), "Location Count Sample", "( Location Description, Count)\n")
locCount.saveAsTextFile("./output/task1/locCount")


Location Count Sample Data Sample: ( Location Description, Count)

(STREET,87)
(RESIDENCE,48)
(APARTMENT,30)
(SIDEWALK,27)
(OTHER,13)



locCount = MapPartitionsRDD[20] at sortBy at <console>:49


MapPartitionsRDD[20] at sortBy at <console>:49
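
The count-then-sort pattern used in both Task 1 cells can be checked on a small in-memory list without Spark. This is only a local sketch of the same idea; `FrequencyCount` and `countDesc` are hypothetical names, and the alphabetical tie-break is an addition that is not part of the cells above.

```scala
object FrequencyCount {
  // Count the occurrences of each value and sort by count, highest first.
  // Ties are broken alphabetically so that the result is deterministic.
  def countDesc(values: Seq[String]): Seq[(String, Int)] =
    values
      .groupBy(identity)
      .map { case (value, occurrences) => (value, occurrences.size) }
      .toSeq
      .sortBy { case (value, count) => (-count, value) }
}
```

Calling `FrequencyCount.countDesc` on a handful of hand-written values makes it easy to compare against the Spark output for the same rows.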

### Task 2 - Primary Type Frequency Count per Location Type

In [7]:
// Note: groupByKey does not guarantee that the values of a group keep the order produced by sortBy.
val locGroup = flattenData.map(tuple => ((tuple._2, tuple._1), 1)).
                reduceByKey{(i, j) => i+j}.sortBy{case((loc, prime), count) => -count}.
                map{
                  tuple => (tuple._1._1, (tuple._1._2, tuple._2))
                }.groupByKey
printSample(writer, locGroup.take(3).mkString("\n"), "Location Group Sample", "( Location Description, [(Primary Type, Count)])\n")


Location Group Sample Data Sample: ( Location Description, [(Primary Type, Count)])

(ABANDONED BUILDING,CompactBuffer((BURGLARY,1)))
(GOVERNMENT BUILDING/PROPERTY,CompactBuffer((BURGLARY,1)))
(ALLEY,CompactBuffer((ROBBERY,2), (BATTERY,1), (CRIMINAL TRESPASS,1), (NARCOTICS,1), (CRIMINAL DAMAGE,1)))



locGroup = ShuffledRDD[30] at groupByKey at <console>:51


ShuffledRDD[30] at groupByKey at <console>:51
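
Task 2 asks for the single most frequent ‘Primary type’ per ‘Location description’, whereas the cell above keeps every (type, count) pair of a group. A minimal plain-Scala sketch of picking the top pair explicitly follows, so the result does not depend on the order of values inside a group. `TopPerGroup` and `topPrimaryPerLocation` are hypothetical names and the tie-breaking rule is an assumption; in Spark the same idea could be expressed with a `reduceByKey` that keeps the larger pair.

```scala
object TopPerGroup {
  // Input: (location, primaryType) pairs, one per record.
  // Output: for each location the most frequent primary type with its count,
  // ordered by that count, highest first.
  def topPrimaryPerLocation(records: Seq[(String, String)]): Seq[(String, (String, Int))] = {
    val pairCounts: Map[(String, String), Int] =
      records.groupBy(identity).map { case (pair, hits) => (pair, hits.size) }

    pairCounts.toSeq
      .map { case ((loc, prime), count) => (loc, (prime, count)) }
      .groupBy { case (loc, _) => loc }
      .map { case (loc, entries) =>
        // Explicit maximum inside the group; ties go to the alphabetically first type.
        (loc, entries.map(_._2).minBy { case (prime, count) => (-count, prime) })
      }
      .toSeq
      .sortBy { case (loc, (_, count)) => (-count, loc) }
  }
}
```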

In [8]:
locGroup.foreach{
    case(loc, vect) =>
    val newLoc = loc.split("\\W+").mkString("_")
    val writer = outputWriter("./output/task2/"+newLoc+".csv")
    vect.foreach{
        vectString => writer.println(vectString)
    }
    writer.close()
}
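
The file name for each group is derived by replacing runs of non-word characters with underscores. The sketch below isolates that step so it can be tried on its own. `SafeFileName` and `sanitize` are hypothetical names, and dropping empty pieces is a small addition to the expression used in the cell, so that a label starting with a symbol does not produce a leading underscore.

```scala
object SafeFileName {
  // Replace every run of non-word characters with "_" and drop empty pieces.
  def sanitize(label: String): String =
    label.split("\\W+").filter(_.nonEmpty).mkString("_")
}
```

For instance, `GOVERNMENT BUILDING/PROPERTY` becomes `GOVERNMENT_BUILDING_PROPERTY`. Two different labels such as `A/B` and `A B` map to the same name, so they would target the same output file.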

### Task 3 - Street Name Frequency Count per Primary Type

In [9]:
val primeGroup = flattenData.map{
                    tuple => 
                    ( ( tuple._1, tuple._3 ), 1)
                }.
                reduceByKey{(i, j) => i+j}.sortBy{case((prime, loc), count) => -count}.
                map{
                  tuple => (tuple._1._1, (tuple._1._2, tuple._2))  
                }.groupByKey
printSample(writer, primeGroup.take(3).mkString("\n"), "Primary Group Sample", "( Primary Type, [(Street, Count)])\n")


Primary Group Sample Data Sample: ( Primary Type, [(Street, Count)])

(CRIM SEXUAL ASSAULT,CompactBuffer((63RD ST,1)))
(WEAPONS VIOLATION,CompactBuffer((EAST END AVE,1), (MUSKEGON AVE,1), (SOUTH CHICAGO AVE,1), (RIDGELAND AVE,1), (FLOURNOY ST,1)))
(PROSTITUTION,CompactBuffer((CICERO AVE,1), (CHICAGO AVE,1), (KOSTNER AVE,1), (KEATING AVE,1)))



primeGroup = ShuffledRDD[39] at groupByKey at <console>:54


ShuffledRDD[39] at groupByKey at <console>:54

In [10]:
primeGroup.foreach{
    case(prime, vect) =>
    val newPrime = prime.split("\\W+").mkString("_")
    val writer = outputWriter("./output/task3/"+newPrime+".csv")
    vect.foreach{
        vectString => writer.println(vectString)
    }
    writer.close()
}

### Bonus - Hour Frequency Count per Primary Type and Location Type 

In [14]:
val primeHours = flattenData.map{
                    tuple => 
                    ( ( tuple._1,  tuple._4), 1)
                }.
                reduceByKey{(i, j) => i+j}.sortBy{case((prime, hours), count) => -count}.
                map{
                  tuple => (tuple._1._1, (tuple._1._2, tuple._2))  
                }.groupByKey
printSample(writer, primeHours.take(3).mkString("\n"), "Primary Hours Sample", "( Primary Type, [(Hours, Count)])\n")

primeHours.foreach{
    case(prime, vect) =>
    val newPrime = prime.split("\\W+").mkString("_")
    val writer = outputWriter("./output/task4-prime/"+newPrime+".csv")
    vect.foreach{
        vectString => writer.println(vectString)
    }
    writer.close()
}


Primary Hours Sample Data Sample: ( Primary Type, [(Hours, Count)])

(ROBBERY,CompactBuffer((11PM,2), (01AM,1), (10AM,1), (03AM,1), (12AM,1), (06PM,1)))
(CONCEALED CARRY LICENSE VIOLATION,CompactBuffer((10PM,1)))
(GAMBLING,CompactBuffer((08PM,1)))



primeHours = ShuffledRDD[48] at groupByKey at <console>:55


ShuffledRDD[48] at groupByKey at <console>:55
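
The hour keys above are strings such as `11PM` and `01AM`. If a numeric hour is preferred, the timestamp can be converted to a 0-23 value directly. This is a minimal sketch under the assumption that every `Date` value looks like `03/18/2015 07:44:00 PM`; `HourOfDay` and `hour24` are hypothetical names.

```scala
object HourOfDay {
  // Matches timestamps such as "03/18/2015 07:44:00 PM" and captures hour and AM/PM.
  private val Stamp = """\d{2}/\d{2}/\d{4} (\d{2}):\d{2}:\d{2} (AM|PM)""".r

  // Returns the hour on a 0-23 clock, or None when the text does not match.
  def hour24(timestamp: String): Option[Int] = timestamp.trim match {
    case Stamp(hh, meridiem) =>
      val h = hh.toInt % 12                       // "12" maps to 0 before the PM shift
      Some(if (meridiem == "PM") h + 12 else h)
    case _ => None
  }
}
```

Using an `Int` key also makes the per-hour lists sortable by time of day, which the concatenated strings are not.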

In [15]:
val locHours = flattenData.map{
                    tuple => 
                    ( ( tuple._2,  tuple._4), 1)
                }.
                reduceByKey{(i, j) => i+j}.sortBy{case((loc, hours), count) => -count}.
                map{
                  tuple => (tuple._1._1, (tuple._1._2, tuple._2))  
                }.groupByKey
printSample(writer, locHours.take(3).mkString("\n"), "Location Hours Sample", "( Location Type, [(Hours, Count)])\n")

locHours.foreach{
    case(loc, vect) =>
    val newLoc = loc.split("\\W+").mkString("_")
    val writer = outputWriter("./output/task4-loc/"+newLoc+".csv")
    vect.foreach{
        vectString => writer.println(vectString)
    }
    writer.close()
}


Location Hours Sample Data Sample: ( Location Type, [(Hours, Count)])

(ABANDONED BUILDING,CompactBuffer((08PM,1)))
(GOVERNMENT BUILDING/PROPERTY,CompactBuffer((05PM,1)))
(ALLEY,CompactBuffer((11PM,2), (01AM,1), (09PM,1), (07PM,1), (10AM,1)))



locHours = ShuffledRDD[57] at groupByKey at <console>:55


ShuffledRDD[57] at groupByKey at <console>:55

103820004 Michael Fu