## Introduction to PySpark

Setting up the PySpark environment is quite annoying.  
I followed [this blog](https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f).  
The code below lets you check whether the environment is set up properly.  

In [1]:
import sys
print(sys.executable)

/anaconda3/envs/spark_env/bin/python
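
Printing `sys.executable` only shows which interpreter the notebook runs on. A slightly fuller check, sketched below, also reports whether `pyspark` can be found from that interpreter. The helper name `describe_environment` is made up for this note and uses only the standard library, so it does not start Spark.

```python
import importlib.util
import sys


def describe_environment(module_name="pyspark"):
    """Return the interpreter path and whether `module_name` can be imported.

    Hypothetical helper for this notebook, not part of PySpark.
    """
    # find_spec locates the module without importing it, so no JVM is started.
    spec = importlib.util.find_spec(module_name)
    return {
        "executable": sys.executable,
        "module": module_name,
        "importable": spec is not None,
    }


info = describe_environment()
print(info["executable"])
print(f"{info['module']} importable: {info['importable']}")
```

If the second line reports `False`, the notebook kernel is probably not the environment where PySpark was installed.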


In [2]:
# Create the Spark configuration and the Spark context
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("Word Counter")
sc = SparkContext(conf = conf)

# Read the file into an RDD with one element per line
myTextFile = sc.textFile("/Users/raleighliu/tools/spark-2.4.0-bin-hadoop2.7/README.md")

# Remove the empty lines
non_emptyLines = myTextFile.filter(lambda line: len(line) > 0)

# Return a new RDD "words" by applying split() to every line of this RDD and then flattening the results.
# split() with no argument splits on any run of whitespace, so repeated spaces do not produce empty-string "words".
words = non_emptyLines.flatMap(lambda x: x.split())

# Chain three transformations and a sort:
# a) .map() - takes each element of the RDD "words" (a single word) and creates a tuple like ('apple', 1).
# b) .reduceByKey() - merges the values for each key using an associative and commutative reduce function, e.g. ('apple', 5).
# c) .map() - swaps the positions in the tuple to get (5, 'apple'); .sortByKey(False) then sorts by that key (the count) in descending order.

wordCount = words.map(lambda x: (x, 1)).\
                    reduceByKey(lambda x, y : x + y).\
                        map(lambda x: (x[1], x[0])).sortByKey(False)

# Save this RDD as a text file, using string representations of elements.
# Note: this creates a directory with part-00000, part-00001, ... files, one per partition of the RDD, which shows how the job was split across partitions.

wordCount.saveAsTextFile("wordCountResult")

# To write a single file, reduce the RDD to one partition with coalesce().
# It returns a new RDD that is reduced into `numPartitions` partitions.

wordCount.coalesce(1).saveAsTextFile("wordCountResult2")

# Both calls also write an empty _SUCCESS marker file into the output directory when the job completes successfully.
# The hidden .crc files next to the output files are checksum files. CRC stands for "Cyclic Redundancy Check", an error-detecting code used to detect accidental changes to raw data.
sc.stop()
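
To sanity-check the Spark result on a tiny input, the same filter / flatMap / map / reduceByKey / sort steps can be mirrored in plain Python. This is only a sketch: `word_count` and `sample` are names made up for this note, the sample lines are invented rather than taken from README.md, and words are split on any run of whitespace.

```python
from collections import Counter


def word_count(lines):
    """Count words in an iterable of strings, most frequent first.

    Hypothetical plain-Python mirror of the RDD pipeline, for small inputs only.
    """
    # filter: drop empty lines
    non_empty = (line for line in lines if len(line) > 0)
    # flatMap: split each line into words and flatten into one stream
    words = (word for line in non_empty for word in line.split())
    # map + reduceByKey: count occurrences per word
    counts = Counter(words)
    # swap to (count, word) and sort by count, descending
    swapped = ((n, word) for word, n in counts.items())
    return sorted(swapped, key=lambda pair: pair[0], reverse=True)


sample = ["spark is fast", "", "spark is fun"]  # made-up lines
print(word_count(sample))
```

The counts should match what Spark produces for the same lines, but the order of words with equal counts may differ, since Spark makes no promise about ties.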


## Analyze some flight data

In [3]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("EndToEndApp")
sc = SparkContext(conf = conf)

from pyspark.sql import SparkSession 
spark = SparkSession(sc)

# Creating a Spark DataFrame by reading from a csv
flightData2015 = spark\
        .read\
        .option("inferSchema", "true")\
        .option("header", "true")\
        .csv("/Users/raleighliu/Desktop/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv")

flightData2015.take(2)

flightData2015.sort("count").explain() # sort, then look at the physical plan

spark.conf.set("spark.sql.shuffle.partitions", "5") # set the number of shuffle partitions

flightData2015.sort("count").show(5)

flightData2015.createOrReplaceTempView("flight_data_2015") # creating a temporary view from DF

# Group by DEST_COUNTRY_NAME and count the rows in each group using SQL
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

# Same using DataFrame operations
dataFrameWay = flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .count()
    
# Use the max function to find the largest value in the count column, i.e. the maximum number of flights between any two countries.

spark.sql("SELECT max(count) from flight_data_2015").take(1) # using SQL

from pyspark.sql.functions import max as spark_max # aliased so it does not shadow Python's built-in max
flightData2015.select(spark_max("count")).take(1) # using DataFrame operations

# Top five destination countries in the data

# SQL Way
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()

# DataFrame way
from pyspark.sql.functions import desc

flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()
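
Both the SQL and the DataFrame version do the same three things: sum `count` per destination, sort the totals in descending order, and keep the first five. A plain-Python sketch of that logic is below. `top_destinations` and `toy_rows` are names invented for this note, and the rows are made-up values, not the contents of 2015-summary.csv.

```python
from collections import defaultdict


def top_destinations(rows, n=5):
    """Return the n destinations with the largest summed count.

    Hypothetical helper; `rows` holds (dest, origin, count) tuples.
    """
    totals = defaultdict(int)
    # GROUP BY DEST_COUNTRY_NAME with sum(count)
    for dest, _origin, count in rows:
        totals[dest] += count
    # ORDER BY the total, descending
    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    # LIMIT n
    return ranked[:n]


# Made-up rows in (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count) order.
toy_rows = [
    ("United States", "Croatia", 1),
    ("United States", "Singapore", 1),
    ("Malta", "United States", 1),
    ("Canada", "United States", 10),
]
print(top_destinations(toy_rows, n=2))
```

This is only useful for checking the idea on a handful of rows. On the real data, Spark does the grouping and sorting across partitions.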

== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/raleighliu/Desktop/Spark-The-Definitive-Guide/data/flight-data/csv/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destina