In [1]:
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
from pyspark.sql import *
import numpy as np


# working with continuous features
from pyspark.ml.feature import Bucketizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

In [2]:
def get_data_file(ss,file_path):
    """Load a tab-separated text file into a DataFrame.

    The reader is configured to treat the first line as a header, infer
    column types, split fields on tab characters and skip lines that
    start with ``#``.

    Parameters
    ----------
    ss : object exposing a ``read`` attribute (``main`` passes a SparkSession)
    file_path : path handed unchanged to ``reader.csv``

    Returns
    -------
    Whatever ``reader.csv(file_path)`` returns (a Spark DataFrame).
    """
    return (
        ss.read
        .option("header", True)
        .option("inferSchema", True)
        .option("sep", "\t")
        .option("comment", "#")
        .csv(file_path)
    )

In [3]:
def pos_plx(df):
    """Keep the six columns of interest, make them numeric, drop Plx <= 0.

    Steps:
      1. discard the row whose ``Plx`` value is the literal string "mas"
         (the units row that follows the header);
      2. project to Plx, e_Plx, Gmag, GLON, GLAT, Teff;
      3. cast Plx, e_Plx, Gmag and Teff to Float (GLON / GLAT are passed
         through untouched);
      4. keep only rows with a strictly positive parallax.

    Returns the filtered DataFrame.
    """
    wanted = ("Plx", "e_Plx", "Gmag", "GLON", "GLAT", "Teff")
    to_float = {"Plx", "e_Plx", "Gmag", "Teff"}

    # Step 1 + 2: remove the units row, then narrow the frame.
    data_rows = df.filter(df.Plx != "mas").select(*wanted)

    # Step 3: build the projection column by column, casting where required.
    projection = [
        data_rows[name].cast("Float") if name in to_float else data_rows[name]
        for name in wanted
    ]
    typed = data_rows.select(*projection)

    # Step 4: non-positive parallaxes are discarded.
    return typed.filter(typed.Plx > 0)


In [4]:
def dist_plx_filt(df, percnt):
    """Keep precise parallaxes and attach a ``Dist`` column.

    A row is kept when its relative parallax error ``e_Plx / Plx`` is
    strictly below ``percnt`` (``main`` passes 0.1). ``Dist`` is then
    computed as ``1 / (Plx * 10**-3)``; with Plx in milliarcseconds this
    is the distance in parsecs (the units row filtered out upstream reads
    "mas" -- confirm against the catalogue).

    Returns the filtered DataFrame with the extra ``Dist`` column.
    """
    relative_error = col("e_Plx") / col("Plx")
    precise = df.filter(relative_error < percnt)

    # Scale Plx by 10**-3 before inverting (mas -> arcsec, so 1/Plx is in parsecs).
    distance = 1 / (precise.Plx * 10**-3)
    return precise.withColumn("Dist", distance)

In [5]:
def filter_Teffs(df,Teff_cntCutoff):
    """Collapse over-represented Teff values to a single row each.

    A Teff value is "common" when more than ``Teff_cntCutoff`` rows carry
    it. The result contains

      * every row whose Teff is not common, unchanged, followed by
      * exactly one row for each common Teff value (which of the
        duplicates survives is not defined by Spark).

    Rows with a null Teff are dropped. The previous implementation did
    this implicitly (``isin`` never matches null); it is now explicit.

    The earlier version collected the full Teff column and the list of
    common values to the driver and sent them back as ``isin`` literal
    lists. That does not scale, so the same selection is now expressed
    with semi / anti joins that stay distributed.

    Parameters
    ----------
    df : DataFrame with a ``Teff`` column
    Teff_cntCutoff : a Teff value is common when its count is > this

    Returns
    -------
    DataFrame with the same columns, in the same order, as ``df``.
    """
    # Number of (non-null) occurrences of every Teff value.
    teff_counts = df.groupBy("Teff").agg(count("Teff").alias("Teff_count"))
    common_teffs = (
        teff_counts
        .filter(teff_counts.Teff_count > Teff_cntCutoff)
        .select("Teff")
    )

    # Null never equals anything in a join; drop those rows up front so the
    # anti join below cannot let them through.
    rows_with_teff = df.filter(df.Teff.isNotNull())

    # A join on a column name may move the key column to the front; selecting
    # the original column list afterwards restores the caller's layout.
    original_order = df.columns

    # One representative row per common Teff.
    common_once = (
        rows_with_teff
        .join(common_teffs, on="Teff", how="left_semi")
        .drop_duplicates(["Teff"])
        .select(*original_order)
    )

    # All rows whose Teff is not common.
    uncommon_rows = (
        rows_with_teff
        .join(common_teffs, on="Teff", how="left_anti")
        .select(*original_order)
    )

    return uncommon_rows.union(common_once)

In [6]:
def absmag_dist_in(df, galactic_center_dist=8000):
    """Add ``Dist_from_center`` and ``Gmag_absolute`` columns.

    ``Dist_from_center`` is ``galactic_center_dist - Dist``. Subtracting
    along a single line only gives a galactocentric distance for stars
    lying toward the galactic centre.
    NOTE(review): confirm the input sample satisfies that; stars with
    ``Dist`` larger than ``galactic_center_dist`` get a negative value.

    ``Gmag_absolute`` is the absolute magnitude from the distance modulus

        M = m - 5*log10(d) + 5        (d in parsecs)

    The previous code subtracted the trailing 5, which shifted every
    absolute magnitude by -10 mag.

    Parameters
    ----------
    df : DataFrame with ``Dist`` (parsecs) and ``Gmag`` (apparent magnitude)
    galactic_center_dist : distance to the galactic centre in the same
        units as ``Dist``; defaults to the 8000 used originally.

    Returns
    -------
    ``df`` with the two extra columns.
    """
    with_center = df.withColumn("Dist_from_center", galactic_center_dist - df.Dist)

    # log(10.0, column) is the base-10 logarithm of the column.
    log10_dist = log(10.0, with_center.Dist)
    absolute_mag = with_center.Gmag - 5 * log10_dist + 5

    return with_center.withColumn("Gmag_absolute", absolute_mag)

In [7]:
def dist_buckets_labels(df,first_bin,last_bin,bin_width):
    """Append a ``bucketed_distances`` column binning ``Dist_from_center``.

    Split points are ``first_bin, first_bin + bin_width, ...`` up to but
    NOT including ``last_bin`` (``np.arange`` semantics). ``main`` calls
    this with (0, 8200, 200), so the last split is 8000.

    The Bucketizer is applied to ``df`` itself. The earlier version
    bucketized a one-column projection and left-joined it back on
    ``Dist_from_center``. That self-join multiplied every group of rows
    sharing a distance value and left two ``Dist_from_center`` columns
    in the result.

    Parameters
    ----------
    df : DataFrame containing ``Dist_from_center``
    first_bin, last_bin, bin_width : arguments for ``np.arange``

    Returns
    -------
    ``df`` plus ``bucketed_distances`` (same number of rows as ``df``).

    Notes
    -----
    The Bucketizer is left at its default handling of invalid values,
    as before. NOTE(review): values outside the split range (for example
    a negative ``Dist_from_center``) are expected to raise at execution
    time with that default; confirm whether "keep" or "skip" is wanted.
    """
    # Plain Python floats, so the splits do not depend on numpy's
    # platform-specific integer type.
    split_points = [float(edge) for edge in np.arange(first_bin, last_bin, bin_width)]

    bucketer = Bucketizer(
        splits=split_points,
        inputCol="Dist_from_center",
        outputCol="bucketed_distances",
    )

    # transform() keeps every existing column and adds the output column,
    # so no join back to the original frame is needed.
    return bucketer.transform(df)

In [8]:
def feature_columns(df):
    """Return the names of the model's feature columns.

    Builds a projection of ``df`` holding Gmag_absolute and Teff cast to
    Double (plus Dist), then reads the column names of its
    (Gmag_absolute, Teff) sub-selection. The returned list is therefore
    ``["Gmag_absolute", "Teff"]``; the projection itself is discarded.
    """
    as_double = df.select(
        df["Gmag_absolute"].cast("Double"),
        "Dist",
        df["Teff"].cast("Double"),
    )
    return as_double.select("Gmag_absolute", "Teff").columns

In [9]:
def pipeline(df,cols):
    """Turn the bucketed frame into a (label, features, ...) dataset.

    Two stages are fitted and applied to the (Gmag_absolute, Teff,
    bucketed_distances) projection of ``df``:

      * a StringIndexer mapping ``bucketed_distances`` to ``label``;
      * a VectorAssembler packing Gmag_absolute and Teff into ``features``.

    Parameters
    ----------
    df : DataFrame with Gmag_absolute, Teff and bucketed_distances
    cols : list of extra column names to keep next to label / features
        (must be a list, since it is concatenated to one)

    Returns
    -------
    DataFrame with columns ``["label", "features"] + cols``.
    """
    feature_inputs = ["Gmag_absolute", "Teff"]

    label_indexer = StringIndexer(inputCol="bucketed_distances", outputCol="label")
    assembler = VectorAssembler(inputCols=feature_inputs, outputCol="features")
    prep = Pipeline().setStages([label_indexer, assembler])

    # The same projection is used for fitting and for transforming.
    model_input = df.select(df.Gmag_absolute, df.Teff, df.bucketed_distances)
    prepared = prep.fit(model_input).transform(model_input)

    return prepared.select(["label", "features"] + cols)

In [10]:
def decision_tree_model(trainData):
    """Fit a depth-3 decision tree classifier on ``trainData``.

    The classifier reads the ``features`` column as input and ``label``
    as the target.

    Returns the fitted model.
    """
    classifier = DecisionTreeClassifier(
        maxDepth=3,
        featuresCol="features",
        labelCol="label",
    )
    return classifier.fit(trainData)


In [11]:
def prediction(testData,dtModel):
    """Score ``testData`` with ``dtModel`` and keep the reporting columns.

    Returns the model's transformed output restricted to label,
    prediction, probability, Gmag_absolute and Teff.
    """
    report_columns = ("label", "prediction", "probability", "Gmag_absolute", "Teff")
    scored = dtModel.transform(testData)
    return scored.select(*report_columns)

In [12]:
def main():
    """Run the whole pipeline: load, clean, label, train, predict.

    Reads ``asu_withGmag.tsv``, keeps rows with a positive parallax and a
    relative parallax error below 0.1, thins out Teff values that occur
    more than 5 times, derives distance / absolute-magnitude columns and
    writes that table as CSV to ``df_i``. It then buckets the distances
    into labels, assembles the features, splits 70/30 with a fixed seed,
    trains the decision tree and builds the prediction frame.

    Changes from the earlier version:
      * the Spark session is stopped in a ``finally`` block, so a failure
        in any stage no longer leaves the session running;
      * the CSV is written with ``mode('overwrite')``, so re-running the
        notebook does not fail because ``df_i`` already exists.
    """
    tsv_inGalx = "asu_withGmag.tsv"

    spark = SparkSession.builder.appName("cs696projgaia").getOrCreate()
    try:
        df_in = get_data_file(spark, tsv_inGalx)

        pos_plxin = pos_plx(df_in)

        topdists_in = dist_plx_filt(pos_plxin, 0.1)

        Teffs_filt_in = filter_Teffs(topdists_in, 5)

        df_final_in = absmag_dist_in(Teffs_filt_in)

        # Single output file; overwrite so the cell can be re-run.
        df_final_in.repartition(1).write.format('csv').mode('overwrite').save('df_i')

        # np.arange excludes the stop value, so 8200 makes 8000 the last split.
        buckets_dists = dist_buckets_labels(df_final_in, 0, 8200, 200)

        columns = feature_columns(buckets_dists)

        dataset_prep = pipeline(buckets_dists, columns)

        # Randomly split data into training and test sets; the seed is fixed
        # for reproducibility.
        (trainingData, testData) = dataset_prep.randomSplit([0.7, 0.3], seed=100)

        dtmodel = decision_tree_model(trainingData)

        # NOTE: df_predic is built but never written out or otherwise
        # materialized here. The original authors' note appears to say they
        # could not save it as CSV at full scale "due to format of data
        # frame", after trying several approaches, although it worked on a
        # smaller data set in a Jupyter notebook. Presumably the vector-typed
        # `probability` column is what the CSV writer rejects -- TODO confirm.
        df_predic = prediction(testData, dtmodel)
    finally:
        # Always release the session, even when a stage above fails.
        spark.stop()
#----------------------------------------------------------------
# Entry point: executing this cell runs the full pipeline defined in main().
main()
