In [0]:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col}
import org.apache.spark.sql.functions.udf
import java.text.SimpleDateFormat

In [1]:
// Build (or reuse, via getOrCreate) the notebook's SparkSession. Hive support is
// enabled so the CREATE TABLE statements below go to the Hive metastore.
// NOTE(review): if a session already exists, getOrCreate returns it and this
// deployMode setting presumably has no effect — confirm for this kernel.
val spark = SparkSession
  .builder()
  .config("spark.submit.deployMode", "client")
  .enableHiveSupport()
  .getOrCreate()

In [2]:
// Make lr2947_nyu_edu the session's current database. The unqualified table
// names created and queried in later cells resolve against it.
spark
  .sql("USE lr2947_nyu_edu")

In [3]:
// Register the cleaned census parquet dataset as table nyc_census in the
// current database. Parquet files carry their own schema, so CSV reader
// options such as header / inferSchema / mode are not passed; the parquet
// source does not use them.
spark.sql(
  """CREATE TABLE nyc_census
    |USING parquet
    |OPTIONS (
    |path '/user/lr2947_nyu_edu/project/Cleaned_population.parquet'
    |)""".stripMargin)


In [4]:
// Preview the census table (default 20 rows) without truncating cell values.
spark.sql("SELECT * FROM nyc_census").show(truncate = false)

In [5]:
// Register the cleaned arrest parquet dataset as table nyc_arrest in the
// current database. Parquet files carry their own schema, so CSV reader
// options such as header / inferSchema / mode are not passed; the parquet
// source does not use them.
spark.sql(
  """CREATE TABLE nyc_arrest
    |USING parquet
    |OPTIONS (
    |path '/user/lr2947_nyu_edu/project/Cleaned_Arrest.parquet'
    |)""".stripMargin)

In [6]:
// Preview the first 20 arrest rows without truncating cell values.
spark.sql("SELECT * FROM nyc_arrest").show(numRows = 20, truncate = false)

In [7]:
// Register the hate-crime CSV output as table nyc_hate_crimes. The header row
// supplies column names, types are inferred, and malformed rows fail the read
// (FAILFAST). Unlike the 911-calls and complaints tables, this points at one
// specific part file rather than a directory glob.
// NOTE(review): a pinned part-file name breaks if the upstream job is re-run
// (new file UUID) and ignores any other part files — consider
// HateCrimeDataFinal/* if that directory holds only this output.
spark.sql(
  """CREATE TABLE nyc_hate_crimes
    |USING csv
    |OPTIONS (
    |path '/user/lr2947_nyu_edu/project/HateCrimeDataFinal/part-00000-a6899586-2874-453f-afc3-94cf7a843794-c000.csv',
    |header 'true',
    |inferSchema 'true',
    |mode 'FAILFAST'
    |)""".stripMargin)


In [8]:
// Register every file under NYC911CallsDataFinal/ as CSV table nyc_911_calls.
// The header row supplies column names, types are inferred, and malformed rows
// fail the read (FAILFAST).
spark.sql(
  """CREATE TABLE nyc_911_calls
    |USING csv
    |OPTIONS (
    |path '/user/lr2947_nyu_edu/project/NYC911CallsDataFinal/*',
    |header 'true',
    |inferSchema 'true',
    |mode 'FAILFAST'
    |)""".stripMargin)


In [9]:
// Register every file under nyccomplaintfinal/ as CSV table nyc_complaints.
// The header row supplies column names, types are inferred, and malformed rows
// fail the read (FAILFAST).
spark.sql(
  """CREATE TABLE nyc_complaints
    |USING csv
    |OPTIONS (
    |path '/user/lr2947_nyu_edu/project/nyccomplaintfinal/*',
    |header 'true',
    |inferSchema 'true',
    |mode 'FAILFAST'
    |)""".stripMargin)

In [10]:
// Preview the first 20 complaint rows without truncating cell values.
spark.sql("SELECT * FROM nyc_complaints").show(numRows = 20, truncate = false)

In [11]:
// Preview the first 20 911-call rows without truncating cell values.
spark.sql("SELECT * FROM nyc_911_calls").show(numRows = 20, truncate = false)

In [12]:
// Preview the first 20 hate-crime rows without truncating cell values.
spark.sql("SELECT * FROM nyc_hate_crimes").show(numRows = 20, truncate = false)

In [13]:
// All arrest rows plus an AYEAR key: chars 1-2 of arrest_date, "/", chars 7-10.
// Despite the name, this is a month+year key — "MM/yyyy" if arrest_date is
// "MM/dd/yyyy" text (assumed; not checked here).
val arrest = {
  val monthKeyQuery =
    "SELECT *, CONCAT(SUBSTRING(arrest_date, 1, 2), '/',SUBSTRING(arrest_date, 7, 4) ) AS AYEAR from nyc_arrest"
  spark.sql(monthKeyQuery)
}

In [14]:
// Arrest counts per AYEAR (month+year) key, ordered by the key string.
// Dataset.alias only names the whole DataFrame, not the count column, so it is
// not used here; the column is given its name explicitly below.
val arrestsYear = arrest.groupBy("AYear").count().sort(col("AYear"))
// Rename the generic "count" column so it stays distinguishable after the
// multi-way join with the calls and complaints counts.
val arrestYear = arrestsYear.withColumnRenamed("count", "arrest_count")

In [15]:
// All complaint rows plus a CYEAR key: chars 1-2 of `complaint date`, "/",
// chars 7-10. Despite the name, this is a month+year key — "MM/yyyy" if the
// column is "MM/dd/yyyy" text (assumed; not checked here).
val complaints = {
  val monthKeyQuery =
    "SELECT *, CONCAT(SUBSTRING(`complaint date`, 1, 2), '/',SUBSTRING(`complaint date`, 7, 4) ) AS CYEAR from nyc_complaints"
  spark.sql(monthKeyQuery)
}

In [16]:
// Complaint counts per CYEAR key, ordered by the key string. The count column
// is renamed so it stays distinguishable after the later joins.
val complaintYear = complaints
  .groupBy("CYear")
  .count()
  .orderBy("CYear")
val complaintsYear = complaintYear.withColumnRenamed("count", "complaint_count")

In [17]:
// All 911-call rows plus a YEAR key: chars 1-2 of create_date, "/", chars 7-10.
// Despite the name, this is a month+year key — "MM/yyyy" if create_date is
// "MM/dd/yyyy" text (assumed; not checked here).
val calls = {
  val monthKeyQuery =
    "SELECT *, CONCAT(SUBSTRING(create_date, 1, 2), '/',SUBSTRING(create_date, 7, 4) ) AS YEAR from nyc_911_calls"
  spark.sql(monthKeyQuery)
}

In [18]:
// 911-call counts per YEAR key, ordered by the key string. The count column is
// renamed so it stays distinguishable after the later joins.
val calYear = calls
  .groupBy("Year")
  .count()
  .orderBy("Year")
val callsYear = calYear.withColumnRenamed("count", "calls_count")

In [19]:
// Inner-join the three per-key count tables. Only keys present in the calls,
// complaints, and arrests data all survive.
val year = callsYear
  .join(complaintsYear, complaintsYear("CYear") === callsYear("Year"), "inner")
  .join(arrestYear, complaintsYear("CYear") === arrestYear("AYear"), "inner")

In [20]:
// Final per-key table: the month+year key and the three counts.
// Joins do not preserve the sorts applied to each input, so order explicitly
// here. Order by the year part (chars 4-7) and then the month part (chars 1-2)
// of the "MM/yyyy"-style key. Sorting the raw string would group by month first
// instead of running chronologically.
val yearFinal = year
  .select("Year", "calls_count", "complaint_count", "arrest_count")
  .orderBy(col("Year").substr(4, 4), col("Year").substr(1, 2))

In [21]:
// Export the joined per-key counts as CSV with a header row. No save mode is
// set, so Spark's default (fail if the path exists) applies.
yearFinal.write
  .option("header", true)
  .csv("/user/lr2947_nyu_edu/project/yearwise/monthwise")

In [22]:
// Narrow projection of the 911-calls table (two transformed timestamp columns,
// borough, create date) for export.
val Calls911 = spark.sql(
  "SELECT DISP_TS_TRANSFORMED, ARRIVD_TS_TRANSFORMED, BORO_NM , CREATE_DATE FROM nyc_911_calls ")

In [23]:
// Borough, precinct, and suspect (SUSP_*) / victim (VIC_*) columns from the
// complaints table. The two space-containing column names are renamed to
// underscore form for easier downstream use.
// NOTE(review): "NYPD PRETINCT" presumably matches the source column's own
// spelling — confirm before "fixing" it.
val complaintsFinal = spark
  .sql("SELECT `BOROUGH NAME`, `NYPD PRETINCT`, SUSP_AGE_GROUP, SUSP_RACE, SUSP_SEX, VIC_AGE_GROUP, VIC_SEX, VIC_RACE FROM nyc_complaints")
  .withColumnRenamed("BOROUGH NAME", "BOROUGH_NAME")
  .withColumnRenamed("NYPD PRETINCT", "NYPD_PCT")

In [24]:
// Export the 911-calls projection as headered CSV (default save mode).
Calls911.write
  .option("header", true)
  .csv("/user/lr2947_nyu_edu/project/yearwise/callsmin")

In [25]:
// Export the complaints projection as headered CSV (default save mode).
complaintsFinal.write
  .option("header", true)
  .csv("/user/lr2947_nyu_edu/project/yearwise/complaintsFinalFinal")

In [26]:
// Group the counts by borough (the cells below aggregate by borough only, not by precinct).

In [27]:

// `when` is used below but is not in the file's explicit functions imports
// (only `col` and `udf` are), so bring it into scope here.
import org.apache.spark.sql.functions.when

// Arrests plus an ABORO column that expands the one-letter ARREST_BORO code
// into a full upper-case borough name. This lets arrests be matched against the
// borough-name columns of the 911-calls and complaints data. Codes not listed
// here pass through unchanged.
val abourough = arrest.withColumn(
  "ABORO",
  when(col("ARREST_BORO") === "M", "MANHATTAN")
    .when(col("ARREST_BORO") === "Q", "QUEENS")
    .when(col("ARREST_BORO") === "B", "BRONX")
    .when(col("ARREST_BORO") === "S", "STATEN ISLAND")
    .when(col("ARREST_BORO") === "K", "BROOKLYN")
    .otherwise(col("ARREST_BORO")))

In [28]:
// Spot-check the borough-code expansion on 5 untruncated rows.
abourough.show(numRows = 5, truncate = false)

In [29]:
// Per-borough record counts for each dataset, keyed on that dataset's own
// borough column. The arrest count keeps the generic "count" name.
val Aboroughs = abourough
  .groupBy("ABORO")
  .count()
val callBoroughs = calls
  .groupBy("BORO_NM")
  .count()
  .withColumnRenamed("count", "calls_count")
val complaintsBoroughs = complaints
  .groupBy("BOROUGH NAME")
  .count()
  .withColumnRenamed("count", "complaints_count")

In [30]:
// Preview the per-borough arrest counts.
Aboroughs.show(numRows = 5, truncate = false)

In [31]:
// Inner-join the three per-borough count tables on borough name. Only boroughs
// whose names match exactly across all three datasets are kept.
val boroughs = callBoroughs
  .join(complaintsBoroughs, complaintsBoroughs("BOROUGH NAME") === callBoroughs("BORO_NM"), "inner")
  .join(Aboroughs, complaintsBoroughs("BOROUGH NAME") === Aboroughs("ABORO"), "inner")

In [32]:
// Preview the joined per-borough counts.
boroughs.show(numRows = 5, truncate = false)

In [33]:
// Keep the borough name and the three counts ("count" is the arrest count).
val finalBorough = {
  val keptColumns = Seq("BORO_NM", "calls_count", "complaints_count", "count")
  boroughs.select(keptColumns.map(col): _*)
}

In [34]:
// Show the final per-borough table (default 20 rows, untruncated).
finalBorough.show(truncate = false)

In [35]:
// Export the per-borough counts as headered CSV (default save mode).
finalBorough.write
  .option("header", true)
  .csv("/user/lr2947_nyu_edu/project/new_insight/boroughs")
// NOTE(review): this rebinds `complaints`, shadowing the month-keyed version
// used for the counts above, with the same query that defines
// `complaints_year`. Nothing visible in this notebook reads it afterwards; it
// looks like a leftover.
val complaints = spark.sql("SELECT *, SUBSTRING(`complaint date`, 7, 4)  AS OnlyCYEAR from nyc_complaints")

In [36]:
// Arrests and complaints with a year-only key: chars 7-10 of the date string
// (the year, if the dates are "MM/dd/yyyy" text — assumed).
val arrest_year =
  spark.sql("SELECT *,SUBSTRING(arrest_date, 7, 4) AS OnlyAYEAR from nyc_arrest")
val complaints_year =
  spark.sql("SELECT *, SUBSTRING(`complaint date`, 7, 4)  AS OnlyCYEAR from nyc_complaints")


In [37]:
// Years to keep, as the 4-character strings produced by SUBSTRING(..., 7, 4).
// Both filters share this list so the range is defined in one place.
val offenseYears = Seq("2018", "2019", "2020", "2021")

// Offense description and borough for arrests in the selected years.
val arr_offense = arrest_year
  .select("OnlyAYEAR", "OFNS_DESC", "ARREST_BORO")
  .where(arrest_year("OnlyAYEAR").isin(offenseYears: _*))
// Offense description and borough for complaints in the selected years.
val comp_offense = complaints_year
  .select("OnlyCYEAR", "OFFENSE DESC", "BOROUGH NAME")
  .where(complaints_year("OnlyCYEAR").isin(offenseYears: _*))

In [38]:
// Preview the filtered arrest offenses (default 20 rows, untruncated).
arr_offense.show(truncate = false)

In [39]:
// Export the year-filtered arrest and complaint offense tables as headered CSV
// (default save mode: fail if the target exists).
arr_offense.write
  .option("header", true)
  .csv("/user/lr2947_nyu_edu/project/offense/arrest")
comp_offense.write
  .option("header", true)
  .csv("/user/lr2947_nyu_edu/project/offense/complaints")
