## Installation de Spark, findspark et pyspark

In [1]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz

# unzip the spark file to the current folder
!tar xf spark-3.2.0-bin-hadoop2.7.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop2.7"


# install findspark using pip
!pip install -q findspark

# install pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 39.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=6c0a03ca50f55a6782bb35893cdd046ee60d6dba76781676fd999b4b3fc4a9b2
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


## Importation et initialisation de Spark

In [2]:
# Make the Spark installation usable from this Python kernel.
# NOTE(review): findspark presumably locates Spark through the SPARK_HOME
# variable set in the installation cell — confirm against the findspark docs.
import findspark 
findspark.init()

## Importer PySpark et initialiser SparkContext

In [3]:
from pyspark import SparkContext, SparkConf

# Application name "actions", running locally on all available cores.
conf = SparkConf().setAppName("actions").setMaster("local[*]")

# getOrCreate() reuses the running context when this cell is executed again;
# calling the SparkContext constructor a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

## Afficher la configuration du cluster Spark

In [4]:
# List every (key, value) pair of the active SparkContext's configuration.
sc.getConf().getAll()

[('spark.app.startTime', '1638202563125'),
 ('spark.driver.host', '74dabd4d07d0'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.port', '45683'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.name', 'actions'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1638202565117'),
 ('spark.ui.showConsoleProgress', 'true')]

## Créer un simple RDD

In [5]:
# Sample words (with deliberate repetitions) used by the action examples.
inputWords = [
    "esi",
    "esi-sba",
    "big-data",
    "python",
    "spark",
    "esi-sba",
    "spark",
    "esi-sba",
]

# Distribute the Python list as an RDD.
wordRdd = sc.parallelize(inputWords)

# Display the Python type of the resulting object.
type(wordRdd)

pyspark.rdd.RDD

## Application de l'Action collect() sur wordRdd

In [6]:
# Reuse the wordRdd built in the "Créer un simple RDD" cell instead of
# re-creating an identical RDD here (the later action cells do the same).
# collect() brings every element of the RDD back to the driver as a list.
wordRdd.collect()

['esi',
 'esi-sba',
 'big-data',
 'python',
 'spark',
 'esi-sba',
 'spark',
 'esi-sba']

## Application de l'Action count() sur wordRdd

In [7]:
# Number of elements in wordRdd (duplicates included).
wordRdd.count()

8

## Application de l'Action countByValue() sur wordRdd

In [8]:
# Occurrences of each distinct word, returned to the driver as a dictionary.
wordRdd.countByValue()

defaultdict(int,
            {'big-data': 1, 'esi': 1, 'esi-sba': 3, 'python': 1, 'spark': 2})

## Application de l'Action take() sur wordRdd

In [9]:
# Fetch only the first three elements of wordRdd.
wordRdd.take(3)

['esi', 'esi-sba', 'big-data']

## Application de l'Action reduce()

In [10]:
# Small numeric RDD used to demonstrate reduce().
numRdd = sc.parallelize([5, 5, 4, 3, 2, 9, 2])

# Sum the elements by pairwise addition.
print(numRdd.reduce(lambda left, right: left + right))


def myfun(a, b):
    """Combine two numbers as ``a * 9 + b / 2``.

    The operation is neither commutative nor associative: for example
    ``myfun(1, 2)`` is 10.0 while ``myfun(2, 1)`` is 18.5, so the result of
    folding a sequence with it depends on the order and grouping of the calls.

    Args:
        a: Left operand, multiplied by 9.
        b: Right operand, divided by 2 (true division, so the result is a float).

    Returns:
        The sum of the two scaled operands.
    """
    scaled_left = a * 9
    halved_right = b / 2
    return scaled_left + halved_right

# Reduce with the custom combiner myfun defined above.
# NOTE(review): myfun is order- and grouping-sensitive, while RDD.reduce is
# documented for commutative, associative functions. The printed value
# therefore depends on how the elements were split across partitions
# (5020.25 corresponds to a two-partition split, not a left-to-right fold).
print(numRdd.reduce(myfun))


30
5020.25


## Application de l'Action saveAsTextFile()

In [11]:
import shutil

# saveAsTextFile() refuses to write into an existing directory, so remove the
# output of a previous run first; this keeps the cell re-runnable.
# Assumes the relative path resolves on the local filesystem (master is local[*]).
shutil.rmtree('word_txt', ignore_errors=True)

# Write the RDD as text, one part file per partition, under ./word_txt.
wordRdd.saveAsTextFile('word_txt')


In [12]:
import shutil

# saveAsTextFile() refuses to write into an existing directory, so remove the
# output of a previous run first; this keeps the cell re-runnable.
# Assumes the relative path resolves on the local filesystem (master is local[*]).
shutil.rmtree('word_txt2', ignore_errors=True)

# coalesce(1) merges the data into a single partition before saving, so the
# output directory contains a single part file.
wordRdd.coalesce(1).saveAsTextFile('word_txt2')

## Exercice

In [13]:
# range(1, 99) yields the integers 1 through 98 (the upper bound is exclusive).
# Note: this rebinds numRdd, replacing the 7-element RDD created earlier.
numRdd= sc.parallelize(list(range(1,99)))
# myfun is order- and grouping-sensitive, so this value depends on how the
# 98 elements are partitioned — NOTE(review): not a stable result across
# different core counts or numSlices settings.
print(numRdd.reduce(myfun))

2.341020866017947e+47


In [16]:
# Mark wordRdd for caching; the call returns the RDD itself (shown as output).
# NOTE(review): per the Spark docs the data is only materialised in the cache
# when the next action runs — confirm if timing matters here.
wordRdd.cache()

ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274

In [17]:
# Run an action on the RDD that was just marked for caching.
wordRdd.count()

8

In [18]:
# glom() groups the elements of each partition into a list, so the collected
# result shows how wordRdd is split across partitions.
wordRdd.glom().collect()

[['esi', 'esi-sba', 'big-data', 'python'],
 ['spark', 'esi-sba', 'spark', 'esi-sba']]