In [1]:
Dataset Review
The dataset we are going to use is a public Kaggle dataset about League of Legends (LoL) professional games. This data derives from census data, and consists of information about 3801 matches together with the per-match details of each game. We will use this information to predict whether a team wins a game or not. The dataset is rather clean and consists of both numeric and categorical variables.

In [2]:
# Read the table named "lol_red_and_blue_csv" into a Spark DataFrame.
dataset = spark.table("lol_red_and_blue_csv")


# Capture the original column names now: `dataset` is reassigned to the pipeline
# output further down, and `cols` is appended to that final column selection.
cols = dataset.columns


In [3]:
# Show the freshly loaded table for a first look at the raw rows.
display(dataset)

In [4]:
# Check which class spark.table() returned.
type(dataset)

In [5]:
import pandas as pd
# Load the CSV file a second time, as a pandas DataFrame.
# NOTE(review): `pandas_df` is only inspected and recoded in the next few cells;
# the Spark pipeline below is built on `dataset`, not on this frame.
pandas_df = pd.read_csv("/dbfs//FileStore/tables/lol_red_and_blue.csv" )


In [6]:
# Inspect the pandas frame as loaded.
pandas_df

In [7]:
# Recode TeamColor in the pandas frame: 'red' -> 0, 'blue' -> 1.
# This touches `pandas_df` only; the Spark `dataset` still holds the original values.
pandas_df['TeamColor'] = pandas_df['TeamColor'].replace({'red':0, 'blue':1})


In [8]:
# Inspect the pandas frame again after the TeamColor recode.
pandas_df

In [9]:
# Confirm the class of the pandas-loaded frame (compare with type(dataset) above).
type(pandas_df)

In [10]:
# Print the column names and types of the Spark table before any feature engineering.
dataset.printSchema()

In [11]:
###One-Hot Encoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Columns to be treated as categorical features.
categoricalColumns = ["TeamColor"]

# Pipeline stages are only collected here; nothing runs until the Pipeline is fitted.
stages = []
for categoricalCol in categoricalColumns:
  indexedCol = categoricalCol + "Index"
  # Step 1: map each category string to a numeric index.
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
  # Step 2: one-hot encode that index into a "<column>classVec" vector column.
  encoder = OneHotEncoder(inputCol=indexedCol, outputCol=categoricalCol + "classVec")
  stages.extend([stringIndexer, encoder])

In [12]:
# Index the "Result" column into a numeric "label" column and queue it as a stage.
# NOTE(review): which Result value becomes 0.0 vs 1.0 is decided by StringIndexer's
# ordering rule — confirm the mapping before interpreting predictions.
# NOTE(review): `stages` is appended to in place, so re-running this cell alone
# queues a duplicate indexer; re-run from the cell that creates `stages`.
label_stringIdx = StringIndexer(inputCol="Result", outputCol="label")
stages.append(label_stringIdx)

In [13]:
# List the available column names (used to choose the numeric feature columns below).
dataset.columns

In [14]:
# Combine the one-hot encoded categorical vectors and the numeric columns into a
# single "features" vector column using VectorAssembler.
numericCols = ["gamelength", "DragonValue", "BaronValue", "Herald", "InhibitsValue",
               "TowerValue", "GoldIn10", "GoldIn20", "kill"]
# Build the input list with a list comprehension: on Python 3 `map()` returns an
# iterator, and `map(...) + list` raises TypeError. This form works on 2 and 3.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# NOTE(review): `stages` is mutated in place, so re-running this cell alone adds
# a second assembler; re-run from the cell that creates `stages`.
stages += [assembler]

In [15]:
# Assemble every queued stage into one Pipeline.
pipeline = Pipeline(stages=stages)

# fit() computes whatever statistics the stages need from `dataset`;
# transform() (below) then applies the fitted stages.
pipelineModel = pipeline.fit(dataset)

# Keep the new label/features columns plus the original columns captured in `cols`.
selectedcols = ["label", "features"] + cols

# NOTE(review): `dataset` is rebound to the transformed frame here, so this cell
# is meant to run once per fresh load of the table.
dataset = pipelineModel.transform(dataset).select(selectedcols)
display(dataset)

In [16]:
# Print the schema of the transformed frame. The call parentheses are required:
# a bare `dataset.printSchema` only shows the bound-method repr.
dataset.printSchema()

In [17]:
# Show the transformed frame (label, features and the original columns).
display(dataset)

In [18]:
# Split the rows roughly 70% / 30% into training and test sets; the fixed seed
# keeps the split repeatable.
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
# Single-argument print(...) behaves identically on Python 2 and Python 3
# (the bare `print x` statement is a SyntaxError on Python 3).
print(trainingData.count())
print(testData.count())

In [19]:
# Show the training split.
display(trainingData)

In [20]:
#Logistic regression
from pyspark.ml.classification import LogisticRegression

# Baseline logistic regression on the assembled "features" column, capped at
# 10 optimizer iterations. `lr` is reused later as the cross-validation estimator.
lr = LogisticRegression(
    labelCol="label",
    featuresCol="features",
    maxIter=10)

# Fit the baseline model on the training split only.
lrModel = lr.fit(trainingData)

In [21]:
# Score the held-out test split with the fitted logistic-regression model.
# LogisticRegression.transform() will only use the 'features' column.
# `predictions` is rebound by every later model section in this notebook.
predictions = lrModel.transform(testData)

In [22]:
# Inspect which columns the model's transform() produced.
predictions.printSchema()

In [23]:
# Compare the true label with the predicted class and its probability column.
selected = predictions.select("label", "prediction", "probability")
display(selected)

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the logistic-regression test predictions.
# No metricName is passed, so the evaluator's default metric is used
# (the next cell shows its name via getMetricName()).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [25]:
# Show which metric evaluate() reports for this evaluator.
evaluator.getMetricName()

In [26]:
# List every tunable parameter of the logistic-regression estimator.
# print(...) with one argument runs unchanged on Python 2 and Python 3.
print(lr.explainParams())

In [27]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for logistic regression: 3 x 3 x 3 = 27 combinations
# of regularisation strength, elastic-net mixing and iteration cap.
lrGridBuilder = ParamGridBuilder()
lrGridBuilder.addGrid(lr.regParam, [0.01, 0.5, 2.0])
lrGridBuilder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
lrGridBuilder.addGrid(lr.maxIter, [1, 5, 10])
paramGrid = lrGridBuilder.build()

In [28]:
# 5-fold cross-validation of `lr` over every combination in `paramGrid`,
# scored with `evaluator`.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5)

# Fit on the training split only. This is slow: one model is trained per grid
# combination per fold.
cvModel = cv.fit(trainingData)

In [29]:
# Score the held-out test split with the cross-validated logistic-regression
# model, so the metric below reflects data not used for fitting.
predictions = cvModel.transform(testData)

In [30]:
# Evaluate the cross-validated model's test predictions with the same evaluator as the baseline.
evaluator.evaluate(predictions)

In [31]:
# Report the intercept of the best cross-validated logistic-regression model.
# A single formatted string keeps the output text the same as the old
# `print a, b` statement (including its separator space) on Python 2 and 3.
print('Model Intercept:  {}'.format(cvModel.bestModel.intercept))

In [32]:
# On Spark 2.x the fitted weights are exposed as `coefficients`
# (the older attribute name was `weights`).
# Each coefficient is converted from a numpy scalar to a plain float and wrapped
# in a 1-tuple so that it becomes one single-column row.
weights = [(float(w),) for w in cvModel.bestModel.coefficients]

# One row per coefficient, in feature-vector order.
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

In [33]:
# Label vs. predicted class and probability for the cross-validated logistic regression.
selected = predictions.select("label", "prediction", "probability")
display(selected)

In [34]:
from pyspark.ml.classification import DecisionTreeClassifier

# Baseline decision tree limited to depth 3. `dt` is reused later as the
# cross-validation estimator.
dt = DecisionTreeClassifier(
    labelCol="label",
    featuresCol="features",
    maxDepth=3)

# Fit the baseline tree on the training split only.
dtModel = dt.fit(trainingData)

In [35]:
# Report the size of the fitted baseline tree.
# One formatted string per line reproduces the old `print a, b` output exactly
# and runs on both Python 2 and Python 3.
print("numNodes =  {}".format(dtModel.numNodes))
print("depth =  {}".format(dtModel.depth))

In [36]:
# Score the held-out test split with the baseline decision tree.
# This rebinds `predictions`, replacing the logistic-regression results.
predictions = dtModel.transform(testData)

In [37]:
# Inspect the columns of the decision-tree predictions.
predictions.printSchema()

In [38]:
# View the tree's predicted class and class probabilities next to the true label
# and the original TeamColor / TeamTag / Result columns for context.
selected = predictions.select("label", "prediction", "probability", "TeamColor", "TeamTag","Result")
display(selected)

In [39]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the decision-tree test predictions.
# This rebinds `evaluator` to an instance built with all-default arguments;
# the cross-validator created below uses this instance.
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [40]:
# Show the impurity setting currently configured on the decision-tree estimator.
dt.getImpurity()

In [41]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for the decision tree: 4 depths x 3 bin counts = 12 combinations.
# This rebinds `paramGrid`, replacing the logistic-regression grid.
dtGridBuilder = ParamGridBuilder()
dtGridBuilder.addGrid(dt.maxDepth, [1, 2, 6, 10])
dtGridBuilder.addGrid(dt.maxBins, [20, 40, 80])
paramGrid = dtGridBuilder.build()

In [42]:
# 5-fold cross-validation of `dt` over every combination in `paramGrid`,
# scored with `evaluator`.
cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5)

# Fit on the training split only (takes ~5 minutes).
# This rebinds `cvModel`, replacing the logistic-regression result.
cvModel = cv.fit(trainingData)

In [43]:
# Report the size of the best tree found by cross-validation.
# One formatted string per line reproduces the old `print a, b` output exactly
# and runs on both Python 2 and Python 3.
print("numNodes =  {}".format(cvModel.bestModel.numNodes))
print("depth =  {}".format(cvModel.bestModel.depth))

In [44]:
# Score the held-out test split with the cross-validated decision tree, so the
# metric below reflects data not used for fitting.
predictions = cvModel.transform(testData)

In [45]:
# `predictions` came from cvModel.transform(), i.e. from the best model found
# during cross-validation; evaluate those test predictions.
evaluator.evaluate(predictions)

In [46]:
# View the best tree's predicted class and probabilities next to the true label
# and the original TeamColor / TeamTag / Result columns.
selected = predictions.select("label", "prediction", "probability", "TeamColor", "TeamTag","Result")
display(selected)


In [47]:
from pyspark.ml.classification import RandomForestClassifier

# Baseline random forest with the estimator's default hyper-parameters.
# `rf` is reused later as the cross-validation estimator.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features")

# Fit the baseline forest on the training split only.
rfModel = rf.fit(trainingData)

In [48]:
# Score the held-out test split with the baseline random forest.
# This rebinds `predictions`, replacing the decision-tree results.
predictions = rfModel.transform(testData)

In [49]:
# Inspect the columns of the random-forest predictions.
predictions.printSchema()

In [50]:
# View the forest's predicted class and class probabilities next to the true
# label and the original TeamColor / TeamTag / Result columns.
selected = predictions.select("label", "prediction", "probability", "TeamColor", "TeamTag","Result")
display(selected)

In [51]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the random-forest test predictions.
# `evaluator` is rebound to a fresh default-argument instance; the
# cross-validator created below uses this instance.
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [52]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for the random forest: 3 depths x 2 bin counts x 2 tree
# counts = 12 combinations. This rebinds `paramGrid`, replacing the decision-tree grid.
rfGridBuilder = ParamGridBuilder()
rfGridBuilder.addGrid(rf.maxDepth, [2, 4, 6])
rfGridBuilder.addGrid(rf.maxBins, [20, 60])
rfGridBuilder.addGrid(rf.numTrees, [5, 20])
paramGrid = rfGridBuilder.build()

In [53]:
# 5-fold cross-validation of `rf` over every combination in `paramGrid`,
# scored with `evaluator`.
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5)

# Fit on the training split only. This can take about 6 minutes since some
# grid points train 20 trees. `cvModel` is rebound, replacing the decision-tree result.
cvModel = cv.fit(trainingData)

In [54]:
# Score the held-out test split with the cross-validated random forest, so the
# metric below reflects data not used for fitting.
predictions = cvModel.transform(testData)

In [55]:
# `predictions` came from cvModel.transform(), i.e. from the best forest found
# during cross-validation; evaluate those test predictions.
evaluator.evaluate(predictions)

In [56]:
# View the best forest's predicted class and probabilities next to the true label.
selected = predictions.select("label", "prediction", "probability")
display(selected)

In [57]:
# Keep a handle on the best model of the most recent cross-validation run
# (at this point `cvModel` is the random-forest one).
bestModel = cvModel.bestModel

In [58]:
# Generate predictions for every row of `dataset`.
# NOTE(review): `dataset` is the frame that trainingData and testData were split
# from, so these predictions include rows the model was fitted on.
finalPredictions = bestModel.transform(dataset)

In [59]:
# Evaluate the best model on the full-dataset predictions.
# NOTE(review): finalPredictions covers the training rows as well, so this figure
# is not an out-of-sample estimate; use the test-set evaluation for that.
evaluator.evaluate(finalPredictions)

In [60]:
# Register the predictions as a temp view named "finalPredictions" so the
# following %sql cell can query it.
finalPredictions.createOrReplaceTempView("finalPredictions")

In [61]:
%sql
-- Show every column and row of the finalPredictions temp view registered in the previous cell.
SELECT *
FROM finalPredictions


In [62]:
# Load the second table, "deathvalue_including_victim_csv"; its Name, Victim
# and Character columns are used below to build a graph.
dataset_2 = spark.table("deathvalue_including_victim_csv")
display(dataset_2)

In [63]:
# Distinct values of the Victim column: the first half of the graph's vertex set.
vertices_1=dataset_2.select('Victim').distinct()
display(vertices_1)

In [64]:
# Distinct values of the Name column: the second half of the graph's vertex set.
vertices_2=dataset_2.select('Name').distinct()

In [65]:
# Stack the Victim values and the Name values into one single-column frame.
# The next cell selects this column by the name "Victim".
td1_2 = vertices_1.union(vertices_2)
# A name can appear in both inputs, so de-duplicate again after stacking.
allPerson = td1_2.distinct()



In [66]:
# Show the second table again before deriving the vertex and edge frames.
display(dataset_2)

In [68]:
# Import only `col`. A wildcard import of pyspark.sql.functions would rebind
# Python builtins such as sum/max/min/abs/round for every later cell.
from pyspark.sql.functions import col

# Vertex frame for the graph: the de-duplicated names, with the column renamed
# from "Victim" to "id".
# It is derived from `td1_2` rather than from `allPerson` itself so the cell can
# be re-run: after one run `allPerson` no longer has a "Victim" column.
allPerson = td1_2.distinct().select(col("Victim").alias("id"))



In [69]:
# Edge frame for the graph: one row per record of dataset_2, with
#   Name      -> src
#   Victim    -> dst
#   Character -> relationship
# (The earlier un-aliased select was removed; it was overwritten immediately.)
allRelation = dataset_2.select(
    col("Name").alias("src"),
    col("Victim").alias("dst"),
    col("Character").alias("relationship"))
allRelation

In [70]:
# NOTE(review): wildcard import; GraphFrame is the only name from it used in this notebook.
from graphframes import *
# Build the graph from the vertex frame (column `id`) and the edge frame
# (columns `src`, `dst`, `relationship`).
g = GraphFrame(allPerson, allRelation)


In [71]:
# Show the graph's vertex frame.
display(g.vertices)


In [72]:
# Show the graph's edge frame.
display(g.edges)

In [73]:
# Show the in-degree table (edges arriving at each vertex, i.e. rows where it is `dst`).
display(g.inDegrees)

In [74]:
# Show the out-degree table (edges leaving each vertex, i.e. rows where it is `src`).
display(g.outDegrees)

In [75]:
# Show the total degree table for the graph's vertices.
display(g.degrees)

In [76]:
# Count how many times the player 'Cop' killed someone: edges whose source is
# 'Cop' and whose relationship is 'Killer'.
numFollows = g.edges.filter("src = 'Cop' and relationship = 'Killer'").count()
# The variable keeps its existing name, but the message now states what is
# counted (the old text spoke of "follow edges", which this graph does not have).
# A single formatted string prints the same way on Python 2 and Python 3.
print("The number of 'Killer' edges from 'Cop' is {}".format(numFollows))

In [77]:
# Print the built-in help text for g.find.
# NOTE(review): exploratory cell; g.find is not called anywhere else in this notebook.
help(g.find)

In [78]:
# Breadth-first search from vertex 'Bjergsen' to vertex 'Hai', following only
# edges whose relationship is 'Killer' and allowing at most 3 edges per path.
filteredPaths = g.bfs(fromExpr="id = 'Bjergsen'",
                      toExpr="id = 'Hai'",
                      edgeFilter="relationship = 'Killer'",
                      maxPathLength=3)
display(filteredPaths)

In [79]:
# Run strongly-connected-components with at most 10 iterations.
# NOTE(review): only the `id` column of the result is displayed, so whatever
# component assignment the result carries is not shown — confirm that is intended.
result = g.stronglyConnectedComponents(maxIter=10)
display(result.select("id"))

In [80]:
# Run PageRank with reset probability 0.15 and tolerance 0.01,
# then show the vertex frame of the returned result.
results = g.pageRank(resetProbability=0.15, tol=0.01)
display(results.vertices)

In [81]:
# Show the edge frame of the PageRank result computed in the previous cell.
display(results.edges)

In [82]:
# Run PageRank for a fixed number of iterations.
# NOTE(review): the return value is not assigned, so this cell only shows the
# returned object's repr; nothing downstream uses it.
g.pageRank(resetProbability=0.15, maxIter=10)

In [83]:
# Run PageRank personalized for vertex "a"
# NOTE(review): vertex ids in this graph come from the Name/Victim columns, and
# "a" looks like a placeholder — confirm it is an actual id or replace it with a
# real player name. The return value is not assigned.
g.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")