# Data Ingestion Notebook

This notebook demonstrates ingesting data into S3 (LocalStack). The current version uses generated sample transaction data as its only source.

## Parameters
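- `execution_date`: Date of pipeline execution, formatted as `YYYY-MM-DD`. It is also used in the output file name and S3 key prefix.
- `s3_bucket`: Target S3 bucket for data storage. It is created if it does not already exist.
- `aws_endpoint_url`: S3 endpoint URL. It defaults to LocalStack at `http://localhost:4566`.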

In [None]:
# Parameters - default values. Papermill does not edit this cell; it injects a new
# cell with overrides directly after it (this cell needs the `parameters` tag).
execution_date: str = '2024-01-01'  # Pipeline run date (YYYY-MM-DD); used in the CSV file name and S3 key prefix
s3_bucket: str = 'data-lake'  # Target bucket; created further down if it does not exist
aws_endpoint_url: str = 'http://localhost:4566'  # S3 endpoint; the default points at LocalStack

In [None]:
import os
import tempfile
from datetime import datetime

import boto3
import numpy as np
import pandas as pd

# Configure AWS credentials for LocalStack.
# The 'test' values are only LocalStack placeholders. setdefault keeps any credentials
# already exported by the orchestrator instead of silently overwriting them, which
# matters if aws_endpoint_url is ever pointed at a non-LocalStack endpoint.
os.environ.setdefault('AWS_ACCESS_KEY_ID', 'test')
os.environ.setdefault('AWS_SECRET_ACCESS_KEY', 'test')
os.environ.setdefault('AWS_DEFAULT_REGION', 'us-east-1')

# Echo the effective parameters so they are visible in the executed notebook output
print(f"Execution Date: {execution_date}")
print(f"S3 Bucket: {s3_bucket}")
print(f"AWS Endpoint: {aws_endpoint_url}")

In [None]:
# Initialize S3 client.
# Credentials are read from the environment; the 'test' fallbacks are placeholder
# credentials that are only meaningful against LocalStack.
s3_client = boto3.client(
    's3',
    endpoint_url=aws_endpoint_url,
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID', 'test'),
    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY', 'test'),
    region_name='us-east-1'
)

# Check if bucket exists, create if not.
# Only a "not found" response means the bucket is missing. Anything else (403 Forbidden,
# bad credentials, an unreachable endpoint, Ctrl-C) must propagate rather than be
# masked by a create_bucket attempt.
try:
    s3_client.head_bucket(Bucket=s3_bucket)
    print(f"Bucket '{s3_bucket}' exists")
except s3_client.exceptions.ClientError as err:
    error_code = err.response.get('Error', {}).get('Code')
    if error_code not in ('404', 'NoSuchBucket', 'NotFound'):
        raise
    # us-east-1 is used above, so create_bucket needs no LocationConstraint
    s3_client.create_bucket(Bucket=s3_bucket)
    print(f"Created bucket '{s3_bucket}'")

In [None]:
# Generate sample data.
# Seeding the global NumPy RNG makes the generated data identical on every run.
np.random.seed(42)
n_records = 10000
n_missing_amount = 100    # number of rows whose 'amount' is blanked out
n_missing_category = 50   # number of rows whose 'category' is blanked out

sample_data = pd.DataFrame({
    'customer_id': range(1, n_records + 1),
    # One transaction per minute starting at execution_date (~7 days for 10,000 rows)
    'transaction_date': pd.date_range(
        start=execution_date,
        periods=n_records,
        freq='min'
    ),
    'amount': np.random.normal(100, 25, n_records),
    'category': np.random.choice(
        ['Electronics', 'Clothing', 'Food', 'Books', 'Other'],
        n_records
    ),
    'payment_method': np.random.choice(
        ['Credit Card', 'Debit Card', 'Cash', 'PayPal'],
        n_records
    )
})

# Add some missing values for data quality testing.
# replace=False picks distinct rows, so exactly n_missing_* values end up missing.
# Sampling with replacement can hit the same row twice and yield fewer.
missing_amount_rows = np.random.choice(sample_data.index, n_missing_amount, replace=False)
sample_data.loc[missing_amount_rows, 'amount'] = np.nan
missing_category_rows = np.random.choice(sample_data.index, n_missing_category, replace=False)
sample_data.loc[missing_category_rows, 'category'] = None

print(f"Generated {len(sample_data)} records")
print(sample_data.head())
print("\nData Info:")
# DataFrame.info() writes its report directly and returns None, so it is not wrapped
# in print() (which would emit a stray "None" line).
sample_data.info()

In [None]:
# Save data to CSV in the platform's temp directory. A hard-coded '/tmp' does not
# exist on Windows and ignores a configured TMPDIR.
csv_filename = f"raw_data_{execution_date.replace('-', '_')}.csv"
csv_path = os.path.join(tempfile.gettempdir(), csv_filename)

sample_data.to_csv(csv_path, index=False)
print(f"Saved data to {csv_path}")

# Upload to S3 under a per-execution-date prefix
s3_key = f"raw-data/{execution_date}/{csv_filename}"
s3_client.upload_file(csv_path, s3_bucket, s3_key)

print(f"Uploaded to S3: s3://{s3_bucket}/{s3_key}")

In [None]:
# Verify upload.
# A failed check must raise so the notebook run (e.g. under Papermill) fails. Printing
# the error and continuing would let downstream jobs run against missing data.
try:
    response = s3_client.head_object(Bucket=s3_bucket, Key=s3_key)
except Exception as e:
    print(f"❌ Error verifying upload: {e}")
    raise

# The stored object should be exactly the size of the local CSV that was uploaded
local_size = os.path.getsize(csv_path)
remote_size = response['ContentLength']
if remote_size != local_size:
    raise RuntimeError(
        f"❌ Upload size mismatch for s3://{s3_bucket}/{s3_key}: "
        f"{remote_size} bytes in S3 vs {local_size} bytes locally"
    )

print("✅ File successfully uploaded to S3")
print(f"File size: {remote_size} bytes")
print(f"Last modified: {response['LastModified']}")

In [None]:
# Generate metadata.
# Counts are converted to built-in int. pandas returns NumPy integers, which
# json.dumps rejects and which NumPy 2 prints as e.g. "np.int64(100)".
metadata = {
    'execution_date': execution_date,
    'total_records': len(sample_data),
    'file_path': f"s3://{s3_bucket}/{s3_key}",
    'missing_values': {
        'amount': int(sample_data['amount'].isna().sum()),
        'category': int(sample_data['category'].isna().sum())
    },
    # Per-category row counts; value_counts() skips missing categories by default
    'categories': {
        category: int(count)
        for category, count in sample_data['category'].value_counts().items()
    },
    # Local, timezone-naive wall-clock time of this run
    'timestamp': datetime.now().isoformat()
}

print("Data Ingestion Metadata:")
for key, value in metadata.items():
    print(f"  {key}: {value}")

## Summary

If all cells above ran without errors, data ingestion completed successfully.

The following steps were performed:
1. Generated sample transaction data.
2. Added realistic missing values for data quality testing.
3. Saved the data as CSV, uploaded it to S3 (LocalStack), and verified the upload.
4. Generated ingestion metadata (printed above) for downstream processing.

The data is now ready for processing by the Spark job.