<a href="https://colab.research.google.com/github/menwcode/Data-Science/blob/main/pyspark_intro.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

In [None]:
#list all the files
!ls -l

total 214788
drwxr-xr-x  1 root root      4096 Nov  6 17:30 sample_data
drwxr-xr-x 13 1000 1000      4096 Aug 28 08:10 spark-3.0.1-bin-hadoop2.7
-rw-r--r--  1 root root 219929956 Aug 28 09:25 spark-3.0.1-bin-hadoop2.7.tgz


In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

In [None]:
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD

In [None]:
!ls -l

In [None]:
#move the rows.csv?accessType=DOWNLOAD
!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv


In [None]:
!ls -l

In [None]:
#Laod data in pyspark
from pyspark.sql.functions import to_timestamp, col, lit
df = spark.read.csv('reported-crimes.csv', header = True).withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') <= lit('2018-11-11'))
df.show(5)

In [None]:
##List all the columns
df.columns

In [None]:
#Length of the all the fields
len(df.columns)

In [None]:
#Count method to get the all the records
df.count()

In [None]:
#A duplicate of Pandas shape
print((df.count()), (len(df.columns)))

In [None]:
#List the Schema
df.printSchema()

In [None]:
#show only 2 columns values
df.select('Case Number', 'Arrest').show(5)

In [None]:
#Statistical measure of each columns of the dataframe
df.describe().show()

In [None]:
#Adding a new column
#withcolumn function of Spark, lets you add a new column  sexual_crime by checking "Primary Type"
#df.withColumn("sexual_crime")


In [None]:
from pyspark.sql.types import StringType, DoubleType

Filtering data


In [None]:
df.filter(df["Primary Type"] == "CRIM SEXUAL ASSAULT").show()

In [None]:
#Show all the crimes related to Sexual assulat and Predatory
df.filter((df["Primary Type"] == "CRIM SEXUAL ASSAULT") & (df["Description"] == "PREDATORY")).show()


Show Distinct Values

In [None]:
#Show distinct values of with Truncate set to False
df.select("Primary Type").distinct().show(truncate = False)

In [None]:
#Grouping Data
df.groupBy('Primary Type').count().show(50, False)

Put order in above output

In [None]:
df.groupBy("Primary Type").count().orderBy('count', ascending=False).show(50, False)

In [None]:
df.groupBy("Block").count().orderBy('count', ascending=False).show(50, False)

In [None]:
#Show all the Arrest by the crime type
df.select("Arrest").distinct().show()

In [45]:
#What is the arrest percentage
df.filter(df["Arrest"] == "true").count()/df.select("Arrest").count()

0.27754484080007524

# Show the top location of Crime

In [48]:
#Top crime location in the city
df.groupBy("Location Description").count().orderBy('count', ascending=False).show(50, False)

+---------------------------------+-------+
|Location Description             |count  |
+---------------------------------+-------+
|STREET                           |1770578|
|RESIDENCE                        |1144978|
|APARTMENT                        |698338 |
|SIDEWALK                         |665552 |
|OTHER                            |256793 |
|PARKING LOT/GARAGE(NON.RESID.)   |193760 |
|ALLEY                            |150911 |
|SCHOOL, PUBLIC, BUILDING         |142316 |
|RESIDENCE-GARAGE                 |131605 |
|SMALL RETAIL STORE               |119270 |
|RESIDENCE PORCH/HALLWAY          |117902 |
|VEHICLE NON-COMMERCIAL           |108435 |
|RESTAURANT                       |105608 |
|GROCERY FOOD STORE               |87301  |
|DEPARTMENT STORE                 |83663  |
|GAS STATION                      |71985  |
|RESIDENTIAL YARD (FRONT/BACK)    |69449  |
|CHA PARKING LOT/GROUNDS          |55452  |
|PARK PROPERTY                    |52340  |
|COMMERCIAL / BUSINESS OFFICE   

## String Functions
Display the primary type column in lower and upper characters and the first 4 characters



In [52]:
from pyspark.sql.functions import lower, upper, substring
df.select(lower(df["Primary Type"]), upper(df["Primary Type"]), substring(df["Primary Type"], 1, 4)).show(5, truncate=False)

+-------------------+-------------------+-----------------------------+
|lower(Primary Type)|upper(Primary Type)|substring(Primary Type, 1, 4)|
+-------------------+-------------------+-----------------------------+
|deceptive practice |DECEPTIVE PRACTICE |DECE                         |
|crim sexual assault|CRIM SEXUAL ASSAULT|CRIM                         |
|burglary           |BURGLARY           |BURG                         |
|theft              |THEFT              |THEF                         |
|crim sexual assault|CRIM SEXUAL ASSAULT|CRIM                         |
+-------------------+-------------------+-----------------------------+
only showing top 5 rows



# Show the oldest date and most recent dates

In [53]:
from pyspark.sql.functions import  min, max
df.select(min(df["Date"]), max(col('Date'))).show()

+-------------------+-------------------+
|          min(Date)|          max(Date)|
+-------------------+-------------------+
|2001-01-01 00:00:00|2018-11-11 00:00:00|
+-------------------+-------------------+



# What is 3 days earlier than the older date and 3 days later than the most recent dates

In [57]:
from pyspark.sql.functions import date_add, date_sub
df.select(date_sub(min(df["Date"])), 3)

TypeError: ignored

In [59]:
#
df.groupBy('Primary Type').max().show(5, False)

+--------------------------+
|Primary Type              |
+--------------------------+
|OFFENSE INVOLVING CHILDREN|
|CRIMINAL SEXUAL ASSAULT   |
|STALKING                  |
|PUBLIC PEACE VIOLATION    |
|OBSCENITY                 |
+--------------------------+
only showing top 5 rows



# UDF (User Defined Functions): there is a performance impact

In [63]:
#Traditional Python function
from pyspark.sql.functions import  udf

def price_range(brand):
  if brand in ['Samsung','Apple']:
    return 'High Price'
  elif brand == "MI":
    return "mid Price"
  else:
    return "low price"

In [None]:
#Now you could call
brand_udf = udf()