# Install Spark

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [None]:
!ls

reported-crimes.csv	   spark-2.4.5-bin-hadoop2.7.tgz
sample_data		   spark-2.4.5-bin-hadoop2.7.tgz.1
spark-2.4.5-bin-hadoop2.7


In [None]:
import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
sc

In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate() 
spark

# Loading a Dataset

In [None]:
# Download Chicago's reported crime data (URL quoted so the shell does not interpret the "?")
!wget "https://data.cityofchicago.org/api/views/w98m-zvie/rows.csv?accessType=DOWNLOAD"

--2021-03-28 03:21:10--  https://data.cityofchicago.org/api/views/w98m-zvie/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.205, 52.206.68.26, 52.206.140.199
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [         <=>        ]  59.27M  3.27MB/s    in 20s     

2021-03-28 03:21:31 (2.93 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [62151310]



- **Where** is the above data stored in the first place?
- How big is the csv file?

In [None]:
!ls

 reported-crimes.csv		 spark-2.4.5-bin-hadoop2.7
'rows.csv?accessType=DOWNLOAD'	 spark-2.4.5-bin-hadoop2.7.tgz
 sample_data			 spark-2.4.5-bin-hadoop2.7.tgz.1


In [None]:
#Rename the downloaded file (name quoted so the shell does not treat "?" as a wildcard)
!mv "rows.csv?accessType=DOWNLOAD" reported-crimes.csv

In [None]:
!ls

reported-crimes.csv	   spark-2.4.5-bin-hadoop2.7.tgz
sample_data		   spark-2.4.5-bin-hadoop2.7.tgz.1
spark-2.4.5-bin-hadoop2.7


>Alternatively, you can enable the `spark.sql.repl.eagerEval.enabled` configuration for eager evaluation of PySpark DataFrames in notebooks such as Jupyter. The number of rows to show can be controlled via the `spark.sql.repl.eagerEval.maxNumRows` configuration.

In [None]:
df = spark.read.csv("reported-crimes.csv", header=True)
#setting eager evaluation
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
df.show(5)

+--------+-----------+--------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|                Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+--------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|11864018|   JC476123|09/24/2019 08:00:...|022XX S MICHIGAN AVE|1154|DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|COMMERCIAL / BUSI...|

# Using the DataFrame API

## Multiple options to view df
- `df.take(n)`:  return the first `n` rows as a list of `Row` objects
- `df.collect()`:  return **ALL** rows of the dataframe to the driver
    - Can crash the driver node if the data does not fit in its memory
- `df.show(n)`:  print the first `n` rows, much like `head`
    - Truncates long values by default; pass `truncate=False` to turn that off

## DataFrame Schemas

In [None]:
df.dtypes

[('ID', 'string'),
 ('Case Number', 'string'),
 ('Date', 'string'),
 ('Block', 'string'),
 ('IUCR', 'string'),
 ('Primary Type', 'string'),
 ('Description', 'string'),
 ('Location Description', 'string'),
 ('Arrest', 'string'),
 ('Domestic', 'string'),
 ('Beat', 'string'),
 ('District', 'string'),
 ('Ward', 'string'),
 ('Community Area', 'string'),
 ('FBI Code', 'string'),
 ('X Coordinate', 'string'),
 ('Y Coordinate', 'string'),
 ('Year', 'string'),
 ('Updated On', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('Location', 'string')]

In [None]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



In [None]:
df.columns

['ID',
 'Case Number',
 'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location']

## User Defined Schema

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BooleanType, DoubleType, IntegerType

After viewing the dataframe, we decide on the best data type for each column. Below, we pair each column name with its PySpark data type in a tuple.

In [None]:
labels = [
     ('ID',StringType()),
     ('Case Number',StringType()),
     ('Date',TimestampType()),
     ('Block',StringType()),
     ('IUCR',StringType()),
     ('Primary Type',StringType()),
     ('Description',StringType()),
     ('Location Description',StringType()),
     ('Arrest',StringType()),
     ('Domestic',BooleanType()),
     ('Beat',StringType()),
     ('District',StringType()),
     ('Ward',StringType()),
     ('Community Area',StringType()),
     ('FBI Code',StringType()),
     ('X Coordinate',StringType()),
     ('Y Coordinate',StringType()),
     ('Year',IntegerType()),
     ('Updated On',StringType()),
     ('Latitude',DoubleType()),
     ('Longitude',DoubleType()),
     ('Location',StringType()),
     ('Historical Wards 2003-2015',StringType()),
     ('Zip Codes',StringType()),
     ('Community Areas',StringType()),
     ('Census Tracts',StringType()),
     ('Wards',StringType()),
     ('Boundaries - ZIP Codes',StringType()),
     ('Police Districts',StringType()),
     ('Police Beats',StringType())
]

Now that the column names and data types are paired up, we use the `StructType` to define a new schema.

`StructType` takes a `list` of `StructField` objects.

We allow each column to hold NULL values by setting `nullable=True` (the third argument) in the `StructField` constructor.

In [None]:
schema = StructType([StructField(x[0], x[1], True) for x in labels])

In [None]:
#give the csv our hand crafted schema
df = spark.read.csv("reported-crimes.csv", schema=schema)
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- Historical Wards 2003-2015: string (nullable = true)
 |-- Zip Codes: string (nullable = true)
 |-

~~Compare this schema with what we saw before.~~

~~For example, *Date* values are timestamps instead of regular strings.~~

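Somehow, reading with this schema did not give the expected result. A likely reason is that the `Date` strings are in `MM/dd/yyyy hh:mm:ss a` format rather than the timestamp format Spark expects by default. The following cells work around this by converting `Date` explicitly.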

In [None]:
df.show()

+----+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+--------------------------+---------+---------------+-------------+-----+----------------------+----------------+------------+
|  ID|Case Number|Date|Block|IUCR|Primary Type|Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On|Latitude|Longitude|Location|Historical Wards 2003-2015|Zip Codes|Community Areas|Census Tracts|Wards|Boundaries - ZIP Codes|Police Districts|Police Beats|
+----+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+--------------------------+---------+---------------+-------------+-----+----------------------+----------------+------

In [None]:
#these functions allow us to manually change the date dtype
from pyspark.sql.functions import col, to_timestamp

In [None]:
#transform the strings in the Date column according to this format
df = spark.read.csv("reported-crimes.csv", header=True).withColumn("Date", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a"))
#view the results; do not truncate any values
df.show(5, False)

+--------+-----------+-------------------+--------------------+----+------------------+---------------------------------------+----------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|ID      |Case Number|Date               |Block               |IUCR|Primary Type      |Description                            |Location Description        |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On            |Latitude    |Longitude    |Location                     |
+--------+-----------+-------------------+--------------------+----+------------------+---------------------------------------+----------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|11864018|JC476123 

# Working with Columns

## Selecting Columns

Column selection works mostly like `pandas`. It has the same problem: dot notation can conflict with existing DataFrame attributes or methods, and it cannot be used for column names that contain spaces.

In [None]:
#use dot notation to select the "Block" column
df.Block

Column<b'Block'>

In [None]:
#use bracket notation to select the "Block" column
df["Block"]

Column<b'Block'>

Notice that *unlike* `pandas`, selecting a column this way only returns a `Column` object; it does not display any data.

To get the data we need to `select(column_object)`.

You can then display the resulting selection with its `show()` method.

In [None]:
df.select(col("Block"))

Block
022XX S MICHIGAN AVE
024XX W CHICAGO AVE
0000X N LOOMIS ST
016XX W ADDISON ST
051XX N BROADWAY
011XX W JACKSON BLVD
064XX S VERNON AVE
070XX S EGGLESTON...
065XX S DR MARTIN...
051XX W WEST END AVE


In [None]:
df.select(col("Block")).show(truncate=False)

+-----------------------------------+
|Block                              |
+-----------------------------------+
|022XX S MICHIGAN AVE               |
|024XX W CHICAGO AVE                |
|0000X N LOOMIS ST                  |
|016XX W ADDISON ST                 |
|051XX N BROADWAY                   |
|011XX W JACKSON BLVD               |
|064XX S VERNON AVE                 |
|070XX S EGGLESTON AVE              |
|065XX S DR MARTIN LUTHER KING JR DR|
|051XX W WEST END AVE               |
|005XX N ogden ave                  |
|085XX S KEDVALE AVE                |
|022XX N SHEFFIELD AVE              |
|057XX N WINTHROP AVE               |
|037XX W NORTH AVE                  |
|004XX N WABASH AVE                 |
|104XX S WABASH AVE                 |
|076XX S MAY ST                     |
|015XX N WELLS ST                   |
|0000X E MONROE ST                  |
+-----------------------------------+
only showing top 20 rows



In [None]:
#skipping the dot notation example because I don't like dot notation

In [None]:
#select() also accepts column names as plain strings,
#so wrapping each name in col() is optional (but it works too)
df.select("Block", "Description").show(truncate=False)

+-----------------------------------+---------------------------------------+
|Block                              |Description                            |
+-----------------------------------+---------------------------------------+
|022XX S MICHIGAN AVE               |FINANCIAL IDENTITY THEFT $300 AND UNDER|
|024XX W CHICAGO AVE                |RETAIL THEFT                           |
|0000X N LOOMIS ST                  |OVER $500                              |
|016XX W ADDISON ST                 |TO VEHICLE                             |
|051XX N BROADWAY                   |SIMPLE                                 |
|011XX W JACKSON BLVD               |RETAIL THEFT                           |
|064XX S VERNON AVE                 |DOMESTIC BATTERY SIMPLE                |
|070XX S EGGLESTON AVE              |TO PROPERTY                            |
|065XX S DR MARTIN LUTHER KING JR DR|DOMESTIC BATTERY SIMPLE                |
|051XX W WEST END AVE               |HARASSMENT BY ELECTRONIC ME

## Adding New Columns

In [None]:
#function for creating a column of LITERAL values
from pyspark.sql.functions import lit

In [None]:
#withColumn(column name, column expression) returns a new df with that column added
df = df.withColumn("One", lit(1))

In [None]:
#the newly created column appears at the end of the dataframe
df.show(5)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+---+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|One|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+---+
|11864018|   JC476123|2019-09-24 08:00:00|022XX S MICHIGAN AVE|1154|DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|COMMERCIAL / 

## Renaming columns

In [None]:
df = df.withColumnRenamed("One", "Test")
df.show(truncate=False)

+--------+-----------+-------------------+-----------------------------------+----+------------------+---------------------------------------+----------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+----+
|ID      |Case Number|Date               |Block                              |IUCR|Primary Type      |Description                            |Location Description        |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On            |Latitude    |Longitude    |Location                     |Test|
+--------+-----------+-------------------+-----------------------------------+----+------------------+---------------------------------------+----------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+---------

## Groupby Operations

In [None]:
df.groupBy("Year").count().show(5)

+----+------+
|Year| count|
+----+------+
|2019|260603|
+----+------+



Looks like there are 260,603 occurrences of the value "2019" in the Year column.

## Remove columns

In [None]:
df = df.drop("Test")
df.show(5, truncate=False)

+--------+-----------+-------------------+--------------------+----+------------------+---------------------------------------+----------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|ID      |Case Number|Date               |Block               |IUCR|Primary Type      |Description                            |Location Description        |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On            |Latitude    |Longitude    |Location                     |
+--------+-----------+-------------------+--------------------+----+------------------+---------------------------------------+----------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|11864018|JC476123 

# Working with Rows

In [None]:
df.filter(col("Date")<"2019-06-01").show(truncate=False)

+--------+-----------+-------------------+-----------------------------------+----+------------------+---------------------------------------+-----------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|ID      |Case Number|Date               |Block                              |IUCR|Primary Type      |Description                            |Location Description         |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On            |Latitude    |Longitude    |Location                     |
+--------+-----------+-------------------+-----------------------------------+----+------------------+---------------------------------------+-----------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+--

So filtering isn't quite as compact as in `pandas` (`df[df["year"] < ...]`), but the method call makes the intent more explicit.

The filter also works when the comparison value is passed as a Python string instead of a datetime object.

## Select Distinct Rows

In [None]:
df.select("Date").distinct().show(5, truncate=False)

+-------------------+
|Date               |
+-------------------+
|2019-10-15 13:15:00|
|2019-10-15 15:59:00|
|2019-10-14 14:39:00|
|2019-10-18 12:30:00|
|2019-10-18 01:45:00|
+-------------------+
only showing top 5 rows



## Sorting Rows

In [None]:
df.orderBy("Date").show(truncate=False)

+--------+-----------+-------------------+------------------------+----+--------------------------+-------------------------------------------------+-------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|ID      |Case Number|Date               |Block                   |IUCR|Primary Type              |Description                                      |Location Description     |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On            |Latitude    |Longitude    |Location                     |
+--------+-----------+-------------------+------------------------+----+--------------------------+-------------------------------------------------+-------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------

`orderBy()` has options for configuring the sort, such as `ascending`.

In [None]:
#latest first
df.orderBy("Date", ascending=False).show(truncate=False)

+--------+-----------+-------------------+----------------------+----+-----------------+--------------------------------------------------+----------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|ID      |Case Number|Date               |Block                 |IUCR|Primary Type     |Description                                       |Location Description  |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On            |Latitude    |Longitude    |Location                     |
+--------+-----------+-------------------+----------------------+----+-----------------+--------------------------------------------------+----------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|

In [None]:
#earliest first
df.orderBy("Date", ascending=True).show(truncate=False)

+--------+-----------+-------------------+------------------------+----+--------------------------+-------------------------------------------------+-------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|ID      |Case Number|Date               |Block                   |IUCR|Primary Type              |Description                                      |Location Description     |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On            |Latitude    |Longitude    |Location                     |
+--------+-----------+-------------------+------------------------+----+--------------------------+-------------------------------------------------+-------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------

## Combining DataFrames

As always, there is more than one way to do this.
- `union()`:  appends the rows of one DataFrame to another, matching columns by position (not by name); both DataFrames need the same number of columns, else error
    - **NB THIS IS PREFERRED METHOD FOR COMBINING**
- `unionByName()`:  matches columns by name, so the column order may differ

*Removing Duplicate records*:  
- `distinct()` or `dropDuplicates()`

In [None]:
#select the rows whose Date equals "2019-07-30"; the string is read as the timestamp 2019-07-30 00:00:00 (midnight)
one_day = spark.read.csv("reported-crimes.csv", header=True).withColumn("Date", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a")).filter(col("Date")==lit("2019-07-30"))
#count how many rows in df match the same condition
df.filter(col("Date")==lit("2019-07-30")).count()

10

In [None]:
#add our duplicate rows (from above) to the end of the dataframe
#also set df to the result of this union
df = df.union(one_day)

In [None]:
#query the number of times "2019-07-30" appears again (should have increased because we just added rows!)
df.filter(col("Date")==lit("2019-07-30")).count()

20

## Example of Union by Name

In [None]:
#make a dataframe
df1 = spark.createDataFrame([[1,2,3]], ["col0", "col1", "col2"])

#permute the columns (relative to df1)
df2 = spark.createDataFrame([[4,5,6]], ["col1", "col2", "col0"])

In [None]:
df1.dtypes

[('col0', 'bigint'), ('col1', 'bigint'), ('col2', 'bigint')]

In [None]:
#we union by name so it doesn't matter that the columns are relatively permuted
df1.unionByName(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+



Let's try this last exercise without looking.

In [None]:
df.columns

['ID',
 'Case Number',
 'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location']

In [None]:
# Top 10 reported crimes by Primary Type, in descending order of occurrence
df.groupBy("Primary Type").count().orderBy("count", ascending=False).show(10, truncate=False)

+-------------------+-----+
|Primary Type       |count|
+-------------------+-----+
|THEFT              |62445|
|BATTERY            |49499|
|CRIMINAL DAMAGE    |26680|
|ASSAULT            |20617|
|DECEPTIVE PRACTICE |18648|
|OTHER OFFENSE      |16754|
|NARCOTICS          |15060|
|BURGLARY           |9637 |
|MOTOR VEHICLE THEFT|8974 |
|ROBBERY            |7992 |
+-------------------+-----+
only showing top 10 rows



# Functions

In [None]:
#list of available functions from PySpark
from pyspark.sql import functions
print(dir(functions))



## String functions

In [None]:
#"reset" the DataFrame by reading in the data again (this drops the duplicate rows added by the union)
df = spark.read.csv("reported-crimes.csv", header=True).withColumn("Date", to_timestamp(col("Date"), 'MM/dd/yyyy hh:mm:ss a'))

In [None]:
from pyspark.sql.functions import col, lower, upper, substring

In [None]:
#read the manual
help(substring)

Help on function substring in module pyspark.sql.functions:

substring(str, pos, len)
    Substring starts at `pos` and is of length `len` when str is String type or
    returns the slice of byte array that starts at `pos` in byte and is of length `len`
    when str is Binary type.
    
    .. note:: The position is not zero based, but 1 based index.
    
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
    [Row(s='ab')]
    
    .. versionadded:: 1.5



In [None]:
df.select(lower(col("Primary Type")), upper(col("Primary Type")), substring(col("Primary Type"), 1, 4)).show(5, truncate=False)

+-------------------+-------------------+-----------------------------+
|lower(Primary Type)|upper(Primary Type)|substring(Primary Type, 1, 4)|
+-------------------+-------------------+-----------------------------+
|deceptive practice |DECEPTIVE PRACTICE |DECE                         |
|theft              |THEFT              |THEF                         |
|theft              |THEFT              |THEF                         |
|criminal damage    |CRIMINAL DAMAGE    |CRIM                         |
|assault            |ASSAULT            |ASSA                         |
+-------------------+-------------------+-----------------------------+
only showing top 5 rows



## Numeric Functions

In [None]:
#note: importing these names shadows Python's built-in min() and max() in this notebook
from pyspark.sql.functions import min, max

In [None]:
df.select(min(col("Date")), max(col("Date"))).show()

+-------------------+-------------------+
|          min(Date)|          max(Date)|
+-------------------+-------------------+
|2019-01-01 00:00:00|2019-12-31 23:55:00|
+-------------------+-------------------+



### Date example

**What is 3 days earlier than the oldest date and 3 days later than the most recent date?**

Since these are timestamps, we can't simply add or subtract an integer number of days. Instead of converting types manually, we can use `date_add` and `date_sub` from `pyspark.sql.functions`.

In [None]:
from pyspark.sql.functions import date_add, date_sub

In [None]:
df.select(date_sub(min(col("Date")), 3), date_add(max(col("Date")), 3)).show()

+----------------------+----------------------+
|date_sub(min(Date), 3)|date_add(max(Date), 3)|
+----------------------+----------------------+
|            2018-12-29|            2020-01-03|
+----------------------+----------------------+



In [None]:
from pyspark.sql.functions import to_date, to_timestamp, lit
xmas_df = spark.createDataFrame([('2019-12-25 13:30:00',)], ["XMAS"])
xmas_df.show()

+-------------------+
|               XMAS|
+-------------------+
|2019-12-25 13:30:00|
+-------------------+



In [None]:
xmas_df.select(to_date(col("XMAS"), 'yyyy-MM-dd HH:mm:ss'), to_timestamp(col("XMAS"), 'yyyy-MM-dd HH:mm:ss')).show()

+--------------------------------------+-------------------------------------------+
|to_date(`XMAS`, 'yyyy-MM-dd HH:mm:ss')|to_timestamp(`XMAS`, 'yyyy-MM-dd HH:mm:ss')|
+--------------------------------------+-------------------------------------------+
|                            2019-12-25|                        2019-12-25 13:30:00|
+--------------------------------------+-------------------------------------------+



### FINISH ABOVE LATER

This tutorial doesn't *exactly* show me how to compute over the whole dataset, but I think it's a start.

I remember reading that Spark separates **TRANSFORMATIONS**, which lazily define new datasets, from **ACTIONS**, which trigger the computation and return results. I should look more into that.