<a href="https://colab.research.google.com/github/robertosa13/SteamBigData/blob/develop/Atividade_Final_Processamento_de_Dados_e_Longa_Escala.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Atividade final avaliativa da disciplina de Processamento de Dados e Longa Escala** 

## **Tutor**: Anderson Felipe Rocha
##**Alunos**: Caio Serpa, Eden Coelho e Roberto Sá

Proposta:

Realizar a busca de um dataset no Kaggle que fosse um "Big Data", e realizar operações com o dataset buscado

# Sobre o conjunto de dados

Os dados foram retirados do **Kaggle**

 Fonte dos dados: [Stream Reviews 2021](https://www.kaggle.com/datasets/najzeko/steam-reviews-2021?resource=download)

**Tamanho do arquivo:** 8.17GB

**Formato do arquivo:** CSV


**Conjunto de dados de cerca de 21 milhões de avaliações de usuários de cerca de 300 jogos diferentes no Steam. Obtido usando a API fornecida pelo Steam descrita na documentação do Steamworks**

# Descrição do DataFrame

**'#'**: Index

**app_id**: Steam app id

**app_name**: App name

**review_id**: Review id

**language**: Language of review

**review**: Review text

**timestamp_created**: Review creation timestamp

**timestamp_updated**: Review latest update timestamp

**recommended**: Whether review recommends the app

**votes_helpful**: Number of "helpful" votes for review

**votes_funny**: Number of "funny" votes for review

**weighted_vote_score**: Score based on number of helpful votes

**comment_count**: Number of comments for review

**steam_purchase**: Whether review author purchased the app on Steam

**received_for_free**: Whether review author received the app for free

**written_during_early_access**: Whether review was written during early access

**author.steamid**: Review author steam id

**author.num_games_owned**: Number of games review author owns

**author.num_reviews**: Number of lifetime app reviews by author

**author.playtime_forever**: Author lifetime playtime of reviewed app

**author.playtime_last_two_weeks**: Author playtime of reviewed app in last 2 weeks

**author.playtime_at_review**: Author playtime of reviewed app at time of review

**author.last_played**: Author time last played reviewed app

#Configuração do ambiente, instalação do hadoop e spark

In [2]:
#instalar o Java 8 na maquina da sessão
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

#setando jdk 8
#!sudo apt install openjdk-8-jdk
#!sudo update-alternatives --config java 

#spark
!wget -q https://ftp.unicamp.br/pub/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
#extração do spark
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
#findspark para facilitar na criação da sessão spark
!pip install -q findspark
!pip install  pyspark==3.1.2

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.1.2
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 61 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 21.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880769 sha256=ef657be05ae3a0f8b51649c81902169c2fd8a2936062baa1813c4b7986d1fc7a
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


# Verificando versão do Java

In [3]:
!java -version

openjdk version "11.0.17" 2022-10-18
OpenJDK Runtime Environment (build 11.0.17+8-post-Ubuntu-1ubuntu218.04)
OpenJDK 64-Bit Server VM (build 11.0.17+8-post-Ubuntu-1ubuntu218.04, mixed mode, sharing)


In [4]:
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

#variáveis de ambiente
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

"""
os.environ['PYSPARK_SUBMIT_ARGS'] = '\
    --driver-memory 20G \
    --executor-memory 20G \
    pyspark-shell'

"""
print(os.environ['JAVA_HOME'])
print(os.environ['SPARK_HOME']) 

/usr/lib/jvm/java-8-openjdk-amd64
/content/spark-3.1.2-bin-hadoop3.2


In [5]:
import findspark

findspark.find()
findspark.init()

conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
#sc.stop()

# Importação do conjunto de dados


In [6]:
#recebendo os dados via api do google drive
from google.colab import drive 

drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [7]:
# Para quem usar Spark SQL
from pyspark.sql.functions import to_timestamp
spark = SparkSession \
    .builder \
    .getOrCreate()


#data_spark = spark.read.csv('gdrive/My Drive/Colab Notebooks/steam_reviews.csv', header=True, inferSchema=False)

In [8]:
from pyspark.sql.types import *

schema = StructType([
    StructField('_c0', IntegerType(), False),
    StructField('app_id',IntegerType(), False),
    StructField('app_name', StringType(),False),
    StructField('review_id',IntegerType(), False),
    StructField('language', StringType(),False),
    StructField('review', StringType(),False),
    StructField('timestamp_created', LongType(), False),
    StructField('timestamp_updated', LongType(), False),
    StructField('recommended', BooleanType(), False),
    StructField('votes_helpful',IntegerType(), False),
    StructField('votes_funny',IntegerType(), False),
    StructField('weighted_vote_score', FloatType(), False),
    StructField('comment_count',IntegerType(), False),
    StructField('steam_purchase', BooleanType(), False),
    StructField('received_for_free', BooleanType(), False),
    StructField('written_during_early_access', BooleanType(), False),
    StructField('author.steamid', LongType(), False),
    StructField('author.num_games_owned',IntegerType(), False),
    StructField('author.num_reviews',IntegerType(), False),
    StructField('author.playtime_forever', FloatType(), False),
    StructField('author.playtime_last_two_weeks', FloatType(), False),
    StructField('author.playtime_at_review', FloatType(), False),
    StructField('author.last_played', StringType(), False),
])


In [10]:
data_spark = spark.read.csv('gdrive/My Drive/Colab Notebooks/steam_reviews.csv', header=True, schema=schema)
#data_spark = spark.read.csv('gdrive/My Drive/Colab Notebooks/steam_reviews.csv', header=True, inferSchema=True)

In [11]:
data_spark.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- app_id: integer (nullable = true)
 |-- app_name: string (nullable = true)
 |-- review_id: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: long (nullable = true)
 |-- timestamp_updated: long (nullable = true)
 |-- recommended: boolean (nullable = true)
 |-- votes_helpful: integer (nullable = true)
 |-- votes_funny: integer (nullable = true)
 |-- weighted_vote_score: float (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- steam_purchase: boolean (nullable = true)
 |-- received_for_free: boolean (nullable = true)
 |-- written_during_early_access: boolean (nullable = true)
 |-- author.steamid: long (nullable = true)
 |-- author.num_games_owned: integer (nullable = true)
 |-- author.num_reviews: integer (nullable = true)
 |-- author.playtime_forever: float (nullable = true)
 |-- author.playtime_last_two_weeks: float (nullable = true)
 |-- author.pl

In [13]:
data_spark = data_spark.withColumnRenamed('_c0', 'id')\
            .withColumnRenamed('author.steamid', 'author_steamid')\
            .withColumnRenamed('author.num_games_owned', 'author_num_games_owned')\
            .withColumnRenamed('author.num_reviews', 'author_num_reviews')\
            .withColumnRenamed('author.playtime_forever', 'author_playtime_forever')\
            .withColumnRenamed('author.playtime_last_two_weeks', 'author_playtime_last_two_weeks')\
            .withColumnRenamed('author.playtime_at_review', 'author_playtime_at_review')\
            .withColumnRenamed('author.last_played', 'author_last_played').cache()

data_spark.count()

40848659

In [None]:
data_spark.show(5)

In [None]:
data_spark = data_spark.na.drop().cache()
data_spark.count()

16619206

In [None]:
data_spark.show(10,truncate=False)

+---+------+------------------------+---------+--------+-----------------------------------------------------------+-----------------+-----------------+-----------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+
|id |app_id|app_name                |review_id|language|review                                                     |timestamp_created|timestamp_updated|recommended|votes_helpful|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|author_steamid   |author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|
+---+------+------------------------+---------+--------+----------------------------------------------------

In [None]:
#data_spark.count()

In [None]:
from pyspark.sql.functions import *

#função para remover o ponto e o zero da coluna autor_last_played
get_number_without_dot = udf(lambda s: s.split('.')[0], StringType())
data_spark = data_spark.withColumn('s_author_last_played', get_number_without_dot(data_spark.author_last_played)).cache()
data_spark.count()

16619206

In [None]:
data_spark.show(10,truncate=False)

+---+------+------------------------+---------+--------+-----------------------------------------------------------+-----------------+-----------------+-----------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------------------+
|id |app_id|app_name                |review_id|language|review                                                     |timestamp_created|timestamp_updated|recommended|votes_helpful|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|author_steamid   |author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|s_author_last_played|
+---+------+------------------------+---------+--------+----------

In [None]:
#Convertendo para longType() a coluna s_author_last played
data_spark = data_spark.withColumn('s_author_last_played',data_spark["s_author_last_played"].cast(LongType())).cache()

#Convertendo para data as seguintes colunas
data_spark  = data_spark.withColumn('s_author_last_played', from_unixtime(col('s_author_last_played')))\
          .withColumnRenamed('s_author_last_played','t_author_last_played')\
          .withColumn('t_timestamp_created', from_unixtime(col('timestamp_created')))\
          .withColumn('t_timestamp_updated', from_unixtime(col('timestamp_updated')))\
          .cache()

#dropando colunas
cols = ("timestamp_created","timestamp_updated","author_last_played")
data_spark = data_spark.drop(*cols).cache()
data_spark.count()

data_spark.show(10,truncate=False)

NameError: ignored

# Com os dados limpos

In [13]:
spark = SparkSession.builder\
    .master("local[*]")\
    .getOrCreate()

data_spark = spark.read.csv('gdrive/My Drive/Colab Notebooks/clean_data.csv', header=False)


In [14]:

data_spark = data_spark\
            .withColumnRenamed('_c0', 'id')\
            .withColumnRenamed('_c1', 'app_id')\
            .withColumnRenamed('_c2', 'review_id')\
            .withColumnRenamed('_c3', 'language')\
            .withColumnRenamed('_c4', 'review')\
            .withColumnRenamed('_c5', 'timestamp_created')\
            .withColumnRenamed('_c6', 'timestamp_updated')\
            .withColumnRenamed('_c7', 'recommended')\
            .withColumnRenamed('_c8', 'votes_helpful')\
            .withColumnRenamed('_c9', 'votes_funny')\
            .withColumnRenamed('_c10', 'weighted_vote_score')\
            .withColumnRenamed('_c11', 'comment_count')\
            .withColumnRenamed('_c12', 'steam_purchase')\
            .withColumnRenamed('_c13', 'received_for_free')\
            .withColumnRenamed('_c14', 'written_during_early_access')\
            .withColumnRenamed('_c15', 'author_last_played')\
            .withColumnRenamed('_c16', 'author_last_played')\
            .withColumnRenamed('_c17', 'author_last_played')\
            .withColumnRenamed('_c18', 'author_last_played')\
            .withColumnRenamed('_c19', 'author_last_played')\
            .withColumnRenamed('_c20', 'author_last_played').cache()

In [15]:
#Creating temp view to work with sql queries
data_spark.createOrReplaceTempView("tb_reviews")

In [16]:
data_spark.show(5)

+---+------+--------------------+--------+--------+----------------------------------+-----------------+-----------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+
| id|app_id|           review_id|language|  review|                 timestamp_created|timestamp_updated|recommended|votes_helpful|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|author_last_played|author_last_played|author_last_played|author_last_played|author_last_played| author_last_played|               _c21|               _c22|
+---+------+--------------------+--------+--------+----------------------------------+-----------------+-----------+-------------+-----------+-------------------+-------------+--------------+-----------------+-----

#1 -  Qual usuário é mais deu reviews em jogos?

In [None]:
spark = SparkSession.builder\
    .master("local[*]")\
    .getOrCreate()

query = """
""""

spark.sql

# 2 - Qual jogo é mais recomendado e qual jogo é o menos recomendado?

# 3 - Qual a variedade de linguas em reviews?

# 4 - Mostre o usário que mais possuem jogos e sua quantidade de reviews? Faça o mesmo para situação inversa

# 5 - Quais usuários mais tiveram votos que auxiliam a comudidade? Eles compraram os jogos? Quais são os jogos?

# 6- Quais são os jogos mais jogados? Eles foram, comprados ou usados durante o tempo gratuito?

# 7 - Algum usuário comentou em mais de um idioma? Se sim, mostre os comentários e liste os usuários

# Parando atividades no PySPark

In [None]:
sc.stop()