<a href="https://colab.research.google.com/github/sivanandhini751/GitHub-1/blob/main/ASSESSMENT_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# Colab-only step: open the browser upload widget and keep whatever
# files.upload() returns in `uploaded`.
# NOTE(review): the recorded cell output shows "Saving EPL.csv to EPL (3).csv",
# so the name the upload is saved under can differ from the original file name.
from google.colab import files
uploaded = files.upload()


# IPython shell escape (only valid inside a notebook cell): install PySpark quietly.
!pip install pyspark --quiet

from pyspark import SparkContext, SparkConf
import csv, re, math

# Run Spark in-process using every local core; getOrCreate() hands back an
# already-running context instead of failing when the cell is re-executed.
conf = SparkConf().setAppName("EPL_RDD_Assessment").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)

file_path = "/content/EPL.csv"

# A repeated upload is saved under a de-duplicated name (the recorded output
# shows "EPL (3).csv"), which would leave a stale copy at file_path. Write the
# bytes that were just uploaded to file_path so Spark always reads fresh data.
# When nothing was uploaded, the existing file at file_path is used as-is.
if uploaded:
    wanted_name = file_path.rsplit("/", 1)[-1]
    upload_key = wanted_name if wanted_name in uploaded else next(iter(uploaded))
    with open(file_path, "wb") as fh:
        fh.write(uploaded[upload_key])

raw = sc.textFile(file_path)

# The first line of the file is treated as the CSV header.
header_line = raw.first()

# Parse the header with the csv module so quoted column names are handled.
header_fields = next(csv.reader([header_line]))
print("Detected header fields:", header_fields)


def norm_name(s):
    """Return ``s`` lower-cased with everything except a-z and 0-9 removed."""
    lowered = s.lower()
    non_alnum = re.compile(r'[^a-z0-9]')
    return non_alnum.sub('', lowered)

# Normalised form of every header field, in column order.
hdr_norm = [norm_name(h) for h in header_fields]

def find_index(candidates, default=None):
    """Return the column index whose header best matches one of ``candidates``.

    Candidates and headers are compared in their ``norm_name`` form.
    Matching runs in two passes so that a precise hit always wins:

    1. exact equality, trying candidates in the order given;
    2. substring containment in either direction, again in candidate order.

    Empty normalised headers or candidates are ignored, because the empty
    string is a substring of everything and would otherwise match any
    column (e.g. a blank index column in the header).

    Args:
        candidates: iterable of possible header spellings, most preferred first.
        default: value returned when no header matches.

    Returns:
        The 0-based index into the header, or ``default``.
    """
    wanted = [cn for cn in (norm_name(cand) for cand in candidates) if cn]

    # Pass 1: exact matches take priority over any fuzzy match.
    for cn in wanted:
        if cn in hdr_norm:
            return hdr_norm.index(cn)

    # Pass 2: fall back to containment, skipping blank headers.
    for cn in wanted:
        for i, h in enumerate(hdr_norm):
            if h and (cn in h or h in cn):
                return i
    return default


# Resolve each required column from its list of accepted header spellings;
# an index stays None when no header matches.
season_idx, home_idx, away_idx, fthg_idx, ftag_idx, ftr_idx = (
    find_index(spellings)
    for spellings in (
        ['season', 'Season'],
        ['home_team', 'hometeam', 'home'],
        ['away_team', 'awayteam', 'away'],
        ['home_goals', 'fthg', 'homegoals', 'hg'],
        ['away_goals', 'ftag', 'awaygoals', 'ag'],
        ['result', 'ftr', 'res'],
    )
)

print(f"Using indices -> season:{season_idx}, home:{home_idx}, away:{away_idx}, FTHG:{fthg_idx}, FTAG:{ftag_idx}, FTR:{ftr_idx}")


def parse_partition(lines):
    """Yield a list of CSV fields for each usable line in ``lines``.

    ``None`` and whitespace-only lines are skipped. A line that the csv
    module fails to parse is dropped silently (best effort).
    """
    # Imported here so the module is available wherever the function runs.
    import csv

    for text in lines:
        if text is None or not text.strip():
            continue
        try:
            yield from (list(fields) for fields in csv.reader([text]))
        except Exception:
            # Deliberately tolerant: a malformed line must not abort the job.
            continue

# Remove every line equal to the header, then split the remaining lines
# into field lists partition by partition.
data_lines = raw.filter(lambda text: text != header_line)
parsed = data_lines.mapPartitions(parse_partition)

# Two separate actions: one over the raw text, one over the parsed rows.
total_lines = raw.count()
parsed_rows = parsed.count()
print("Total lines (including header):", total_lines)
print("Parsed rows (after CSV parsing & header removal):", parsed_rows)


# First (optionally negative) run of digits embedded in a string.
int_re = re.compile(r'(-?\d+)')
def extract_int_from_str(s):
    """Convert ``s`` to an int, tolerating float-formatted and noisy text.

    The whole string is parsed as a number first, so values such as
    ``"1.0"``, ``"1e3"`` or ``".5"`` are interpreted numerically. A value
    within 1e-9 of an integer becomes that integer; anything else is
    floored. Only when the string is not a number as a whole does the
    function fall back to the first integer embedded in it
    (``"3 goals"`` -> 3).

    Args:
        s: any value; it is converted with ``str()`` and stripped.

    Returns:
        The extracted int, or ``None`` for ``None``, blank input,
        non-finite numbers (``"nan"``, ``"inf"``) and text without digits.
    """
    if s is None:
        return None
    s = str(s).strip()
    if s == "":
        return None

    # Whole-string numeric parse comes first; running the digit regex first
    # would turn ".5" into 5 and "1e3" into 1.
    try:
        f = float(s)
    except ValueError:
        f = None

    if f is not None:
        if not math.isfinite(f):
            return None
        nearest = round(f)
        if abs(f - nearest) < 1e-9:
            return int(nearest)
        return int(math.floor(f))

    # Not a number as a whole: take the first integer found in the text.
    m = int_re.search(s)
    if m is None:
        return None
    try:
        return int(m.group(1))
    except ValueError:
        # e.g. a digit run longer than the interpreter's int-parsing limit
        return None


# Every column the analysis depends on, keyed by a short label.
required = dict(
    season=season_idx,
    home=home_idx,
    away=away_idx,
    fthg=fthg_idx,
    ftag=ftag_idx,
    ftr=ftr_idx,
)
missing = [label for label in required if required[label] is None]
if missing:
    # Stop early: later steps index rows with these positions.
    print("ERROR: Could not auto-detect these columns:", missing)
    print("Header fields again:", header_fields)
    raise SystemExit("Fix column detection before proceeding.")

# Highest column position that any row must contain.
max_idx = max(required.values())


# Keep only rows long enough to contain every required column.
parsed_enough = parsed.filter(lambda row: len(row) > max_idx)


# Keep rows whose two goal fields convert to integers and whose result field
# is non-blank. The RDD is cached because every later action (counts, samples
# and all aggregations) starts from it; without caching each action would
# re-read and re-parse the input file.
clean = parsed_enough.filter(
    lambda row: extract_int_from_str(row[fthg_idx]) is not None
    and extract_int_from_str(row[ftag_idx]) is not None
    and str(row[ftr_idx]).strip() != ""
).cache()

clean_count = clean.count()
print("Rows after filtering malformed / non-numeric-goals rows:", clean_count)


# Nothing survived the filter: show raw field reprs to help diagnose, then stop.
if clean_count == 0:
    sample = parsed_enough.take(10)
    print("\n--- Diagnostic: first 10 parsed rows (showing repr of fields) ---")
    for r in sample:
        print([repr(c) for c in r])
    raise SystemExit("No valid rows after parsing - inspect diagnostics above.")


# Show a few accepted rows; repr() makes stray whitespace or quotes visible.
sample_clean = clean.take(6)
print("\nSample clean rows (first 6):")
for r in sample_clean:
    print([repr(c) for c in r])

# Q1 & Q2: total goals per season (RDD)
# Each match contributes (season, home goals + away goals).
match_totals = clean.map(
    lambda m: (
        m[season_idx].strip(),
        extract_int_from_str(m[fthg_idx]) + extract_int_from_str(m[ftag_idx]),
    )
)
season_goals = match_totals.reduceByKey(lambda x, y: x + y)

# Negating the total turns the ascending takeOrdered into "largest first".
highest_goals_season = season_goals.takeOrdered(1, key=lambda kv: -kv[1])
lowest_goals_season = season_goals.takeOrdered(1, key=lambda kv: kv[1])

# Q3: team with the highest average goals per season
# Goals are credited to the home side and the away side separately.
home_goals = clean.map(lambda m: (m[home_idx].strip(), extract_int_from_str(m[fthg_idx])))
away_goals = clean.map(lambda m: (m[away_idx].strip(), extract_int_from_str(m[ftag_idx])))

all_team_goals = home_goals.union(away_goals)
team_goals = all_team_goals.reduceByKey(lambda x, y: x + y)

# Number of distinct seasons in which each team appears (home or away).
team_season_pairs = clean.flatMap(
    lambda m: [
        (m[home_idx].strip(), m[season_idx].strip()),
        (m[away_idx].strip(), m[season_idx].strip()),
    ]
).distinct()
team_seasons = team_season_pairs.map(lambda pair: (pair[0], 1)).reduceByKey(lambda x, y: x + y)

# (total goals, seasons played) -> goals per season
team_avg_goals = team_goals.join(team_seasons).mapValues(lambda gs: gs[0] / gs[1])
highest_avg_team = team_avg_goals.takeOrdered(1, key=lambda kv: -kv[1])


# Up to 200 distinct team names (team_goals has one entry per team).
team_names_sample = team_goals.map(lambda x: x[0]).take(200)
print("\nSample team names (first 200 distinct in data):")
# Print the whole sample: the failure hint below asks the user to pick a name
# from this list, so it must not be truncated.
for t in team_names_sample:
    print("-", t)

# Q4 & Q5: Manchester United probabilities (auto-detect common variants)
possible_variants = ['manchester united','man united','man united fc','man utd','manutd','man u','manchester_utd','manchesterutd']
team_found = None
team_keys = set([n.strip().lower() for n in team_names_sample])

# Try the variants in priority order. Names are scanned in sample order (not
# via the set) so that the chosen team is the same on every run.
for pv in possible_variants:
    compact_pv = pv.replace(' ', '')
    for candidate in team_names_sample:
        actual = candidate.strip().lower()
        if compact_pv in actual.replace(' ', '') or pv in actual:
            team_found = candidate
            break
    if team_found:
        break

if not team_found:
    print("\nCould not auto-detect a 'Manchester United' variant.")
    print("Please pick the exact team name from the sample printed above and set team_name to that exact string.")
    raise SystemExit("No Manchester United found automatically.")

team_name = team_found
print("\nUsing team name for probabilities:", repr(team_name))

# Normalised form used for all later comparisons against row values.
team_lower = team_name.strip().lower()

# Every match in which the selected team played, at home or away.
mu_matches = clean.filter(
    lambda m: team_lower in (m[home_idx].strip().lower(), m[away_idx].strip().lower())
)

mu_total = mu_matches.count()

def mu_label(row):
    """Classify a match as 'Win', 'Loss' or 'Draw' for the selected team.

    The result code is read from the result column ('H', 'A' or 'D',
    case-insensitive). Any other code yields ``None``.
    """
    ftr = row[ftr_idx].strip().upper()
    if ftr == 'D':
        return 'Draw'
    if ftr not in ('H', 'A'):
        return None
    # The winning side is the home team for 'H' and the away team for 'A'.
    winner_idx = home_idx if ftr == 'H' else away_idx
    winner = row[winner_idx].strip().lower()
    return 'Win' if winner == team_lower else 'Loss'

# Tally the labels, ignoring matches with an unrecognised result code.
labels = mu_matches.map(mu_label).filter(lambda label: label is not None)
mu_counts = labels.countByValue()
mu_wins = mu_counts.get('Win', 0)
mu_losses = mu_counts.get('Loss', 0)
mu_draws = mu_counts.get('Draw', 0)

# Probabilities are shares of all the team's matches; 0.0 when it has none.
if mu_total > 0:
    p_win, p_loss, p_draw = (n / mu_total for n in (mu_wins, mu_losses, mu_draws))
else:
    p_win = p_loss = p_draw = 0.0


# Final report. Each takeOrdered(1) result is a list that is empty when
# there was no data, so every question has a fallback line.
print("\n================ RESULTS ================\n")

if not highest_goals_season:
    print("Q1. Season with Highest Goals: No valid data")
else:
    s, g = highest_goals_season[0]
    print(f"Q1. Season with Highest Goals: {s} ({g} goals)")

if not lowest_goals_season:
    print("Q2. Season with Lowest Goals: No valid data")
else:
    s, g = lowest_goals_season[0]
    print(f"Q2. Season with Lowest Goals:  {s} ({g} goals)")

if not highest_avg_team:
    print("Q3. Team with Highest Average Goals/Season: No valid data")
else:
    t, avg = highest_avg_team[0]
    print(f"Q3. Team with Highest Average Goals/Season: {t} ({avg:.2f} avg goals)")

# Q4 & Q5: outcome shares for the detected team, with the raw counts shown.
print(f"\nQ4 & Q5. Manchester United Probabilities (using name {repr(team_name)}):")
print(f"P(Win)  = {p_win:.3f}  (wins {mu_wins} / {mu_total})")
print(f"P(Loss) = {p_loss:.3f}  (losses {mu_losses} / {mu_total})")
print(f"P(Draw) = {p_draw:.3f}  (draws {mu_draws} / {mu_total})")

print("\n=========================================\n")


Saving EPL.csv to EPL (3).csv
Detected header fields: ['home_team', 'away_team', 'home_goals', 'away_goals', 'result', 'season']
Using indices -> season:5, home:0, away:1, FTHG:2, FTAG:3, FTR:4
Total lines (including header): 4561
Parsed rows (after CSV parsing & header removal): 4560
Rows after filtering malformed / non-numeric-goals rows: 4560

Sample clean rows (first 6):
["'Sheffield United'", "'Liverpool'", "'1.0'", "'1.0'", "'D'", "'2006-2007'"]
["'Arsenal'", "'Aston Villa'", "'1.0'", "'1.0'", "'D'", "'2006-2007'"]
["'Everton'", "'Watford'", "'2.0'", "'1.0'", "'H'", "'2006-2007'"]
["'Newcastle United'", "'Wigan Athletic'", "'2.0'", "'1.0'", "'H'", "'2006-2007'"]
["'Portsmouth'", "'Blackburn Rovers'", "'3.0'", "'0.0'", "'H'", "'2006-2007'"]
["'Reading'", "'Middlesbrough'", "'3.0'", "'2.0'", "'H'", "'2006-2007'"]

Sample team names (first 200 distinct in data):
- Sheffield United
- West Ham United
- Charlton Athletic
- Liverpool
- Southampton
- Crystal Palace
- AFC Bournemouth
- Ev