## APACHE PYSPARK

## WHY SPARK 
**Apache Spark is written in Scala**
- fast
- scalable
- supports many different languages
- has its own ecosystem

When working with big data we want code to run fast, which many other tools cannot deliver. **Spark runs on the JVM and connects with other Java-based big data systems such as Cassandra, HBase and HDFS (Cassandra and HBase are databases; HDFS is a distributed file system)**

- Spark runs locally or on a cluster (which can be on-premises or in the cloud)
- Runs on top of Hadoop YARN, Apache Mesos, or in the cloud
- It is designed to run on large distributed clusters, but can also run on a single machine
- **Spark Core API** is the foundation of the Spark ecosystem
- The Spark ecosystem has 1) Spark SQL, 2) Spark Streaming (for real-time data), 3) MLlib (for ML), 4) GraphX (for graph-structured data)

**What does the Spark Core API do?**
- task scheduling
- memory management
- fault recovery
- interacting with storage systems
- NOTE: Scala is the default language

**Spark SQL and data frames**
- Spark allows us to do EDA using SQL (via the DataFrame programming abstraction)
- Allows complex analytics with SQL

**Spark Streaming**
- processes and analyzes real-time data or historical data
- code written for batch data can be reused for real-time data

**MLlib**
- provides scalable ML algorithms
- advertised as up to 100 times faster than MapReduce for iterative algorithms; the actual speedup depends on the workload

**GraphX**
- a graph computation engine for working with graph-structured data
- introduces a new graph abstraction: a directed multigraph with properties attached to every vertex and edge

**What is PySpark**
- A Python wrapper around Spark
- But pandas, Hadoop and Dask do similar work, so why Spark?
    - Pandas :: for tabular data; mature and feature-rich, but limited to a single machine (it can't spread data across different machines)
    - Hadoop :: a distributed system with MapReduce as the compute layer and HDFS as the storage layer (the two are closely integrated, i.e. you can't simply use cloud storage for the computations)
        - node failures are likely in a large cluster; Hadoop handles them by reading and writing intermediate data on disk so the work can be redone on another node, whereas Spark works in memory
        - Hadoop requires more disk space, Spark requires more RAM (so setting up Spark can be expensive)
    - Dask :: a library for parallel computing in Python
        - Built on Python and supports only Python
        - no built-in SQL support; Dask only works with Python ML libraries and has no GraphX-style graph computation support

**How did Spark emerge?**
- Hadoop MapReduce was inefficient for iterative and interactive computing tasks
- The AMPLab at UC Berkeley started Spark in 2009 and donated it to the Apache Software Foundation in 2013
- The creators of Spark then founded Databricks

**What's special about the Databricks version of Spark**
- It provides interactive notebooks
- Provides an optimized version of Apache Spark
- Provides enterprise security for organizations
- Other business intelligence tools can be connected through Databricks

**Spark Components: driver, worker nodes, cluster manager**
- DRIVER
    - Holds the information about the program
    - controls and distributes the work to the worker nodes, which run the executors
    - the executors in turn return their results to the driver
- Cluster manager: allocates cluster resources to the application at the driver's request
- Cluster manager types: 1) standalone 2) Apache Mesos 3) Hadoop YARN and 4) Kubernetes

**Working with Spark**
- we need to create a Spark session first
- the Spark session is the single unified entry point for working with Spark

**When a Spark session is started in Python, py4j runs in the background and lets the Python interpreter access the JVM and its Java objects**
   
**Partitions :** Spark is a distributed system, so to work in parallel it needs to break the data into **chunks or partitions**. A partition is a set of rows that lives on one machine in the cluster.
NO PARALLELIZATION :: when there is only one partition but many worker nodes, OR when there are several partitions but only one worker
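
To make the partition rule concrete, here is a minimal plain-Python sketch (not Spark code). The helper names `split_into_partitions`, `effective_parallelism` and `count_rows` are made up for illustration, and the model assumes each worker runs one task at a time, which is a simplification of real executors with several cores.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(rows, num_partitions):
    """Deal rows round-robin into num_partitions lists (hypothetical helper)."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


def effective_parallelism(num_partitions, num_workers):
    """Tasks running at once are capped by both partitions and workers."""
    return min(num_partitions, num_workers)


def count_rows(rows, num_partitions, num_workers):
    """Count rows per partition in a worker pool, then combine the partial counts."""
    partitions = split_into_partitions(rows, num_partitions)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_counts = list(pool.map(len, partitions))
    return sum(partial_counts)


rows = list(range(10))
print(split_into_partitions(rows, 3))   # three chunks of the ten rows
print(effective_parallelism(1, 4))      # one partition, four workers
print(effective_parallelism(8, 1))      # eight partitions, one worker
print(count_rows(rows, 3, 2))           # total is the same regardless of layout
```

Both degenerate cases give a parallelism of 1, which matches the "no parallelization" note above.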

**Transformation :** The core data structures in Spark are immutable (they can't be changed once created). The instructions used to derive a modified DataFrame are called transformations. When transformations are applied, **nothing seems to happen**; to actually execute them we need an ACTION operation

**Lazy Evaluation :** When transformations are specified, Spark doesn't run them right away. Instead it builds a plan of the transformations to be performed on the data. Waiting until the last moment to do the work is called lazy evaluation. This is helpful because Spark can streamline the whole plan before running it, which pays off in many big data jobs

**Actions :** There are three types of actions: 1) view data in the console, eg: .show() 2) collect data, eg: .collect(), which brings the data to the driver 3) write data to output data sources, eg: .write.format()
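
The interplay of immutability, transformations, lazy evaluation and actions can be sketched with a toy class in plain Python. This is only an analogy: `LazyFrame` and its methods are hypothetical names, not part of PySpark, and real Spark also optimizes the plan before running it.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: source rows plus a plan of pending steps."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = tuple(plan)

    def filter(self, predicate):
        # Transformation: returns a NEW frame with one more planned step.
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def with_column(self, name, fn):
        # Transformation: nothing is computed here either.
        return LazyFrame(self._rows, self._plan + (("with_column", (name, fn)),))

    def pending_steps(self):
        return [kind for kind, _ in self._plan]

    def collect(self):
        # Action: only now is the recorded plan executed, step by step.
        result = [dict(row) for row in self._rows]
        for kind, arg in self._plan:
            if kind == "filter":
                result = [row for row in result if arg(row)]
            else:
                name, fn = arg
                result = [{**row, name: fn(row)} for row in result]
        return result


crimes = LazyFrame([{"id": 1, "arrest": True}, {"id": 2, "arrest": False}])
arrests = crimes.filter(lambda r: r["arrest"]).with_column("ones", lambda r: 1)

print(crimes.pending_steps())   # the original frame is unchanged
print(arrests.pending_steps())  # two steps recorded, none executed yet
print(arrests.collect())        # the action triggers the work
```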

In [None]:
!ls

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install py4j

In [None]:
!ls

In [None]:
!pwd

#### Setting up environment

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/kaggle/working/spark-2.3.1-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
import warnings
warnings.filterwarnings('ignore')
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

#### Downloading the data

In [None]:
# !wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
# renaming the downloaded file
# !mv rows.csv\?accessType\=DOWNLOAD reported_crimes.csv

In [None]:
!ls

In [None]:
from pyspark.sql.functions import to_timestamp, col, lit
# in Spark date patterns 'MM' is the month and 'mm' is the minutes
rc = spark.read.csv('../input/reported-crime/Crimes_-_2001_to_Present.csv', header = True).withColumn('Date',to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') <= '2018-11-11')
rc.show(3)

#### DataFrame API
- Two main APIs are covered here
- DataFrame API [high-level API]
- Resilient Distributed Datasets (RDD) [low-level API]

Initially, Spark did distributed data processing with RDDs [a simple API for distributed data processing]
In Spark, a DataFrame is a distributed collection of objects of type Row. A DataFrame can be created from structured data, Hive tables, or existing RDDs

- Dataset API
    - This is only supported in statically typed languages (Scala and Java)
    - Python is a dynamically typed language, so it does not support the Dataset API
    

#### Working with DataFrame

In [None]:
# temp
# Using pandas
# import pandas as pd
# df = pd.read_csv(fileName) # to load a file
# df.head(3) # to view the df


from pyspark.sql import SparkSession
spark_session = SparkSession.builder.getOrCreate() # creating a spark session
df= spark_session.read.csv('../input/reported-crime/Crimes_-_2001_to_Present.csv', header = True) # to load a csv file
# df.take(3) # returns a list of Row objects; internally calls limit() followed by collect()
# df.collect() # returns all the rows of the df; may crash the driver node for large datasets
df.show(2) # prints the rows to the console in a nicely formatted table
# df.limit(3) # returns a new DataFrame with the specified number of rows
# df.head(3) # returns a list of 3 Row objects; head() in turn calls take()


#### Schemas

- Spark can infer the schema [by looking at a sample of rows and determining the data types]
- But in a production environment, we should define the schema explicitly
- Defining types in PySpark is easy
- import pyspark.sql.types
- a schema is a StructType made up of StructFields; each field has a column name, a column type, a flag saying whether the column can hold null values (true/false), and optional column metadata


In [None]:
# In pandas :: df.dtypes
df.printSchema(), df.dtypes # both are same except only the way/style it is getting displayed differs

In [None]:
# preprocessing 
df = df.withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') <= '2018-11-11')  # 'MM' = month, 'mm' = minutes
df.printSchema()

In [None]:
df.columns

In [None]:
# explicitly defining the data types in Spark
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BooleanType, DoubleType, IntegerType
labels = [
    ('ID', StringType()),
 ('Case Number',StringType()),
 ('Date',TimestampType()),
 ('Block',StringType()),
 ('IUCR',StringType()),
 ('Primary Type',StringType()),
 ('Description',StringType()),
 ('Location Description',StringType()),
 ('Arrest',StringType()),
 ('Domestic',BooleanType()),
 ('Beat',StringType()),
 ('District',StringType()),
 ('Ward',StringType()),
 ('Community Area',StringType()),
 ('FBI Code',StringType()),
 ('X Coordinate',StringType()),
 ('Y Coordinate',StringType()),
 ('Year',IntegerType()),
 ('Updated On',TimestampType()),
 ('Latitude',DoubleType()),
 ('Longitude',DoubleType()),
 ('Location', StringType())
]


# iterating over the labels list 
schema = StructType([StructField (x[0],x[1],True) for x in labels ])
schema

In [None]:
rc = spark_session.read.csv('../input/reported-crime/Crimes_-_2001_to_Present.csv', schema= schema, header = True)
rc.printSchema()

In [None]:
rc.show(3) # these null values mean that there are some values which can't be converted into desired data type as specified in the schema

#### Working with Columns

In [None]:
# ACCESSING A COLUMN
# In both pandas and pyspark :: df.columns gives the list of column names
# In pandas :: df.column_name, df['column_name'] returns the entire column
# In pyspark :: df.select(col('column_name')) returns a DataFrame with that column

# FOR MULTIPLE COLUMN SELECTION
# In pandas :: df[['column_name1', 'column_name2']]
# In pyspark :: df.select('column_name1', 'column_name2').show(3)

# ADDING A NEW COLUMN
# In pandas :: df['new_column'] = df['old_column'] * 2
# In pyspark :: df.withColumn('new_column', 2*df['old_column'])

# RENAMING A COLUMN
# In pandas :: df.rename(columns = {'oldName':'newName'}, inplace = True)
# In pyspark :: df.withColumnRenamed('oldName', 'newName') -- this returns a new dataframe

# DROPPING A COLUMN IS THE SAME IN PYSPARK AND PANDAS :: df.drop(...)


In [None]:
df.select('Date').show(3)

In [None]:
df.select('Case Number', 'Date', 'Arrest').show(3)

In [None]:
df.select(df.IUCR).show(3), df.select(col('Updated On')).show(3)

In [None]:
# Adding a column with constant value in all the rows
from pyspark.sql.functions import lit
df.withColumn('ONEs', lit(1)).show(3) # not an inplace function

In [None]:
df.drop('IUCR').show(3) # not an inplace function 

#### Working with rows

In [None]:
# FILTERING ROWS
# In pandas ::  df[df.column_name > condition]
# In pyspark :: df.filter(col('column_name') >= condition)

# UNIQUE VALUES OF A COLUMN
# In pandas :: df.column_name.unique()
# In pyspark :: df.select('column_name').distinct().show()

# SORTING ROWS
# In pandas :: df.column_name.sort_values()
# In pyspark :: df.orderBy('column_name')

# APPENDING TWO DFs
# In pandas :: pd.concat([df1, df2])
# In pyspark :: df.union(df1) - make sure that df and df1 have the same schema and the same number of columns


**Add the reported crimes for an additional day, 12-Nov-2018, to our dataset.**

In [None]:
# can't use df here , since df itself is a filtered DF
one_day = spark_session.read.csv('../input/reported-crime/Crimes_-_2001_to_Present.csv', header = True).withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))#.filter(col('Date') == lit('2018-11-15'))
one_day.count()

In [None]:
one_day.filter(col('Date') > lit('2018-11-11')).select('Date').distinct().show()

In [None]:
interval = one_day.filter((col('Date') > lit('2018-11-11')) & (col('Date')<= lit('2019-01-30')))
interval.count()

In [None]:
df.count()

In [None]:
combined = df.union(interval)
assert combined.count() == (df.count() + interval.count())

In [None]:
combined = combined.orderBy('Date')
combined.select('Date').show(3)

**Top 10 reported crimes in the order of primary type in desc order**

In [None]:
combined.sort(col('Primary Type').desc()).select('Primary Type').show(10)

**What are the top 10 primary types by number of reported crimes, in descending order of occurrence?**

In [None]:
combined.orderBy(col('Primary Type').desc()).select('Primary Type').distinct().show(10)

In [None]:
# the above command only gives the "Primary Type" values in descending alphabetical order,
# not ordered by their number of occurrences
combined.groupBy('Primary Type').count().orderBy('count', ascending = False).show(10) # a groupBy must be followed by an aggregate function

**1) What percent of reported crime resulted in arrest**
**2) What are the top 3 locations of the reported crimes**

In [None]:
#1
combined = combined.fillna('unknown', subset = ['Arrest'])  # fillna returns a new DataFrame, so keep the result
combined.select('Arrest').distinct().show()

In [None]:
(combined.filter(col('Arrest') == 'true').count() / combined.count())*100

In [None]:
# 2
combined.groupBy('Location Description').count().orderBy('count', ascending = False).show(3)

In [None]:
from pyspark.sql.functions import to_timestamp, col, lit
combined = spark.read.csv('../input/reported-crime/Crimes_-_2001_to_Present.csv', header = True).withColumn('Date',to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))#.filter(col('Date') <= '2018-11-11')
combined.show(2)

- groupBy in multiple columns and aggregation in multiple columns

In [None]:
from pyspark.sql.functions import countDistinct
combined.groupBy('Location Description').agg(countDistinct(col('Primary Type')).alias('numberOf_PrimaryType'), pyspark.sql.functions.count(col('Location Description')).alias('LocationCount')).orderBy(col('LocationCount'), ascending = False).show(3) #.count().orderBy('count', ascending = False).show(3)

#### Built-in functions with pyspark

- Built in functions are available in pyspark.sql.functions
- dir(functions) - will give you the list of available functions 

In [None]:
import pyspark.sql.functions as func
print(dir(func))

**Using string built-in functions :: Print lower and upper case of "Primary Type" column and first 4 characters of that column**

In [None]:
help(func.lower)

In [None]:
help(func.substring) # note it starts with 1 as the first index not zero 

In [None]:
combined.select(func.lower(col('Primary Type')).alias('lower_Primary_type'), func.upper(col('Primary Type')).alias('upper_Primary_type'), func.substring(col('Primary Type'), 1, 4).alias('substring_Primary_type')).show(4)

**For numeric built-in functions :: oldest data and most recent date in Date column**

In [None]:
combined.select(func.min(col('Date')).alias('oldest_date'), func.max(col('Date')).alias('recent_date')).show(1)

**What are the dates 3 days earlier than the oldest date and 3 days later than the most recent date?**

In [None]:
help(func.date_add)

In [None]:
# date_sub moves a date back, date_add moves it forward
combined.select(func.date_sub(func.min(col('Date')), 3).alias('3DaysEarlier'),func.date_add(func.max(col('Date')), 3).alias('3DaysLater')).show(1)

#### Working with dates
- Dates can be written in several formats; this section shows how to handle some of them in PySpark
- to_date() and to_timestamp() parse a string written in a given pattern into a date or timestamp, which is then displayed in the standard format [yyyy-MM-dd HH:mm:ss]
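
Before running a Spark job it can help to sanity-check a pattern against one sample string locally. The sketch below uses Python's `datetime.strptime`, which is not Spark: the `strptime` directives are my own pairing with the Spark patterns used in this notebook, and `%b` and `%p` assume an English locale.

```python
from datetime import datetime

# (sample string, Spark pattern used in this notebook, assumed strptime equivalent)
SAMPLES = [
    ("2019-12-25 13:30:00", "yyyy-MM-dd HH:mm:ss", "%Y-%m-%d %H:%M:%S"),
    ("25/Dec/2019 13:30:00", "dd/MMM/yyyy HH:mm:ss", "%d/%b/%Y %H:%M:%S"),
    ("12/25/2019 01:30:00 PM", "MM/dd/yyyy hh:mm:ss a", "%m/%d/%Y %I:%M:%S %p"),
]


def parse_all(samples):
    """Parse every sample with its strptime format; raises ValueError on a mismatch."""
    return [datetime.strptime(text, py_format) for text, _spark, py_format in samples]


parsed = parse_all(SAMPLES)
for (text, spark_pattern, _py_format), value in zip(SAMPLES, parsed):
    print(f"{text!r} with {spark_pattern!r} -> {value.isoformat(sep=' ')}")
```

All three strings describe the same moment, so they should parse to the same value. In the Spark patterns, uppercase `MM` is the month and lowercase `mm` is the minutes, `HH` is the 24-hour clock and `hh` is the 12-hour clock used together with `a` (AM/PM).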

In [None]:
from pyspark.sql.functions import to_date, to_timestamp, lit
df = spark.createDataFrame([('2020-12-11 13:30:00',)], ['Bday'])
df.show()

In [None]:
df.printSchema()

**Date format : 2019-12-25 13:30:00**

In [None]:
df.select(to_date(col('Bday'), 'yyyy-MM-dd HH:mm:ss').alias('Date'), to_timestamp(col('Bday'),'yyyy-MM-dd HH:mm:ss').alias('TimeStamp')).show(1)

**Date format : 25/Dec/2019 13:30:00**

In [None]:
# the pattern describes how the input string is written, not the desired output format,
# so it must match the string exactly for the conversion to a date/timestamp type to work
# df.select(to_date(col('Bday'), 'dd/MM/yyyy HH:mm:ss').alias('Date'), to_timestamp(col('Bday'),'dd/MM/yyyy HH:mm:ss').alias('TimeStamp')).show(1)

df = spark.createDataFrame([('25/Dec/2019 13:30:00',)], ['Bday'])
df.show()

In [None]:
df.select(to_date(col('Bday'), 'dd/MMM/yyyy HH:mm:ss').alias('Date'), to_timestamp(col('Bday'),'dd/MMM/yyyy HH:mm:ss').alias('TimeStamp')).show(1)

**Date format : 12/25/2019 01:30:00 PM**

In [None]:
df = spark.createDataFrame([('12/25/2019 01:30:00 PM',)], ['Bday'])
df.show(truncate = False)

In [None]:
# if AM/PM is present, the hour pattern must be lowercase (hh, the 12-hour clock)
df.select(to_date(col('Bday'), 'MM/dd/yyyy hh:mm:ss a').alias('Date'), to_timestamp(col('Bday'),'MM/dd/yyyy hh:mm:ss a').alias('TimeStamp')).show(1)

In [None]:
nrc = spark.read.csv('../input/reported-crime/Crimes_-_2001_to_Present.csv', header = True)
nrc.show(1, truncate = False)

In [None]:
# here the original date format is MM/dd/yyyy hh:mm:ss with AM/PM

#### User defined functions (UDF)
- user-defined functions
- these functions have to be registered with Spark so they can be used on all worker nodes
- Spark serializes the function on the driver and distributes it to all executor processes
- There is a performance penalty, so UDFs are less efficient than built-in functions
- In the case of Python, Spark starts a Python process on each worker node and serializes the data into a format Python can understand
- It applies the function row by row to the dataset and returns the results to the JVM
- The real cost is moving the data between the JVM and Python, which adds computation
- In the backend, the JVM and the Python worker processes compete for memory, which can lead to out-of-memory errors
- To avoid this it is better not to write UDFs in Python; a UDF written in Java or Scala runs inside the JVM, is faster, and can still be called from PySpark
- Apache Arrow stores data in columns and can reduce the serialization and deserialization overhead in UDFs
- Prefer built-in functions whenever they exist
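
As a hedged illustration of the points above, the sketch below keeps the row-level logic in a plain Python function and wraps it with `udf` only inside a helper. The names `normalize_district` and `with_padded_district` are hypothetical, and the column names `District` and `paddedDistrict` are assumptions borrowed from the join exercise later in this notebook. The import sits inside the helper so the pure function can be tested without a Spark installation.

```python
def normalize_district(value):
    """Pad a district id to three characters, e.g. '7' -> '007' (None stays None)."""
    if value is None:
        return None
    return str(value).strip().rjust(3, "0")


def with_padded_district(df, source_col="District", target_col="paddedDistrict"):
    """Apply normalize_district as a Python UDF to a Spark DataFrame.

    Every row is shipped from the JVM to a Python worker and back,
    which is the performance penalty described in the notes.
    """
    # Imported lazily so normalize_district can be unit-tested without Spark.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    pad_udf = udf(normalize_district, StringType())
    return df.withColumn(target_col, pad_udf(col(source_col)))


print(normalize_district("7"))
print(normalize_district("11"))
print(normalize_district(None))
```

With a running session the call would be `with_padded_district(nrc)`. For this particular task the built-in `lpad` is the better choice because it stays inside the JVM; note that `lpad` also truncates values longer than the target length, which this sketch does not.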

#### Working with Joins
- The "join expression" decides which rows of the two DataFrames are matched
- The "join type" decides what ends up in the result
- Types of join
    - inner join : keeps only the rows whose key is present in both
    - outer join : keeps the rows for all keys present in either df
    - left outer join : keeps all rows from the left
    - right outer join : keeps all rows from the right
- Syntax ::  **df1.join(df2, df1.column_name == df2.column_name, how = 'inner')**
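
The four join types can be compared on a tiny example with a plain-Python toy model. This is not how Spark implements joins: the `join` function, the sample rows and the station names are made up for illustration, and where Spark would show `null` for a missing side the toy model simply leaves those keys out.

```python
def join(left, right, key, how="inner"):
    """Join two lists of dicts on `key` (toy model of the four join types)."""
    if how not in {"inner", "left_outer", "right_outer", "outer"}:
        raise ValueError(f"unsupported join type: {how}")

    # Index the right side by key so each left row can find its matches.
    right_by_key = {}
    for row in right:
        right_by_key.setdefault(row[key], []).append(row)

    result, matched_keys = [], set()
    for left_row in left:
        matches = right_by_key.get(left_row[key], [])
        if matches:
            matched_keys.add(left_row[key])
            result.extend({**left_row, **right_row} for right_row in matches)
        elif how in {"left_outer", "outer"}:
            result.append(dict(left_row))  # unmatched left row is kept

    if how in {"right_outer", "outer"}:
        for k, rows in right_by_key.items():
            if k not in matched_keys:
                result.extend(dict(row) for row in rows)  # unmatched right rows
    return result


crimes = [{"district": "007", "case": "A1"}, {"district": "099", "case": "B2"}]
stations = [{"district": "007", "name": "Station A"}, {"district": "010", "name": "Station B"}]

for how in ("inner", "left_outer", "right_outer", "outer"):
    print(how, join(crimes, stations, "district", how))
```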

In [None]:
!wget -O police-station.csv https://data.cityofchicago.org/api/views/z8bn-74gv/rows.csv?accessType=DOWNLOAD
!ls -l  

In [None]:
!ls

In [None]:
!pwd

In [None]:
policeStation = spark.read.csv('/kaggle/working/police-station.csv', header= True)
policeStation.show(1, truncate = False)

**The reported crimes dataset has only the district number. Add the district name by joining with the police station dataset**

In [None]:
nrc.cache()
nrc.count()

In [None]:
nrc.columns, policeStation.columns

In [None]:
policeStation.select('DISTRICT').distinct().show(30, truncate = False)

In [None]:
nrc.select('District').distinct().show(30, truncate = False)

In [None]:
# the two datasets write the district differently, so pad both with leading zeros to the same width
from pyspark.sql.functions import lpad
help(lpad)

In [None]:
nrc = nrc.withColumn('paddedDistrict', lpad(col('District'),3,'0'))
policeStation = policeStation.withColumn('paddedDistrict', lpad(col('DISTRICT'), 3, '0'))

In [None]:
nrc.show(1)

In [None]:
nrc.select('paddedDistrict').distinct().show(30, truncate = False)

In [None]:
policeStation.show(1)

In [None]:
joined = nrc.join(policeStation, nrc.paddedDistrict == policeStation.paddedDistrict,'left_outer').drop('DISTRICT',
 'ADDRESS',
 'CITY',
 'STATE',
 'ZIP',
 'WEBSITE',
 'PHONE',
 'FAX',
 'TTY',
 'X COORDINATE',
 'Y COORDINATE',
 'LATITUDE',
 'LONGITUDE',
 'LOCATION',
 'paddedDistrict')
joined.show(2, truncate = False)

**What is the most frequently reported non-criminal activity?**

In [None]:
# domestic == true
nrc.select(col('Domestic')).distinct().show()

In [None]:
nrc.filter(col('Domestic') == 'true').groupBy('Primary Type').count().orderBy('count', ascending = False).show(10, truncate = False)

**Find the day of the week with the most reported crimes.**

In [None]:
from pyspark.sql.functions import dayofweek
nrc1 = nrc.withColumn('Date', to_date(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))  # the raw dates are month-first
nrc1.show(1)

In [None]:
help(dayofweek)

In [None]:
from pyspark.sql.functions import when, lit
nrc1 = nrc1.withColumn('dayOfCrime', dayofweek(col('Date'))) #, when(nrc1.dayOfCrime.isNull(),lit('0')).otherwise(nrc1.dayOfCrime))
nrc1.show(2)

In [None]:
nrc1 = nrc1.fillna(0, subset = ['dayOfCrime'])  # dayOfCrime is an integer column, so fill with an int and keep the result

#### NOTE: IN PYSPARK VALUES CAN ONLY BE REPLACED WITH VALUES OF THE SAME DTYPE
- values in a string column can't be replaced with ints (and vice versa)

In [None]:
# replacing numbers of day with string
from pyspark.sql.types import StringType
nrc1 = nrc1.withColumn('dayOfCrime', nrc1['dayOfCrime'].cast(StringType()))
days = {'1':'Sunday', '2':'Monday', '3':'Tuesday', '4':'Wednesday','5':'Thursday', '6':'Friday', '7':'Saturday'}
nrc2 = nrc1.replace(to_replace = days, subset = ['dayOfCrime'])
nrc2.show(1, truncate  = False)

In [None]:
nrc2.cache()

In [None]:
nrc2.groupBy('dayOfCrime').count().orderBy('count', ascending = False).show()

#### RDD (Resilient Distributed Dataset)
- RDD : an immutable, partitioned collection of records that can be worked on in parallel
- In a DataFrame each record is a structured Row, whereas in an RDD the records are plain Scala/Java/Python objects
- A low-level API
- All DataFrame API code compiles down to RDDs
- Spark doesn't understand the inner structure of the data as it does with a DataFrame
- RDDs in Scala and Java are fast
- But running an RDD in Python is similar to running a Python UDF [the data has to be serialized to and from Python processes, which slows things down]
- RDDs lack the optimizations available to DataFrames
- The built-in functions can't be used with RDDs

In [None]:
psrdd = sc.textFile('/kaggle/working/police-station.csv')
psrdd.first()

In [None]:
psrdd_header  = psrdd.first()

In [None]:
ps_rest = psrdd.filter(lambda line : line!= psrdd_header)
ps_rest.first()

**How many police stations are there ?**

In [None]:
ps_rest.map(lambda line: line.split(',')).collect() # this gives all the rows in the file

In [None]:
ps_rest.map(lambda line: line.split(',')).count()

**Display the District ID, District name, Address and Zip for the police station with District ID 7**

In [None]:
(ps_rest.filter(lambda line: line.split(',')[0] == '7').
 map(lambda line: (line.split(',')[0],
                   line.split(',')[1],
                   line.split(',')[2],
                   line.split(',')[5],
                  )).collect())

**Police stations 10 and 11 are geographically close to each other. Display the District ID, District name, address and zip code**

In [None]:
(ps_rest.filter(lambda line:line.split(',')[0] in ['10', '11']).
map(lambda line:(line.split(',')[0], 
                line.split(',')[1],
                   line.split(',')[2],
                   line.split(',')[5]
                )).collect())
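
The `line.split(',')` calls above assume that no field contains a comma. A location field written as a quoted coordinate pair would break that assumption. A minimal sketch of a more robust splitter using Python's `csv` module follows; `parse_csv_line` is a hypothetical helper and the sample line is made-up data, not a row from the police station file.

```python
import csv


def parse_csv_line(line):
    """Split one CSV line into fields, keeping quoted fields with commas intact."""
    return next(csv.reader([line]))


# Made-up sample row with a quoted field that contains a comma.
sample = '7,Example Station,123 Main St,Chicago,IL,60600,"(41.0, -87.0)"'

naive_fields = sample.split(',')
parsed_fields = parse_csv_line(sample)

print(len(naive_fields), naive_fields[-2:])   # the quoted field is cut in two
print(len(parsed_fields), parsed_fields[-1])  # the quoted field stays whole
```

In the RDD code the helper could replace the lambda, for example `ps_rest.map(parse_csv_line)`, so that field positions stay stable even when a quoted field contains a comma.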