# Los Angeles Crime Analytics: Data Profiling

## Step 1: Environment Setup & Configuration

In [0]:
# Part 1: Environment Setup & Configuration

# Import required libraries
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
from datetime import datetime
import requests

## Step 2: Download Data from LA Open Data API

### Download data and save to volume

In [0]:
# --- Source and destination settings ---
# Catalog / schema / volume names used to build the /Volumes/... target path.
catalog, schema, volume = "workspace", "la_crime_schema", "datastore"
file_name = "crime_data_raw.csv"
# CSV endpoint of the dataset; the query string carries $limit=10000000.
download_url = "https://data.lacity.org/resource/2nrs-mtv8.csv?$limit=10000000"

# --- Derived paths ---
path_volume = "/".join(["/Volumes", catalog, schema, volume])
file_path = "/".join([path_volume, file_name])

# Pull the CSV over HTTP directly into a pandas DataFrame. Any failure is
# reported and then re-raised so the notebook stops here.
# NOTE(review): pandas infers column dtypes on read, so the file written below
# is pandas' re-serialization rather than the exact bytes served by the API --
# confirm that is acceptable for a "raw" copy.
try:
    df_pandas = pd.read_csv(download_url)
except Exception as download_error:
    print(f"Error downloading data: {download_error}")
    raise

# Write the downloaded frame to the volume path, without the pandas index.
try:
    df_pandas.to_csv(file_path, index=False)
except Exception as save_error:
    print(f"Error saving data: {save_error}")
    raise

### Load into Spark DataFrame for profiling

In [0]:
# Read the saved CSV into a Spark DataFrame (header row, inferred schema,
# PERMISSIVE parsing mode) and capture its basic shape.
try:
    df_raw = (
        spark.read
        .options(header=True, inferSchema=True, mode="PERMISSIVE")
        .csv(file_path)
    )
    # count() forces the read, so load problems surface inside this try block.
    row_count = df_raw.count()
    col_count = len(df_raw.columns)
except Exception as load_error:
    print(f"Error loading data: {load_error}")
    raise
else:
    print("Data loaded successfully!")
    print(f"Total Records: {row_count:,}")
    print(f"Total Columns: {col_count}")

# Show the first ten rows as a quick sanity check.
display(df_raw.limit(10))

## Step 3: Understanding the Source Schema & Data Structure

In [0]:
# Build a one-row-per-column summary of the schema Spark inferred for df_raw.
schema_info = [
    {
        'Column_Name': field.name,
        'Data_Type': str(field.dataType),
        'Nullable': field.nullable,
        'Category': 'TBD',  # placeholder; columns are categorized later
    }
    for field in df_raw.schema.fields
]

schema_df = spark.createDataFrame(schema_info)

# Render the summary table; without this the DataFrame is built but never
# appears in the notebook output.
display(schema_df)

# Get basic statistics
print("DATASET STATISTICS")
print(f"Total Records: {df_raw.count():,}")
print(f"Total Columns: {len(df_raw.columns)}")
print(f"Storage Location: {file_path}")

# Numbered list of column names (1-based, right-aligned to two digits).
print("Columns")
for position, column_name in enumerate(df_raw.columns, 1):
    print(f"{position:2d}. {column_name}")

## Step 4: Null Value Analysis

In [0]:
# Null profile for every column, computed in a single pass over the data.
# F.count(<column>) counts only non-null values and F.count(F.lit(1)) counts
# every row, so one aggregation returns the row total plus all per-column
# non-null counts instead of running a separate Spark job per column.
profiled_columns = df_raw.columns
counts_row = df_raw.agg(
    F.count(F.lit(1)),
    *[F.count(F.col(col_name)) for col_name in profiled_columns]
).first()

# Values are read positionally: index 0 is the row total, the rest follow the
# order of profiled_columns.
total_records = counts_row[0]

null_analysis = []
for col_name, non_null_count in zip(profiled_columns, counts_row[1:]):
    null_count = total_records - non_null_count
    # Guard the ratio so an empty dataset reports 0% nulls instead of raising
    # ZeroDivisionError.
    null_percentage = (null_count / total_records) * 100 if total_records else 0.0

    null_analysis.append({
        'Column': col_name,
        'Null_Count': null_count,
        'Null_Percentage': round(null_percentage, 2),
        'Non_Null_Count': non_null_count,
        'Completeness': f"{100 - null_percentage:.2f}%"
    })

# Create DataFrame and sort by null percentage (highest first)
null_df = spark.createDataFrame(null_analysis).orderBy(F.desc('Null_Percentage'))

# Display full null analysis results
print("NULL Analysis Results")
display(null_df)

# Summary statistics across all cells (rows x columns)
print("NULL Analysis Summary")

total_cells = total_records * len(profiled_columns)
total_null_cells = sum(row['Null_Count'] for row in null_analysis)
# With zero cells there is nothing missing, so report full completeness
# rather than dividing by zero.
if total_cells:
    completeness_percentage = ((total_cells - total_null_cells) / total_cells) * 100
else:
    completeness_percentage = 100.0

print(f"Total Records: {total_records:,}")
print(f"Total Columns: {len(profiled_columns)}")
print(f"Total Cells: {total_cells:,}")
print(f"Null Cells: {total_null_cells:,}")
print(f"Non-Null Cells: {total_cells - total_null_cells:,}")
print(f"\nOverall Data Completeness: {completeness_percentage:.2f}%")

## Step 5: Cardinality Analysis (Distinct Value Counts)

In [0]:
# Cardinality profile: how many distinct values each column holds, relative
# to the number of rows.

# Lower bounds (exclusive, in percent) for each cardinality label, checked
# from the highest band down; anything at or below 1% falls through to the
# default label.
cardinality_bands = (
    (90, "Very High (Identifier)"),
    (50, "High"),
    (10, "Medium-High"),
    (1, "Medium"),
)

cardinality_analysis = []
total_records = df_raw.count()

for col_name in df_raw.columns:
    # distinct() on the single-column projection, then count the rows left.
    distinct_count = df_raw.select(col_name).distinct().count()
    cardinality_ratio = (distinct_count / total_records) * 100
    category = next(
        (label for floor, label in cardinality_bands if cardinality_ratio > floor),
        "Low (Good Dimension)",
    )
    cardinality_analysis.append({
        'Column': col_name,
        'Distinct_Count': distinct_count,
        'Total_Records': total_records,
        'Cardinality_Ratio_%': round(cardinality_ratio, 2),
        'Category': category
    })

# Turn the per-column records into a DataFrame, largest distinct count first.
cardinality_df = (
    spark.createDataFrame(cardinality_analysis)
    .orderBy(F.desc('Distinct_Count'))
)

print("Cardinality Analysis")
display(cardinality_df)

## Step 6: Temporal Data Validation

In [0]:
# Step 6: Temporal Data Validation

# Range and non-null count of the occurrence date column (date_occ).
# F.count on a column counts only its non-null values.
date_occ_stats = df_raw.agg(
    F.min('date_occ').alias('Min_Date_Occ'),
    F.max('date_occ').alias('Max_Date_Occ'),
    F.count('date_occ').alias('Total_Records'),
).first()

print(
    "Date Occurred (date_occ):",
    f"  Earliest: {date_occ_stats['Min_Date_Occ']}",
    f"  Latest: {date_occ_stats['Max_Date_Occ']}",
    f"  Non-Null Records: {date_occ_stats['Total_Records']:,}",
    sep="\n",
)

# Same summary for the report date column (date_rptd).
date_rptd_stats = df_raw.agg(
    F.min('date_rptd').alias('Min_Date_Rptd'),
    F.max('date_rptd').alias('Max_Date_Rptd'),
    F.count('date_rptd').alias('Total_Records'),
).first()

print(
    "\nDate Reported (date_rptd):",
    f"  Earliest: {date_rptd_stats['Min_Date_Rptd']}",
    f"  Latest: {date_rptd_stats['Max_Date_Rptd']}",
    f"  Non-Null Records: {date_rptd_stats['Total_Records']:,}",
    sep="\n",
)

# Ordering check: a crime cannot be reported before it occurred, so
# date_rptd is expected to be >= date_occ.
print("Date Validation: Report date should be >= Occurrence date")

# Rows that violate the expected ordering (report date strictly earlier).
reported_before_occurred = F.col('date_rptd') < F.col('date_occ')
illogical_dates = df_raw.where(reported_before_occurred).count()

print(f"Records where date_rptd < date_occ: {illogical_dates:,}")

if illogical_dates == 0:
    print("All dates follow logical order!")
else:
    # total_records is the row count computed earlier in the notebook.
    illogical_pct = (illogical_dates / total_records) * 100
    print(f"Percentage: {illogical_pct:.2f}%")
    print("\nSample illogical records:")
    sample_columns = ['dr_no', 'date_rptd', 'date_occ', 'crm_cd_desc']
    display(
        df_raw.where(reported_before_occurred)
        .select(*sample_columns)
        .limit(10)
    )

# Reporting lag: number of days between when a crime occurred and when it
# was reported. (Header text corrected from the misspelled "Anaysis".)
print("Reporting LAG Analysis")

# F.datediff(end, start) is end minus start in days, so a negative value
# means the report date precedes the occurrence date.
df_with_lag = df_raw.withColumn(
    'days_to_report',
    F.datediff(F.col('date_rptd'), F.col('date_occ'))
)

# Summary statistics of the lag; the median and 90th percentile come from the
# Spark SQL `percentile` aggregate.
lag_stats = df_with_lag.select(
    F.min('days_to_report').alias('Min_Days'),
    F.max('days_to_report').alias('Max_Days'),
    F.avg('days_to_report').alias('Avg_Days'),
    F.expr('percentile(days_to_report, 0.5)').alias('Median_Days'),
    F.expr('percentile(days_to_report, 0.90)').alias('P90_Days')
).collect()[0]

print("Days between occurrence and report:")
print(f"  Minimum: {lag_stats['Min_Days']} days")
print(f"  Maximum: {lag_stats['Max_Days']} days")
print(f"  Average: {lag_stats['Avg_Days']:.2f} days")
print(f"  Median: {lag_stats['Median_Days']} days")
print(f"  90th Percentile: {lag_stats['P90_Days']} days")

# Record counts for the 20 smallest lag values (ascending by days_to_report).
print("\nReporting Lag Distribution:")
lag_distribution = df_with_lag.groupBy('days_to_report').count().orderBy('days_to_report').limit(20)
display(lag_distribution)

# time_occ is expected to be a 24-hour clock value written as HHMM
# (0000-2359).
print("Time Field Validation")

# Values that fall outside the numeric 0..2359 range.
time_occ = F.col('time_occ')
invalid_times = df_raw.filter((time_occ > 2359) | (time_occ < 0)).count()

print(f"Invalid time_occ values (outside 0-2359): {invalid_times:,}")

# Left-pad the value to four characters, then split it into HH and MM so the
# hour and minute parts can be range-checked separately.
df_time_check = (
    df_raw
    .withColumn('time_str', F.lpad(F.col('time_occ').cast('string'), 4, '0'))
    .withColumn('hour', F.substring('time_str', 1, 2).cast('int'))
    .withColumn('minute', F.substring('time_str', 3, 2).cast('int'))
)

hour_part = F.col('hour')
minute_part = F.col('minute')
invalid_hours = df_time_check.filter((hour_part > 23) | (hour_part < 0)).count()
invalid_minutes = df_time_check.filter((minute_part > 59) | (minute_part < 0)).count()

print(f"Invalid hours (>23 or <0): {invalid_hours:,}")
print(f"Invalid minutes (>59 or <0): {invalid_minutes:,}")

# Record count per extracted hour, in hour order.
print("\nCrime Distribution by Hour of Day:")
hour_distribution = df_time_check.groupBy('hour').count().orderBy('hour')
display(hour_distribution)

# Dates later than "now" cannot be real occurrences or reports, so any such
# rows are flagged as a data quality problem.
print("Date Checks")

# datetime.now() is a naive timestamp taken on the driver.
# NOTE(review): how Spark interprets it depends on the session time zone --
# confirm driver and session agree.
current_date = datetime.now()
now_literal = F.lit(current_date)
occurs_in_future = F.col('date_occ') > now_literal
reported_in_future = F.col('date_rptd') > now_literal

future_occ = df_raw.filter(occurs_in_future).count()
future_rptd = df_raw.filter(reported_in_future).count()

print(f"Occurrences in the future: {future_occ:,}")
print(f"Reports in the future: {future_rptd:,}")

if future_occ == 0 and future_rptd == 0:
    print("No future dates found!")
else:
    print("WARNING: Found future dates!")
    # Only future occurrences are sampled; future report dates are counted
    # above but not listed.
    if future_occ > 0:
        print("\nSample future occurrences:")
        display(
            df_raw.filter(occurs_in_future)
            .select('dr_no', 'date_occ', 'date_rptd', 'crm_cd_desc')
            .limit(10)
        )

## Step 7: Geographic Data Validation

### Coordinate Statistics

In [0]:
# Min / max / mean and non-null counts for the lat and lon columns, gathered
# in one aggregation.
coord_stats = df_raw.agg(
    F.min('lat').alias('Min_LAT'),
    F.max('lat').alias('Max_LAT'),
    F.avg('lat').alias('Avg_LAT'),
    F.min('lon').alias('Min_LON'),
    F.max('lon').alias('Max_LON'),
    F.avg('lon').alias('Avg_LON'),
    F.count('lat').alias('Non_Null_LAT'),
    F.count('lon').alias('Non_Null_LON'),
).first()

# Print the same four-line summary for each axis; the suffix selects the
# matching aliases from the aggregation above.
for axis_label, suffix in (("Latitude", "LAT"), ("Longitude", "LON")):
    print(f"\n{axis_label} Statistics:")
    print(f"  Min: {coord_stats['Min_' + suffix]:.6f}")
    print(f"  Max: {coord_stats['Max_' + suffix]:.6f}")
    print(f"  Avg: {coord_stats['Avg_' + suffix]:.6f}")
    print(f"  Non-Null: {coord_stats['Non_Null_' + suffix]:,}")

# Flag coordinates that fall outside the bounding box used here for LA County.
print("OUT-OF-BOUNDS Coordinate Analysis")

# Rows where both coordinates are exactly zero.
both_zero = (F.col('lat') == 0) & (F.col('lon') == 0)
zero_coords = df_raw.filter(both_zero).count()
print(f"Zero coordinates (0, 0): {zero_coords:,}")

# Bounding box: latitude 33.7..34.3, longitude -118.7..-118.1.
# Each predicate is built once and reused for the counts and the sample.
lat_outside = (F.col('lat') < 33.7) | (F.col('lat') > 34.3)
lon_outside = (F.col('lon') < -118.7) | (F.col('lon') > -118.1)
either_outside = lat_outside | lon_outside

out_of_bounds_lat = df_raw.filter(lat_outside).count()
out_of_bounds_lon = df_raw.filter(lon_outside).count()
# Despite the "both" in its name, this counts rows where EITHER axis is
# outside the box (matching the printed label).
out_of_bounds_both = df_raw.filter(either_outside).count()

print(f"Out-of-bounds Latitude: {out_of_bounds_lat:,}")
print(f"Out-of-bounds Longitude: {out_of_bounds_lon:,}")
print(f"Out-of-bounds (either): {out_of_bounds_both:,}")

if out_of_bounds_both == 0:
    print("All coordinates within LA County bounds!")
else:
    # total_records is the row count computed earlier in the notebook.
    out_of_bounds_pct = (out_of_bounds_both / total_records) * 100
    print(f"Percentage: {out_of_bounds_pct:.2f}%")
    print("WARNING: Found coordinates outside LA County!")

    print("\nSample out-of-bounds records:")
    display(
        df_raw.filter(either_outside)
        .select('dr_no', 'lat', 'lon', 'location', 'area_name')
        .limit(10)
    )


### LA Area Analysis

In [0]:
# Area distribution (LAPD has 21 geographic areas)
print("LAPD Area Distribution")
area_distribution = df_raw.groupBy('area', 'area_name').count().orderBy('area')

print("\nCrime count by Area:")
display(area_distribution)

# Check for invalid area codes: anything that is not one of "1".."21".
valid_areas = [str(i) for i in range(1, 22)]
# A NULL area makes `~isin(...)` evaluate to NULL, which filter() drops, so
# missing codes would never be counted as invalid. Test for NULL explicitly so
# they are included.
invalid_area_condition = (
    F.col('area').isNull()
    | (~F.col('area').cast('string').isin(valid_areas))
)
invalid_area_count = df_raw.filter(invalid_area_condition).count()

print(f"\nInvalid area codes (not 1-21): {invalid_area_count:,}")

if invalid_area_count > 0:
    print("WARNING: Found invalid area codes!")
    # List each distinct offending (area, area_name) pair once.
    display(df_raw.filter(invalid_area_condition)
            .select('area', 'area_name').distinct())
else:
    print("All area codes valid (1-21)")


### LA Reporting District Analysis

In [0]:
# Profile of the rpt_dist_no column: number of distinct districts and the
# smallest / largest value present.
print("Reporting District Analysis")

rpt_dist_stats = df_raw.agg(
    F.countDistinct('rpt_dist_no').alias('Distinct_Districts'),
    F.min('rpt_dist_no').alias('Min_District'),
    F.max('rpt_dist_no').alias('Max_District'),
).first()

district_low = rpt_dist_stats['Min_District']
district_high = rpt_dist_stats['Max_District']
print(f"Total Reporting Districts: {rpt_dist_stats['Distinct_Districts']:,}")
print(f"District Range: {district_low} to {district_high}")

# The ten districts with the most records, busiest first.
print("\nTop 10 Reporting Districts by Crime Count:")
top_districts = (
    df_raw.groupBy('rpt_dist_no')
    .count()
    .orderBy(F.col('count').desc())
    .limit(10)
)
display(top_districts)


### Location Field Analysis

In [0]:
# Profile of the free-text location column: population, uniqueness, a few
# sample values and the spread of string lengths.
print("# Location Field Analysis")

location_stats = df_raw.agg(
    F.count('location').alias('Non_Null_Locations'),
    F.countDistinct('location').alias('Unique_Locations'),
).first()

print(f"Non-Null Locations: {location_stats['Non_Null_Locations']:,}")
print(f"Unique Locations: {location_stats['Unique_Locations']:,}")

# Ten example rows that have a location value.
has_location = F.col('location').isNotNull()
print("\nSample Location Values:")
display(
    df_raw.select('location', 'cross_street', 'area_name')
    .filter(has_location)
    .limit(10)
)

# Shortest, longest and mean length of the non-null location strings.
location_length = F.length('location')
location_length_stats = df_raw.filter(has_location).agg(
    F.min(location_length).alias('Min_Length'),
    F.max(location_length).alias('Max_Length'),
    F.avg(location_length).alias('Avg_Length'),
).first()

print(f"\nLocation String Length:")
print(f"  Min: {location_length_stats['Min_Length']}")
print(f"  Max: {location_length_stats['Max_Length']}")
print(f"  Avg: {location_length_stats['Avg_Length']:.2f}")

## Step 8: Demographic Data Validation

### Victim Age Analysis

In [0]:
# Step 8: Demographic Data Validation -- victim age
print("Age Analysis")

# Summary statistics of vict_age; the median comes from the Spark SQL
# `percentile` aggregate and Non_Null_Count counts non-null ages only.
age_stats = df_raw.agg(
    F.min('vict_age').alias('Min_Age'),
    F.max('vict_age').alias('Max_Age'),
    F.avg('vict_age').alias('Avg_Age'),
    F.expr('percentile(vict_age, 0.5)').alias('Median_Age'),
    F.count('vict_age').alias('Non_Null_Count'),
).first()

print(f"Age Statistics:")
print(f"  Min: {age_stats['Min_Age']}")
print(f"  Max: {age_stats['Max_Age']}")
print(f"  Avg: {age_stats['Avg_Age']:.2f}")
print(f"  Median: {age_stats['Median_Age']}")
print(f"  Non-Null: {age_stats['Non_Null_Count']:,}")

# Age predicates, defined once and reused for the counts and the sample.
# An age counts as invalid when it is negative or above 120; zero is counted
# separately and is not part of the invalid total.
victim_age = F.col('vict_age')
is_negative = victim_age < 0
is_over_120 = victim_age > 120
is_invalid = is_negative | is_over_120

negative_ages = df_raw.filter(is_negative).count()
zero_ages = df_raw.filter(victim_age == 0).count()
over_120 = df_raw.filter(is_over_120).count()
invalid_ages = df_raw.filter(is_invalid).count()

# total_records is the row count computed earlier in the notebook.
invalid_share = (invalid_ages / total_records) * 100

print(f"\nData Quality Issues:")
print(f"  Negative ages: {negative_ages:,}")
print(f"  Zero ages: {zero_ages:,}")
print(f"  Ages > 120: {over_120:,}")
print(f"  Total invalid: {invalid_ages:,} ({invalid_share:.2f}%)")

if invalid_ages == 0:
    print("\nAll ages are valid (0-120)")
else:
    print("\nWARNING: Found invalid ages!")
    print("Sample invalid ages:")
    display(
        df_raw.filter(is_invalid)
        .select('dr_no', 'vict_age', 'vict_sex', 'vict_descent', 'crm_cd_desc')
        .limit(10)
    )

# Age distribution
print("Age Distribution")

# Bucket vict_age into labelled groups for analysis.
# The out-of-range test comes first so that negative ages are labelled
# 'Invalid' instead of matching the `< 18` juvenile branch, and the upper
# bound is inclusive so that 120 is treated as valid, consistent with the
# 0-120 validity check used elsewhere in this notebook. Null ages also land
# in 'Invalid'. Zero is reported as 'Unknown'.
victim_age = F.col('vict_age')
df_age_groups = df_raw.withColumn(
    'age_group',
    F.when(victim_age.isNull() | (victim_age < 0) | (victim_age > 120), 'Invalid')
    .when(victim_age == 0, 'Unknown')
    .when(victim_age < 18, '0-17 (Juvenile)')
    .when(victim_age < 25, '18-24')
    .when(victim_age < 35, '25-34')
    .when(victim_age < 45, '35-44')
    .when(victim_age < 55, '45-54')
    .when(victim_age < 65, '55-64')
    .otherwise('65+ (Senior)')
)

# Record count per age group, largest group first.
age_group_dist = df_age_groups.groupBy('age_group').count().orderBy(F.desc('count'))

print("Crime Victims by Age Group:")
display(age_group_dist)



### Victim Sex Analysis

In [0]:
# Victim Sex analysis
print("VICTIM SEX ANALYSIS")
print("=" * 80)

# Record count per vict_sex value, most frequent first.
sex_distribution = df_raw.groupBy('vict_sex').count().orderBy(F.desc('count'))

print("Victim Sex Distribution:")
display(sex_distribution)

# Expected values: F, M, X, H, - (dash), null
expected_sex_values = ['F', 'M', 'X', 'H', '-', None]
sex_values = df_raw.select('vict_sex').distinct().collect()
actual_sex_values = [row['vict_sex'] for row in sex_values]

print(f"\nUnique sex values found: {actual_sex_values}")
print(f"Expected values: F (Female), M (Male), X (Unknown), H (Non-binary), - (Unknown)")

# Compare what is present against the expected list so unexpected codes are
# reported explicitly, mirroring the unexpected-code check done for
# vict_descent.
unexpected_sex_values = [
    value for value in actual_sex_values if value not in expected_sex_values
]
if unexpected_sex_values:
    print(f"\nWARNING: Found unexpected vict_sex values: {unexpected_sex_values}")
else:
    print("\nAll vict_sex values are expected")

# Count null vs non-null
null_sex = df_raw.filter(F.col('vict_sex').isNull()).count()
non_null_sex = df_raw.filter(F.col('vict_sex').isNotNull()).count()

# total_records is the row count computed earlier in the notebook.
print(f"\nNull vict_sex: {null_sex:,} ({(null_sex/total_records)*100:.2f}%)")
print(f"Non-Null vict_sex: {non_null_sex:,} ({(non_null_sex/total_records)*100:.2f}%)")

print("=" * 80)

# COMMAND ----------

# Victim descent: distribution, code reference and validation of the codes
# that actually occur in the data.
section_rule = "=" * 80
print(section_rule)
print("VICTIM DESCENT (ETHNICITY) ANALYSIS")
print(section_rule)

# Record count per vict_descent value, most frequent first.
descent_distribution = (
    df_raw.groupBy('vict_descent')
    .count()
    .orderBy(F.col('count').desc())
)

print("Victim Descent Distribution:")
display(descent_distribution)

# Single-character descent code -> description (from documentation).
descent_mapping = {
    'A': 'Other Asian',
    'B': 'Black',
    'C': 'Chinese',
    'D': 'Cambodian',
    'F': 'Filipino',
    'G': 'Guamanian',
    'H': 'Hispanic/Latin/Mexican',
    'I': 'American Indian/Alaskan Native',
    'J': 'Japanese',
    'K': 'Korean',
    'L': 'Laotian',
    'O': 'Other',
    'P': 'Pacific Islander',
    'S': 'Samoan',
    'U': 'Hawaiian',
    'V': 'Vietnamese',
    'W': 'White',
    'X': 'Unknown',
    'Z': 'Asian Indian',
    '-': 'Unknown',
}

# Print the reference table in sorted code order.
print("\nDescent Code Reference:")
for code in sorted(descent_mapping):
    print(f"  {code}: {descent_mapping[code]}")

# Collect the distinct non-null codes present in the data and keep those the
# mapping does not know about.
descent_values = df_raw.select('vict_descent').distinct().collect()
actual_descent_codes = []
for row in descent_values:
    if row['vict_descent'] is not None:
        actual_descent_codes.append(row['vict_descent'])
unexpected_codes = [
    code for code in actual_descent_codes if code not in descent_mapping
]

if not unexpected_codes:
    print("\n✅ All descent codes are valid")
else:
    print(f"\n⚠️ WARNING: Found unexpected descent codes: {unexpected_codes}")

# Share of rows with no descent value at all; total_records is the row count
# computed earlier in the notebook.
null_descent = df_raw.filter(F.col('vict_descent').isNull()).count()
null_descent_pct = (null_descent / total_records) * 100
print(f"\nNull vict_descent: {null_descent:,} ({null_descent_pct:.2f}%)")

print(section_rule)

# COMMAND ----------

# Cross-analysis of the three victim demographic fields: age, sex, descent.
section_rule = "=" * 80
print(section_rule)
print("DEMOGRAPHIC CROSS-ANALYSIS")
print(section_rule)

# Count rows where AT LEAST ONE of the demographic fields is null. The
# variable name says "all", but the condition is an OR across the fields,
# which is what the printed label ("missing any") describes.
any_demographic_missing = F.lit(False)
for demographic_column in ('vict_age', 'vict_sex', 'vict_descent'):
    any_demographic_missing = any_demographic_missing | F.col(demographic_column).isNull()

missing_all_demographics = df_raw.filter(any_demographic_missing).count()
# total_records is the row count computed earlier in the notebook.
missing_share = (missing_all_demographics / total_records) * 100

print(f"Records missing any demographic info: {missing_all_demographics:,} ({missing_share:.2f}%)")

# The ten most frequent (age group, sex, descent) combinations; df_age_groups
# is the age-bucketed DataFrame built in the age analysis.
print("\nTop 10 Victim Demographic Profiles (Age Group + Sex + Descent):")
top_profiles = (
    df_age_groups.groupBy('age_group', 'vict_sex', 'vict_descent')
    .count()
    .orderBy(F.col('count').desc())
    .limit(10)
)
display(top_profiles)

print(section_rule)