# Big Data: Como instalar o PySpark no Google Colab

Como instalar o PySpark no Google Colab é uma dúvida comum entre aqueles que estão migrando seus projetos de Data Science para ambientes na nuvem.

O termo Big Data está cada vez mais presente, e mesmo projetos pessoais podem assumir uma grande dimensionalidade devido à quantidade de dados disponíveis.

Para analisar grandes volumes de dados, Big Data, com velocidade, o Apache Spark é uma ferramenta muito utilizada, dada a sua capacidade de processamento de dados e computação paralela.

O Spark foi pensado para ser acessível, oferecendo diversas APIs e frameworks em Python, Scala, SQL e diversas outras linguagens.

## PySpark no Google Colab

PySpark é a interface alto nível que permite você conseguir acessar e usar o Spark por meio da linguagem Python. Usando o PySpark, você consegue escrever todo o seu código usando apenas o nosso estilo Python de escrever código.

Já o Google Colab é uma ferramenta incrível, poderosa e gratuita – com suporte de GPU inclusive. Uma vez que roda 100% na nuvem, você não tem a necessidade de instalar qualquer coisa na sua própria máquina.

No entanto, apesar da maioria das bibliotecas de Data Science estarem previamente instaladas no Colab, o mesmo não acontece com o PySpark. Para conseguir usar o PySpark é necessário alguns passos intermediários, que não são triviais para aqueles que estão começando.

Dessa maneira, preparei um tutorial simples e direto ensinando a instalar as dependências e a biblioteca.

## Instalando o PySpark no Google Colab

Instalar o PySpark não é um processo direto como de praxe em Python. Não basta usar um pip install apenas. Na verdade, antes de tudo é necessário instalar dependências como o Java 8, Apache Spark 2.3.2 junto com o Hadoop 2.7.

In [None]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

A próxima etapa é configurar as variáveis de ambiente, pois isso habilita o ambiente do Colab a identificar corretamente onde as dependências estão rodando.

Para conseguir “manipular” o terminal e interagir como ele, você pode usar a biblioteca os.

In [None]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

Com tudo pronto, vamos rodar uma sessão local para testar se a instalação funcionou corretamente.

In [None]:
#Upload de arquivos para o Colab
from google.colab import files
arquivo = files.upload()

Saving salary.csv to salary (1).csv


In [None]:
import pandas as pd
data = pd.read_csv('salary.csv')
data.head()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K


In [None]:
# iniciar uma sessão local e importar dados do Airbnb
from pyspark.sql import SparkSession
sc = SparkSession.builder.master('local[*]').getOrCreate()

In [None]:
# File location and type
file_location = "./salary.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = sc.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- salary: string (nullable = true)



In [None]:
display(df)

DataFrame[age: int, workclass: string, fnlwgt: int, education: string, education-num: int, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, salary: string]

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler

## Define Target
label_indexer = StringIndexer(inputCol="workclass", outputCol="workclass_i")
df_t = label_indexer.fit(df).transform(df)

label_indexer = StringIndexer(inputCol="education", outputCol="education_i")
df_t = label_indexer.fit(df_t).transform(df_t)

label_indexer = StringIndexer(inputCol="marital-status", outputCol="marital-status_i")
df_t = label_indexer.fit(df_t).transform(df_t)

label_indexer = StringIndexer(inputCol="occupation", outputCol="occupation_i")
df_t = label_indexer.fit(df_t).transform(df_t)

label_indexer = StringIndexer(inputCol="relationship", outputCol="relationship_i")
df_t = label_indexer.fit(df_t).transform(df_t)

label_indexer = StringIndexer(inputCol="race", outputCol="race_i")
df_t = label_indexer.fit(df_t).transform(df_t)

label_indexer = StringIndexer(inputCol="sex", outputCol="sex_i")
df_t = label_indexer.fit(df_t).transform(df_t)

label_indexer = StringIndexer(inputCol="native-country", outputCol="native-country_i")
df_t = label_indexer.fit(df_t).transform(df_t)

label_indexer = StringIndexer(inputCol="salary", outputCol="label")
df_t = label_indexer.fit(df_t).transform(df_t)

display(df_t)

DataFrame[age: int, workclass: string, fnlwgt: int, education: string, education-num: int, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, salary: string, workclass_i: double, education_i: double, marital-status_i: double, occupation_i: double, relationship_i: double, race_i: double, sex_i: double, native-country_i: double, label: double]

In [None]:
display(df_t.schema)

StructType(List(StructField(age,IntegerType,true),StructField(workclass,StringType,true),StructField(fnlwgt,IntegerType,true),StructField(education,StringType,true),StructField(education-num,IntegerType,true),StructField(marital-status,StringType,true),StructField(occupation,StringType,true),StructField(relationship,StringType,true),StructField(race,StringType,true),StructField(sex,StringType,true),StructField(capital-gain,IntegerType,true),StructField(capital-loss,IntegerType,true),StructField(hours-per-week,IntegerType,true),StructField(native-country,StringType,true),StructField(salary,StringType,true),StructField(workclass_i,DoubleType,false),StructField(education_i,DoubleType,false),StructField(marital-status_i,DoubleType,false),StructField(occupation_i,DoubleType,false),StructField(relationship_i,DoubleType,false),StructField(race_i,DoubleType,false),StructField(sex_i,DoubleType,false),StructField(native-country_i,DoubleType,false),StructField(salary_i,DoubleType,false)))

In [None]:
## Define Features
dfInputCols = ['age', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week', 'marital-status_i', 'occupation_i', 'relationship_i', 'race_i', 'sex_i', 'native-country_i']
assembler = VectorAssembler(inputCols=dfInputCols, outputCol="features")
outputData = assembler.transform(df_t).select(['features','label'])

outputData.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [None]:
## Split Dataset
(trainingData, testData) = outputData.randomSplit([0.7, 0.3], seed = 100)

print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 22838
Test Dataset Count: 9723


In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

lrModel = lr.fit(trainingData)

print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

Coefficients: 
1 X 11 CSRMatrix

Intercept: [-1.1491107500349205]


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = lrModel.transform(testData)

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.5

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2)

cvModel = crossval.fit(outputData)

In [None]:
predictions = cvModel.transform(testData)
evaluator.evaluate(predictions)

0.8823722459614406