In [1]:
import pyspark
from pyspark import SQLContext
from pyspark import SparkContext
import time
from pyspark.sql import Row
from pyspark.sql.functions import lag, col, toRadians, sqrt, sin, cos, asin, lit, unix_timestamp, from_unixtime, datediff,sum
from pyspark.sql.functions import isnan, when, count, col, expr
from pyspark.sql.window import Window
from pyspark import SparkConf
from pyspark.sql.functions import countDistinct
import numpy as np

In [2]:
# Local Spark context for this analysis. Memory settings follow
# https://datascience.stackexchange.com/questions/8549/how-do-i-set-get-heap-size-for-spark-via-python-notebook
# NOTE(review): with master local[*] everything runs inside the driver JVM, so
# spark.executor.memory presumably has no effect here — confirm.
conf = (SparkConf()
        .setAppName("App")
        .setMaster('local[*]')
        .set('spark.executor.memory', '4G')
        .set('spark.driver.memory', '45G')
        .set('spark.driver.maxResultSize', '10G'))

# getOrCreate() keeps the cell re-runnable: SparkContext(conf=...) raises when a
# context already exists. An existing context is returned unchanged, so edits to
# `conf` above only apply after a kernel restart.
sc = SparkContext.getOrCreate(conf=conf)
sc

In [3]:
# SQL entry point built on the SparkContext created above; the parquet reads go through it.
sqlContext = SQLContext(sparkContext=sc)

In [4]:
# Per-vessel AIS track features for Zone 18, Jan-Mar 2011: previous fix, haversine
# distance, elapsed time, implied speed and a running sub-voyage counter, written to CSV.
starttime = time.time()

# ----------------------------------- load -----------------------------------
df_Broadcast = sqlContext.read.parquet("ais/2011/01/Zone18_2011_01/Broadcast.parquet").cache()
df_Broadcast_mon2 = sqlContext.read.parquet("ais/2011/02/Zone18_2011_02/Broadcast.parquet").cache()
df_Broadcast_mon3 = sqlContext.read.parquet("ais/2011/03/Zone18_2011_03/Broadcast.parquet").cache()

# union() is the non-deprecated name of unionAll(); both match columns by position,
# so the three monthly files must share the same column order.
result_frame = df_Broadcast.union(df_Broadcast_mon2).union(df_Broadcast_mon3)

# Cast once, up front, so the window orders fixes chronologically and lag() yields a
# timestamp. No global orderBy: the window's own shuffle/sort would discard it.
result_frame = result_frame.withColumn("BaseDateTime", col("BaseDateTime").cast("timestamp"))

# One vessel per partition, fixes in time order. Ordering by MMSI itself (the partition
# key) left the in-partition order undefined, so lag() could return any other fix of the
# vessel instead of the previous one.
windowSpec = Window.partitionBy("MMSI").orderBy("BaseDateTime")

MMSIShift = lag("MMSI").over(windowSpec)
# null on a vessel's first fix (lag has no previous row) and on rows whose MMSI is null;
# False otherwise, since every row in a partition shares the same MMSI.
result_frame = result_frame.withColumn('CheckMMSI', col('MMSI') != MMSIShift)


def _previous_value(column_name: str):
    """Return `column_name` taken from the vessel's previous fix.

    The value is null (not NaN) whenever CheckMMSI is not False, i.e. on a vessel's
    first fix and on rows without an MMSI. A NaN sentinel would leak into `dist`,
    and Spark orders NaN above every number.
    """
    return when(~col('CheckMMSI'), lag(column_name).over(windowSpec))


# -------------------- distance from the previous fix (haversine) --------------------
result_frame = result_frame.withColumn('latShift', _previous_value('lat'))
result_frame = result_frame.withColumn('lonShift', _previous_value('lon'))

lat1 = toRadians("lat")
lon1 = toRadians("lon")
lat2 = toRadians("latShift")
lon2 = toRadians("lonShift")

dlat = lat2 - lat1
dlon = lon2 - lon1

a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
c = lit(2) * asin(sqrt(a))
# NOTE(review): 6367 is presumably the Earth's radius in km (making `dist` km); the
# usual mean radius is 6371 km — confirm which value is intended.
r = lit(6367)
distance = (c * r).alias('dist')

# Append `dist` to the main dataset.
result_frame = result_frame.select('*', distance)

# ------------------------- time since the previous fix -------------------------
result_frame = result_frame.withColumn('DateShift', _previous_value('BaseDateTime'))

# Both columns are timestamps, so unix_timestamp() gives epoch seconds; /3600 -> hours.
timeA = col('BaseDateTime')
timeB = col('DateShift')
timeDiff = (unix_timestamp(timeA, "yyyy-MM-dd HH:mm:ss") - unix_timestamp(timeB, "yyyy-MM-dd HH:mm:ss")) / 3600

result_frame = result_frame.withColumn("Duration", timeDiff)
# Implied speed: dist per hour (km/h if r above is in km).
result_frame = result_frame.withColumn('speed', col('dist') / col('Duration'))
# NOTE(review): despite its name, `stopped` is True when speed > 5, i.e. when the vessel
# moved fast between fixes — confirm the intended direction of this test.
result_frame = result_frame.withColumn('stopped', col('speed') > 5)

# SubVoyageID = 1 + number of flagged fixes so far for this vessel. The explicit ROWS
# frame makes this a true running count (the default RANGE frame lumps fixes with equal
# timestamps together); a null `stopped` counts as 0, so a first fix gets 1, not null.
runningWindow = windowSpec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = 1 + sum(when(col("stopped"), 1).otherwise(0)).over(runningWindow)
result_frame = result_frame.withColumn("SubVoyageID", df)

# overwrite keeps the cell re-runnable; the default save mode fails if the path exists.
result_frame.write.mode("overwrite").option("header", "true").csv("output_Spark.csv")

print('total time taken:', time.time() - starttime)

total time taken: 174.84401297569275


In [5]:
# Preview the first rows of the feature table (Spark's default: 20 rows, long cells truncated).
result_frame.show(n=20, truncate=True)

+-------------------+---+-------+---------+---+----------+------------+---+------+--------+------------------+----------+-----------------+---------+------------------+----------+------------------+-------------------+--------------------+------------------+-------+-----------+
|       BaseDateTime|COG|Heading|     MMSI|ROT|ReceiverID|ReceiverType|SOG|Status|VoyageID|               lat|       lon|__index_level_0__|CheckMMSI|          latShift|  lonShift|              dist|          DateShift|            Duration|             speed|stopped|SubVoyageID|
+-------------------+---+-------+---------+---+----------+------------+---+------+--------+------------------+----------+-----------------+---------+------------------+----------+------------------+-------------------+--------------------+------------------+-------+-----------+
|2011-03-23 18:19:00|108|    107|211078025|  8|   05SOAK1|           r| 18|     0|    8213|31.931160000000006|-76.495425|         13889188|     null|              