#Instalação e importações de bibliotecas

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 26 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 31.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=07eacd80af5812fa9956e582fb8e58b878d497b78e4d9ab3581acfdd8c4b706c
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.functions import split
spark = SparkSession.builder.master("local").appName("etl-games").config("spark.ui.port","4050").getOrCreate() 

#Extração e transformação dos dados via Pandas e Pyspark

In [None]:
#Leitura do CSV e base de dados
df = pd.read_csv("/content/drive/MyDrive/Valve_Player_Data.csv")

In [None]:
#Exclusão de colunas irrelevantes para os Insights
df.drop(columns={"URL","Percent_Gain"},axis=1,inplace=True)

In [None]:
df.columns

Index(['Month_Year', 'Avg_players', 'Gain', 'Peak_Players', 'Date',
       'Game_Name'],
      dtype='object')

In [None]:
#Tradução das colunas para português
df.rename({'Month_Year':'Mes_Ano','Avg_players':'Media_Players','Gain':'Ganho','Peak_Players':'Pico_Players','Date':'Data','Game_Name':'Nome_Jogo'},axis=1,inplace=True)

In [None]:
#Verificação dos tipos de dados do dataframe
df.dtypes

Mes_Ano           object
Media_Players    float64
Ganho            float64
Pico_Players       int64
Data              object
Nome_Jogo         object
dtype: object

In [None]:
#Verificação de inconsistências, isto é, se os dados estão com caracteres especiais no início dos nomes
sorted(pd.unique(df["Data"]))

['2012-07-01',
 '2012-08-01',
 '2012-09-01',
 '2012-10-01',
 '2012-11-01',
 '2012-12-01',
 '2013-01-01',
 '2013-02-01',
 '2013-03-01',
 '2013-04-01',
 '2013-05-01',
 '2013-06-01',
 '2013-07-01',
 '2013-08-01',
 '2013-09-01',
 '2013-10-01',
 '2013-11-01',
 '2013-12-01',
 '2014-01-01',
 '2014-02-01',
 '2014-03-01',
 '2014-04-01',
 '2014-05-01',
 '2014-06-01',
 '2014-07-01',
 '2014-08-01',
 '2014-09-01',
 '2014-10-01',
 '2014-11-01',
 '2014-12-01',
 '2015-01-01',
 '2015-02-01',
 '2015-03-01',
 '2015-04-01',
 '2015-05-01',
 '2015-06-01',
 '2015-07-01',
 '2015-08-01',
 '2015-09-01',
 '2015-10-01',
 '2015-11-01',
 '2015-12-01',
 '2016-01-01',
 '2016-02-01',
 '2016-03-01',
 '2016-04-01',
 '2016-05-01',
 '2016-06-01',
 '2016-07-01',
 '2016-08-01',
 '2016-09-01',
 '2016-10-01',
 '2016-11-01',
 '2016-12-01',
 '2017-01-01',
 '2017-02-01',
 '2017-03-01',
 '2017-04-01',
 '2017-05-01',
 '2017-06-01',
 '2017-07-01',
 '2017-08-01',
 '2017-09-01',
 '2017-10-01',
 '2017-11-01',
 '2017-12-01',
 '2018-01-

In [None]:
#Verificação e checagem de valores nulos
df.isna().sum()

Mes_Ano           0
Media_Players     0
Ganho            98
Pico_Players      0
Data              0
Nome_Jogo         0
dtype: int64

In [None]:
#Substituição dos valores nulos da coluna ganho para a média
media = df['Ganho'].mean()

In [None]:
df['Ganho']=df['Ganho'].fillna(media)

In [None]:
df.isna().sum()

Mes_Ano          0
Media_Players    0
Ganho            0
Pico_Players     0
Data             0
Nome_Jogo        0
dtype: int64

In [None]:
#Transformação do dataframe pandas em PySpark
dfs = spark.createDataFrame(data=df)

In [None]:
dfs = dfs.select("Nome_Jogo","Media_Players","Pico_Players","Ganho","Mes_Ano","Data")

In [None]:
#Separando a coluna data em três colunas diferentes contendo ano, mês e dia
dfs=dfs.withColumn('Ano', split(dfs['Data'],'-').getItem(0))

In [None]:
dfs = dfs.withColumn('Mes',split(dfs['Data'],'-').getItem(1))\
.withColumn('Dia',split(dfs['Data'],'-').getItem(2))

In [None]:
#Exclusão da coluna Data e Mes_Ano após a criação de outras mais relevantes par análise.
dfs=dfs.drop('Mes_Ano','Data')

In [None]:
#Após a transformação, os dados serão salvos em parquet.
dfs.write.save("steam.snappy.parquet")

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
dfs.printSchema()

root
 |-- Nome_Jogo: string (nullable = true)
 |-- Media_Players: double (nullable = true)
 |-- Pico_Players: long (nullable = true)
 |-- Ganho: double (nullable = true)
 |-- Ano: string (nullable = true)
 |-- Mes: string (nullable = true)
 |-- Dia: string (nullable = true)

