# Part1: hbase insertion

## Python Script/Notebook Setup

In [11]:
import pandas as pd
import happybase # For HBase connection
from datetime import datetime
import hashlib # For potential salting, though we'll use modulo here

# HBase connection details
HBASE_HOST = 'localhost' # Since Docker maps ports to localhost
HBASE_PORT = 9090        # Default Thrift server port for HBase within the harisekhon/hbase image
                         # Note: The harisekhon image might run Thrift on 9090 by default.
                         # If you have issues connecting, we might need to confirm this port for Thrift.
                         # The docker-compose exposes 9090 (RegionServer UI), but Thrift itself might be on a different internal port
                         # if not explicitly configured or exposed. HappyBase connects to the Thrift server.
                         # Let's assume for now it's accessible via the RegionServer host/port or a default Thrift port.
                         # A common default Thrift port is 9090.

NAMESPACE = 'practice'
TABLE_NAME = 'crimes'
FULL_TABLE_NAME = f'{NAMESPACE}:{TABLE_NAME}'

# Column families (must match what you created in HBase shell)
CF_LOC = 'cf_loc'
CF_CRIME = 'cf_crime'
CF_VICTIM = 'cf_victim'

print("HBase connection parameters set.")
print(f"Target table: {FULL_TABLE_NAME}")

HBase connection parameters set.
Target table: practice:crimes


## Connection to HBase from Python

In [12]:
def connect_to_hbase():
    """Establishes a connection to HBase."""
    try:
        connection = happybase.Connection(
            host=HBASE_HOST,
            port=HBASE_PORT, # Default HappyBase port if not specified
            autoconnect=True,
            # transport='buffered', # common transport
            # protocol='compact'    # common protocol
        )
        # The autoconnect=True should handle opening.
        # connection.open() # Explicit open if autoconnect is False
        print(f"Successfully connected to HBase at {HBASE_HOST}:{HBASE_PORT}!")
        
        # Optional: List tables to verify connection and that the namespace/table exists
        print("Available tables:", connection.tables())
        return connection
    except Exception as e:
        print(f"Error connecting to HBase: {e}")
        print("Please ensure HBase is running and Thrift server is accessible.")
        print("Common issues: ")
        print("1. HBase Docker container not running or Thrift server not started.")
        print(f"2. Incorrect host ('{HBASE_HOST}') or port ('{HBASE_PORT}') for Thrift server.")
        print("3. Firewall blocking the connection to the Thrift port.")
        return None

connection = connect_to_hbase()

Successfully connected to HBase at localhost:9090!
Available tables: [b'practice:crimes']


## Create Table in Python (Already Done via Shell)

In [13]:
# This step is optional as the table should already be created via HBase shell.
# If you need to create it from Python:
# def create_hbase_table_if_not_exists(conn, table_name_str, families_dict):
#     if conn:
#         try:
#             tables = conn.tables()
#             if table_name_str.encode('utf-8') not in tables:
#                 print(f"Table '{table_name_str}' not found. Creating it...")
#                 conn.create_table(table_name_str, families_dict)
#                 print(f"Table '{table_name_str}' created successfully with families: {families_dict}")
#             else:
#                 print(f"Table '{table_name_str}' already exists.")
#         except Exception as e:
#             print(f"Error interacting with table '{table_name_str}': {e}")

# families = {
#     CF_LOC: dict(),      # Empty dict for default settings
#     CF_CRIME: dict(),
#     CF_VICTIM: dict()
# }
# if connection:
#    create_hbase_table_if_not_exists(connection, FULL_TABLE_NAME, families)

## Load and Clean Data

In [14]:
def load_and_clean_data(csv_filepath):
    """Loads data from CSV and performs initial cleaning."""
    try:
        df = pd.read_csv(csv_filepath)
        print(f"Dataset '{csv_filepath}' loaded with {df.shape[0]} rows and {df.shape[1]} columns.")
    except FileNotFoundError:
        print(f"Error: File '{csv_filepath}' not found.")
        return None
    
    # --- Column Renaming ---
    original_columns = df.columns.tolist()
    df.columns = [col.lower().replace(' ', '_').replace('-', '_') for col in df.columns]
    # Special handling for 'part_1_2' if it became 'part_1_2' from 'Part 1-2'
    df.columns = [col.replace('part_1_2', 'part_1_2_val') if col == 'part_1_2' else col for col in df.columns]
    renamed_columns = df.columns.tolist()
    print("\nColumns renamed:")
    for orig, new in zip(original_columns, renamed_columns):
        if orig != new:
            print(f"'{orig}' -> '{new}'")

    # --- Date/Time Processing for Rowkey and Storage ---
    # DATE OCC and TIME OCC are crucial for the rowkey.
    # Ensure 'date_occ' is parsed correctly
    try:
        # Assuming 'date_occ' format is like 'MM/DD/YYYY hh:mm:ss AM/PM' or just 'MM/DD/YYYY'
        # If it's just 'MM/DD/YYYY', pd.to_datetime will handle it.
        # If it has time, we need to be careful or split it.
        # Let's assume the original CSV 'DATE OCC' is the primary date.
        df['parsed_date_occ'] = pd.to_datetime(df['date_occ'], errors='coerce')

        # Extract components for rowkey
        df['year_occ'] = df['parsed_date_occ'].dt.year
        df['month_occ'] = df['parsed_date_occ'].dt.month
        # df['day_occ'] = df['parsed_date_occ'].dt.day # Not used in current rowkey design, but good to have

        # Handle 'time_occ' (integer like 1830)
        # Convert to string, pad with leading zeros to ensure 4 digits for HHMM
        df['time_occ_str'] = df['time_occ'].astype(str).str.zfill(4)
        df['hour_occ'] = df['time_occ_str'].str.slice(0, 2)
        # df['minute_occ'] = df['time_occ_str'].str.slice(2, 4) # Not used in current rowkey

    except Exception as e:
        print(f"Error parsing date/time columns: {e}")
        print("Please check 'date_occ' and 'time_occ' formats.")
        # Fill with placeholder if parsing fails to avoid issues later, though this should be addressed
        df['year_occ'] = 1900
        df['month_occ'] = 1
        # df['day_occ'] = 1
        df['hour_occ'] = '00'

    # --- Prepare Catalog for Column Families ---
    # This maps your DataFrame columns to HBase column qualifiers with their families
    column_family_mapping = {
        # cf_loc
        'area_name': CF_LOC,
        'rpt_dist_no': CF_LOC,
        'premis_cd': CF_LOC,
        'premis_desc': CF_LOC,
        'location': CF_LOC,
        'cross_street': CF_LOC,
        'lat': CF_LOC,
        'lon': CF_LOC,
        # cf_crime
        'crm_cd': CF_CRIME,
        'crm_cd_desc': CF_CRIME,
        'part_1_2_val': CF_CRIME, # Renamed from 'part 1-2'
        'mocodes': CF_CRIME,
        'weapon_used_cd': CF_CRIME,
        'weapon_desc': CF_CRIME,
        'status': CF_CRIME,
        'status_desc': CF_CRIME,
        'crm_cd_1': CF_CRIME,
        'crm_cd_2': CF_CRIME,
        'crm_cd_3': CF_CRIME,
        'crm_cd_4': CF_CRIME,
        'date_rptd': CF_CRIME, # Original date reported string
        'date_occ': CF_CRIME,  # Original date occurred string
        'time_occ': CF_CRIME,  # Original time occurred integer
        # cf_victim
        'vict_age': CF_VICTIM,
        'vict_sex': CF_VICTIM,
        'vict_descent': CF_VICTIM,
    }
    # DR_NO, year_occ, month_occ, area are used for rowkey, not directly as columns here
    # (unless you also want to store them as distinct columns, which is redundant if in rowkey)

    print("\nData cleaning and preparation complete (excluding rowkey generation for now).")
    return df, column_family_mapping

# Load the data
csv_file = 'Crime_Data_from_2020_to_Present.csv' # Make sure this path is correct
df_crimes, cf_mapping = load_and_clean_data(csv_file)

if df_crimes is not None:
    print("\nSample of processed DataFrame headers:")
    print(df_crimes.head(2))
    print("\nYear OCC unique values:", df_crimes['year_occ'].unique())
    print("Month OCC unique values:", df_crimes['month_occ'].unique())
    print("Area unique values:", df_crimes['area'].unique()) # 'area' is the numeric area code column

Dataset 'Crime_Data_from_2020_to_Present.csv' loaded with 1005091 rows and 28 columns.

Columns renamed:
'DR_NO' -> 'dr_no'
'Date Rptd' -> 'date_rptd'
'DATE OCC' -> 'date_occ'
'TIME OCC' -> 'time_occ'
'AREA' -> 'area'
'AREA NAME' -> 'area_name'
'Rpt Dist No' -> 'rpt_dist_no'
'Part 1-2' -> 'part_1_2_val'
'Crm Cd' -> 'crm_cd'
'Crm Cd Desc' -> 'crm_cd_desc'
'Mocodes' -> 'mocodes'
'Vict Age' -> 'vict_age'
'Vict Sex' -> 'vict_sex'
'Vict Descent' -> 'vict_descent'
'Premis Cd' -> 'premis_cd'
'Premis Desc' -> 'premis_desc'
'Weapon Used Cd' -> 'weapon_used_cd'
'Weapon Desc' -> 'weapon_desc'
'Status' -> 'status'
'Status Desc' -> 'status_desc'
'Crm Cd 1' -> 'crm_cd_1'
'Crm Cd 2' -> 'crm_cd_2'
'Crm Cd 3' -> 'crm_cd_3'
'Crm Cd 4' -> 'crm_cd_4'
'LOCATION' -> 'location'
'Cross Street' -> 'cross_street'
'LAT' -> 'lat'
'LON' -> 'lon'


  df['parsed_date_occ'] = pd.to_datetime(df['date_occ'], errors='coerce')



Data cleaning and preparation complete (excluding rowkey generation for now).

Sample of processed DataFrame headers:
       dr_no               date_rptd                date_occ  time_occ  area  \
0  190326475  03/01/2020 12:00:00 AM  03/01/2020 12:00:00 AM      2130     7   
1  200106753  02/09/2020 12:00:00 AM  02/08/2020 12:00:00 AM      1800     1   

  area_name  rpt_dist_no  part_1_2_val  crm_cd            crm_cd_desc  ...  \
0  Wilshire          784             1     510       VEHICLE - STOLEN  ...   
1   Central          182             1     330  BURGLARY FROM VEHICLE  ...   

  crm_cd_4                                 location cross_street      lat  \
0      NaN  1900 S  LONGWOOD                     AV          NaN  34.0375   
1      NaN  1000 S  FLOWER                       ST          NaN  34.0444   

        lon parsed_date_occ  year_occ month_occ time_occ_str hour_occ  
0 -118.3506      2020-03-01      2020         3         2130       21  
1 -118.2628      2020-02-08  

## Implement Rowkey Generation and Data Push Function

In [15]:
def generate_row_key(row):
    """
    Generates the HBase rowkey: SALT_YEAR_MONTH_AREACODE_DRNO
    SALT: DR_NO % 100 (2 digits, zero-padded)
    YEAR: 4-digit year
    MONTH: 2-digit month (zero-padded)
    AREACODE: 'area' column value (2 digits, zero-padded)
    DRNO: dr_no
    """
    try:
        # Ensure necessary fields are not NaN/None before processing
        if pd.isna(row['dr_no']) or pd.isna(row['year_occ']) or \
        pd.isna(row['month_occ']) or pd.isna(row['area']):
            return None # Skip rows with missing key components

        dr_no = int(row['dr_no'])
        salt = str(dr_no % 100).zfill(2) # 2-digit salt
        
        year = str(int(row['year_occ']))
        month = str(int(row['month_occ'])).zfill(2) # 2-digit month
        
        areacode = str(int(row['area'])).zfill(2) # 2-digit area code

        # Rowkey: SALT_YEAR_MONTH_AREACODE_DRNO
        row_key_str = f"{salt}_{year}_{month}_{areacode}_{dr_no}"
        return row_key_str.encode('utf-8') # HBase rowkeys are bytes
    except Exception as e:
        # print(f"Error generating rowkey for DR_NO {row.get('dr_no', 'Unknown')}: {e}")
        return None


def push_data_to_hbase(conn, df, table_name_str, col_family_map, batch_size=1000, row_limit=500000):
    """
    Pushes data from DataFrame to HBase table, skipping NA values for cells.
    """
    if not conn or df is None:
        print("No HBase connection or DataFrame is None. Aborting push.")
        return

    try:
        table = conn.table(table_name_str)
        print(f"\nStarting data insertion into '{table_name_str}'...")
        
        success_count = 0
        error_count = 0
        skipped_count = 0
        
        # Use table.batch() for more efficient writes
        with table.batch(batch_size=batch_size) as b:
            for index, row in df.head(row_limit).iterrows(): # Process only the first row_limit rows
                row_key = generate_row_key(row)

                if row_key is None:
                    skipped_count +=1
                    if skipped_count % (batch_size * 10) == 0 : # Print progress occasionally for skipped
                        print(f"Skipped {skipped_count} rows due to missing key components...")
                    continue

                data_to_insert = {}
                for col_name_df, cf_name_hbase in col_family_map.items():
                    if col_name_df in row and pd.notna(row[col_name_df]):
                        # Ensure value is string for HBase, convert if necessary
                        value = str(row[col_name_df])
                        # Construct fully qualified column name: cf:qualifier
                        hbase_col_qualifier = f"{cf_name_hbase}:{col_name_df}"
                        data_to_insert[hbase_col_qualifier.encode('utf-8')] = value.encode('utf-8')
                
                if data_to_insert: # Only insert if there's some data besides key
                    try:
                        b.put(row_key, data_to_insert)
                        success_count += 1
                    except Exception as e_put:
                        print(f"Error inserting row with key {row_key.decode('utf-8', errors='ignore')}: {e_put}")
                        error_count += 1
                else:
                    skipped_count +=1 # Also counts as skipped if no actual data columns to insert

                if (index + 1) % batch_size == 0:
                    print(f"Processed {index + 1} rows. Inserted: {success_count}, Errors: {error_count}, Skipped: {skipped_count}")
            
            # The batch sends automatically when it fills or at the end of the 'with' block.
            # If any rows are left in the batch and not sent, ensure they are.
            # (HappyBase batch context manager handles this)

        print(f"\nData insertion summary for the first {row_limit} rows:")
        print(f"Successfully inserted rows: {success_count}")
        print(f"Rows with errors during put: {error_count}")
        print(f"Rows skipped (missing key components or no data): {skipped_count}")

    except Exception as e:
        print(f"An error occurred during the batch insertion process: {e}")

# --- Trigger the insertion ---
if connection and df_crimes is not None:
    push_data_to_hbase(connection, df_crimes, FULL_TABLE_NAME, cf_mapping, row_limit=500000) # Push first 500k
else:
    if not connection:
        print("Cannot push data: No HBase connection.")
    if df_crimes is None:
        print("Cannot push data: DataFrame not loaded.")


Starting data insertion into 'practice:crimes'...
Processed 1000 rows. Inserted: 1000, Errors: 0, Skipped: 0
Processed 2000 rows. Inserted: 2000, Errors: 0, Skipped: 0
Processed 3000 rows. Inserted: 3000, Errors: 0, Skipped: 0
Processed 4000 rows. Inserted: 4000, Errors: 0, Skipped: 0
Processed 5000 rows. Inserted: 5000, Errors: 0, Skipped: 0
Processed 6000 rows. Inserted: 6000, Errors: 0, Skipped: 0
Processed 7000 rows. Inserted: 7000, Errors: 0, Skipped: 0
Processed 8000 rows. Inserted: 8000, Errors: 0, Skipped: 0
Processed 9000 rows. Inserted: 9000, Errors: 0, Skipped: 0
Processed 10000 rows. Inserted: 10000, Errors: 0, Skipped: 0
Processed 11000 rows. Inserted: 11000, Errors: 0, Skipped: 0
Processed 12000 rows. Inserted: 12000, Errors: 0, Skipped: 0
Processed 13000 rows. Inserted: 13000, Errors: 0, Skipped: 0
Processed 14000 rows. Inserted: 14000, Errors: 0, Skipped: 0
Processed 15000 rows. Inserted: 15000, Errors: 0, Skipped: 0
Processed 16000 rows. Inserted: 16000, Errors: 0, Sk

# Part 2: Retrieve data from HBase and Analyze in Python

## Connection to HBase:

In [18]:
# HBase connection details (same as before)
HBASE_HOST = 'localhost'
HBASE_PORT = 9090 # Default Thrift server port
NAMESPACE = 'practice'
TABLE_NAME = 'crimes'
FULL_TABLE_NAME = f'{NAMESPACE}:{TABLE_NAME}'

# Column families
CF_LOC = 'cf_loc'
CF_CRIME = 'cf_crime'
CF_VICTIM = 'cf_victim'

def connect_to_hbase():
    try:
        connection = happybase.Connection(
            host=HBASE_HOST,
            port=HBASE_PORT,
            timeout=60000, # Increased timeout
            autoconnect=True
        )
        print(f"Successfully connected to HBase at {HBASE_HOST}:{HBASE_PORT}!")
        return connection
    except Exception as e:
        print(f"Error connecting to HBase: {e}")
        return None

connection = connect_to_hbase()

Successfully connected to HBase at localhost:9090!


## Helper Function to Decode HBase Data:

In [19]:
def decode_hbase_row(row_data):
    """Decodes a row from HBase (keys and values) from bytes to strings."""
    decoded_row = {}
    for key, value in row_data.items():
        decoded_row[key.decode('utf-8')] = value.decode('utf-8')
    return decoded_row

## Retrieve Data for Specific Queries:

### A. All SHOPLIFTING and VANDALISM crimes (if the label of the crime contains it)

In [20]:
def get_shoplifting_vandalism_crimes(conn, table_name_str, limit=100):
    if not conn:
        print("No HBase connection.")
        return []

    results = []
    try:
        table = conn.table(table_name_str)
        
        # Filter: cf_crime:crm_cd_desc CONTAINS 'SHOPLIFTING' OR cf_crime:crm_cd_desc CONTAINS 'VANDALISM'
        # Using SingleColumnValueFilter with SubstringComparator
        # FilterList with MUST_PASS_ONE for OR condition
        shoplifting_filter = "SingleColumnValueFilter ('cf_crime', 'crm_cd_desc', =, 'substring:SHOPLIFTING')"
        vandalism_filter = "SingleColumnValueFilter ('cf_crime', 'crm_cd_desc', =, 'substring:VANDALISM')"
        
        # Constructing a FilterList for OR is more complex with HappyBase's string filters.
        # Often, for complex ORs, it's easier to do two separate scans or fetch more data and filter in Python.
        # However, let's try to make a more complex filter string if possible, or do two scans.

        # Option 1: Two separate scans and merge (simpler to implement correctly)
        scan_filter_shoplifting = shoplifting_filter
        print(f"Scanning for SHOPLIFTING with filter: {scan_filter_shoplifting}")
        for key, data in table.scan(filter=scan_filter_shoplifting, limit=limit//2 if limit else None): # limit per scan
            decoded_data = decode_hbase_row(data)
            decoded_data['row_key'] = key.decode('utf-8')
            results.append(decoded_data)
        
        scan_filter_vandalism = vandalism_filter
        print(f"Scanning for VANDALISM with filter: {scan_filter_vandalism}")
        for key, data in table.scan(filter=scan_filter_vandalism, limit=limit//2 if limit else None): # limit per scan
            decoded_data = decode_hbase_row(data)
            decoded_data['row_key'] = key.decode('utf-8')
            # Avoid duplicates if a crime could be both (unlikely for crm_cd_desc)
            # This basic append might have duplicates if a row matches both, but for different crime types it's fine.
            results.append(decoded_data)

        # If you need a strict limit on the total unique results, you'd manage a set of row keys.
        print(f"Found {len(results)} SHOPLIFTING/VANDALISM crimes (up to combined limit).")

    except Exception as e:
        print(f"Error retrieving SHOPLIFTING/VANDALISM crimes: {e}")
    return results

if connection:
    shoplifting_vandalism_results = get_shoplifting_vandalism_crimes(connection, FULL_TABLE_NAME, limit=20) # Get up to 20
    print("\n--- Shoplifting/Vandalism Results (Sample) ---")
    for res in shoplifting_vandalism_results[:5]: # Print first 5
        print(res)
    # Convert to DataFrame for easier viewing/comparison
    df_shop_vand = pd.DataFrame(shoplifting_vandalism_results)
    print("\nDataFrame from Shoplifting/Vandalism HBase results:")
    display(df_shop_vand.head())

Scanning for SHOPLIFTING with filter: SingleColumnValueFilter ('cf_crime', 'crm_cd_desc', =, 'substring:SHOPLIFTING')
Scanning for VANDALISM with filter: SingleColumnValueFilter ('cf_crime', 'crm_cd_desc', =, 'substring:VANDALISM')
Found 20 SHOPLIFTING/VANDALISM crimes (up to combined limit).

--- Shoplifting/Vandalism Results (Sample) ---
{'cf_crime:crm_cd': '442', 'cf_crime:crm_cd_1': '442.0', 'cf_crime:crm_cd_desc': 'SHOPLIFTING - PETTY THEFT ($950 & UNDER)', 'cf_crime:date_occ': '01/09/2020 12:00:00 AM', 'cf_crime:date_rptd': '01/09/2020 12:00:00 AM', 'cf_crime:mocodes': '0104 0325 0202 0216 0346', 'cf_crime:part_1_2_val': '1', 'cf_crime:status': 'IC', 'cf_crime:status_desc': 'Invest Cont', 'cf_crime:time_occ': '1825', 'cf_loc:area_name': 'Southwest', 'cf_loc:lat': '34.0052', 'cf_loc:location': '4300 S  WESTERN                      AV', 'cf_loc:lon': '-118.3089', 'cf_loc:premis_cd': '248.0', 'cf_loc:premis_desc': 'CELL PHONE STORE', 'cf_loc:rpt_dist_no': '395', 'cf_victim:vict_age'

Unnamed: 0,cf_crime:crm_cd,cf_crime:crm_cd_1,cf_crime:crm_cd_desc,cf_crime:date_occ,cf_crime:date_rptd,cf_crime:mocodes,cf_crime:part_1_2_val,cf_crime:status,cf_crime:status_desc,cf_crime:time_occ,...,cf_loc:premis_desc,cf_loc:rpt_dist_no,cf_victim:vict_age,cf_victim:vict_descent,cf_victim:vict_sex,row_key,cf_crime:weapon_desc,cf_crime:weapon_used_cd,cf_loc:cross_street,cf_crime:crm_cd_2
0,442,442.0,SHOPLIFTING - PETTY THEFT ($950 & UNDER),01/09/2020 12:00:00 AM,01/09/2020 12:00:00 AM,0104 0325 0202 0216 0346,1,IC,Invest Cont,1825,...,CELL PHONE STORE,395,19,X,X,00_2020_01_03_200304500,,,,
1,442,442.0,SHOPLIFTING - PETTY THEFT ($950 & UNDER),01/15/2020 12:00:00 AM,01/15/2020 12:00:00 AM,0104 0325,1,AO,Adult Other,2140,...,GAS STATION,312,0,X,X,00_2020_01_03_200304900,,,,
2,442,442.0,SHOPLIFTING - PETTY THEFT ($950 & UNDER),01/24/2020 12:00:00 AM,01/24/2020 12:00:00 AM,0325 1414,1,IC,Invest Cont,1840,...,THE BEVERLY CONNECTION,722,0,W,M,00_2020_01_07_200705100,,,,
3,343,343.0,SHOPLIFTING-GRAND THEFT ($950.01 & OVER),01/04/2020 12:00:00 AM,01/14/2020 12:00:00 AM,1414 0325 1822 2028 1420,1,IC,Invest Cont,1330,...,OTHER BUSINESS,839,0,X,X,00_2020_01_08_200804600,,,,
4,442,442.0,SHOPLIFTING - PETTY THEFT ($950 & UNDER),01/22/2020 12:00:00 AM,01/23/2020 12:00:00 AM,0325,1,IC,Invest Cont,2100,...,MINI-MART,1555,0,O,M,00_2020_01_15_201505100,,,,


### B. Crimes occurring in Hollywood

In [22]:
def get_hollywood_crimes(conn, table_name_str, hollywood_areacode='06', hollywood_areaname='Hollywood', limit=100):
    if not conn:
        print("No HBase connection.")
        return []

    results = []
    try:
        table = conn.table(table_name_str)
        
        # Filter: Rowkey contains '_<hollywood_areacode>_' AND cf_loc:area_name IS 'Hollywood'
        # Regex for areacode in rowkey: ".*_AREACODE_.*" (e.g., ".*_06_.*")
        # This will scan ALL salts and ALL year/months for that area.
        # This can be a wide scan. For better performance, add year/month to rowkey filter if known.
        
        # Let's assume we want Hollywood crimes from 2020 as in Part 4
        year_to_filter = "2020"
        row_key_regex_filter = f".*_{year_to_filter}_.._{hollywood_areacode}_.*" # Any salt, specific year, any month, specific area

        filter_string = f"RowFilter (=, 'regexstring:{row_key_regex_filter}') AND SingleColumnValueFilter ('{CF_LOC}', 'area_name', =, 'binary:{hollywood_areaname}')"
        print(f"Scanning for Hollywood crimes with filter: {filter_string}")

        for key, data in table.scan(filter=filter_string, limit=limit):
            decoded_data = decode_hbase_row(data)
            decoded_data['row_key'] = key.decode('utf-8')
            results.append(decoded_data)
        print(f"Found {len(results)} crimes in {hollywood_areaname} (up to limit {limit}).")

    except Exception as e:
        print(f"Error retrieving {hollywood_areaname} crimes: {e}")
    return results

if connection:
    # VERIFY your Hollywood AREACODE and AREA NAME string
    hollywood_area_code_verified = '06' # Example, replace with your actual
    hollywood_area_name_verified = 'Hollywood' # Example, replace with your actual (check casing)
    
    hollywood_results = get_hollywood_crimes(connection, FULL_TABLE_NAME, 
                                             hollywood_areacode=hollywood_area_code_verified,
                                             hollywood_areaname=hollywood_area_name_verified, 
                                             limit=10)
    print(f"\n--- {hollywood_area_name_verified} Crime Results (Sample) ---")
    for res in hollywood_results[:5]:
        print(res)
    df_hollywood = pd.DataFrame(hollywood_results)
    print(f"\nDataFrame from {hollywood_area_name_verified} HBase results:")
    display(df_hollywood.head())

Scanning for Hollywood crimes with filter: RowFilter (=, 'regexstring:.*_2020_.._06_.*') AND SingleColumnValueFilter ('cf_loc', 'area_name', =, 'binary:Hollywood')
Found 10 crimes in Hollywood (up to limit 10).

--- Hollywood Crime Results (Sample) ---
{'cf_crime:crm_cd': '510', 'cf_crime:crm_cd_1': '510.0', 'cf_crime:crm_cd_desc': 'VEHICLE - STOLEN', 'cf_crime:date_occ': '01/09/2020 12:00:00 AM', 'cf_crime:date_rptd': '01/10/2020 12:00:00 AM', 'cf_crime:part_1_2_val': '1', 'cf_crime:status': 'IC', 'cf_crime:status_desc': 'Invest Cont', 'cf_crime:time_occ': '1950', 'cf_loc:area_name': 'Hollywood', 'cf_loc:cross_street': 'ARGYLE                       BL', 'cf_loc:lat': '34.098', 'cf_loc:location': 'SUNSET', 'cf_loc:lon': '-118.3252', 'cf_loc:premis_cd': '101.0', 'cf_loc:premis_desc': 'STREET', 'cf_loc:rpt_dist_no': '647', 'cf_victim:vict_age': '0', 'row_key': '00_2020_01_06_200604400'}
{'cf_crime:crm_cd': '510', 'cf_crime:crm_cd_1': '510.0', 'cf_crime:crm_cd_desc': 'VEHICLE - STOLEN', '

Unnamed: 0,cf_crime:crm_cd,cf_crime:crm_cd_1,cf_crime:crm_cd_desc,cf_crime:date_occ,cf_crime:date_rptd,cf_crime:part_1_2_val,cf_crime:status,cf_crime:status_desc,cf_crime:time_occ,cf_loc:area_name,...,cf_loc:premis_desc,cf_loc:rpt_dist_no,cf_victim:vict_age,row_key,cf_crime:mocodes,cf_victim:vict_descent,cf_victim:vict_sex,cf_crime:crm_cd_2,cf_crime:weapon_desc,cf_crime:weapon_used_cd
0,510,510.0,VEHICLE - STOLEN,01/09/2020 12:00:00 AM,01/10/2020 12:00:00 AM,1,IC,Invest Cont,1950,Hollywood,...,STREET,647,0,00_2020_01_06_200604400,,,,,,
1,510,510.0,VEHICLE - STOLEN,01/13/2020 12:00:00 AM,01/14/2020 12:00:00 AM,1,IC,Invest Cont,1830,Hollywood,...,STREET,656,0,00_2020_01_06_200604600,,,,,,
2,330,330.0,BURGLARY FROM VEHICLE,01/15/2020 12:00:00 AM,01/16/2020 12:00:00 AM,1,IC,Invest Cont,1,Hollywood,...,STREET,645,35,00_2020_01_06_200604700,0344 0358 1300 1609,O,M,,,
3,745,745.0,VANDALISM - MISDEAMEANOR ($399 OR UNDER),01/18/2020 12:00:00 AM,01/18/2020 12:00:00 AM,2,IC,Invest Cont,150,Hollywood,...,OTHER BUSINESS,648,0,00_2020_01_06_200604800,1402 0329 0601,X,X,998.0,,
4,626,626.0,INTIMATE PARTNER - SIMPLE ASSAULT,01/22/2020 12:00:00 AM,01/23/2020 12:00:00 AM,2,IC,Invest Cont,1600,Hollywood,...,"MULTI-UNIT DWELLING (APARTMENT, DUPLEX, ETC)",676,33,00_2020_01_06_200605100,2000 0400 0444 0416 1414,H,F,,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",400.0


### C. Victim age and sex for crimes of INTIMATE PARTNER - SIMPLE ASSAULT (exact match)

In [23]:
def get_intimate_partner_assault_victims(conn, table_name_str, crime_desc="INTIMATE PARTNER - SIMPLE ASSAULT", limit=100):
    if not conn:
        print("No HBase connection.")
        return []

    results = []
    try:
        table = conn.table(table_name_str)
        
        # Columns to retrieve
        columns_to_fetch = [f'{CF_VICTIM}:vict_age', f'{CF_VICTIM}:vict_sex', f'{CF_CRIME}:crm_cd_desc']
        
        # Filter: cf_crime:crm_cd_desc IS 'INTIMATE PARTNER - SIMPLE ASSAULT'
        filter_string = f"SingleColumnValueFilter ('{CF_CRIME}', 'crm_cd_desc', =, 'binary:{crime_desc}')"
        print(f"Scanning for victim info for '{crime_desc}' with filter: {filter_string}")

        for key, data in table.scan(columns=[col.encode('utf-8') for col in columns_to_fetch], 
                                    filter=filter_string, 
                                    limit=limit):
            decoded_data = decode_hbase_row(data)
            decoded_data['row_key'] = key.decode('utf-8')
            results.append(decoded_data)
        print(f"Found {len(results)} victim details for '{crime_desc}' (up to limit {limit}).")

    except Exception as e:
        print(f"Error retrieving victim details for '{crime_desc}': {e}")
    return results

if connection:
    # VERIFY your exact crime description string
    crime_description_exact = "INTIMATE PARTNER - SIMPLE ASSAULT" 
    
    ipa_victim_results = get_intimate_partner_assault_victims(connection, FULL_TABLE_NAME, 
                                                              crime_desc=crime_description_exact, 
                                                              limit=10)
    print("\n--- Intimate Partner Assault Victim Info (Sample) ---")
    for res in ipa_victim_results[:5]:
        print(res)
    df_ipa_victims = pd.DataFrame(ipa_victim_results)
    print("\nDataFrame from Intimate Partner Assault HBase results:")
    display(df_ipa_victims.head())

Scanning for victim info for 'INTIMATE PARTNER - SIMPLE ASSAULT' with filter: SingleColumnValueFilter ('cf_crime', 'crm_cd_desc', =, 'binary:INTIMATE PARTNER - SIMPLE ASSAULT')
Found 10 victim details for 'INTIMATE PARTNER - SIMPLE ASSAULT' (up to limit 10).

--- Intimate Partner Assault Victim Info (Sample) ---
{'cf_crime:crm_cd_desc': 'INTIMATE PARTNER - SIMPLE ASSAULT', 'cf_victim:vict_age': '25', 'cf_victim:vict_sex': 'M', 'row_key': '00_2020_01_01_200105000'}
{'cf_crime:crm_cd_desc': 'INTIMATE PARTNER - SIMPLE ASSAULT', 'cf_victim:vict_age': '20', 'cf_victim:vict_sex': 'F', 'row_key': '00_2020_01_02_200204800'}
{'cf_crime:crm_cd_desc': 'INTIMATE PARTNER - SIMPLE ASSAULT', 'cf_victim:vict_age': '24', 'cf_victim:vict_sex': 'M', 'row_key': '00_2020_01_05_200504100'}
{'cf_crime:crm_cd_desc': 'INTIMATE PARTNER - SIMPLE ASSAULT', 'cf_victim:vict_age': '33', 'cf_victim:vict_sex': 'F', 'row_key': '00_2020_01_06_200605100'}
{'cf_crime:crm_cd_desc': 'INTIMATE PARTNER - SIMPLE ASSAULT', 'cf_

Unnamed: 0,cf_crime:crm_cd_desc,cf_victim:vict_age,cf_victim:vict_sex,row_key
0,INTIMATE PARTNER - SIMPLE ASSAULT,25,M,00_2020_01_01_200105000
1,INTIMATE PARTNER - SIMPLE ASSAULT,20,F,00_2020_01_02_200204800
2,INTIMATE PARTNER - SIMPLE ASSAULT,24,M,00_2020_01_05_200504100
3,INTIMATE PARTNER - SIMPLE ASSAULT,33,F,00_2020_01_06_200605100
4,INTIMATE PARTNER - SIMPLE ASSAULT,23,F,00_2020_01_09_200904500
