In [None]:

!wget -nc https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.42.4/spark-bigquery-with-dependencies_2.12-0.42.4.jar \
     -O /content/jars/spark-bigquery-0.42.4.jar

!wget -nc https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.14/gcs-connector-hadoop3-2.2.14-shaded.jar \
     -O /content/jars/gcs-connector-hadoop3-2.2.14.jar


File ‘/content/jars/spark-bigquery-0.42.4.jar’ already there; not retrieving.
--2025-07-13 11:44:38--  https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.14/gcs-connector-hadoop3-2.2.14-shaded.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 38298738 (37M) [application/java-archive]
Saving to: ‘/content/jars/gcs-connector-hadoop3-2.2.14.jar’


2025-07-13 11:44:41 (14.0 MB/s) - ‘/content/jars/gcs-connector-hadoop3-2.2.14.jar’ saved [38298738/38298738]



In [None]:
from pyspark.sql import SparkSession
from pathlib import Path
import os, re

# ---- User configuration: replace every placeholder before running ----
KEY_PATH     = "<Your key path>"      # service-account JSON key file used for GCS authentication
DATA_BUCKET  = "<Your data bucket>"
TEMP_BUCKET  = "<Your temp bucket>"   # handed to the BigQuery connector as temporaryGcsBucket
PROJECT_ID   = "<Your project id>"    # used as the GCS connector's fs.gs.project.id
DATASET      = "<Your dataset>"

# Local paths of the two connector jars.
JAR1 = "/content/jars/spark-bigquery-0.42.4.jar"
JAR2 = "/content/jars/gcs-connector-hadoop3-2.2.14.jar"
# `spark.jars` expects a comma-separated list of jars ...
ALL_JARS = ",".join([JAR1, JAR2])
# ... whereas the extraClassPath settings are plain JVM classpath strings, whose
# entries are separated by the platform path separator (":" on Linux). A
# comma-joined value would be read as one single, non-existent classpath entry.
JARS_CLASSPATH = os.pathsep.join([JAR1, JAR2])

# Expose the key file through the standard Google credentials environment variable.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_PATH

spark = (
    SparkSession.builder
      .appName("Spark-GCS-BQ Full Working")
      .master("local[*]")
      .config("spark.jars", ALL_JARS)
      .config("spark.driver.extraClassPath", JARS_CLASSPATH)
      .config("spark.executor.extraClassPath", JARS_CLASSPATH)

      # ---- GCS connector ----
      .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
      .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
      .config("spark.hadoop.fs.gs.project.id", PROJECT_ID)
      .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
      .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", KEY_PATH)

      # ---- BigQuery connector ----
      .config("temporaryGcsBucket", TEMP_BUCKET)

      .getOrCreate()
)

# Sanity check: both connector jars should show up on the driver JVM's classpath.
cp = spark.sparkContext._jvm.java.lang.System.getProperty("java.class.path")
print("✅ GCS connector:", "gcs-connector" in cp)
print("✅ BigQuery jar :", "spark-bigquery" in cp)


✅ GCS connector: True
✅ BigQuery jar : True


In [3]:
def read_csv_table(table_name, bucket=DATA_BUCKET):
    """Load ``gs://<bucket>/<table_name>.csv`` into a Spark DataFrame.

    Args:
        table_name: File name without the ``.csv`` extension.
        bucket: GCS bucket holding the file; defaults to the notebook-level
            ``DATA_BUCKET`` setting instead of a hard-coded bucket name.

    Returns:
        A DataFrame read with the first line as header and with column types
        inferred from the data.
    """
    return spark.read.csv(f"gs://{bucket}/{table_name}.csv", header=True, inferSchema=True)


# One DataFrame per source CSV; the variable names are relied on by later cells.
df_appearances = read_csv_table("appearances")
df_club_games = read_csv_table("club_games")
df_clubs = read_csv_table("clubs")
df_games = read_csv_table("games")
df_competitions = read_csv_table("competitions")
df_game_lineups = read_csv_table("game_lineups")
df_player_valuations = read_csv_table("player_valuations")
df_players = read_csv_table("players")
df_transfers = read_csv_table("transfers")

In [4]:
# Competitions: keep rows that have a country, one row per competition name,
# and give the generic `name` / `type` columns competition-specific names.
# NOTE(review): de-duplication is on `name`, not `competition_id`; which of the
# rows sharing a name survives is up to Spark - confirm this is intended.
has_country = df_competitions['country_name'].isNotNull()
df_competitions_filtered = (
    df_competitions
    .filter(has_country)
    .dropDuplicates(['name'])
    .select('competition_id', 'name', 'type', 'country_name', 'confederation', 'is_major_national_league')
    .withColumnRenamed('name', 'competition_name')
    .withColumnRenamed('type', 'competition_type')
)

# Clubs: the inner join keeps only clubs whose domestic competition is present
# in the filtered competitions; only club columns are carried forward.
club_in_kept_competition = (
    df_clubs["domestic_competition_id"] == df_competitions_filtered["competition_id"]
)
df_clubs_filtered = (
    df_clubs
    .join(df_competitions_filtered, club_in_kept_competition, how="inner")
    .select(
        df_clubs['club_id'],
        df_clubs['name'].alias('club_name'),
        df_clubs['domestic_competition_id'],
        df_clubs['stadium_name'],
        df_clubs['stadium_seats'],
    )
)

# Two aliased views of the same club table, one per side of a fixture.
df_clubs_home = df_clubs_filtered.alias("home_club")
df_clubs_away = df_clubs_filtered.alias("away_club")

In [5]:
from pyspark.sql import functions as F


In [6]:
# Columns of df_games that make up the output, in output order.
game_columns = [
    'game_id', 'competition_id', 'date', 'season', 'round',
    'home_club_id', 'away_club_id',
    'home_club_name', 'away_club_name',
    'home_club_goals', 'away_club_goals',
    'home_club_manager_name', 'away_club_manager_name',
    'stadium', 'attendance', 'referee', 'competition_type',
]

# Games: the three inner joins act purely as filters - a game is kept only when
# its competition, its home club and its away club all exist in the filtered
# tables. Every selected column is taken from df_games itself.
df_games_filtered = (
    df_games
    .join(df_competitions_filtered,
          df_games["competition_id"] == df_competitions_filtered["competition_id"],
          how="inner")
    .join(df_clubs_home, df_games["home_club_id"] == df_clubs_home["club_id"], how="inner")
    .join(df_clubs_away, df_games["away_club_id"] == df_clubs_away["club_id"], how="inner")
    .select(*[df_games[column] for column in game_columns])
)



# Columns of df_club_games that make up the output, in output order.
club_game_columns = [
    'game_id', 'club_id', 'own_goals', 'opponent_id',
    'opponent_goals', 'hosting', 'is_win',
]

# Club-game rows: kept only when both the club and the game are present in the
# filtered tables; the joins contribute no columns to the result.
df_club_games_filtered = (
    df_club_games
    .join(df_clubs_filtered, df_club_games["club_id"] == df_clubs_filtered["club_id"], how="inner")
    .join(df_games_filtered, df_club_games["game_id"] == df_games_filtered["game_id"], how="inner")
    .select(*[df_club_games[column] for column in club_game_columns])
)



# Birth date parsed once; reused for the output column and for the age.
player_birth_date = df_players['date_of_birth'].cast('date')

# Age in completed years. months_between() works on calendar months, so the
# value only increases on the birthday itself; dividing a day count by 365
# ignores leap days and would bump the age a few days early.
player_age = F.floor(F.months_between(F.current_date(), player_birth_date) / 12).cast('int')

# Players: keep those whose current club is in the filtered club table.
# NOTE(review): dropDuplicates(['name']) keeps a single row per player *name*,
# so distinct players sharing a name collapse into one row picked by Spark -
# confirm this is intended rather than de-duplicating on player_id.
df_players_filtered = (
    df_players
    .join(df_clubs_filtered, df_players['current_club_id'] == df_clubs_filtered['club_id'], 'inner')
    .dropDuplicates(['name'])
    .select(
        df_players['player_id'],
        df_players['name'].alias('player_name'),
        df_players['last_season'],
        df_players['current_club_id'],
        df_players['country_of_birth'],
        df_players['country_of_citizenship'].alias('nationality'),
        player_birth_date.alias('date_of_birth'),
        player_age.alias('age'),
        df_players['position'],
        df_players['foot'],
        df_players['height_in_cm'],
        df_players['market_value_in_eur'],
        df_players['highest_market_value_in_eur'],
        df_players['image_url'],
    )
)



# Player valuations: kept only for players and clubs present in the filtered
# tables. `season` is the calendar year of the valuation date.
valuation_player_match = df_player_valuations["player_id"] == df_players_filtered["player_id"]
valuation_club_match = df_player_valuations["current_club_id"] == df_clubs_filtered["club_id"]

df_player_valuations_filtered = (
    df_player_valuations
    .join(df_players_filtered, valuation_player_match, how="inner")
    .join(df_clubs_filtered, valuation_club_match, how="inner")
    .select(
        df_player_valuations['player_id'],
        df_player_valuations['date'],
        F.year(df_player_valuations['date']).alias('season'),
        df_player_valuations['market_value_in_eur'],
        df_player_valuations['current_club_id'],
    )
)


# Rows without a date, or whose date is the literal '0' or '1', are dropped
# (presumably malformed rows - verify against the source file).
lineup_date = df_game_lineups['date']
lineup_has_usable_date = lineup_date.isNotNull() & (lineup_date != '0') & (lineup_date != '1')

# Lineup columns copied through unchanged, in output order (after id and date).
lineup_passthrough_columns = [
    'game_id', 'player_id', 'club_id', 'player_name',
    'type', 'position', 'number', 'team_captain',
]

# Lineups: kept only when the player, the club and the game are all present in
# the filtered tables; `date` is cast to a proper date in the output.
df_game_lineups_filtered = (
    df_game_lineups
    .filter(lineup_has_usable_date)
    .join(df_players_filtered, df_game_lineups['player_id'] == df_players_filtered['player_id'], 'inner')
    .join(df_clubs_filtered, df_game_lineups['club_id'] == df_clubs_filtered['club_id'], 'inner')
    .join(df_games_filtered, df_game_lineups['game_id'] == df_games_filtered['game_id'], 'inner')
    .select(
        df_game_lineups['game_lineups_id'],
        lineup_date.cast('date').alias('date'),
        *[df_game_lineups[column] for column in lineup_passthrough_columns],
    )
)



# Two aliased views of the club table, so a transfer can be matched against
# both the club the player leaves and the club the player joins.
df_clubs_from = df_clubs_filtered.alias("from_club")
df_clubs_to = df_clubs_filtered.alias("to_club")

# The first two characters of `transfer_season` are read as a two-digit year.
# Expand it with a century pivot (50-99 -> 1950-1999, 00-49 -> 2000-2049) so a
# season such as "99/00" becomes 1999 rather than 2099. Values that cannot be
# parsed stay null.
# NOTE(review): the pivot of 50 is an assumption - adjust if the data needs it.
transfer_season_yy = df_transfers['transfer_season'].substr(1, 2).cast('int')
transfer_season_year = (
    F.when(transfer_season_yy >= 50, transfer_season_yy + 1900)
     .otherwise(transfer_season_yy + 2000)
)

# Transfers: only those with a fee, and only when both clubs and the player are
# present in the filtered tables.
df_transfers_filtered = (
    df_transfers
    .filter(df_transfers['transfer_fee'].isNotNull())
    .join(df_clubs_from, df_transfers['from_club_id'] == df_clubs_from['club_id'], 'inner')
    .join(df_clubs_to, df_transfers['to_club_id'] == df_clubs_to['club_id'], 'inner')
    .join(df_players_filtered, df_transfers['player_id'] == df_players_filtered['player_id'], 'inner')
    .select(
        df_transfers['player_id'],
        df_transfers['player_name'],
        df_transfers['transfer_date'],
        transfer_season_year.alias('season'),
        df_transfers['from_club_id'],
        df_transfers['to_club_id'],
        df_transfers['transfer_fee'],
        df_transfers['market_value_in_eur'],
    )
)

# Join conditions tying an appearance to each of the filtered tables.
appearance_competition_match = df_appearances['competition_id'] == df_competitions_filtered['competition_id']
appearance_club_match = df_appearances['player_club_id'] == df_clubs_filtered['club_id']
appearance_game_match = df_appearances['game_id'] == df_games_filtered['game_id']
appearance_player_match = df_appearances['player_id'] == df_players_filtered['player_id']

# Appearances: rows with a player name whose competition, club, game and player
# all exist in the filtered tables. All original appearance columns are kept
# and nothing from the joined tables is added.
df_appearances_filtered = (
    df_appearances
    .filter(df_appearances['player_name'].isNotNull())
    .join(df_competitions_filtered, appearance_competition_match, 'inner')
    .join(df_clubs_filtered, appearance_club_match, 'inner')
    .join(df_games_filtered, appearance_game_match, 'inner')
    .join(df_players_filtered, appearance_player_match, 'inner')
    .select(df_appearances['*'])
)



In [7]:
# Sanity check: row counts of two of the filtered tables. Each count() runs a
# Spark job over that table's full lineage.
for count_label, counted_df in (
    ('clubs_filtered count:', df_clubs_filtered),
    ('appearances_filtered count:', df_appearances_filtered),
):
    print(count_label, counted_df.count())

clubs_filtered count: 405
appearances_filtered count: 1403183


In [8]:
def write_to_bigquery(df, table_name, project=PROJECT_ID, dataset=DATASET):
    """Write ``df`` to the BigQuery table ``<project>.<dataset>.<table_name>``.

    Args:
        df: Spark DataFrame to export.
        table_name: Name of the destination table inside the dataset.
        project: Destination project id; defaults to the notebook-level
            ``PROJECT_ID`` setting instead of a hard-coded project.
        dataset: Destination dataset; defaults to the notebook-level
            ``DATASET`` setting instead of a hard-coded dataset.

    The write uses mode ``overwrite``, so existing table contents are replaced.
    """
    (
        df.write
        .format('bigquery')
        .option('table', f'{project}.{dataset}.{table_name}')
        .mode('overwrite')
        .save()
    )


# Destination table name -> DataFrame, written in this order.
bigquery_exports = [
    ('clubs', df_clubs_filtered),
    ('appearances', df_appearances_filtered),
    ('club_games', df_club_games_filtered),
    ('games', df_games_filtered),
    ('competitions', df_competitions_filtered),
    ('game_lineups', df_game_lineups_filtered),
    ('player_valuations', df_player_valuations_filtered),
    ('players', df_players_filtered),
    ('transfers', df_transfers_filtered),
]

for export_table, export_df in bigquery_exports:
    write_to_bigquery(export_df, export_table)
