### Ingest circuits from Database

#### Step 1 - Read data from the MySQL database

In [1]:
from os import environ as env
from pyspark.sql import SparkSession

In [2]:
# Build the SparkSession with S3A support.
# The MySQL JDBC driver and the MinIO / hadoop-aws / AWS SDK jars are loaded
# from local paths and passed to Spark as one comma-separated "spark.jars" value.
spark_jars = ",".join([
    "/home/dev/working_dir/driver_jdbc/mysql-connector-j-8.0.32.jar",
    "/home/dev/working_dir/driver_jdbc/minio-7.1.0.jar",
    "/home/dev/working_dir/driver_jdbc/hadoop-aws-3.3.1.jar",
    "/home/dev/working_dir/driver_jdbc/aws-java-sdk-bundle-1.12.196.jar",
])

# Object-store credentials are taken from the environment when available so
# they do not have to live in the notebook; the previous literals are kept as
# fallbacks so existing setups keep working unchanged.
# NOTE(review): the variable names MINIO_ACCESS_KEY / MINIO_SECRET_KEY are an
# assumption -- align them with whatever your deployment actually exports.
s3a_access_key = env.get("MINIO_ACCESS_KEY", "minio_access_key")
s3a_secret_key = env.get("MINIO_SECRET_KEY", "minio_secret_key")

spark = (
    SparkSession.builder
    .appName("MySQL to MinIO")
    .config("spark.jars", spark_jars)
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", s3a_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key)
    # Path-style access and plain HTTP match the http://minio:9000 endpoint above.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

In [3]:
# MySQL connection settings: database name and credentials come from the environment.
mysql_url = f"jdbc:mysql://mysql_db:3306/{env['MYSQL_DATABASE']}"
mysql_properties = {
    "user": env["MYSQL_USER"],
    "password": env["MYSQL_PASSWORD"],
    "driver": "com.mysql.cj.jdbc.Driver",
}

try:
    # Read the `circuits` table from the database over JDBC.
    df = spark.read.jdbc(
        url=mysql_url,
        table="circuits",
        properties=mysql_properties,
    )
except Exception as e:
    print(f"❌ Connection failed: {str(e)}")
    # Re-raise instead of swallowing: otherwise `df` is never bound here and the
    # following cells fail with a misleading NameError (or reuse a stale `df`
    # left in the kernel from an earlier run).
    raise
else:
    print("✅ Connection successful")

✅ Connection successful


#### Step 2 - Write data to the data lake as CSV

In [4]:
# Preview the first five rows read from the database.
df.show(n=5)

+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng|alt|                 url|
+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968| 10|http://en.wikiped...|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738| 18|http://en.wikiped...|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|  7|http://en.wikiped...|
|        4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|2.26111|109|http://en.wikiped...|
|        5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517| 29.405|130|http://en.wikiped...|
+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
only showing top 5 rows

In [5]:
# Base location of the bronze layer, taken from the environment.
file_path = env["BRONZE_LAYER_PATH"]

# Save the DataFrame as CSV (with a header row) under "<file_path>/circuits",
# replacing any output already present at that location.
circuits_target = f"{file_path}/circuits"
(
    df.write
    .mode("overwrite")
    .option("header", True)
    .csv(circuits_target)
)

print("Datos guardados en MinIO con éxito.")

Datos guardados en MinIO con éxito.


In [6]:
# Read the CSV back from the output directory to verify what was written.
# A separate name is used on purpose: rebinding `df` to a DataFrame backed by
# this path would make a re-run of the write cell overwrite the very files it
# is reading from.
circuits_csv_df = spark.read.csv(f"{file_path}/circuits", header=True, inferSchema=True)
circuits_csv_df.show(5)

+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
|circuitId| circuitRef|                name|    location|  country|     lat|    lng|alt|                 url|
+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968| 10|http://en.wikiped...|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738| 18|http://en.wikiped...|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|  7|http://en.wikiped...|
|        4|  catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|2.26111|109|http://en.wikiped...|
|        5|   istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517| 29.405|130|http://en.wikiped...|
+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
only showing top 5 rows

In [7]:
# Shut down through the session rather than the bare context: SparkSession.stop()
# stops the underlying SparkContext and also clears the registered session.
spark.stop()