In [None]:
import sys
from datetime import datetime
import math

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from pyspark.sql.types import *

In [None]:
# Obtain the SparkSession for this notebook, reusing an existing one if present.
spark = (
    SparkSession.builder
    .appName("SparkETL")
    .getOrCreate()
)

In [None]:
# Load the input CSV; the first line is the header and column types are inferred.
df = spark.read.csv(
    path="s3://BUCKET/FOLDER/out2.csv",
    inferSchema=True,
    header=True,
)

In [None]:
# Print the column names and types Spark inferred for the loaded CSV.
df.printSchema()

In [None]:
def getAngDist1(x):
    """Compute the angular separation between a row's source and destination.

    Uses the spherical law of cosines::

        cos(d) = sin(dec1)*sin(dec2) + cos(dec1)*cos(dec2)*cos(ra1 - ra2)

    Args:
        x: A row-like object exposing the attributes ``Source``,
            ``Destination``, ``SourceRA``, ``SourceDec``, ``DestRA``,
            ``DestDec`` and ``AngularDist``. Declinations are passed through
            ``math.radians`` (i.e. treated as degrees). The RA difference is
            multiplied by 15 before that conversion (presumably RA is given
            in hours, 15 degrees per hour -- confirm against the input data).

    Returns:
        tuple: ``(Source, Destination, AngDist, AngularDist)`` where
        ``AngDist`` is the newly computed separation in degrees and
        ``AngularDist`` is the value already present on the row, passed
        through unchanged.
    """
    dec1 = math.radians(x.SourceDec)
    dec2 = math.radians(x.DestDec)
    raDiff = math.radians((x.SourceRA - x.DestRA) * 15)

    cosAng = (
        math.sin(dec1) * math.sin(dec2)
        + math.cos(dec1) * math.cos(dec2) * math.cos(raDiff)
    )

    # Floating-point rounding can push cosAng a hair outside [-1, 1] (e.g. for
    # identical or antipodal points), which makes math.acos raise ValueError.
    # Explicit comparisons are used instead of min()/max() because the
    # notebook's `from pyspark.sql.functions import *` shadows those builtins.
    if cosAng > 1.0:
        cosAng = 1.0
    elif cosAng < -1.0:
        cosAng = -1.0

    # math.degrees replaces division by the truncated constant 0.01745329252.
    AngDist = math.degrees(math.acos(cosAng))
    return (x.Source, x.Destination, AngDist, x.AngularDist)

In [None]:
# Apply getAngDist1 to every row; each element of rdd2 is the tuple it returns.
rdd2 = df.rdd.map(getAngDist1)

In [None]:
# Preview a handful of computed rows. collect() would pull the entire dataset
# onto the driver and dump it into the notebook output, so use a bounded take().
rdd2.take(5)

In [None]:
# Column layout for the DataFrame rebuilt from the mapped RDD. Field order
# matches the tuple returned by getAngDist1; every column is nullable.
schema = (
    StructType()
    .add('Source', StringType(), True)
    .add('Destination', StringType(), True)
    .add('AngDistNew', FloatType(), True)
    .add('AngDistOld', FloatType(), True)
)

In [None]:
# Convert the RDD of tuples back into a DataFrame using the explicit schema.
dfFromRDD = rdd2.toDF(schema=schema)

In [None]:
# Print the schema of the DataFrame built from rdd2.
dfFromRDD.printSchema()

In [None]:
# Print the first row of the result (None if the DataFrame is empty).
print(dfFromRDD.first())

In [None]:
# Reduce to a single partition, then write the result as CSV with a header row.
(
    dfFromRDD
    .coalesce(1)
    .write
    .option("header", "true")
    .csv("s3://BUCKET/FOLDER/rdd2.csv")
)