In [2]:
# STEP 1 - Setup and Extract (pandas + PySpark) in Colab

# Import libraries
from pathlib import Path
import logging
import pandas as pd
from pyspark.sql import SparkSession

# Configure logging: INFO and above, prefixed with timestamp and level.
# NOTE(review): logging.basicConfig is a no-op when the root logger already has
# handlers — confirm the INFO messages actually show up in this environment.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

# Define config: source URL plus the local folder layout, all relative to the
# current working directory.
DATASET_URL = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/tips.csv"
DATA_DIR = Path("data")
RAW_DIR = DATA_DIR / "raw"              # untouched copy of the download
PROCESSED_DIR = DATA_DIR / "processed"  # outputs written by later cells
CSV_PATH = RAW_DIR / "tips.csv"

# Create folders up front; exist_ok=True keeps the cell safe to re-run.
RAW_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Define pandas extraction function
def extract_with_pandas(url: str, output_path: Path) -> pd.DataFrame:
    """Read a CSV into a DataFrame and save a local copy of it.

    Args:
        url: Location of the CSV; handed unchanged to ``pd.read_csv``.
        output_path: File the table is written to, without the index column.
            Missing parent folders are created.

    Returns:
        The DataFrame parsed from ``url``.
    """
    logging.info(f"Download CSV with pandas from: {url}")
    df = pd.read_csv(url)

    # Do not rely on a separate setup step having created the target folder:
    # to_csv raises if the parent directory is missing.
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    df.to_csv(output_path, index=False)
    logging.info(f"Save local copy to: {output_path} | rows={len(df)}, cols={len(df.columns)}")
    return df

# Define Spark session creation function
def create_spark_session(app_name: str = "ColabETL_Demo") -> SparkSession:
    """Return a Spark session for ``app_name``, reusing one if it already exists.

    Args:
        app_name: Application name set on the session builder.

    Returns:
        The session handed back by ``getOrCreate()``.
    """
    logging.info("Start Spark session")
    builder = SparkSession.builder.appName(app_name)
    return builder.getOrCreate()

# Define Spark extraction function (from local file)
def extract_with_spark(local_path: Path, spark: SparkSession):
    """Read a local CSV file into a Spark DataFrame.

    The first line is treated as the header and column types are inferred.

    Args:
        local_path: Path of the CSV file; converted to ``str`` for the reader.
        spark: Session used to read the file.

    Returns:
        The Spark DataFrame that was read.
    """
    logging.info(f"Read local CSV with Spark from: {local_path}")
    reader = spark.read.options(header=True, inferSchema=True)
    df = reader.csv(str(local_path))
    # count() is evaluated here only to report the row count in the log line.
    n_rows = df.count()
    n_cols = len(df.columns)
    logging.info(f"Spark DataFrame rows={n_rows}, cols={n_cols}")
    return df

# Run extraction with pandas: fetches DATASET_URL and writes the raw copy to
# CSV_PATH, which the Spark step below reads back.
df_pandas = extract_with_pandas(DATASET_URL, CSV_PATH)
print("First 5 rows (pandas):")
display(df_pandas.head())

# Run extraction with Spark: must come after the pandas step because it reads
# the file that step just saved.
spark = create_spark_session()
df_spark = extract_with_spark(CSV_PATH, spark)
print("Schema (PySpark):")
df_spark.printSchema()
print("First 5 rows (PySpark):")
df_spark.show(5)


First 5 rows (pandas):


Unnamed: 0,total_bill,tip,sex,smoker,day,time,size
0,16.99,1.01,Female,No,Sun,Dinner,2
1,10.34,1.66,Male,No,Sun,Dinner,3
2,21.01,3.5,Male,No,Sun,Dinner,3
3,23.68,3.31,Male,No,Sun,Dinner,2
4,24.59,3.61,Female,No,Sun,Dinner,4


Schema (PySpark):
root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)

First 5 rows (PySpark):
+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [3]:
# STEP 2 - Clean and enrich data (pandas) and reload cleaned data (PySpark)

# Define path for cleaned CSV (inside PROCESSED_DIR, next to the other outputs)
CLEAN_CSV_PATH = PROCESSED_DIR / "tips_clean.csv"

# Define pandas cleaning function
def clean_with_pandas(df: pd.DataFrame, high_tip_threshold: float = 20.0) -> pd.DataFrame:
    """Clean the tips table and add tip-percentage features.

    Steps:
      1. Drop rows where ``total_bill`` or ``tip`` is missing.
      2. Keep only rows with a strictly positive ``total_bill``.
      3. Add ``tip_pct`` (tip as a percentage of the bill, rounded to 2 decimals).
      4. Add ``high_tip_flag`` (1 when ``tip_pct`` exceeds the threshold, else 0).
      5. Strip surrounding whitespace from ``sex``, ``smoker``, ``day``, ``time``.

    Args:
        df: Input table; it is not modified.
        high_tip_threshold: Percentage above which a tip counts as "high".

    Returns:
        A new DataFrame with the cleaned rows and the two extra columns.

    Raises:
        KeyError: If one of the required columns is absent from ``df``.
    """
    logging.info("Start basic cleaning with pandas")

    # Work on an explicit copy of the filtered rows: assigning columns onto a
    # filtered slice triggers SettingWithCopyWarning on pandas < 3.0.
    valid = df.dropna(subset=["total_bill", "tip"])
    valid = valid.loc[valid["total_bill"] > 0].copy()

    valid["tip_pct"] = (valid["tip"] / valid["total_bill"] * 100.0).round(2)
    valid["high_tip_flag"] = (valid["tip_pct"] > high_tip_threshold).astype(int)

    # Normalize text columns (strip spaces). Missing values stay missing
    # instead of being turned into the literal string "nan" by astype(str).
    for col in ["sex", "smoker", "day", "time"]:
        stripped = valid[col].astype(str).str.strip()
        valid[col] = stripped.where(valid[col].notna())

    logging.info(f"Finish cleaning: rows={len(valid)}, cols={len(valid.columns)}")
    return valid

# Run cleaning with pandas; the result gets its own name so df_pandas keeps
# referring to the raw extract.
df_clean_pandas = clean_with_pandas(df_pandas)

# Save cleaned data to CSV (index=False so no extra index column is written)
logging.info(f"Save cleaned data to: {CLEAN_CSV_PATH}")
df_clean_pandas.to_csv(CLEAN_CSV_PATH, index=False)

print("First 5 cleaned rows (pandas):")
display(df_clean_pandas.head())

# Define function to load cleaned data with Spark
def load_clean_with_spark(clean_path: Path, spark: SparkSession):
    """Load the cleaned CSV into a Spark DataFrame.

    The first line is treated as the header and column types are inferred.

    Args:
        clean_path: Path of the cleaned CSV; converted to ``str`` for the reader.
        spark: Session used to read the file.

    Returns:
        The Spark DataFrame that was read.
    """
    logging.info(f"Load cleaned CSV with Spark from: {clean_path}")
    reader = spark.read.options(header=True, inferSchema=True)
    df = reader.csv(str(clean_path))
    # count() is evaluated here only to report the row count in the log line.
    n_rows = df.count()
    n_cols = len(df.columns)
    logging.info(f"Spark cleaned DataFrame rows={n_rows}, cols={n_cols}")
    return df

# Load cleaned data with Spark; reads the file written by the pandas step
# above and reuses the `spark` session created in the extraction cell.
df_clean_spark = load_clean_with_spark(CLEAN_CSV_PATH, spark)

print("Schema of cleaned data (PySpark):")
df_clean_spark.printSchema()
print("First 5 cleaned rows (PySpark):")
df_clean_spark.show(5)


First 5 cleaned rows (pandas):


Unnamed: 0,total_bill,tip,sex,smoker,day,time,size,tip_pct,high_tip_flag
0,16.99,1.01,Female,No,Sun,Dinner,2,5.94,0
1,10.34,1.66,Male,No,Sun,Dinner,3,16.05,0
2,21.01,3.5,Male,No,Sun,Dinner,3,16.66,0
3,23.68,3.31,Male,No,Sun,Dinner,2,13.98,0
4,24.59,3.61,Female,No,Sun,Dinner,4,14.68,0


Schema of cleaned data (PySpark):
root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)
 |-- tip_pct: double (nullable = true)
 |-- high_tip_flag: integer (nullable = true)

First 5 cleaned rows (PySpark):
+----------+----+------+------+---+------+----+-------+-------------+
|total_bill| tip|   sex|smoker|day|  time|size|tip_pct|high_tip_flag|
+----------+----+------+------+---+------+----+-------+-------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|   5.94|            0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|  16.05|            0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|  16.66|            0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|  13.98|            0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|  14.68|            0|
+----------+----+------+-

In [5]:
# STEP 3 - Create analytics table (PySpark) and save to Parquet and CSV

from pyspark.sql import functions as F

# Define output paths for the analytics table
PARQUET_DIR = PROCESSED_DIR / "parquet"
PARQUET_DIR.mkdir(parents=True, exist_ok=True)  # exist_ok keeps re-runs safe

# Kept as str because it is passed directly to the Spark Parquet writer below.
ANALYTICS_PARQUET_PATH = str(PARQUET_DIR / "tips_analytics.parquet")
ANALYTICS_CSV_PATH = PROCESSED_DIR / "tips_analytics.csv"

# Define function to create analytics table with Spark
def create_analytics_with_spark(df):
    """Aggregate the cleaned tips data per (day, time) combination.

    For each group the result holds the row count, the mean ``total_bill``,
    the mean ``tip_pct`` and the mean of ``high_tip_flag``; rows are ordered
    by ``day`` then ``time``.

    Args:
        df: Spark DataFrame with ``day``, ``time``, ``total_bill``, ``tip_pct``
            and ``high_tip_flag`` columns.

    Returns:
        The aggregated Spark DataFrame.
    """
    logging.info("Create analytics table with Spark")

    group_keys = ["day", "time"]
    metrics = [
        F.count("*").alias("n_rows"),
        F.avg("total_bill").alias("avg_total_bill"),
        F.avg("tip_pct").alias("avg_tip_pct"),
        F.avg("high_tip_flag").alias("high_tip_rate"),
    ]

    grouped = df.groupBy(*group_keys)
    analytics_df = grouped.agg(*metrics).orderBy(*group_keys)

    # count() is evaluated here only to report the row count in the log line.
    n_rows = analytics_df.count()
    n_cols = len(analytics_df.columns)
    logging.info(f"Analytics table rows={n_rows}, cols={n_cols}")
    return analytics_df

# Create analytics Spark DataFrame from the cleaned data loaded in the previous cell
df_analytics_spark = create_analytics_with_spark(df_clean_spark)

# Save analytics table as Parquet (useful for cloud tools like AWS/EMR/Glue);
# mode("overwrite") replaces any output left by an earlier run.
logging.info(f"Save analytics table in Parquet format to: {ANALYTICS_PARQUET_PATH}")
df_analytics_spark.write.mode("overwrite").parquet(ANALYTICS_PARQUET_PATH)

# Convert analytics table to pandas and save as CSV for easy sharing.
# toPandas() collects the whole aggregated table into a local pandas DataFrame.
df_analytics_pandas = df_analytics_spark.toPandas()
logging.info(f"Save analytics table as CSV to: {ANALYTICS_CSV_PATH}")
df_analytics_pandas.to_csv(ANALYTICS_CSV_PATH, index=False)

print("Analytics table (first rows):")
display(df_analytics_pandas.head())


Analytics table (first rows):


Unnamed: 0,day,time,n_rows,avg_total_bill,avg_tip_pct,high_tip_rate
0,Fri,Dinner,12,19.663333,15.8925,0.166667
1,Fri,Lunch,7,12.845714,18.875714,0.285714
2,Sat,Dinner,87,20.441379,15.314598,0.114943
3,Sun,Dinner,76,21.41,16.689605,0.197368
4,Thur,Dinner,1,18.78,15.97,0.0
