# Landing zone

In [1]:
from google.colab import drive

# Folder inside the Colab VM under which Google Drive is made available;
# every data path used further down starts with this prefix.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


## Delta Lake Configuration

In [2]:
# Install PySpark and Delta Lake for data processing and storage

!pip install pyspark
!pip install delta-spark

Collecting delta-spark
  Downloading delta_spark-3.3.0-py3-none-any.whl.metadata (2.0 kB)
Downloading delta_spark-3.3.0-py3-none-any.whl (21 kB)
Installing collected packages: delta-spark
Successfully installed delta-spark-3.3.0


In [18]:
# Import necessary libraries
import pyspark  # PySpark for distributed data processing
from pyspark.sql import SparkSession  # SparkSession is the entry point for PySpark
# Explicit names instead of `from delta import *`, so it is clear where each
# name comes from. DeltaTable is kept in scope for any later cell that relied
# on the former wildcard import.
from delta import DeltaTable, configure_spark_with_delta_pip

# Describe the session: register the Delta SQL extension and make the default
# catalog Delta-aware so `format("delta")` reads and writes work.
builder = (
    SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Create (or reuse) the SparkSession; the helper adds the Delta Lake package
# matching the pip-installed delta-spark to the builder configuration.
spark = configure_spark_with_delta_pip(builder).getOrCreate()


## Temporal Landing Zone

In [19]:
import os

# Folder on Drive that holds the raw CSV extracts
csv_directory = "/content/drive/MyDrive/Colab Notebooks/BDM25/csv_data"

# Root folder of the Temporal Landing Zone (one Delta table per CSV file)
delta_directory = "/content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/delta_data"

# Land every CSV file found directly inside the source folder
for csv_file in os.listdir(csv_directory):
    # Skip sub-folders and any file that is not a CSV extract
    if not csv_file.endswith(".csv"):
        continue

    file_path = os.path.join(csv_directory, csv_file)

    # Load the file with its header row; no schema is supplied or inferred here
    df = spark.read.options(delimiter=",", header=True).csv(file_path)

    # Each file gets its own table folder, named after the file without ".csv"
    delta_path = os.path.join(delta_directory, csv_file.replace(".csv", ""))

    # Replace whatever a previous run left at that location
    df.write.mode("overwrite").format("delta").save(delta_path)

    print(f"✅ Successfully saved file {csv_file} to Delta Lake at: {delta_path}")


✅ Successfully saved file appearances.csv to Delta Lake at: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/delta_data/appearances
✅ Successfully saved file club_games.csv to Delta Lake at: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/delta_data/club_games
✅ Successfully saved file clubs.csv to Delta Lake at: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/delta_data/clubs
✅ Successfully saved file competitions.csv to Delta Lake at: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/delta_data/competitions
✅ Successfully saved file game_events.csv to Delta Lake at: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/delta_data/game_events
✅ Successfully saved file game_lineups.csv to Delta Lake at: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/delta_data/game_lineups
✅ Successfully saved file games.csv to Delta Lake at: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/delta_data/games
✅ Successfully saved file player_valuations.csv to Delta Lake a

## Attach metadata

In [20]:
import json
from datetime import datetime

# The metadata catalogue is a single JSON file kept at the root of the
# Temporal Landing Zone folder.
metadata_filename = "metadata.json"
metadata_path = os.path.join(delta_directory, metadata_filename)

# Extract metadata
def extract_metadata(dataframe, csv_file, file_path, delta_path):
    """Build the metadata record for one landed CSV file.

    Args:
        dataframe: Object exposing ``columns`` and ``count()`` (the DataFrame
            read from the CSV file).
        csv_file: Name of the CSV file; stored as ``file_name``.
        file_path: Path of the CSV file on disk; only its modification time
            is read.
        delta_path: Location of the corresponding Delta table; stored as-is.

    Returns:
        dict with the keys ``file_name``, ``columns``, ``num_rows``,
        ``creation_time`` (time of this call), ``last_modified`` (mtime of
        ``file_path``) and ``delta_path``. Both timestamps are strings.
    """
    record = {"file_name": csv_file}
    record["columns"] = dataframe.columns
    record["num_rows"] = dataframe.count()

    # Timestamp of the extraction itself, not of the source file
    record["creation_time"] = str(datetime.now())

    # Modification time of the source CSV, converted to local time
    modified_at = datetime.fromtimestamp(os.path.getmtime(file_path))
    record["last_modified"] = str(modified_at)

    record["delta_path"] = delta_path
    return record



In [21]:
# One metadata record per landed CSV file is accumulated here
all_metadata = []

# Walk the source folder again and describe every CSV file
for csv_file in os.listdir(csv_directory):
    # Anything that is not a CSV extract is ignored
    if not csv_file.endswith(".csv"):
        continue

    file_path = os.path.join(csv_directory, csv_file)

    # The record describes the CSV as read from disk, so the file is re-read
    df = spark.read.options(delimiter=",", header=True).csv(file_path)

    # Same table location the landing step derives from the file name
    delta_path = os.path.join(delta_directory, csv_file.replace(".csv", ""))

    metadata = extract_metadata(df, csv_file, file_path, delta_path)
    all_metadata.append(metadata)

# Persist the whole catalogue as one pretty-printed JSON document
with open(metadata_path, "w") as f:
    f.write(json.dumps(all_metadata, indent=4))

print(f"✅ All metadata saved to: {metadata_path}")


✅ All metadata saved to: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/delta_data/metadata.json


## Persistent Landing

In [24]:
# Define the path for the Persistent Landing Zone
persistent_directory = "/content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/persistent_data"

# Copy every table listed in the metadata from the Temporal to the Persistent
# Landing Zone. The temporal copy is left in place; nothing is deleted here.
for metadata in all_metadata:
    csv_file = metadata["file_name"]
    delta_path = metadata["delta_path"]

    # Define the persistent storage path (table folder named after the file)
    persistent_path = os.path.join(persistent_directory, csv_file.replace(".csv", ""))

    # Read data from the Temporal Landing Zone
    df = spark.read.format("delta").load(delta_path)

    # Save data to the Persistent Landing Zone, replacing any previous copy
    df.write.format("delta").mode("overwrite").save(persistent_path)

    # Update metadata with the persistent path information
    metadata["persistent_path"] = persistent_path

    print(f"✅ Data successfully moved from Temporal Landing Zone to Persistent Landing Zone: {persistent_path}")

# Save the updated metadata to the JSON file
with open(metadata_path, "w") as f:
    json.dump(all_metadata, f, indent=4)

# Report the file that was actually written. The previous message interpolated
# `persistent_path`, i.e. the folder of the last table processed, and raised a
# NameError when `all_metadata` was empty.
print(f"✅ All metadata has been updated and saved to: {metadata_path}")


✅ Data successfully moved from Temporal Landing Zone to Persistent Landing Zone: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/persistent_data/appearances
✅ Data successfully moved from Temporal Landing Zone to Persistent Landing Zone: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/persistent_data/club_games
✅ Data successfully moved from Temporal Landing Zone to Persistent Landing Zone: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/persistent_data/clubs
✅ Data successfully moved from Temporal Landing Zone to Persistent Landing Zone: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/persistent_data/competitions
✅ Data successfully moved from Temporal Landing Zone to Persistent Landing Zone: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/persistent_data/game_events
✅ Data successfully moved from Temporal Landing Zone to Persistent Landing Zone: /content/drive/MyDrive/Colab Notebooks/BDM25/csv_data/persistent_data/game_lineups
✅ Data successfully moved f