In [None]:
!apt-get update -qq
!apt-get install -y openjdk-11-jdk-headless -qq
!pip install -q pyspark


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("EPL_RDD_Analysis") \
    .getOrCreate()

sc = spark.sparkContext
print("SparkContext created:", sc)


SparkContext created: <SparkContext master=local[*] appName=EPL_RDD_Analysis>


In [None]:
from google.colab import files
uploaded = files.upload()


Saving EPL.csv to EPL.csv


In [None]:
!ls -lh /content


total 212K
-rw-r--r-- 1 root root 208K Oct  8 03:44 EPL.csv
drwxr-xr-x 1 root root 4.0K Oct  6 13:38 sample_data


In [None]:
import csv
csv_path = "/content/EPL.csv"

import os
assert os.path.exists(csv_path), f"File not found: {csv_path}"

lines = sc.textFile(csv_path)

header = lines.first()
print("Header:", header)


def parse_line(line):

    return next(csv.reader([line]))

matches = lines.filter(lambda l: l != header).map(lambda l: [c.strip() for c in parse_line(l)])

print("Sample rows:", matches.take(5))


Header: home_team,away_team,home_goals,away_goals,result,season
Sample rows: [['Sheffield United', 'Liverpool', '1.0', '1.0', 'D', '2006-2007'], ['Arsenal', 'Aston Villa', '1.0', '1.0', 'D', '2006-2007'], ['Everton', 'Watford', '2.0', '1.0', 'H', '2006-2007'], ['Newcastle United', 'Wigan Athletic', '2.0', '1.0', 'H', '2006-2007'], ['Portsmouth', 'Blackburn Rovers', '3.0', '0.0', 'H', '2006-2007']]


In [None]:
total_matches = matches.count()
print("Total matches:", total_matches)


Total matches: 4560


In [None]:
result_counts = matches.map(lambda x: (x[5], 1)).reduceByKey(lambda a, b: a + b)
print("Result counts (H/D/A):", result_counts.collect())


Result counts (H/D/A): [('2006-2007', 380), ('2007-2008', 380), ('2008-2009', 380), ('2009-2010', 380), ('2010-2011', 380), ('2012-2013', 380), ('2014-2015', 380), ('2015-2016', 380), ('2011-2012', 380), ('2013-2014', 380), ('2016-2017', 380), ('2017-2018', 380)]


In [None]:
print("Sample parsed rows (first 10):")
print(matches.take(10))


Sample parsed rows (first 10):
[['Sheffield United', 'Liverpool', '1.0', '1.0', 'D', '2006-2007'], ['Arsenal', 'Aston Villa', '1.0', '1.0', 'D', '2006-2007'], ['Everton', 'Watford', '2.0', '1.0', 'H', '2006-2007'], ['Newcastle United', 'Wigan Athletic', '2.0', '1.0', 'H', '2006-2007'], ['Portsmouth', 'Blackburn Rovers', '3.0', '0.0', 'H', '2006-2007'], ['Reading', 'Middlesbrough', '3.0', '2.0', 'H', '2006-2007'], ['West Ham United', 'Charlton Athletic', '3.0', '1.0', 'H', '2006-2007'], ['Bolton Wanderers', 'Tottenham Hotspur', '2.0', '0.0', 'H', '2006-2007'], ['Manchester United', 'Fulham', '5.0', '1.0', 'H', '2006-2007'], ['Chelsea', 'Manchester City', '3.0', '0.0', 'H', '2006-2007']]


In [None]:
import re

matches = matches.filter(lambda row: len(row) >= 6)

def safe_int(s):
    try:
        if s is None:
            return 0
        s = str(s).strip()
        s = s.replace('"', '').replace("'", "")
        if s == "" or s.lower() in ("na", "n/a", "none"):
            return 0
        s = s.replace(",", "")
        return int(float(s))
    except Exception:
        return 0


def is_bad_goal_field(s):
    try:
        _ = int(float(str(s).strip().replace('"','').replace("'", "").replace(",", "")))
        return False
    except Exception:
        return True

bad_rows = matches.filter(lambda r: is_bad_goal_field(r[3]) or is_bad_goal_field(r[4]))
print("Example problematic rows (up to 10):", bad_rows.take(10))


Example problematic rows (up to 10): [['Sheffield United', 'Liverpool', '1.0', '1.0', 'D', '2006-2007'], ['Arsenal', 'Aston Villa', '1.0', '1.0', 'D', '2006-2007'], ['Everton', 'Watford', '2.0', '1.0', 'H', '2006-2007'], ['Newcastle United', 'Wigan Athletic', '2.0', '1.0', 'H', '2006-2007'], ['Portsmouth', 'Blackburn Rovers', '3.0', '0.0', 'H', '2006-2007'], ['Reading', 'Middlesbrough', '3.0', '2.0', 'H', '2006-2007'], ['West Ham United', 'Charlton Athletic', '3.0', '1.0', 'H', '2006-2007'], ['Bolton Wanderers', 'Tottenham Hotspur', '2.0', '0.0', 'H', '2006-2007'], ['Manchester United', 'Fulham', '5.0', '1.0', 'H', '2006-2007'], ['Chelsea', 'Manchester City', '3.0', '0.0', 'H', '2006-2007']]


In [None]:
home_goals = matches.map(lambda x: (x[1].strip(), safe_int(x[3])))
away_goals = matches.map(lambda x: (x[2].strip(), safe_int(x[4])))


total_goals = home_goals.union(away_goals).reduceByKey(lambda a, b: a + b)


goals_by_team = total_goals.collect()
print("Total goals per team:", goals_by_team)


top3 = total_goals.takeOrdered(3, key=lambda kv: -kv[1])
print("Top 3 scorers:", top3)


Total goals per team: [('Liverpool', 351), ('Charlton Athletic', 15), ('West Ham United', 212), ('Sheffield United', 8), ('Southampton', 123), ('Crystal Palace', 103), ('AFC Bournemouth', 61), ('3.0', 0), ('Watford', 62), ('Blackburn Rovers', 122), ('Tottenham Hotspur', 342), ('Fulham', 128), ('Manchester City', 365), ('Portsmouth', 63), ('Everton', 257), ('West Bromwich Albion', 154), ('Burnley', 64), ('Norwich City', 64), ('Cardiff City', 12), ('Leicester City', 99), ('Huddersfield Town', 12), ('1.0', 0), ('5.0', 0), ('0.0', 0), ('4.0', 0), ('7.0', 0), ('Aston Villa', 216), ('Middlesbrough', 50), ('Chelsea', 368), ('Newcastle United', 178), ('Hull City', 74), ('Wolverhampton Wanderers', 56), ('Blackpool', 25), ('Swansea City', 127), ('6.0', 0), ('8.0', 0), ('Wigan Athletic', 126), ('Reading', 65), ('Manchester United', 372), ('Bolton Wanderers', 111), ('Arsenal', 382), ('Birmingham City', 53), ('Sunderland', 175), ('Derby County', 8), ('Stoke City', 150), ('Queens Park Rangers', 55),

In [None]:
home_goals_sum = matches.map(lambda x: safe_int(x[3])).sum()
total_matches = matches.count()
avg_home_goals = home_goals_sum / total_matches if total_matches else 0
print("Average home goals per match:", avg_home_goals)


Average home goals per match: 1.1442982456140351


In [None]:
print(matches.take(10))
print("Bad rows example:", bad_rows.take(10))


[['Sheffield United', 'Liverpool', '1.0', '1.0', 'D', '2006-2007'], ['Arsenal', 'Aston Villa', '1.0', '1.0', 'D', '2006-2007'], ['Everton', 'Watford', '2.0', '1.0', 'H', '2006-2007'], ['Newcastle United', 'Wigan Athletic', '2.0', '1.0', 'H', '2006-2007'], ['Portsmouth', 'Blackburn Rovers', '3.0', '0.0', 'H', '2006-2007'], ['Reading', 'Middlesbrough', '3.0', '2.0', 'H', '2006-2007'], ['West Ham United', 'Charlton Athletic', '3.0', '1.0', 'H', '2006-2007'], ['Bolton Wanderers', 'Tottenham Hotspur', '2.0', '0.0', 'H', '2006-2007'], ['Manchester United', 'Fulham', '5.0', '1.0', 'H', '2006-2007'], ['Chelsea', 'Manchester City', '3.0', '0.0', 'H', '2006-2007']]
Bad rows example: [['Sheffield United', 'Liverpool', '1.0', '1.0', 'D', '2006-2007'], ['Arsenal', 'Aston Villa', '1.0', '1.0', 'D', '2006-2007'], ['Everton', 'Watford', '2.0', '1.0', 'H', '2006-2007'], ['Newcastle United', 'Wigan Athletic', '2.0', '1.0', 'H', '2006-2007'], ['Portsmouth', 'Blackburn Rovers', '3.0', '0.0', 'H', '2006-20

In [1]:
!git config --global user.email "vaishnavibalakrishnan2006@gmail.com"
!git config --global user.name "vaishnavibala25"

!git clone https://github.com/vaishnavibala25/epl-pyspark-rdd.git


Cloning into 'epl-pyspark-rdd'...


In [3]:
# 3. Copy notebook into repo folder (update filename)
!cp "/content/EPL_RDD.ipynb" epl-pyspark-rdd

cp: cannot stat '/content/EPL_RDD.ipynb': No such file or directory


In [4]:
import os

In [11]:
!git branch -M main



In [15]:
%cd /content/epl-pyspark-rdd


/content/epl-pyspark-rdd


In [18]:
!ls


In [19]:
%cd /content/epl-pyspark-rdd
!ls


/content/epl-pyspark-rdd


In [None]:
from google.colab import files

uploaded = files.upload()
