# PySpark experimentation

In [1]:
import pyspark

## Spark Context

The SparkContext is the key entry point for Spark: it represents the connection to a Spark cluster and is used to create RDDs and run work on that cluster. PySpark uses Py4J as a bridge between Python and Java, so that your Python code can interact with the JVM-based SparkContext (Spark itself runs on the JVM). The context lives in the driver program, which coordinates the Spark application. Only one SparkContext can be active per Python kernel at a time.

In [2]:
sc = pyspark.SparkContext('local[*]')
type(sc)


# sc.parallelize(range(1000)).count()
# rdd = sc.parallelize(range(1000))
# rdd.takeSample(False,5)

pyspark.context.SparkContext

In [3]:
print('Current Version of Spark: ', sc.version)
print('Default number of cores in use: ', sc.defaultParallelism)
print('Current Application Name: ', sc.appName)
print('Current app id: ', sc.applicationId)

Current Version of Spark:  2.4.0
Default number of cores in use:  8
Current Application Name:  pyspark-shell
Current app id:  local-1611436112913


In [4]:
# all config settings for this context (public accessor instead of the private _conf attribute)
sc.getConf().getAll()

[('spark.driver.port', '53567'),
 ('spark.app.id', 'local-1611436112913'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'mitchells-mbp.lan'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]

In [5]:
# ends spark context
sc.stop()

## Resilient Distributed Datasets (RDDs)

RDDs are the fundamental data structure of Spark. An RDD represents a set of data spread across multiple machines. RDDs are:
- **Resilient:** RDDs have built-in fault tolerance, meaning that if a node goes offline the lost partitions can be recomputed from the RDD's lineage
- **Distributed:** Data in an RDD is partitioned across multiple nodes of a cluster
- **Immutable:** Once created, an RDD cannot be modified
- **Lazily Evaluated:** RDDs are not computed when they are defined. Spark records the operations to perform and only runs them when a result is explicitly requested (this lets Spark organize the work into smaller steps, which helps with speed and memory usage on large quantities of data)
- **In Memory:** Operations are performed in memory rather than reading from and writing to external storage at each step (this allows for speed with large quantities of data)

### Transformations & Actions

Every transformation on an RDD creates a new RDD because RDDs are immutable. However, a transformation is not executed until an action is called (lazy evaluation). Actions return the final results of computations on RDDs and execute using the lineage graph of the original RDD.

### RDD Example

In [6]:
#generate a simple data set
data = list(range(1,1001))

#create spark context
sc = pyspark.SparkContext('local[*]')

# create rdd with 10 partitions
rdd = sc.parallelize(data, numSlices=10)
print(type(rdd))

<class 'pyspark.rdd.RDD'>


In [7]:
#check number of partitions
rdd.getNumPartitions()

10

In [8]:
#simple actions on RDD
print('Count: ', rdd.count()) #counts total number of items in RDD
print('First: ', rdd.first()) #returns first item in RDD
print('Take: ', rdd.take(10)) #returns first n items in RDD
print('Top: ', rdd.top(10)) #returns the n largest items in RDD, in descending order
print('Collect', rdd.collect()) #returns everything in RDD

Count:  1000
First:  1
Take:  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Top:  [1000, 999, 998, 997, 996, 995, 994, 993, 992, 991]
Collect [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196,

***
Simulate data in an RDD by multiplying each value with a random value between 0 and 1

In [20]:
import random 
import numpy as np

nums = np.array(range(1,1001))
data = nums * np.random.rand(1000)

#convert numpy array to RDD
rdd2 = sc.parallelize(data, numSlices=10)

Let's say every item in `rdd2` is a price. Calculate the revenue after a 6% sales tax is taken out:
 - Note: `.map()` is a transformation and isn't executed until an action is called

In [23]:
def sales_tax(num):
    # keep 94% of the price after the 6% sales tax
    return num * 0.94

revenue_after_tax = rdd2.map(sales_tax)
revenue_after_tax.top(10)

[920.922635608762,
 899.7650874927601,
 887.1885961828942,
 882.160165561848,
 867.6439348908704,
 859.5777504999264,
 854.8428118295329,
 844.9590061343738,
 836.0707910494224,
 805.3793290190358]

This can also be written using a lambda function and chained:

In [26]:
rdd2.map(lambda x: x*.94).top(10)

[920.922635608762,
 899.7650874927601,
 887.1885961828942,
 882.160165561848,
 867.6439348908704,
 859.5777504999264,
 854.8428118295329,
 844.9590061343738,
 836.0707910494224,
 805.3793290190358]

You can check the lineage (history) of an RDD by using `.toDebugString()`:

In [28]:
print(type(revenue_after_tax))
revenue_after_tax.toDebugString()

<class 'pyspark.rdd.PipelinedRDD'>


b'(10) PythonRDD[15] at RDD at PythonRDD.scala:53 []\n |   ParallelCollectionRDD[9] at parallelize at PythonRDD.scala:195 []'

In [29]:
revenue_after_tax

PythonRDD[15] at RDD at PythonRDD.scala:53