<a href="https://colab.research.google.com/github/mengyanl/springboard/blob/main/Learning_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1st Google Colab notebook for PySpark
#### Based on the LinkedIn Learning course **Apache PySpark by Example** and the **Medium blog post** linked below.

https://medium.com/geekculture/how-to-get-your-spark-installation-right-every-time-on-colab-218d57b6091d

In [None]:
!ls

In [9]:
# download java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Installing Spark 3.2.0 with Hadoop 2.7
!wget -q https://downloads.apache.org/spark/spark-3.2.0//spark-3.2.0-bin-hadoop2.7.tgz

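If the file cannot be found, follow these steps: <br>
Go to https://downloads.apache.org/spark/ (older releases are moved to https://archive.apache.org/dist/spark/) <br>
Select a release folder, for example: "spark-3.0.1/" <br>
Copy the name of the file you want, for example: "spark-3.0.1-bin-hadoop3.2.tgz" (it ends with .tgz) <br>
Paste it into the download command above, and use the same name in the `tar` and `SPARK_HOME` cells below.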

In [3]:
# unzip the folder
!tar xf spark-3.2.0-bin-hadoop2.7.tgz

In [4]:
# install ‘findspark’ library. 
#It will locate Spark on the system and import it as a regular library.
!pip install -q findspark

In [5]:
 # Point JAVA_HOME and SPARK_HOME at the Java and Spark installs made above,
# so PySpark can find them in the Colab runtime.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.2.0-bin-hadoop2.7",
})

In [6]:
# Locate the Spark installation so `pyspark` can be imported as a regular library.
import findspark

findspark.init()

In [7]:
# Show the Spark installation directory that findspark located.
findspark.find()

'/content/spark-3.2.0-bin-hadoop2.7'

In [10]:
# Get the active SparkContext, creating one if none exists yet.
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

In [11]:
# Build (or reuse) the SparkSession, the entry point for the DataFrame API.
# NOTE(review): a SparkContext already exists from the previous cell and will
# be reused, so the app name below may not show up in sc.appName — confirm.
import pyspark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Our First Spark example")
    .getOrCreate()
)

In [None]:
# Display the SparkSession object created above.
spark

In [13]:
!ls

sample_data  spark-3.2.0-bin-hadoop2.7	spark-3.2.0-bin-hadoop2.7.tgz


In [None]:
# Small warm-up dataset: where the UCI wine data will be stored.
import urllib.request

BASE_DIR = "/tmp"
OUTPUT_FILE = os.path.join(BASE_DIR, "wine_data.csv")

In [None]:
# Fetch the UCI wine dataset and save it to OUTPUT_FILE.
wine_data = urllib.request.urlretrieve(
    "https://archive.ics.uci.edu/ml/machine-learning-databases/wine/wine.data",
    filename=OUTPUT_FILE,
)

In [None]:
# Read the wine CSV (it has no header row) and let Spark infer the column types.
# Reuse OUTPUT_FILE instead of repeating the path, so the reader always matches
# the location the download cell wrote to.
wine_df = spark.read.csv(OUTPUT_FILE, header=False, inferSchema=True)

In [None]:
# Show the column names and the types Spark inferred for the wine data.
wine_df.printSchema()

# Download Data

In [None]:
# download Data set Chicago criminal
# shift+enter to see progress
!wget https://data.cityofchicago.org/api/views/x2n5-8w5q/rows.csv?accessType=DOWNLOAD&api_foundry=true

In [None]:
!ls

In [16]:
# Disabled clean-up step: removes the renamed dataset file if uncommented.
# rm reported_crimes.csv

In [17]:
# rename the file
!mv rows.csv\?accessType\=DOWNLOAD reported_crimes.csv

# Read CSV

In [None]:
from pyspark.sql.functions import to_timestamp, col, lit

# Load the crimes CSV, parse the occurrence date into a timestamp, and keep rows
# whose timestamp is <= '2021-11-11' (the string is compared as a timestamp).
rc = (
    spark.read.csv('reported_crimes.csv', header=True)
    .withColumn(
        'DATE  OF OCCURRENCE',
        to_timestamp(col('DATE  OF OCCURRENCE'), 'MM/dd/yyyy hh:mm:ss a'),
    )
    .filter(col('DATE  OF OCCURRENCE') <= lit('2021-11-11'))
)
rc.show(10)  # tabular preview of the first 10 rows

## df.take(n)  ## returns a list of Row objects (runs collect() on limit(n)); similar to df.head(n)
## df.head(n)  ## returns a list of Row objects (calls take(n)); df.head() with no n returns a single Row
## df.limit(n)  ## returns a new DataFrame
## df.collect()  ## brings the entire DataFrame to the driver; careful, on large data it can crash the driver

# Schema

In [None]:
## df.dtypes is an attribute (a list of (name, type) pairs), not a method
## df.printSchema() prints the same information as a tree
rc.printSchema()

In [22]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BooleanType, DoubleType, IntegerType

In [None]:
# Column names as Spark read them from the header (note the leading spaces on some).
rc.columns

In [None]:
# First draft of the schema, started from the names in `rc.columns`.
# Every entry must be a StructField, and DataType classes must be instantiated
# (StringType(), not StringType). The complete, typed schema is built from the
# `labels` list in the following cells.
StructType([
    StructField('CASE#', StringType(), True),
    StructField('DATE  OF OCCURRENCE', TimestampType(), True),
    # Remaining columns, typed in `labels`:
    # 'BLOCK', ' IUCR', ' PRIMARY DESCRIPTION', ' SECONDARY DESCRIPTION',
    # ' LOCATION DESCRIPTION', 'ARREST', 'DOMESTIC', 'BEAT', 'WARD', 'FBI CD',
    # 'X COORDINATE', 'Y COORDINATE', 'LATITUDE', 'LONGITUDE', 'LOCATION'
])

In [24]:
# (column name, Spark type) pairs for every column of reported_crimes.csv,
# in file order. Names must match the header exactly, including leading spaces.
# ARREST holds 'Y'/'N' strings (it is filtered with == 'N' further down).
# DOMESTIC is kept as a string too: Spark's CSV reader only parses
# 'true'/'false' as BooleanType, so 'Y'/'N' values would not load as booleans.
# NOTE(review): DOMESTIC presumably uses the same 'Y'/'N' coding as ARREST — confirm.
labels = [
    ('CASE#', StringType()),
    ('DATE  OF OCCURRENCE', TimestampType()),
    ('BLOCK', StringType()),
    (' IUCR', StringType()),
    (' PRIMARY DESCRIPTION', StringType()),
    (' SECONDARY DESCRIPTION', StringType()),
    (' LOCATION DESCRIPTION', StringType()),
    ('ARREST', StringType()),
    ('DOMESTIC', StringType()),
    ('BEAT', StringType()),
    ('WARD', StringType()),
    ('FBI CD', StringType()),
    ('X COORDINATE', StringType()),
    ('Y COORDINATE', StringType()),
    ('LATITUDE', DoubleType()),
    ('LONGITUDE', DoubleType()),
    ('LOCATION', StringType()),
]

In [None]:
# Turn each (name, type) pair into a nullable StructField.
schema = StructType([StructField(name, dtype, True) for name, dtype in labels])
schema

In [None]:
# Re-read the CSV with the explicit schema.
# header=True: the first line holds column names. Without it, that line is
#   parsed as a data record.
# timestampFormat: the dates use the 'MM/dd/yyyy hh:mm:ss a' pattern (the same
#   one passed to to_timestamp earlier). Without it, Spark cannot parse them
#   into TimestampType and DATE  OF OCCURRENCE comes back null.
rc = spark.read.csv(
    'reported_crimes.csv',
    schema=schema,
    header=True,
    timestampFormat='MM/dd/yyyy hh:mm:ss a',
)
rc.printSchema()

In [None]:
# Preview the first five rows read with the explicit schema.
rc.show(n=5)

# Columns

In [None]:
## Common PySpark column operations (each returns a new DataFrame)
# df.withColumn('DoubleColumn', 2 * df['ColumnA'])       # add or replace a column
# df.withColumnRenamed('ExistingColumnName', 'NewColumnName')
# df.drop('columnName1', 'columnName2', 'columnName3')    # names that don't exist are silently ignored
# df.groupBy('column')                                    # returns GroupedData; follow with .count() / .agg(...)

In [None]:
# Select by name; this column's name starts with a space, which must be included.
rc.select(' IUCR').show(n=5)

In [None]:
# Select via attribute access on the DataFrame.
rc.select(rc.ARREST).show(n=5)

In [None]:
# Select via the col() function.
rc.select(col('ARREST')).show(n=5)

In [None]:
# Select several columns at once, in the given order.
rc.select('ARREST', 'CASE#').show(n=5)

In [40]:
from pyspark.sql.functions import lit

In [None]:
# Add a constant column 'One' (= 1); the result is shown but not stored, so rc is unchanged.
rc.withColumn('One', lit(1)).show(n=5)

In [None]:
# Drop a column. DataFrame.drop silently ignores names that are not in the
# schema ('ARRESTED' does not exist, so dropping it did nothing); use the real
# column name 'ARREST'. The result goes into a new frame so `rc` is left
# intact and the cell can be re-run safely.
rc_without_arrest = rc.drop('ARREST')
rc_without_arrest.show(5)

# Filter

In [None]:
## df.filter(col('column') > 1)           = in pandas: df[df.column > 1]
## df.select('column').distinct().show()  = in pandas: df['column'].unique()
## df.orderBy(col('column'))              = in pandas: df.sort_values(by='column')
## df.union(df2)  both need the same schema; columns are matched by position = in pandas: pd.concat([df, df2])

In [None]:
# Crimes that occurred on 2021-11-12, at any time of day.
from pyspark.sql.functions import to_date

one_day = (
    spark.read.csv('reported_crimes.csv', header=True)
    .withColumn(
        'DATE  OF OCCURRENCE',
        to_timestamp(col('DATE  OF OCCURRENCE'), 'MM/dd/yyyy hh:mm:ss a'),
    )
    # Compare calendar dates: `timestamp == '2021-11-12'` would only match
    # events recorded at exactly 00:00:00.
    .filter(to_date(col('DATE  OF OCCURRENCE')) == lit('2021-11-12'))
)
one_day.count()  # number of crimes on that day

In [None]:
# Crimes that occurred on 2021-11-13, at any time of day.
from pyspark.sql.functions import to_date

Second_day = (
    spark.read.csv('reported_crimes.csv', header=True)
    .withColumn(
        'DATE  OF OCCURRENCE',
        to_timestamp(col('DATE  OF OCCURRENCE'), 'MM/dd/yyyy hh:mm:ss a'),
    )
    # Compare calendar dates: `timestamp == '2021-11-13'` would only match
    # events recorded at exactly 00:00:00.
    .filter(to_date(col('DATE  OF OCCURRENCE')) == lit('2021-11-13'))
)
Second_day.count()  # number of crimes on that day

In [None]:
# Stack both days (union matches columns by position) and show the 13 latest events.
(
    Second_day
    .union(one_day)
    .orderBy("DATE  OF OCCURRENCE", ascending=False)
    .show(13)
)

In [None]:
# Number of crimes per primary description on the first day.
one_day.groupBy(' PRIMARY DESCRIPTION').count().show()

In [None]:
# Same counts, most frequent primary description first.
(
    one_day.groupBy(' PRIMARY DESCRIPTION')
    .count()
    .orderBy('count', ascending=False)
    .show()
)

In [None]:
# Challenges: arrests on the second day
Second_day.groupBy('ARREST').count().show()    # number of crimes per ARREST value
Second_day.select('ARREST').distinct().show()  # the distinct ARREST values
# NOTE(review): the line below counts one_day, not Second_day — confirm intent.
one_day.count()

In [None]:
# Share of second-day crimes that did not lead to an arrest (ARREST == 'N').
# An empty day gives nan instead of raising ZeroDivisionError.
second_day_total = Second_day.count()
(
    Second_day.filter(col('ARREST') == 'N').count() / second_day_total
    if second_day_total
    else float('nan')
)

In [None]:
# Top 3 locations by number of crimes on the first day.
# Sort by the count (as with the primary descriptions above); sorting by the
# location name would just show the last three names alphabetically.
(
    one_day.groupBy(' LOCATION DESCRIPTION')
    .count()
    .orderBy('count', ascending=False)
    .show(3)
)
