# Comandos para realização do trabalho da matéria de Big Data com uso da biblioteca PySpark.

## <font color=red>Observação importante:</font>

<font color=yellow>Trabalho realizado com uso da biblioteca pandas não será aceito!</font>

## Upload do arquivo `imdb-reviews-pt-br.csv` para dentro do Google Colab

In [15]:
!wget https://raw.githubusercontent.com/N-CPUninter/Big_Data/main/data/imdb-reviews-pt-br.zip -O imdb-reviews-pt-br.zip
!unzip imdb-reviews-pt-br.zip
!rm imdb-reviews-pt-br.zip

--2024-10-25 19:17:46--  https://raw.githubusercontent.com/N-CPUninter/Big_Data/main/data/imdb-reviews-pt-br.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 49549692 (47M) [application/zip]
Saving to: ‘imdb-reviews-pt-br.zip’


2024-10-25 19:17:47 (168 MB/s) - ‘imdb-reviews-pt-br.zip’ saved [49549692/49549692]

Archive:  imdb-reviews-pt-br.zip
replace imdb-reviews-pt-br.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

## Instalação manual das dependências para uso do pyspark no Google Colab

In [16]:
!pip install pyspark

[31mERROR: Operation cancelled by user[0m[31m
[0m

## Importar, instanciar e criar a SparkSession

In [17]:
from pyspark.sql import SparkSession

appName = "PySpark Trabalho de Big Data"
master = "local"

spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

## Criar spark dataframe do CSV utilizando o método read.csv do spark

In [18]:
imdb_df = spark.read.csv('imdb-reviews-pt-br.csv',
                         header=True,
                         quote="\"",
                         escape="\"",
                         encoding="UTF-8")

# Questão 1

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e o "id" como valor do tipo inteiro

In [9]:
from pyspark.sql.functions import col

def map_sentiment_id(df):
    # Converter a coluna "id" para inteiro
    df = df.withColumn("id", col("id").cast("int"))

    # Criar um dicionário com "sentimento" como chave e listas de "id" como valores
    sentiment_id_map = df.select("sentiment", "id").rdd.map(lambda row: (row.sentiment, row.id)).groupByKey().mapValues(list).collectAsMap()

    return sentiment_id_map

## Cria funções de REDUCE:

- Criar função de reduce para somar os IDs por "sentiment".

In [10]:
# Função para reduzir e somar os IDs por "sentiment"
def reduce_sum_ids(df):
    # Converter a coluna "id" para inteiro
    df = df.withColumn("id", col("id").cast("int"))

    # Converter para RDD e aplicar reduceByKey
    sum_ids_rdd = df.select("sentiment", "id").rdd.map(lambda row: (row.sentiment, row.id)).reduceByKey(lambda x, y: x + y)

    # Coletar e imprimir os resultados
    result = sum_ids_rdd.collect()
    for row in result:
        print(f"Sentiment: {row[0]}, Soma dos IDs: {row[1]}")

## Aplicação do map/reduce e visualização do resultado

In [11]:
# Aplicar map/reduce e realizar collect
sum_ids_rdd = imdb_df.select("sentiment", "id").rdd.map(lambda row: (row.sentiment, int(row.id))).reduceByKey(lambda x, y: x + y).collect()

# Visualizar os dados
for row in sum_ids_rdd:
    print(f"Sentiment: {row[0]}, Soma dos IDs: {row[1]}")

Sentiment: neg, Soma dos IDs: 459568555
Sentiment: pos, Soma dos IDs: 763600041


# Questão 2:

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e uma tupla com a soma das palavras de cada texto como valor.

In [12]:
from pyspark.sql.functions import split, size

def map_sentiment_wordcount(df):
    # Contar o número de palavras em cada texto (pt e en)
    df = df.withColumn("text_pt_wordcount", size(split(col("text_pt"), " ")))
    df = df.withColumn("text_en_wordcount", size(split(col("text_en"), " ")))

    # Converter para RDD e mapear "sentiment" com a soma das palavras
    sentiment_wordcount_rdd = df.select("sentiment", "text_pt_wordcount", "text_en_wordcount")\
                                .rdd.map(lambda row: (row.sentiment, (row.text_pt_wordcount, row.text_en_wordcount)))

    return sentiment_wordcount_rdd

## Cria funções de REDUCE:

- Criar função de reduce para somar o numero de palavras de cada texto português e inglês por "sentiment".

In [13]:
def reduce_sum_wordcount(rdd):
    # Aplicar reduceByKey para somar as contagens de palavras para cada sentimento
    sum_wordcount_rdd = rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

    # Coletar e imprimir os resultados
    result = sum_wordcount_rdd.collect()

    return result

## Aplicação do map/reduce e visualização do resultado

1. Aplicar o map/reduce no seu dataframe spark e realizar o collect() ao final
2. Selecionar os dados referentes aos textos negativos para realizar a subtração.
3. Realizar a subtração das contagens de palavras dos textos negativos para obter o resultado final

In [14]:
# Chamando as funções map e reduce
sentiment_wordcount_rdd = map_sentiment_wordcount(imdb_df)
wordcount_results = reduce_sum_wordcount(sentiment_wordcount_rdd)

# Selecionar dados dos textos negativos
negative_wordcount = [item for item in wordcount_results if item[0] == 'neg'][0]

# Realizar a subtração das contagens de palavras
subtracted_wordcount = (negative_wordcount[1][0] - negative_wordcount[1][1])

print(f"Contagem das Palavras em Textos Negativos (PT - EN): {subtracted_wordcount}")

Contagem das Palavras em Textos Negativos (PT - EN): 54036
