In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Namespaced handle on the SQL functions, so aggregates such as F.min / F.max
# are never confused with Python's built-ins of the same name.
from pyspark.sql import functions as F
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler

from pyspark.sql.functions import isnan, when, count, col

from datetime import datetime
import dateutil

In [None]:
# Reuse the Spark session that is already active, or create one if none exists.
ss = (
    SparkSession.builder
    .getOrCreate()
)

## Read in Argo data as Spark DataFrame

In [None]:
# Load the Argo CSV: the first row supplies the column names and Spark infers
# each column's type from the data.
argo_df = (
    ss.read
      .options(header=True, inferSchema=True)
      .csv("s3://msds-argo-clustering/argo_data.csv")
)

# Do it this way, because defining the schema ahead of time yields all nulls

In [None]:
# Cast temp to DoubleType(). withColumn() with an existing column name replaces
# that column, so no temporary column / drop / rename round-trip is needed.
# The select() then fixes the set and order of columns kept from here on.
argo_df = argo_df.withColumn("temp", argo_df["temp"].cast(DoubleType())) \
                 .select("profile_id", "pres", "temp", "lat", "lon", "psal", "date")

## Filter DataFrame by conditions to keep records

In [None]:
# Per-profile summary used to decide which profiles to keep: the smallest and
# largest pres value and the count of profile_id values in each group.
# The aggregates are called through the F namespace so they do not depend on
# the wildcard import shadowing Python's built-in min() / max().
argo_filterby = argo_df.groupBy("profile_id") \
                       .agg(F.min("pres").alias("min_pres"),
                            F.max("pres").alias("max_pres"),
                            F.count("profile_id").alias("count_profile_id"))

In [None]:
# profile_ids to keep: groups with a count of at least 50 whose pres values
# reach down to 25 or below and up to 999 or above. Only the id column is
# retained, ready to be inner joined back onto argo_df.
argo_keep_ids = (
    argo_filterby
    .where("count_profile_id >= 50 and min_pres <= 25 and max_pres >= 999")
    .select("profile_id")
)

In [None]:
# Keep only the argo_df rows whose profile_id is in argo_keep_ids (inner join
# on profile_id), and persist the joined DataFrame.
argo_df_keep = (
    argo_keep_ids
    .join(argo_df, on="profile_id", how="inner")
    .persist()
)