# Initializing Spark Session, Context and SQL Context

In [None]:
from pyspark import SparkContext
sc = SparkContext("local")

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)

In [9]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Quick test on Spark Functionality

In [189]:
sc.parallelize([("A", 1), ("B", 1)]).toDF(['alpha', 'numa']).collect()

[Row(alpha='A', numa=1), Row(alpha='B', numa=1)]

# Define commonly used variables 

In [191]:
bzip_c = 'org.apache.hadoop.io.compress.BZip2Codec'
gzip_c = 'org.apache.hadoop.io.compress.GzipCodec'
db_csv = 'com.databricks.spark.csv'
datapath = "Datasets/originalDataset.csv"

# Load Data

In [None]:
data = sqlContext.read.format(db_csv).options(header='true', inferschema='false', delimiter=',').load(datapath)

## Have a look at the data

In [194]:
data.take(1)[0]

Row(Scorecard='ODI # 1', Team 1='Australia', Team 2='England', Winner='Australia', Margin='5 wickets', Ground='Melbourne', Match Date='Jan 5, 1971')

In [19]:
from pyspark.sql.functions import col

In [21]:
data.where(col("Team 1") == "India").take(1)

[Row(Scorecard='ODI # 28', Team 1='India', Team 2='New Zealand', Winner='New Zealand', Margin='4 wickets', Ground='Manchester', Match Date='Jun 14, 1975')]

In [24]:
data.where((col("Team 1") == "India") | 
           (col("Team 2") == "India")).count()

930

In [210]:
data.count()

3932

In [69]:
from pyspark.sql.functions import when, lit
import pyspark.sql.functions as F

# Questions

## 1. What is India’s total Win/Loss/Tie percentage?

In [198]:
intermediate = data.where((col("Winner") != "no result"))\
    .where((col("Team 1") == "India") | 
           (col("Team 2") == "India"))\
    .select([col("Team 1"), col("Team 2"), col("Ground"), col("Margin"), col("Match Date"),
             when(col("Winner") == "India", 1).otherwise(0).alias("Win"),
             when(col("Winner") == "tied", 1).otherwise(0).alias("Tie"),
             when((col("Winner") != "tied") & 
                  (col("Winner") != "India"), 1).otherwise(0).alias("Lose")])


In [199]:
inter_1 = intermediate.agg(*[
    F.sum("Win").alias("Win"), F.sum("Tie").alias("Tie"), F.sum("Lose").alias("Lose"), F.sum(lit(1)).alias("All")])
inter_1.withColumn("All", col('All') / lit(100)).withColumn("Win", col("Win") / col("All")).withColumn("Tie", col("Tie") / col("All"))\
    .withColumn("Lose", col("Lose") / col("All")).drop("All").collect()

[Row(Win=53.48314606741573, Tie=0.7865168539325842, Lose=45.73033707865169)]

## 2. What is India’s Win/Loss/Tie percentage in away and home matches?

In [201]:
inter_2 = intermediate.withColumn("Home/Away", when(col("Team 1") == "India", "Home").otherwise("Away"))\
    .select(["Home/Away", "Win", "Tie", "Lose"])\
    .groupBy(["Home/Away"])\
    .agg(*[F.sum("Win").alias("Win"), F.sum("Tie").alias("Tie"), F.sum("Lose").alias("Lose"), F.sum(lit(1)).alias("All")])

In [202]:
inter_2.withColumn("All", col('All') / lit(100)).withColumn("Win", col("Win") / col("All")).withColumn("Tie", col("Tie") / col("All"))\
    .withColumn("Lose", col("Lose") / col("All")).drop("All").collect()

[Row(Home/Away='Home', Win=57.798165137614674, Tie=0.9174311926605504, Lose=41.28440366972477),
 Row(Home/Away='Away', Win=46.666666666666664, Tie=0.5797101449275363, Lose=52.7536231884058)]

## 3. How many matches has India played against different ICC teams?
## 4. How many matches India has won or lost against different teams?

In [204]:
inter_3 = intermediate.withColumn("Opponent", when(col("Team 1") == "India", col("Team 2")).otherwise(col("Team 1")))\
    .groupBy(["Opponent"])\
    .agg(*[F.sum("Win").alias("Win"), F.sum("Tie").alias("Tie"), F.sum("Lose").alias("Lose"), F.sum(lit(1)).alias("All")])

In [206]:
inter_3.collect()

[Row(Opponent='Afghanistan', Win=1, Tie=0, Lose=0, All=1),
 Row(Opponent='Sri Lanka', Win=88, Tie=1, Lose=55, All=144),
 Row(Opponent='West Indies', Win=56, Tie=1, Lose=61, All=118),
 Row(Opponent='Bangladesh', Win=27, Tie=0, Lose=5, All=32),
 Row(Opponent='Ireland', Win=3, Tie=0, Lose=0, All=3),
 Row(Opponent='Hong Kong', Win=1, Tie=0, Lose=0, All=1),
 Row(Opponent='U.A.E.', Win=3, Tie=0, Lose=0, All=3),
 Row(Opponent='Zimbabwe', Win=51, Tie=2, Lose=10, All=63),
 Row(Opponent='Namibia', Win=1, Tie=0, Lose=0, All=1),
 Row(Opponent='Kenya', Win=11, Tie=0, Lose=2, All=13),
 Row(Opponent='New Zealand', Win=51, Tie=1, Lose=44, All=96),
 Row(Opponent='England', Win=52, Tie=2, Lose=39, All=93),
 Row(Opponent='East Africa', Win=1, Tie=0, Lose=0, All=1),
 Row(Opponent='Australia', Win=45, Tie=0, Lose=73, All=118),
 Row(Opponent='South Africa', Win=29, Tie=0, Lose=45, All=74),
 Row(Opponent='Bermuda', Win=1, Tie=0, Lose=0, All=1),
 Row(Opponent='Scotland', Win=1, Tie=0, Lose=0, All=1),
 Row(Opp

## 5. Which are the home and away grounds where India has played most number of matches?

In [130]:
inter_4 = intermediate.withColumn("Home/Away", when(col("Team 1") == "India", "Home").otherwise("Away"))\
.groupBy(["Home/Away", "Ground"])\
    .agg(*[F.sum(lit(1)).alias("All")])

In [137]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

w = Window().partitionBy("Home/Away").orderBy(col("All").desc())
inter_4.withColumn("rn", row_number().over(w)).where(col("rn") == 1).select("Home/Away", "Ground").collect()

[Row(Home/Away='Home', Ground='Sharjah'),
 Row(Home/Away='Away', Ground='Colombo (RPS)')]

## 6. What has been the average Indian win or loss by Runs per year?

In [240]:
from pyspark.sql.functions import unix_timestamp, to_timestamp
from pyspark.sql.functions import year
inter_5 = intermediate.withColumn("Match Date", to_timestamp(col("Match Date"), 'MMM dd, yyyy'))\
    .withColumn("Year", year(col("Match Date")))

In [259]:
inter_6 = inter_5.withColumn("Win/Tie/Lose", 
                        when(col("Win") == 1, "Win").when(col("Tie") == 1, "Tie").when(col("Lose") == 1, "Lose"))\
    .withColumn("Margin Count Type", F.split(col("Margin"), " "))\
    .withColumn("Margin Count", col("Margin Count Type").getItem(0).cast("Integer"))\
    .withColumn("Margin Type", col("Margin Count Type").getItem(1)).where((col("Margin Type").isin(['run', 'runs'])) & 
                                                                          (col("Win/Tie/Lose").isin(['Win', 'Lose'])))\
    .groupBy(["Year", "Win/Tie/Lose"])\
    .agg(*[F.avg(col("Margin Count")).alias("Avg Margin Count")])\
    
    
yearly_win_stats = inter_6.where(col("Win/Tie/Lose") == "Win").withColumn("Avg Win Margin", 
                                                                          F.bround(col("Avg Margin Count"), 2))\
    .drop(*["Win/Tie/Lose", "Avg Margin Count"])
yearly_lose_stats = inter_6.where(col("Win/Tie/Lose") == "Lose").withColumn("Avg Lose Margin", 
                                                                            F.bround(col("Avg Margin Count"), 2))\
    .drop(*["Win/Tie/Lose", "Avg Margin Count"])

yearly_win_stats.join(yearly_lose_stats, "Year", "outer").sort(col("Year").desc()).where(col("Year").isNotNull()).collect()

[Row(Year=2017, Avg Win Margin=73.38, Avg Lose Margin=54.25),
 Row(Year=2016, Avg Win Margin=190.0, Avg Lose Margin=16.67),
 Row(Year=2015, Avg Win Margin=66.44, Avg Lose Margin=82.2),
 Row(Year=2014, Avg Win Margin=101.5, Avg Lose Margin=58.2),
 Row(Year=2013, Avg Win Margin=58.25, Avg Lose Margin=100.33),
 Row(Year=2012, Avg Win Margin=30.33, Avg Lose Margin=78.25),
 Row(Year=2011, Avg Win Margin=75.62, Avg Lose Margin=67.0),
 Row(Year=2010, Avg Win Margin=76.0, Avg Lose Margin=121.33),
 Row(Year=2009, Avg Win Margin=59.2, Avg Lose Margin=48.67),
 Row(Year=2008, Avg Win Margin=81.22, Avg Lose Margin=61.0),
 Row(Year=2007, Avg Win Margin=66.44, Avg Lose Margin=50.0),
 Row(Year=2006, Avg Win Margin=38.75, Avg Lose Margin=52.12),
 Row(Year=2005, Avg Win Margin=93.0, Avg Lose Margin=83.5),
 Row(Year=2004, Avg Win Margin=39.45, Avg Lose Margin=53.89),
 Row(Year=2003, Avg Win Margin=122.3, Avg Lose Margin=75.0),
 Row(Year=2002, Avg Win Margin=51.38, Avg Lose Margin=42.83),
 Row(Year=2001, 