In [0]:
from pyspark.sql.functions import col, explode, struct, array, lit
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, BinaryType, ArrayType, DoubleType, MapType,LongType, BooleanType
from pyspark.sql.functions import element_at, col,split, explode, posexplode, when, lit, concat, substring


# --- Pipeline configuration -------------------------------------------------
# XML element handled by this notebook; its lower-cased form is the suffix of
# the bronze/silver table names and of the checkpoint directory.
xml_tag_to_extract = "SpectrumIdentificationResult"

# ADLS Gen2 container and storage account that hold the streaming checkpoint.
storage_account = "senjkdtbxloader"
container = "data"

# Bronze table read as the streaming source.
source_table = f"jk_libraries.bronze.{xml_tag_to_extract.lower()}"

# Silver table the flattened rows are appended to.
target_table = f"jk_libraries.silver.{xml_tag_to_extract.lower()}"

# Checkpoint directory for the bronze -> silver stream.
checkpoint_location = f"abfss://{container}@{storage_account}.dfs.core.windows.net/checkpoint/silver/{xml_tag_to_extract.lower()}"


# Column layout of the target table as (name, Spark type, nullable) triples.
# The order here is the column order of the created table.
_schema_columns = [
    ("_id", StringType(), True),
    ("_spectraData_ref", StringType(), True),
    ("_spectrumID", StringType(), True),
    ("source_file", StringType(), False),
    ("file_size", LongType(), False),
    ("peptide_reference", StringType(), True),
    ("calculated_mass_to_charge", DoubleType(), True),
    ("experimental_mass_to_charge", DoubleType(), True),
    ("charge_state", LongType(), True),
    ("pass_threshold", BooleanType(), True),
    ("fragment_mz", DoubleType(), True),
    ("fragment_name", StringType(), True),
    ("charge", LongType(), True),
    ("y_ion", BooleanType(), False),
    ("b_ion", BooleanType(), False),
    ("immonium_ion", BooleanType(), False),
    ("neutral_loss_chemical_formula", StringType(), True),
]

schema = StructType(
    [
        StructField(column_name, column_type, is_nullable)
        for column_name, column_type, is_nullable in _schema_columns
    ]
)




# Create table from dataframe
from pyspark.sql import DataFrame

def create_table(df: DataFrame, table_name:str):
    """Create ``table_name`` (if it does not exist yet) with the columns of ``df``.

    Every field of ``df.schema`` is rendered as ``<name> <simple type string>``
    and the resulting column list is executed as a ``CREATE TABLE IF NOT EXISTS``
    statement through the notebook's global ``spark`` session. Only names and
    types are emitted; field nullability is not part of the generated DDL.

    Args:
        df: DataFrame whose schema defines the table columns.
        table_name: Name of the table to create; interpolated into the SQL as-is.
    """
    column_definitions = (
        f"{column.name} {column.dataType.simpleString()}"
        for column in df.schema.fields
    )
    columns_ddl = ", ".join(column_definitions)
    statement = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_ddl})"
    spark.sql(statement)


def create_table_from_schema(schema: StructType, table_name:str):
    """Create ``table_name`` (if it does not exist yet) from an explicit schema.

    Each field of ``schema`` becomes a ``<name> <simple type string>`` column
    definition. The generated ``CREATE TABLE IF NOT EXISTS`` statement is printed
    and then executed through the notebook's global ``spark`` session. Field
    nullability is not part of the generated DDL.

    Args:
        schema: StructType describing the table columns.
        table_name: Name of the table to create; interpolated into the SQL as-is.
    """
    column_definitions = []
    for column in schema.fields:
        column_definitions.append(f"{column.name} {column.dataType.simpleString()}")
    columns_ddl = ", ".join(column_definitions)

    statement = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_ddl})"
    print(statement)  # echo the DDL before running it
    spark.sql(statement)


# Make sure the target table exists before anything is written to it.
create_table_from_schema(schema=schema, table_name=target_table)

In [0]:
from pyspark.sql.functions import explode, transform, explode
from pyspark.sql.functions import expr, element_at, map_from_entries, filter, transform, lit, struct, udf, expr, col, lit, when, size


# Streaming read of the source table in Delta format.
df = (
    spark.readStream
    .format("delta")
    .table(source_table)
)


## The nesting depth is known, so explode level by level:
##   1. one row per SpectrumIdentificationItem,
##   2. one row per IonType inside that item's Fragmentation.
## For each IonType, fragment_vector maps every fragmentArray entry's
## _measure_ref to its _values; series_index and charge are copied from the
## IonType's _index and _charge.
df_intermeidate = (
    df
    .withColumn(
        "spectrum_identification_item_exploded",
        explode("SpectrumIdentificationItem"),
    )
    .withColumn(
        "ion_type_exploded",
        explode("spectrum_identification_item_exploded.Fragmentation.IonType"),
    )
    .withColumn(
        "fragment_vector",
        map_from_entries(
            transform(
                "ion_type_exploded.fragmentArray",
                lambda x: struct(x["_measure_ref"], x["_values"]),
            )
        ),
    )
    .withColumn("series_index", col("ion_type_exploded._index"))
    .withColumn("charge", col("ion_type_exploded._charge"))
)

# Extract the relevant information about the fragmentation from the cvParam
# entries of each exploded ion type.

def _cv_params_with_accession(accession):
    """Return the ion type's cvParam entries whose _accession equals ``accession``."""
    return filter(
        "ion_type_exploded.cvParam",
        lambda x: x["_accession"] == lit(accession),
    )


def _has_cv_param(accession):
    """Return a boolean column: True when at least one cvParam matches ``accession``."""
    return when(size(_cv_params_with_accession(accession)) > 0, True).otherwise(False)


fragmentarray_expanded = (
    df_intermeidate
    .withColumn("y_ion", _has_cv_param("MS:1001220"))
    .withColumn("b_ion", _has_cv_param("MS:1001224"))
    .withColumn("immonium_ion", _has_cv_param("MS:1001239"))
    # Keep the matching cvParam entries themselves; their _value is read later.
    .withColumn("neutral_loss", _cv_params_with_accession("MS:1000336"))
)

# Extract peptide-level information from the exploded SpectrumIdentificationItem
# and the chemical formula of the neutral loss (if any).
peptideinfo_expanded = (
    fragmentarray_expanded
    .withColumn(
        "peptide_reference",
        col("spectrum_identification_item_exploded._peptide_ref"),
    )
    .withColumn(
        "calculated_mass_to_charge",
        col("spectrum_identification_item_exploded._calculatedMassToCharge"),
    )
    # Fix: this column used to be read from _calculatedMassToCharge (copy-paste
    # slip), so it duplicated calculated_mass_to_charge.
    # NOTE(review): assumes the bronze struct exposes the XML attribute as
    # _experimentalMassToCharge, like its sibling attributes - confirm against
    # the bronze table schema.
    .withColumn(
        "experimental_mass_to_charge",
        col("spectrum_identification_item_exploded._experimentalMassToCharge"),
    )
    .withColumn(
        "charge_state",
        col("spectrum_identification_item_exploded._chargeState"),
    )
    .withColumn(
        "pass_threshold",
        col("spectrum_identification_item_exploded._passThreshold"),
    )
    # neutral_loss holds the matching cvParam entries; take the _value of the
    # first one, or NULL when there is none.
    .withColumn(
        "neutral_loss_chemical_formula",
        when(
            size(col("neutral_loss")) > 0,
            col("neutral_loss").getItem(0)["_value"],
        ).otherwise(lit(None)),
    )
)


## fragment_vector["Measure_MZ"] and series_index are space-separated strings.
## Split both into arrays, then posexplode the m/z array so that every
## identified fragment ion gets its own row: `position` is the index within the
## array and `fragment_mz` the value at that index.
## NOTE(review): split() yields strings, so fragment_mz is a string column here
## while `schema` declares it as DoubleType - confirm the write casts as intended.
_measure_mz_values = element_at(col("fragment_vector"), "Measure_MZ")

df_exploded = (
    peptideinfo_expanded
    .withColumn("fragment_array", split(_measure_mz_values, " "))
    .withColumn("index_array", split(col("series_index"), " "))
    .select(
        "*",
        posexplode("fragment_array").alias("position", "fragment_mz"),
    )
)


def add_fragment_name(df):
    """Return ``df`` with a ``fragment_name`` label column added.

    The label is built from the entry of ``index_array`` at ``position``:

    * ``b_ion`` is true  -> ``"b" + index + "+" + charge``
    * ``y_ion`` is true  -> ``"y" + index + "+" + charge``
    * otherwise          -> ``"[" + <one character of peptide_reference at index> + "]+"``

    ``b_ion`` takes precedence over ``y_ion`` when both are true.

    Args:
        df: DataFrame with the columns ``b_ion``, ``y_ion``, ``index_array``,
            ``position``, ``charge`` and ``peptide_reference``.
    """
    series_index_at_position = col("index_array")[col("position")]

    b_label = concat(lit("b"), series_index_at_position, lit("+"), col("charge"))
    y_label = concat(lit("y"), series_index_at_position, lit("+"), col("charge"))
    # Fallback label: the single character of peptide_reference at that index.
    residue_label = concat(
        lit("["),
        substring(col("peptide_reference"), series_index_at_position, 1),
        lit("]+"),
    )

    fragment_label = (
        when(col("b_ion"), b_label)
        .when(col("y_ion"), y_label)
        .otherwise(residue_label)
    )
    return df.withColumn("fragment_name", fragment_label)

df_exploded = add_fragment_name(df_exploded)

# Columns kept for the output, listed in the same order as the fields of `schema`.
_output_columns = [
    "_id",
    "_spectraData_ref",
    "_spectrumID",
    "source_file",
    "file_size",
    "peptide_reference",
    "calculated_mass_to_charge",
    "experimental_mass_to_charge",
    "charge_state",
    "pass_threshold",
    "fragment_mz",
    "fragment_name",
    "charge",
    "y_ion",
    "b_ion",
    "immonium_ion",
    "neutral_loss_chemical_formula",
]

final_df = df_exploded.select(*_output_columns)


# Append the flattened rows to the target Delta table.
# - The output mode is set with outputMode(); the previous
#   .option("mode", "append") is not how a streaming writer's output mode is
#   configured.
# - toTable() is the documented DataStreamWriter method for starting a stream
#   into a table.
# - availableNow=True is kept unchanged from the original trigger setting.
(
    final_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_location)
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable(target_table)
)

In [0]:
# Bare expression as the cell's last statement: the notebook echoes `final_df`.
# NOTE(review): final_df derives from a streaming read, so this presumably shows
# only its description, not rows - confirm this cell is still needed.
final_df