### Step1: Import file crime_incidents.csv into Databricks File System (DBFS)

### Step2: Import SQLContext

In [0]:
from pyspark.sql import SQLContext

### Step3: Load the file and create DataFrame
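Here, the header option uses the first row of the file as the column names, and the inferSchema option makes Spark scan the data to infer each column's data type (without it, every column is read as a string).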

In [0]:
crimeincidentDF = spark.read.csv("dbfs:/FileStore/tables/crime_incidents.csv", inferSchema=True, header=True)
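To make the two options concrete, here is a simplified plain-Python sketch of what header and inferSchema do. It is not Spark's implementation: it only distinguishes int, double, and string (Spark's CSV inference also detects types such as long, boolean, and timestamp), and the sample rows are made up for illustration.

```python
import csv
import io


def infer_type(values):
    """Pick 'int', 'double', or 'string' for one column of CSV text values.

    Simplified: Spark's CSV inference also detects long, boolean,
    timestamp, and other types, which this sketch does not.
    """
    def parses_as(cast, text):
        try:
            cast(text)
            return True
        except ValueError:
            return False

    present = [v for v in values if v != ""]  # treat empty fields as null
    if present and all(parses_as(int, v) for v in present):
        return "int"
    if present and all(parses_as(float, v) for v in present):
        return "double"
    return "string"


def infer_schema(csv_text):
    """Column name -> inferred type, mimicking header=True, inferSchema=True."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)   # header: the first row supplies the column names
    rows = list(reader)     # inferSchema: the data rows are scanned for types
    columns = zip(*rows)    # transpose rows into columns
    return {name: infer_type(list(col)) for name, col in zip(header, columns)}


# Made-up sample rows, for illustration only
sample = (
    "IncidntNum,Category,X\n"
    "150060275,NON-CRIMINAL,-122.42\n"
    "150098210,ROBBERY,-122.41\n"
)
print(infer_schema(sample))  # {'IncidntNum': 'int', 'Category': 'string', 'X': 'double'}
```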

### Step4: Collect
collect() returns all the records to the driver as a list of Row objects. Use it with care on large datasets, because every row must fit in the driver's memory.

In [0]:
crimeincidentDF.collect()

### Step5: Take
take(n) returns the first n records as a list of Row objects.

In [0]:
crimeincidentDF.take(5)

### Step6: Show
show() prints the records in tabular format. By default, it displays only the first 20 rows.

In [0]:
# By default, show() prints the first 20 rows and truncates long values
crimeincidentDF.show()
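show() prints at most 20 rows by default and, with the default truncate=True, abbreviates cell values longer than 20 characters. The plain-Python sketch below imitates those two rules; the helper names are hypothetical, and Spark's real output also draws the table borders and pads the columns. Passing show(truncate=False) turns the abbreviation off.

```python
def format_cell(value, truncate=20):
    """Render one cell the way show() abbreviates it (truncate=True means 20)."""
    text = "null" if value is None else str(value)
    if truncate > 0 and len(text) > truncate:
        if truncate < 4:
            return text[:truncate]
        # keep the first (truncate - 3) characters and mark the cut with "..."
        return text[: truncate - 3] + "..."
    return text


def rows_to_show(rows, n=20):
    """show() prints at most n rows; n defaults to 20."""
    return rows[:n]


print(format_cell("GRAND THEFT FROM LOCKED AUTO"))  # GRAND THEFT FROM ...
print(format_cell(None))                             # null
```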

### Step7: Show the first 5 records in tabular format

In [0]:
#shows first 5 records in tabular format
crimeincidentDF.show(5)

### Step8: Finding the number of records
count() returns the number of records in the DataFrame.

In [0]:
crimeincidentDF.count()

### Step9: Returning the first row of the dataset

In [0]:
crimeincidentDF.first()

### Step10: Schema of dataframe
printSchema() prints the schema of the DataFrame (column names, data types, and nullability) as a tree.

In [0]:
crimeincidentDF.printSchema()

###  Step11: Describe dataframe
describe() computes basic statistics (count, mean, stddev, min, and max) for each numeric and string column of the DataFrame and returns them as a new DataFrame.

In [0]:
crimeincidentDF.describe().show()
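For a numeric column, describe() reports count, mean, stddev, min, and max, with nulls excluded. The plain-Python sketch below computes the same five statistics for one column, assuming stddev is the sample standard deviation (n - 1 denominator), which is what Spark's stddev aggregate computes. Spark returns the describe() results as strings in a new DataFrame, whereas this sketch returns Python numbers; the input values are made up.

```python
import statistics


def describe_column(values):
    """Summary statistics similar to one column of DataFrame.describe().

    None values are skipped, as Spark's aggregate functions skip nulls.
    stddev is the sample standard deviation; it is None for fewer than 2 values.
    """
    present = [v for v in values if v is not None]
    n = len(present)
    return {
        "count": n,
        "mean": statistics.fmean(present) if n else None,
        "stddev": statistics.stdev(present) if n > 1 else None,
        "min": min(present) if n else None,
        "max": max(present) if n else None,
    }


print(describe_column([2.0, 4.0, None, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]))
```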

###  Step12: Select Function
select() picks the given columns from an existing DataFrame and returns them as a new DataFrame.

In [0]:
# Selects the columns IncidntNum, Category, and Descript and returns a new DataFrame, newDF, containing only these 3 columns
newDF = crimeincidentDF.select('IncidntNum','Category','Descript')
newDF.show()

###  Step13: Selecting all columns

In [0]:
newDF = crimeincidentDF.select("*")
newDF.show()

###  Step14: Correlation between two columns
corr() calculates the correlation of two columns of a DataFrame as a double value. Currently, only the Pearson correlation coefficient is supported.

In [0]:
crimeincidentDF.corr('X','Y')
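The Pearson coefficient is the covariance of the two columns divided by the product of their standard deviations, r = Σ(x - x̄)(y - ȳ) / sqrt(Σ(x - x̄)² · Σ(y - ȳ)²), so it always lies between -1 and 1. A plain-Python sketch of that formula, for illustration only; it is not how Spark computes the value internally, and returning NaN for a constant column is this sketch's own convention.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mx = sum(xs) / len(xs)
    my = sum(ys) / len(ys)
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    if sx == 0 or sy == 0:
        return float("nan")  # undefined when a column is constant
    return cov / (sx * sy)


print(pearson([1, 2, 3, 4], [2, 4, 6, 8]))  # perfectly linear: close to 1.0
print(pearson([1, 2, 3, 4], [8, 6, 4, 2]))  # perfectly inverse: close to -1.0
```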

###  Step15: Removing Duplicate Records
Returns a new DataFrame with duplicate rows removed

In [0]:
newDF = crimeincidentDF.dropDuplicates()
newDF.show()
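dropDuplicates() also accepts an optional list of column names, in which case rows count as duplicates when they agree on just those columns. The plain-Python model below shows both modes on made-up rows. Unlike this sketch, Spark does not guarantee which of the duplicate rows is kept or the order of the result.

```python
def drop_duplicates(rows, subset=None):
    """Keep one row per distinct key, like DataFrame.dropDuplicates(subset).

    rows is a list of dicts. With subset=None the whole row is the key.
    This sketch keeps the first occurrence and preserves input order.
    """
    seen = set()
    kept = []
    for row in rows:
        if subset is None:
            key = tuple(sorted(row.items()))         # whole row is the key
        else:
            key = tuple(row[col] for col in subset)  # only the listed columns
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


rows = [  # made-up rows
    {"IncidntNum": 1, "Category": "ASSAULT"},
    {"IncidntNum": 1, "Category": "ASSAULT"},   # exact duplicate
    {"IncidntNum": 1, "Category": "WARRANTS"},  # same IncidntNum, other Category
    {"IncidntNum": 2, "Category": "ASSAULT"},
]
print(len(drop_duplicates(rows)))                  # 3
print(len(drop_duplicates(rows, ["IncidntNum"])))  # 2
```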

###  Step16: Finding the name of a specific column by index

In [0]:
# columns is a Python list of column names; index 4 is the fifth column
crimeincidentDF.columns[4]

###  Step17: withColumn
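withColumn(colName, col) returns a new DataFrame with a column added, or with an existing column replaced if one with the same name already exists. It can be used to update a column's values, convert its data type, or derive a new column.
Here, IncidntNum, Category, and Resolution already exist in crimeincidentDF, so each call replaces the column with an identical copy; the resulting DataFrame crimeincidentsNC has the same columns as crimeincidentDF.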

In [0]:
crimeincidentsNC = (crimeincidentDF
    .withColumn('IncidntNum', crimeincidentDF.IncidntNum)
    .withColumn('Category', crimeincidentDF.Category)
    .withColumn('Resolution', crimeincidentDF.Resolution))
crimeincidentsNC.show()
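Because withColumn() either replaces a column that already has the given name or appends a new one, the calls above leave the set of columns unchanged. The plain-Python model below (a hypothetical helper, with_column, working on rows stored as dicts) contrasts the two cases: converting an existing column in place and deriving a new one, here a made-up IsAssault flag.

```python
def with_column(rows, name, compute):
    """Hypothetical model of DataFrame.withColumn on a list of dict rows.

    If `name` is already a column, its values are replaced in place (the column
    keeps its position); otherwise a new column is appended at the end. A new
    list is returned and the input rows are left untouched.
    """
    return [{**row, name: compute(row)} for row in rows]


rows = [{"IncidntNum": "150060275", "Category": "NON-CRIMINAL"}]  # made-up row

# Replace an existing column: convert IncidntNum from text to an integer
cast = with_column(rows, "IncidntNum", lambda r: int(r["IncidntNum"]))
# Add a new column: a hypothetical flag derived from Category
flagged = with_column(rows, "IsAssault", lambda r: r["Category"] == "ASSAULT")

print(cast)     # [{'IncidntNum': 150060275, 'Category': 'NON-CRIMINAL'}]
print(flagged)  # [{'IncidntNum': '150060275', 'Category': 'NON-CRIMINAL', 'IsAssault': False}]
```

In PySpark, the same two cases could be written as `crimeincidentDF.withColumn('IncidntNum', crimeincidentDF.IncidntNum.cast('long'))` and `crimeincidentDF.withColumn('IsAssault', crimeincidentDF.Category == 'ASSAULT')`.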

## Registering the DataFrame as a temporary view and applying SQL queries to it
###  Step18: createOrReplaceTempView(name)
This function registers the DataFrame as a temporary view with the given name, replacing any existing view of that name; the view lasts only for the current Spark session. The SQL statement below selects the rows whose Resolution starts with ARRE.

Any other SQL query against the registered view likewise returns its result as a DataFrame.

In [0]:
crimeincidentsNC.createOrReplaceTempView("crime_incident_table")
# spark.sql(...) runs the same query through the SparkSession
newDF = sqlContext.sql("SELECT * FROM crime_incident_table WHERE Resolution LIKE 'ARRE%'")
newDF.show()
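In a LIKE pattern, % matches any run of characters (including none) and _ matches exactly one character, so 'ARRE%' selects every Resolution value that begins with ARRE; the match is case-sensitive. The sketch below translates a LIKE pattern into a Python regular expression to illustrate those rules. It ignores escape characters, is not part of Spark's API, and the test strings are sample values.

```python
import re


def like_to_regex(pattern):
    """Translate a SQL LIKE pattern (without escapes) into a compiled regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")            # any sequence, possibly empty
        elif ch == "_":
            parts.append(".")             # exactly one character
        else:
            parts.append(re.escape(ch))   # everything else is literal
    return re.compile("".join(parts), re.DOTALL)


def sql_like(value, pattern):
    """True if the whole value matches the LIKE pattern."""
    return like_to_regex(pattern).fullmatch(value) is not None


print(sql_like("ARREST, BOOKED", "ARRE%"))  # True
print(sql_like("NONE", "ARRE%"))            # False
```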

###  Step19:  Filtering Operations
filter() returns a new DataFrame containing only the rows of an existing DataFrame that satisfy a condition (here, Resolution == "NONE").

In [0]:
#returns a new dataframe
subsetDF = crimeincidentsNC.filter(crimeincidentsNC.Resolution == "NONE")
subsetDF.show(10)

###  Step20: Filtering through multiple conditions

In [0]:
# Each filter() returns a new DataFrame, so the second filter is applied to
# the result of the first (Resolution == LOCATED, then Category == ASSAULT)
filteredDF = crimeincidentsNC.filter(crimeincidentsNC.Resolution == 'LOCATED').filter(crimeincidentsNC.Category == 'ASSAULT')
filteredDF.show()
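Chaining filter() calls keeps only the rows that pass every condition, so it is equivalent to a single filter whose conditions are joined with AND. In PySpark that single call could be written as `crimeincidentsNC.filter((crimeincidentsNC.Resolution == 'LOCATED') & (crimeincidentsNC.Category == 'ASSAULT'))`; the parentheses are required because & binds more tightly than == in Python. The plain-Python sketch below checks the equivalence on made-up rows.

```python
def filter_rows(rows, predicate):
    """Keep the rows for which predicate(row) is true, like DataFrame.filter()."""
    return [row for row in rows if predicate(row)]


rows = [  # made-up rows
    {"Category": "ASSAULT", "Resolution": "LOCATED"},
    {"Category": "ASSAULT", "Resolution": "NONE"},
    {"Category": "MISSING PERSON", "Resolution": "LOCATED"},
]

# Two filters applied one after the other
chained = filter_rows(
    filter_rows(rows, lambda r: r["Resolution"] == "LOCATED"),
    lambda r: r["Category"] == "ASSAULT",
)
# One filter with both conditions joined by AND
combined = filter_rows(
    rows, lambda r: r["Resolution"] == "LOCATED" and r["Category"] == "ASSAULT"
)
print(chained == combined)  # True
```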

###  Step21: groupBy
groupBy() groups the DataFrame by the specified columns (here, Category) so that an aggregation such as count() can be computed for each group.

In [0]:
crimeincidentDF.groupBy('Category').count().show()
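Conceptually, groupBy('Category').count() produces one row per distinct Category with the number of rows in that group. A plain-Python equivalent using collections.Counter on made-up rows; Spark also forms a group for null values, which Counter mirrors with a None key, and Spark does not guarantee the order of the groups.

```python
from collections import Counter


def group_count(rows, key):
    """Number of rows per distinct value of `key`, like groupBy(key).count()."""
    return Counter(row[key] for row in rows)


rows = [  # made-up rows
    {"Category": "ASSAULT"},
    {"Category": "ROBBERY"},
    {"Category": "ASSAULT"},
    {"Category": None},  # nulls form their own group
]
for category, count in group_count(rows, "Category").items():
    print(category, count)
```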

###   Step22: Applying Aggregation functions after grouping

In [0]:
# groupBy() with no columns treats the whole DataFrame as one group; avg() then averages every numeric column
crimeincidentDF.groupBy().avg().show()
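groupBy() with no columns puts every row into a single group, so avg() returns one row holding the mean of each numeric column, named avg(column). The plain-Python sketch below mimics that on made-up rows and, like Spark's avg(), ignores nulls. One practical consequence: an identifier such as IncidntNum is averaged too if it was inferred as a numeric type, even though that average has no meaning.

```python
def average_numeric_columns(rows):
    """Mean of every numeric column over all rows, like df.groupBy().avg().

    None values, strings, and booleans are skipped. A column with no numeric
    values at all is omitted here (Spark would report null for it).
    """
    totals = {}  # column -> [running sum, count of non-null values]
    for row in rows:
        for col, value in row.items():
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            total = totals.setdefault(col, [0, 0])
            total[0] += value
            total[1] += 1
    return {f"avg({col})": s / n for col, (s, n) in totals.items()}


rows = [  # made-up rows
    {"IncidntNum": 1, "Category": "ASSAULT", "X": -122.4},
    {"IncidntNum": 3, "Category": "ROBBERY", "X": None},
]
print(average_numeric_columns(rows))  # {'avg(IncidntNum)': 2.0, 'avg(X)': -122.4}
```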

###   Step23: Aggregation using column 'Resolution'

In [0]:
# groupBy() returns a GroupedData object, to which aggregate functions such as count() are applied
groupedData = crimeincidentDF.groupBy(crimeincidentDF.Resolution)
groupedData.count().show()

###   Step24: Display the first 15 rows of the DataFrame

In [0]:
crimeincidentDF.show(15)

### Step25: Writing the DataFrame as a Parquet file

In [0]:
# Writes the DataFrame as Parquet to the tables directory of the Databricks File System (fails if the path already exists)
crimeincidentDF.write.parquet("dbfs:/FileStore/tables/crime_incidents.parquet")

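### Step26: Reading the Parquet file back
The Parquet output is stored in DBFS at the path passed to write.parquet() (as a directory of part files), not in HDFS. Parquet files carry their own schema, so no header or inferSchema options are needed when reading them back.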

In [0]:
newDF = spark.read.parquet("dbfs:/FileStore/tables/crime_incidents.parquet")
newDF.show()