This demo notebook shows how to transform data from different file types(.ms, .catalog and StationXML) into parquets

# Initialisation

Installing dependencies

!pip install -r requirements.txt

Importing Libraries

In [79]:
import os
from obspy import read
from obspy import read_inventory
import boto3
from botocore import UNSIGNED
from botocore.config import Config
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split
from pyspark.sql.types import StructType, StructField, StringType
from obspy import read
import concurrent.futures
from pyspark.sql.functions import col

Configuring access to s3 bucket

In [2]:
s3=boto3.resource('s3',config=Config(signature_version=UNSIGNED))
BUCKET_NAME = 'scedc-pds'

Following helper function will list all files in specified folder in s3 bucket

In [78]:
def list_s3_directories(bucket_name, folder_name, max_directories):
    # max_directories is the maximum number of files we want to list
    bucket = s3.Bucket(bucket_name)

    directories = []
    prefix = folder_name if folder_name.endswith('/') else folder_name + '/'
    for obj in bucket.objects.filter(Prefix=prefix, Delimiter='/'):
        directory = obj.key
        # print(obj.key)
        directories.append(directory)

        if len(directories) >= max_directories:
            return directories[:max_directories]
        
    return directories

Following helper function will download all files from specified folder of s3 bucket

In [4]:
def download_files(bucket_name, directory_list):
    for obj in directory_list:
        folder_path = '/'.join(obj.split('/')[:-1])
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
        s3.Bucket(bucket_name).download_file(obj,obj)

# Processing Continous Waveforms (.ms files)

We list 10 files from an arbitrary folder

In [5]:
continous_waveform = list_s3_directories(bucket_name = 'scedc-pds', folder_name='continuous_waveforms/2020/2020_300', max_directories=10)

# Printing the directory list
print('first five directories: ')
for directory_name in continous_waveform[0:5]:
    print(directory_name)

first five directories: 
continuous_waveforms/2020/2020_300/AZBZN__BHE___2020300.ms
continuous_waveforms/2020/2020_300/AZBZN__BHN___2020300.ms
continuous_waveforms/2020/2020_300/AZBZN__BHZ___2020300.ms
continuous_waveforms/2020/2020_300/AZBZN__HHE___2020300.ms
continuous_waveforms/2020/2020_300/AZBZN__HHN___2020300.ms


We download these 10 files

In [6]:
download_files(BUCKET_NAME, continous_waveform)

In the following code, we convert the .ms files and save them as parquet file. Schemas are yet to be properly configured due to which some data will not appear in final parquet

In [15]:
# Define the schema for the Spark DataFrame
schema = StructType([
    StructField("station", StringType(), nullable=False),
    StructField("starttime", StringType(), nullable=False),
    StructField("sampling_rate", StringType(), nullable=False),
])

# Function to process a single MS file
def process_ms_file(ms_file, folder_path):
    if not os.path.exists(folder_path):
            os.makedirs(folder_path)
    # Initialize SparkSession
    spark = SparkSession.builder \
        .appName("MS to Parquet Conversion") \
        .getOrCreate()

    # Read the MS file using ObsPy
    stream = read(ms_file)

    # Extract data from ObsPy Stream
    data = [(tr.stats.station, str(tr.stats.starttime), str(tr.stats.sampling_rate)) for tr in stream]

    # Create Spark DataFrame with the defined schema
    spark_df = spark.createDataFrame(data, schema)

    # Write the Spark DataFrame to Parquet format
    output_path = ms_file.replace(".ms", ".parquet")
    output_path = folder_path + output_path
    spark_df.write.parquet(output_path)

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MS to Parquet Conversion") \
    .getOrCreate()

# List of MS file paths
ms_files = continous_waveform

# Create a thread pool executor
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

# Process each MS file in parallel
results = []
for ms_file in ms_files:
    future = executor.submit(process_ms_file, ms_file, 'pyspark/')
    results.append(future)

# Wait for all tasks to complete
concurrent.futures.wait(results)

# Shutdown the thread pool executor
executor.shutdown()

We load all the converted parquet files and combine them to a single parquet file

In [16]:
# Directory containing the Parquet files
directory = "pyspark/continuous_waveforms/2020/2020_300"

# Read all Parquet files in the directory into separate DataFrames
parquet_files = [file for file in os.listdir(directory) if file.endswith(".parquet")]
dataframes = [spark.read.parquet(os.path.join(directory, file)) for file in parquet_files]

# Perform a union of the DataFrames
combined_df = dataframes[0]  # Take the first DataFrame as the starting point
for df in dataframes[1:]:
    combined_df = combined_df.union(df)

# Write the combined DataFrame to a single Parquet file
combined_df.write.parquet("output/combined.parquet")

                                                                                

Visualising the final parquet.

In [18]:
# Read the Parquet file into a Spark DataFrame
parquet_df = spark.read.parquet("output/combined.parquet")

# Display the contents of the Spark DataFrame
parquet_df.show(truncate=False)

+-------+---------------------------+-------------+
|station|starttime                  |sampling_rate|
+-------+---------------------------+-------------+
|BZN    |2020-10-26T02:49:58.788400Z|100.0        |
|BZN    |2020-10-26T02:49:58.008400Z|100.0        |
|BZN    |2020-10-26T00:00:00.008300Z|100.0        |
|BZN    |2020-10-26T00:00:00.008300Z|100.0        |
|BZN    |2020-10-26T00:00:00.008300Z|100.0        |
|BZN    |2020-10-26T02:49:58.008400Z|100.0        |
|BZN    |2020-10-26T00:00:00.008300Z|100.0        |
|BZN    |2020-10-26T00:00:00.008300Z|100.0        |
|BZN    |2020-10-26T00:00:00.008300Z|100.0        |
|BZN    |2020-10-26T00:00:00.019500Z|40.0         |
|BZN    |2020-10-26T00:00:00.019500Z|40.0         |
|BZN    |2020-10-26T00:00:00.019500Z|40.0         |
|BZN    |2020-10-26T00:00:00.069500Z|1.0          |
+-------+---------------------------+-------------+



# Processing Earthquake .Catalog files

We list 10 files from an arbitrary folder

In [34]:
earthquake_catalog = list_s3_directories(bucket_name = 'scedc-pds', folder_name='earthquake_catalogs/SCEC_DC', max_directories=10)

# Printing the directory list
print('first five directories: ')
for directory_name in earthquake_catalog[0:5]:
    print(directory_name)

first five directories: 
earthquake_catalogs/SCEC_DC/1932.catalog
earthquake_catalogs/SCEC_DC/1933.catalog
earthquake_catalogs/SCEC_DC/1934.catalog
earthquake_catalogs/SCEC_DC/1935.catalog
earthquake_catalogs/SCEC_DC/1936.catalog


We download these files

In [35]:
download_files(BUCKET_NAME, earthquake_catalog)

In the following code, we convert the .catalog files and save them as parquet file.

In [62]:
# List of file paths
file_paths = earthquake_catalog


output_dir = 'earthquake_catalogs_parquet'

# Function to process a catalog file
def process_catalog_file(file_path):
    catalog_file = spark.read.text(file_path)
    
    # Filter out the header and the rows until the line with "###"
    filtered_catalog_file = catalog_file.filter(~catalog_file['value'].startswith("#")) \
        .filter(~catalog_file['value'].startswith("###"))

    # Split the lines of the filtered catalog file into columns using regular expressions
    catalog_df = filtered_catalog_file.select(split(filtered_catalog_file['value'], '\\s+').alias('columns'))

    # Extract columns from the columns array
    catalog_df = catalog_df.select(
        col("columns").getItem(0).alias("Date"),
        col("columns").getItem(1).alias("Time"),
        col("columns").getItem(2).alias("ET"),
        col("columns").getItem(3).alias("GT"),
        col("columns").getItem(4).alias("MAG"),
        col("columns").getItem(5).alias("M"),
        col("columns").getItem(6).alias("LAT"),
        col("columns").getItem(7).alias("LON"),
        col("columns").getItem(8).alias("DEPTH"),
        col("columns").getItem(9).alias("Q"),
        col("columns").getItem(10).alias("EVID"),
        col("columns").getItem(11).alias("NPH"),
        col("columns").getItem(12).alias("NGRM")
    )
    # Save the DataFrame as Parquet
    catalog_df.write.parquet(output_dir + '/' + file_path.split('/')[-1].split('.')[0] + '.parquet')

    return catalog_df

# Create a ThreadPoolExecutor
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

# Process catalog files in parallel
catalog_dfs = list(executor.map(process_catalog_file, file_paths))

# Wait for all tasks to complete
concurrent.futures.wait(results)

# Shutdown the thread pool executor
executor.shutdown()

We combine all these converted parquets into single parquet

In [64]:
# Directory containing the Parquet files
directory = "earthquake_catalogs_parquet"

# Read all Parquet files in the directory into separate DataFrames
parquet_files = [file for file in os.listdir(directory) if file.endswith(".parquet")]
dataframes = [spark.read.parquet(os.path.join(directory, file)) for file in parquet_files]

# Perform a union of the DataFrames
combined_df = dataframes[0]  # Take the first DataFrame as the starting point
for df in dataframes[1:]:
    combined_df = combined_df.union(df)

# Write the combined DataFrame to a single Parquet file
combined_df.write.parquet("output/combined_earthquake_catalogs.parquet")

Visualising that parquet

In [66]:
# Read the Parquet file into a Spark DataFrame
parquet_df = spark.read.parquet("output/combined_earthquake_catalogs.parquet")

# Display the contents of the Spark DataFrame
parquet_df.show(truncate=False)

+----------+-----------+---+---+----+---+------+--------+-----+---+-------+---+----+
|Date      |Time       |ET |GT |MAG |M  |LAT   |LON     |DEPTH|Q  |EVID   |NPH|NGRM|
+----------+-----------+---+---+----+---+------+--------+-----+---+-------+---+----+
|1933/01/01|13:31:30.27|eq |l  |2.26|l  |33.820|-117.985|6.0  |D  |3359497|6  |0   |
|1933/01/01|22:45:24.12|eq |l  |2.09|l  |33.831|-118.145|6.0  |C  |3359499|5  |0   |
|1933/01/03|04:45:39.15|eq |l  |2.34|h  |33.604|-116.768|6.0  |D  |3359503|4  |0   |
|1933/01/03|12:39:12.21|eq |l  |2.12|h  |33.912|-116.960|6.0  |D  |3359504|4  |0   |
|1933/01/03|22:40:26.95|eq |l  |1.38|h  |34.404|-118.371|6.0  |D  |3361946|5  |0   |
|1933/01/04|06:36:14.60|eq |l  |2.39|l  |33.352|-116.832|6.0  |D  |3359508|6  |0   |
|1933/01/05|09:31:17.44|eq |l  |2.58|l  |33.895|-118.994|10.0 |D  |3359512|6  |0   |
|1933/01/05|23:29:19.48|qb |l  |1.55|h  |34.260|-117.830|0.0  |D  |3359516|5  |0   |
|1933/01/07|05:56:44.00|eq |l  |2.32|h  |33.805|-119.714|6.0  |C 

# Processing FDSN StationXML files

We download an arbitarty StationXML file

In [68]:
KEY="FDSNstationXML/CI/CI_GSC.xml"
s3.Bucket(BUCKET_NAME).download_file(KEY,'CI_GSC.xml')

We visualise content of the file

In [69]:
stationxml_file_path = 'CI_GSC.xml'
inventory = read_inventory(stationxml_file_path)

In [70]:
inventory

Inventory created at 2022-08-11T16:46:02.326506Z
	Sending institution: ANSS Station Information System (ANSS Station Information System)
	Contains:
		Networks (1):
			CI
		Stations (1):
			CI.GSC (Goldstone)
		Channels (485):
			CI.GSC..ABT, CI.GSC..ACE (7x), CI.GSC..ACA, CI.GSC..ACF, 
			CI.GSC..ACK, CI.GSC..ACQ, CI.GSC..ACS, CI.GSC..ADG, CI.GSC..ADL, 
			CI.GSC..ADT, CI.GSC..ALL, CI.GSC..AMD, CI.GSC..ANI, CI.GSC..APK, 
			CI.GSC..APO, CI.GSC..ARD, CI.GSC..ASL, CI.GSC..ASQ, CI.GSC..ATH, 
			CI.GSC..AWR, CI.GSC..BCI (3x), CI.GSC..BHZ (15x), CI.GSC..BHN (15x)
			CI.GSC..BHE (16x), CI.GSC..EHZ (2x), CI.GSC..ELZ, 
			CI.GSC..HCI (3x), CI.GSC..HDI, CI.GSC..HDO, CI.GSC..HHZ (14x), 
			CI.GSC..HHN (14x), CI.GSC..HHE (15x), CI.GSC..HLZ (8x), 
			CI.GSC..HLN (8x), CI.GSC..HLE (8x), CI.GSC..HNZ (10x), 
			CI.GSC..HNN (10x), CI.GSC..HNE (10x), CI.GSC..LAT, CI.GSC..LCE (7x)
			CI.GSC..LCL (7x), CI.GSC..LCQ (7x), CI.GSC..LDZ (3x), 
			CI.GSC..LDN (3x), CI.GSC..LDE (3x), CI.GSC..LDI, CI.GSC..LDO (6

This file mostly contains metadata. We take tabular data from it and make a parquet

In [76]:
stations = inventory.networks[0].stations
# Access channel information for the station
channels = stations[0].channels

# Define the schema for the DataFrame
schema = StructType([
    StructField("Channel", StringType(), nullable=False),
    StructField("StartTime", StringType(), nullable=False),
    StructField("EndTime", StringType(), nullable=True),
    StructField("ChannelTypes", StringType(), nullable=False),
    StructField("SensorDescription", StringType(), nullable=True)
])

# Define a function to extract the relevant information from the channel data
def extract_channel_info(channel):
    channel_data = {
        "Channel": channel.code,
        "StartTime": channel.start_date,
        "EndTime": channel.end_date,
        "ChannelTypes": ", ".join(channel.types),
        "SensorDescription": channel.sensor.description if channel.sensor else None
    }
    return channel_data

# Create a list of dictionaries representing the channel data
channels_data = [extract_channel_info(channel) for channel in channels]

# Create a DataFrame from the list of dictionaries
df = spark.createDataFrame(channels_data, schema)

# Show the DataFrame
df.write.parquet("output/FDSNXML.parquet")

In [77]:
# Read the Parquet file into a Spark DataFrame
parquet_df = spark.read.parquet("output/FDSNXML.parquet")

# Display the contents of the Spark DataFrame
parquet_df.show(truncate=False)

+-------+----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+
|Channel|StartTime                                                                                                                         |EndTime                                                                                                                           |ChannelTypes      |SensorDescription |
+-------+----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+
|ABT    |{_UTCDateTime__precision=6, _UTCDateTime__ns=1660154400000000