# Instalação do PySpark no Google Colab

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=fd3b81376b78f1a39d314836fe35e74ba7859eaaa7f3967a3d024bf8ee44bb07
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [4]:
# Attach to the already-running SparkContext, or start one with the default configuration.
sc = SparkContext.getOrCreate(conf=None)

In [5]:
# Build (or reuse) the SparkSession entry point.
# NOTE(review): a SparkContext already exists from the previous cell and getOrCreate()
# reuses it, so the appName below may not rename that running application -- confirm
# if the name matters.
spark = (
    SparkSession.builder
    .appName('PySpark DataFrame')
    .getOrCreate()
)

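# Transformations

Transformations are lazy: each one returns a new RDD, and nothing is computed until an action (such as `collect()`) is called.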

## map()

In [6]:
# map(): apply a function to every element, producing an RDD of the same size.
data = list(range(1, 6))
myRDD = sc.parallelize(data)
newRDD = myRDD.map(lambda value: value * 2)
print(newRDD.collect())

[2, 4, 6, 8, 10]


## filter()

In [7]:
# filter(): keep only the elements for which the predicate is True (here, the even numbers).
data = list(range(1, 11))
myRDD = sc.parallelize(data)
newRDD = myRDD.filter(lambda n: n % 2 == 0)
print(newRDD.collect())

[2, 4, 6, 8, 10]


## distinct()

In [8]:
# distinct(): drop duplicate elements. The order of the result is not guaranteed
# (the saved output below shows [2, 1, 3], not [1, 2, 3]).
data = [1] * 3 + [2] * 3 + [3] * 4
myRDD = sc.parallelize(data)
newRDD = myRDD.distinct()
print(newRDD.collect())

[2, 1, 3]


## groupByKey()

In [9]:
# groupByKey(): gather all values that share a key. mapValues(list) turns each group
# into a plain list so the collected result is readable. For per-key aggregations such
# as sums, the PySpark docs recommend reduceByKey()/aggregateByKey() for performance.
myRDD = sc.parallelize([('a', 1), ('a', 2), ('a', 3), ('b', 1)])
resultList = (
    myRDD
    .groupByKey()
    .mapValues(list)
)
resultList.collect()

[('b', [1]), ('a', [1, 2, 3])]

## reduceByKey()

In [12]:
from operator import add

# reduceByKey(): merge all values of each key with a binary function (here, addition).
myRDD = sc.parallelize([('a', 1), ('a', 2), ('a', 3), ('b', 1)])
newRDD = myRDD.reduceByKey(add)
newRDD.collect()

[('b', 1), ('a', 6)]

## sortByKey()

In [13]:
myRDD = sc.parallelize([('c', 1), ('d', 2), ('a', 3), ('b', 4)])
# sortByKey(): order the pairs by key; ascending=True is the default, spelled out here.
newRDD = myRDD.sortByKey(ascending=True)
newRDD.collect()

[('a', 3), ('b', 4), ('c', 1), ('d', 2)]

## union()

In [14]:
myRDD1 = sc.parallelize([1, 2, 3, 4])
myRDD2 = sc.parallelize([3, 4, 5, 6, 7])
# union(): concatenate the two RDDs. Unlike a set union, duplicates are kept
# (3 and 4 appear twice in the result).
newRDD = myRDD1.union(myRDD2)
newRDD.collect()

[1, 2, 3, 4, 3, 4, 5, 6, 7]

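# Actions

Actions trigger the computation of an RDD and either return a result to the driver or, like `foreach()`, run only for their side effects.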

## count()

In [16]:
data = [1, 1, 1, 4, 5, 6, 7, 8, 9, 10]
myRDD = sc.parallelize(data)
# count() returns the number of elements, duplicates included: 10 here.
myRDD.count()

10

## reduce()

In [17]:
data = [1, 2, 3, 4, 5]
myRDD = sc.parallelize(data)
# reduce(): fold all elements with a binary function. Multiplying them gives
# 1 * 2 * 3 * 4 * 5 = 120. Per the PySpark docs, the function should be commutative
# and associative, because partitions are reduced locally before being combined.
myRDD.reduce(lambda acc, item: acc * item)

120

## foreach()

In [18]:
def fun(x):
    """Print a single RDD element; foreach() calls this once per element."""
    print(x)

data = ['Scala', 'Python', 'Java', 'R']
myRDD = sc.parallelize(data)
# foreach() applies `fun` to every element for its side effect only and returns None.
# The function runs in the Spark worker processes, not in this notebook kernel, so the
# print() output is presumably written to the workers' stdout/logs. That is why nothing
# appears under this cell.
myRDD.foreach(fun)

# To observe a foreach() side effect from the driver, update an accumulator instead.
num_seen = sc.accumulator(0)
myRDD.foreach(lambda _: num_seen.add(1))
num_seen.value  # 4: one increment per element

## countByValue()

In [19]:
data = ['Python', 'Scala', 'Python', 'R', 'Python', 'Java', 'R']
myRDD = sc.parallelize(data)
# countByValue() returns a dict mapping each distinct element to its number of
# occurrences. .items() exposes it as a dict_items view of (element, count) pairs.
myRDD.countByValue().items()

dict_items([('Python', 3), ('Scala', 1), ('R', 2), ('Java', 1)])

## countByKey()

In [20]:
data = [('a', 1), ('b', 1), ('c', 1), ('a', 1)]
myRDD = sc.parallelize(data)
# countByKey(): count how many pairs share each key; the values themselves are ignored.
myRDD.countByKey().items()

dict_items([('a', 2), ('b', 1), ('c', 1)])

## take(n)

In [22]:
data = [2, 5, 3, 8, 4]
myRDD = sc.parallelize(data)
# take(n): return the first n elements in the RDD's existing order, without sorting.
myRDD.take(num=3)

[2, 5, 3]

## top(n)

In [24]:
data = [2, 5, 3, 8, 4]
myRDD = sc.parallelize(data)
# top(n) returns the n LARGEST elements in descending order (contrast with take(n),
# which returns the first n elements): here [8, 5, 4].
myRDD.top(3)

[8, 5, 4]