In [None]:
import $ivy.`org.vegas-viz::vegas:0.3.11`
import $ivy.`org.apache.spark::spark-core:2.4.0`
import $ivy.`org.apache.spark::spark-sql:2.4.0`
import $ivy.`org.vegas-viz::vegas-spark:0.3.11`


In [None]:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import vegas.sparkExt._
import vegas._
import org.apache.spark.sql.functions._

# Parameters

In [None]:
// Location of the cached Parquet data that every DataFrame below is derived from.
val CACHED_DATA_DIR = "/goalimpacct/spark_data_cache/result_parquet"


// Tracked players as (player id, display name). Ids are kept as strings;
// presumably `:player` is a string column in the Parquet data — confirm against the schema.
val PLAYER_ID_LIST = List(
    "35207"  -> "Reus",
    "28003"  -> "Messi",
    "68290"  -> "Neymar",
    "126716" -> "Alcacer",
    "8198"   -> "C.Ronaldo",
    "38253"  -> "Lewandowski",
    "125781" -> "Griezmann",
    "342229" -> "Mbappe"
)

// Single-player variant for quick experiments:
//val PLAYER_ID_LIST = List(
// ("68290", "Neymar"))

// `:saison` values that the season-filtered DataFrames are restricted to.
val TARGET_SEASONS = List("2018")

// Window size N that selects which precomputed `...LastNMatches` columns are read.
val TAKE_LAST_X_DATA = 1

# Helper Vars

In [None]:
// Column names of the precomputed statistics over the last TAKE_LAST_X_DATA matches,
// e.g. ":playtimeLast1Matches" when TAKE_LAST_X_DATA is 1.
val playtimeColumn             = s":playtimeLast${TAKE_LAST_X_DATA}Matches"
val totalOffPointsColumn       = s":totalOffPointsLast${TAKE_LAST_X_DATA}Matches"
val totalDefPointsColumn       = s":totalDefPointsLast${TAKE_LAST_X_DATA}Matches"
val avgOffPointsColumn         = s":avgOffPointsLast${TAKE_LAST_X_DATA}Matches"
val avgDefPointsColumn         = s":avgDefPointsLast${TAKE_LAST_X_DATA}Matches"
val totalRankedOffPointsColumn = s":totalRankedOffPointsLast${TAKE_LAST_X_DATA}Matches"
val totalRankedDefPointsColumn = s":totalRankedDefPointsLast${TAKE_LAST_X_DATA}Matches"
val avgRankedOffPointsColumn   = s":avgRankedOffPointsLast${TAKE_LAST_X_DATA}Matches"
val avgRankedDefPointsColumn   = s":avgRankedDefPointsLast${TAKE_LAST_X_DATA}Matches"

# Create Spark Context

In [None]:
// Local Spark session on all available cores.
// Building the session from `conf` with getOrCreate() reuses a context that is already running,
// so re-executing this cell does not fail the way a second `new SparkContext(conf)` would.
val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("ShowHistory")

val spark = SparkSession.builder.config(conf).getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("ERROR")


# Helper Functions

In [None]:
/**
 * Builds a one-column DataFrame (`time`, a date) holding the first day of every month from
 * January of the earliest year to December of the latest year found in the
 * `:target-match-timestamp` column of `lastMatchDF`.
 *
 * It is used as a dense monthly axis that sparse per-month aggregates are right-outer-joined onto.
 *
 * @param lastMatchDF rows containing a `:target-match-timestamp` column
 * @param spark       session whose implicits provide `toDF` and `$"..."`
 * @return one row per month, single column `time`
 * @throws IllegalArgumentException if `lastMatchDF` has no non-null `:target-match-timestamp`
 */
def getYearDF(lastMatchDF : DataFrame, spark: SparkSession) : DataFrame = {

    import spark.implicits._

    // min/max ignore nulls, so a null timestamp can no longer be picked as a bound
    // (an ascending sort puts nulls first). Both bounds also come from a single aggregation.
    val bounds = lastMatchDF
        .select(year($":target-match-timestamp").as("year"))
        .agg(min("year").as("earliest"), max("year").as("latest"))
        .head()

    require(!bounds.isNullAt(0), "lastMatchDF contains no non-null :target-match-timestamp values")
    val earliestYear: Int = bounds.getInt(0)
    val latestYear: Int = bounds.getInt(1)

    // Zero-padded so every string matches the "yyyy-MM-dd" pattern exactly, rather than relying
    // on a lenient parser to accept single-digit months.
    val monthList = for {
        y <- (earliestYear to latestYear).toList
        m <- 1 to 12
    } yield f"$y%04d-$m%02d-01"

    monthList.toDF("timeString")
        .withColumn("time", to_date($"timeString", "yyyy-MM-dd"))
        .select("time")
}

# Create/Load Spark DataFrames

In [None]:
import spark.implicits._

// Cached pipeline output, plus two small lookup tables that later cells join against
// to restrict rows to the tracked players / target seasons.
val rawDataDF = spark.read.parquet(CACHED_DATA_DIR)
val playerNameMapper = PLAYER_ID_LIST.toDF(":player", ":name")
val saisonMapper = TARGET_SEASONS.toDF(":saison")

In [None]:
// Match rows of the tracked players in the target seasons. The inner joins both filter the rows
// and attach the display name `:name`.
val lastMatchDF = {
    val selectedColumns = Seq(
        ":player", ":saison", ":match", ":team", ":tournament",
        ":target-match-timestamp",
        playtimeColumn,
        totalOffPointsColumn,
        totalDefPointsColumn,
        avgOffPointsColumn,
        avgDefPointsColumn,
        totalRankedOffPointsColumn,
        totalRankedDefPointsColumn,
        avgRankedOffPointsColumn,
        avgRankedDefPointsColumn)

    rawDataDF
        .select(selectedColumns.map(col): _*)
        .join(playerNameMapper, ":player")
        .join(saisonMapper, ":saison")
}

lastMatchDF.count


In [None]:
// Sanity check: raw row count for player id 125781 in season 2018.
rawDataDF.where($":player" === "125781").where($":saison" === "2018").count

In [None]:
// Dense monthly axis covering every year that appears in lastMatchDF.
val yearDF: DataFrame = getYearDF(spark = spark, lastMatchDF = lastMatchDF)

yearDF.show()

# Build average values across all players

In [None]:
// Tournaments in which at least one tracked player appeared during a target season.
// The "avg" baseline below only uses matches from these tournaments.
val avgRelevanteTournaments = rawDataDF
    .select(":player",":tournament", ":saison")
    .join(saisonMapper, ":saison")
    .join(playerNameMapper.select(":player"), ":player")
    .select(":tournament")
    .distinct()

avgRelevanteTournaments.show

// Per-player, per-month aggregates over ALL players (not only the tracked ones)
// in the target seasons and relevant tournaments.
val allPlayersByTimeFrame = rawDataDF.select(":player",":saison",":match",":team",":tournament",
    ":target-match-timestamp",
    playtimeColumn,
    totalOffPointsColumn,
    totalDefPointsColumn,
    avgOffPointsColumn,
    avgDefPointsColumn,
    totalRankedOffPointsColumn,
    totalRankedDefPointsColumn,
    avgRankedOffPointsColumn,
    avgRankedDefPointsColumn)
    .join(saisonMapper, ":saison")
    .join(avgRelevanteTournaments, ":tournament")
    // Bucket each match into the first day of its month (a date), matching the keys of yearDF.
    // trunc produces that date directly, without formatting and re-parsing a string whose shape
    // must agree with a parse pattern.
    .withColumn("time", trunc($":target-match-timestamp", "month"))
    .groupBy(":player", "time")
    .agg(sum(totalOffPointsColumn).as("totalOffNeutralPoints"),
        avg(avgOffPointsColumn).as("avgOffNeutralPoints"),
        sum(totalRankedOffPointsColumn).as("totalOffRankedPoints"),
        avg(avgRankedOffPointsColumn).as("avgOffRankedPoints"),
        sum(totalDefPointsColumn).as("totalDefNeutralPoints"),
        avg(avgDefPointsColumn).as("avgDefNeutralPoints"),
        sum(totalRankedDefPointsColumn).as("totalDefRankedPoints"),
        avg(avgRankedDefPointsColumn).as("avgDefRankedPoints"),
        sum(playtimeColumn).as("playtime"))


// The "avg" pseudo-player: per month, the mean of the per-player monthly aggregates.
// Right-joined onto yearDF so every month is present; months without data are filled with 0.
val avgPlayer = allPlayersByTimeFrame
    .groupBy("time")
    .agg(avg("totalOffNeutralPoints").as("totalOffNeutralPoints"),
        avg("avgOffNeutralPoints").as("avgOffNeutralPoints"),
        avg("totalOffRankedPoints").as("totalOffRankedPoints"),
        avg("avgOffRankedPoints").as("avgOffRankedPoints"),
        avg("totalDefNeutralPoints").as("totalDefNeutralPoints"),
        avg("avgDefNeutralPoints").as("avgDefNeutralPoints"),
        avg("totalDefRankedPoints").as("totalDefRankedPoints"),
        avg("avgDefRankedPoints").as("avgDefRankedPoints"))
    .join(yearDF, Seq("time"), "right_outer")
    .na.fill(0)
    .withColumn(":name", lit("avg"))

avgPlayer.count



# Aggregate the players' offensive values into monthly time frames

In [None]:
// Monthly offensive aggregates per tracked player, laid out on the full month x player grid
// (yearDF cross-joined with all names; months without matches become 0),
// with the "avg" baseline rows from avgPlayer appended.
val aggregatedOffPlayerPointsDF = lastMatchDF
    // First day of the match's month, the same key yearDF uses.
    .withColumn("time", trunc($":target-match-timestamp", "month"))
    .groupBy(":name", "time")
    .agg(sum(totalOffPointsColumn).as("totalOffNeutralPoints"),
        avg(avgOffPointsColumn).as("avgOffNeutralPoints"),
        sum(totalRankedOffPointsColumn).as("totalOffRankedPoints"),
        avg(avgRankedOffPointsColumn).as("avgOffRankedPoints"))
    .join(yearDF.crossJoin(playerNameMapper.select(":name")), Seq("time", ":name"), "right_outer")
    .na.fill(0)
    // Match columns by name, not position, so a reordering on either side cannot misalign them.
    .unionByName(avgPlayer.select("time", ":name", "totalOffNeutralPoints", "avgOffNeutralPoints", "totalOffRankedPoints", "avgOffRankedPoints"))


aggregatedOffPlayerPointsDF.count

# Total Ranked Offensive Points

In [None]:
// Monthly total ranked offensive points per player (plus the "avg" baseline),
// reduced to the (:name, time, points) shape the chart expects.
val totalRankedPlotableDF = aggregatedOffPlayerPointsDF
    .select(col(":name"), col("time"), col("totalOffRankedPoints").as("points"))

Vegas("total ranked player points")
    .withDataFrame(totalRankedPlotableDF)
    .mark(Line)
    .encodeX("time", Ordinal)
    .encodeY("points", Quant)
    .encodeColor(field = ":name", dataType = Nominal,
        legend = Legend(orient = "left", title = "total ranked player points"))
    .show

# Average Ranked Offensive Points

In [None]:
// Monthly average ranked offensive points per player (plus the "avg" baseline),
// reduced to the (:name, time, points) shape the chart expects.
val avgRankedPlotableDF = aggregatedOffPlayerPointsDF
    .select(col(":name"), col("time"), col("avgOffRankedPoints").as("points"))

Vegas("Ranked player points per time")
    .withDataFrame(avgRankedPlotableDF)
    .mark(Line)
    .encodeX("time", Ordinal)
    .encodeY("points", Quant)
    .encodeColor(field = ":name", dataType = Nominal,
        legend = Legend(orient = "left", title = "Ranked player points per time"))
    .show

In [None]:
import org.apache.spark.sql.expressions.Window

// Full match history (all seasons) of the tracked players; `time` is the first day of the month.
val playerHistory = rawDataDF.select(":player",":saison",":match",":team",":tournament",
    ":target-match-timestamp",
    playtimeColumn,
    totalOffPointsColumn,
    totalDefPointsColumn,
    avgOffPointsColumn,
    avgDefPointsColumn,
    totalRankedOffPointsColumn,
    totalRankedDefPointsColumn,
    avgRankedOffPointsColumn,
    avgRankedDefPointsColumn)
    .join(playerNameMapper, ":player")
    .withColumn("time", trunc($":target-match-timestamp", "month"))

// Running totals per player ordered by match timestamp. A RANGE frame includes every row sharing
// the current timestamp, so each (player, timestamp) gets the sum over all of that player's rows
// at or before it. The sum is computed in one sorted pass per player, and duplicated rows are not
// multiplied against each other.
val cumulativeWindow = Window
    .partitionBy(":player")
    .orderBy(":target-match-timestamp")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)

// One row per (player, timestamp) with cumulative ranked off/def totals.
// Rows without a timestamp are excluded: they have no position in the ordering.
val a = playerHistory
    .where($":target-match-timestamp".isNotNull)
    .select($":player", $":target-match-timestamp",
        sum(totalRankedOffPointsColumn).over(cumulativeWindow).as(totalRankedOffPointsColumn),
        sum(totalRankedDefPointsColumn).over(cumulativeWindow).as(totalRankedDefPointsColumn))
    .distinct()


// Yearly view (`time` = January 1st): the largest cumulative offensive and the smallest cumulative
// defensive total reached within each calendar year, with player names attached.
// NOTE(review): max/min presumably pick the end-of-year value, assuming offensive totals only grow
// and defensive totals only shrink — confirm.
val b = a.withColumn("time", trunc($":target-match-timestamp", "year"))
    .groupBy(":player", "time")
    .agg(max(totalRankedOffPointsColumn).as(totalRankedOffPointsColumn),
        min(totalRankedDefPointsColumn).as(totalRankedDefPointsColumn))
    .join(playerNameMapper, ":player")


Vegas("cumulative ranked offensive points")
  .withDataFrame(b)
  .mark(Line)
  .encodeX("time", Ordinal)
  .encodeY(totalRankedOffPointsColumn, Quant)
  .encodeColor(
    field=":name",
    dataType=Nominal,
    legend=Legend(orient="left", title="cumulative ranked offensive points"))
.show

// Previously labelled identically to the offensive chart; the title now names the plotted metric.
Vegas("cumulative ranked defensive points")
  .withDataFrame(b)
  .mark(Line)
  .encodeX("time", Ordinal)
  .encodeY(totalRankedDefPointsColumn, Quant)
  .encodeColor(
    field=":name",
    dataType=Nominal,
    legend=Legend(orient="left", title="cumulative ranked defensive points"))
.show
