### Imports and Create Spark Session

In [18]:
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder.appName("NYCCrimesAnalysisNoCache").getOrCreate()

### Read CSV File into Spark DataFrame 

####Note the limit of the number of rows, when you go to cache this the local machines resources might not be able to handle it

In [19]:
nycrimes = spark.read.option("header", "true").option("inferSchema", "true")\
    .csv(r"C:\Users\rich\Data\NYCCrimeData\NYPD_Complaint_Data_Historic.csv").limit(2000000)

### Cache the DF

In [None]:
#nycrimes.cache()

### Print DF Metadata

In [20]:
print("######Here is our inferred schema:")
nycrimes.printSchema()

print("######First 5 rows")
nycrimes.show(5,truncate=False)

print("#####Row count")
print(nycrimes.count())

######Here is our inferred schema:
root
 |-- CMPLNT_NUM: integer (nullable = true)
 |-- CMPLNT_FR_DT: string (nullable = true)
 |-- CMPLNT_FR_TM: string (nullable = true)
 |-- CMPLNT_TO_DT: string (nullable = true)
 |-- CMPLNT_TO_TM: string (nullable = true)
 |-- ADDR_PCT_CD: integer (nullable = true)
 |-- RPT_DT: string (nullable = true)
 |-- KY_CD: integer (nullable = true)
 |-- OFNS_DESC: string (nullable = true)
 |-- PD_CD: integer (nullable = true)
 |-- PD_DESC: string (nullable = true)
 |-- CRM_ATPT_CPTD_CD: string (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- LOC_OF_OCCUR_DESC: string (nullable = true)
 |-- PREM_TYP_DESC: string (nullable = true)
 |-- JURIS_DESC: string (nullable = true)
 |-- JURISDICTION_CODE: integer (nullable = true)
 |-- PARKS_NM: string (nullable = true)
 |-- HADEVELOPT: string (nullable = true)
 |-- HOUSING_PSA: string (nullable = true)
 |-- X_COORD_CD: integer (nullable = true)
 |-- Y_COORD_CD: int

### Create new DF by Selecting 2 cols, Fire Spark up to Group By,Agg those cols

In [21]:
start = datetime.now()
crimesByBoroVictimGender = nycrimes.select("BORO_NM","VIC_SEX")
aggCrimes = crimesByBoroVictimGender.groupBy("BORO_NM","VIC_SEX").count().orderBy("BORO_NM")
aggCrimes.show()
end = datetime.now()
print("###### Elapsed Time:" )
print(end-start)

+---------+-------+------+
|  BORO_NM|VIC_SEX| count|
+---------+-------+------+
|     null|      U|     2|
|     null|      M|  2211|
|     null|      E|   483|
|     null|      D|   308|
|     null|      F|  1327|
|     null|   null|     1|
|    BRONX|      F|169270|
|    BRONX|      D| 32012|
|    BRONX|   null|    40|
|    BRONX|      E|109824|
|    BRONX|      M|121528|
| BROOKLYN|      M|201522|
| BROOKLYN|      D| 56710|
| BROOKLYN|      F|240683|
| BROOKLYN|      E| 94444|
| BROOKLYN|   null|    53|
|MANHATTAN|      M|140364|
|MANHATTAN|      E| 87469|
|MANHATTAN|      F|163590|
|MANHATTAN|      D| 88106|
+---------+-------+------+
only showing top 20 rows

###### Elapsed Time:
0:00:08.411387


## Important!!! Remove the persisted object!

In [None]:
#nycrimes.unpersist()