In [1]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,FloatType
import pyspark.sql.functions as f
import pandas as pd
import os
import numpy as np
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import make_column_transformer
from sklearn.feature_extraction.text import CountVectorizer

# Links

* https://www.hackdeploy.com/pyspark-one-hot-encoding-with-countvectorizer/
* https://techinplanet.com/split-lists-in-a-column-into-one-hot-encoded-features-in-pyspark/
* https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html
* https://towardsdatascience.com/countvectorizer-hashingtf-e66f169e2d4e


In [2]:
class PSTool:
    def __init__(self):
        print('Creating output folder')
        if os.path.exists('output'):
            pass
        else:
            os.makedirs('output')

    def pyspark_session(self, host_location):
        """
        Creates and returns spark session object
        """
        print('Starting session')
        sc = SparkContext(host_location)  # Create spark context
        spark = SparkSession(sc)  # Create session
        return spark, sc

    def file_loader(self, path, delim, spark_obj, schema):
        print('Loading in file')
        data = spark_obj.read.options(delimiter=delim).option("header","False").csv(path, schema=schema)
        
        print('File loaded')
        return data

    def get_questions(self, df):
        pass

if __name__ == "__main__":
    pstool = PSTool()  # Instanciate object
    spk, sc = pstool.pyspark_session('local[16]')  # start session
    # load data
#     path = '/data/dataprocessing/interproscan/all_bacilli.tsv'
    path = 'all_bacilli_subset.tsv'
    schema = StructType([
        StructField("Protein_accession", StringType(), True),
        StructField("Sequence_MD5_digest", StringType(), True),
        StructField("Sequence_length", IntegerType(), True),
        StructField("Analysis", StringType(), True),
        StructField("Signature_accession", StringType(), True),
        StructField("Signature_description", StringType(), True),
        StructField("Start_location", IntegerType(), True),
        StructField("Stop_location", IntegerType(), True),
        StructField("Score", FloatType(), True),
        StructField("Status", StringType(), True),
        StructField("Date", StringType(), True),
        StructField("InterPro_annotations_accession", StringType(), True),
        StructField("InterPro_annotations_description", StringType(), True),
        StructField("GO_annotations", StringType(), True),
        StructField("Pathways_annotations", StringType(), True)])
    
    df = pstool.file_loader(path, '\t', spk, schema)
#     pstool.get_questions(df)
#     print('Closing spark session')
#     spk.sparkContext.stop()
#     df.printSchema()  # Shows column names and some info


Creating output folder
Starting session
Loading in file
File loaded


In [3]:
print('len:', len(df.columns))
print('count:' , df.count())

len: 15
count: 200


In [4]:
df.columns

['Protein_accession',
 'Sequence_MD5_digest',
 'Sequence_length',
 'Analysis',
 'Signature_accession',
 'Signature_description',
 'Start_location',
 'Stop_location',
 'Score',
 'Status',
 'Date',
 'InterPro_annotations_accession',
 'InterPro_annotations_description',
 'GO_annotations',
 'Pathways_annotations']

# Data munging

First, the data has to be munged. 
* Remove entries without InterPRO number
* Note which proteins features are >90% size of the protein and remove them. "ProtID_select"
* Select the smaller features for the above found proteins. Proteins without large feature do not matter.

In [5]:
IPRO_filt = df.filter(df["InterPro_annotations_accession"] != '-').filter(df['Signature_description'] != '-')

In [6]:
print('len:', len(IPRO_filt.columns))
print('count:' , IPRO_filt.count())

len: 15
count: 97


In [7]:
df_sizes = IPRO_filt.withColumn('perc', abs(df.Start_location - df.Stop_location) / df.Sequence_length).sort('perc')
df_sizes

DataFrame[Protein_accession: string, Sequence_MD5_digest: string, Sequence_length: int, Analysis: string, Signature_accession: string, Signature_description: string, Start_location: int, Stop_location: int, Score: float, Status: string, Date: string, InterPro_annotations_accession: string, InterPro_annotations_description: string, GO_annotations: string, Pathways_annotations: string, perc: double]

## Get descriptions into arrays

This will be done to be able to one-hot encode.

In [8]:
df_array = df_sizes.select(split(df_sizes.Signature_description," ").alias("nameAsArray"))
df_array.show(5)

+--------------------+
|         nameAsArray|
+--------------------+
|[Pyruvate, kinase...|
|[Pyruvate, kinase...|
|[Bacterial, senso...|
|[Pyruvate, kinase...|
|[Pyruvate, kinase...|
+--------------------+
only showing top 5 rows



In [9]:
import copy
df_copy = df_array.select('*')

In [10]:
# df_concat = df_copy.join(IPRO_filt.select('Sequence_length', 'Start_location', 'Stop_location', 'Score'))
# df_concat.show()

In [11]:
# Get term frequency vector through HashingTF
from pyspark.ml.feature import HashingTF
ht = HashingTF(inputCol="nameAsArray", outputCol="features")
result = ht.transform(df_copy)
# result.show(truncate=False)

In [12]:
result.show()

+--------------------+--------------------+
|         nameAsArray|            features|
+--------------------+--------------------+
|[Pyruvate, kinase...|(262144,[25702,61...|
|[Pyruvate, kinase...|(262144,[115560,2...|
|[Bacterial, senso...|(262144,[41004,41...|
|[Pyruvate, kinase...|(262144,[115560,2...|
|[Pyruvate, kinase...|(262144,[115560,2...|
|[Pyruvate, kinase...|(262144,[115560,2...|
|[Bacterial, senso...|(262144,[41004,41...|
|[Bacterial, senso...|(262144,[41004,41...|
|[Pyruvate, kinase...|(262144,[115560,2...|
|[Pyruvate, kinase...|(262144,[115560,2...|
|[Bacterial, senso...|(262144,[41004,41...|
|[Pyruvate, kinase...|(262144,[115560,2...|
|[PpiC-type, pepti...|(262144,[7386,618...|
|[Aminotransferase...|(262144,[19690,14...|
|[Transaminase_4ab...|(262144,[99950],[...|
|[PEP-utilising, e...|(262144,[72259,10...|
|[Nudix, box, sign...|(262144,[22370,61...|
|[PK, beta-barrel,...|(262144,[33826,16...|
|       [GAF, domain]|(262144,[78676,10...|
|[Ribosomal, prote...|(262144,[4

In [13]:
test = np.array(IPRO_filt.select(['Sequence_length', 'Start_location', 'Stop_location', 'Score']).collect())

In [14]:
x_3d = np.array(result.select('features').collect())
x_3d.shape

(97, 1, 262144)

In [15]:
len(x_3d[0][0])

262144

In [16]:
reshaped = x_3d.reshape(97, -1)
reshaped.shape

(97, 262144)

In [17]:
reshaped[0]

array([0., 0., 0., ..., 0., 0., 0.])

In [18]:
numeric_data = np.concatenate([test, reshaped], axis=1)

In [32]:
numeric_data.shape

(97, 262148)

In [20]:
# def get_array_from_df(data, column_list):
#     return np.array(data.select(column_list).collect())
    
# labels = get_array_from_df(IPRO_filt, 'InterPro_annotations_accession')

In [29]:
np.savetxt("data_test.txt", numeric_data, fmt='%s')

In [31]:
with open("data_test.txt") as f:
    for line in f:
        print(len(line))

1048608
1048592
1048606
1048611
1048592
1048590
1048606
1048611
1048610
1048610
1048608
1048608
1048608
1048604
1048603
1048607
1048610
1048610
1048608
1048609
1048609
1048591
1048591
1048591
1048591
1048610
1048608
1048609
1048593
1048591
1048605
1048610
1048605
1048590
1048609
1048590
1048591
1048590
1048590
1048590
1048591
1048608
1048610
1048609
1048608
1048606
1048591
1048606
1048609
1048609
1048606
1048590
1048610
1048593
1048610
1048605
1048607
1048590
1048590
1048592
1048608
1048610
1048611
1048607
1048610
1048610
1048610
1048610
1048610
1048611
1048590
1048590
1048604
1048608
1048590
1048590
1048611
1048590
1048607
1048610
1048590
1048590
1048593
1048590
1048590
1048592
1048592
1048592
1048592
1048592
1048592
1048611
1048590
1048610
1048611
1048610
1048610
