In [0]:
# Display the notebook's pre-created SparkSession to confirm it is available.
spark

In [0]:
from pyspark.sql.functions import concat, col,expr, udf, hour , to_timestamp, row_number, when, regexp_replace
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql import Window



In [0]:
# Load every CSV file under the landing folder; the first row of each file is
# the header and column types are inferred from the data.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/mnt/mari-time/landing/2024/03/16/")
)

In [0]:
# Wherever the text "IMO" appears inside Name, substitute that row's IMO value.
imo_value = col("IMO")
df = df.withColumn("Name", regexp_replace(col("Name"), "IMO", imo_value))

In [0]:
# Missing cargo types become the explicit placeholder 'Undefined'.
df = df.na.fill(value='Undefined', subset=["Cargo type"])

In [0]:
# Strip the leading "# " that the source header puts on the timestamp column.
df = df.withColumnRenamed(existing="# Timestamp", new="Timestamp")

In [0]:
# Keep a single copy of rows that are identical across every column.
df = df.distinct()

In [0]:
# Build a "(Latitude,Longitude)" string for each row.
grid_point_parts = [
    expr("'('"),
    col('Latitude'),
    expr("','"),
    col('Longitude'),
    expr("')'"),
]
df = df.withColumn('grid point', concat(*grid_point_parts))

In [0]:

# Label each row with a compass description derived from its Heading (degrees,
# 0 = north, increasing clockwise).
#
# Fix: the original labelled the 90-180 and 180-270 quadrants 'Toward North East'
# and left the 270-359 quadrant as the truncated 'Toward '. Each quadrant now
# gets its own intercardinal name. The existing 'Toward' / 'Towards' prefixes
# are kept as they were so the already-correct values are unchanged.
#
# Rows whose Heading is null or outside 0-359 fall through to 'other'.
heading = df.Heading
df = df.withColumn(
    "Directions",
    when((heading > 270) & (heading <= 359), 'Toward North West')
    .when((heading > 0) & (heading < 90), 'Toward North East')
    .when((heading > 90) & (heading < 180), 'Toward South East')
    .when((heading > 180) & (heading < 270), 'Toward South West')
    .when(heading == 0, 'Towards North')
    .when(heading == 90, 'Towards East')
    .when(heading == 180, 'Towards South')
    .when(heading == 270, 'Towards West')
    .otherwise('other'),
)

In [0]:
# Parse the day-first timestamp text and keep only its hour-of-day component.
parsed_timestamp = to_timestamp("Timestamp", "dd/MM/yyyy HH:mm:ss")
df = df.withColumn("hour", hour(parsed_timestamp))

In [0]:
# One window per hour value, rows inside each window ordered by Timestamp.
windowSpec = Window.partitionBy(df['hour']).orderBy(df['Timestamp'])

In [0]:
# Number the rows within each hour window, then sort the whole frame by Timestamp.
row_index_in_hour = row_number().over(windowSpec)
df = df.withColumn("row_number", row_index_in_hour).orderBy(df['Timestamp'])

In [0]:
# Register the storage account key so the wasbs:// writes below can authenticate.
#
# SECURITY: the account key used to be pasted here in clear text. A key that has
# been saved in a notebook must be treated as leaked and rotated in Azure.
# The key is now read from a Databricks secret scope at run time instead.
# NOTE(review): the scope and key names below are placeholders - replace them
# with the names used in your workspace.
storage_account_key = dbutils.secrets.get(
    scope="maritime-project",
    key="maritimeprojectstgacc-account-key",
)
spark.conf.set(
    "fs.azure.account.key.maritimeprojectstgacc.blob.core.windows.net",
    storage_account_key,
)

In [0]:
# List the column names of the transformed frame before it is written out.
df.columns

['Timestamp',
 'Type of mobile',
 'MMSI',
 'Latitude',
 'Longitude',
 'Navigational status',
 'ROT',
 'SOG',
 'COG',
 'Heading',
 'IMO',
 'Callsign',
 'Name',
 'Ship type',
 'Cargo type',
 'Width',
 'Length',
 'Type of position fixing device',
 'Draught',
 'Destination',
 'ETA',
 'Data source type',
 'A',
 'B',
 'C',
 'D',
 'grid point',
 'Directions',
 'hour',
 'row_number']

In [0]:
# Write the transformed frame to the staging folder as CSV with a header row,
# replacing whatever is already there. coalesce(1) reduces the frame to a
# single partition before the write.
staging_path = "wasbs://martimeproject@maritimeprojectstgacc.blob.core.windows.net/transformed_data/staging"
(
    df.coalesce(1)
    .write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save(staging_path)
)

In [0]:
# Export only the rows for vessel MMSI 219004616, as CSV with a header row,
# overwriting any previous output at the destination.
#
# Fix: the original used select(col('MMSI') == 219004616), which projects one
# boolean column for every row of the frame. filter() is what keeps the
# matching rows with all of their columns.
single_vessel_df = df.filter(col('MMSI') == 219004616)
(
    single_vessel_df.write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save("wasbs://martimeproject@maritimeprojectstgacc.blob.core.windows.net/transformed_data/please_work")
)