<div>
<img src="./assets/iland_core_logo.jpg" style="width: 150px; height: 150px;">
<img src="./assets/spark-logo-trademark.png" style="float: left">
</div>

<h1>Setting Up Your Spark Environment</h1>

<h3>Downloading Spark</h3>

<a href="https://spark.apache.org/">Link to Spark's website</a>

<p>Use the tar command to extract the downloaded archive:</p>
<code>tar -xzf /path/to/file</code>

<h3>Spark Documentation</h3>
<a href="https://spark.apache.org/docs/latest/" target="_blank">Spark Documentation</a><br>
<a href="https://spark.apache.org/docs/latest/api/python/index.html" target="_blank">PySpark Documentation</a>

You can play around in an interactive PySpark shell by running
<code>$SPARK_HOME/bin/pyspark</code>

Or, if you are feeling brave enough to jump into Scala, run the Scala shell instead
<code>$SPARK_HOME/bin/spark-shell</code>

<h3>Running a Spark Script</h3>

You can run Spark jobs with spark-submit, for example from a bash script:

<code>/path/to/spark/bin/spark-submit \
    --master local[*] \
    path/to/script.py 
</code>

<p>The point of Spark is to leverage the compute and memory of a cluster of nodes. The master URL tells Spark which cluster manager to connect to; the supported forms are listed here:
    <a href="https://spark.apache.org/docs/latest/submitting-applications.html#master-urls">Spark Master URLs</a>
</p>


<h3>Packages and Configurations</h3>
    
<p>You can use flags to add packages and configurations to your Spark job. The available settings are listed under <a href="https://spark.apache.org/docs/latest/configuration.html" target="_blank">Spark Configurations</a>.</p>


<code>path/to/spark/bin/spark-submit \
        --master mesos://MESOS_IP_VARIABLE:7077 \
        --conf spark.mesos.coarse=true \
        --executor-memory 1G \
        --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.1 \
    path/to/script.py
</code>
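<p>When the same flags are needed from Python rather than bash, the command can be assembled as a list. This is only a sketch: <code>spark_submit_command</code> is a hypothetical helper, not part of Spark, and the paths and master URL are the placeholders used above. It relies only on the <code>--master</code>, <code>--conf</code> and <code>--packages</code> flags already shown.</p>

```python
import shlex


def spark_submit_command(script, master="local[*]", conf=None, packages=None,
                         spark_home="/path/to/spark"):
    """Build the argument list for a spark-submit call (hypothetical helper)."""
    command = [f"{spark_home}/bin/spark-submit", "--master", master]
    # Each configuration entry becomes its own --conf key=value flag.
    for key, value in (conf or {}).items():
        command += ["--conf", f"{key}={value}"]
    # --packages takes a single comma-separated list of Maven coordinates.
    if packages:
        command += ["--packages", ",".join(packages)]
    # The application script always comes last.
    command.append(script)
    return command


command = spark_submit_command(
    "path/to/script.py",
    master="mesos://MESOS_IP_VARIABLE:7077",
    conf={"spark.mesos.coarse": "true"},
    packages=["com.datastax.spark:spark-cassandra-connector_2.11:2.4.1"],
)
# Print a shell-safe version of the command for inspection.
print(shlex.join(command))
```

<p>The resulting list can be handed to <code>subprocess.run(command, check=True)</code> once the placeholder paths point at a real Spark installation.</p>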
    

<h3>Drivers</h3>

<p>Reading from and writing to a SQL database requires a JDBC driver, which is straightforward to supply when running Spark through spark-submit:</p>

<code>/path/to/spark/bin/spark-submit \
    --master local[*] \
    --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.1 \
    --driver-class-path /path/to/jar/file.jar \
    --jars /path/to/jar/file.jar \
    path/to/script.py
</code>

<h2>¡VERY IMPORTANT!</h2>
<h3>Special Cases for Jupyter</h3>

<p>For Jupyter to find Spark, set the SPARK_HOME path in the terminal before starting Jupyter:</p>
<code>export SPARK_HOME=/path/to/spark</code>

<p>To make drivers load properly in a Jupyter Notebook, you need to do something like this in the Notebook itself (shown here for PostgreSQL):</p>

<code>import os
# Set this before the SparkSession is created, or the package will not be loaded
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.2.5 pyspark-shell'
</code>

<p>Then, when reading from or writing to a SQL database, name the driver class explicitly:</p>

<code>spark.read.format("jdbc").option("driver","org.postgresql.Driver")...</code>
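<p>The elided options in the line above usually include the connection URL, table and credentials. Below is a minimal sketch of a full JDBC read for PostgreSQL. The helper names, host, database, table and credentials are all hypothetical placeholders; <code>url</code>, <code>dbtable</code>, <code>user</code>, <code>password</code> and <code>driver</code> are standard Spark JDBC option names.</p>

```python
def postgres_jdbc_options(host, database, table, user, password, port=5432):
    """Collect the JDBC options for a PostgreSQL table (hypothetical helper)."""
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }


def read_table(spark, jdbc_options):
    """Load a table as a DataFrame; `spark` is an existing SparkSession."""
    return spark.read.format("jdbc").options(**jdbc_options).load()


# Placeholder connection details, for illustration only.
options = postgres_jdbc_options(
    host="localhost",
    database="example_db",
    table="public.example_table",
    user="example_user",
    password="example_password",
)
print(options["url"])
```

<p>With a running database and the driver package loaded, <code>read_table(spark, options)</code> would return a DataFrame. Writing should work the same way through <code>df.write.format("jdbc").options(**options).mode("append").save()</code>.</p>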

<h1>Spark Basics</h1>


<h3>Spark Libraries</h3>
<ul>
    <li><strong>Spark SQL</strong></li>
    <li><strong>Spark Streaming</strong></li>
    <li>MLLib</li>
    <li>GraphX</li>
</ul>

<h2>Spark Session</h2>

<p>The SparkSession is the entry point through which your script interacts with Spark.</p>

In [4]:
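from pyspark.sql import SparkSession
from pyspark import SparkConf

# Configure the application name and the standalone master to connect to
# (replace the spark:// URL with your own master, or use "local[*]").
conf = SparkConf() \
    .setAppName("Flatiron Presentation") \
    .setMaster("spark://asprague-laptop:7077") \
    .set("spark.driver.allowMultipleContexts", True)

# Reuse an existing session if one is running; otherwise create one.
spark = SparkSession.builder.config(conf=conf).getOrCreate()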

<h2>RDDs</h2>

<p>RDDs, or Resilient Distributed Datasets, are the basis of Spark. A lot of the RDD functionality has been abstracted away by DataFrames, so I will not spend too much time on RDDs. But here is a small example.</p>

In [5]:
list_names = [{'first_name': 'Aegon', 'last_name': 'Targaryen'},
              {'first_name': 'Daenerys', 'last_name': 'Targaryen'},
              {'first_name': 'Arya', 'last_name': 'Stark'},
              {'first_name': 'Sansa', 'last_name': 'Stark'},
              {'first_name': 'Ned', 'last_name': 'Stark'}]


got_character_rdd = spark.sparkContext.parallelize(list_names)

In [6]:
# Collect the RDD to the driver and print each record
for character in got_character_rdd.collect():
    print(character)

{'first_name': 'Aegon', 'last_name': 'Targaryen'}
{'first_name': 'Daenerys', 'last_name': 'Targaryen'}
{'first_name': 'Arya', 'last_name': 'Stark'}
{'first_name': 'Sansa', 'last_name': 'Stark'}
{'first_name': 'Ned', 'last_name': 'Stark'}


<h3>A note on Lazy Evaluation</h3>

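<p>Spark does not execute transformations (such as <code>map</code>) until an action (such as <code>collect</code>) is triggered. The benefits of this lazy evaluation include:</p>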
<ul>
    <li>Increases Manageability</li>
    <li>Saves Computation and increases Speed</li>
    <li>Reduces Complexities</li>
    <li>Optimization</li>
    
</ul>
<p>Source: <a href="https://data-flair.training/blogs/apache-spark-lazy-evaluation/">Data Flair</a></p>
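<p>To make the idea concrete without needing a cluster, here is a toy model of lazy evaluation in plain Python. It is an analogy only: <code>LazyPipeline</code> is a hypothetical class, not how Spark is implemented. Transformations merely record what to do, and nothing runs until the <code>collect</code> action is called.</p>

```python
class LazyPipeline:
    """Toy stand-in for an RDD: records transformations, runs them on collect()."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)

    def map(self, fn):
        # Transformation: remember the step and return a new pipeline; no work yet.
        return LazyPipeline(self._data, self._steps + (("map", fn),))

    def filter(self, fn):
        # Transformation: also only recorded.
        return LazyPipeline(self._data, self._steps + (("filter", fn),))

    def collect(self):
        # Action: apply every recorded step, then materialize the result.
        items = iter(self._data)
        for kind, fn in self._steps:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return list(items)


calls = []


def double(x):
    calls.append(x)  # record that the function really ran
    return x * 2


pipeline = LazyPipeline([1, 2, 3]).map(double).filter(lambda x: x > 2)
calls_before = list(calls)   # still empty: no transformation has executed
result = pipeline.collect()  # the action triggers the whole chain
print(calls_before, result, calls)
```

<p>Because the whole chain is known before anything runs, an engine like Spark has the chance to plan and optimize it as a unit, which is where the benefits listed above come from.</p>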
    
<h3>Count the people with the same last name</h3>

In [7]:
character_family_count = got_character_rdd.map(lambda x: (x['last_name'], 1)).reduceByKey(lambda a, b: a + b)

for house in character_family_count.collect():
    print(house)

('Targaryen', 2)
('Stark', 3)
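<p>For readers new to <code>map</code> and <code>reduceByKey</code>, the same computation can be sketched in plain Python on a single machine. <code>map_reduce_by_key</code> is a hypothetical helper written for this illustration. Spark performs the equivalent steps in parallel across partitions, which this sketch does not attempt to model.</p>

```python
from collections import defaultdict
from functools import reduce

list_names = [{'first_name': 'Aegon', 'last_name': 'Targaryen'},
              {'first_name': 'Daenerys', 'last_name': 'Targaryen'},
              {'first_name': 'Arya', 'last_name': 'Stark'},
              {'first_name': 'Sansa', 'last_name': 'Stark'},
              {'first_name': 'Ned', 'last_name': 'Stark'}]


def map_reduce_by_key(records, to_pair, combine):
    """Single-machine sketch of rdd.map(to_pair).reduceByKey(combine)."""
    grouped = defaultdict(list)
    # "map" step: turn every record into a (key, value) pair and group by key.
    for record in records:
        key, value = to_pair(record)
        grouped[key].append(value)
    # "reduceByKey" step: fold the values of each key with the combine function.
    return {key: reduce(combine, values) for key, values in grouped.items()}


counts = map_reduce_by_key(
    list_names,
    to_pair=lambda x: (x['last_name'], 1),
    combine=lambda a, b: a + b,
)
print(counts)
```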


<h2>DataFrames</h2>

<a href="https://spark.apache.org/docs/2.1.2/api/python/_modules/pyspark/sql/types.html" target="_blank">PySpark Types</a>

In [9]:
from pyspark.sql.types import StructField, StructType, StringType


got_character_schema = StructType([
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
])

got_character_df = spark.createDataFrame(list_names, schema=got_character_schema)

In [10]:
got_character_df.groupBy("last_name").count().show()

+---------+-----+
|last_name|count|
+---------+-----+
|Targaryen|    2|
|    Stark|    3|
+---------+-----+



<h3>Reading From Other File Types</h3>

<ul>
    <li>Text</li>
    <li>CSV</li>
    <li>JSON</li>
    <li>Parquet (Columnar Storage)</li>
</ul>
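<p>Each of these formats is a built-in data source that can be selected by name through <code>spark.read.format(...)</code>. The sketch below picks the source name from the file extension. The extension mapping and both helper names are assumptions made for illustration, not a Spark feature.</p>

```python
import os

# Built-in Spark source names keyed by file extension (the mapping is an assumption).
FORMAT_BY_EXTENSION = {
    ".txt": "text",
    ".csv": "csv",
    ".json": "json",
    ".parquet": "parquet",
}


def infer_format(path):
    """Return the Spark data source name for a path, based on its extension."""
    extension = os.path.splitext(path)[1].lower()
    if extension not in FORMAT_BY_EXTENSION:
        raise ValueError(f"No reader configured for extension {extension!r}")
    return FORMAT_BY_EXTENSION[extension]


def read_file(spark, path, **options):
    """Read a file into a DataFrame; `spark` is an existing SparkSession."""
    return spark.read.format(infer_format(path)).options(**options).load(path)


print(infer_format("docs/fed_chairs.json"))
```

<p>For example, <code>read_file(spark, "./docs/pbp000000000000.csv", header=True)</code> would route to the CSV reader with the header option set.</p>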

<h3>Reading from JSON</h3>

In [31]:
from pyspark.sql.types import DateType, ArrayType, DecimalType, BooleanType

fed_chair_path = "docs/fed_chairs.json"

fed_schema = StructType([
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("start_date", StringType(), False),
    StructField("end_date", StringType(), False),
    StructField("meetings",ArrayType(StructType([
        StructField("date", StringType(), False),
        StructField("current_fed_funds", StructType([
            StructField("upper", DecimalType(6,4), False),
            StructField("lower", DecimalType(6,4), False)
        ]),False),
        StructField("new_fed_funds", StructType([
            StructField("upper", DecimalType(6,4), False),
            StructField("lower", DecimalType(6,4), False)
        ]), False), 
        StructField("rate_change", BooleanType(), False)
    ]), False), False)
])



spark.read.json(fed_chair_path, schema=fed_schema).show()

+----------+---------+----------+--------+--------+
|first_name|last_name|start_date|end_date|meetings|
+----------+---------+----------+--------+--------+
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|     null|      null|    null|    null|
|      null|
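<p>Rows that are entirely <code>null</code>, as in the output above, are a common symptom of a file layout the reader did not expect. By default <code>spark.read.json</code> expects JSON Lines, meaning one complete JSON object per line, so a pretty-printed file spanning many lines yields one unparseable record per line. Whether that is the cause here is an assumption, since the file itself is not shown; a schema that does not match the data is another possibility. The sketch below uses a hypothetical helper, <code>is_json_lines</code>, to check a file's layout without Spark.</p>

```python
import json
import os
import tempfile


def is_json_lines(path):
    """Return True if every non-empty line of the file is a standalone JSON object."""
    with open(path, encoding="utf-8") as handle:
        lines = [line for line in handle if line.strip()]
    if not lines:
        return False
    for line in lines:
        try:
            if not isinstance(json.loads(line), dict):
                return False
        except json.JSONDecodeError:
            return False
    return True


# Demonstrate on two throwaway files with made-up records.
with tempfile.TemporaryDirectory() as tmp:
    jsonl_path = os.path.join(tmp, "lines.json")
    pretty_path = os.path.join(tmp, "pretty.json")
    with open(jsonl_path, "w", encoding="utf-8") as f:
        f.write('{"first_name": "A"}\n{"first_name": "B"}\n')
    with open(pretty_path, "w", encoding="utf-8") as f:
        json.dump([{"first_name": "A"}], f, indent=2)
    jsonl_ok = is_json_lines(jsonl_path)
    pretty_ok = is_json_lines(pretty_path)

print(jsonl_ok, pretty_ok)
```

<p>If the check returns <code>False</code> for a valid JSON file, passing <code>multiLine=True</code> to <code>spark.read.json</code> is worth trying.</p>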

<h3>Reading from CSV</h3>

In [17]:
from jobs.ncaa_basketball_schema import bb_schema

bb_event_path = "./docs/pbp000000000000.csv"

bb_event_df = spark.read \
    .csv(bb_event_path, header=True, schema=bb_schema)

In [18]:
bb_event_df.printSchema()

root
 |-- game_id: string (nullable = true)
 |-- load_timestamp: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- scheduled_date: string (nullable = true)
 |-- venue_id: string (nullable = true)
 |-- venue_name: string (nullable = true)
 |-- venue_city: string (nullable = true)
 |-- venue_state: string (nullable = true)
 |-- venue_address: string (nullable = true)
 |-- venue_zip: string (nullable = true)
 |-- venue_country: string (nullable = true)
 |-- venue_capacity: integer (nullable = true)
 |-- attendance: integer (nullable = true)
 |-- neutral_site: boolean (nullable = true)
 |-- conference_game: boolean (nullable = true)
 |-- tournament: string (nullable = true)
 |-- tournament_type: string (nullable = true)
 |-- round: string (nullable = true)
 |-- game_no: string (nullable = true)
 |-- away_market: string (nullable = true)
 |-- away_name: string (nullable = true)
 |-- away_id: string (nullable = true)
 |-- away_alias: s

In [19]:
from pyspark.sql.functions import col
bb_event_df.select(col("game_id"),col("home_name"),col("away_name")).show(truncate=False)

+------------------------------------+-------------+----------+
|game_id                             |home_name    |away_name |
+------------------------------------+-------------+----------+
|60f60dfe-4423-4ef2-9b6c-aee096a0b65c|Bears        |Cowboys   |
|60f60dfe-4423-4ef2-9b6c-aee096a0b65c|Bears        |Cowboys   |
|5be9a30e-3582-465d-b19e-5388eddbacd5|Trojans      |Musketeers|
|5be9a30e-3582-465d-b19e-5388eddbacd5|Trojans      |Musketeers|
|5be9a30e-3582-465d-b19e-5388eddbacd5|Trojans      |Musketeers|
|5be9a30e-3582-465d-b19e-5388eddbacd5|Trojans      |Musketeers|
|63d257d7-4865-49d1-afd1-fd4ca560bbd5|Miners       |Hawkeyes  |
|63d257d7-4865-49d1-afd1-fd4ca560bbd5|Miners       |Hawkeyes  |
|679a305f-2359-4e0f-97ca-3d886ca3d1c3|Demon Deacons|Volunteers|
|7bee62f8-1a9f-4ab9-896c-9200ef45eea8|Jayhawks     |Miners    |
|846c1adc-fb8e-47a6-8795-758b4c3fe05d|Musketeers   |Hawkeyes  |
|846c1adc-fb8e-47a6-8795-758b4c3fe05d|Musketeers   |Hawkeyes  |
|846c1adc-fb8e-47a6-8795-758b4c3fe05d|Mu

