<a href="https://colab.research.google.com/github/irawan09/PySpark/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!git clone 'https://github.com/irawan09/PySpark.git'

# PySpark in Google Colab

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 3.0.1 prebuilt for Hadoop 2.7, Java 8, and findspark, which locates the Spark installation on the system. All of these can be installed from inside the Colab notebook itself. If you are new to Spark, it is best to avoid version 2.4.0, since some users have reported compatibility issues between it and Python. Follow these steps to install the dependencies:

## Setting the Environment
Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. Therefore, our first task is to install Java.

### Installing Java in the virtual environment

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

### Downloading Spark and Setting the JAVA_HOME and SPARK_HOME Directories
Next, we will download Apache Spark 3.0.1 prebuilt for Hadoop 2.7.

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz

Now we just need to extract the archive.

In [None]:
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

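*Note:* At the time of writing, I chose version 3.0.1. You can use a different version if you prefer; the files for 3.0.1 are listed at https://archive.apache.org/dist/spark/spark-3.0.1/, and other releases are in the parent directory. If you pick a different version, update the file names in the commands above and the SPARK_HOME path below accordingly.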

Now that all the necessary dependencies are installed in Colab, it is time to set the environment variables: JAVA_HOME points to the Java 8 installation and SPARK_HOME to the extracted Spark folder. This will enable us to run PySpark in the Colab environment.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

### Install PySpark and findspark

The last things we need to install are the PySpark and findspark libraries. findspark locates Spark on the system so that it can be imported as a regular Python library.

In [None]:
!pip install pyspark
!pip install -q findspark

Next, we need to locate Spark on the system. To do that, we import findspark and call **findspark.init()**.

In [None]:
import findspark
findspark.init()

If you want to know where Spark is installed, use **findspark.find()**.

In [None]:
findspark.find()

### Starting a Spark Session

SparkSession is the entry point to Spark SQL. It is one of the very first objects you create while developing a Spark SQL application.

As a Spark developer, you create a SparkSession through **SparkSession.builder**, an attribute that gives you access to the Builder API you use to configure the session.

Once created, a SparkSession lets you create DataFrames (from an RDD or a local collection such as a Python list or a Scala Seq), create Datasets (Scala and Java only), access Spark SQL services (e.g. ExperimentalMethods, ExecutionListenerManager, UDFRegistration), execute SQL queries, load tables, and, last but not least, access the DataFrameReader interface (**spark.read**) to load data in a format of your choice (to some extent).

In [None]:
from pyspark.sql import SparkSession

# Run Spark locally and serve the Spark UI on port 4050 instead of the default 4040
spark = SparkSession.builder \
    .master("local") \
    .appName("Colab") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [None]:
spark

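If you want to view the Spark UI, you need a few more lines of code to create a public URL for the UI page. The next cell downloads ngrok, opens a tunnel to port 4050 (where the Spark UI is served), and queries ngrok's local API for the tunnel details.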

In [None]:
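# Download and unpack the ngrok client
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
# Start an HTTP tunnel to the Spark UI port (4050) in the background
get_ipython().system_raw('./ngrok http 4050 &')
# ngrok's own local API listens on port 4040; the JSON it returns includes the tunnel's public_url
!curl -s http://localhost:4040/api/tunnels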
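The `curl` call prints ngrok's tunnel list as JSON. Both the Spark UI and ngrok's local API default to port 4040, which is presumably why the Spark UI was moved to 4050. If you would rather pull out the public URL in Python, here is a minimal sketch; it assumes the ngrok agent is running and that its `/api/tunnels` response holds a `tunnels` list whose entries carry a `public_url` field. The helper names are hypothetical.

```python
import json
from urllib.request import urlopen


def extract_public_urls(payload):
    """Collect the public URL of every tunnel in an ngrok /api/tunnels response."""
    return [tunnel["public_url"] for tunnel in payload.get("tunnels", [])]


def ngrok_public_urls(api_url="http://localhost:4040/api/tunnels"):
    """Query the local ngrok agent API (assumed to be running) and return the public URLs."""
    with urlopen(api_url) as response:
        payload = json.load(response)
    return extract_public_urls(payload)


# In Colab, after starting ngrok:
# print(ngrok_public_urls())
```

Splitting the parsing from the HTTP call keeps the JSON handling testable without a live tunnel.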

## Roll the Party!!!

In [None]:
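import findspark
from pyspark import SparkContext
from pyspark.sql import Window, Row
# The star import provides Spark's col, when, sum, round, rank, asc, ...
# Note that it shadows Python built-ins such as sum and round in this notebook
from pyspark.sql.functions import *
from pyspark.sql.types import *

import matplotlib.pyplot as plt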

In [None]:
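# header='true' takes column names from the first line; inferSchema='true' detects numeric column types
df_matches = spark.read.format('csv').options(header='true', inferSchema='true').load('/content/PySpark/Matches.csv')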
df_matches.limit(5).show()

In [None]:
# Give the last three columns (home goals, away goals, full-time result) descriptive names
old_cols = df_matches.columns[-3:]
new_cols = ["HomeTeamGoals", "AwayTeamGoals", "FinalResult"]
for old_col, new_col in zip(old_cols, new_cols):
    df_matches = df_matches.withColumnRenamed(old_col, new_col)

df_matches.limit(5).toPandas()

In [None]:
print("Number of rows:", df_matches.count())

Sometimes you only want to view specific columns of the DataFrame. For that, you can use Spark SQL's `select()` method, the DataFrame counterpart of SQL's SELECT.

In [None]:
df_matches.select("Season","HomeTeam","AwayTeam").show(15)

In [None]:
df_matches = df_matches \
    .withColumn('HomeTeamWin', when(col('FinalResult') == 'H', 1).otherwise(0)) \
    .withColumn('AwayTeamWin', when(col('FinalResult') == 'A', 1).otherwise(0)) \
    .withColumn('GameTie', when(col('FinalResult') == 'D', 1).otherwise(0))

In this dataset, the Bundesliga is the `D1` division, and we are interested in the seasons from 2000 to 2010 inclusive.

In [None]:
bundesliga = df_matches \
                    .filter((col('Season') >= 2000) & 
                            (col('Season') <= 2010) & 
                            (col('Div') == 'D1'))

Home team features

In [None]:
home = bundesliga.groupby('Season', 'HomeTeam') \
       .agg(sum('HomeTeamWin').alias('TotalHomeWin'),
            sum('AwayTeamWin').alias('TotalHomeLoss'),
            sum('GameTie').alias('TotalHomeTie'),
            sum('HomeTeamGoals').alias('HomeScoredGoals'),
            sum('AwayTeamGoals').alias('HomeAgainstGoals')) \
       .withColumnRenamed('HomeTeam', 'Team')

Away game features 

In [None]:
away = bundesliga.groupby('Season', 'AwayTeam') \
       .agg(sum('AwayTeamWin').alias('TotalAwayWin'),
            sum('HomeTeamWin').alias('TotalAwayLoss'),
            sum('GameTie').alias('TotalAwayTie'),
            sum('AwayTeamGoals').alias('AwayScoredGoals'),
            sum('HomeTeamGoals').alias('AwayAgainstGoals'))  \
       .withColumnRenamed('AwayTeam', 'Team')

Season features

In [None]:
# Rank teams within each season by win percentage, breaking ties by goal differential
window = Window.partitionBy('Season').orderBy(col('WinPct').desc(), col('GoalDifferentials').desc())

# Combine home and away totals into one row per team and season
table = home.join(away, ['Team', 'Season'], 'inner') \
    .withColumn('GoalsScored', col('HomeScoredGoals') + col('AwayScoredGoals')) \
    .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals')) \
    .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst')) \
    .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin')) \
    .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss')) \
    .withColumn('Tie', col('TotalHomeTie') + col('TotalAwayTie')) \
    .withColumn('WinPct', round(100 * col('Win') / (col('Win') + col('Loss') + col('Tie')), 2)) \
    .drop('HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals') \
    .drop('TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss', 'TotalHomeTie', 'TotalAwayTie') \
    .withColumn('TeamPosition', rank().over(window))

In [None]:
table_df = table.filter(col('TeamPosition') == 1).orderBy(asc('Season')).toPandas()
table_df