### Ingest circuits.csv

#### Step 1: Read the CSV file using the Spark DataFrame reader

In [0]:
%run /Workspace/Users/nicolas97alonso@gmail.com/databricks-course/utils/vairables

In [0]:
circuits_path = f"{raw_root}/circuits.csv"

#### Why `inferSchema` should be avoided in production

Relying on `inferSchema` in production pipelines is risky because Spark must scan the entire dataset to guess column types. This adds unnecessary cost and makes the pipeline less predictable. Schema inference can also produce different results when new data arrives, which leads to inconsistent schemas across runs.

- **Performance impact**: Spark performs an extra full scan of all input files to infer types, which becomes expensive at scale.
- **Unstable behavior**: Type inference depends on the data itself. If new files contain different patterns, Spark may infer a different type (for example, `Integer` becoming `Long` or even `String`).
- **Silent schema drift**: Pipelines may continue running with a changed schema, causing downstream failures or corrupted tables.
- **Weak data contracts**: Production systems should treat the schema as a stable contract. Inference breaks that assumption.
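
One way to make drift visible rather than silent is to compare the types a read actually produced against the types you expect. The block below is a minimal sketch in plain Python, not part of the course material: `find_schema_drift` and `EXPECTED_DTYPES` are hypothetical names, and the pairs use the `(column, type)` format that `DataFrame.dtypes` returns. The "drifted" list is simulated data, not the output of a real read.

```python
# Expected (column, type) pairs, in the format returned by DataFrame.dtypes.
# The names and types follow the circuits schema used in this notebook.
EXPECTED_DTYPES = [
    ("circuitId", "int"),
    ("circuitRef", "string"),
    ("name", "string"),
    ("location", "string"),
    ("country", "string"),
    ("lat", "double"),
    ("lng", "double"),
    ("alt", "int"),
    ("url", "string"),
]


def find_schema_drift(expected, actual):
    """Return {column: (expected_type, actual_type)} for every mismatch.

    A type of None means the column is missing on that side.
    """
    expected_map, actual_map = dict(expected), dict(actual)
    drift = {}
    for name in sorted(expected_map.keys() | actual_map.keys()):
        expected_type = expected_map.get(name)
        actual_type = actual_map.get(name)
        if expected_type != actual_type:
            drift[name] = (expected_type, actual_type)
    return drift


# Simulated result of an inferred read in which `alt` came back as a string.
drifted = [(name, "string" if name == "alt" else dtype)
           for name, dtype in EXPECTED_DTYPES]

print(find_schema_drift(EXPECTED_DTYPES, drifted))
# → {'alt': ('int', 'string')}
```

In the notebook you would pass `circuits_df.dtypes` as the second argument and fail the run if the result is not empty.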

In [0]:
# Exploration only: inferSchema makes Spark read the data an extra time to guess types.
circuits_df = spark.read \
    .option('header', True) \
    .option('inferSchema', True) \
    .csv(circuits_path)
display(circuits_df)


#### Step 1 (continued): Define an explicit schema 📝

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [0]:
circuits_schema = StructType([
    StructField("circuitId", IntegerType(), False),
    StructField("circuitRef", StringType(), True),
    StructField("name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("country", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("alt", IntegerType(), True),
    StructField("url", StringType(), True)
])

In [0]:
circuits_df = spark.read \
    .option('header', True) \
    .schema(circuits_schema) \
    .csv(circuits_path)
    
display(circuits_df)

In [0]:
circuits_df.printSchema()
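
As an alternative to building a `StructType`, `DataFrameReader.schema` also accepts a DDL-formatted string. The sketch below assembles such a string for the same nine columns. `CIRCUITS_COLUMNS` and `circuits_ddl` are hypothetical names introduced for illustration. The column names are wrapped in backticks as a precaution against clashes with SQL keywords, and every column is left nullable, so the `False` flag on `circuitId` is not reproduced here.

```python
# (column name, Spark SQL type) pairs mirroring circuits_schema.
CIRCUITS_COLUMNS = [
    ("circuitId", "INT"),
    ("circuitRef", "STRING"),
    ("name", "STRING"),
    ("location", "STRING"),
    ("country", "STRING"),
    ("lat", "DOUBLE"),
    ("lng", "DOUBLE"),
    ("alt", "INT"),
    ("url", "STRING"),
]

# Join the pairs into one DDL-formatted schema string.
circuits_ddl = ", ".join(
    f"`{name}` {sql_type}" for name, sql_type in CIRCUITS_COLUMNS
)
print(circuits_ddl)

# In the notebook (assumes `spark` and `circuits_path` from the cells above):
# circuits_df = spark.read.option("header", True).schema(circuits_ddl).csv(circuits_path)
```

The string form is shorter to write, while the `StructType` form is easier to build or inspect programmatically.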

#### Step 2: Select only the required columns 📝

There are four ways to select columns.

1. Pass the column names as strings. This form can only select columns; there is no `Column` object to call methods on.

In [0]:
circuits_selected_df = circuits_df.select(
    "circuitId",
    "circuitRef",
    "name",
    "location",
    "country",
    "lat",
    "lng",
    "alt"
)

2. The other three forms pass `Column` objects, so you can also call column methods on them (for example, `alias`).

In [0]:
circuits_selected_df = circuits_df.select(
    circuits_df.circuitId,
    circuits_df.circuitRef,
    circuits_df.name,
    circuits_df.location,
    circuits_df.country,
    circuits_df.lat,
    circuits_df.lng,
    circuits_df.alt
)

In [0]:
circuits_selected_df = circuits_df.select(
    circuits_df["circuitId"],
    circuits_df["circuitRef"],
    circuits_df["name"],
    circuits_df["location"],
    circuits_df["country"],
    circuits_df["lat"],
    circuits_df["lng"],
    circuits_df["alt"]
)

In [0]:
from pyspark.sql.functions import col

In [0]:
circuits_selected_df = circuits_df.select(
    col("circuitId"),
    col("circuitRef"),
    col("name"),
    col("location"),
    col("country"),
    col("lat"),
    col("lng"),
    col("alt")
)

In [0]:
circuits_selected_df.display()
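
The practical difference between the string form and the `Column` forms shows up when you want to transform a column inside the `select` call. The sketch below renames one column with `Column.alias` while selecting it. `select_with_alias` and the new name `circuit_name` are hypothetical and used only for illustration; the function expects a Spark DataFrame such as `circuits_df`.

```python
def select_with_alias(df):
    """Select two columns and rename one of them in the same call.

    df["name"] returns a Column object, so .alias() is available on it.
    The string form select("circuitId", "name") has no equivalent.
    """
    return df.select(
        df["circuitId"],
        df["name"].alias("circuit_name"),
    )

# In the notebook:
# select_with_alias(circuits_df).display()
```

The same call works with `circuits_df.name.alias(...)` or `col("name").alias(...)`, since all three forms produce a `Column`.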

#### Step 3: Rename columns 📝

In [0]:
circuits_renamed_df = (
    circuits_selected_df
        .withColumnRenamed("circuitId", "circuit_id")
        .withColumnRenamed("circuitRef", "circuit_ref")
        .withColumnRenamed("lat", "latitude")
        .withColumnRenamed("lng", "longitude")
        .withColumnRenamed("alt", "altitude")
)

In [0]:
circuits_renamed_df.display()
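
When the list of renames grows, it can be easier to keep the mapping in one dictionary and apply it in a loop. This is a minimal sketch of that idea, not the course's approach: `RENAMES` and `rename_columns` are hypothetical names, and the function only relies on `DataFrame.withColumnRenamed`.

```python
# Old name -> new name, matching the renames applied in this step.
RENAMES = {
    "circuitId": "circuit_id",
    "circuitRef": "circuit_ref",
    "lat": "latitude",
    "lng": "longitude",
    "alt": "altitude",
}


def rename_columns(df, renames):
    """Apply withColumnRenamed once per mapping entry and return the result."""
    for old_name, new_name in renames.items():
        df = df.withColumnRenamed(old_name, new_name)
    return df

# In the notebook:
# circuits_renamed_df = rename_columns(circuits_selected_df, RENAMES)
```

On Spark 3.4 and later, `DataFrame.withColumnsRenamed` accepts a dictionary like this directly, which would replace the loop.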