### Instalação do PySpark

In [1]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


### Definindo IP local

In [2]:
!export SPARK_LOCAL_IP="127.0.0.1"

### Criação da sessão Spark

In [16]:
from pyspark.sql import SparkSession, DataFrame, types
import pyspark.sql.functions as F

app_name = "Questão 1"
master_url = "local"

spark: SparkSession = SparkSession.builder.appName(app_name).master(master_url).getOrCreate()

### Criação do RDD spark do arquivo CSV

In [17]:
imdb_schema = types.StructType(
    [
        types.StructField("id", types.IntegerType(), False),
        types.StructField("text_en", types.StringType(), False),
        types.StructField("text_pt", types.StringType(), False),
        types.StructField("sentiment", types.StringType(), False),
    ]
)

imdb_df = spark.read.csv(
    "imdb-reviews-pt-br.csv",
    header=True,
    quote='"',
    escape='"',
    encoding="UTF-8",
    schema=imdb_schema,
)

imdb_df.show(10)

+---+--------------------+--------------------+---------+
| id|             text_en|             text_pt|sentiment|
+---+--------------------+--------------------+---------+
|  1|Once again Mr. Co...|Mais uma vez, o S...|      neg|
|  2|This is an exampl...|Este é um exemplo...|      neg|
|  3|First of all I ha...|Primeiro de tudo ...|      neg|
|  4|Not even the Beat...|Nem mesmo os Beat...|      neg|
|  5|Brass pictures mo...|Filmes de fotos d...|      neg|
|  6|A funny thing hap...|Uma coisa engraça...|      neg|
|  7|This German horro...|Este filme de ter...|      neg|
|  8|Being a long-time...|Sendo um fã de lo...|      neg|
|  9|"Tokyo Eyes" tell...|"Tokyo Eyes" fala...|      neg|
| 10|Wealthy horse ran...|Fazendeiros ricos...|      neg|
+---+--------------------+--------------------+---------+
only showing top 10 rows



## QUESTÃO 1

### Mapper

In [7]:
def map_ids(x: DataFrame) -> DataFrame:
    return imdb_df.select(
        x.filter(x["sentiment"] == "neg")["id"],
        x.filter(x["sentiment"] == "neg")["sentiment"],
    )

### Reducer

In [8]:
def reduce_ids(x: DataFrame) -> DataFrame:
    return x.agg(F.sum(x["id"]))

### Resultado

In [10]:
resultado = reduce_ids(map_ids(imdb_df)).collect()[0]

print("=" * 40)
print(f" A soma dos IDs negativos é: {resultado[0]}")
print("=" * 40)

[Stage 4:>                                                          (0 + 1) / 1]

 A soma dos IDs negativos é: 1223168596


                                                                                

## QUESTÃO 2

### Mapper

In [11]:
def map_text(df: DataFrame) -> DataFrame:
    avaliações = df.select(
        df.filter(df["sentiment"] == "neg")["text_en"],
        df.filter(df["sentiment"] == "neg")["text_pt"],
    )

    num_palavras = avaliações.rdd.map(
        lambda x: (len(x["text_en"].split()), len(x["text_pt"].split()))
    )
    num_palavras_df = num_palavras.toDF(avaliações.schema)

    avaliações_numeradas = avaliações.join(
        num_palavras_df.withColumnsRenamed(
            {"text_en": "length_en", "text_pt": "length_pt"}
        )
    )

    return avaliações_numeradas

### Reducer

In [12]:
def reduce_text(df: DataFrame) -> DataFrame:
    return df.agg(F.sum(df["length_en"]), F.sum(df["length_pt"]))

### Resultado

In [18]:
resultado = reduce_text(map_text(imdb_df)).collect()[0]

print("=" * 60)
print(f" A soma das palavras negativas em inglês é: {resultado[0]:n}")
print(f" A soma das palavras negativas em português é: {resultado[1]:n}")
print("=" * 60)
print(
    f" As avaliações negativas possuem {resultado[1] - resultado[0]:n} palavras a mais \n em português do que em inglês."
)
print("=" * 60)


[Stage 1:>                                                          (0 + 1) / 1]

 A soma das palavras negativas em inglês é: 5.34903e+11
 A soma das palavras negativas em português é: 5.39967e+11
 As avaliações negativas possuem 5.0649e+09 palavras a mais 
em português do que em inglês.


                                                                                

## Encerramento da sessão

In [19]:
spark.stop()