# Parquet File Viewer

This notebook reads a parquet file from Amazon S3 or local disk, displays its contents, and lets you query it with SQL through DuckDB.
S3 access supports two authentication modes: a named AWS profile or an IAM role.


## Setup and Imports


In [15]:
import boto3
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
from botocore.exceptions import ClientError, NoCredentialsError
from urllib.parse import urlparse
import logging
import io
import sys
import os
from pathlib import Path

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

print("Libraries imported successfully!")


Libraries imported successfully!


## Configuration

Choose your data source and set your preferences here:


In [16]:
# Data source configuration
DATA_SOURCE = "local"  # "s3" or "local"

# For S3 data source
S3_URL = "s3://ais-research-data/parquet/year=2016/month=10/day=26/26_exactEarth_historical_data_2016-10-26.parquet"
AWS_MODE = "profile"  # 'profile' or 'role'
AWS_PROFILE = "default"  # AWS profile name (for profile mode)
AWS_REGION = "il-central-1"  # AWS region

# For local data source
LOCAL_FILE_PATH = "/Users/shlomy/Downloads/26_exactEarth_historical_data_2016-10-26.parquet"  # Update this path

# Display configuration
MAX_ROWS = None  # Number of rows to display (set to None for all rows)

print(f"Configuration:")
print(f"  Data source: {DATA_SOURCE}")
if DATA_SOURCE == "s3":
    print(f"  S3 URL: {S3_URL}")
    print(f"  AWS mode: {AWS_MODE}")
    print(f"  AWS profile: {AWS_PROFILE}")
    print(f"  AWS region: {AWS_REGION}")
else:
    print(f"  Local file: {LOCAL_FILE_PATH}")
print(f"  Max rows: {MAX_ROWS}")

Configuration:
  Data source: local
  Local file: /Users/shlomy/Downloads/26_exactEarth_historical_data_2016-10-26.parquet
  Max rows: None


## Helper Functions


In [17]:
def get_s3_client(aws_mode='profile', aws_profile='default', region='il-central-1'):
    """
    Create and return an S3 client with appropriate authentication.
    
    Args:
        aws_mode: Authentication mode - 'profile' or 'role'
        aws_profile: AWS profile name (for profile mode)
        region: AWS region
    
    Returns:
        boto3 S3 client
    """
    try:
        if aws_mode == 'profile':
            session = boto3.Session(profile_name=aws_profile, region_name=region)
            s3_client = session.client('s3')
        elif aws_mode == 'role':
            s3_client = boto3.client('s3', region_name=region)
        else:
            raise ValueError(f"Invalid aws_mode: {aws_mode}. Must be 'profile' or 'role'")
        
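        # Test the connection. Note: list_buckets requires the s3:ListAllMyBuckets
        # permission, so it can fail for credentials that can still read the object.
        s3_client.list_buckets()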
        print(f"✅ Successfully connected to S3 using {aws_mode} mode")
        return s3_client
        
    except NoCredentialsError:
        print("❌ AWS credentials not found. Please configure your credentials.")
        return None
    except ClientError as e:
        print(f"❌ Failed to connect to S3: {e}")
        return None
    except Exception as e:
        print(f"❌ Unexpected error creating S3 client: {e}")
        return None

def parse_s3_url(s3_url):
    """
    Parse S3 URL to extract bucket and key.
    
    Args:
        s3_url: S3 URL in format s3://bucket/key
        
    Returns:
        tuple: (bucket, key)
    """
    if not s3_url.startswith('s3://'):
        raise ValueError("URL must start with 's3://'")
    
    parsed = urlparse(s3_url)
    bucket = parsed.netloc
    key = parsed.path.lstrip('/')
    
    if not bucket or not key:
        raise ValueError("Invalid S3 URL format. Expected: s3://bucket/key")
    
    return bucket, key

def read_parquet_from_local(file_path, max_rows=None):
    """
    Read parquet file from local disk and return as pandas DataFrame.
    
    Args:
        file_path: Path to local parquet file
        max_rows: Maximum number of rows to read (None for all)
        
    Returns:
        pandas.DataFrame
    """
    try:
        print(f"📖 Reading parquet file from local disk: {file_path}")
        
        # Check if file exists
        if not os.path.exists(file_path):
            print(f"❌ File not found: {file_path}")
            return None
        
        # Get file size
        file_size = os.path.getsize(file_path)
        print(f"📊 File size: {file_size / 1024 / 1024:.2f} MB")
        
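        # Read parquet file using pyarrow
        parquet_file = pq.ParquetFile(file_path)
        
        # Read the data
        if max_rows:
            # Stream record batches and stop once max_rows rows are collected,
            # instead of reading the whole file and discarding the extra rows
            batches, collected = [], 0
            for batch in parquet_file.iter_batches(batch_size=max_rows):
                batches.append(batch)
                collected += batch.num_rows
                if collected >= max_rows:
                    break
            table = (pa.Table.from_batches(batches).slice(0, max_rows)
                     if batches else parquet_file.schema_arrow.empty_table())
        else:
            table = parquet_file.read()
        df = table.to_pandas()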
        
        print(f"✅ Successfully read parquet file with {len(df)} rows and {len(df.columns)} columns")
        return df
        
    except Exception as e:
        print(f"❌ Error reading parquet file: {e}")
        return None

print("Helper functions defined successfully!")


Helper functions defined successfully!
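`parse_s3_url` relies on `urllib.parse.urlparse`, which puts the bucket in `netloc` and the object key in `path`. The stand-alone copy below shows the split on the URL from the configuration cell. One caveat worth knowing: `urlparse` treats `#` and `?` as fragment and query delimiters, so an S3 key containing either character is silently truncated.

```python
from urllib.parse import urlparse


def parse_s3_url(s3_url):
    """Split an s3://bucket/key URL into (bucket, key), mirroring the helper above."""
    if not s3_url.startswith("s3://"):
        raise ValueError("URL must start with 's3://'")
    parsed = urlparse(s3_url)
    bucket = parsed.netloc           # text between "s3://" and the next "/"
    key = parsed.path.lstrip("/")    # the rest, without the leading "/"
    if not bucket or not key:
        raise ValueError("Invalid S3 URL format. Expected: s3://bucket/key")
    return bucket, key


bucket, key = parse_s3_url(
    "s3://ais-research-data/parquet/year=2016/month=10/day=26/"
    "26_exactEarth_historical_data_2016-10-26.parquet"
)
print(bucket)  # ais-research-data
print(key)     # parquet/year=2016/month=10/day=26/26_exactEarth_historical_data_2016-10-26.parquet

# Pitfall: everything after '#' is parsed as a URL fragment and dropped from the key
print(parse_s3_url("s3://bucket/data#1.parquet"))  # ('bucket', 'data')
```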


## Read Parquet File

Read the parquet file from the source selected in the configuration cell (`DATA_SOURCE`):


In [18]:
# Read parquet file based on data source
if DATA_SOURCE == "s3":
    # Parse S3 URL
    try:
        bucket, key = parse_s3_url(S3_URL)
        print(f"📁 Parsed S3 URL: bucket={bucket}, key={key}")
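    except Exception as e:
        # Raise a regular exception: in a notebook, sys.exit() only raises SystemExit,
        # which Jupyter reports as a warning instead of a normal traceback
        raise RuntimeError(f"❌ Error parsing S3 URL: {e}") from e
    
    # Create S3 client
    s3_client = get_s3_client(AWS_MODE, AWS_PROFILE, AWS_REGION)
    if s3_client is None:
        raise RuntimeError("❌ Failed to create S3 client")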
    
    # Read parquet from S3
    def read_parquet_from_s3(s3_client, bucket, key, max_rows=None):
        try:
            print(f"📖 Reading parquet file from s3://{bucket}/{key}")
            
            # Read the parquet file from S3
            obj = s3_client.get_object(Bucket=bucket, Key=key)
            
            # Read the entire file content into memory
            file_content = obj['Body'].read()
            print(f"📊 File size: {len(file_content) / 1024 / 1024:.2f} MB")
            
            # Create a BytesIO object for pyarrow
            file_buffer = io.BytesIO(file_content)
            
            # Read parquet data using pyarrow
            parquet_file = pq.ParquetFile(file_buffer)
            
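            # Read the data
            if max_rows:
                # Stream record batches and stop once max_rows rows are collected
                batches, collected = [], 0
                for batch in parquet_file.iter_batches(batch_size=max_rows):
                    batches.append(batch)
                    collected += batch.num_rows
                    if collected >= max_rows:
                        break
                table = (pa.Table.from_batches(batches).slice(0, max_rows)
                         if batches else parquet_file.schema_arrow.empty_table())
            else:
                table = parquet_file.read()
            df = table.to_pandas()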
            
            print(f"✅ Successfully read parquet file with {len(df)} rows and {len(df.columns)} columns")
            return df
            
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'NoSuchKey':
                print(f"❌ Parquet file not found: s3://{bucket}/{key}")
            elif error_code == 'NoSuchBucket':
                print(f"❌ S3 bucket not found: {bucket}")
            elif error_code == 'AccessDenied':
                print(f"❌ Access denied to s3://{bucket}/{key}. Check your permissions.")
            else:
                print(f"❌ S3 error: {e}")
            return None
        except Exception as e:
            print(f"❌ Error reading parquet file: {e}")
            return None
    
    df = read_parquet_from_s3(s3_client, bucket, key, MAX_ROWS)
    
else:
    # Read from local disk
    df = read_parquet_from_local(LOCAL_FILE_PATH, MAX_ROWS)

if df is None:
    print("❌ Failed to read parquet file")
else:
    print(f"✅ Parquet file loaded successfully!")


📖 Reading parquet file from local disk: /Users/shlomy/Downloads/26_exactEarth_historical_data_2016-10-26.parquet
📊 File size: 306.13 MB
✅ Successfully read parquet file with 10296941 rows and 127 columns
✅ Parquet file loaded successfully!


## Display DataFrame Information


In [19]:
if df is not None:
    print(f"\n{'='*80}")
    print(f"DATAFRAME INFORMATION")
    print(f"{'='*80}")
    print(f"Shape: {df.shape[0]:,} rows × {df.shape[1]} columns")
    print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    
    print(f"\n{'='*80}")
    print(f"COLUMN INFORMATION")
    print(f"{'='*80}")
    print(f"{'Column':<30} {'Type':<15} {'Non-Null Count':<15} {'Null Count':<15}")
    print(f"{'-'*75}")
    for col in df.columns:
        non_null = df[col].count()
        null_count = len(df) - non_null
        print(f"{col:<30} {str(df[col].dtype):<15} {non_null:<15,} {null_count:<15,}")
else:
    print("❌ No data to display")



DATAFRAME INFORMATION
Shape: 10,296,941 rows × 127 columns
Memory usage: 17927.17 MB

COLUMN INFORMATION
Column                         Type            Non-Null Count  Null Count     
---------------------------------------------------------------------------
base_date_time                 object          10,296,941      0              
year                           float64         139,011         10,157,930     
month                          float64         1,376,923       8,920,018      
day                            float64         1,376,923       8,920,018      
hour                           float64         1,376,923       8,920,018      
minute                         float64         1,376,923       8,920,018      
second                         float64         8,212,917       2,084,024      
lon                            float64         8,370,423       1,926,518      
lat                            float64         8,370,423       1,926,518      
mmsi                        
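The DataFrame reports about 17.9 GB of memory for a 306 MB file, partly because all 127 columns are materialized for every row and partly because text columns such as `base_date_time` are held as Python string objects. A minimal sketch of two common reductions on hypothetical toy data: parsing the timestamp text into `datetime64` and converting a repetitive text column to `category`. Whether `category` helps on the real file depends on how many distinct values each column has.

```python
import pandas as pd

n = 100_000
demo = pd.DataFrame({
    # Same timestamp text format as base_date_time in this file
    "base_date_time": ["2016/10/26 00:00:00", "2016/10/26 00:00:01"] * (n // 2),
    # A repetitive text column (hypothetical values)
    "destination": ["DALIAN CN", "SYDNEY"] * (n // 2),
})
before = demo.memory_usage(deep=True).sum()

# Parse timestamp strings into datetime64 (8 bytes per value)
demo["base_date_time"] = pd.to_datetime(demo["base_date_time"], format="%Y/%m/%d %H:%M:%S")
# Store each distinct string once plus small integer codes
demo["destination"] = demo["destination"].astype("category")
after = demo.memory_usage(deep=True).sum()

print(f"{before / 1024**2:.2f} MB -> {after / 1024**2:.2f} MB")
```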

## Display First N Rows


In [20]:
from IPython.display import display

if df is not None:
    # MAX_ROWS = None means "load every row", so cap the preview at 20 rows
    # (an arbitrary default) instead of calling min() on None
    n_display = min(MAX_ROWS, len(df)) if MAX_ROWS is not None else min(20, len(df))
    print(f"\n{'='*80}")
    print(f"FIRST {n_display:,} ROWS")
    print(f"{'='*80}")
    
    # Display first N rows with proper formatting
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', None)
    pd.set_option('display.max_colwidth', 50)
    
    display(df.head(n_display))
    
    if len(df) > n_display:
        print(f"\n... and {len(df) - n_display:,} more rows")
else:
    print("❌ No data to display")


## Data Exploration (Optional)

Use the cells below to explore your data further:


In [22]:
# Show data types
if df is not None:
    print("Data Types:")
    print(df.dtypes)


Data Types:
base_date_time     object
year              float64
month             float64
day               float64
hour              float64
                   ...   
course_q           object
heading_q          object
hazard            float64
shiptype          float64
loaded            float64
Length: 127, dtype: object


In [None]:
# Show basic statistics for numeric columns
if df is not None:
    print("Basic Statistics:")
    display(df.describe())


In [None]:
# Check for missing values
if df is not None:
    print("Missing Values:")
    missing_data = df.isnull().sum()
    missing_percent = (missing_data / len(df)) * 100
    missing_df = pd.DataFrame({
        'Missing Count': missing_data,
        'Missing Percentage': missing_percent
    })
    missing_df = missing_df[missing_df['Missing Count'] > 0].sort_values('Missing Count', ascending=False)
    
    if len(missing_df) > 0:
        display(missing_df)
    else:
        print("No missing values found!")


## SQL Queries with DuckDB

DuckDB can run SQL queries against the loaded DataFrame once it is registered under a table name (`parquet_data` below).


In [None]:
import duckdb

# Create a DuckDB connection
conn = duckdb.connect()

# Register the DataFrame as a view for SQL queries
if df is not None:
    conn.register('parquet_data', df)
    print("✅ DataFrame registered as 'parquet_data' table for SQL queries")
    
    # Show available columns for SQL queries
    print(f"\nAvailable columns: {list(df.columns)}")
    print(f"Total rows: {len(df):,}")
else:
    print("❌ No data available for SQL queries")


In [42]:
# Example SQL queries - modify these as needed

if df is not None:
    print("🔍 Example SQL Queries:")
    print("=" * 50)
    
    # 1. Basic SELECT query
    print("\n1. Basic SELECT (first 5 rows):")
    result1 = conn.execute("SELECT * FROM parquet_data LIMIT 5").fetchdf()
    display(result1)
    
    # 2. Count total rows
    print("\n2. Total row count:")
    result2 = conn.execute("SELECT COUNT(*) as total_rows FROM parquet_data").fetchdf()
    display(result2)
    
    # 3. Show column info
    print("\n3. Column information:")
    result3 = conn.execute("DESCRIBE parquet_data").fetchdf()
    display(result3)
    
    # 4. Show unique values in the first column (if any)
    if len(df.columns) > 0:
        first_col = df.columns[0]
        # Double-quote the identifier so names with spaces or SQL keywords still work
        quoted_col = '"' + str(first_col).replace('"', '""') + '"'
        print(f"\n4. Unique values in '{first_col}' column:")
        result4 = conn.execute(f"SELECT DISTINCT {quoted_col} FROM parquet_data LIMIT 10").fetchdf()
        display(result4)
    
    print("\n✅ SQL queries completed successfully!")
else:
    print("❌ No data available for SQL queries")


🔍 Example SQL Queries:

1. Basic SELECT (first 5 rows):


,base_date_time,year,month,day,hour,minute,second,lon,lat,mmsi,imo,callsign,accuracy,epfd,msg_type,repeat,status,turn,speed,course,heading,ais_version,maneuver,raim,radio,shipname,ship_type,to_bow,to_stern,to_port,to_starboard,draught,destination,dte,seqno,dest_mmsi,retransmit,dac,fid,data,mmsi1,mmsiseq1,mmsi2,mmsiseq2,mmsi3,mmsiseq3,mmsi4,mmsiseq4,alt,assigned,text,cs,display,dsc,band,type1_1,offset1_1,type1_2,offset1_2,type2_1,offset2_1,offset1,increment1,offset2,increment2,msg22,number1,timeout1,number2,timeout2,offset3,number3,timeout3,increment3,offset4,number4,timeout4,increment4,d_type,name,off_position,virtual_aid,name_ext,channel_a,channel_b,txrx,power,dest1,dest2,addressed,band_a,band_b,zonesize,ne_lon,ne_lat,sw_lon,sw_lat,station_type,interval,quiet,partno,vendorid,model,serial,structured,gnss,aid_type,app_id,spare_1,spare_2,spare_3,spare_4,reserved_1,reserved_2,empty_1,empty_2,full_name,vin,length,beam,spare,speed_q,course_q,heading_q,hazard,shiptype,loaded
0,2016/10/26 00:00:00,,,,,,32.0,174.783413,-41.280177,512002959,,,False,,1,0,0.0,0.0,0.0,36.4,170.0,,0.0,False,99487.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
1,2016/10/26 00:00:00,,,,,,28.0,173.69835,-38.729367,512409000,,,False,,1,0,7.0,-127.0,5.3,178.1,180.0,,0.0,False,2255.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
2,2016/10/26 00:00:00,,,,,,59.0,151.261183,-33.869533,503376700,,,False,,3,0,0.0,-127.0,3.9,179.3,163.0,,0.0,False,131072.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
3,2016/10/26 00:00:00,,,,,,0.0,-82.900983,-7.945333,372296000,,,False,,1,0,0.0,0.0,11.7,285.0,288.0,,0.0,False,32843.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
4,2016/10/26 00:00:00,,12.0,18.0,4.0,0.0,,,,538006072,958431000.0,V7JM7,,1.0,5,0,,,,,,0.0,,,,CARAVOS TRIUMPH,70.0,197.0,32.0,8.0,24.0,13.8,DALIAN CN,False,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,



2. Total row count:


,total_rows
0,10296941



3. Column information:


,column_name,column_type,null,key,default,extra
0,base_date_time,VARCHAR,YES,,,
1,year,DOUBLE,YES,,,
2,month,DOUBLE,YES,,,
3,day,DOUBLE,YES,,,
4,hour,DOUBLE,YES,,,
...,...,...,...,...,...,...
122,course_q,BOOLEAN,YES,,,
123,heading_q,BOOLEAN,YES,,,
124,hazard,DOUBLE,YES,,,
125,shiptype,DOUBLE,YES,,,



4. Unique values in 'base_date_time' column:


,base_date_time
0,2016/10/26 00:00:00



✅ SQL queries completed successfully!
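Several cells build SQL with f-strings such as `f"SELECT DISTINCT {first_col} ..."`. That breaks for column names containing spaces or quotes, and this file has columns such as `interval`, `text`, and `name` that are SQL keywords and may need quoting depending on the DuckDB version. A hypothetical helper that quotes an identifier the standard SQL way:

```python
def quote_ident(name) -> str:
    """Quote a column name for use in DuckDB SQL (hypothetical helper)."""
    # Standard SQL: wrap in double quotes and double any embedded double quote
    return '"' + str(name).replace('"', '""') + '"'


first_col = "base_date_time"
query = f"SELECT DISTINCT {quote_ident(first_col)} FROM parquet_data LIMIT 10"
print(query)                      # SELECT DISTINCT "base_date_time" FROM parquet_data LIMIT 10
print(quote_ident('odd "name"'))  # "odd ""name"""
```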


## Custom SQL Queries

Write your own SQL queries here. The data is available as the `parquet_data` table.


In [None]:
# Write your custom SQL queries here
# The data is available as 'parquet_data' table

if df is not None:
    # Example: Replace this with your own query
    custom_query = """
    SELECT 
        COUNT(*) as total_records,
        COUNT(DISTINCT mmsi) as unique_vessels,
        COUNT(DISTINCT msg_type) as unique_message_types
    FROM parquet_data
    """
    
    print("Custom SQL Query:")
    print(custom_query)
    print("\nResult:")
    
    try:
        result = conn.execute(custom_query).fetchdf()
        display(result)
    except Exception as e:
        print(f"❌ SQL Error: {e}")
        print("💡 Tip: Check your column names and SQL syntax")
        
    print("\n" + "="*50)
    print("💡 SQL Tips:")
    print("- Use 'parquet_data' as the table name")
    print("- Available columns:", list(df.columns))
    print("- Use LIMIT to control result size")
    print("- DuckDB supports most standard SQL functions")
    print("- For unique records, use COUNT(DISTINCT column_name) instead of COUNT(DISTINCT *)")
    
    # Additional useful queries for AIS data
    print("\n🔍 Useful AIS Data Queries:")
    print("=" * 40)
    
    # Query 1: Message type distribution
    try:
        query1 = """
        SELECT 
            msg_type,
            COUNT(*) as count,
            ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM parquet_data), 2) as percentage
        FROM parquet_data 
        GROUP BY msg_type 
        ORDER BY count DESC
        """
        result1 = conn.execute(query1).fetchdf()
        print("\n1. Message Type Distribution:")
        display(result1)
    except Exception as e:
        print(f"   Error in message type query: {e}")
    
    # Query 2: Vessel count by MMSI
    try:
        query2 = """
        SELECT 
            COUNT(DISTINCT mmsi) as unique_vessels,
            COUNT(*) as total_messages
        FROM parquet_data
        WHERE mmsi IS NOT NULL
        """
        result2 = conn.execute(query2).fetchdf()
        print("\n2. Vessel Statistics:")
        display(result2)
    except Exception as e:
        print(f"   Error in vessel query: {e}")
    
    # Query 3: Geographic coverage
    try:
        query3 = """
        SELECT 
            COUNT(DISTINCT ROUND(lat, 1)) as unique_lat_areas,
            COUNT(DISTINCT ROUND(lon, 1)) as unique_lon_areas,
            MIN(lat) as min_latitude,
            MAX(lat) as max_latitude,
            MIN(lon) as min_longitude,
            MAX(lon) as max_longitude
        FROM parquet_data
        WHERE lat IS NOT NULL AND lon IS NOT NULL
        """
        result3 = conn.execute(query3).fetchdf()
        print("\n3. Geographic Coverage:")
        display(result3)
    except Exception as e:
        print(f"   Error in geographic query: {e}")
    
else:
    print("❌ No data available for SQL queries")


## Advanced SQL Examples

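The examples below choose columns by dtype, so they run on any parquet file: a GROUP BY on the first text (`object`) column, a filter template, and summary statistics for the first numeric column: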


In [22]:
# Advanced SQL Examples
# Modify these queries as needed

if df is not None:
    print("🔍 Advanced SQL Examples:")
    print("=" * 50)
    
    # Example 1: Group by and aggregation
    print("\n1. Group by example (if you have categorical columns):")
    try:
        # This will work if you have columns with categorical data
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        if len(categorical_cols) > 0:
            col = categorical_cols[0]
            query1 = f"""
            SELECT 
                {col},
                COUNT(*) as count,
                COUNT(*) * 100.0 / (SELECT COUNT(*) FROM parquet_data) as percentage
            FROM parquet_data 
            GROUP BY {col}
            ORDER BY count DESC
            LIMIT 10
            """
            result1 = conn.execute(query1).fetchdf()
            display(result1)
        else:
            print("   No categorical columns found for grouping")
    except Exception as e:
        print(f"   Error: {e}")
    
    # Example 2: Filtering
    print("\n2. Filtering example:")
    try:
        # Get first few rows with a simple filter
        query2 = """
        SELECT * 
        FROM parquet_data 
        WHERE 1=1  -- This will show all rows, modify as needed
        LIMIT 5
        """
        result2 = conn.execute(query2).fetchdf()
        display(result2)
    except Exception as e:
        print(f"   Error: {e}")
    
    # Example 3: Statistical analysis
    print("\n3. Statistical analysis:")
    try:
        numeric_cols = df.select_dtypes(include=['number']).columns
        if len(numeric_cols) > 0:
            col = numeric_cols[0]
            query3 = f"""
            SELECT 
                MIN({col}) as min_value,
                MAX({col}) as max_value,
                AVG({col}) as avg_value,
                COUNT({col}) as non_null_count
            FROM parquet_data
            """
            result3 = conn.execute(query3).fetchdf()
            display(result3)
        else:
            print("   No numeric columns found for statistical analysis")
    except Exception as e:
        print(f"   Error: {e}")
    
    print("\n✅ Advanced SQL examples completed!")
    
else:
    print("❌ No data available for SQL queries")


🔍 Advanced SQL Examples:

1. Group by example (if you have categorical columns):


,base_date_time,count,percentage
0,2016/10/26 00:00:00,10,100.0



2. Filtering example:


,base_date_time,year,month,day,hour,minute,second,lon,lat,mmsi,imo,callsign,accuracy,epfd,msg_type,repeat,status,turn,speed,course,heading,ais_version,maneuver,raim,radio,shipname,ship_type,to_bow,to_stern,to_port,to_starboard,draught,destination,dte,seqno,dest_mmsi,retransmit,dac,fid,data,mmsi1,mmsiseq1,mmsi2,mmsiseq2,mmsi3,mmsiseq3,mmsi4,mmsiseq4,alt,assigned,text,cs,display,dsc,band,type1_1,offset1_1,type1_2,offset1_2,type2_1,offset2_1,offset1,increment1,offset2,increment2,msg22,number1,timeout1,number2,timeout2,offset3,number3,timeout3,increment3,offset4,number4,timeout4,increment4,d_type,name,off_position,virtual_aid,name_ext,channel_a,channel_b,txrx,power,dest1,dest2,addressed,band_a,band_b,zonesize,ne_lon,ne_lat,sw_lon,sw_lat,station_type,interval,quiet,partno,vendorid,model,serial,structured,gnss,aid_type,app_id,spare_1,spare_2,spare_3,spare_4,reserved_1,reserved_2,empty_1,empty_2,full_name,vin,length,beam,spare,speed_q,course_q,heading_q,hazard,shiptype,loaded
0,2016/10/26 00:00:00,,,,,,32.0,174.783413,-41.280177,512002959,,,False,,1,0,0.0,0.0,0.0,36.4,170.0,,0.0,False,99487.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
1,2016/10/26 00:00:00,,,,,,28.0,173.69835,-38.729367,512409000,,,False,,1,0,7.0,-127.0,5.3,178.1,180.0,,0.0,False,2255.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
2,2016/10/26 00:00:00,,,,,,59.0,151.261183,-33.869533,503376700,,,False,,3,0,0.0,-127.0,3.9,179.3,163.0,,0.0,False,131072.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
3,2016/10/26 00:00:00,,,,,,0.0,-82.900983,-7.945333,372296000,,,False,,1,0,0.0,0.0,11.7,285.0,288.0,,0.0,False,32843.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
4,2016/10/26 00:00:00,,12.0,18.0,4.0,0.0,,,,538006072,958431000.0,V7JM7,,1.0,5,0,,,,,,0.0,,,,CARAVOS TRIUMPH,70.0,197.0,32.0,8.0,24.0,13.8,DALIAN CN,False,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,



3. Statistical analysis:


,min_value,max_value,avg_value,non_null_count
0,,,,0



✅ Advanced SQL examples completed!


## SQL with Direct Parquet File Access

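DuckDB can also query a parquet file on disk directly, without loading it into pandas first. It reads only the columns a query references, which typically needs much less memory than the full DataFrame: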


In [30]:
# Direct parquet file querying (more memory efficient for large files)

if DATA_SOURCE == "local" and os.path.exists(LOCAL_FILE_PATH):
    print("🔍 Direct Parquet File Queries:")
    print("=" * 50)
    
    # Query parquet file directly without loading into memory
    try:
        # Basic query on parquet file
        query = f"SELECT * FROM '{LOCAL_FILE_PATH}' LIMIT 5"
        result = conn.execute(query).fetchdf()
        print("Direct parquet query result:")
        display(result)
        
        # Get schema information
        schema_query = f"DESCRIBE SELECT * FROM '{LOCAL_FILE_PATH}' LIMIT 0"
        schema_result = conn.execute(schema_query).fetchdf()
        print("\nParquet file schema:")
        display(schema_result)
        
    except Exception as e:
        print(f"❌ Error querying parquet file directly: {e}")

elif DATA_SOURCE == "s3":
    print("💡 For S3 files, the data is already loaded into memory.")
    print("   For very large S3 files, consider downloading the file first, reading s3:// paths with DuckDB's httpfs extension, or using AWS Athena.")
    
else:
    print("❌ No local file available for direct querying")


🔍 Direct Parquet File Queries:
Direct parquet query result:


,base_date_time,year,month,day,hour,minute,second,lon,lat,mmsi,imo,callsign,accuracy,epfd,msg_type,repeat,status,turn,speed,course,heading,ais_version,maneuver,raim,radio,shipname,ship_type,to_bow,to_stern,to_port,to_starboard,draught,destination,dte,seqno,dest_mmsi,retransmit,dac,fid,data,mmsi1,mmsiseq1,mmsi2,mmsiseq2,mmsi3,mmsiseq3,mmsi4,mmsiseq4,alt,assigned,text,cs,display,dsc,band,type1_1,offset1_1,type1_2,offset1_2,type2_1,offset2_1,offset1,increment1,offset2,increment2,msg22,number1,timeout1,number2,timeout2,offset3,number3,timeout3,increment3,offset4,number4,timeout4,increment4,d_type,name,off_position,virtual_aid,name_ext,channel_a,channel_b,txrx,power,dest1,dest2,addressed,band_a,band_b,zonesize,ne_lon,ne_lat,sw_lon,sw_lat,station_type,interval,quiet,partno,vendorid,model,serial,structured,gnss,aid_type,app_id,spare_1,spare_2,spare_3,spare_4,reserved_1,reserved_2,empty_1,empty_2,full_name,vin,length,beam,spare,speed_q,course_q,heading_q,hazard,shiptype,loaded
0,2016/10/26 00:00:00,,,,,,32.0,174.783413,-41.280177,512002959,,,False,,1,0,0.0,0.0,0.0,36.4,170.0,,0.0,False,99487.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
1,2016/10/26 00:00:00,,,,,,28.0,173.69835,-38.729367,512409000,,,False,,1,0,7.0,-127.0,5.3,178.1,180.0,,0.0,False,2255.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
2,2016/10/26 00:00:00,,,,,,59.0,151.261183,-33.869533,503376700,,,False,,3,0,0.0,-127.0,3.9,179.3,163.0,,0.0,False,131072.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
3,2016/10/26 00:00:00,,,,,,0.0,-82.900983,-7.945333,372296000,,,False,,1,0,0.0,0.0,11.7,285.0,288.0,,0.0,False,32843.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,
4,2016/10/26 00:00:00,,12.0,18.0,4.0,0.0,,,,538006072,958431000.0,V7JM7,,1.0,5,0,,,,,,0.0,,,,CARAVOS TRIUMPH,70.0,197.0,32.0,8.0,24.0,13.8,DALIAN CN,False,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,[0],,,,,,,,,,,,,,,,,,



Parquet file schema:


,column_name,column_type,null,key,default,extra
0,base_date_time,VARCHAR,YES,,,
1,year,DOUBLE,YES,,,
2,month,DOUBLE,YES,,,
3,day,DOUBLE,YES,,,
4,hour,DOUBLE,YES,,,
...,...,...,...,...,...,...
122,course_q,BOOLEAN,YES,,,
123,heading_q,BOOLEAN,YES,,,
124,hazard,DOUBLE,YES,,,
125,shiptype,DOUBLE,YES,,,


## Querying Multiple Parquet Files

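There are two ways to query two or more parquet files: query the files directly with DuckDB (Method 1), or load them into pandas and combine the DataFrames (Method 2).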


In [33]:
# Configuration for multiple files (used by Method 1 and Method 2 below)
# Update these paths to point to your parquet files

# For local files
LOCAL_FILES = [
    "/path/to/file1.parquet",
    "/path/to/file2.parquet",
    "/path/to/file3.parquet"
]

# For S3 files
S3_FILES = [
    "s3://ais-research-data/parquet/year=2016/month=10/day=26/file1.parquet",
    "s3://ais-research-data/parquet/year=2016/month=10/day=27/file2.parquet"
]

# Example file paths (update these to your actual files)
EXAMPLE_LOCAL_FILES = [
    "output/thread_1/parquet/file1.parquet",
    "output/thread_2/parquet/file2.parquet"
]

print("📁 Multiple File Configuration:")
print(f"Local files: {len(LOCAL_FILES)} files")
print(f"S3 files: {len(S3_FILES)} files")
print(f"Example local files: {EXAMPLE_LOCAL_FILES}")


📁 Multiple File Configuration:
Local files: 3 files
S3 files: 2 files
Example local files: ['output/thread_1/parquet/file1.parquet', 'output/thread_2/parquet/file2.parquet']


### Method 1: Direct File Querying (Recommended for Large Files)


In [34]:
# Method 1: Query multiple parquet files directly (memory efficient)

def query_multiple_parquet_files(file_paths, query, conn):
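    """
    Query multiple parquet files with DuckDB by combining them with UNION ALL.
    
    Args:
        file_paths: List of parquet file paths (missing files are skipped)
        query: SQL query that reads FROM parquet_data; the exact text
            'FROM parquet_data' is rewritten to read the combined data
        conn: DuckDB connection
    
    Returns:
        pandas.DataFrame with the query result, or None on error
    
    Note: UNION ALL matches columns by position, so all files must share
    the same column order and compatible types.
    """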
    try:
        # Create UNION ALL query for multiple files
        union_queries = []
        for file_path in file_paths:
            if os.path.exists(file_path):
                # Escape single quotes in file path
                escaped_path = file_path.replace("'", "''")
                union_queries.append(f"SELECT * FROM '{escaped_path}'")
            else:
                print(f"⚠️ File not found: {file_path}")
        
        if not union_queries:
            print("❌ No valid files found")
            return None
        
        # Create the full query
        full_query = f"""
        WITH combined_data AS (
            {' UNION ALL '.join(union_queries)}
        )
        {query.replace('FROM parquet_data', 'FROM combined_data')}
        """
        
        print(f"🔍 Executing query on {len(union_queries)} files:")
        print(f"Files: {[os.path.basename(f) for f in file_paths if os.path.exists(f)]}")
        
        result = conn.execute(full_query).fetchdf()
        return result
        
    except Exception as e:
        print(f"❌ Error querying multiple files: {e}")
        return None

# Example usage with local files
if len(EXAMPLE_LOCAL_FILES) > 0:
    print("🔍 Method 1: Direct File Querying")
    print("=" * 50)
    
    # Check which files exist
    existing_files = [f for f in EXAMPLE_LOCAL_FILES if os.path.exists(f)]
    
    if existing_files:
        print(f"Found {len(existing_files)} existing files:")
        for f in existing_files:
            print(f"  ✅ {f}")
        
        # Example query: Count records across all files
        count_query = """
        SELECT 
            COUNT(*) as total_records,
            COUNT(DISTINCT mmsi) as unique_vessels
        FROM parquet_data
        """
        
        result = query_multiple_parquet_files(existing_files, count_query, conn)
        if result is not None:
            print("\n📊 Combined Results:")
            display(result)
    else:
        print("❌ No example files found. Update EXAMPLE_LOCAL_FILES with actual file paths.")
else:
    print("💡 Update EXAMPLE_LOCAL_FILES with your actual parquet file paths to test this method.")


🔍 Method 1: Direct File Querying
❌ No example files found. Update EXAMPLE_LOCAL_FILES with actual file paths.


### Method 2: Load and Combine DataFrames


In [None]:
# Method 2: Load multiple files into memory and combine

def load_multiple_parquet_files(file_paths, max_rows_per_file=None):
    """
    Load multiple parquet files and combine them into a single DataFrame.
    
    Args:
        file_paths: List of parquet file paths
        max_rows_per_file: Maximum rows to load per file (None for all)
    
    Returns:
        Combined pandas DataFrame
    """
    dataframes = []
    
    for i, file_path in enumerate(file_paths):
        try:
            if os.path.exists(file_path):
                print(f"📖 Loading file {i+1}/{len(file_paths)}: {os.path.basename(file_path)}")
                
                # Read parquet file
                df = pd.read_parquet(file_path)
                
                # Limit rows if specified (the full file is still read into memory first)
                if max_rows_per_file is not None and len(df) > max_rows_per_file:
                    df = df.head(max_rows_per_file)
                    print(f"   Limited to {max_rows_per_file:,} rows")
                
                # Add source file information
                df['source_file'] = os.path.basename(file_path)
                
                dataframes.append(df)
                print(f"   ✅ Loaded {len(df):,} rows")
            else:
                print(f"   ❌ File not found: {file_path}")
                
        except Exception as e:
            print(f"   ❌ Error loading {file_path}: {e}")
    
    if not dataframes:
        print("❌ No files could be loaded")
        return None
    
    # Combine all DataFrames
    print(f"\n🔄 Combining {len(dataframes)} DataFrames...")
    combined_df = pd.concat(dataframes, ignore_index=True)
    print(f"✅ Combined DataFrame: {len(combined_df):,} rows × {len(combined_df.columns)} columns")
    
    return combined_df

# Example usage
print("🔍 Method 2: Load and Combine DataFrames")
print("=" * 50)

# Check for existing files
existing_files = [f for f in EXAMPLE_LOCAL_FILES if os.path.exists(f)]

if existing_files:
    print(f"Loading {len(existing_files)} files...")
    
    # Load files (limit to 1000 rows per file for demo)
    combined_df = load_multiple_parquet_files(existing_files, max_rows_per_file=1000)
    
    if combined_df is not None:
        # Register combined DataFrame for SQL queries
        conn.register('combined_data', combined_df)
        
        # Query the combined data
        print("\n📊 Querying combined data:")
        
        # Count by source file
        source_query = """
        SELECT 
            source_file,
            COUNT(*) as records,
            COUNT(DISTINCT mmsi) as unique_vessels
        FROM combined_data
        GROUP BY source_file
        ORDER BY records DESC
        """
        
        result = conn.execute(source_query).fetchdf()
        display(result)
        
        # Overall statistics
        stats_query = """
        SELECT 
            COUNT(*) as total_records,
            COUNT(DISTINCT mmsi) as total_unique_vessels,
            COUNT(DISTINCT source_file) as number_of_files
        FROM combined_data
        """
        
        stats_result = conn.execute(stats_query).fetchdf()
        print("\n📈 Overall Statistics:")
        display(stats_result)
        
else:
    print("❌ No example files found. Update EXAMPLE_LOCAL_FILES with actual file paths.")

print("💡 This method loads all data into memory, so use it with caution for large files.")


### Method 3: Wildcard Pattern Matching


In [None]:
# Method 3: Use wildcard patterns to query multiple files

def query_parquet_with_wildcard(pattern, query, conn):
    """
    Query multiple parquet files using wildcard patterns.
    
    Args:
        pattern: Wildcard pattern (e.g., "data/*.parquet" or "output/thread_*/parquet/*.parquet")
        query: SQL query template that reads FROM parquet_data
        conn: DuckDB connection
    
    Returns:
        Query result as DataFrame
    """
    try:
        # read_parquet() expands the glob and scans the rows of every matching file
        # (glob() on its own only returns the matching file names).
        # Escape single quotes so the pattern is a valid SQL string literal.
        source = "read_parquet('{}')".format(pattern.replace("'", "''"))
        full_query = query.replace('FROM parquet_data', f'FROM {source}')
        
        print(f"🔍 Querying files matching pattern: {pattern}")
        result = conn.execute(full_query).fetchdf()
        return result
        
    except Exception as e:
        print(f"❌ Error querying with wildcard pattern: {e}")
        return None

# Example patterns
patterns = [
    "output/thread_*/parquet/*.parquet",  # All parquet files in thread subdirectories
    "output/*/parquet/*.parquet",         # All parquet files in any subdirectory
    "*.parquet",                          # All parquet files in current directory
    "data/*.parquet"                      # All parquet files in data directory
]

print("🔍 Method 3: Wildcard Pattern Matching")
print("=" * 50)

# Test each pattern
for pattern in patterns:
    print(f"\n📁 Testing pattern: {pattern}")
    
    # Check if any files match the pattern
    try:
        # List every file matching the pattern (glob() returns one "file" column)
        list_query = f"SELECT file FROM glob('{pattern}')"
        files = conn.execute(list_query).fetchdf()
        
        if len(files) > 0:
            print(f"   ✅ Found {len(files)} files (showing up to 5):")
            for file in files['file'].head(5):
                print(f"      - {file}")
            
            # Example query on matching files
            count_query = """
            SELECT 
                COUNT(*) as total_records,
                COUNT(DISTINCT mmsi) as unique_vessels
            FROM parquet_data
            """
            
            result = query_parquet_with_wildcard(pattern, count_query, conn)
            if result is not None:
                print(f"   📊 Query result:")
                display(result)
        else:
            print(f"   ❌ No files found matching pattern")
            
    except Exception as e:
        print(f"   ❌ Error with pattern {pattern}: {e}")

print("\n💡 Wildcard patterns are very useful for querying files with similar naming patterns!")


### Method 4: S3 Multiple Files


In [None]:
# Method 4: Query multiple S3 parquet files

def query_s3_multiple_files(s3_files, query, s3_client, conn):
    """
    Query multiple S3 parquet files by downloading and combining them.
    
    Args:
        s3_files: List of S3 URLs
        query: SQL query template
        s3_client: boto3 S3 client
        conn: DuckDB connection
    
    Returns:
        Query result as DataFrame
    """
    try:
        # Download and combine S3 files
        dataframes = []
        
        for i, s3_url in enumerate(s3_files):
            try:
                print(f"📥 Downloading S3 file {i+1}/{len(s3_files)}: {s3_url}")
                
                # Parse S3 URL
                bucket, key = parse_s3_url(s3_url)
                
                # Download file content
                obj = s3_client.get_object(Bucket=bucket, Key=key)
                file_content = obj['Body'].read()
                
                # Create DataFrame from parquet content
                file_buffer = io.BytesIO(file_content)
                df = pd.read_parquet(file_buffer)
                
                # Add source information
                df['source_s3_url'] = s3_url
                df['source_file'] = os.path.basename(key)
                
                dataframes.append(df)
                print(f"   ✅ Loaded {len(df):,} rows")
                
            except Exception as e:
                print(f"   ❌ Error downloading {s3_url}: {e}")
        
        if not dataframes:
            print("❌ No S3 files could be downloaded")
            return None
        
        # Combine DataFrames
        print(f"\n🔄 Combining {len(dataframes)} S3 DataFrames...")
        combined_df = pd.concat(dataframes, ignore_index=True)
        
        # Register for SQL queries
        conn.register('s3_combined_data', combined_df)
        
        # Execute query
        final_query = query.replace('FROM parquet_data', 'FROM s3_combined_data')
        result = conn.execute(final_query).fetchdf()
        
        return result
        
    except Exception as e:
        print(f"❌ Error querying S3 multiple files: {e}")
        return None

# Example S3 files (update these with your actual S3 URLs)
example_s3_files = [
    "s3://ais-research-data/parquet/year=2016/month=10/day=26/file1.parquet",
    "s3://ais-research-data/parquet/year=2016/month=10/day=27/file2.parquet"
]

print("🔍 Method 4: S3 Multiple Files")
print("=" * 50)

if DATA_SOURCE == "s3" and 's3_client' in locals():
    print("S3 client available - testing multiple S3 file querying")
    
    # Example query
    s3_query = """
    SELECT 
        source_file,
        COUNT(*) as records,
        COUNT(DISTINCT mmsi) as unique_vessels
    FROM parquet_data
    GROUP BY source_file
    ORDER BY records DESC
    """
    
    # Note: This would download files, so we'll just show the concept
    print("💡 To query multiple S3 files:")
    print("1. Update example_s3_files with your actual S3 URLs")
    print("2. Uncomment the code below to test")
    print("3. Be aware this downloads files to memory")
    
    # Uncomment to test with actual S3 files:
    # result = query_s3_multiple_files(example_s3_files, s3_query, s3_client, conn)
    # if result is not None:
    #     display(result)
    
else:
    print("💡 S3 client not available. This method requires S3 access.")
    print("   Update your configuration to use S3 data source to test this method.")
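Rather than hard-coding `example_s3_files`, the URL list could be built by listing a prefix. A minimal sketch, assuming the Hive-style layout of the example URLs and boto3's `list_objects_v2` paginator, which follows continuation tokens past the 1,000-keys-per-response limit. `list_s3_parquet_urls` is a hypothetical helper, and the prefix in the usage comment is only an example.

```python
def list_s3_parquet_urls(s3_client, bucket, prefix):
    """Return s3:// URLs for every .parquet object under bucket/prefix."""
    paginator = s3_client.get_paginator("list_objects_v2")
    urls = []
    # Each page holds up to 1,000 keys; "Contents" is absent when a page has no keys
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".parquet"):
                urls.append(f"s3://{bucket}/{obj['Key']}")
    return urls


# Usage (requires a configured s3_client; prefix follows the example URLs above):
# october_files = list_s3_parquet_urls(
#     s3_client, "ais-research-data", "parquet/year=2016/month=10/"
# )
# result = query_s3_multiple_files(october_files, s3_query, s3_client, conn)
```

Listing a whole month this way can return many large files, and `query_s3_multiple_files` downloads each one into memory, so narrowing the prefix (for example to a single `day=`) keeps the download manageable.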


## Summary: Multiple Parquet File Querying

Here's a comparison of the different methods:

| Method | Best For | Memory Usage | Speed | Use Case |
|--------|----------|--------------|-------|----------|
| **Method 1: Direct File Querying** | Large files | Low | Fast | Production, large datasets |
| **Method 2: Load & Combine** | Small-medium files | High | Medium | Interactive analysis |
| **Method 3: Wildcard Patterns** | Patterned files | Low | Fast | Batch processing |
| **Method 4: S3 Multiple Files** | S3 files | High | Slow | S3-specific workflows |

### **Quick Examples:**

```sql
-- Method 1: Direct querying
WITH combined_data AS (
    SELECT * FROM 'file1.parquet'
    UNION ALL
    SELECT * FROM 'file2.parquet'
)
SELECT COUNT(*) FROM combined_data;

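-- Method 3: Wildcard patterns (read_parquet scans the rows of every matching file;
-- glob('data/*.parquet') would only list the matching file names)
SELECT COUNT(*) FROM read_parquet('data/*.parquet');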

-- Method 2 & 4: After loading into memory
SELECT COUNT(*) FROM combined_data;
```

### **Recommendations:**
- **For large files**: Use Method 1 (Direct) or Method 3 (Wildcard)
- **For interactive analysis**: Use Method 2 (Load & Combine)
- **For S3 workflows**: Use Method 4 (S3 Multiple Files)
- **For production**: Use Method 1 with proper error handling
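For the "proper error handling" recommended for production use of Method 1, two common failure points are missing files and paths containing a single quote, which breaks a SQL string literal. A hedged sketch of a validation step that could run before the query is built; `build_parquet_source` is a hypothetical helper that returns a DuckDB `read_parquet([...])` clause and raises instead of silently skipping files.

```python
import os


def build_parquet_source(paths):
    """Validate local parquet paths and return a DuckDB read_parquet([...]) clause."""
    if not paths:
        raise ValueError("no parquet paths given")
    missing = [p for p in paths if not os.path.isfile(p)]
    if missing:
        # Fail loudly instead of quietly querying a partial dataset
        raise FileNotFoundError(f"{len(missing)} parquet file(s) not found: {missing}")
    # Double any single quote so each path is a valid SQL string literal
    quoted = ", ".join("'" + p.replace("'", "''") + "'" for p in paths)
    return f"read_parquet([{quoted}])"
```

It could slot into the Method 1 example as `count_query.replace("FROM parquet_data", "FROM " + build_parquet_source(existing_files))`.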


## Summary

This notebook covers:
- ✅ Reading parquet files from both S3 and local disk
- ✅ Connecting to S3 with AWS profile or IAM role authentication
- ✅ Displaying comprehensive data information
- ✅ Showing the first N rows of data
- ✅ **SQL querying with DuckDB**
- ✅ Direct parquet file querying (memory efficient)
- ✅ Data exploration tools

**SQL Capabilities:**
- 🐤 **DuckDB integration** - Fast SQL engine for analytics
- 📊 **Standard SQL support** - SELECT, WHERE, GROUP BY, ORDER BY, etc.
- 🔍 **Direct parquet querying** - Query files in place without first loading them into a DataFrame
- 📈 **Advanced analytics** - Aggregations, statistical functions, window functions

**Default region**: `il-central-1` (Israel (Tel Aviv))

**To use with different files:**
1. **For S3 files**: Set `DATA_SOURCE = "s3"` and update `S3_URL` (plus `AWS_MODE`, `AWS_PROFILE`, and `AWS_REGION` if needed) in the Configuration cell
2. **For local files**: Set `DATA_SOURCE = "local"` and update `LOCAL_FILE_PATH` in the Configuration cell
