# 👽 Project: Alien Insights ETL Pipeline

**Goal**: Clean and optimize raw UFO sightings data for analysis.

### Pipeline Steps:
1.  **Extract**: Read `ufo_sightings.csv` from S3 Raw.
2.  **Clean (Silver Layer)**:
    *   Standardize column names (remove spaces).
    *   Handle missing values in `State` and `Shape`.
    *   **Challenge**: Convert messy `Time` string to actual `Timestamp`.
3.  **Feature Engineering (Gold Layer)**:
    *   Extract `Year` and `Month` for trend analysis.
    *   Aggregate total sightings per `State`.
4.  **Load**: Write optimized Parquet data back to S3.

In [None]:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, year, month, count, trim, upper, when

# 1. Setup
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
print("Ready to find aliens! 🛸")

### 1. Extract (Read from S3)

In [None]:
BUCKET_NAME = "egirgis-datalake-v1"
INPUT_PATH = f"s3://{BUCKET_NAME}/raw/ufo/"
OUTPUT_PATH_GOLD = f"s3://{BUCKET_NAME}/gold/ufo_analytics/"

# Read Raw CSV
dyf_raw = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": "\"", "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={"paths": [INPUT_PATH], "recurse": True},
    transformation_ctx="raw_input"
)

print("Raw Data Schema:")
dyf_raw.printSchema()
dyf_raw.show(5)
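The `format_options` above tell Glue's CSV reader to treat the first row as a header, split on commas and respect double-quoted fields. Python's standard `csv` module follows the same rules, so a quick local sketch on a couple of hypothetical rows (made up for illustration, not taken from the real file) shows why `quoteChar` matters when a field itself contains a comma:

```python
import csv
import io

# Hypothetical sample rows; the header mirrors the columns the notebook renames later.
SAMPLE = (
    "City,Colors Reported,Shape Reported,State,Time\n"
    "Ithaca,,TRIANGLE,NY,6/1/1930 22:00\n"
    '"Willingboro, Burlington Co.",,OTHER,NJ,6/30/1930 20:00\n'
)

def read_rows(text: str) -> list[dict[str, str]]:
    """Parse CSV text with a header row, ',' as separator and '"' as quote char."""
    reader = csv.DictReader(io.StringIO(text), delimiter=",", quotechar='"')
    return list(reader)

rows = read_rows(SAMPLE)
for row in rows:
    print(row["City"], "|", row["State"])
# Ithaca | NY
# Willingboro, Burlington Co. | NJ
```

Without the quote character, the second row would split into six fields and every column after `City` would shift by one. Note that in this sketch an empty field (`Colors Reported`) comes back as an empty string, not a missing value.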

### 2. Clean & Normalize (Silver Layer)
We convert the DynamicFrame to a Spark DataFrame, which makes column-level manipulation easier.
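As a sketch of the "remove spaces" rule, the hypothetical helper below (not part of the notebook) trims every column name and replaces inner runs of whitespace with a single underscore, instead of renaming columns one by one:

```python
def standardize_columns(names: list[str]) -> list[str]:
    """Trim each column name and join its words with underscores."""
    # str.split() with no argument drops leading/trailing whitespace
    # and collapses repeated spaces, so "  Shape   Reported " -> "Shape_Reported".
    return ["_".join(name.split()) for name in names]

print(standardize_columns(["City", "Colors Reported", "Shape Reported", "State", "Time"]))
# ['City', 'Colors_Reported', 'Shape_Reported', 'State', 'Time']
```

In PySpark the resulting list could be applied in one call with `df_raw.toDF(*standardize_columns(df_raw.columns))`. The cell below goes a step further for one column and shortens `Shape Reported` to just `Shape`, so a generic rule like this would need that one rename on top.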

In [None]:
df_raw = dyf_raw.toDF()

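# 1. Rename columns (replace spaces so names are easy to reference)
df_cleaned = df_raw.withColumnRenamed("Colors Reported", "Colors_Reported") \
                   .withColumnRenamed("Shape Reported", "Shape")

# 2. Clean 'Shape' and 'State': trim whitespace and uppercase.
#    Blank strings are turned into NULL first, because fillna() only replaces
#    real NULLs and would otherwise leave '' in place instead of 'UNKNOWN'.
for column_name in ["Shape", "State"]:
    normalized = upper(trim(col(column_name)))
    df_cleaned = df_cleaned.withColumn(column_name, when(normalized != "", normalized))

df_cleaned = df_cleaned.fillna({"Shape": "UNKNOWN", "State": "UNKNOWN"})

# 3. Parse Time (format looks like '6/1/1930 22:00').
# Without a format, to_timestamp expects ISO-like strings such as '1930-06-01 22:00:00',
# so pass the pattern explicitly. Rows that fail to parse come back as NULL
# (Spark's behaviour with ANSI mode off) and are filtered out in the Gold step.
df_cleaned = df_cleaned.withColumn("Event_Time", to_timestamp(col("Time"), "M/d/yyyy H:mm"))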

df_cleaned.printSchema()
df_cleaned.select("City", "State", "Shape", "Event_Time").show(5)
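To sanity-check the cleaning rules locally, here is a plain-Python sketch of the same logic. It is an approximation, not Spark itself: `%m/%d/%Y %H:%M` is the `strptime` counterpart of Spark's `M/d/yyyy H:mm` (both accept single-digit month, day and hour), and the helper names are hypothetical.

```python
from datetime import datetime
from typing import Optional

# strptime counterpart of Spark's "M/d/yyyy H:mm" pattern.
TIME_FORMAT = "%m/%d/%Y %H:%M"

def parse_event_time(raw: Optional[str]) -> Optional[datetime]:
    """Return a datetime, or None when the string is missing or malformed."""
    if raw is None:
        return None
    try:
        return datetime.strptime(raw.strip(), TIME_FORMAT)
    except ValueError:
        # Mirrors to_timestamp yielding NULL for unparseable input.
        return None

def normalize_label(raw: Optional[str]) -> str:
    """Trim and uppercase a label; map NULL or blank values to 'UNKNOWN'."""
    cleaned = (raw or "").strip().upper()
    return cleaned or "UNKNOWN"

print(parse_event_time("6/1/1930 22:00"))  # 1930-06-01 22:00:00
print(parse_event_time("not a date"))      # None
print(normalize_label("  disk "), normalize_label("   "))  # DISK UNKNOWN
```

Treating blank and missing values the same way is the point of `normalize_label`: a label that is only whitespace carries no more information than a NULL.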

### 3. Feature Engineering (Gold Layer)
Derive `Year` and `Month` from `Event_Time`, drop rows whose timestamp failed to parse, and count sightings per `State`.

In [None]:
# Extract Time Features
df_gold = df_cleaned.withColumn("Year", year(col("Event_Time"))) \
                    .withColumn("Month", month(col("Event_Time")))

# Filter out bad dates (if parsing failed)
df_gold = df_gold.filter(col("Year").isNotNull())

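print(f"Total Cleaned Sightings: {df_gold.count()}")
df_gold.show(5)

# Aggregate total sightings per State (for reporting; the Load step writes df_gold)
df_state_counts = df_gold.groupBy("State").agg(count("*").alias("Total_Sightings")) \
                         .orderBy(col("Total_Sightings").desc())
df_state_counts.show(10)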
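The Gold transformations reduce to three small operations: read year and month off the timestamp, skip rows without one, and count per state. A plain-Python sketch over hypothetical in-memory rows (shaped like `df_cleaned`, values invented for illustration) makes the expected results easy to check by hand:

```python
from collections import Counter
from datetime import datetime
from typing import Optional

# Hypothetical cleaned rows; Event_Time is None where parsing failed.
cleaned_rows: list[dict] = [
    {"State": "NY", "Event_Time": datetime(1930, 6, 1, 22, 0)},
    {"State": "NJ", "Event_Time": datetime(1930, 6, 30, 20, 0)},
    {"State": "NY", "Event_Time": datetime(1931, 2, 15, 14, 0)},
    {"State": "CO", "Event_Time": None},  # failed to parse, dropped below
]

def add_time_features(rows: list[dict]) -> list[dict]:
    """Add Year/Month and drop rows whose timestamp could not be parsed."""
    gold = []
    for row in rows:
        ts: Optional[datetime] = row["Event_Time"]
        if ts is None:
            continue
        gold.append({**row, "Year": ts.year, "Month": ts.month})
    return gold

def sightings_per_state(rows: list[dict]) -> Counter:
    """Count sightings per State, like groupBy("State") with count("*")."""
    return Counter(row["State"] for row in rows)

gold_rows = add_time_features(cleaned_rows)
print(len(gold_rows))                  # 3
print(sightings_per_state(gold_rows))  # Counter({'NY': 2, 'NJ': 1})
```

Because the count runs after the date filter, `CO` disappears from the totals; counting on the cleaned data instead would keep sightings with an unparseable `Time`.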

### 4. Load (Write to S3)
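Write the final dataset as Parquet, partitioned by **Year**: each year lands in its own S3 folder, so queries that filter on `Year` only read the matching partitions instead of scanning the whole dataset.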
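Assuming the Hive-style `Year=<value>/` folder naming that Spark and Glue use for partition keys, the sketch below (hypothetical helpers, with file names inside each folder left out because the writer chooses them) shows how rows map to S3 prefixes and why a filter on `Year` can skip every other folder:

```python
from collections import defaultdict

OUTPUT_PATH_GOLD = "s3://egirgis-datalake-v1/gold/ufo_analytics/"

def partition_prefix(base: str, year: int) -> str:
    """Build a Hive-style partition prefix such as '<base>/Year=1930/'."""
    return f"{base.rstrip('/')}/Year={year}/"

def group_by_partition(rows: list[dict], base: str) -> dict[str, list[dict]]:
    """Group rows by the Year folder they would be written to."""
    partitions: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        partitions[partition_prefix(base, row["Year"])].append(row)
    return dict(partitions)

sample_rows = [
    {"State": "NY", "Year": 1930},
    {"State": "NJ", "Year": 1930},
    {"State": "NY", "Year": 1931},
]
layout = group_by_partition(sample_rows, OUTPUT_PATH_GOLD)
for prefix, members in layout.items():
    print(prefix, len(members))
# s3://egirgis-datalake-v1/gold/ufo_analytics/Year=1930/ 2
# s3://egirgis-datalake-v1/gold/ufo_analytics/Year=1931/ 1

# Partition pruning: a query filtering on Year = 1930 only needs this one prefix.
wanted = [p for p in layout if p.endswith("Year=1930/")]
```

The trade-off is folder count: partitioning by a low-cardinality column such as `Year` keeps the number of prefixes and small files manageable, whereas partitioning by something like `City` would not.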

In [None]:
# write_dynamic_frame expects a DynamicFrame, so convert the Spark DataFrame back
# (df_gold.write.partitionBy("Year").parquet(OUTPUT_PATH_GOLD) is the pure-Spark alternative)
dyf_gold = DynamicFrame.fromDF(df_gold, glueContext, "gold_output")

print(f"Writing Gold Data to: {OUTPUT_PATH_GOLD}")

# Write Parquet, Partitioned by Year
glueContext.write_dynamic_frame.from_options(
    frame=dyf_gold,
    connection_type="s3",
    format="parquet",
    connection_options={"path": OUTPUT_PATH_GOLD, "partitionKeys": ["Year"]},
    transformation_ctx="write_gold"
)

print("mission_complete = True 🛸")