<a href="https://colab.research.google.com/github/mohammadRahimi1993/PythonEducation/blob/main/Step0_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset under MyDrive/DataSet can be read later.
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [3]:
import findspark
# Make the Spark installation pointed to by SPARK_HOME importable as `pyspark`.
findspark.init()

In [4]:
import pyspark
from pyspark.sql import SparkSession

In [5]:
# Start (or reuse) a local SparkSession with a single worker thread ("local[1]");
# RDDs created without an explicit partition count therefore get one partition.
spark=SparkSession.builder.master("local[1]").appName('Step0-Hands-On').getOrCreate()

In [6]:
import random

# Number of Monte Carlo trials. With only 100 points the estimate is very noisy
# (the two runs below print 3.24 and 2.68); increase it for a tighter estimate.
NUM_SAMPLES=100

def inside(p):
    """One Monte Carlo trial for estimating pi.

    Draws a random point in the unit square and reports whether it falls
    strictly inside the unit circle (i.e. in the quarter disc).  The argument
    ``p`` is the RDD element being filtered and is deliberately ignored: every
    call is an independent trial.
    """
    px = random.random()
    py = random.random()
    return px * px + py * py < 1

# Keep the indices whose random point landed inside the quarter circle; the fraction kept
# approximates pi/4, hence the factor 4 below. collect() brings the surviving indices back
# to the driver so they can be printed (data_rdd.count() would suffice for the estimate alone).
data_rdd = spark.sparkContext.parallelize(range(0,NUM_SAMPLES)).filter(inside)
data = data_rdd.collect()
print(data)
print("-"*50)
print('Pi is roughly {}'.format(4.0 * len(data) / NUM_SAMPLES))

[1, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 20, 21, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 37, 38, 39, 40, 41, 42, 43, 44, 46, 47, 48, 49, 50, 51, 52, 55, 56, 57, 58, 60, 61, 62, 63, 64, 66, 68, 69, 70, 71, 72, 73, 74, 75, 76, 79, 81, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 97, 99]
--------------------------------------------------
Pi is roughly 3.24


In [7]:
# One partition: parallelize() got no numSlices and the session runs with master "local[1]".
data_rdd.getNumPartitions()

1

### Number of Partitions

In [8]:
# Same estimate, but the input is explicitly split into 12 partitions (numSlices=12).
# `data_rdd` is rebound here; the estimate differs from the previous cell because
# `inside` draws fresh random points on every evaluation.
data_rdd = spark.sparkContext.parallelize(range(0,NUM_SAMPLES),12).filter(inside)
print(data_rdd)
print("-"*50)
data = data_rdd.collect()
print(data)
print("-"*50)
print('Pi is roughly {}'.format(4.0 * len(data) / NUM_SAMPLES))
print("-"*50)
print(f"# Partitions : {data_rdd.getNumPartitions()}")

PythonRDD[3] at RDD at PythonRDD.scala:53
--------------------------------------------------
[0, 3, 4, 5, 6, 7, 9, 11, 13, 14, 16, 18, 20, 21, 22, 24, 25, 27, 28, 30, 31, 32, 33, 34, 35, 36, 39, 40, 41, 42, 43, 44, 45, 47, 48, 51, 53, 54, 55, 56, 57, 58, 59, 60, 61, 63, 64, 65, 66, 69, 73, 77, 78, 79, 81, 82, 84, 85, 87, 88, 90, 91, 93, 94, 95, 96, 97]
--------------------------------------------------
Pi is roughly 2.68
--------------------------------------------------
# Partitions : 12


### Some Basic RDD Operations

In [9]:
# Version of the running Spark session (matches the downloaded 3.0.0 distribution).
spark.version

'3.0.0'

In [10]:
# Sample data for the following cells: the integers 0..99 split into 4 partitions.
numbers = range(0,100)

numbers_rdd= spark.sparkContext.parallelize(numbers,4)

In [11]:
# Docstring of SparkContext.parallelize; numSlices sets the number of partitions.
help(spark.sparkContext.parallelize)

Help on method parallelize in module pyspark.context:

parallelize(c, numSlices=None) method of pyspark.context.SparkContext instance
    Distribute a local Python collection to form an RDD. Using xrange
    is recommended if the input represents a range for performance.
    
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]



In [12]:
# glom() turns each partition into a list, which makes the partition layout visible.
help(numbers_rdd.glom)

Help on method glom in module pyspark.rdd:

glom() method of pyspark.rdd.PipelinedRDD instance
    Return an RDD created by coalescing all elements within each partition
    into a list.
    
    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> sorted(rdd.glom().collect())
    [[1, 2], [3, 4]]



In [13]:
# Keep the multiples of 6 (divisible by both 2 and 3) and show how they are spread over the 4 partitions.
numbers_rdd2 = numbers_rdd.filter(lambda x: x % 6 == 0).glom()
numbers_rdd2.collect()

[[0, 6, 12, 18, 24], [30, 36, 42, 48], [54, 60, 66, 72], [78, 84, 90, 96]]

In [14]:
# Same multiples-of-6 filter, on the data re-split into 10 partitions.
numbers_rdd3 = spark.sparkContext.parallelize(numbers, 10)
numbers_rdd4 = numbers_rdd3.filter(lambda x: x % 6 == 0).glom()
numbers_rdd4.collect()

[[0, 6],
 [12, 18],
 [24],
 [30, 36],
 [42, 48],
 [54],
 [60, 66],
 [72, 78],
 [84],
 [90, 96]]

#### coalesce


In [20]:
# coalesce(2) merges the 10 filtered partitions into 2 (no shuffle, since shuffle=False by default).
numbers_rdd5 = (
    numbers_rdd3
    .filter(lambda x: x % 6 == 0)
    .coalesce(2)
    .glom()
)
numbers_rdd5.collect()

[[0, 6, 12, 18, 24, 30, 36, 42, 48], [54, 60, 66, 72, 78, 84, 90, 96]]

In [None]:
# coalesce reduces the number of partitions; by default it does not shuffle (shuffle=False).
help(numbers_rdd.coalesce)

Help on method coalesce in module pyspark.rdd:

coalesce(numPartitions, shuffle=False) method of pyspark.rdd.PipelinedRDD instance
    Return a new RDD that is reduced into `numPartitions` partitions.
    
    >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
    [[1], [2, 3], [4, 5]]
    >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
    [[1, 2, 3, 4, 5]]



#### setName

In [21]:
# Attach a human-readable name to the RDD; setName returns the RDD, whose repr now shows the name.
numbers_rdd5.setName("Numbers_100_%6==0")

Numbers_100_%6==0 PythonRDD[13] at collect at <ipython-input-20-e48a575489c8>:2

In [22]:
# Numeric id assigned to this RDD (the same number appears as PythonRDD[13] in its repr).
numbers_rdd5.id()

13

### Get RDD Objects

In [23]:
# First element of the RDD.
numbers_rdd.first()

0

In [28]:
# numbers_rdd5 holds one list per partition (from glom), so top(1) returns the largest *list*;
# Python compares lists element by element, so the list starting with 54 beats the one starting with 0.
numbers_rdd5.top(1)

[[54, 60, 66, 72, 78, 84, 90, 96]]

In [29]:
# First 10 elements in partition order (no sorting).
numbers_rdd.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [30]:
# The 10 largest elements, in descending order.
numbers_rdd.top(10)

[99, 98, 97, 96, 95, 94, 93, 92, 91, 90]

In [31]:
# The 10 smallest elements, in ascending order.
numbers_rdd.takeOrdered(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [32]:
# takeOrdered accepts an optional key function; the result is loaded into driver memory, so keep N small.
help(numbers_rdd.takeOrdered)

Help on method takeOrdered in module pyspark.rdd:

takeOrdered(num, key=None) method of pyspark.rdd.PipelinedRDD instance
    Get the N elements from an RDD ordered in ascending order or as
    specified by the optional key function.
    
    .. note:: this method should only be used if the resulting array is expected
        to be small, as all the data is loaded into the driver's memory.
    
    >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
    [1, 2, 3, 4, 5, 6]
    >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
    [10, 9, 7, 6, 5, 4]



In [None]:
# Negating the key gives descending order -- the same result as top(10) above.
numbers_rdd.takeOrdered(10,key=lambda x : -x)

[99, 98, 97, 96, 95, 94, 93, 92, 91, 90]

In [None]:
# numbers_rdd4 holds one list per partition; order those lists by length (shortest first).
numbers_rdd4.takeOrdered(10,key=lambda x : len(x))

[[24],
 [54],
 [84],
 [0, 6],
 [12, 18],
 [30, 36],
 [42, 48],
 [60, 66],
 [72, 78],
 [90, 96]]

In [None]:
# Order the partition lists by their last element, largest first.
# NOTE(review): x[-1] would raise IndexError on an empty partition; none of numbers_rdd4's partitions are empty here.
numbers_rdd4.takeOrdered(10,key=lambda x : -x[-1])

[[90, 96],
 [84],
 [72, 78],
 [60, 66],
 [54],
 [42, 48],
 [30, 36],
 [24],
 [12, 18],
 [0, 6]]

In [35]:
# Without replacement, `fraction` is the probability that each element is kept, so the sample
# size only averages ~10 of 100 and varies between runs. A fixed `seed` makes the sample reproducible.
numbers_rdd.sample(withReplacement=False, fraction=0.1, seed=42).collect()

[33, 53, 62, 68, 81]

### Arithmetic Functions

In [36]:
# Number of elements in the RDD.
numbers_rdd.count()

100

In [37]:
def isPrime(n):
    """Return True if the integer ``n`` is prime, False otherwise.

    Values below 2 (0, 1 and all negative numbers) are not prime.
    Uses trial division; only divisors up to sqrt(n) need checking,
    because any factor above sqrt(n) pairs with one below it.
    """
    # The original returned True for negatives (its loop range was empty).
    if n < 2:
        return False
    divisor = 2
    while divisor * divisor <= n:
        if n % divisor == 0:
            return False
        divisor += 1
    return True
# The first 15 primes in 0..99.
numbers_rdd.filter(isPrime).take(15)

[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47]

In [38]:
# Smallest prime in 0..99.
numbers_rdd.filter(isPrime).min()

2

In [39]:
# Largest prime in 0..99.
numbers_rdd.filter(isPrime).max()

97

In [40]:
# Mean of the 25 primes below 100 (their sum is 1060, and 1060 / 25 = 42.4).
numbers_rdd.filter(isPrime).mean()

42.4

In [None]:
# Ten random integers in [0, 4]. A dedicated, seeded generator makes the list reproducible
# without reseeding the global `random` module that other cells in this notebook use.
list_rng = random.Random(0)
my_list = [list_rng.randint(0, 4) for _ in range(10)]
my_list

[1, 1, 1, 2, 1, 2, 3, 3, 3, 2]

In [None]:
# Count occurrences of each distinct value; the result is a dict returned to the driver.
mylist_rdd= spark.sparkContext.parallelize(my_list,2)
mylist_rdd.countByValue()

defaultdict(int, {1: 4, 2: 3, 3: 3})

### map / flatMap     

In [41]:
# A small RDD of (number, letter) tuples in 2 partitions, used by the map/flatMap examples.
tuple_list = [(1,'t'),(2,'b'),(3,'r')]
tuple_rdd= spark.sparkContext.parallelize(tuple_list,2)

tuple_rdd.collect()


[(1, 't'), (2, 'b'), (3, 'r')]

In [42]:
# map: one output per input -- square the number, keep the letter.
tuple_rdd.map(lambda x: (x[0]**2,x[1])).collect()

[(1, 't'), (4, 'b'), (9, 'r')]

In [43]:
# map can change the element type: each tuple becomes a string (its letter repeated 3 times).
tuple_rdd.map(lambda x: x[1]*3).collect()

['ttt', 'bbb', 'rrr']

In [44]:
import random
def test(x) : 
    x0 = x[1] * random.randint(1,10)
    x1 = len(x0)
    return (x0,x1, x[0])

# Apply `test` to every tuple; the repeat counts are random, so the output changes between runs.
tuple_rdd.map(test).collect()

[('ttttttt', 7, 1), ('bbbbbbbbbb', 10, 2), ('rrr', 3, 3)]

#### flatMap

`flatMap` returns a new RDD by first applying a function to every element of this RDD and then flattening the results: each item of the returned iterable becomes a separate element.



In [46]:
# Ten random integers in [0, 20] spread over 3 partitions. A local seeded generator keeps
# the data reproducible without touching the global `random` state.
numbers_rng = random.Random(1)
numbers_random = spark.sparkContext.parallelize([numbers_rng.randint(0, 20) for _ in range(10)], 3)
numbers_random.glom().collect()

[[11, 3, 9], [14, 11, 0], [7, 10, 7, 9]]

In [47]:
# flatMap with the identity function flattens the per-partition lists made by glom() back into single elements.
numbers_random.glom().flatMap(lambda x: x).collect()

[11, 3, 9, 14, 11, 0, 7, 10, 7, 9]

In [49]:
# flatMap flattens each returned tuple, so every (n**2, letter) pair contributes two separate elements.
print(tuple_rdd.collect())
tuple_rdd.flatMap(lambda x: (x[0]**2,x[1])).collect()

[(1, 't'), (2, 'b'), (3, 'r')]


[1, 't', 4, 'b', 9, 'r']

In [50]:
# Drop the tuple whose number is 1, then flatten the remaining ones.
tuple_rdd.filter(lambda x : x[0]>1).flatMap(lambda x: (x[0]**2,x[1])).collect()

[4, 'b', 9, 'r']

#### reduce / fold / reduceByKey

In [52]:
def dif(a, b):
    """Return the absolute difference |a - b| (``abs`` is a Python builtin; no import is needed).

    Note: this operation is neither associative nor commutative-associative in the
    sense Spark's ``reduce`` requires, e.g. dif(dif(4, 7), 23) == 20 but
    dif(4, dif(7, 23)) == 12, so reducing with it is partition-dependent.
    """
    return abs(b - a)

# One element per partition. Spark requires the reduce function to be commutative and
# associative; `dif` is not, so the result depends on how the data is partitioned.
# With this layout the output 10 matches dif(dif(dif(4, 7), 23), 30).
numbers_new = [4,7,23,30]
numbers_new_rdd = spark.sparkContext.parallelize(numbers_new,4)

t = numbers_new_rdd.glom().collect()
print(t)
numbers_new_rdd.reduce(dif)


[[4], [7], [23], [30]]


10

In [53]:
def add(a, b):
    """Return ``a + b``.

    Used below as the combining function for ``reduce``, ``fold`` and
    ``reduceByKey``; addition is associative and commutative, so the result
    does not depend on partitioning.
    """
    total = a + b
    return total

# `add` is associative and commutative, so reduce yields 64 no matter how the 3 partitions are laid out.
numbers_new = [4,7,23,30]
numbers_new_rdd = spark.sparkContext.parallelize(numbers_new,3)

t = numbers_new_rdd.glom().collect()
print(t)
numbers_new_rdd.reduce(add)


[[4], [7], [23, 30]]


64

In [56]:
# fold is like reduce but takes a zero value, which must be neutral for the combining function.
help(numbers_new_rdd.fold)

Help on method fold in module pyspark.rdd:

fold(zeroValue, op) method of pyspark.rdd.RDD instance
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given associative function and a neutral "zero value."
    
    The function ``op(t1, t2)`` is allowed to modify ``t1`` and return it
    as its result value to avoid object allocation; however, it should not
    modify ``t2``.
    
    This behaves somewhat differently from fold operations implemented
    for non-distributed collections in functional languages like Scala.
    This fold operation may be applied to partitions individually, and then
    fold those results into the final result, rather than apply the fold
    to each element sequentially in some defined ordering. For functions
    that are not commutative, the result may differ from that of a fold
    applied to a non-distributed collection.
    
    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    15

In [60]:
# 0 is neutral for addition, so fold(0, add) gives the same total as reduce(add).
numbers_new_rdd.fold(0,add)

64

In [63]:
# (key, value) pairs in 2 partitions. groupByKey gathers all values of a key into an iterable,
# which prints as ResultIterable objects rather than the values themselves.
paired_list = [('a',10), ('b',5), ('a',6)]
paired_rdd = spark.sparkContext.parallelize(paired_list,2)
print(paired_rdd.glom().collect())
print(paired_rdd.groupByKey().collect())

[[('a', 10)], [('b', 5), ('a', 6)]]
[('b', <pyspark.resultiterable.ResultIterable object at 0x7fc62f71f340>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7fc62f71f1f0>)]


In [64]:
# Sum the grouped values of each key; `sum` can be passed directly instead of wrapping it in a lambda.
paired_rdd.groupByKey().mapValues(sum).collect()

[('b', 5), ('a', 16)]

In [65]:
# Lineage of the groupByKey + mapValues pipeline. toDebugString() returns bytes here (hence .decode);
# the "+-" line marks the shuffle boundary below the ShuffledRDD.
print(paired_rdd.groupByKey().mapValues(sum).toDebugString().decode('utf-8'))

(2) PythonRDD[70] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[69] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[68] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[67] at groupByKey at <ipython-input-65-aab1acb1a315>:1 []
    |  PythonRDD[66] at groupByKey at <ipython-input-65-aab1acb1a315>:1 []
    |  ParallelCollectionRDD[54] at readRDDFromFile at PythonRDD.scala:262 []


In [66]:
# Same per-key sums as groupByKey + sum, but reduceByKey can combine values within each partition before the shuffle.
paired_rdd.reduceByKey(add).collect()

[('b', 5), ('a', 16)]

In [67]:
# Lineage of the reduceByKey pipeline, for comparison with the groupByKey version.
print(paired_rdd.reduceByKey(add).toDebugString().decode('utf-8'))

(2) PythonRDD[80] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[79] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[78] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[77] at reduceByKey at <ipython-input-67-e660eb148bba>:1 []
    |  PythonRDD[76] at reduceByKey at <ipython-input-67-e660eb148bba>:1 []
    |  ParallelCollectionRDD[54] at readRDDFromFile at PythonRDD.scala:262 []


### cache

In [68]:
# cache() is lazy: it only marks the RDD. The collect() action below materializes the
# partitions into the cache; its result itself is not needed.
new_rdd = paired_rdd.groupByKey()
new_rdd.cache()
new_rdd.mapValues(sum).collect()
# The lineage now reports "CachedPartitions" for the cached RDD.
print(new_rdd.mapValues(sum).toDebugString().decode('utf-8'))

(2) PythonRDD[87] at RDD at PythonRDD.scala:53 []
 |  PythonRDD[85] at RDD at PythonRDD.scala:53 []
 |      CachedPartitions: 2; MemorySize: 287.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  MapPartitionsRDD[84] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[83] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[82] at groupByKey at <ipython-input-68-dcdfeba87f2e>:1 []
    |  PythonRDD[81] at groupByKey at <ipython-input-68-dcdfeba87f2e>:1 []
    |  ParallelCollectionRDD[54] at readRDDFromFile at PythonRDD.scala:262 []


In [69]:
# Storage level chosen by cache(); PySpark stores the data serialized in memory (see output).
print(new_rdd.getStorageLevel())

Memory Serialized 1x Replicated


In [70]:
from pyspark import StorageLevel
# Spark refuses to change the storage level of an RDD that already has one,
# so unpersist first, then persist again using disk-only storage.
new_rdd.unpersist()
new_rdd.persist(StorageLevel.DISK_ONLY)
print(new_rdd.getStorageLevel())

Disk Serialized 1x Replicated


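Storage levels are built as `StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)`. In Python, stored objects are always serialized with pickle. As a result, each `_SER` level listed below behaves the same as its non-`_SER` counterpart; the distinction only matters in the Scala/Java API.

DISK_ONLY = StorageLevel(True, False, False, False, 1)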

DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)

MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)

MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)

MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)

MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)

MEMORY_ONLY = StorageLevel(False, True, False, False, 1)

MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)

MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)

MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)

OFF_HEAP = StorageLevel(True, True, True, False, 1)

### Working With Dataframes

In [72]:
# Read the CSV with no options. The header line becomes an ordinary data row, and columns are
# named _c0, _c1, ... Every column is read as a string (see printSchema below).
df = spark.read.csv('/content/drive/MyDrive/DataSet/deniro.csv')
df.show(10)

+----+-----+--------------------+
| _c0|  _c1|                 _c2|
+----+-----+--------------------+
|Year|Score|               Title|
|1968|   86|           Greetings|
|1970|   17|         Bloody Mama|
|1970|   73|             Hi Mom!|
|1971|   40|         Born to Win|
|1973|   98|        Mean Streets|
|1973|   88|Bang the Drum Slowly|
|1974|   97|       The Godfather|
|1976|   41|     The Last Tycoon|
|1976|   99|         Taxi Driver|
+----+-----+--------------------+
only showing top 10 rows



In [None]:
# All columns are strings because no schema was inferred.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



In [73]:
# Re-read the CSV: take column names from the first line and let Spark infer the column types.
df = spark.read.csv(
    '/content/drive/MyDrive/DataSet/deniro.csv',
    header=True,
    inferSchema=True,
)
df.show(10)

+----+-----+--------------------+
|Year|Score|               Title|
+----+-----+--------------------+
|1968|   86|           Greetings|
|1970|   17|         Bloody Mama|
|1970|   73|             Hi Mom!|
|1971|   40|         Born to Win|
|1973|   98|        Mean Streets|
|1973|   88|Bang the Drum Slowly|
|1974|   97|       The Godfather|
|1976|   41|     The Last Tycoon|
|1976|   99|         Taxi Driver|
|1977|   47|                1900|
+----+-----+--------------------+
only showing top 10 rows



In [74]:
# With inferSchema=True, Year and Score are now read as integers.
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Title: string (nullable = true)



In [76]:
# Column comparisons build boolean filter expressions: low-scoring films first, then films released after 1990.
df.filter(df['Score']<50).show(10)
df.filter(df['Year'] > 1990).show(10)

+----+-----+--------------------+
|Year|Score|               Title|
+----+-----+--------------------+
|1970|   17|         Bloody Mama|
|1971|   40|         Born to Win|
|1976|   41|     The Last Tycoon|
|1977|   47|                1900|
|1989|   47|     We're No Angels|
|1990|   29|      Stanley & Iris|
|1994|   39|Mary Shelley's Fr...|
|1996|   38|             The Fan|
|1998|   38|  Great Expectations|
|1999|   43|            Flawless|
+----+-----+--------------------+
only showing top 10 rows

+----+-----+--------------------+
|Year|Score|               Title|
+----+-----+--------------------+
|1991|   76|           Cape Fear|
|1991|   69|            Mistress|
|1991|   65| Guilty by Suspicion|
|1991|   71|           Backdraft|
|1992|   87|        Thunderheart|
|1992|   67|  Night and the City|
|1993|   75|     This Boy's Life|
|1993|   78|   Mad Dog and Glory|
|1993|   96|        A Bronx Tale|
|1994|   39|Mary Shelley's Fr...|
+----+-----+--------------------+
only showing top 10 rows

In [77]:
import pyspark.sql.functions as f
# Average score of the films released before 1980.
df.filter(df['Year']<1980).select(f.avg('Score')).show()


+----------+
|avg(Score)|
+----------+
|      70.5|
+----------+



In [None]:
# Shut down the SparkSession; create a new one before running any further Spark code.
spark.stop()
