<a href="https://colab.research.google.com/github/victor1cg/Spark/blob/main/Big_Data_Spark_exs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# INSTALL THE DEPENDENCIES
# Install Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark 3.1.2 (prebuilt for Hadoop 2.7)
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Extract the Spark archive
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# Set the environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# SPARK_HOME must point at the directory extracted from the archive above
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

# Install findspark, the library that locates the Spark installation
!pip install -q findspark
# Make pyspark importable
import findspark
findspark.init('spark-3.1.2-bin-hadoop2.7')
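
A mismatch between `SPARK_HOME` and the directory that `tar` actually extracted is an easy mistake in this kind of setup, and it usually surfaces only later as an import or launch error. Below is a minimal sketch of a guard that uses only the standard library. The helper name `spark_home_looks_valid` is hypothetical, and the check for `bin/spark-submit` assumes the usual layout of the Spark binary distribution; it verifies the file layout only, not that Spark runs.

```python
import os

def spark_home_looks_valid(spark_home):
    """Return True if spark_home is a directory containing bin/spark-submit.

    Hypothetical helper: it checks the file layout only.
    """
    if not spark_home or not os.path.isdir(spark_home):
        return False
    return os.path.isfile(os.path.join(spark_home, "bin", "spark-submit"))

# Usage: check the variable before calling findspark.init()
spark_home = os.environ.get("SPARK_HOME", "")
if not spark_home_looks_valid(spark_home):
    print(f"SPARK_HOME does not look like a Spark install: {spark_home!r}")
```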

In [2]:
# Create the SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master('local')\
        .appName('sparkcolab')\
        .getOrCreate()

spark

In [3]:
# Types for building a user-defined schema
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DoubleType

import pandas as pd
import matplotlib.pyplot as plt

In [7]:
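def display_df(df,n=20):
  # Return the first n rows as a pandas DataFrame for notebook display
  return df.limit(n).toPandas()

# Read the CSV with a header row and let Spark infer the column types
df = spark.read.csv('vgsales.csv', inferSchema=True, header = True)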

In [8]:
display_df(df)

Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
0,1,Wii Sports,Wii,2006,Sports,Nintendo,41.49,29.02,3.77,8.46,82.74
1,2,Super Mario Bros.,NES,1985,Platform,Nintendo,29.08,3.58,6.81,0.77,40.24
2,3,Mario Kart Wii,Wii,2008,Racing,Nintendo,15.85,12.88,3.79,3.31,35.82
3,4,Wii Sports Resort,Wii,2009,Sports,Nintendo,15.75,11.01,3.28,2.96,33.0
4,5,Pokemon Red/Pokemon Blue,GB,1996,Role-Playing,Nintendo,11.27,8.89,10.22,1.0,31.37
5,6,Tetris,GB,1989,Puzzle,Nintendo,23.2,2.26,4.22,0.58,30.26
6,7,New Super Mario Bros.,DS,2006,Platform,Nintendo,11.38,9.23,6.5,2.9,30.01
7,8,Wii Play,Wii,2006,Misc,Nintendo,14.03,9.2,2.93,2.85,29.02
8,9,New Super Mario Bros. Wii,Wii,2009,Platform,Nintendo,14.59,7.06,4.7,2.26,28.62
9,10,Duck Hunt,NES,1984,Shooter,Nintendo,26.93,0.63,0.28,0.47,28.31


-----------
Using Spark's Python API (PySpark), return a new dataframe with two columns:
the console name (column Platform) and the number of games on each platform in 2010,
sorted by count in descending order:


In [25]:
import pyspark.sql.functions as f

# Keep only the 2010 rows, count games per platform, sort by count descending
df_2010 = df.filter(f.col("Year")==2010)\
          .groupBy("Platform")\
          .agg(
              f.count("Platform").alias("Qty_Games")
          )\
          .orderBy(f.desc("Qty_Games"))

In [26]:
display_df(df_2010)

Unnamed: 0,Platform,Qty_Games
0,DS,326
1,Wii,254
2,PSP,188
3,X360,182
4,PS3,181
5,PC,90
6,PS2,38
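
To make explicit what the filter, `groupBy`, `count` and descending `orderBy` chain computes, here is a plain-Python sketch of the same logic applied to the ten sample rows displayed earlier. It uses 2006 as the year because none of those ten rows is from 2010. The function name `count_by_platform` is hypothetical, and the sketch illustrates the semantics only, not how Spark executes the query.

```python
from collections import Counter

# (Platform, Year) pairs taken from the ten sample rows shown earlier
rows = [
    ("Wii", 2006), ("NES", 1985), ("Wii", 2008), ("Wii", 2009), ("GB", 1996),
    ("GB", 1989), ("DS", 2006), ("Wii", 2006), ("Wii", 2009), ("NES", 1984),
]

def count_by_platform(rows, year):
    """Count rows per platform for one year, most frequent first."""
    counts = Counter(platform for platform, y in rows if y == year)
    return counts.most_common()

print(count_by_platform(rows, 2006))  # [('Wii', 2), ('DS', 1)]
```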


Using SQL, return the total sales (column Global_Sales) per publisher (column
Publisher) between 2000 and 2010, sorted by total sales in descending order:

In [29]:
df.createOrReplaceTempView("games_table")

df_publi = spark.sql("""
                 SELECT Publisher,
                 SUM(Global_Sales) AS Global_Sales
                 FROM games_table
                 WHERE YEAR BETWEEN 2000 AND 2010 
                 GROUP BY Publisher
                 ORDER BY Global_Sales DESC
""")
display_df(df_publi)

Unnamed: 0,Publisher,Global_Sales
0,Nintendo,944.96
1,Electronic Arts,728.87
2,Activision,444.84
3,Sony Computer Entertainment,346.1
4,Ubisoft,287.67
5,THQ,286.91
6,Take-Two Interactive,249.25
7,Konami Digital Entertainment,186.14
8,Sega,183.33
9,Microsoft Game Studios,165.89


Using Spark's Python API (PySpark), write a sequence of transformations that
returns a dataframe with the total Global_Sales per Publisher for each year
from 2000 to 2010:

In [70]:
# Select every column that will be used
# Filter; the reference solution uses .filter((f.col("Year") >= 2000) & (f.col("Year") <= 2010))
# Group by one column
# Pivot on the other column
# Aggregate the value column
# Replace nulls with 0

df_publi_py = df.select("Publisher","Year",'Global_Sales')\
              .where("Year between 2000 and 2010")\
              .groupBy("Publisher")\
              .pivot("Year")\
              .agg(
                  f.sum("Global_Sales").alias("Global_Sales")
                  )\
              .na.fill(0)

In [71]:
display_df(df_publi_py)

Unnamed: 0,Publisher,2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010
0,Media Rings,0.0,0.0,0.19,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,bitComposer Games,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.03,0.0
2,Telegames,0.0,0.0,0.0,0.2,0.0,0.02,0.0,0.54,0.08,0.0,0.17
3,3DO,3.08,1.45,0.6,0.45,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,Sting,0.0,0.0,0.0,0.0,0.1,0.0,0.03,0.11,0.38,0.0,0.03
5,Jack of All Games,0.0,0.0,0.0,0.0,0.0,0.38,0.0,0.0,0.0,0.0,0.0
6,Game Life,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.13,0.0,0.17,0.0
7,IE Institute,0.0,0.0,0.0,0.0,0.0,0.0,0.73,0.17,0.04,0.0,0.0
8,Karin Entertainment,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.02
9,Infogrames,1.66,2.12,8.46,0.14,0.0,0.0,0.0,0.0,0.0,0.0,0.0
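
The `groupBy` / `pivot` / `agg` / `na.fill(0)` chain reshapes long data into one row per publisher and one column per year. A plain-Python sketch of that reshaping follows, again on the ten sample rows displayed at the top of the notebook (all published by Nintendo). The helper name `pivot_sales` is hypothetical, and the code illustrates the semantics only.

```python
from collections import defaultdict

# (Publisher, Year, Global_Sales) from the ten sample rows shown earlier
rows = [
    ("Nintendo", 2006, 82.74), ("Nintendo", 1985, 40.24),
    ("Nintendo", 2008, 35.82), ("Nintendo", 2009, 33.0),
    ("Nintendo", 1996, 31.37), ("Nintendo", 1989, 30.26),
    ("Nintendo", 2006, 30.01), ("Nintendo", 2006, 29.02),
    ("Nintendo", 2009, 28.62), ("Nintendo", 1984, 28.31),
]

def pivot_sales(rows, first_year, last_year):
    """Sum sales per publisher and year; years with no sales stay at 0.0."""
    years = range(first_year, last_year + 1)
    totals = defaultdict(lambda: {y: 0.0 for y in years})
    for publisher, year, sales in rows:
        if first_year <= year <= last_year:  # inclusive bounds, like BETWEEN
            totals[publisher][year] += sales
    return dict(totals)

table = pivot_sales(rows, 2000, 2010)
print(round(table["Nintendo"][2006], 2))  # 141.77
```

One difference from Spark: `pivot` without an explicit value list creates columns only for the years present in the filtered data, whereas this sketch creates a column for every year in the range.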


Export the dataframe produced in exercise 4 as JSON, with 4 partitions and
overwrite mode enabled, to the path ./output/vgsales_json :


In [76]:
df_publi_py.repartition(4)\
  .write\
  .mode('overwrite')\
  .json("./output/vgsales_json")


In [77]:
!ls ./output/vgsales_json/

part-00000-713b5405-3811-4c8b-bec7-6e2455b5e767-c000.json
part-00001-713b5405-3811-4c8b-bec7-6e2455b5e767-c000.json
part-00002-713b5405-3811-4c8b-bec7-6e2455b5e767-c000.json
part-00003-713b5405-3811-4c8b-bec7-6e2455b5e767-c000.json
_SUCCESS
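
Spark's JSON writer produces JSON Lines: each `part-*.json` file holds one JSON object per line, and `_SUCCESS` is an empty marker file. As a sketch of how the export could be read back without Spark, the hypothetical helper `read_json_parts` below collects every record from the part files using only the standard library.

```python
import glob
import json
import os

def read_json_parts(directory):
    """Load all records from the part-*.json files in a Spark output directory."""
    records = []
    for path in sorted(glob.glob(os.path.join(directory, "part-*.json"))):
        with open(path, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if line:  # skip blank lines
                    records.append(json.loads(line))
    return records

# Usage, assuming the export has already been written to this path
if os.path.isdir("./output/vgsales_json"):
    records = read_json_parts("./output/vgsales_json")
    print(len(records), "publishers loaded")
```

Because the dataframe was repartitioned before writing, the order of records across part files is not meaningful; sort the loaded list if a stable order is needed.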
