https://www.dezyre.com/apache-spark-tutorial/pyspark-tutorial

## Basic command actions

##### Spark Actions

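* myRDDdata.collect(): returns all elements of the RDD to the driver node (this can cause an out-of-memory error on large data)
* myDataFrame.select(cols): keeps only the selected columns (a DataFrame transformation, not an RDD action; RDDs have no select())
* myRDDdata.first(): returns the first element of the dataset
* myRDDdata.take(n): returns the first n elements of the dataset
* myRDDdata.takeSample(withReplacement, num, seed=None): returns a random sample of num elements
* myRDDdata.count(): returns the total number of elements in the dataset
* myRDDdata.reduce(f): combines all elements with a commutative and associative binary function f (for example, a sum)
* myRDDdata.unpersist(): removes the cached data from memory (and disk); strictly speaking this is a persistence call, not an action
* myRDDdata.countByValue(): returns a {value: count} dictionary, like value_counts() in pandas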

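##### Spark Transformations

Transformations return a new RDD and are lazy: nothing is computed until an action is called.

* map(f): applies f to every element (one output element per input element)
* flatMap(f): applies f and flattens the results (zero or more output elements per input element)
* filter(f): keeps only the elements for which f returns True
* sample(withReplacement, fraction, seed=None): returns a random sample of roughly the given fraction of the elements
* union(other): returns all elements of both RDDs (duplicates are kept)
* intersection(other): returns the elements present in both RDDs (duplicates are removed)
* distinct(): removes duplicate elements
* join(other): inner join of two (key, value) pair RDDs on the key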

## RDD Partitions

Parallelism is the key feature of any distributed system: the data is divided into multiple partitions and the same operation runs on all partitions simultaneously, which is what makes data processing with Spark fast. **Map and Reduce** operations can be applied effectively in parallel in Apache Spark because the data is split into multiple partitions. The partitions of an RDD are distributed across workers running on different nodes of the cluster. By default Spark does not keep copies of the partitions; if a worker fails, Spark recomputes the lost partitions from the RDD's lineage (the recorded chain of transformations that produced it), so the RDD remains available.

The degree of parallelism of each operation on an RDD depends on **the fixed number of partitions** the RDD has. You can specify the number of partitions when creating the RDD, or change it later with the repartition() and coalesce() methods (both return a new RDD).

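* partRDD = sc.textFile('/opt/spark/CHANGES.txt', 4): reads the file into at least 4 partitions (the second argument is minPartitions, a lower bound)
* partRDD.getNumPartitions(): returns the number of partitions of the RDD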

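When processing data with the **reduceByKey** operation, Spark uses the value of spark.default.parallelism as the number of output partitions if that property is set; otherwise it uses the largest number of partitions among the parent RDDs. For RDDs with no parent (for example, those created with parallelize()), the default depends on the number of nodes and cores: the number of local cores in local mode, otherwise the total number of cores on all executor nodes or 2, whichever is larger. You can override the count for a single call with the numPartitions argument.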

Higher level --> Lower level:
* Cluster --> Worker nodes --> Executors --> Cores (by default each core runs one task at a time)

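mapPartitions() runs a function once on each partition (the function receives an iterator over that partition's elements and returns an iterable), unlike map(), which calls its function once for every element of the RDD. This helps when each call needs an expensive setup step, such as opening a database connection.
* partRDD.mapPartitions(f)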

## Sample code (in local mode)

```python
from pyspark import SparkConf, SparkContext

# Local mode: the driver and executors run on this machine, using all available cores.
conf = SparkConf().setMaster("local[*]").setAppName("MyFirstStandaloneApp")
# getOrCreate() reuses an existing context instead of failing when the cell is re-run.
sc = SparkContext.getOrCreate(conf)

userRDD = sc.textFile("/usr/lib/data_backup/opt/spark_usecases/movie/ml-100k/u.user")

def parse_N_calculate_age(data):
    # Each line has the form "userid|age|gender|occupation|zipcode".
    userid, age, gender, occupation, zipcode = data.split("|")
    return userid, age_group(int(age)), gender, occupation, zipcode, int(age)

def age_group(age):
    if age < 10:
        return "0-10"
    elif age < 20:
        return "10-20"
    elif age < 30:
        return "20-30"
    elif age < 40:
        return "30-40"
    elif age < 50:
        return "40-50"
    elif age < 60:
        return "50-60"
    elif age < 70:
        return "60-70"
    elif age < 80:
        return "70-80"
    else:
        return "80+"

data_with_age_bucket = userRDD.map(parse_N_calculate_age)

# Keep users in the 20-30 bucket (field 1), then count their occupations (field 3).
RDD_20_30 = data_with_age_bucket.filter(lambda record: record[1] == "20-30")
freq = RDD_20_30.map(lambda record: record[3]).countByValue()

print("total user count is", userRDD.count())
print("occupation counts for users aged 20-30:", dict(freq))

# Accumulators let tasks on the executors add to a shared counter that the driver reads.
# A plain global variable would be copied into each task, and its updates would never
# reach the driver.
Under_age = sc.accumulator(0)
Over_age = sc.accumulator(0)

def outliers(data):
    global Over_age, Under_age
    age_grp = data[1]
    if age_grp in ["70-80", "80+"]:
        Over_age += 1
    elif age_grp == "0-10":
        Under_age += 1

# foreach() is an action that runs outliers() on the executors without returning the
# records; collect() would ship the entire dataset to the driver and can run out of memory.
# Accumulator updates made inside actions are applied exactly once per task.
data_with_age_bucket.foreach(outliers)

print("under age users of the movie are", Under_age.value)
print("over age users of the movie are", Over_age.value)
```
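```python
import pandas as pd

# pandas counterpart of RDD.countByValue(): how often each value occurs, sorted by value.
mylist = pd.Series([1,2,3,4,2,3,4,2,1,2,1,1,2,3,4,3,4,3,2,1,2,1])
mydf = mylist.value_counts().sort_index().reset_index()
mydf.columns = ['value', 'count']
print(mydf)
```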

## Launching Spark on a Cluster

1. Apache Hadoop YARN: YARN is the resource manager in this scenario, and HDFS is typically the storage layer that the application reads from and writes to.
2. Apache Mesos: a distributed mode where resource management is handled by the cluster manager Apache Mesos, originally developed at UC Berkeley (Mesos support is deprecated since Spark 3.2).
3. Standalone Mode: resource management is handled by Spark's own built-in cluster manager.

To run an application on a cluster, the cluster must already be set up, with all workers registered with the master.
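Each cluster manager above is selected through a master URL. The sketch below maps the managers to the URL formats Spark accepts; `master_url` is a hypothetical helper, and the host names are placeholders. It only builds strings, so it runs without Spark.

```python
from typing import Optional

# Default master ports: 7077 for the standalone master, 5050 for a Mesos master.
DEFAULT_PORTS = {"standalone": 7077, "mesos": 5050}


def master_url(manager: str, host: str = "", port: Optional[int] = None) -> str:
    """Hypothetical helper: build the --master / setMaster() string for a cluster manager."""
    if manager == "local":
        return "local[*]"  # run in-process using all local cores; no cluster needed
    if manager == "yarn":
        # No host: Spark locates the ResourceManager through the Hadoop configuration
        # found in HADOOP_CONF_DIR or YARN_CONF_DIR.
        return "yarn"
    if manager in DEFAULT_PORTS:
        if not host:
            raise ValueError(f"{manager} needs the master's host name")
        scheme = "spark" if manager == "standalone" else "mesos"
        return f"{scheme}://{host}:{port or DEFAULT_PORTS[manager]}"
    raise ValueError(f"unknown cluster manager: {manager}")


print(master_url("standalone", "master-host"))  # spark://master-host:7077
print(master_url("yarn"))                       # yarn
```

The resulting string would be passed as, for example, `spark-submit --master spark://master-host:7077 my_app.py`, or to `SparkConf().setMaster(...)`; `master-host` and `my_app.py` are placeholders.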

