In [None]:
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

sc = spark.sparkContext
#from google.colab import drive
#drive.mount('/My_drive/')

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
sc

In [None]:
#Charger un csv
df = spark.read.csv("./vgsales.csv",header=True)
#Compter le nombre de lignes
print(df.count())
#Afficher les 20 premières lignes
df.show()

In [None]:
df.dtypes

## Caster vos colonnes

In [None]:
df = df.withColumn("NA_Sales",df["NA_Sales"].cast("double"))
df = df.withColumn("EU_Sales",df["EU_Sales"].cast("double"))
df = df.withColumn("JP_Sales",df["JP_Sales"].cast("double"))
df = df.withColumn("Other_Sales",df["Other_Sales"].cast("double"))
df = df.withColumn("Global_Sales",df["Global_Sales"].cast("double"))
df = df.withColumn("Year",df["Year"].cast("Int"))

df.dtypes

<h1> Compter le nombre de jv Nintendo </h1>

In [None]:
#Via un DF
df.where(df.Publisher == 'Nintendo').count()
#df.where("Publisher == 'Nintendo'").count()

In [None]:
#Via un RDD
#df.rdd.filter(....
#Voir corriger

In [None]:
#Via du SQL
df.createOrReplaceTempView("jv_vente")
requete_sql = spark.sql("select count(*) from jv_vente where Publisher = 'Nintendo'")
requete_sql.show()

<h1>Rajouter une colonne concernant le pourcentage de vente EU </h1>

In [None]:
#Via un Dataframe

def pourcentage_calcul(a,b):
    return float(a)/float(b)*100

df_EUpercent = df.withColumn('pourcent_EU', df.EU_Sales/df.Global_Sales*100)
df_EUpercent.select('Rank', 'Name', 'Platform', 'Year', 'Genre', 'Publisher', 'EU_Sales', 'Global_Sales', 'pourcent_EU').show()

In [None]:
# DF méthode numéro 2 (UDF, complexe)
def pourcentage_calcul(a,b):
    return float(a)/float(b)*100

df_EUpercent = df.withColumn('pourcent_EU', F.udf( lambda a,b : pourcentage_calcul(a,b), T.FloatType())("EU_Sales","Global_Sales"))
df_EUpercent.show()
#df_EUpercent.select('Rank', 'Name', 'Platform', 'Year', 'Genre', 'Publisher', 'EU_Sales', 'Global_Sales','pourcent_EU').show()

In [None]:
#Via un RDD


In [None]:
#Via du SQL
requete_pourcentEU = spark.sql("SELECT Rank, Name, Platform, Year, Genre, Publisher, EU_Sales, Global_Sales, EU_Sales/Global_Sales*100 AS pourcent_EU FROM jv_vente")
requete_pourcentEU.show()

<h1> Afficher les différentes plateformes du dataframe </h1>

In [None]:
#df.select('Platform').drop_duplicates().show()

<h1> Afficher les ventes des jeux publiés en 2006 </h1>

In [None]:
df.where(df.Year == 2006).show()

<h1>Combien de jeux ont été vendus en plus 15 millions d'exemplaire </h1>

In [None]:
df.where(df.Global_Sales >= 15).count()

<h1>Rajouter une colonne pour indiquer si le jeux est un jeux pokémon (2 façons différentes)</h1>

In [None]:
df_isPokemon = df.withColumn('isPokemon', F.lower(df.Name).contains('pokemon'))
df_isPokemon.where(df_isPokemon.isPokemon == 'true').count()

### Afficher la répartition des mots

In [None]:
#methode par RDD

In [None]:
#methode par dataframe
df_1 = df.withColumn("words",F.udf(lambda x :x.lower().split(),\
                                   T.ArrayType(T.StringType()))("Name"))
print(df_1.first())
df_2 =df_1.withColumn("word",F.explode("words"))
print(df_2.first())
df_3 = df_2.groupBy("word").count()
df_3.show()

<h1> Quels sont les 20 mots les plus présent </h1>

In [None]:
df_3.sort('count', ascending=False).show()

<h1> Quel mot fait le plus vendre ? </h1>

In [None]:
df_1 = df.withColumn("words",F.udf(lambda x :x.lower().split(),\
                                   T.ArrayType(T.StringType()))("Name"))
print(df_1.first())
df_2 =df_1.withColumn("word",F.explode("words"))
print(df_2.first())
df_3 = df_2.groupBy("word").count()
#df_3 --> word  count
df_4 = df_2.groupBy("word").agg(F.sum("Global_Sales"))
#df_4 --> word  sum(Global_Sales)
df_4.where("word != 'the' and length(word) > 2").sort('sum(Global_Sales)', ascending=False).show()
#df_3.join(df_2)
#df_3.show()

<h1>Afficher les ventes des différents "Publisher" </h1>

<h1> Partie Machine-learning </h1>

<h1> A partir des différentes colonnes (sans les ventes globales) essayez de prédire les ventes européennes </h1>

In [None]:
import pyspark.ml as SparkML
vector = SparkML.feature.VectorAssembler(inputCols=["Year","NA_Sales","EU_Sales","JP_Sales","Other_Sales"], outputCol="features")

df_transform= vector.transform(df.dropna())
(trainingData, testData) = df_transform.randomSplit([0.8, 0.2])

classifier = SparkML.regression.RandomForestRegressor(labelCol="EU_Sales", featuresCol="features")
model = classifier.fit(trainingData)

pred = model.transform(testData)

#pred.show()
evaluator = SparkML.evaluation.RegressionEvaluator(
    labelCol="EU_Sales", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(pred)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
#PF2T2TMC 15ACH6H

<h1> Evaluez votre algorithme et essayez en d'autres </h1>

<h1> Rajouter des colonnes pour améliorer vos algorithmes </h1>