## PySpark Tutorial

```
Important:
  Use bash when working with conda environments

Some conda commands:
conda info # show channels
conda config --add channels conda-forge
conda install --channel "conda-forge" pyspark
# or conda install -c conda-forge pyspark

# Add this line to your .bashrc:
export PYSPARK_PYTHON="$HOME/anaconda3/bin/python"

---------------------------------------------------
Note - Spark needs Java.
Make sure to install it.
Check its location in System Preferences.
Add env. variables in your .bashrc file:

export JAVA_HOME='/Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home'
export JRE_HOME=$JAVA_HOME

---------------------------------------------------
#   # Note - you can create separate environment
#   # with specific python version if needed
#   conda create -n py37 python=3.7
#   conda activate py37
#   conda install jupyter ipython numpy pandas matplotlib
#
#   conda env list
#   conda activate py37
#   conda install -c conda-forge pyspark
```

Installing PySpark on Windows:
 - https://bigdata-madesimple.com/guide-to-install-spark-and-use-pyspark-from-jupyter-in-windows/
 - https://github.com/cdarlint/winutils

Note: the winutils build must match the Hadoop version that your PySpark release is built against.
<br>For example, Apache PySpark 3.1.2 (built for Hadoop 3.2) pairs with the hadoop-3.2.0 winutils.

Spark Programming Guide
- https://spark.apache.org/docs/latest/
- https://spark.apache.org/docs/latest/quick-start.html
- https://spark.apache.org/docs/latest/sql-programming-guide.html 
- https://spark.apache.org/docs/latest/rdd-programming-guide.html 
- https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
- https://spark.apache.org/docs/latest/streaming-programming-guide.html 
- https://spark.apache.org/docs/latest/ml-guide.html 
- https://spark.apache.org/docs/latest/graphx-programming-guide.html
- https://spark.apache.org/docs/latest/api/python/getting_started/index.html


In [1]:
# Shell escape: print the version of the `python` executable found on the PATH.
# NOTE(review): this is resolved by the shell, so it may not be the interpreter
# the kernel itself runs on — confirm with sys.version if that matters.
!python --version

Python 3.9.12


In [2]:
from pyspark import SparkContext, SparkConf

In [3]:
# Configuration whose master URL is "local" (Spark runs inside this machine).
conf = (
    SparkConf()
    .setMaster("local")
)

In [4]:
# Create the SparkContext from the configuration built in the previous cell.
# The WARN lines recorded after this cell are emitted while it starts up.
sc = SparkContext(conf=conf)

22/07/09 20:38:54 WARN Utils: Your hostname, MacBook-M1-Pro-16-Lev-2022.local resolves to a loopback address: 127.0.0.1; using 192.168.0.34 instead (on interface en0)
22/07/09 20:38:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/09 20:38:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Display the context as the cell's result.
# The rendered summary contains a link to the Spark UI:
# follow it and browse the tabs along the top of the page.
sc

In [6]:
# Distribute the integers 0 .. 10**6 - 1 as an RDD.
rdd = sc.parallelize(range(10**6))

In [7]:
# Count the elements of the RDD; the recorded result is 1000000.
rdd.count()

                                                                                

1000000

In [8]:
# Stop this context and remove the name; a later cell creates a new
# SparkContext with different settings.
sc.stop()
del sc

In [9]:
import pyspark
from pyspark import SparkConf, SparkContext
from operator import add

# New context with master URL 'local[2]'; the inspection cell that follows
# reports a default parallelism of 2 for it.
sc = SparkContext(master = 'local[2]')

In [10]:
# Inspect SparkContext: collect the properties of interest, then print one per line.
context_properties = (
    sc.version,               # Spark version
    sc.pythonVer,             # Python version
    sc.master,                # master URL to connect to
    str(sc.sparkHome),        # path where Spark is installed on worker nodes
    str(sc.sparkUser()),      # name of the user running this SparkContext
    sc.appName,               # application name
    sc.applicationId,         # application ID
    sc.defaultParallelism,    # default level of parallelism
    sc.defaultMinPartitions,  # minimum number of partitions for RDDs
)
print(*context_properties, sep="\n")

3.3.0
3.9
local[2]
None
levselector
pyspark-shell
local-1657413539953
2
2


In [11]:
# Stop the 'local[2]' context and remove the name before the next cell
# builds a context from an explicit SparkConf.
sc.stop()
del sc

In [12]:
# Configuration
# Build the settings step by step: master URL, application name,
# and the "spark.executor.memory" property set to "1g".
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("My app")
conf.set("spark.executor.memory", "1g")

# Start the context that the remaining cells of the notebook use.
sc = SparkContext(conf=conf)

In [13]:
# Loading Data
#
# 1) Parallelized collections: in-memory Python collections turned into RDDs.
rdd = sc.parallelize([("a", 7), ("a", 2), ("b", 2)])      # (key, value) pairs
rdd2 = sc.parallelize([("a", 2), ("d", 1), ("b", 1)])     # (key, value) pairs
rdd3 = sc.parallelize(range(100))                         # integers 0..99
rdd4 = sc.parallelize([
    ("a", ["x", "y", "z"]),
    ("b", ["p", "r"]),
])                                                        # (key, list) pairs

# 2) External data:
#    textFile()       reads a text file from HDFS, the local file system,
#                     or any Hadoop-supported file system URI;
#    wholeTextFiles() reads a whole directory of text files.
# NOTE(review): neither result is used by the other cells shown here, and the
# /tmp/lev location is machine-specific — adjust before relying on them.
textFile = sc.textFile("/tmp/lev/*.txt")
textFile2 = sc.wholeTextFiles("/tmp/lev/")

In [14]:
# Retrieving RDD Information
# Basic Information
# (the expected result of each call is noted on the line after it)

# Number of partitions
print(rdd.getNumPartitions())
# -> 1

# Number of elements in the RDD
print(rdd.count())
# -> 3

# Number of elements per key
print(rdd.countByKey())
# -> defaultdict(<class 'int'>, {'a': 2, 'b': 1})

# Number of occurrences of each distinct element
print(rdd.countByValue())
# -> defaultdict(<class 'int'>, {('a', 7): 1, ('a', 2): 1, ('b', 2): 1})

# (key, value) pairs as a dictionary; for the repeated key 'a' the
# recorded result keeps the value 2
print(rdd.collectAsMap())
# -> {'a': 2, 'b': 2}

# Sum of the elements
print(rdd3.sum())
# -> 4950

# Whether an RDD has no elements
print(sc.parallelize([]).isEmpty())
# -> True

1
3
defaultdict(<class 'int'>, {'a': 2, 'b': 1})
defaultdict(<class 'int'>, {('a', 7): 1, ('a', 2): 1, ('b', 2): 1})
{'a': 2, 'b': 2}
4950
True


In [15]:
# Summary statistics of rdd3 (the integers 0..99)

# Largest and smallest element
print(rdd3.max())
# -> 99
print(rdd3.min())
# -> 0

# Mean, standard deviation and variance
print(rdd3.mean())
# -> 49.5
print(rdd3.stdev())
# -> 28.866...
print(rdd3.variance())
# -> 833.25

# Histogram with 3 buckets: (bucket boundaries, counts per bucket)
print(rdd3.histogram(buckets=3))
# -> ([0, 33, 66, 99], [33, 33, 34])

# All of the above at once: count, mean, stdev, max and min
print(rdd3.stats())

99
0
49.5
28.86607004772212
833.25
([0, 33, 66, 99], [33, 33, 34])
(count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0)


In [16]:
# Applying Functions

# map: one output element per input element.
# Here each (key, value) tuple is extended with its swapped copy.
extended = rdd.map(lambda kv: kv + (kv[1], kv[0]))
print(extended.collect())
# -> [('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]

# flatMap: same function, but the resulting tuples are flattened
# into a single sequence of items.
rdd5 = rdd.flatMap(lambda kv: kv + (kv[1], kv[0]))
print(rdd5.collect())
# -> ['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']

# flatMapValues: flatten the value of each (key, value) pair of rdd4
# while leaving the keys unchanged.
flattened_values = rdd4.flatMapValues(lambda values: values)
print(flattened_values.collect())
# -> [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

[('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]
['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]


In [17]:
# Selecting Data
# -------------------------------
# Getting

# All elements of the RDD as a list
print(rdd.collect())
# -> [('a', 7), ('a', 2), ('b', 2)]

# The first 2 elements
print(rdd.take(num=2))
# -> [('a', 7), ('a', 2)]

# The first element only
print(rdd.first())
# -> ('a', 7)

# The top 2 elements (largest first)
print(rdd.top(num=2))
# -> [('b', 2), ('a', 7)]

[('a', 7), ('a', 2), ('b', 2)]
[('a', 7), ('a', 2)]
('a', 7)
[('b', 2), ('a', 7)]


In [18]:
# -------------------------------
# Sampling
# Draw a sample of rdd3 without replacement, keeping roughly 15% of the
# elements, with a fixed seed of 81.
sampled = rdd3.sample(withReplacement=False, fraction=0.15, seed=81)
print(sampled.collect())
# Recorded result for this notebook run:
#   [3, 4, 27, 28, 35, 41, 43, 49, 53, 58, 85, 93]
# NOTE(review): the selected elements may differ in another environment even
# with the same seed — treat the list above as an example only.

[3, 4, 27, 28, 35, 41, 43, 49, 53, 58, 85, 93]


In [19]:
# -------------------------------
# Filtering

# Keep only the tuples that contain "a"
print(rdd.filter(lambda pair: "a" in pair).collect())
# -> [('a', 7), ('a', 2)]

# Distinct items of rdd5
# (recorded order shown; NOTE(review): the order of distinct() results
#  is presumably not guaranteed)
print(rdd5.distinct().collect())
# -> ['a', 7, 2, 'b']

# Keys of the (key, value) RDD
print(rdd.keys().collect())
# -> ['a', 'a', 'b']

[('a', 7), ('a', 2)]
['a', 7, 2, 'b']
['a', 'a', 'b']


In [20]:
# Iterating
# Apply a function to all RDD elements
def g(x):
    """Print a single RDD element; used as the callback passed to foreach()."""
    print(x)


# foreach() is called for its side effect: its return value is None, which is
# what the outer print shows. The elements themselves are printed by g:
#   ('a', 7)
#   ('a', 2)
#   ('b', 2)
foreach_result = rdd.foreach(g)
print(foreach_result)
# -> None

# take(3) returns the first three elements as a list:
#   [('a', 7), ('a', 2), ('b', 2)]
rdd.take(3)

None


('a', 7)
('a', 2)
('b', 2)


[('a', 7), ('a', 2), ('b', 2)]

In [21]:
# Reshaping Data

# -------------------------------
# Reducing
# Combine the values of each key with addition
# (operator.add is the function form of `x + y`).
print(rdd.reduceByKey(add).collect())
# -> [('a', 9), ('b', 2)]

[('a', 9), ('b', 2)]


In [22]:
# Combine all elements of rdd with `+`; the elements are tuples,
# so the result is their concatenation.
print(rdd.reduce(add))
# -> ('a', 7, 'a', 2, 'b', 2)

('a', 7, 'a', 2, 'b', 2)


In [23]:
# -------------------------------
# Grouping by

# Group the elements of rdd3 by the result of a function (here: n % 2),
# turning each group into a list so it can be printed.
# -> [(0, [0, 2, 4, ...]), (1, [1, 3, 5, ...])]
parity_groups = rdd3.groupBy(lambda n: n % 2).mapValues(list)
print(parity_groups.collect())

# Group the values of rdd by key.
# -> [('a', [7, 2]), ('b', [2])]
key_groups = rdd.groupByKey().mapValues(list)
print(key_groups.collect())

[(0, [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98]), (1, [1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53, 55, 57, 59, 61, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 87, 89, 91, 93, 95, 97, 99])]
[('a', [7, 2]), ('b', [2])]


In [24]:
# -------------------------------
# Aggregating
# Helpers that maintain a (sum, count) pair.

def seqOp(x, y):
    """Fold one element `y` into the running pair `x`: add it to x[0], bump x[1] by one."""
    return (x[0] + y, x[1] + 1)


def combOp(x, y):
    """Merge two running pairs by adding them position by position."""
    return (x[0] + y[0], x[1] + y[1])

In [25]:
# aggregate: seqOp folds the elements of each partition starting from
# zeroValue, then combOp merges the per-partition results.
# -> (4950, 100)
rdd3.aggregate(zeroValue=(0, 0), seqOp=seqOp, combOp=combOp)

(4950, 100)

In [26]:
# aggregateByKey: the same (sum, count) aggregation, applied to the values
# of each key separately.
# -> [('a', (9, 2)), ('b', (2, 1))]
per_key_totals = rdd.aggregateByKey(zeroValue=(0, 0), seqFunc=seqOp, combFunc=combOp)
per_key_totals.collect()

[('a', (9, 2)), ('b', (2, 1))]

In [27]:
# fold: aggregate the elements of each partition, and then the results,
# with a single function (addition) and a zero value of 0.
# -> 4950
rdd3.fold(zeroValue=0, op=add)

4950

In [28]:
# Merge the values for each key.
# Only the final expression of a cell is displayed automatically, so this
# intermediate result has to be printed explicitly to be visible.
print( rdd.foldByKey(0, add).collect() )
    # [('a',9),('b',2)]

# Create (key, element) tuples, where the key is the function's result
# for that element (here: x+x).
rdd3.keyBy(lambda x: x+x).collect()
    # [(0, 0), (2, 1), (4, 2), ..., (198, 99)]

[(0, 0),
 (2, 1),
 (4, 2),
 (6, 3),
 (8, 4),
 (10, 5),
 (12, 6),
 (14, 7),
 (16, 8),
 (18, 9),
 (20, 10),
 (22, 11),
 (24, 12),
 (26, 13),
 (28, 14),
 (30, 15),
 (32, 16),
 (34, 17),
 (36, 18),
 (38, 19),
 (40, 20),
 (42, 21),
 (44, 22),
 (46, 23),
 (48, 24),
 (50, 25),
 (52, 26),
 (54, 27),
 (56, 28),
 (58, 29),
 (60, 30),
 (62, 31),
 (64, 32),
 (66, 33),
 (68, 34),
 (70, 35),
 (72, 36),
 (74, 37),
 (76, 38),
 (78, 39),
 (80, 40),
 (82, 41),
 (84, 42),
 (86, 43),
 (88, 44),
 (90, 45),
 (92, 46),
 (94, 47),
 (96, 48),
 (98, 49),
 (100, 50),
 (102, 51),
 (104, 52),
 (106, 53),
 (108, 54),
 (110, 55),
 (112, 56),
 (114, 57),
 (116, 58),
 (118, 59),
 (120, 60),
 (122, 61),
 (124, 62),
 (126, 63),
 (128, 64),
 (130, 65),
 (132, 66),
 (134, 67),
 (136, 68),
 (138, 69),
 (140, 70),
 (142, 71),
 (144, 72),
 (146, 73),
 (148, 74),
 (150, 75),
 (152, 76),
 (154, 77),
 (156, 78),
 (158, 79),
 (160, 80),
 (162, 81),
 (164, 82),
 (166, 83),
 (168, 84),
 (170, 85),
 (172, 86),
 (174, 87),
 (176, 88

In [29]:
# Mathematical Operations

# subtract: elements of rdd that do not appear in rdd2
# (recorded order shown; NOTE(review): the order of the result is
#  presumably not guaranteed)
only_in_rdd = rdd.subtract(rdd2)
print(only_in_rdd.collect())
# -> [('a', 7), ('b', 2)]

# subtractByKey: (key, value) pairs of rdd2 whose key does not occur in rdd
unmatched_keys = rdd2.subtractByKey(rdd)
print(unmatched_keys.collect())
# -> [('d', 1)]

# cartesian: every element of rdd paired with every element of rdd2
all_pairs = rdd.cartesian(rdd2)
print(all_pairs.collect())
# -> [(('a', 7), ('a', 2)),
#     (('a', 7), ('d', 1)),
#     (('a', 7), ('b', 1)),
#     (('a', 2), ('a', 2)),
#     (('a', 2), ('d', 1)),
#     (('a', 2), ('b', 1)),
#     (('b', 2), ('a', 2)),
#     (('b', 2), ('d', 1)),
#     (('b', 2), ('b', 1))]

[('a', 7), ('b', 2)]
[('d', 1)]
[(('a', 7), ('a', 2)), (('a', 7), ('d', 1)), (('a', 7), ('b', 1)), (('a', 2), ('a', 2)), (('a', 2), ('d', 1)), (('a', 2), ('b', 1)), (('b', 2), ('a', 2)), (('b', 2), ('d', 1)), (('b', 2), ('b', 1))]


In [30]:
# Sort

# sortBy: order the elements by the result of a function (here: the value)
by_value = rdd2.sortBy(lambda pair: pair[1])
print(by_value.collect())
# -> [('d', 1), ('b', 1), ('a', 2)]

# sortByKey: order a (key, value) RDD by its keys
by_key = rdd2.sortByKey()
print(by_key.collect())
# -> [('a', 2), ('b', 1), ('d', 1)]

[('d', 1), ('b', 1), ('a', 2)]
[('a', 2), ('b', 1), ('d', 1)]


In [31]:
# Stopping SparkContext
# Final cell: shut down the context created in the configuration cell.
sc.stop()