In [1]:
import pyspark as spark
from pyspark import SparkConf, SparkContext
# Start the Spark context for this notebook, or reuse the one that is already
# running: calling the SparkContext constructor a second time (e.g. when this
# cell is re-run) raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(
    SparkConf().setAppName("MY-APP-NAME").setMaster("local[*]")
)
# prevent useless logging messages
sc.setLogLevel("ERROR")

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, FloatType, IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor, LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import FMClassifier
import numpy as np
from pyspark.mllib.clustering import KMeans
import matplotlib.pyplot as plt
from pyspark.sql.functions import col
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
import matplotlib.pyplot as plt

# Obtain the SparkSession used for all DataFrame work in this notebook.
# NOTE(review): a SparkContext was already started in the first cell with a
# different master/appName; getOrCreate() presumably attaches to that context,
# in which case the values below would not take effect -- confirm.
spark = (
    SparkSession.builder
    .master("local")
    .appName("appName")
    .getOrCreate()
)

In [3]:
def filter_out_timestep(data):
    """Keep only the rows of `data` where `u_out` equals 0.

    Args:
        data: object exposing a `filter(condition)` method that accepts a
            SQL-style condition string (a Spark DataFrame in this notebook).

    Returns:
        Whatever `data.filter("u_out == 0")` returns.
    """
    return data.filter("u_out == 0")

In [4]:
def get_linear_parameters(data, name):
    """Fit a linear regression of column `name` on `time_step`.

    Only the rows kept by `filter_out_timestep` (those with `u_out == 0`) are
    used for the fit.

    NOTE(review): the regressor is trained on the output of the StandardScaler
    stage, so the returned coefficient is expressed per unit of the *scaled*
    `time_step`, not per raw `time_step` unit -- confirm this is intended.

    Args:
        data: Spark DataFrame containing `time_step`, `u_out` and the column
            named by `name`.
        name: name of the column to regress.

    Returns:
        list: `[coefficient, intercept]` of the fitted LinearRegression stage.
    """
    # take only the time steps where u_out == 0
    data = filter_out_timestep(data)

    # expose the target column under the name "label" next to the feature
    feature = ['time_step']
    lr_data = data.select(col(name).alias("label"), *feature)

    # prepare the data and the pipeline
    vectorAssembler = VectorAssembler(inputCols=feature, outputCol="unscaled_features")
    standardScaler = StandardScaler(inputCol="unscaled_features", outputCol="features")
    lr = LinearRegression(maxIter=10, regParam=.01)
    pipeline = Pipeline(stages=[vectorAssembler, standardScaler, lr])

    # fit the pipeline
    model = pipeline.fit(lr_data)

    # the regressor is the last stage of the fitted pipeline
    lr_model = model.stages[-1]
    return [lr_model.coefficients[0], lr_model.intercept]

In [5]:
def get_logistic_parameters(data, name):
    """Fit a logistic regression of column `name` on `time_step`.

    Unlike the linear helper, no row filtering is applied here: every row of
    `data` takes part in the fit.

    Args:
        data: Spark DataFrame containing `time_step` and the column named by
            `name`.
        name: name of the column used as the classification label.

    Returns:
        list: `[coefficient, intercept]` of the fitted LogisticRegression stage.
    """
    predictors = ['time_step']
    # expose the target column under the name "label" next to the predictor
    labelled = data.select(col(name).alias("label"), *predictors)

    # assemble -> scale -> classify
    pipeline = Pipeline(stages=[
        VectorAssembler(inputCols=predictors, outputCol="unscaled_features"),
        StandardScaler(inputCol="unscaled_features", outputCol="features"),
        LogisticRegression(maxIter=10, regParam=.01),
    ])
    fitted = pipeline.fit(labelled)

    # third stage of the fitted pipeline is the logistic model
    classifier = fitted.stages[2]
    return [classifier.coefficients[0], classifier.intercept]

### Read the dataset

In [6]:
# Load the training CSV (header row present, column types inferred,
# comma-delimited) and preview its first five rows.
data = (
    spark.read
    .option("header", 'true')
    .option("inferschema", 'true')
    .option("delimiter", ',')
    .csv("data/train.csv")
)
data.show(5)

+---+---------+---+---+--------------------+-------------------+-----+------------------+
| id|breath_id|  R|  C|           time_step|               u_in|u_out|          pressure|
+---+---------+---+---+--------------------+-------------------+-----+------------------+
|  1|        1| 20| 50|                 0.0|0.08333400563464438|    0| 5.837491705069121|
|  2|        1| 20| 50|0.033652305603027344| 18.383041472634716|    0|5.9077938505203464|
|  3|        1| 20| 50| 0.06751441955566406| 22.509277769756217|    0| 7.876253923154396|
|  4|        1| 20| 50| 0.10154223442077637| 22.808822256996738|    0|11.742871922971284|
|  5|        1| 20| 50|  0.1357555389404297| 25.355850299494183|    0|12.234986941129785|
+---+---------+---+---+--------------------+-------------------+-----+------------------+
only showing top 5 rows



# Pre-process the dataset

In [7]:
# Cast every column to the type used in the rest of the notebook. The casts
# are applied one column at a time, in the order listed, each replacing the
# column of the same name.
column_types = {
    "pressure": "float",
    "u_out": "float",
    "u_in": "float",
    "time_step": "float",
    "breath_id": "int",
    "id": "int",
    "R": "string",
    "C": "string",
}
df = data
for column_name, spark_type in column_types.items():
    df = df.withColumn(column_name, col(column_name).cast(spark_type))

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- breath_id: integer (nullable = true)
 |-- R: string (nullable = true)
 |-- C: string (nullable = true)
 |-- time_step: float (nullable = true)
 |-- u_in: float (nullable = true)
 |-- u_out: float (nullable = true)
 |-- pressure: float (nullable = true)



# Deleting rows with negative pressure

In [8]:
# Count the rows on each side of zero pressure; a row whose pressure is
# exactly 0 is included in neither count.
n_negative = df.filter(df.pressure < 0).count()
print("n_negative pressure :", n_negative)
n_positive = df.filter(df.pressure > 0).count()
print("n_positive pressure:", n_positive)

n_negative pressure : 3713
n_positive pressure: 6032287


In [9]:
# Keep only the rows with strictly positive pressure; afterwards `df` and
# `df_pr` both refer to the filtered DataFrame.
df = df_pr = df.filter(df.pressure > 0)
df.show()

+---+---------+---+---+-----------+----------+-----+---------+
| id|breath_id|  R|  C|  time_step|      u_in|u_out| pressure|
+---+---------+---+---+-----------+----------+-----+---------+
|  1|        1| 20| 50|        0.0|0.08333401|  0.0|5.8374915|
|  2|        1| 20| 50|0.033652306| 18.383041|  0.0| 5.907794|
|  3|        1| 20| 50| 0.06751442| 22.509277|  0.0| 7.876254|
|  4|        1| 20| 50|0.101542234| 22.808823|  0.0|11.742872|
|  5|        1| 20| 50| 0.13575554|  25.35585|  0.0|12.234987|
|  6|        1| 20| 50| 0.16969776| 27.259867|  0.0|12.867706|
|  7|        1| 20| 50| 0.20370793| 27.127485|  0.0|14.695562|
|  8|        1| 20| 50| 0.23772264| 26.807732|  0.0|15.890698|
|  9|        1| 20| 50| 0.27177644| 27.864716|  0.0|15.539187|
| 10|        1| 20| 50| 0.30573177| 28.313036|  0.0|15.750094|
| 11|        1| 20| 50| 0.33967495| 26.866758|  0.0|17.296741|
| 12|        1| 20| 50| 0.37368035| 26.762802|  0.0|17.226439|
| 13|        1| 20| 50| 0.40765023| 27.993273|  0.0|16.

In [10]:
# Re-count after filtering to confirm that no negative-pressure rows remain.
n_negative = df.filter(df.pressure < 0).count()
print("n_negative pressure :", n_negative)
n_positive = df.filter(df.pressure > 0).count()
print("n_positive pressure:", n_positive)

n_negative pressure : 0
n_positive pressure: 6032287


In [11]:
# The columns were already cast when `df` was first derived from `data`.
# Re-deriving `df` from the raw `data` here would silently bring back the rows
# that the pressure filter above removed, so this cell only re-displays the
# schema of the filtered DataFrame.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- breath_id: integer (nullable = true)
 |-- R: string (nullable = true)
 |-- C: string (nullable = true)
 |-- time_step: float (nullable = true)
 |-- u_in: float (nullable = true)
 |-- u_out: float (nullable = true)
 |-- pressure: float (nullable = true)



# Regressing u_in, u_out and pressure on time_step (one fit per breath)

In [12]:
# Collect the distinct breath identifiers to the driver as plain Python ints
# and report how many there are.
distinct_rows = df.select('breath_id').distinct().collect()
breath_ids = [int(row['breath_id']) for row in distinct_rows]
print(len(breath_ids))

75450


In [13]:
%%time
# One result list per output column; entry k of every list belongs to the
# k-th breath processed by the loop below.
r, c, m_u_in, q_u_in, m_u_out, q_u_out, m_pressure, q_pressure = ([] for _ in range(8))
# NOTE(review): every iteration filters the full `df` and fits three models,
# so a complete pass over all breath ids is very slow -- consider caching `df`
# or restricting `breath_ids` while experimenting.
for b_id in breath_ids:
    condition = "breath_id == " + str(b_id)
    print(f"Breath_id: {b_id}")
    breath = df.filter(condition)  # rows regarding a breath
    # train linear regressors on u_in and pressure
    u_in_par = get_linear_parameters(breath, 'u_in')
    print(f"u_in regression: coefficient {u_in_par[0]}, intercept {u_in_par[1]}")
    pressure_par = get_linear_parameters(breath, 'pressure')
    print(f"pressure regression: coefficient {pressure_par[0]}, intercept {pressure_par[1]}")
    # u_out is fitted with a logistic regression instead
    u_out_par = get_logistic_parameters(breath, 'u_out')
    print(f"u_out regression: coefficient {u_out_par[0]}, intercept {u_out_par[1]}")
    # Fetch the first row once and read both R and C from it; each head()
    # call is a separate Spark action.
    first_row = breath.head()
    r.append(first_row['R'])
    c.append(first_row['C'])
    m_u_in.append(float(u_in_par[0]))
    q_u_in.append(float(u_in_par[1]))
    m_u_out.append(float(u_out_par[0]))
    q_u_out.append(float(u_out_par[1]))
    m_pressure.append(float(pressure_par[0]))
    q_pressure.append(float(pressure_par[1]))
    print('---------------------------------------------------------------------------')

Breath_id: 148
u_in regression: coefficient -0.627586555937512, intercept 3.279187855644647
pressure regression: coefficient 1.712044550044049, intercept 4.787830588906654
u_out regression: coefficient 3.5839924595399673, intercept -4.387057482814503
---------------------------------------------------------------------------
Breath_id: 463
u_in regression: coefficient -2.730341695467989, intercept 11.496348651324867
pressure regression: coefficient 5.452311677836793, intercept 14.032851614737082
u_out regression: coefficient 3.5980943797891185, intercept -4.875283862177274
---------------------------------------------------------------------------
Breath_id: 471


KeyboardInterrupt: 

## Rebuilding the dataset 

In [14]:
# create new spark dataframe with one row per processed breath
features = ['breath_ids', 'R', 'C', 'm_u_in', 'q_u_in', 'm_u_out', 'q_u_out', 'm_pressure', 'q_pressure']
# zip() stops at the shortest input, so only breaths that have a value in
# every result list are kept (`breath_ids` can be longer than the result lists
# when the regression loop did not run to completion).
# A dedicated name is used for the rows: rebinding `data` here would replace
# the raw input DataFrame with a one-shot iterator.
coeff_rows = list(zip(breath_ids, r, c, m_u_in, q_u_in, m_u_out, q_u_out, m_pressure, q_pressure))
df_coeff = spark.createDataFrame(coeff_rows, schema = features)

In [15]:
# Preview the per-breath coefficient table built in the previous cell.
df_coeff.show()

+----------+---+---+------------------+------------------+------------------+------------------+-----------------+------------------+
|breath_ids|  R|  C|            m_u_in|            q_u_in|           m_u_out|           q_u_out|       m_pressure|        q_pressure|
+----------+---+---+------------------+------------------+------------------+------------------+-----------------+------------------+
|       148| 50| 10|-0.627586555937512| 3.279187855644647|3.5839924595399673|-4.387057482814503|1.712044550044049| 4.787830588906654|
|       463| 50| 10|-2.730341695467989|11.496348651324867|3.5980943797891185|-4.875283862177274|5.452311677836793|14.032851614737082|
+----------+---+---+------------------+------------------+------------------+------------------+-----------------+------------------+

