[Spark with Jupyter Notebook on MacOS (2.0.0 and higher)](https://medium.com/@roshinijohri/spark-with-jupyter-notebook-on-macos-2-0-0-and-higher-c61b971b5007)
==========================================================================================

#### Run in Terminal:
$\textrm{brew install apache-spark}$

$\textrm{brew info apache-spark}$

$\textrm{export SPARK_HOME='/usr/local/Cellar/apache-spark/2.4.5/libexec/'}$ -> Adjust the version number in this path (2.4.5 here) to match the version reported by the previous command

$\textrm{pyspark}$

In [1]:
import os
# Run PySpark's interactive-shell bootstrap script (located under SPARK_HOME) in this
# namespace; per the banner it prints, it makes a SparkSession available as `spark`.
# NOTE(review): this executes whatever file SPARK_HOME points at -- make sure the
#   environment variable refers to a trusted Spark installation.
shell_path = os.path.join(os.environ['SPARK_HOME'], 'python/pyspark/shell.py')

# Read the script through a context manager so the file handle is closed deterministically
with open(shell_path) as shell_file:
    exec(shell_file.read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Python version 3.7.4 (default, Aug 13 2019 15:17:50)
SparkSession available as 'spark'.


In [2]:
import pyspark
from pyspark.sql.session import SparkSession
# Get (or create) the SparkSession used by the following cells.
# The builder chain is wrapped in parentheses instead of backslash continuations, so no
# trailing backslash can accidentally splice the next statement onto this one.
spark = (
    SparkSession.builder
    .appName('spark test')
    .getOrCreate()
)

# Rows of the toy DataFrame (one tuple per row) ...
vals = [(1, 2, 0), (2, 0, 1)]

# ... and the column name for each tuple position
columns = ['id', 'dogs', 'cats']

In [3]:
# Build a DataFrame from the row tuples, using the list of column names as its schema,
# then print its contents as a text table
df = spark.createDataFrame(data=vals, schema=columns)
df.show()

+---+----+----+
| id|dogs|cats|
+---+----+----+
|  1|   2|   0|
|  2|   0|   1|
+---+----+----+



# Spark Broadcast

In [4]:
from pyspark import SparkContext

In [5]:
# Stop the SparkContext currently bound to `sc` before creating its replacement below
sc.stop()

# SparkContext is the main entry point for Spark functionality: it represents the connection
#   to a Spark cluster and is used to create RDDs and broadcast variables on that cluster
# Note: master: cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4])
#      appName: name of the job, displayed on the cluster web UI
sc = SparkContext(
    appName='pyspark',
    master='local[*]',
)
sc

In [6]:
# Keys that will later be distributed as an RDD
my_list = ['item1', 'item2', 'item3', 'item4']

# Lookup table mapping each key to its 1-based position: item1 -> 1, ..., item4 -> 4
my_dict = dict(zip(my_list, range(1, len(my_list) + 1)))

In [7]:
# Publish the dictionary as a read-only broadcast variable. The returned Broadcast object
#  exposes the data through `.value` for use inside distributed functions, and the variable
#  is sent to each cluster only once.
my_dict_bc = sc.broadcast(value=my_dict)

In [8]:
def my_func(letter):
    """Return the value stored under key ``letter`` in the broadcast dictionary."""
    lookup = my_dict_bc.value
    return lookup[letter]

# Distribute a local Python collection to form an RDD
# Note: when the input represents a range, passing range() is recommended for performance
my_list_rdd = sc.parallelize(my_list)

# map(): return a new RDD by applying a function to each element of this RDD
# collect(): return a list that contains all of the elements in this RDD
# Note: collect() should only be used if the resulting list is expected to be small, as all
#       the data is loaded into the driver's memory
# my_func is passed to map() directly; wrapping it in a lambda adds nothing
result = my_list_rdd.map(my_func).collect()

print(result)

[1, 2, 3, 4]


# Optimizing for Data Skewness

In [9]:
from pyspark.sql import SparkSession

def explore_dataframe(input_path=None):
    """Load the parking-violation CSV and show how its rows are distributed by year and month.

    Prints the list of column names, then displays the row count per ``year`` and per
    ``month``, each sorted from the largest to the smallest count.

    Args:
        input_path: Path of the CSV file (with a header row) to read. When omitted, the
            original local copy of ``parking_violation.csv`` is used.
    """
    if input_path is None:
        input_path = '/Users/yangweichle/Documents/Employment/TRAINING/DATA SCIENCE/Spark/Udacity_Spark for Big Data/Optimizing for Data Skewness with Spark/parking_violation.csv'

    # Instantiate a Spark session
    # The entry point to programming Spark with the Dataset and DataFrame API
    # Note: appName(): sets a name for the application, which will be shown in the Spark web UI
    #       getOrCreate(): get or instantiate a SparkContext and register it as a singleton object
    spark = SparkSession.builder \
        .appName('Skewness Introduction') \
        .getOrCreate()

    # Specifies the input data source format
    # Note: source: string, name of the data source, e.g. 'json', 'parquet'
    #       header=True: treat the first line of the file as the column names
    df = spark.read.format(source='csv').option('header', True).load(input_path)

    # Investigate what columns you have
    col_list = df.columns
    print(col_list)

    # Count rows per year and per month, most frequent value first
    year_df = df.groupby('year').count().sort('count', ascending=False)
    month_df = df.groupby('month').count().sort('count', ascending=False)

    year_df.show()
    month_df.show()

    # TODO write file partition by year, and study the executor in the spark UI
    # TODO use repartition function

if __name__ == '__main__':
    explore_dataframe()

['_c0', 'Summons_Number', 'Plate_ID', 'Registration_State', 'Plate_Type', 'Issue_Date', 'Violation_Code', 'Vehicle_Body_Type', 'Vehicle_Make', 'Issuing_Agency', 'Street_Code1', 'Street_Code2', 'Street_Code3', 'Vehicle_Expiration_Date', 'Violation_Location', 'Violation_Precinct', 'Issuer_Precinct', 'Issuer_Code', 'Issuer_Command', 'Issuer_Squad', 'Violation_Time', 'Time_First_Observed', 'Violation_County', 'Violation_In_Front_Of_Or_Opposite', 'House_Number', 'Street_Name', 'Intersecting_Street', 'Date_First_Observed', 'Law_Section', 'Sub_Division', 'Violation_Legal_Code', 'Days_Parking_In_Effect____', 'From_Hours_In_Effect', 'To_Hours_In_Effect', 'Vehicle_Color', 'Unregistered_Vehicle?', 'Vehicle_Year', 'Meter_Number', 'Feet_From_Curb', 'Violation_Post_Code', 'Violation_Description', 'No_Standing_or_Stopping_Violation', 'Hydrant_Violation', 'Double_Parking_Violation', 'Latitude', 'Longitude', 'Community_Board', 'Community_Council_', 'Census_Tract', 'BIN', 'BBL', 'NTA', 'year', 'month']


### partitionBy

In [10]:
from pyspark.sql import SparkSession

def partitionBy(input_path=None, out_path=None, partition_col='year'):
    """Read the parking-violation CSV and write it back out partitioned by a column.

    Args:
        input_path: Path of the CSV file (with a header row) to read. When omitted, the
            original local copy of ``parking_violation.csv`` is used.
        out_path: Destination path of the partitioned CSV output. When omitted, the
            original ``parking_violation_partition_year.csv`` location is used.
        partition_col: Column whose values define the output partitions on the file
            system (default ``'year'``).
    """
    if input_path is None:
        input_path = '/Users/yangweichle/Documents/Employment/TRAINING/DATA SCIENCE/Spark/Udacity_Spark for Big Data/Optimizing for Data Skewness with Spark/parking_violation.csv'
    if out_path is None:
        out_path = '/Users/yangweichle/Documents/Employment/TRAINING/DATA SCIENCE/Spark/Udacity_Spark for Big Data/Optimizing for Data Skewness with Spark/parking_violation_partition_year.csv'

    # Instantiate a Spark session
    # The entry point to programming Spark with the Dataset and DataFrame API
    # Note: appName(): sets a name for the application, which will be shown in the Spark web UI
    #       getOrCreate(): get or instantiate a SparkContext and register it as a singleton object
    spark = SparkSession.builder \
        .appName('Repartition Example') \
        .getOrCreate()

    # Specifies the input data source format
    # Note: source: string, name of the data source, e.g. 'json', 'parquet'
    df = spark.read.format(source='csv').option('header', True).load(input_path)

    # Partitions the output by the given column on the file system and
    # saves the content of the DataFrame in CSV format at the specified path
    # Note: path: the path in any Hadoop supported file system
    #       mode: specifies the behavior of the save operation when data already exists
    #             * append: append contents of this DataFrame to existing data
    #             * overwrite: overwrite existing data
    #             * ignore: silently ignore this operation if data already exists
    #             * error or errorifexists (default case): throw an exception if data already exists
    # With mode='ignore', re-running this cell leaves an existing output untouched
    df.write.partitionBy(partition_col).csv(path=out_path, mode='ignore')

if __name__ == '__main__':
    partitionBy()

### Repartition

In [11]:
# repartition() returns a new, hash-partitioned DataFrame; the original `df` is not modified
# Note: numPartitions: either an int giving the target number of partitions, or a Column
df.repartition(numPartitions=6)

DataFrame[id: bigint, dogs: bigint, cats: bigint]

In [12]:
# Confirm the partition count of the repartitioned DataFrame via its underlying RDD
df.repartition(numPartitions=6).rdd.getNumPartitions()

6

In [13]:
from pyspark.sql import SparkSession

def repartition(input_path=None, out_path=None, num_partitions=6):
    """Read the parking-violation CSV, repartition it, and write the result as CSV.

    Args:
        input_path: Path of the CSV file (with a header row) to read. When omitted, the
            original local copy of ``parking_violation.csv`` is used.
        out_path: Destination path of the repartitioned CSV output. When omitted, the
            original ``parking_violation_repartition6.csv`` location is used.
        num_partitions: Target number of partitions for the DataFrame before it is
            written (default ``6``).
    """
    if input_path is None:
        input_path = '/Users/yangweichle/Documents/Employment/TRAINING/DATA SCIENCE/Spark/Udacity_Spark for Big Data/Optimizing for Data Skewness with Spark/parking_violation.csv'
    if out_path is None:
        out_path = '/Users/yangweichle/Documents/Employment/TRAINING/DATA SCIENCE/Spark/Udacity_Spark for Big Data/Optimizing for Data Skewness with Spark/parking_violation_repartition6.csv'

    # Instantiate a Spark session
    # The entry point to programming Spark with the Dataset and DataFrame API
    # Note: appName(): sets a name for the application, which will be shown in the Spark web UI
    #       getOrCreate(): get or instantiate a SparkContext and register it as a singleton object
    spark = SparkSession.builder \
        .appName('Repartition Example') \
        .getOrCreate()

    # Specifies the input data source format
    # Note: source: string, name of the data source, e.g. 'json', 'parquet'
    df = spark.read.format(source='csv').option('header', True).load(input_path)

    # repartition() returns a new, hash-partitioned DataFrame
    # Note: numPartitions: can be an int to specify the target number of partitions or a Column
    # csv() saves the content of the DataFrame in CSV format at the specified path
    # Note: path: the path in any Hadoop supported file system
    #       mode: specifies the behavior of the save operation when data already exists
    #             * append: append contents of this DataFrame to existing data
    #             * overwrite: overwrite existing data
    #             * ignore: silently ignore this operation if data already exists
    #             * error or errorifexists (default case): throw an exception if data already exists
    # With mode='ignore', re-running this cell leaves an existing output untouched
    df.repartition(num_partitions).write.csv(path=out_path, mode='ignore')

if __name__ == '__main__':
    repartition()