# Indikator perdagangan untuk pelabuhan Indonesia
Chunk pertama mengimpor fungsi-fungsi yang dibutuhkan dari modul ```pyspark.sql```
serta beberapa library pendukung.

In [1]:
# library
import time                              #time  
from datetime import datetime, timedelta #datetime
import geomesa_pyspark                   #penghubung database, integrasi dengan Spark Py API utk akses data di GeoMesa
# library dari pyspark.sql 
import pyspark.sql.functions as F        #sql
from pyspark.sql import SparkSession     #sesi spark untuk akses database dan agar sql berfungsi
from pyspark.sql.types import *          #list tipe data yg tersedia
from pyspark.sql.window import Window    #fungsi window
from pyspark.sql.functions import col,sum,lpad, expr,lag,lead,when,lit #fungsi untuk dataframe

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
65,application_1621874771809_0016,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


## Class aisTradeIndicators
Class aisTradeIndicators berisi fungsi yang digunakan untuk koneksi ke data, load data, cleaning data dengan memfilter kapal yang bergerak (moving ships), dan menghitung beberapa indikator. Bounding box setiap pelabuhan Indonesia didefinisikan dari koordinat pusat pelabuhan tersebut, kemudian diekspansi ke segala arah sehingga membentuk bounding box di sekitarnya. Class ini dapat mengestimasi indikator terkait perdagangan internasional seperti:
- Time in port               : menghitung lama waktu kapal di pelabuhan
- Port traffic               : jumlah unik kapal yang masuk ke suatu pelabuhan
- Number of visit            : jumlah kunjungan kapal di pelabuhan
- Number of draught changes  : jumlah kapal yang mengalami perubahan draft di pelabuhan (terbagi menjadi 2, yaitu positif dan negatif)
- Draught change sizes       : besar perubahan draft kapal yang terjadi di pelabuhan (terbagi menjadi 2 juga)

In [2]:
class aisTradeIndicators(object):
    '''Load AIS data from GeoMesa and compute port-level trade indicators.

    Port bounding boxes are built around each port's centre coordinate
    (see ``definePortNo``). Indicators (time in port, port traffic, number of
    visits, draught changes) are aggregated per port and absolute month and,
    where implemented, split into Cargo and Tanker vessels.
    '''

    # Regex matched against nav_status when filter_anchmoor=True.
    _ANCH_MOOR_PATTERN = 'Moored|Anchor'

    # (start, end) date strings selectable through pilih_data.
    _DATA_RANGES = {
        1: ("2018-12-01 00:00:00", "2019-07-01 00:00:00"),
        2: ("2019-07-01 00:00:00", "2020-01-01 00:00:00"),
        3: ("2020-01-01 00:00:00", "2020-07-01 00:00:00"),
        4: ("2020-07-01 00:00:00", "2021-01-01 00:00:00"),
    }
    # Range used by pilih_data for any option not in _DATA_RANGES.
    _DEFAULT_RANGE = ("2020-09-01 00:00:00", "2021-02-01 00:00:00")

    def __init__(self, appName, feature, userName):
        '''Store the Spark application name, GeoMesa feature name and user name.'''
        self.appName = appName
        self.feature = feature
        self.userName = userName

    def geomesaConfig(self):
        '''Build the SparkConf used to connect to GeoMesa/HBase.

        The configuration is stored on ``self.sparkConf`` (and returned). It is
        deliberately not stored under this method's own name, so the method can
        be called more than once.
        '''
        conf = geomesa_pyspark.configure(
            jars=['/usr/lib/spark/jars/geomesa-hbase-spark-runtime_2.11-2.3.0.jar'],
            packages=['geomesa_pyspark', 'pytz'],
            spark_home='/usr/lib/spark/',
        ).setAppName(self.appName)
        self.sparkConf = conf
        return conf

    def sparkSession(self):
        '''Create (or reuse) a SparkSession from the GeoMesa configuration.

        ``geomesaConfig()`` must be called first. The session is stored on
        ``self.spark`` and returned.
        '''
        self.spark = SparkSession.builder.config(conf=self.sparkConf).getOrCreate()
        return self.spark

    def getFilteredData(self, startDate, endDate, areaLatitude, areaLongitude):
        '''Load AIS points restricted to a time window, a bounding box and valid MMSIs.

        Parameters
        ----------
        startDate, endDate : str
            Window bounds such as "2019-07-01 00:00:00"; both are exclusive.
        areaLatitude, areaLongitude : dict
            ``{"lLim": lower, "uLim": upper}`` limits of the bounding box.

        Returns
        -------
        pyspark.sql.DataFrame
        '''
        params = {
            "hbase.zookeepers": "hbase.optix-ons-local:2181",
            "hbase.catalog": "ons-historical",
        }
        df = (self.spark.read.format("geomesa")
              .options(**params)
              .option("geomesa.feature", self.feature)
              .load())
        # Time window (strict on both ends).
        df = df.filter((F.col("dtg") > F.unix_timestamp(F.lit(startDate)).cast('timestamp')) &
                       (F.col("dtg") < F.unix_timestamp(F.lit(endDate)).cast('timestamp')))
        # Bounding box, passed as (lower lon, lower lat, upper lon, upper lat).
        filterExpr = "st_contains(st_makeBBox({0},{1},{2},{3}),position)".format(
            areaLongitude['lLim'], areaLatitude['lLim'],
            areaLongitude['uLim'], areaLatitude['uLim'])
        # Keep only nine-digit MMSI values.
        df = df.filter(F.expr(filterExpr))\
               .filter((df["mmsi"] >= 100000000) & (df["mmsi"] <= 999999999))
        return df

    def pilih_data(self, pilihan):
        '''Pick a predefined date range (options 1-4; any other value = Sep 2020 - Jan 2021).

        Returns
        -------
        tuple of str
            ``(start_date, end_date, nama_start_date, nama_end_date)`` where the
            ``nama_*`` values are "YYYYMM" labels used in output file names.
        '''
        start_date, end_date = self._DATA_RANGES.get(pilihan, self._DEFAULT_RANGE)

        print('Pilihan: ', pilihan)
        print("Memilih data range {} hingga {}".format(start_date, end_date))
        # "2019-07-01 00:00:00" -> "201907"
        nama_start_date = ''.join(start_date.split('-')[:2])
        nama_end_date = ''.join(end_date.split('-')[:2])
        return start_date, end_date, nama_start_date, nama_end_date

    def movingShips(self, df, latDiff, longDiff, desc=False, addCount=False):
        '''Keep only ships that moved during the period.

        A ship (mmsi) counts as moving when the span (max - min) of its
        latitude exceeds ``latDiff`` OR the span of its longitude exceeds
        ``longDiff``.

        Parameters
        ----------
        desc : bool
            If True, return ``(moving, notMoving)`` DataFrames of mmsi values
            instead of filtering ``df``. The two sets are disjoint.
        addCount : bool
            If True, store the number of moving ships on ``self.countMS``.

        Returns
        -------
        ``df`` inner-joined on mmsi with the moving ships, or the
        ``(moving, notMoving)`` pair when ``desc`` is True.
        '''
        spans = df.groupBy("mmsi")\
            .agg(F.max("latitude").alias("latMax"), F.min("latitude").alias("latMin"),
                 F.max("longitude").alias("longMax"), F.min("longitude").alias("longMin"))
        spans = spans.withColumn('latDelta', F.abs(spans["latMax"] - spans["latMin"]))\
            .withColumn('longDelta', F.abs(spans["longMax"] - spans["longMin"]))

        isMoving = (spans.latDelta > latDiff) | (spans.longDelta > longDiff)
        if desc:
            # Complement of isMoving (De Morgan): both spans within the limits.
            # Using OR here would make the two sets overlap.
            notMoving = (spans.latDelta <= latDiff) & (spans.longDelta <= longDiff)
            return spans.filter(isMoving).select('mmsi'), spans.filter(notMoving).select('mmsi')

        moving = spans.filter(isMoving).select('mmsi').cache()
        if addCount:
            self.countMS = moving.count()
        return df.join(moving, on="mmsi")

    def definePortNo(self, df, portCoords):
        '''Add an ``inPortNo`` column with the id of the port box each record falls in.

        Each port box spans ``lon_dd +/- expansion`` and ``lat_dd +/- expansion``
        (strict inequalities). Records outside every box get 'noPort'. When boxes
        overlap, the port that comes later in ``portCoords`` wins.

        Parameters
        ----------
        portCoords : iterable of mapping-like rows
            Each with keys 'no', 'lat_dd', 'lon_dd' and 'expansion'.
        '''
        df = df.withColumn('inPortNo', F.lit('noPort'))
        for port in portCoords:
            expansion = port['expansion']
            inBox = "longitude>{0} AND longitude<{1} AND latitude>{2} AND latitude<{3}".format(
                port['lon_dd'] - expansion, port['lon_dd'] + expansion,
                port['lat_dd'] - expansion, port['lat_dd'] + expansion)
            df = df.withColumn('inPortNo',
                               F.when(F.expr(inBox), F.lit(port['no'])).otherwise(F.col('inPortNo')))
        return df

    def absoluteMonth(self, df):
        '''Add mYear, mMonthM and mMonthAbs = month + 12 * year (a unique month index).'''
        df = df.withColumn('mYear', F.year('dtg'))\
               .withColumn('mMonthM', F.month('dtg'))
        df = df.withColumn('mMonthAbs', F.col('mMonthM') + 12 * F.col('mYear'))
        return df

    def cnt_cond(self, cond):
        '''Aggregate expression counting the rows where ``cond`` is true.'''
        return F.sum(F.when(cond, 1).otherwise(0))

    def _filterAnchMoor(self, df):
        '''Keep records whose nav_status matches Moored/Anchor or whose sog is below 0.5.'''
        # rlike (regex), not contains: contains('Moored|Anchor') is a literal
        # substring test and never matches.
        return df.filter(df.nav_status.rlike(self._ANCH_MOOR_PATTERN) |
                         (df.sog.astype('float') < 0.5))

    ######################
    # INDICATORS
    ######################

    def timeDiffNextPort(self, df):
        '''Add next_inPortNo and time_diff_next_adj (helper for timeInPort).

        For each ship (ordered by dtg), time_diff_next_adj is the number of
        seconds until its next record when that record is in the same
        inPortNo, and 0 otherwise (port change, or no next record).
        '''
        windowSpec1 = Window.partitionBy("mmsi").orderBy("dtg").rowsBetween(1, 1)
        df = df.withColumn("time_diff_next",
                           F.unix_timestamp(F.lead(df.dtg, 1).over(windowSpec1)) - F.unix_timestamp(df.dtg))
        # Keep the gap only when the ship stays in the same port; a null
        # comparison (last record) also falls through to 0.
        sameportGap = F.expr(""" IF ( inPortNo = next_inPortNo , time_diff_next, 0) """)
        df = df.withColumn('next_inPortNo', F.lead(df.inPortNo, 1).over(windowSpec1))\
               .withColumn('time_diff_next_adj', sameportGap)\
               .drop('time_diff_next')
        return df

    def timeInPort(self, df, filter_anchmoor, pivot=False, filter_3days=False):
        '''Total time ships spend in each port per absolute month (seconds).

        Expects the output of timeDiffNextPort + absoluteMonth.

        Parameters
        ----------
        filter_anchmoor : bool
            Keep only moored/anchored records or records with sog < 0.5.
        pivot : bool
            If True, return one row per mMonthAbs with one column per port
            (portTimeTotal only).
        filter_3days : bool
            Drop records whose gap to the next record is 3 days (259200 s) or more.

        Returns
        -------
        DataFrame with inPortNo, mMonthAbs, portTimeTotal, portTimeTotalCargo,
        portTimeTotalTanker, sorted by port and month (unless pivoted).
        '''
        if filter_anchmoor:
            df = self._filterAnchMoor(df)
        if filter_3days:
            df = df.filter(df.time_diff_next_adj < 259200)

        df = df.withColumn('Cargo', F.when(F.col('vessel_type').contains('Cargo'), F.col('time_diff_next_adj')).otherwise(0))\
            .withColumn('Tanker', F.when(F.col('vessel_type').contains('Tanker'), F.col('time_diff_next_adj')).otherwise(0))\
            .groupby(["inPortNo", "mMonthAbs"])\
            .agg(
                F.sum("time_diff_next_adj").alias("portTimeTotal"),
                F.sum('Cargo').alias('portTimeTotalCargo'),
                F.sum('Tanker').alias('portTimeTotalTanker')
            ).sort(["inPortNo", "mMonthAbs"])

        if pivot:
            df = df.groupBy("mMonthAbs").pivot("inPortNo").sum("portTimeTotal")
        return df

    def portTraffic(self, df, filter_anchmoor=False, pivot=False):
        '''Number of distinct ships (mmsi) per port and absolute month.

        Returns inPortNo, mMonthAbs, totalTraffic, totalTrafficCargo and
        totalTrafficTanker; with ``pivot=True`` one row per month and one
        column per port (totalTraffic only).
        '''
        if filter_anchmoor:
            df = self._filterAnchMoor(df)
        # mmsi only for the matching vessel type (null otherwise), so countDistinct ignores the rest.
        df = df.withColumn('Cargo', F.when(F.col('vessel_type').contains('Cargo'), F.col('mmsi')).otherwise(None))\
            .withColumn('Tanker', F.when(F.col('vessel_type').contains('Tanker'), F.col('mmsi')).otherwise(None))\
            .groupBy("inPortNo", "mMonthAbs")\
            .agg(
                F.countDistinct("mmsi").alias("totalTraffic"),
                F.countDistinct("Cargo").alias("totalTrafficCargo"),
                F.countDistinct("Tanker").alias("totalTrafficTanker")
            )

        if pivot:
            df = df.groupBy("mMonthAbs").pivot("inPortNo").sum("totalTraffic")
        return df

    def numVisit(self, df, exim, filter_anchmoor=False):
        '''Number of port visits per port and absolute month.

        Parameters
        ----------
        exim : str
            'EX' counts departures: the ship is in a port and its next record
            is elsewhere; the visit is attributed to the port being left. Any
            other value counts arrivals: the next record is in a port
            different from the current location; the visit is attributed to
            that next port.

        Returns inPortNo, mMonthAbs, numVisit, numVisitCargo, numVisitTanker.
        '''
        if filter_anchmoor:
            df = self._filterAnchMoor(df)
        if exim == 'EX':
            visitExpr = F.expr(""" IF ( inPortNo=next_inPortNo OR inPortNo='noPort', 0, 1) """)
            portCol = 'inPortNo'
        else:
            visitExpr = F.expr(""" IF ( inPortNo=next_inPortNo OR next_inPortNo='noPort', 0, 1) """)
            portCol = 'next_inPortNo'
        # A ship's last record has no next location, so it cannot be a transition.
        df = df.na.drop(subset=["next_inPortNo"])
        df = df.withColumn('numVisit', visitExpr)
        df = df.filter(df.numVisit == 1)\
            .groupby([portCol, 'mMonthAbs'])\
            .agg(
                F.count(F.lit(1)).alias('numVisit'),
                self.cnt_cond(F.col('vessel_type').contains('Cargo')).alias('numVisitCargo'),
                self.cnt_cond(F.col('vessel_type').contains('Tanker')).alias('numVisitTanker')
            )
        df = df.selectExpr("{} as inPortNo".format(portCol), "mMonthAbs",
                           "numVisit", "numVisitCargo", "numVisitTanker")
        return df

    def draughtDiff(self, df, tipe, filter_anchmoor=False, vessel_type=''):
        '''Draught changes per port and absolute month, split into positive and negative.

        For each (mmsi, inPortNo, mMonthAbs) the differences between
        consecutive draught values of that ship are summed. The groups with a
        positive / negative total are then aggregated per port and month.

        Parameters
        ----------
        tipe : str
            'count' = number of ship groups with a change; 'sum' = total
            absolute change.
        vessel_type : str
            If non-empty, keep only rows whose vessel_type contains it.

        Returns
        -------
        (df_pos, df_neg) : DataFrames with inPortNo, mMonthAbs, draughtDiff.

        Raises
        ------
        ValueError
            If ``tipe`` is neither 'sum' nor 'count'.
        '''
        if tipe not in ('sum', 'count'):
            raise ValueError("tipe must be 'sum' or 'count', got {!r}".format(tipe))
        if filter_anchmoor:
            df = self._filterAnchMoor(df)
        if vessel_type != '':
            df = df.filter(df.vessel_type.contains(vessel_type))

        windowSpec1 = Window.partitionBy("mmsi").orderBy("dtg").rowsBetween(1, 1)
        df = df.withColumn("diff_draught_next", F.lead(df.draught, 1).over(windowSpec1) - df.draught)
        df = df.groupby(['mmsi', 'inPortNo', 'mMonthAbs'])\
            .agg(F.sum('diff_draught_next').alias('diff_draught'))

        if tipe == 'sum':
            aggCol = F.sum('diff_draught').alias('draughtDiff')
        else:
            aggCol = F.count(F.lit(1)).alias('draughtDiff')

        df_pos = df.filter(F.col('diff_draught') > 0)\
            .groupby(['inPortNo', 'mMonthAbs'])\
            .agg(aggCol)
        df_neg = df.filter(F.col('diff_draught') < 0)\
            .withColumn('diff_draught', F.abs(F.col('diff_draught')))\
            .groupby(['inPortNo', 'mMonthAbs'])\
            .agg(aggCol)
        return df_pos, df_neg

    def periodCalculation(self, df, mMonthAbs):
        '''Replace the absolute-month column ``mMonthAbs`` with a "YYYYMM" string column ``period``.

        Inverse of absoluteMonth (mMonthAbs = month + 12 * year). December
        maps to 12 * (year + 1), so the year is floor((mMonthAbs - 1) / 12)
        and the month is the remainder. Dividing mMonthAbs by 12 directly
        would turn December into month "00" of the following year.
        '''
        year = F.floor((F.col(mMonthAbs) - 1) / 12)
        month = F.col(mMonthAbs) - 12 * year
        return df.withColumn('period', F.concat(year.cast('string'),
                                                F.lpad(month.cast('string'), 2, '0')))\
                 .drop(mMonthAbs)

## Set pengaturan geomesa dan mulai sesi Spark
Pertama, class aisTradeIndicators diinisiasi dengan nama aplikasi, feature, dan username. Sumber data yang digunakan adalah data exactEarth (feature `ee2`).

Fungsi geomesaConfig dan sparkSession sudah diatur di dalam class

In [3]:
# Create the indicator helper for the 'ee2' feature, then configure GeoMesa and start Spark.
aisClass = aisTradeIndicators(
    appName='IndoneAIS',
    feature='ee2',
    userName='jnickelson',
)
aisClass.geomesaConfig()
aisClass.sparkSession()

## Pilih range data
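Pilihan range data 1 sampai 4 masing-masing mencakup sekitar 6 bulan (pilihan 1 mencakup Desember 2018 sampai Juni 2019), sehingga seluruhnya mencakup Desember 2018 sampai Desember 2020. Nilai pilihan lainnya memilih September 2020 sampai Januari 2021. Pembagian ini hanya untuk memudahkan pemrosesan.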

Kemudian area yang dipakai adalah negara Indonesia dengan bounding box seperti di chunk bawah.

In [4]:
# Option 2: from 2019-07-01 up to (not including) 2020-01-01.
pilihan = 2
(start_date, end_date,
 nama_start_date, nama_end_date) = aisClass.pilih_data(pilihan)

Pilihan:  2
Memilih data range 2019-07-01 00:00:00 hingga 2020-01-01 00:00:00

In [5]:
# Bounding box around Indonesia: lon 94.48..141.06, lat -10.57..6.84.
indoLatitude = {"uLim": 6.8391696263, "lLim": -10.5742220783}
indoLongitude = {"uLim": 141.064453125, "lLim": 94.482421875}
# Keep AIS records inside the box and within the selected date range.
df = aisClass.getFilteredData(startDate=start_date, endDate=end_date,
                              areaLatitude=indoLatitude, areaLongitude=indoLongitude)

Memilih variabel data AIS yang digunakan untuk menghasilkan indikator

In [6]:
# Keep only the AIS fields the indicators need and cast the coordinates to float.
aisColumns = ['mmsi', 'dtg', 'longitude', 'latitude', 'vessel_type', 'sog', 'nav_status',
              'draught', 'length', 'width', 'flag_country', 'destination']
df = (df.select(*aisColumns)
        .withColumn('latitude', df['latitude'].cast('float'))
        .withColumn('longitude', df['longitude'].cast('float')))
df

DataFrame[mmsi: bigint, dtg: timestamp, longitude: float, latitude: float, vessel_type: string, sog: double, nav_status: string, draught: double, length: int, width: int, flag_country: string, destination: string]

## Memfilter data
- Filtering moving ships:
    Data AIS akan difilter hanya untuk kapal yang bergerak/berlayar, karena kapal tersebut yang dianggap berkontribusi terhadap perdagangan internasional. Kapal dianggap berlayar jika memenuhi salah satu kriteria berikut:
    - Selisih max dan min latitude lebih dari 0.1
    - Selisih max dan min longitude lebih dari 0.1

In [7]:
# Keep ships whose latitude or longitude span over the period exceeds 0.1.
dfd = aisClass.movingShips(df=df, latDiff=0.1, longDiff=0.1)

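- Filter status navigasi dan draught: hanya record dengan `nav_status` Moored, Anchor, atau Manoeuvrability dan `draught` > 0 yang dipakai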

In [8]:
# Filter on top of the moving-ships result from the previous cell (dfd), not the
# raw df; starting again from df would silently discard the moving-ships filter.
# Keep records whose nav_status matches Moored/Anchor/Manoeuvrability and whose
# draught is positive.
dfd = dfd\
    .filter(dfd.nav_status.rlike('Moored|Anchor|Manoeuvrability'))\
    .filter(dfd.draught > 0)
dfd

DataFrame[mmsi: bigint, dtg: timestamp, longitude: float, latitude: float, vessel_type: string, sog: double, nav_status: string, draught: double, length: int, width: int, flag_country: string, destination: string]

## Mendefinisikan port

Data AIS akan difilter berdasarkan area pelabuhan di Indonesia. Data pelabuhan didapat dalam file ```port.csv``` yang memuat id, nama, dan koordinatnya. Bounding box akan dipakai dengan ekspansi sebesar 0.3

In [9]:
# Port list (id, name, centre coordinates); each port box extends 0.3 around its centre.
portPath = 's3://optix.ons.jupyter/jupyter/jnickelson/port.csv'
portDf = spark.read.csv(portPath, header=True)
portCoords = (portDf.select('no', 'name', 'lat_dd', 'lon_dd')
              .withColumn('expansion', F.lit(0.3))
              .withColumn('lat_dd', portDf['lat_dd'].cast('float'))
              .withColumn('lon_dd', portDf['lon_dd'].cast('float'))
              .collect())

In [10]:
# Tag each record with the id of the port box it falls in ('noPort' otherwise).
dfd1 = aisClass.definePortNo(dfd, portCoords)
dfd1

DataFrame[mmsi: bigint, dtg: timestamp, longitude: float, latitude: float, vessel_type: string, sog: double, nav_status: string, draught: double, length: int, width: int, flag_country: string, destination: string, inPortNo: string]

## Menghitung selisih waktu dan bulan absolut

Untuk setiap kapal, selisih waktu ke record berikutnya perlu dihitung (hanya dihitung jika record berikutnya masih berada di pelabuhan yang sama). Setiap bulan diberi indeks unik menggunakan fungsi absoluteMonth.

In [11]:
# Add next-port / time-to-next-record columns, then the absolute month index.
dfd2 = aisClass.absoluteMonth(df=aisClass.timeDiffNextPort(df=dfd1))
dfd2

DataFrame[mmsi: bigint, dtg: timestamp, longitude: float, latitude: float, vessel_type: string, sog: double, nav_status: string, draught: double, length: int, width: int, flag_country: string, destination: string, inPortNo: string, next_inPortNo: string, time_diff_next_adj: bigint, mYear: int, mMonthM: int, mMonthAbs: int]

------------------------------------------------
## Menghitung indikator

Setiap indikator yang terbentuk diagregasi per port. Fungsi periodCalculation yang digunakan di chunk selanjutnya yaitu mengubah bulan absolut menjadi readable format sesuai periodenya. Fungsi time untuk menghitung waktu running program dan waktu mengekspor data. 

### Calculate time in port  
Menghitung waktu yang dihabiskan kapal di pelabuhan, kemudian diagregasi per pelabuhan dengan menjumlahkan waktu setiap kapal 

In [None]:
# Time in port per port and month, ignoring gaps of 3 days or more, labelled with a YYYYMM period.
df1 = aisClass.periodCalculation(
    df=aisClass.timeInPort(df=dfd2, filter_anchmoor=False, pivot=False, filter_3days=True),
    mMonthAbs='mMonthAbs',
)

Mengekspor dataframe indikator yang terbentuk ke file akun

In [13]:
# Export the time-in-port indicator as a single partition and report the elapsed minutes.
start_time = time.time()
timeInPortPath = "s3://optix.ons.jupyter/jupyter/jnickelson/extract/new/timeInPort_{}_{}.csv".format(
    nama_start_date, nama_end_date)
df1.repartition(1).write.csv(timeInPortPath, header=True)
print("Write data-{} time: {} min".format(pilihan, (time.time() - start_time) / 60))

Write data-2 time: 13.142455089092255 min

### Calculate port traffic

Menghitung jumlah unik kapal yang masuk atau berada di pelabuhan

In [14]:
# Distinct ships per port and month, labelled with a YYYYMM period.
trafficRaw = aisClass.portTraffic(df=dfd2, pivot=False, filter_anchmoor=False)
df2 = aisClass.periodCalculation(df=trafficRaw, mMonthAbs='mMonthAbs')

In [15]:
# Export the port-traffic indicator as a single partition and report the elapsed minutes.
start_time = time.time()
portTrafficPath = "s3://optix.ons.jupyter/jupyter/jnickelson/extract/new/portTraffic_{}_{}.csv".format(
    nama_start_date, nama_end_date)
df2.repartition(1).write.csv(portTrafficPath, header=True)
print("Write data-{} time: {} min".format(pilihan, (time.time() - start_time) / 60))

Write data-2 time: 13.964130055904388 min

### Calculate number visit
Menghitung jumlah kunjungan kapal yang masuk ke pelabuhan

In [16]:
# Arrivals ('IM') per port and month, labelled with a YYYYMM period.
visitsRaw = aisClass.numVisit(df=dfd2, exim='IM', filter_anchmoor=False)
df4 = aisClass.periodCalculation(df=visitsRaw, mMonthAbs='mMonthAbs')

In [17]:
# Export the number-of-visits indicator as a single partition and report the elapsed minutes.
start_time = time.time()
numVisitPath = "s3://optix.ons.jupyter/jupyter/jnickelson/extract/new/numVisit_{}_{}.csv".format(
    nama_start_date, nama_end_date)
df4.repartition(1).write.csv(numVisitPath, header=True)
print("Write data-{} time: {} min".format(pilihan, (time.time() - start_time) / 60))

Write data-2 time: 6.359950264294942 min

### Calculate difference draught

In [18]:
# Settings for the draught-change cells below.
fanchmoor = False   # passed to draughtDiff as filter_anchmoor
tipe = 'count'      # 'count' counts ship groups with a change; 'sum' adds up the changes

In [19]:
from functools import reduce


def _mergeDraught(parts):
    '''Outer-join the [all, Cargo, Tanker] draught frames on port and month and add the period column.'''
    renamed = [part.selectExpr('inPortNo', 'mMonthAbs', 'draughtDiff as draughtDiff{}'.format(i))
               for i, part in enumerate(parts)]
    merged = reduce(lambda x, y: x.join(y, ['inPortNo', 'mMonthAbs'], how='outer'), renamed)\
        .selectExpr('inPortNo', 'mMonthAbs', 'draughtDiff0 as draughtDiff',
                    'draughtDiff1 as draughtDiffCargo', 'draughtDiff2 as draughtDiffTanker')
    return aisClass.periodCalculation(df=merged, mMonthAbs='mMonthAbs')


# All vessels, Cargo only and Tanker only; every call uses the same `tipe`
# so the three output columns are in the same unit.
df5_pos, df5_neg = aisClass.draughtDiff(dfd2, filter_anchmoor=fanchmoor, tipe=tipe)
df5c_pos, df5c_neg = aisClass.draughtDiff(dfd2, filter_anchmoor=fanchmoor, vessel_type='Cargo', tipe=tipe)
df5t_pos, df5t_neg = aisClass.draughtDiff(dfd2, filter_anchmoor=fanchmoor, vessel_type='Tanker', tipe=tipe)

df5all_pos = _mergeDraught([df5_pos, df5c_pos, df5t_pos])
df5all_neg = _mergeDraught([df5_neg, df5c_neg, df5t_neg])

# The setting variable is `fanchmoor` (set above); `fanch` is never defined.
print('Anchmoor = ', fanchmoor)
print('Tipe = ', tipe)

Anchmoor =  False
Tipe =  count

In [20]:
# Export the positive draught-change indicator as a single partition and report the elapsed minutes.
start_time = time.time()
draughtPosPath = "s3://optix.ons.jupyter/jupyter/jnickelson/extract/new/draughtDiff_pos{}_{}_{}.csv".format(
    tipe, nama_start_date, nama_end_date)
df5all_pos.repartition(1).write.csv(draughtPosPath, header=True)
print("Write pos data-{} time: {} min".format(pilihan, (time.time() - start_time) / 60))

Write pos data-2 time: 15.050783908367157 min

In [21]:
# Export the negative draught-change indicator as a single partition and report the elapsed minutes.
start_time = time.time()
draughtNegPath = "s3://optix.ons.jupyter/jupyter/jnickelson/extract/new/draughtDiff_neg{}_{}_{}.csv".format(
    tipe, nama_start_date, nama_end_date)
df5all_neg.repartition(1).write.csv(draughtNegPath, header=True)
print("Write neg data-{} time: {} min".format(pilihan, (time.time() - start_time) / 60))

Write neg data-2 time: 15.106862819194793 min

In [22]:
tipe = 'sum'
from functools import reduce


def _mergeDraught(parts):
    '''Outer-join the [all, Cargo, Tanker] draught frames on port and month and add the period column.'''
    renamed = [part.selectExpr('inPortNo', 'mMonthAbs', 'draughtDiff as draughtDiff{}'.format(i))
               for i, part in enumerate(parts)]
    merged = reduce(lambda x, y: x.join(y, ['inPortNo', 'mMonthAbs'], how='outer'), renamed)\
        .selectExpr('inPortNo', 'mMonthAbs', 'draughtDiff0 as draughtDiff',
                    'draughtDiff1 as draughtDiffCargo', 'draughtDiff2 as draughtDiffTanker')
    return aisClass.periodCalculation(df=merged, mMonthAbs='mMonthAbs')


# All vessels, Cargo only and Tanker only. The all-vessel call must also use
# `tipe` ('sum'); hard-coding 'count' there mixed counts and sums in one file.
df5_pos, df5_neg = aisClass.draughtDiff(dfd2, filter_anchmoor=fanchmoor, tipe=tipe)
df5c_pos, df5c_neg = aisClass.draughtDiff(dfd2, filter_anchmoor=fanchmoor, vessel_type='Cargo', tipe=tipe)
df5t_pos, df5t_neg = aisClass.draughtDiff(dfd2, filter_anchmoor=fanchmoor, vessel_type='Tanker', tipe=tipe)

df5all_pos = _mergeDraught([df5_pos, df5c_pos, df5t_pos])
df5all_neg = _mergeDraught([df5_neg, df5c_neg, df5t_neg])

# The setting variable is `fanchmoor` (defined in an earlier cell); `fanch` is never defined.
print('Anchmoor = ', fanchmoor)
print('Tipe = ', tipe)

Anchmoor =  False
Tipe =  sum

In [23]:
# Export the positive draught-change indicator ('sum' run) as a single partition and report the elapsed minutes.
start_time = time.time()
draughtPosPath = "s3://optix.ons.jupyter/jupyter/jnickelson/extract/new/draughtDiff_pos{}_{}_{}.csv".format(
    tipe, nama_start_date, nama_end_date)
df5all_pos.repartition(1).write.csv(draughtPosPath, header=True)
print("Write pos data-{} time: {} min".format(pilihan, (time.time() - start_time) / 60))

Write pos data-2 time: 15.056995570659637 min

In [24]:
# Export the negative draught-change indicator ('sum' run) as a single partition and report the elapsed minutes.
start_time = time.time()
draughtNegPath = "s3://optix.ons.jupyter/jupyter/jnickelson/extract/new/draughtDiff_neg{}_{}_{}.csv".format(
    tipe, nama_start_date, nama_end_date)
df5all_neg.repartition(1).write.csv(draughtNegPath, header=True)
print("Write neg data-{} time: {} min".format(pilihan, (time.time() - start_time) / 60))

Write neg data-2 time: 16.574400250116984 min

## End

In [None]:
# Done: stop the Spark session.
spark.stop()