In [10]:
import json

minio_client = get_minio_client()
bucket = "cdm-lake"
prefix = "msd_source/"
objects = minio_client.list_objects(bucket, prefix, recursive=False)

msd_db="modelseed_biochemistry"
spark = get_spark_session()
create_namespace_if_not_exists(spark, msd_db)

delete_keys = ['thermodynamics','notes','linked_compound','is_obsolete', # compounds
               'is_core','is_cofactor','comprised_of','class', # compounds
               'abstract_compound','aliases', # compounds
               'abstract_reaction','code','ec_numbers','pathways', # reactions
               'linked_reaction','stoichiometry','definition','equation', # reactions
               'compound_ids' # reactions
              ]

stoichiometries = dict()

spark_dataframes = dict()
for file_obj in objects:
    
    if('/MSD_' not in file_obj.object_name):
        continue
        
    print(file_obj.object_name)
    
    all_entities = list()

    #compounds or reactions
    msd_type = file_obj.object_name.lower().split('_')[2].split('.')[0]

    spark.sql(f"DROP TABLE IF EXISTS {msd_type}")
    file_resp = minio_client.get_object(bucket,file_obj.object_name)
    biochem_entities = file_resp.json()
    for entity in biochem_entities:

        # UPDATE IDENTIFIERS
        if(msd_type == 'reactions'):
            entity['id'] = 'seed.reaction:'+entity['id']
        if(msd_type == 'compounds'):
            entity['id'] = 'seed.compound:'+entity['id']
            
        for field in delete_keys:

            if(msd_type == 'reactions' and field == 'stoichiometry'):
                stoichiometries[entity['id']]=entity['stoichiometry']
            
            if(field in entity):
                del(entity[field])

        # has to be a list of json strings
        all_entities.append(json.dumps(entity))

    rdd = spark.sparkContext.parallelize(all_entities)
    df = spark.read.json(rdd)
    spark_dataframes[msd_type]=df

rgt_entities=list()
for rxn in stoichiometries:
    for rgt in stoichiometries[rxn]:
        entity = {'reaction_id':rxn,
                  'compound_id':'seed.compound:'+rgt['compound'],
                  'compartment_index':rgt['compartment'],
                  'stoichiometry':rgt['coefficient']}
        rgt_entities.append(json.dumps(entity))
rdd=spark.sparkContext.parallelize(rgt_entities)
df=spark.read.json(rdd)
spark_dataframes['reagents']=df

25/01/09 21:49:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Namespace modelseed_biochemistry is ready to use.
msd_source/MSD_Compounds.json


25/01/09 21:49:39 WARN TaskSetManager: Stage 95 contains a task of very large size (9133 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

msd_source/MSD_Reactions.json


25/01/09 21:49:50 WARN TaskSetManager: Stage 96 contains a task of very large size (6647 KiB). The maximum recommended task size is 1000 KiB.
25/01/09 21:49:53 WARN TaskSetManager: Stage 97 contains a task of very large size (16592 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [13]:
spark = get_spark_session()
spark.sql("USE modelseed_biochemistry")
spark.sql("DROP TABLE IF EXISTS compounds")
spark.sql("DROP TABLE IF EXISTS reactions")
spark.sql("DROP TABLE IF EXISTS reagents")

25/01/09 21:52:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


DataFrame[]

In [14]:
from pyspark.sql.types import (
	StringType, LongType, DoubleType, BooleanType
)
from pyspark.sql.functions import col

for msd_type in spark_dataframes:
    df=spark_dataframes[msd_type]
    for field, dtype in df.dtypes:
        
        if(dtype == 'string'):
            df = df.withColumn(field, col(field).cast(StringType()))
        elif(dtype == 'double'):
            df = df.withColumn(field, col(field).cast(DoubleType()))
        elif(dtype == 'bigint' and field == 'is_transport'):
            df = df.withColumn(field, col(field).cast(BooleanType()))
        elif(dtype == 'bigint'):
            df = df.withColumn(field, col(field).cast(LongType()))
        else:
            print("Unsupported field?",field,dtype)


In [15]:
for msd_type in spark_dataframes:
    spark_table = f"{msd_db}.{msd_type}"
    delta_file = f"msd_delta/{msd_type}.delta"
    df = spark_dataframes[msd_type]
    df.write.mode("overwrite") \
        .option("overwriteSchema", "true") \
    	.option("compression", "snappy") \
    	.option("path", f"s3a://{bucket}/{delta_file}") \
    	.format("delta") \
    	.saveAsTable(spark_table)
    print(f"Spark table {spark_table} created.")

25/01/09 21:53:09 WARN TaskSetManager: Stage 98 contains a task of very large size (9133 KiB). The maximum recommended task size is 1000 KiB.
25/01/09 21:53:16 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`modelseed_biochemistry`.`compounds` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
chgrp: changing ownership of 'file:///cdm_shared_workspace/hive_metastore/modelseed_biochemistry.db/compounds-__PLACEHOLDER__': chown: changing group of '/cdm_shared_workspace/hive_metastore/modelseed_biochemistry.db/compounds-__PLACEHOLDER__': Operation not permitted


Spark table modelseed_biochemistry.compounds created.


25/01/09 21:53:17 WARN TaskSetManager: Stage 102 contains a task of very large size (6647 KiB). The maximum recommended task size is 1000 KiB.
25/01/09 21:53:20 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`modelseed_biochemistry`.`reactions` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
chgrp: changing ownership of 'file:///cdm_shared_workspace/hive_metastore/modelseed_biochemistry.db/reactions-__PLACEHOLDER__': chown: changing group of '/cdm_shared_workspace/hive_metastore/modelseed_biochemistry.db/reactions-__PLACEHOLDER__': Operation not permitted


Spark table modelseed_biochemistry.reactions created.


25/01/09 21:53:21 WARN TaskSetManager: Stage 106 contains a task of very large size (16592 KiB). The maximum recommended task size is 1000 KiB.
25/01/09 21:53:24 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`modelseed_biochemistry`.`reagents` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
chgrp: changing ownership of 'file:///cdm_shared_workspace/hive_metastore/modelseed_biochemistry.db/reagents-__PLACEHOLDER__': chown: changing group of '/cdm_shared_workspace/hive_metastore/modelseed_biochemistry.db/reagents-__PLACEHOLDER__': Operation not permitted


Spark table modelseed_biochemistry.reagents created.


In [16]:
spark.sql("SELECT COUNT(*) AS Compound_Count from compounds").show()
spark.sql("SELECT COUNT(*) AS Reaction_Count from reactions").show()
spark.sql("SELECT COUNT(*) AS Reagent_Count from reagents").show()

                                                                                

+--------------+
|Compound_Count|
+--------------+
|         45706|
+--------------+



                                                                                

+--------------+
|Reaction_Count|
+--------------+
|         56009|
+--------------+





+-------------+
|Reagent_Count|
+-------------+
|       262506|
+-------------+



                                                                                

In [17]:
spark.sql("SELECT * from compounds ORDER BY id LIMIT 10").show()
spark.sql("SELECT * from reactions ORDER BY id LIMIT 10").show()
spark.sql("SELECT * from reagents ORDER BY reaction_id,compound_id LIMIT 10").show()

                                                                                

+------------+------+-------+---------+--------------+--------------------+--------------------+-----+---------+--------------------+--------------------+--------------------+----------------+
|abbreviation|charge| deltag|deltagerr|       formula|                  id|            inchikey| mass|     name|                 pka|                 pkb|              smiles|          source|
+------------+------+-------+---------+--------------+--------------------+--------------------+-----+---------+--------------------+--------------------+--------------------+----------------+
|         h2o|     0| -37.54|     0.18|           H2O|seed.compound:cpd...|XLYOFNOQVPJJNP-UH...| 18.0|      H2O|           1:1:15.70|           1:1:-1.80|                   O|Primary Database|
|         atp|    -3|-548.85|     0.36| C10H13N5O13P3|seed.compound:cpd...|ZKHQWZAMYRWXGA-KQ...|504.0|      ATP|1:14:12.60;1:22:3...|1:9:-3.03;1:14:-3...|Nc1ncnc2c1ncn2[C@...|Primary Database|
|         nad|    -1|-286.41|     1

                                                                                

+------------+------+---------+---------+--------------------+------------+--------------------+-------------+----------------+------+
|abbreviation|deltag|deltagerr|direction|                  id|is_transport|                name|reversibility|          source|status|
+------------+------+---------+---------+--------------------+------------+--------------------+-------------+----------------+------+
|      R00004| -3.46|     0.05|     NULL|seed.reaction:rxn...|           0|diphosphate phosp...|            >|Primary Database|    OK|
|      R00005|-20.14|     1.86|     NULL|seed.reaction:rxn...|           0|urea-1-carboxylat...|            >|Primary Database|    OK|
|      R00006|  8.27|      0.9|     NULL|seed.reaction:rxn...|           0|pyruvate:pyruvate...|            <|Primary Database|    OK|
|      R00008|  4.49|     0.57|     NULL|seed.reaction:rxn...|           0|4-hydroxy-4-methy...|            =|Primary Database|    OK|
|      R00009|-46.06|     1.64|     NULL|seed.reaction:

                                                                                

+-----------------+--------------------+--------------------+-------------+
|compartment_index|         compound_id|         reaction_id|stoichiometry|
+-----------------+--------------------+--------------------+-------------+
|                0|seed.compound:cpd...|seed.reaction:rxn...|         -1.0|
|                0|seed.compound:cpd...|seed.reaction:rxn...|          2.0|
|                0|seed.compound:cpd...|seed.reaction:rxn...|         -1.0|
|                0|seed.compound:cpd...|seed.reaction:rxn...|          1.0|
|                0|seed.compound:cpd...|seed.reaction:rxn...|         -1.0|
|                0|seed.compound:cpd...|seed.reaction:rxn...|          2.0|
|                0|seed.compound:cpd...|seed.reaction:rxn...|          2.0|
|                0|seed.compound:cpd...|seed.reaction:rxn...|         -3.0|
|                0|seed.compound:cpd...|seed.reaction:rxn...|         -1.0|
|                0|seed.compound:cpd...|seed.reaction:rxn...|         -1.0|
+-----------

In [18]:
spark.sql('SELECT rxn.id,rxn.name,cpd.id,cpd.name,rgt.compartment_index,rgt.stoichiometry \
        FROM compounds AS cpd, reagents AS rgt, reactions AS rxn \
        WHERE LOWER(cpd.name) LIKE "%quinon%" \
        AND rgt.compound_id = cpd.id \
        AND rgt.reaction_id = rxn.id').show()

                                                                                

+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+
|                  id|                name|                  id|                name|compartment_index|stoichiometry|
+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+
|seed.reaction:rxn...|            RXN-2543|seed.compound:cpd...|2,3-dimethyl-6-ph...|                0|         -1.0|
|seed.reaction:rxn...|                   -|seed.compound:cpd...|2-Octaprenyl-3-me...|                0|         -1.0|
|seed.reaction:rxn...|                   -|seed.compound:cpd...|2-Octaprenyl-3-me...|                1|          1.0|
|seed.reaction:rxn...|            RXN-2542|seed.compound:cpd...|2-methyl-6-phytyl...|                0|         -1.0|
|seed.reaction:rxn...|            RXN-2542|seed.compound:cpd...|2,3-dimethyl-6-ph...|                0|          1.0|
|seed.reaction:rxn...|alcohol dehydroge...|seed.compound

In [19]:
spark.sql('SELECT COUNT(DISTINCT(rxn.id)) AS Reaction_Count \
        FROM compounds AS cpd, reagents AS rgt, reactions AS rxn \
        WHERE LOWER(cpd.name) LIKE "%quinon%" \
        AND rgt.compound_id = cpd.id \
        AND rgt.reaction_id = rxn.id').show()

[Stage 172:>                                                        (0 + 2) / 2]

+--------------+
|Reaction_Count|
+--------------+
|          1317|
+--------------+



                                                                                