# Databricks Training: Oil & Gas Data Analysis with WellView Data

**Welcome to Databricks!** This beginner training (about 75 minutes across seven sections) will teach you Databricks notebook fundamentals, data engineering basics, and visualization capabilities using realistic oil & gas well data.

---

## Section 1: Databricks Notebook Basics (10 minutes)

### What is Databricks?

Databricks is a unified analytics platform built on Apache Spark that allows you to process large-scale data, build data pipelines, and create machine learning models. Databricks notebooks provide an interactive environment for data analysis that combines code, visualizations, and documentation.

**Key Concepts:**
- **Notebooks** contain cells that run code or SQL, or render Markdown
- **Clusters** provide the compute resources to execute your code
- **Unity Catalog** provides centralized data governance with a three-level namespace: `catalog.schema.table`
- **Delta Lake** is the default storage format, providing ACID transactions and time travel

### Magic Commands

Magic commands are special instructions that change how Databricks interprets a cell. They always start with `%` and must be the first thing in a cell.

**Essential Magic Commands:**

In [0]:
%python
print("Python is the default language in Python notebooks")

**Available Magic Commands:**
- `%python` - Execute Python code (default in Python notebooks)
- `%sql` - Execute SQL queries directly
- `%scala` - Execute Scala code
- `%r` - Execute R code
- `%sh` - Run shell commands on the driver node
- `%fs` - Use filesystem commands (e.g., `%fs ls /path/`)
- `%md` - Render Markdown for documentation
- `%run` - Execute another notebook

**Try It Yourself:**

In [0]:
%sql
-- #TODO: Change this query to show all schemas in the catalog
SHOW CATALOGS

In [0]:
%sql
SHOW SCHEMAS IN training

In [0]:
%sh
#TODO: Run a shell command to check Python version
echo "Running on Databricks!"


In [0]:
%sh
python --version

### Markdown Best Practices

Use markdown cells (`%md`) to document your notebooks with headers, lists, code formatting, and more.

**Markdown Syntax Examples:**
- `# Header 1`, `## Header 2`, `### Header 3`
- `**bold text**` and `*italic text*`
- `` `code formatting` `` for inline code
- Bullet lists with `-` or `*`
- Numbered lists with `1.`, `2.`, etc.
- Tables using `|` separators
- Links: `[Link Text](URL)`

**Best Practices for Comments:**
1. Use markdown cells for section descriptions and context
2. Use `#` comments in code cells for line-by-line explanation
3. Use `#TODO` to mark exercises for trainees
4. Use `#NOTE` for important callouts
5. Use docstrings for function definitions

---

### Markdown TODO
Create a markdown cell below that contains a level-4 header (`####`), a code snippet, and a bulleted list explaining what the snippet does.


## Section 2: Unity Catalog Setup (10 minutes)

Unity Catalog provides centralized governance and a three-level namespace for organizing data: **catalog.schema.table**

### Understanding the Three-Level Namespace

**Catalogs** are the top-level containers (often organized by environment or business unit)
↓
**Schemas** provide additional organization within catalogs (often by team or project)
↓
**Tables** store your actual data

**Example:** `training_catalog.wellview_data.well_headers`
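
As a small illustration of the three-level namespace, the plain-Python sketch below joins and splits a fully qualified table name. The helpers `qualify` and `split_qualified` are hypothetical names invented for this example (they are not part of any Databricks API), and the sketch assumes simple identifiers with no dots or backticks inside a part.

```python
from typing import NamedTuple


class TableRef(NamedTuple):
    """The three parts of a fully qualified table name."""
    catalog: str
    schema: str
    table: str


def qualify(catalog: str, schema: str, table: str) -> str:
    """Join the three levels into `catalog.schema.table`."""
    return ".".join([catalog, schema, table])


def split_qualified(full_name: str) -> TableRef:
    """Split `catalog.schema.table` into its parts (assumes no dots inside a part)."""
    parts = full_name.split(".")
    if len(parts) != 3:
        raise ValueError(f"Expected catalog.schema.table, got: {full_name!r}")
    return TableRef(*parts)


ref = split_qualified("training_catalog.wellview_data.well_headers")
print(ref.catalog, ref.schema, ref.table)
```

Reading a name this way makes it easy to see which catalog and schema a table belongs to before you query it.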


### Setting Up Your Training Environment

In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
import random
from datetime import datetime, timedelta

# Get current user for personalized schema naming
current_user = spark.sql("SELECT current_user()").collect()[0][0]
# Clean username for use in a schema name: keep the part before '@' and
# replace every character that is not a letter, digit, or underscore
import re
clean_username = re.sub(r"[^A-Za-z0-9_]", "_", current_user.split("@")[0])

print(f"Welcome, {current_user}!")
print(f"Your schema will be: {clean_username}_wellview")
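
The username clean-up can be tried outside a cluster. The sketch below is a plain-Python version that needs no Spark session; `schema_name_for` is a hypothetical helper, the email address is made up, and lowercasing the name is an extra assumption of this example rather than something the notebook requires.

```python
import re


def schema_name_for(user_email: str, suffix: str = "wellview") -> str:
    """Turn a login email into a schema name made of letters, digits, and underscores."""
    local_part = user_email.split("@")[0].lower()
    # Replace every run of characters outside [a-z0-9_] with a single underscore
    cleaned = re.sub(r"[^a-z0-9_]+", "_", local_part).strip("_")
    return f"{cleaned}_{suffix}"


print(schema_name_for("Jane.Doe-Smith@example.com"))  # jane_doe_smith_wellview
```

A character allow-list like this also covers symbols such as `+`, which a fixed list of replacements can miss.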

In [0]:
%sql
-- View available catalogs
SHOW CATALOGS;

In [0]:
%sql
-- #TODO: Replace 'training' with your assigned catalog name
USE CATALOG training;

-- View schemas in this catalog
SHOW SCHEMAS;

In [0]:
# Create your personal schema for this training
# Each trainee gets their own schema to work in

# #TODO: Update the catalog name if instructed by your trainer
training_catalog = "training"
your_schema = f"{clean_username}_wellview"

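# Create the schema, then make this catalog and schema the session defaults
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {training_catalog}.{your_schema}")
spark.sql(f"USE CATALOG {training_catalog}")
spark.sql(f"USE SCHEMA {your_schema}")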

print(f"✓ Created and using schema: {training_catalog}.{your_schema}")

In [0]:
# Verify your current context
current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0]
current_schema = spark.sql("SELECT current_schema()").collect()[0][0]

print(f"Current Catalog: {current_catalog}")
print(f"Current Schema: {current_schema}")
print(f"Full namespace: {current_catalog}.{current_schema}.table_name")

## Section 3: Generate WellView Training Data (10 minutes)

WellView is commercial well information management software used by over 70% of the oil & gas industry. We'll create realistic fake data mimicking a WellView system with three key tables: well headers, production data, and completion data.

### Understanding Oil & Gas Data

**API Well Number:** 14-digit unique identifier (XX-XXX-XXXXX-XX-XX)
- State code, county code, well number, sidetrack code, event sequence
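
To make the layout concrete, here is a minimal plain-Python sketch that formats and parses a 14-digit API number in the `XX-XXX-XXXXX-XX-XX` form described above. The function names and the dictionary keys are illustrative choices for this training, not an industry-standard library.

```python
import re

# Two-digit state, three-digit county, five-digit well, two-digit sidetrack, two-digit event
API14_PATTERN = re.compile(r"^(\d{2})-(\d{3})-(\d{5})-(\d{2})-(\d{2})$")


def format_api14(state: int, county: int, well: int, sidetrack: int = 0, event: int = 0) -> str:
    """Build a dash-separated 14-digit API number, zero-padding each part."""
    return f"{state:02d}-{county:03d}-{well:05d}-{sidetrack:02d}-{event:02d}"


def parse_api14(api_number: str) -> dict:
    """Split an API number into its five components, ignoring surrounding whitespace."""
    match = API14_PATTERN.match(api_number.strip())
    if match is None:
        raise ValueError(f"Not a valid 14-digit API number: {api_number!r}")
    state, county, well, sidetrack, event = match.groups()
    return {
        "state_code": state,
        "county_code": county,
        "well_number": well,
        "sidetrack_code": sidetrack,
        "event_sequence": event,
    }


api = format_api14(state=42, county=329, well=12345, sidetrack=1)
print(api)               # 42-329-12345-01-00
print(parse_api14(api))
```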

**Production Volumes:**
- Oil: Measured in barrels (BBL), typically 10-2,000+ bbl/day
- Gas: Measured in thousand cubic feet (MCF), typically 100-10,000+ MCF/day
- Water: Measured in barrels, increases over well lifetime

**Common Formations:** Bakken, Eagle Ford, Marcellus, Permian Basin, Haynesville



### Generate Well Headers Table

In [0]:
# Generate realistic well header data
import random
from datetime import datetime, timedelta

# Set random seed for reproducibility
random.seed(42)

# Define state/county codes for Texas
state_code = 42  # Texas
county_codes = [453, 415, 329, 441, 165]  # Travis, Scurry, Midland, Taylor, Gaines

# Generate formations commonly found in Texas
formations = ["Wolfcamp", "Spraberry", "Bone Spring", "Wolfberry", "Delaware", 
              "Eagle Ford", "Austin Chalk", "Barnett Shale"]

# Generate operators (fake company names)
operators = ["Permian Resources", "Eagle Oil Co", "Texas Energy Partners", 
             "Lone Star Exploration", "Wolfcamp Operating LLC", "Shale Development Inc"]

# Generate well data
well_data = []
num_wells = 100

for i in range(num_wells):
    county = random.choice(county_codes)
    well_num = random.randint(20001, 89999)  # Current well numbers
    sidetrack = random.choice([0, 0, 0, 1, 2])  # Most wells are original bore
    
    api_number = f"{state_code:02d}-{county:03d}-{well_num:05d}-{sidetrack:02d}-00"
    
    # Generate spud date (well start date) between 2015-2024
    spud_date = datetime(2015, 1, 1) + timedelta(days=random.randint(0, 3650))
    
    # Completion date 30-90 days after spud
    completion_date = spud_date + timedelta(days=random.randint(30, 90))
    
    # Pick the operator once so the well name and the operator column agree
    operator = random.choice(operators)
    
    # Generate well name
    well_name = f"Well #{random.randint(1, 999):03d}"
    lease_name = f"{random.choice(['Smith', 'Johnson', 'Williams', 'Brown', 'Jones'])} Ranch"
    full_well_name = f"{operator} {well_name} {lease_name}"
    
    # Well type (an even split between vertical and horizontal)
    well_type = random.choice(["Vertical", "Horizontal"])
    
    # Total depth
    if well_type == "Horizontal":
        total_depth = random.randint(8000, 15000)
        lateral_length = random.randint(4000, 10000)
    else:
        total_depth = random.randint(5000, 12000)
        lateral_length = 0
    
    # Status
    status = random.choice(["Producing", "Producing", "Producing", "Shut-In", "Plugged & Abandoned"])
    
    # Location (fake coordinates in Texas)
    lat = random.uniform(31.0, 33.0)
    long = random.uniform(-103.0, -100.0)
    
    well_data.append({
        "api_number": api_number,
        "well_name": full_well_name,
        "operator": operator,
        "well_type": well_type,
        "status": status,
        "spud_date": spud_date.strftime("%Y-%m-%d"),
        "completion_date": completion_date.strftime("%Y-%m-%d"),
        "total_depth_ft": total_depth,
        "lateral_length_ft": lateral_length,
        "target_formation": random.choice(formations),
        "latitude": lat,
        "longitude": long,
        "county_code": county,
        "state": "Texas"
    })

# Create DataFrame
well_headers_df = spark.createDataFrame(well_data)

# Display the data
print(f"Generated {num_wells} well records")
display(well_headers_df.limit(10))

In [0]:
# Write to Delta table
table_name = f"{training_catalog}.{your_schema}.well_headers"

well_headers_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(table_name)

print(f"✓ Created table: {table_name}")

### Generate Production Data Table

In [0]:
# Generate production data for each well
# Create multiple months of production for each well
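# NOTE: __builtins__.max and __builtins__.round call Python's built-in functions.
# The bare names max and round now refer to the PySpark column functions,
# because the setup cell ran `from pyspark.sql.functions import *`.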

production_data = []

# Read wells back
wells = spark.table(f"{training_catalog}.{your_schema}.well_headers").collect()

for well in wells:
    api_number = well.api_number
    completion_date = datetime.strptime(well.completion_date, "%Y-%m-%d")
    well_status = well.status
    well_type = well.well_type
    
    # Generate 6-36 months of production data, depending on well status
    if well_status == "Plugged & Abandoned":
        months_producing = random.randint(12, 24)
    elif well_status == "Shut-In":
        months_producing = random.randint(6, 18)
    else:
        months_producing = random.randint(12, 36)
    
    # Initial production rates (higher for horizontal wells)
    if well_type == "Horizontal":
        initial_oil = random.randint(300, 1500)
        initial_gas = random.randint(1000, 8000)
        initial_water = random.randint(10, 100)
    else:
        initial_oil = random.randint(50, 300)
        initial_gas = random.randint(200, 2000)
        initial_water = random.randint(5, 50)
    
    # Generate monthly production with decline
    for month in range(months_producing):
        production_date = completion_date + timedelta(days=30 * month)
        
        # Decline curve (exponential decline)
        decline_factor = 0.85 ** (month / 12)  # 15% annual decline
        
        # Add some random variation
        variation = random.uniform(0.85, 1.15)
        
        oil_volume = __builtins__.max(5, int(initial_oil * decline_factor * variation))
        gas_volume = __builtins__.max(50, int(initial_gas * decline_factor * variation))
        
        # Water production increases over time (water cut increases)
        water_increase = 1 + (month / 12) * 0.5
        water_volume = int(initial_water * water_increase * variation)
        
        # Days on production (assume full month unless shut-in)
        days_on = 30 if random.random() > 0.05 else random.randint(15, 29)
        
        # Calculate GOR (Gas-Oil Ratio) in scf/bbl (1 MCF = 1,000 scf)
        gor = __builtins__.round(gas_volume / oil_volume * 1000, 1) if oil_volume > 0 else 0
        
        # Calculate water cut
        total_liquid = oil_volume + water_volume
        water_cut = __builtins__.round((water_volume / total_liquid * 100), 1) if total_liquid > 0 else 0
        
        production_data.append({
            "api_number": api_number,
            "production_date": production_date.strftime("%Y-%m-%d"),
            "oil_volume_bbl": oil_volume,
            "gas_volume_mcf": gas_volume,
            "water_volume_bbl": water_volume,
            "days_on_production": days_on,
            "gas_oil_ratio": gor,
            "water_cut_pct": water_cut
        })

# Create DataFrame
production_df = spark.createDataFrame(production_data)

print(f"Generated {len(production_data):,} production records")
display(production_df.limit(20))
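
The arithmetic inside the loop above can be checked by hand. The sketch below restates the three formulas (exponential decline, gas-oil ratio, and water cut) as small plain-Python functions; the function names and the sample volumes are made up for illustration.

```python
def decline_factor(month: int, annual_decline: float = 0.15) -> float:
    """Fraction of the initial rate remaining after `month` months of exponential decline."""
    return (1 - annual_decline) ** (month / 12)


def gas_oil_ratio(gas_mcf: float, oil_bbl: float) -> float:
    """GOR in standard cubic feet per barrel (1 MCF = 1,000 scf)."""
    return round(gas_mcf * 1000 / oil_bbl, 1) if oil_bbl > 0 else 0.0


def water_cut_pct(oil_bbl: float, water_bbl: float) -> float:
    """Water as a percentage of total produced liquid."""
    total_liquid = oil_bbl + water_bbl
    return round(water_bbl / total_liquid * 100, 1) if total_liquid > 0 else 0.0


initial_oil = 1000
for month in (0, 12, 24):
    # A 15% annual decline leaves 85% after one year and 72.25% after two
    print(month, round(initial_oil * decline_factor(month), 1))

print(gas_oil_ratio(gas_mcf=2000, oil_bbl=500))   # 4000.0
print(water_cut_pct(oil_bbl=300, water_bbl=100))  # 25.0
```

Because the decline is applied as a power of `month / 12`, the rate falls smoothly from month to month instead of stepping down once a year.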

In [0]:
# Write production data to Delta table
production_table = f"{training_catalog}.{your_schema}.production_data"

production_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(production_table)

print(f"✓ Created table: {production_table}")

### Generate Completion Data Table

In [0]:
# Generate completion data for each well
completion_data = []

completion_methods = [
    "Hydraulic Fracture - Multi-Stage",
    "Hydraulic Fracture - Single Stage", 
    "Acid Stimulation",
    "Natural Completion",
    "Gravel Pack"
]

proppant_types = ["White Sand", "Brown Sand", "Ceramic", "Resin Coated Sand"]

for well in wells:
    api_number = well.api_number
    completion_date = well.completion_date
    well_type = well.well_type
    formation = well.target_formation
    total_depth = well.total_depth_ft
    lateral_length = well.lateral_length_ft
    
    # Horizontal wells typically use multi-stage hydraulic fracturing
    if well_type == "Horizontal":
        method = "Hydraulic Fracture - Multi-Stage"
        num_stages = random.randint(15, 40)
        fluid_volume = random.randint(150000, 500000)  # gallons per stage
        proppant_volume = random.randint(200000, 800000)  # lbs total
    else:
        method = random.choice(completion_methods)
        num_stages = 1 if "Single" in method else random.randint(1, 5)
        fluid_volume = random.randint(10000, 100000)
        proppant_volume = random.randint(20000, 200000)
    
    # Perforated interval
    perf_top = total_depth - random.randint(100, 500)
    perf_bottom = total_depth
    
    # Maximum treatment pressure
    max_pressure = random.randint(5000, 10000)
    
    completion_data.append({
        "api_number": api_number,
        "completion_date": completion_date,
        "completion_method": method,
        "formation": formation,
        "num_stages": num_stages,
        "total_fluid_volume_gal": fluid_volume * num_stages,
        "total_proppant_lbs": proppant_volume,
        "proppant_type": random.choice(proppant_types),
        "perf_top_depth_ft": perf_top,
        "perf_bottom_depth_ft": perf_bottom,
        "max_treatment_pressure_psi": max_pressure,
        "lateral_length_ft": lateral_length
    })

# Create DataFrame  
completion_df = spark.createDataFrame(completion_data)

print(f"Generated {len(completion_data)} completion records")
display(completion_df.limit(10))

In [0]:
# Write completion data to Delta table
completion_table = f"{training_catalog}.{your_schema}.completion_data"

completion_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(completion_table)

print(f"✓ Created table: {completion_table}")

In [0]:
%sql
-- Verify all tables were created
SHOW TABLES;

## Section 4: Data Cleansing and Quality (15 minutes)

Real-world data often has quality issues. Let's explore common data cleansing operations including de-duplication and standardization.



### Introduce Data Quality Issues

Let's intentionally create some data quality problems to practice fixing them.

In [0]:
# Create duplicate records in production data
production_with_issues = spark.table(production_table)

# Get a sample of records to duplicate
sample_records = production_with_issues.sample(0.05, seed=42)  # 5% duplicates

# Add duplicates
production_with_duplicates = production_with_issues.union(sample_records)

# Introduce some data inconsistencies
from pyspark.sql.functions import when, rand, col, concat, lit

production_with_issues = production_with_duplicates.withColumn(
    "api_number",
    # Randomly add leading/trailing spaces to 10% of records
    when(rand() < 0.1, concat(lit(" "), col("api_number"), lit(" ")))
    .otherwise(col("api_number"))
).withColumn(
    "oil_volume_bbl",
    # Introduce some negative values (data errors)
    when(rand() < 0.02, lit(-999))
    .otherwise(col("oil_volume_bbl"))
)

# Write to a new table
dirty_table = f"{training_catalog}.{your_schema}.production_data_dirty"
production_with_issues.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(dirty_table)

print(f"Created table with data quality issues: {dirty_table}")
print(f"Total records (including duplicates): {spark.table(dirty_table).count():,}")

### Identify Data Quality Issues

In [0]:
# Count duplicates
from pyspark.sql.functions import count, col

dirty_df = spark.table(dirty_table)

# Check for exact duplicates
duplicate_count = dirty_df.count() - dirty_df.distinct().count()
print(f"Exact duplicate records: {duplicate_count:,}")

# Check for duplicates based on key columns
key_cols = ["api_number", "production_date"]
duplicate_keys = dirty_df.groupBy(key_cols).count().filter(col("count") > 1)
print(f"Duplicate keys (api_number + production_date): {duplicate_keys.count():,}")

display(duplicate_keys.orderBy(col("count").desc()).limit(10))

In [0]:
# Identify data standardization issues
api_with_spaces = dirty_df.filter(
    col("api_number").startswith(" ") | col("api_number").endswith(" ")
).count()

print(f"API numbers with leading/trailing spaces: {api_with_spaces:,}")

# Check for negative values (invalid data)
negative_oil = dirty_df.filter(col("oil_volume_bbl") < 0).count()
print(f"Records with negative oil volume: {negative_oil:,}")

# Show examples
display(dirty_df.filter(col("oil_volume_bbl") < 0).limit(5))

### Exercise: De-duplication

In [0]:
# #TODO: Remove duplicate records keeping only the first occurrence
# HINT: Use Databricks Assistant if you aren't sure which function to use

# Your code here:
cleaned_df = dirty_df  # Replace this line with your de-duplication logic

print(f"Original records: {dirty_df.count():,}")
print(f"After de-duplication: {cleaned_df.count():,}")
print(f"Removed: {dirty_df.count() - cleaned_df.count():,} duplicates")

In [0]:
# Solution to de-duplication exercise
cleaned_df = dirty_df.dropDuplicates(["api_number", "production_date"])

print(f"✓ De-duplication complete")
print(f"Records after de-duplication: {cleaned_df.count():,}")
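
To see what key-based de-duplication does, and why stray whitespace matters, here is a plain-Python sketch that keeps the first row seen for each key. It only illustrates the idea: `dedupe_by_key` is a hypothetical helper, the rows are made up, and a distributed engine such as Spark may keep a different copy of each duplicate than this sequential loop does.

```python
def dedupe_by_key(rows, key_fields, normalize=str.strip):
    """Keep the first row seen for each key; key values are normalized before comparison."""
    seen = set()
    unique_rows = []
    for row in rows:
        key = tuple(normalize(str(row[field])) for field in key_fields)
        if key not in seen:
            seen.add(key)
            unique_rows.append(row)
    return unique_rows


rows = [
    {"api_number": "42-329-12345-00-00", "production_date": "2020-01-01", "oil_volume_bbl": 900},
    {"api_number": " 42-329-12345-00-00 ", "production_date": "2020-01-01", "oil_volume_bbl": 900},
    {"api_number": "42-329-12345-00-00", "production_date": "2020-01-31", "oil_volume_bbl": 880},
]
keys = ["api_number", "production_date"]

# Without normalization, the padded copy looks like a different well
print(len(dedupe_by_key(rows, keys, normalize=lambda value: value)))  # 3
# With trimming, the padded copy is recognized as a duplicate
print(len(dedupe_by_key(rows, keys)))  # 2
```

The same effect applies to the dirty table: a duplicate whose `api_number` was padded with spaces only matches its original once the whitespace is trimmed.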

### Exercise: Data Standardization

In [0]:
# #TODO: Standardize API numbers by trimming whitespace and converting to uppercase
# #TODO: Replace negative oil volumes with NULL values
# HINT: Use Databricks Assistant if you aren't sure which function to use

from pyspark.sql.functions import trim, when, col, upper, lit

# Your code here:
standardized_df = cleaned_df  # Replace with your standardization logic


# Verify the fixes
api_with_spaces = standardized_df.filter(
    col("api_number").startswith(" ") | col("api_number").endswith(" ")
).count()

negative_oil = standardized_df.filter(col("oil_volume_bbl") < 0).count()

print(f"API numbers with spaces after cleaning: {api_with_spaces}")
print(f"Negative oil volumes after cleaning: {negative_oil}")

In [0]:
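# Solution to standardization exercise
from pyspark.sql.functions import trim, upper, when, col, lit

# Trimming can reveal duplicates that differed only by whitespace,
# so de-duplicate on the key columns again after standardizing
standardized_df = cleaned_df \
    .withColumn("api_number", upper(trim(col("api_number")))) \
    .withColumn("oil_volume_bbl", 
                when(col("oil_volume_bbl") < 0, lit(None))
                .otherwise(col("oil_volume_bbl"))) \
    .dropDuplicates(["api_number", "production_date"])
    
display(standardized_df.limit(20))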


### Write Clean Data

In [0]:
# Write cleaned data to new table
clean_table = f"{training_catalog}.{your_schema}.production_data_clean"

standardized_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(clean_table)

print(f"✓ Created clean table: {clean_table}")

## Section 5: Build a Data Product (15 minutes)

Let's create a useful analytics table that combines data from multiple sources and performs aggregations.



### Create Well Production Summary

In [0]:
# Goal: Create a summary table showing total production by well with well details

# Read the tables
wells = spark.table(f"{training_catalog}.{your_schema}.well_headers")
production = spark.table(clean_table)
completions = spark.table(completion_table)

# Aggregate production by well
production_summary = production.groupBy("api_number").agg(
    sum("oil_volume_bbl").alias("total_oil_bbl"),
    sum("gas_volume_mcf").alias("total_gas_mcf"),
    sum("water_volume_bbl").alias("total_water_bbl"),
    avg("gas_oil_ratio").alias("avg_gor"),
    avg("water_cut_pct").alias("avg_water_cut"),
    count("*").alias("months_produced"),
    min("production_date").alias("first_production"),
    max("production_date").alias("last_production")
)

display(production_summary.limit(10))

In [0]:
from pyspark.sql.functions import col, round

# Alias the joined tables so columns that exist in both (such as lateral_length_ft) can be referenced unambiguously
well_production_summary = production_summary \
    .join(wells.alias("w"), "api_number", "inner") \
    .join(completions.alias("c"), "api_number", "inner") \
    .select(
        col("api_number"),
        col("w.well_name"),
        col("w.operator"),
        col("w.well_type"),
        col("w.status"),
        col("w.target_formation"),
        col("c.completion_method"),
        col("c.num_stages"),
        col("w.total_depth_ft"),
        col("c.lateral_length_ft"),
        col("total_oil_bbl"),
        col("total_gas_mcf"),
        col("total_water_bbl"),
        col("avg_gor"),
        col("avg_water_cut"),
        col("months_produced"),
        col("first_production"),
        col("last_production")
    )

well_production_summary = well_production_summary \
    .withColumn("total_boe", 
        round(col("total_oil_bbl") + col("total_gas_mcf") / 6, 0)) \
    .withColumn("avg_monthly_oil", 
        round(col("total_oil_bbl") / col("months_produced"), 1)) \
    .withColumn("avg_monthly_gas", 
        round(col("total_gas_mcf") / col("months_produced"), 1))

display(
    well_production_summary
    .orderBy(col("total_boe").desc())
    .limit(10)
)
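
The `total_boe` column uses the common convention that 6 MCF of gas is counted as one barrel of oil equivalent. The plain-Python sketch below works through the same arithmetic for a single hypothetical well; the volumes and the function name are invented for illustration.

```python
MCF_PER_BOE = 6  # convention used in this notebook: 6 MCF of gas counts as 1 BOE


def barrels_of_oil_equivalent(oil_bbl: float, gas_mcf: float) -> float:
    """Combine oil and gas volumes into barrels of oil equivalent."""
    return oil_bbl + gas_mcf / MCF_PER_BOE


total_oil_bbl = 120_000
total_gas_mcf = 600_000
months_produced = 24

total_boe = barrels_of_oil_equivalent(total_oil_bbl, total_gas_mcf)
avg_monthly_oil = round(total_oil_bbl / months_produced, 1)

print(round(total_boe))   # 220000
print(avg_monthly_oil)    # 5000.0
```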

In [0]:
# #TODO: Calculate cumulative production per operator and avg oil/gas per well
# HINT: Group by operator and sum/avg the production volumes

# Your code here:
operator_summary = None  # Replace with your aggregation logic

In [0]:
# Solution to operator summary exercise
operator_summary = well_production_summary.groupBy("operator").agg(
    count("api_number").alias("well_count"),
    sum("total_oil_bbl").alias("operator_total_oil"),
    sum("total_gas_mcf").alias("operator_total_gas"),
    round(avg("total_oil_bbl"), 0).alias("avg_oil_per_well"),
    round(avg("total_gas_mcf"), 0).alias("avg_gas_per_well"),
    round(sum("total_boe"), 0).alias("operator_total_boe")
).orderBy(col("operator_total_boe").desc())

print("✓ Operator summary created")
display(operator_summary)

In [0]:
# Save the data product as a managed table
summary_table = f"{training_catalog}.{your_schema}.well_production_summary"

well_production_summary.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(summary_table)

print(f"✓ Created data product: {summary_table}")

## Section 6: Visualizations and Charts (10 minutes)

Databricks provides built-in visualization capabilities using the `display()` function. Let's create several charts to analyze our well data.


### Production by Well Type

In [0]:
%sql
-- Production comparison by well type
SELECT 
    well_type,
    COUNT(*) as well_count,
    ROUND(SUM(total_oil_bbl), 0) as total_oil,
    ROUND(SUM(total_gas_mcf), 0) as total_gas,
    ROUND(AVG(total_oil_bbl), 0) as avg_oil_per_well,
    ROUND(AVG(total_gas_mcf), 0) as avg_gas_per_well
FROM well_production_summary
GROUP BY well_type
ORDER BY total_oil DESC

**Instructions to create visualization:**
1. Run the cell above
2. Click the **+** button above the results table
3. Select **Visualization**
4. Choose **Bar chart**
5. Set X-axis: `well_type`
6. Set Y-axis: `avg_oil_per_well` and `avg_gas_per_well`
7. Click **Save**

### Production Over Time

In [0]:
# Monthly production trends
monthly_production = spark.sql(f"""
    SELECT 
        DATE_TRUNC('MONTH', production_date) as month,
        SUM(oil_volume_bbl) as monthly_oil,
        SUM(gas_volume_mcf) as monthly_gas,
        SUM(water_volume_bbl) as monthly_water
    FROM {clean_table}
    GROUP BY DATE_TRUNC('MONTH', production_date)
    ORDER BY month
""")

display(monthly_production)

**Instructions for line chart:**
1. Click **+** above results → **Visualization**
2. Choose **Line chart**
3. X-axis: `month`
4. Y-axis: `monthly_oil`, `monthly_gas` (you can select multiple)
5. This shows production trends over time

### Top Producing Wells

In [0]:
%sql
-- #TODO: Write a query to find the top 10 wells by total BOE production
-- HINT: Use ORDER BY and LIMIT

SELECT 
    well_name,
    operator,
    target_formation,
    total_boe,
    total_oil_bbl,
    total_gas_mcf
FROM well_production_summary
-- Add your ORDER BY and LIMIT here

In [0]:
%sql
-- Solution: Top 10 producing wells
SELECT 
    well_name,
    operator,
    target_formation,
    ROUND(total_boe, 0) as total_boe,
    ROUND(total_oil_bbl, 0) as total_oil,
    ROUND(total_gas_mcf, 0) as total_gas
FROM well_production_summary
ORDER BY total_boe DESC
LIMIT 10

**Create a bar chart showing top producers:**
1. Visualization type: **Bar chart**
2. X-axis: `well_name`
3. Y-axis: `total_boe`
4. Color by: `operator`

### Production by Formation

In [0]:
# Analyze production by target formation
formation_analysis = spark.sql(f"""
    SELECT 
        target_formation,
        COUNT(*) as well_count,
        ROUND(SUM(total_boe), 0) as total_boe,
        ROUND(AVG(total_boe), 0) as avg_boe_per_well,
        ROUND(AVG(avg_gor), 1) as avg_gas_oil_ratio
    FROM {summary_table}
    GROUP BY target_formation
    ORDER BY total_boe DESC
""")

display(formation_analysis)

**Create a pie chart:**
1. Visualization type: **Pie chart**
2. Keys: `target_formation`
3. Values: `total_boe`
4. Shows percentage of total production by formation

### Well Type Performance Analysis

In [0]:
# Compare well performance metrics
performance_by_type = well_production_summary.groupBy("well_type", "completion_method").agg(
    count("api_number").alias("well_count"),
    round(avg("total_oil_bbl"), 0).alias("avg_oil"),
    round(avg("total_gas_mcf"), 0).alias("avg_gas"),
    round(avg("avg_water_cut"), 1).alias("avg_water_cut_pct")
).orderBy("well_type", col("well_count").desc())

display(performance_by_type)

### Scatter Plot: Lateral Length vs Production

In [0]:
# Analyze relationship between lateral length and production for horizontal wells
horizontal_wells = well_production_summary.filter(col("well_type") == "Horizontal")

scatter_data = horizontal_wells.select(
    "lateral_length_ft",
    "total_oil_bbl",
    "total_gas_mcf",
    "num_stages",
    "target_formation"
).filter(col("lateral_length_ft") > 0)

display(scatter_data)

**Create a scatter plot:**
1. Visualization type: **Scatter plot**
2. X-axis: `lateral_length_ft`
3. Y-axis: `total_oil_bbl`
4. Color by: `target_formation`
5. Shows correlation between lateral length and production

---

## Section 7: Best Practices and Key Takeaways (5 minutes)

### Databricks Best Practices Summary

**1. Data Organization:**
- Always use Unity Catalog three-level namespace: `catalog.schema.table`
- Use managed tables for new data (better performance and governance)
- Create schemas for logical groupings (by team, project, or data domain)

**2. Code Best Practices:**
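- Import PySpark functions explicitly (`from pyspark.sql.functions import col, avg`) or under an alias (`from pyspark.sql import functions as F`) so they don't shadow Python built-ins such as `sum`, `min`, `max`, and `round`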
- Use `display()` for interactive data exploration
- Chain DataFrame transformations before calling actions
- Avoid `collect()` on large datasets
- Use Delta Lake format for ACID transactions and time travel

**3. Performance Optimization:**
- Run `OPTIMIZE` on tables regularly to compact small files
- Use Z-ORDER for frequently queried columns: `OPTIMIZE table ZORDER BY (date, category)`
- Use `cache()` for DataFrames accessed multiple times
- Filter data early in your transformations
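
As a sketch of how the `OPTIMIZE` tip might be scripted, the helper below only builds the SQL text. `build_optimize_statement` is a hypothetical function, and the table and column names are examples from this training rather than required values.

```python
def build_optimize_statement(table_name: str, zorder_columns=()) -> str:
    """Return an OPTIMIZE statement, adding a ZORDER BY clause when columns are given."""
    statement = f"OPTIMIZE {table_name}"
    if zorder_columns:
        statement += f" ZORDER BY ({', '.join(zorder_columns)})"
    return statement


sql = build_optimize_statement(
    "training.jdoe_wellview.production_data_clean",  # example table name
    ["api_number", "production_date"],
)
print(sql)
# In a Databricks notebook, the statement could then be run with: spark.sql(sql)
```

Keeping the statement in a function makes it easy to apply the same maintenance step to several tables from a loop or a scheduled job.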

**4. Data Quality:**
- Always check for duplicates and handle them appropriately
- Standardize data formats (trim whitespace, consistent casing)
- Handle NULL values explicitly
- Validate data ranges and constraints
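
The checks in this list can be expressed as simple rules. The following plain-Python sketch validates a single production record represented as a dictionary; the rule set, the function name, and the sample records are assumptions chosen to mirror the columns used in this training.

```python
import re

API14 = re.compile(r"^\d{2}-\d{3}-\d{5}-\d{2}-\d{2}$")


def validate_production_record(record: dict) -> list:
    """Return a list of human-readable problems; an empty list means the record passed."""
    problems = []
    api = record.get("api_number")
    if api is None or not API14.match(api):
        problems.append("api_number is missing or not in XX-XXX-XXXXX-XX-XX form")
    for field in ("oil_volume_bbl", "gas_volume_mcf", "water_volume_bbl"):
        value = record.get(field)
        if value is None:
            problems.append(f"{field} is NULL")
        elif value < 0:
            problems.append(f"{field} is negative ({value})")
    days = record.get("days_on_production")
    if days is None or not 0 <= days <= 31:
        problems.append("days_on_production is outside 0-31")
    return problems


good = {"api_number": "42-329-12345-00-00", "oil_volume_bbl": 900,
        "gas_volume_mcf": 3000, "water_volume_bbl": 40, "days_on_production": 30}
bad = {"api_number": " 42-329-12345-00-00 ", "oil_volume_bbl": -999,
       "gas_volume_mcf": 3000, "water_volume_bbl": None, "days_on_production": 30}

print(validate_production_record(good))
print(validate_production_record(bad))
```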

**5. Documentation:**
- Use markdown cells to document your analysis
- Add comments to complex code logic
- Use meaningful variable and table names
- Mark exercises and TODOs clearly

### Key Concepts Review

**Magic Commands:**
- `%sql` - Execute SQL queries
- `%md` - Render markdown
- `%fs` - Filesystem commands
- `%sh` - Shell commands

**DataFrame Operations:**
- **Transformations** (lazy): `select()`, `filter()`, `groupBy()`, `join()`, `withColumn()`
- **Actions** (trigger execution): `display()`, `count()`, `collect()`, `write.saveAsTable()`
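
The difference between lazy transformations and actions can be felt without a cluster. The sketch below is a plain-Python analogy using generators, not Spark code: building the pipeline does no work, and consuming it does. The `load_rows` function and the `executed` list are invented for this illustration.

```python
executed = []  # records which rows have actually been produced


def load_rows():
    """Yield rows one at a time, noting each row as it is produced."""
    for i in range(5):
        executed.append(i)
        yield {"well": i, "oil": i * 100}


# "Transformation": describing the filter produces no rows yet
pipeline = (row for row in load_rows() if row["oil"] >= 200)
print(executed)  # [] because nothing has run so far

# "Action": consuming the pipeline triggers the work
result = list(pipeline)
print(executed)                         # [0, 1, 2, 3, 4]
print([row["well"] for row in result])  # [2, 3, 4]
```

Spark behaves in a similar spirit: chained transformations only describe a plan, and the plan runs when an action asks for results.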

**Unity Catalog:**
- Three levels: Catalog → Schema → Table
- Use fully qualified names: `catalog.schema.table`
- Provides centralized governance and access control

**Delta Lake:**
- ACID transactions on data lakes
- Time travel and versioning
- Schema enforcement, with opt-in schema evolution
- Performance optimizations (OPTIMIZE, Z-ORDER, VACUUM)

### Additional Exercises to Try

In [0]:
# #TODO: Calculate the total production for each completion method
# Compare multi-stage hydraulic fracturing vs other methods

# #TODO: Identify wells with declining production (compare first 6 months vs last 6 months)

# #TODO: Calculate water cut trends over time - does it increase as expected?

# #TODO: Find the most productive formation by well type (horizontal vs vertical)

# #TODO: Create a pivot table showing production by operator and formation

### Next Steps

**Continue Learning:**
1. Explore Delta Live Tables (DLT) for production pipelines
2. Learn about Databricks SQL for ad-hoc analysis
3. Study performance tuning and optimization
4. Practice with real datasets from your organization
5. Learn about ML integration with MLflow

**Documentation Resources:**
- Databricks Documentation: [docs.databricks.com](https://docs.databricks.com)
- PySpark API Reference: [spark.apache.org/docs/latest/api/python](https://spark.apache.org/docs/latest/api/python)
- Delta Lake Documentation: [docs.delta.io](https://docs.delta.io)

---

## Cleanup (Optional)

When you're finished with this training, you can optionally clean up your schema:

In [0]:
# WARNING: This will delete all tables in your schema
# Uncomment the line below only if you want to clean up

# spark.sql(f"DROP SCHEMA IF EXISTS {training_catalog}.{your_schema} CASCADE")

---

## Summary

**Congratulations!** You've completed the Databricks beginner training. You now know how to:

✓ Use Databricks notebooks with magic commands and markdown
✓ Work with Unity Catalog's three-level namespace
✓ Create schemas and Delta tables
✓ Generate and manipulate data with PySpark
✓ Perform data cleansing (de-duplication and standardization)
✓ Build data products with joins and aggregations
✓ Create visualizations and charts using `display()`
✓ Follow Databricks best practices

Keep practicing with your own datasets and explore more advanced features as you grow your skills!

---

**Training Complete** | Version 1.0 | October 2025