# py-iku: Advanced Features

This notebook covers advanced py-iku features:

1. **Plugin Architecture**: Extend py-iku with custom recipe and processor mappings
2. **Command Line Interface**: Use py-iku from the terminal
3. **DSS Export**: Export directly to Dataiku DSS project format

## 1. Plugin Architecture

py-iku's plugin system allows you to:
- Register custom pandas method handlers
- Map custom functions to Dataiku recipes
- Create reusable extension packages

In [None]:
from py2dataiku.plugins.registry import (
    PluginRegistry,
    register_recipe_handler,
    register_processor_handler,
    Plugin
)
from py2dataiku.models.dataiku_recipe import RecipeType
from py2dataiku.models.prepare_step import ProcessorType
from py2dataiku.models.transformation import Transformation, TransformationType

### 1.1 Registering Custom Recipe Mappings

Map custom pandas methods to Dataiku recipe types.

In [None]:
# Register a custom method to map to a recipe type
PluginRegistry.register_recipe_mapping(
    pandas_method='my_custom_merge',
    recipe_type=RecipeType.JOIN
)

# Now when py-iku sees df.my_custom_merge(), it will create a JOIN recipe
print(f"Registered recipe mappings: {len(PluginRegistry._recipe_mappings)}")

### 1.2 Using Decorators

Use decorators for cleaner registration.

In [None]:
# Register a custom method handler using decorator
@register_recipe_handler('special_aggregate')
def handle_special_aggregate(context):
    """Custom handler for a special aggregation method."""
    return Transformation(
        transformation_type=TransformationType.AGGREGATION,
        source_dataframe=context.dataframes.get('df'),
        target_dataframe='result',
        parameters={'custom': True, 'method': 'special_aggregate'},
        source_line=context.current_line,
        notes=['Custom aggregation via plugin']
    )

print("Custom handler registered!")
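
To see what a registration decorator does conceptually, here is a minimal, self-contained sketch of a decorator-based registry. It is not py-iku's implementation: `MiniRegistry`, `register_handler`, and `dispatch` are hypothetical names, and the handler returns a plain dict rather than a `Transformation`.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]


class MiniRegistry:
    """Hypothetical stand-in for a plugin registry (not py-iku's class)."""

    _method_handlers: Dict[str, Handler] = {}

    @classmethod
    def register_handler(cls, method_name: str) -> Callable[[Handler], Handler]:
        """Return a decorator that stores the function under method_name."""
        def decorator(func: Handler) -> Handler:
            cls._method_handlers[method_name] = func
            return func  # returned unchanged so it stays directly callable
        return decorator

    @classmethod
    def dispatch(cls, method_name: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Look up the handler for method_name and call it with context."""
        try:
            handler = cls._method_handlers[method_name]
        except KeyError:
            raise LookupError(f"No handler registered for {method_name!r}") from None
        return handler(context)


@MiniRegistry.register_handler("special_aggregate")
def demo_special_aggregate(context: Dict[str, Any]) -> Dict[str, Any]:
    # Assumption: the context is a simple dict in this sketch.
    return {"type": "AGGREGATION", "source": context.get("df"), "custom": True}


result = MiniRegistry.dispatch("special_aggregate", {"df": "sales"})
print(result)
```

The decorator runs once, when the function is defined, which is why registration has to happen before any code that relies on the handler.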

### 1.3 Creating a Plugin Class

For larger extensions, subclass `Plugin` and group related registrations in its `register()` method.

In [None]:
class MyAnalyticsPlugin(Plugin):
    """Custom plugin for analytics operations."""
    
    name = "my_analytics"
    version = "1.0.0"
    
    def register(self):
        """Register all handlers when plugin loads."""
        # Register recipe mappings
        PluginRegistry.register_recipe_mapping(
            'time_series_analysis',
            RecipeType.PYTHON
        )
        PluginRegistry.register_recipe_mapping(
            'anomaly_detection',
            RecipeType.PYTHON
        )
        
        # Register processor mappings
        PluginRegistry.register_processor_mapping(
            'normalize_ts',
            ProcessorType.NORMALIZER
        )
        
        print(f"Plugin '{self.name}' v{self.version} registered!")

# Instantiate the plugin and register its mappings
plugin = MyAnalyticsPlugin()
plugin.register()

### 1.4 Checking Registered Plugins

In [None]:
# View all registered mappings
# (underscore-prefixed attributes are internal and may change between versions)
print("Recipe Mappings:")
for method, recipe_type in PluginRegistry._recipe_mappings.items():
    print(f"  {method} -> {recipe_type.value}")

print("\nProcessor Mappings:")
for method, processor_type in PluginRegistry._processor_mappings.items():
    print(f"  {method} -> {processor_type.value}")

print("\nMethod Handlers:")
for method in PluginRegistry._method_handlers.keys():
    print(f"  {method}")

## 2. Command Line Interface (CLI)

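py-iku provides a `py2dataiku` command for use from the terminal. Inside a notebook, the same subcommands can be run with `python -m py2dataiku.cli`.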

### 2.1 CLI Commands

```bash
# Convert Python code to Dataiku flow
py2dataiku convert script.py -o flow.json
py2dataiku convert script.py -f yaml -o flow.yaml

# Visualize a flow
py2dataiku viz script.py -f ascii
py2dataiku viz script.py -f svg -o diagram.svg
py2dataiku viz script.py -f html -o interactive.html

# Analyze code and show transformations
py2dataiku analyze script.py
py2dataiku analyze script.py -f json

# Export to Dataiku DSS project
py2dataiku export script.py -o ./my_project
py2dataiku export script.py -o project.zip --zip
```

In [None]:
# Create a sample script to use with CLI
sample_script = '''
import pandas as pd

df = pd.read_csv('data.csv')
df = df.dropna()
summary = df.groupby('category')['amount'].sum().reset_index()
summary.to_csv('summary.csv', index=False)
'''

with open('sample_script.py', 'w') as f:
    f.write(sample_script)
print("Created sample_script.py")

In [None]:
# Run CLI commands from notebook
!python -m py2dataiku.cli convert sample_script.py 2>/dev/null | head -20

In [None]:
# Visualize with CLI
!python -m py2dataiku.cli viz sample_script.py -f ascii 2>/dev/null

In [None]:
# Analyze with CLI
!python -m py2dataiku.cli analyze sample_script.py 2>/dev/null
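
The cells above discard stderr with `2>/dev/null`, which also hides any failure. As a hedged alternative, the sketch below invokes the same module through `subprocess` so the exit code and error output stay visible. `build_cli_command` and `run_cli` are hypothetical helpers; only the `python -m py2dataiku.cli <subcommand> <script>` form is taken from this notebook.

```python
import subprocess
import sys
from typing import List


def build_cli_command(subcommand: str, script: str, *extra: str) -> List[str]:
    """Build the argument list for `python -m py2dataiku.cli ...`."""
    return [sys.executable, "-m", "py2dataiku.cli", subcommand, script, *extra]


def run_cli(subcommand: str, script: str, *extra: str) -> subprocess.CompletedProcess:
    """Run the CLI and capture stdout/stderr instead of discarding them."""
    return subprocess.run(
        build_cli_command(subcommand, script, *extra),
        capture_output=True,
        text=True,
        check=False,  # inspect returncode ourselves rather than raising
    )


if __name__ == "__main__":
    proc = run_cli("analyze", "sample_script.py")
    if proc.returncode != 0:
        print("CLI failed:", proc.stderr.strip())
    else:
        print(proc.stdout)
```

Using `sys.executable` keeps the call inside the interpreter that runs the notebook, so the CLI sees the same installed packages.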

### 2.2 Using the CLI Programmatically

In [None]:
from py2dataiku.cli import create_parser, cmd_convert, cmd_analyze

# Create a parser
parser = create_parser()

# Parse arguments programmatically
args = parser.parse_args(['convert', 'sample_script.py', '-f', 'yaml'])
print(f"Command: {args.command}")
print(f"Input: {args.input}")
print(f"Format: {args.format}")
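
For readers unfamiliar with `argparse` subcommands, the following sketch shows how a parser exposing `command`, `input`, and `format` attributes could be built. It is an illustration under assumptions, not py-iku's actual `create_parser()`: `make_demo_parser`, the program name `demo-cli`, and the option choices are made up.

```python
import argparse


def make_demo_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the attribute names used above."""
    parser = argparse.ArgumentParser(prog="demo-cli")
    # dest="command" stores the chosen subcommand name on args.command
    subparsers = parser.add_subparsers(dest="command", required=True)

    convert = subparsers.add_parser("convert", help="Convert a script")
    convert.add_argument("input", help="Path to the Python script")
    convert.add_argument("-f", "--format", choices=["json", "yaml"], default="json")
    convert.add_argument("-o", "--output", default=None)
    return parser


demo_args = make_demo_parser().parse_args(["convert", "sample_script.py", "-f", "yaml"])
print(demo_args.command, demo_args.input, demo_args.format, demo_args.output)
```

Passing an explicit list to `parse_args` is what makes the parser usable from a notebook or a test without touching `sys.argv`.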

## 3. DSS Project Export

Export a flow in Dataiku DSS project format so that it can be imported into a DSS instance.

In [None]:
from py2dataiku.exporters.dss_exporter import DSSExporter
from py2dataiku import convert

In [None]:
# Create a flow
etl_code = '''
import pandas as pd

# Load raw data
customers = pd.read_csv('customers.csv')
orders = pd.read_csv('orders.csv')

# Clean customer data
customers['name'] = customers['name'].str.strip().str.title()
customers['email'] = customers['email'].str.lower()
customers = customers.dropna(subset=['customer_id'])

# Join with orders
customer_orders = pd.merge(customers, orders, on='customer_id', how='left')

# Calculate metrics
customer_summary = customer_orders.groupby('customer_id').agg({
    'order_id': 'count',
    'amount': 'sum',
    'name': 'first'
}).reset_index()
customer_summary.columns = ['customer_id', 'order_count', 'total_amount', 'name']

# Save output
customer_summary.to_csv('customer_metrics.csv', index=False)
'''

flow = convert(etl_code)
print(flow.get_summary())

### 3.1 Export to Directory

In [None]:
# Create exporter
exporter = DSSExporter(flow, project_key='CUSTOMER_ANALYTICS')

# Export to directory
output_path = exporter.export('./dss_project')
print(f"Exported to: {output_path}")

In [None]:
# View the exported structure
!find ./dss_project -type f | head -20

In [None]:
# View project.json
!cat ./dss_project/project.json 2>/dev/null | head -30
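
The `find`, `cat`, and `head` commands above assume a Unix-like shell. A portable way to inspect the export from Python might look like the sketch below. `list_exported_files` and `load_json_if_present` are hypothetical helpers, and the only layout assumed is the `./dss_project/project.json` path used above.

```python
import json
from pathlib import Path
from typing import Any, List, Optional


def list_exported_files(root: str, limit: int = 20) -> List[str]:
    """Return up to `limit` file paths under root, relative to root, sorted."""
    base = Path(root)
    if not base.is_dir():
        return []  # nothing exported yet
    files = sorted(
        p.relative_to(base).as_posix() for p in base.rglob("*") if p.is_file()
    )
    return files[:limit]


def load_json_if_present(path: str) -> Optional[Any]:
    """Return the parsed JSON file, or None when the file does not exist."""
    target = Path(path)
    if not target.is_file():
        return None
    return json.loads(target.read_text(encoding="utf-8"))


for name in list_exported_files("./dss_project"):
    print(name)
print(load_json_if_present("./dss_project/project.json"))
```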

### 3.2 Export as ZIP

In [None]:
import os

# Export as ZIP file for easy import to Dataiku
zip_path = exporter.export('./dss_export', create_zip=True)
print(f"Created ZIP: {zip_path}")

# Check file size
size = os.path.getsize(zip_path)
print(f"ZIP size: {size} bytes")
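
Beyond checking the file size, it can help to look inside the archive before importing it. The sketch below uses the standard `zipfile` module for that. `summarize_zip` is a hypothetical helper, and nothing is assumed about which entries the exporter writes.

```python
import zipfile
from typing import List, Tuple


def summarize_zip(path: str) -> List[Tuple[str, int]]:
    """Return (entry name, uncompressed size in bytes) for each file entry."""
    with zipfile.ZipFile(path) as archive:
        # testzip() returns the name of the first corrupt entry, or None
        bad_entry = archive.testzip()
        if bad_entry is not None:
            raise zipfile.BadZipFile(f"Corrupt entry: {bad_entry}")
        return [
            (info.filename, info.file_size)
            for info in archive.infolist()
            if not info.is_dir()
        ]
```

In this notebook it could be called as `summarize_zip(zip_path)` once the export cell has run.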

### 3.3 Get API Bundle

Get the export as a Python dictionary for API-based import.

In [None]:
# Get API bundle (dict format)
bundle = exporter.get_api_bundle()

print(f"Bundle keys: {list(bundle.keys())}")
print(f"\nProject info:")
print(f"  Key: {bundle['project']['projectKey']}")
print(f"  Name: {bundle['project']['name']}")

print(f"\nDatasets: {len(bundle['datasets'])}")
for ds in bundle['datasets'][:3]:
    print(f"  - {ds['name']} ({ds['type']})")

print(f"\nRecipes: {len(bundle['recipes'])}")
for r in bundle['recipes'][:3]:
    print(f"  - {r['name']} ({r['type']})")

### 3.4 Examining Exported Recipes

In [None]:
import json

# View a recipe definition
if bundle['recipes']:
    recipe = bundle['recipes'][0]
    print(f"Recipe: {recipe['name']}")
    print(json.dumps(recipe, indent=2)[:1000])
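
When a bundle contains many recipes, a count per type gives a quicker overview than reading each definition. The sketch below assumes only what the cells above rely on, namely that each recipe is a dict with `name` and `type` keys. `count_recipes_by_type` is a hypothetical helper, and the sample data is made up rather than real exporter output.

```python
from collections import Counter
from typing import Any, Dict, List


def count_recipes_by_type(recipes: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count recipes per 'type' value; entries without a type count as 'UNKNOWN'."""
    return dict(Counter(r.get("type", "UNKNOWN") for r in recipes))


# Made-up sample shaped like bundle['recipes']; not real exporter output.
sample_recipes = [
    {"name": "prepare_customers", "type": "prepare"},
    {"name": "join_orders", "type": "join"},
    {"name": "prepare_orders", "type": "prepare"},
]
print(count_recipes_by_type(sample_recipes))
```

With the real bundle, the call would be `count_recipes_by_type(bundle['recipes'])`.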

## 4. Complete Example: End-to-End Workflow

In [None]:
# Complete workflow: Code -> Conversion -> Visualization -> Export

ml_pipeline = '''
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer

# Load data
df = pd.read_csv('ml_dataset.csv')

# Feature engineering with NumPy
df['log_amount'] = np.log1p(df['amount'])
df['amount_clipped'] = np.clip(df['amount'], 0, 10000)
df['is_high'] = np.where(df['amount'] > 1000, 1, 0)

# Handle missing values
imputer = SimpleImputer(strategy='median')
df[['feature1', 'feature2']] = imputer.fit_transform(df[['feature1', 'feature2']])

# Scale features
scaler = StandardScaler()
df[['feature1', 'feature2']] = scaler.fit_transform(df[['feature1', 'feature2']])

# Split data
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Save processed data
X_train.to_csv('train_features.csv', index=False)
X_test.to_csv('test_features.csv', index=False)
'''

# 1. Convert
flow = convert(ml_pipeline)
print("=" * 60)
print("FLOW SUMMARY")
print("=" * 60)
print(flow.get_summary())

In [None]:
# 2. Visualize
print("\n" + "=" * 60)
print("FLOW DIAGRAM")
print("=" * 60)
print(flow.visualize(format='ascii'))

In [None]:
# 3. Export
print("\n" + "=" * 60)
print("DSS EXPORT")
print("=" * 60)

exporter = DSSExporter(flow, project_key='ML_PIPELINE')
zip_path = exporter.export('./ml_pipeline_export', create_zip=True)
print(f"\nExported to: {zip_path}")
print("\nThis ZIP can be imported directly into Dataiku DSS!")

## 5. Tips and Best Practices

### Code Organization
- Keep data transformations in separate, focused scripts
- Use clear variable names that indicate data state
- Add comments that describe the business logic

### Plugin Development
- Create plugins for domain-specific transformations
- Register handlers early in application startup
- Use type hints in handler functions
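
As an illustration of the type-hint tip, here is a sketch of a typed handler. It makes several assumptions: `HandlerContextLike` and `DemoContext` are hypothetical, the `dataframes` and `current_line` attributes are inferred from the handler in section 1.2, the `int` type for `current_line` is a guess, and the handler returns a plain dict instead of a `Transformation` so the example runs without py-iku.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Protocol


class HandlerContextLike(Protocol):
    """Assumed shape of a handler context (hypothetical, for illustration)."""

    dataframes: Dict[str, Any]
    current_line: int


@dataclass
class DemoContext:
    """Simple stand-in used to exercise the handler without py-iku."""

    dataframes: Dict[str, Any] = field(default_factory=dict)
    current_line: int = 0


def typed_special_aggregate(context: HandlerContextLike) -> Dict[str, Any]:
    """Typed handler returning a plain dict instead of a Transformation."""
    source: Optional[Any] = context.dataframes.get("df")
    return {
        "source_dataframe": source,
        "source_line": context.current_line,
        "parameters": {"custom": True, "method": "special_aggregate"},
    }


info = typed_special_aggregate(DemoContext({"df": "sales"}, current_line=12))
print(info)
```

Typing the parameter against a `Protocol` lets a static checker flag a misspelled attribute without tying the handler to one concrete context class.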

### DSS Integration
- Test exports in a development DSS instance first
- Use meaningful project keys
- Review recipe configurations before import

In [None]:
# Cleanup: remove generated files and directories; paths that do not exist are skipped
import shutil
import os

for path in ['sample_script.py', 'dss_project', 'dss_export', 'ml_pipeline_export', 
             'flow_diagram.svg', 'flow_interactive.html']:
    if os.path.isfile(path):
        os.remove(path)
    elif os.path.isdir(path):
        shutil.rmtree(path)

print("Cleanup complete!")

## Summary

This concludes the py-iku tutorial series. You've learned:

1. **Basic Usage**: Converting pandas code to Dataiku flows
2. **NumPy Support**: Using NumPy functions in transformations
3. **Scikit-learn**: Converting ML pipelines
4. **Visualizations**: Multiple output formats
5. **Advanced Features**: Plugins, CLI, and DSS export

For more information, see the documentation and examples in the repository.