# CSV → Parquet (GCS)
Minimal pipeline from bronze to processed partitioned Parquet.

## Spark session

In [1]:
import os
from pyspark.sql import SparkSession

# Hadoop user for local FS access
os.environ["HADOOP_USER_NAME"] = "spark"

# Stop an existing SparkSession, if this cell has been run before.
try:
  spark.stop()
except NameError:
  # Fresh kernel: `spark` has not been defined yet, nothing to stop.
  pass
except Exception as stop_error:
  # Best effort: a failed stop must not block building the session below,
  # but it is reported instead of being silently swallowed.
  print(f"Ignoring error while stopping previous SparkSession: {stop_error}")

spark = (
  SparkSession.builder
  .appName("csv_parquet")
  .master("spark://spark-master:7077")

  # Ivy cache directory
  .config("spark.jars.ivy", "/home/spark/.ivy2")

  # The raw event_time strings carry a " UTC" suffix that is stripped before
  # parsing, so pin the session time zone to UTC; otherwise timestamps would
  # be interpreted in whatever zone the cluster happens to be configured with.
  .config("spark.sql.session.timeZone", "UTC")

  # GCS connector settings
  .config(
      "spark.hadoop.fs.gs.impl",
      "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
  )
  .config(
      "spark.hadoop.fs.AbstractFileSystem.gs.impl",
      "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
  )

  # GCS service account auth
  .config(
      "spark.hadoop.google.cloud.auth.service.account.enable", "true"
  )
  .config(
      "spark.hadoop.google.cloud.auth.service.account.json.keyfile",
      "/cred/clickstream-sa.json"
  )

  .getOrCreate()
)

# Log level
spark.sparkContext.setLogLevel("WARN")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/02 06:52:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Paths and imports

In [None]:
from pyspark.sql.functions import col, to_timestamp, regexp_replace, monotonically_increasing_id
from pyspark.sql.functions import when, lit, concat_ws, array, to_date, date_format

# Bucket that holds both the raw CSV files and the pipeline outputs
GCS_BUCKET = "clickstream-pipeline-484705-clickstream-data"

# Raw monthly CSV files to ingest
INPUT_PATHS = [
  f"gs://{GCS_BUCKET}/raw/{month}.csv"
  for month in ("2019-Oct", "2019-Nov")
]

# Output locations, keyed by dataset name
OUTPUT_PATHS = dict(
  bronze_invalid=f"gs://{GCS_BUCKET}/raw/invalid",
  processed=f"gs://{GCS_BUCKET}/processed/clickstream",
)

# Echo the resolved locations so the run is self-describing
print("Input:  " + ", ".join(INPUT_PATHS))
print("Output (invalid): " + OUTPUT_PATHS["bronze_invalid"])
print("Output (processed):  " + OUTPUT_PATHS["processed"])


Input:  gs://clickstream-pipeline-484705-clickstream-data/raw/2019-Oct.csv, gs://clickstream-pipeline-484705-clickstream-data/raw/2019-Nov.csv
Output (invalid): gs://clickstream-pipeline-484705-clickstream-data/raw/invalid
Output (processed):  gs://clickstream-pipeline-484705-clickstream-data/processed/clickstreams


## Bronze schema (all strings)

In [3]:
from pyspark.sql import types
# Bronze layer schema: every column is declared as a nullable string.
bronze_schema = types.StructType([
  types.StructField(column_name, types.StringType(), True)
  for column_name in (
    "event_time",
    "event_type",
    "product_id",
    "category_id",
    "category_code",
    "brand",
    "price",
    "user_id",
    "user_session",
  )
])


## Load bronze data from GCS

In [4]:
print("Loading data from GCS...")

# Read all input CSV files with the explicit bronze schema; the first line
# of each file is treated as a header.
df_bronze = spark.read.csv(INPUT_PATHS, schema=bronze_schema, header="true")

# Counting is an action, so this triggers the actual read.
total_records = df_bronze.count()
print("Read complete: {:,} records\n".format(total_records))


Loading data from GCS...




Read complete: 109,950,743 records



                                                                                

## Type casting

In [5]:
print("Casting types...")

# Keep every original column, then append a row id and, for each column
# being typed, a `<name>_raw` copy followed by a `<name>_typed` cast.
df_bronze_typed = df_bronze.select(
    "*",
    monotonically_increasing_id().alias("_row_id"),

    # event_time: drop the trailing " UTC" marker, then parse as a timestamp
    col("event_time").alias("event_time_raw"),
    to_timestamp(
        regexp_replace(col("event_time"), " UTC$", ""),
        "yyyy-MM-dd HH:mm:ss",
    ).alias("event_time_typed"),

    # Plain casts: ids become long, price becomes double
    *[
        derived
        for source, target_type in (
            ("product_id", "long"),
            ("category_id", "long"),
            ("price", "double"),
            ("user_id", "long"),
        )
        for derived in (
            col(source).alias(f"{source}_raw"),
            col(source).cast(target_type).alias(f"{source}_typed"),
        )
    ],
)
df_bronze_typed.cache()


Casting types...


DataFrame[event_time: string, event_type: string, product_id: string, category_id: string, category_code: string, brand: string, price: string, user_id: string, user_session: string, _row_id: bigint, event_time_raw: string, event_time_typed: timestamp, product_id_raw: string, product_id_typed: bigint, category_id_raw: string, category_id_typed: bigint, price_raw: string, price_typed: double, user_id_raw: string, user_id_typed: bigint]

## Casting validation

In [6]:
import time
print("Validating cast failures...")
start_time = time.time()


def _cast_failed(field):
  """Condition: the raw `field` value is present but its typed value is null."""
  return col(f"{field}_raw").isNotNull() & col(f"{field}_typed").isNull()


# A row is a cast failure if any of the typed columns failed to convert.
df_cast_fail = df_bronze_typed.where(
  _cast_failed("event_time")
  | _cast_failed("product_id")
  | _cast_failed("category_id")
  | _cast_failed("price")
  | _cast_failed("user_id")
)

cast_fail_count = df_cast_fail.count()

end_time = time.time()
elapsed = end_time - start_time

# Guard against division by zero when nothing was read.
if total_records > 0:
  cast_fail_rate = cast_fail_count / total_records
else:
  cast_fail_rate = 0

print("\n" + "=" * 50)
print("Casting validation summary")
print("=" * 50)
print(f"Total records:      {total_records:,}")
print(f"Cast failures:   {cast_fail_count:,} ({cast_fail_rate:.4%})")
print("=" * 50 + "\n")

print(f"Elapsed time:     {elapsed:.2f}s")


Validating cast failures...





Casting validation summary
Total records:      109,950,743
Cast failures:   0 (0.0000%)

Elapsed time:     1256.93s


                                                                                

## Save invalid rows (if any)

In [7]:
if cast_fail_count > 0:
  print("Saving invalid rows to GCS...")

  # Label each failing row with the names of the columns whose cast failed.
  # `when(...)` without `otherwise` yields NULL when the condition is false,
  # and concat_ws skips NULL inputs, so the result lists only the failing
  # columns (e.g. "price, user_id") with no empty entries or stray separators.
  df_cast_fail_labeled = df_cast_fail.withColumn(
      "failure_reason",
      concat_ws(
          ", ",
          when(col("event_time_raw").isNotNull() & col("event_time_typed").isNull(),
               lit("event_time")),
          when(col("product_id_raw").isNotNull() & col("product_id_typed").isNull(),
               lit("product_id")),
          when(col("category_id_raw").isNotNull() & col("category_id_typed").isNull(),
               lit("category_id")),
          when(col("price_raw").isNotNull() & col("price_typed").isNull(),
               lit("price")),
          when(col("user_id_raw").isNotNull() & col("user_id_typed").isNull(),
               lit("user_id")),
      )
  )

  # Persist the original (raw) values plus the failure label.
  df_cast_fail_labeled.select(
      "event_time_raw", "event_type", "product_id_raw", "category_id_raw",
      "category_code", "brand", "price_raw", "user_id_raw", "user_session",
      "failure_reason"
  ).write.mode("overwrite").parquet(OUTPUT_PATHS["bronze_invalid"])

  print("Invalid rows saved\n")
else:
  print("No cast failures\n")


No cast failures



## Build processed data

In [8]:
print("Building processed data...")

# Keep only rows whose timestamp, user id and product id were all typed
# successfully, then expose the typed columns under their original names.
df_processed = (
  df_bronze_typed
  .where(col("event_time_typed").isNotNull())
  .where(col("user_id_typed").isNotNull())
  .where(col("product_id_typed").isNotNull())
  .select(
    col("event_time_typed").alias("event_time"),
    "event_type",
    col("product_id_typed").alias("product_id"),
    col("category_id_typed").alias("category_id"),
    "category_code",
    "brand",
    col("price_typed").alias("price"),
    col("user_id_typed").alias("user_id"),
    "user_session",
  )
)
df_processed.cache()

processed_count = df_processed.count()

# Guard against division by zero when nothing was read.
if total_records > 0:
  processed_rate = processed_count / total_records
else:
  processed_rate = 0

print("Processed records: {:,} ({:.2%})".format(processed_count, processed_rate))

Building processed data...




Processed records: 109,950,743 (100.00%)


                                                                                

## Write processed Parquet (partitioned)

In [9]:
# Derive the partition columns from the event timestamp.
df_processed_partitioned = df_processed.select(
  "*",
  to_date(col("event_time")).alias("event_date"),
  date_format(col("event_time"), "yyyy-MM").alias("event_month"),
)

# Overwrite the processed dataset as snappy-compressed Parquet,
# partitioned by month and then by day.
df_processed_partitioned.write.parquet(
  OUTPUT_PATHS["processed"],
  mode="overwrite",
  partitionBy=["event_month", "event_date"],
  compression="snappy",
)

print("Processed data saved")


                                                                                

Processed data saved
